vacuumdb: enable parallel mode
authorAlvaro Herrera <[email protected]>
Fri, 23 Jan 2015 18:02:45 +0000 (15:02 -0300)
committerAlvaro Herrera <[email protected]>
Fri, 23 Jan 2015 18:02:45 +0000 (15:02 -0300)
This mode allows vacuumdb to open several server connections to vacuum
or analyze several tables simultaneously.

Author: Dilip Kumar.  Some reworking by Álvaro Herrera
Reviewed by: Jeff Janes, Amit Kapila, Magnus Hagander, Andres Freund

doc/src/sgml/ref/vacuumdb.sgml
src/bin/pg_dump/parallel.c
src/bin/scripts/common.c
src/bin/scripts/common.h
src/bin/scripts/vacuumdb.c

index 3ecd9999812080e41270540f402f2829227a193d..e38c34aea37890f57d8a49fd0f87de27715378e2 100644 (file)
@@ -203,6 +203,30 @@ PostgreSQL documentation
       </listitem>
      </varlistentry>
 
+     <varlistentry>
+      <term><option>-j <replaceable class="parameter">njobs</replaceable></option></term>
+      <term><option>--jobs=<replaceable class="parameter">njobs</replaceable></option></term>
+      <listitem>
+       <para>
+        Execute the vacuum or analyze commands in parallel by running
+        <replaceable class="parameter">njobs</replaceable>
+        commands simultaneously.  This option reduces the processing
+        time, but it also increases the load on the database server.
+       </para>
+       <para>
+        <application>vacuumdb</application> will open
+        <replaceable class="parameter">njobs</replaceable> connections to the
+        database, so make sure your <xref linkend="guc-max-connections">
+        setting is high enough to accommodate all connections.
+       </para>
+       <para>
+        Note that using this mode together with the <option>-f</option>
+        (<literal>FULL</literal>) option might cause deadlock failures if
+        certain system catalogs are processed in parallel.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry>
       <term><option>--analyze-in-stages</option></term>
       <listitem>
index d942a75f7cdf0d30279fd81b2f32c94afb9779b2..1bf76114c09159cb91412fb5f593cf8a8f7c28c1 100644 (file)
@@ -1160,7 +1160,7 @@ select_loop(int maxFd, fd_set *workerset)
                i = select(maxFd + 1, workerset, NULL, NULL, NULL);
 
                /*
-                * If we Ctrl-C the master process , it's likely that we interrupt
+                * If we Ctrl-C the master process, it's likely that we interrupt
                 * select() here. The signal handler will set wantAbort == true and
                 * the shutdown journey starts from here. Note that we'll come back
                 * here later when we tell all workers to terminate and read their
index 6bfe2e628b1a61bb4055951fd3196a9df9b7217e..da142aaa643eb19a9a9ca0d0ced3cacd6b7df4aa 100644 (file)
 
 #include "common.h"
 
-static void SetCancelConn(PGconn *conn);
-static void ResetCancelConn(void);
 
 static PGcancel *volatile cancelConn = NULL;
+bool CancelRequested = false;
 
 #ifdef WIN32
 static CRITICAL_SECTION cancelConnLock;
@@ -291,7 +290,7 @@ yesno_prompt(const char *question)
  *
  * Set cancelConn to point to the current database connection.
  */
-static void
+void
 SetCancelConn(PGconn *conn)
 {
        PGcancel   *oldCancelConn;
@@ -321,7 +320,7 @@ SetCancelConn(PGconn *conn)
  *
  * Free the current cancel connection, if any, and set to NULL.
  */
-static void
+void
 ResetCancelConn(void)
 {
        PGcancel   *oldCancelConn;
@@ -345,9 +344,8 @@ ResetCancelConn(void)
 
 #ifndef WIN32
 /*
- * Handle interrupt signals by canceling the current command,
- * if it's being executed through executeMaintenanceCommand(),
- * and thus has a cancelConn set.
+ * Handle interrupt signals by canceling the current command, if a cancelConn
+ * is set.
  */
 static void
 handle_sigint(SIGNAL_ARGS)
@@ -359,10 +357,15 @@ handle_sigint(SIGNAL_ARGS)
        if (cancelConn != NULL)
        {
                if (PQcancel(cancelConn, errbuf, sizeof(errbuf)))
+               {
+                       CancelRequested = true;
                        fprintf(stderr, _("Cancel request sent\n"));
+               }
                else
                        fprintf(stderr, _("Could not send cancel request: %s"), errbuf);
        }
+       else
+               CancelRequested = true;
 
        errno = save_errno;                     /* just in case the write changed it */
 }
@@ -392,10 +395,16 @@ consoleHandler(DWORD dwCtrlType)
                if (cancelConn != NULL)
                {
                        if (PQcancel(cancelConn, errbuf, sizeof(errbuf)))
+                       {
                                fprintf(stderr, _("Cancel request sent\n"));
+                               CancelRequested = true;
+                       }
                        else
                                fprintf(stderr, _("Could not send cancel request: %s"), errbuf);
                }
+               else
+                       CancelRequested = true;
+
                LeaveCriticalSection(&cancelConnLock);
 
                return TRUE;
index c0c1715bc16846335683b13f5326701acf2cf59b..b5ce1ed7444012921ec175ea2dbde00e7aebd432 100644 (file)
@@ -21,6 +21,8 @@ enum trivalue
        TRI_YES
 };
 
+extern bool CancelRequested;
+
 typedef void (*help_handler) (const char *progname);
 
 extern void handle_help_version_opts(int argc, char *argv[],
@@ -49,4 +51,8 @@ extern bool yesno_prompt(const char *question);
 
 extern void setup_cancel_handler(void);
 
+extern void SetCancelConn(PGconn *conn);
+extern void ResetCancelConn(void);
+
+
 #endif   /* COMMON_H */
index 957fdb6e189e1725f32de6cac725921d718792ae..506cdc7def27adbfa5a925b1becbfeca131eeefb 100644 (file)
  */
 
 #include "postgres_fe.h"
+
 #include "common.h"
 #include "dumputils.h"
 
 
-static void vacuum_one_database(const char *dbname, bool full, bool verbose,
-       bool and_analyze, bool analyze_only, bool analyze_in_stages, int stage, bool freeze,
-                                       const char *table, const char *host, const char *port,
+#define ERRCODE_UNDEFINED_TABLE  "42P01"
+
+/* Parallel vacuuming stuff */
+typedef struct ParallelSlot
+{
+       /* libpq connection on which this slot's commands are run */
+       PGconn     *connection;
+       /* NOTE(review): presumably the socket of "connection"; confirm in init_slot() */
+       pgsocket        sock;
+       /*
+        * Set to false when a command is dispatched on the slot; set back to
+        * true once that command's result has been read.
+        */
+       bool            isFree;
+} ParallelSlot;
+
+/* vacuum options controlled by user flags */
+typedef struct vacuumingOptions
+{
+       bool            analyze_only;   /* -Z; also set by --analyze-in-stages */
+       bool            verbose;        /* -v */
+       bool            and_analyze;    /* -z */
+       bool            full;           /* -f */
+       bool            freeze;         /* -F */
+} vacuumingOptions;
+
+
+static void vacuum_one_database(const char *dbname, vacuumingOptions *vacopts,
+                                       int stage,
+                                       SimpleStringList *tables,
+                                       const char *host, const char *port,
                                        const char *username, enum trivalue prompt_password,
+                                       int concurrentCons,
                                        const char *progname, bool echo, bool quiet);
-static void vacuum_all_databases(bool full, bool verbose, bool and_analyze,
-                                        bool analyze_only, bool analyze_in_stages, bool freeze,
+
+static void vacuum_all_databases(vacuumingOptions *vacopts,
+                                        bool analyze_in_stages,
                                         const char *maintenance_db,
                                         const char *host, const char *port,
                                         const char *username, enum trivalue prompt_password,
+                                        int concurrentCons,
                                         const char *progname, bool echo, bool quiet);
 
+static void prepare_vacuum_command(PQExpBuffer sql, PGconn *conn,
+                                          vacuumingOptions *vacopts, const char *table);
+
+static void run_vacuum_command(PGconn *conn, const char *sql, bool echo,
+                                  const char *dbname, const char *table,
+                                  const char *progname, bool async);
+
+static ParallelSlot *GetIdleSlot(ParallelSlot slots[], int numslots,
+                       const char *dbname, const char *progname);
+
+static bool GetQueryResult(PGconn *conn, const char *dbname,
+                          const char *progname);
+
+static void DisconnectDatabase(ParallelSlot *slot);
+
+static int     select_loop(int maxFd, fd_set *workerset, bool *aborting);
+
+static void init_slot(ParallelSlot *slot, PGconn *conn);
+
 static void help(const char *progname);
 
+/* For analyze-in-stages mode */
+#define ANALYZE_NO_STAGE       -1
+#define ANALYZE_NUM_STAGES     3
+
 
 int
 main(int argc, char *argv[])
@@ -49,6 +98,7 @@ main(int argc, char *argv[])
                {"table", required_argument, NULL, 't'},
                {"full", no_argument, NULL, 'f'},
                {"verbose", no_argument, NULL, 'v'},
+               {"jobs", required_argument, NULL, 'j'},
                {"maintenance-db", required_argument, NULL, 2},
                {"analyze-in-stages", no_argument, NULL, 3},
                {NULL, 0, NULL, 0}
@@ -57,7 +107,6 @@ main(int argc, char *argv[])
        const char *progname;
        int                     optindex;
        int                     c;
-
        const char *dbname = NULL;
        const char *maintenance_db = NULL;
        char       *host = NULL;
@@ -66,21 +115,23 @@ main(int argc, char *argv[])
        enum trivalue prompt_password = TRI_DEFAULT;
        bool            echo = false;
        bool            quiet = false;
-       bool            and_analyze = false;
-       bool            analyze_only = false;
+       vacuumingOptions vacopts;
        bool            analyze_in_stages = false;
-       bool            freeze = false;
        bool            alldb = false;
-       bool            full = false;
-       bool            verbose = false;
        SimpleStringList tables = {NULL, NULL};
+       int                     concurrentCons = 1;
+       int                     tbl_count = 0;
+
+       /* initialize options to all false */
+       memset(&vacopts, 0, sizeof(vacopts));
 
        progname = get_progname(argv[0]);
+
        set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pgscripts"));
 
        handle_help_version_opts(argc, argv, "vacuumdb", help);
 
-       while ((c = getopt_long(argc, argv, "h:p:U:wWeqd:zZFat:fv", long_options, &optindex)) != -1)
+       while ((c = getopt_long(argc, argv, "h:p:U:wWeqd:zZFat:fvj:", long_options, &optindex)) != -1)
        {
                switch (c)
                {
@@ -109,31 +160,49 @@ main(int argc, char *argv[])
                                dbname = pg_strdup(optarg);
                                break;
                        case 'z':
-                               and_analyze = true;
+                               vacopts.and_analyze = true;
                                break;
                        case 'Z':
-                               analyze_only = true;
+                               vacopts.analyze_only = true;
                                break;
                        case 'F':
-                               freeze = true;
+                               vacopts.freeze = true;
                                break;
                        case 'a':
                                alldb = true;
                                break;
                        case 't':
-                               simple_string_list_append(&tables, optarg);
-                               break;
+                               {
+                                       simple_string_list_append(&tables, optarg);
+                                       tbl_count++;
+                                       break;
+                               }
                        case 'f':
-                               full = true;
+                               vacopts.full = true;
                                break;
                        case 'v':
-                               verbose = true;
+                               vacopts.verbose = true;
+                               break;
+                       case 'j':
+                               concurrentCons = atoi(optarg);
+                               if (concurrentCons <= 0)
+                               {
+                                       fprintf(stderr, _("%s: number of parallel \"jobs\" must be at least 1\n"),
+                                                       progname);
+                                       exit(1);
+                               }
+                               if (concurrentCons > FD_SETSIZE - 1)
+                               {
+                                       fprintf(stderr, _("%s: too many parallel jobs requested (maximum: %d)\n"),
+                                                       progname, FD_SETSIZE - 1);
+                                       exit(1);
+                               }
                                break;
                        case 2:
                                maintenance_db = pg_strdup(optarg);
                                break;
                        case 3:
-                               analyze_in_stages = analyze_only = true;
+                               analyze_in_stages = vacopts.analyze_only = true;
                                break;
                        default:
                                fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
@@ -141,7 +210,6 @@ main(int argc, char *argv[])
                }
        }
 
-
        /*
         * Non-option argument specifies database name as long as it wasn't
         * already specified with -d / --dbname
@@ -160,18 +228,18 @@ main(int argc, char *argv[])
                exit(1);
        }
 
-       if (analyze_only)
+       if (vacopts.analyze_only)
        {
-               if (full)
+               if (vacopts.full)
                {
-                       fprintf(stderr, _("%s: cannot use the \"full\" option when performing only analyze\n"),
-                                       progname);
+                       fprintf(stderr, _("%s: cannot use the \"%s\" option when performing only analyze\n"),
+                                       progname, "full");
                        exit(1);
                }
-               if (freeze)
+               if (vacopts.freeze)
                {
-                       fprintf(stderr, _("%s: cannot use the \"freeze\" option when performing only analyze\n"),
-                                       progname);
+                       fprintf(stderr, _("%s: cannot use the \"%s\" option when performing only analyze\n"),
+                                       progname, "freeze");
                        exit(1);
                }
                /* allow 'and_analyze' with 'analyze_only' */
@@ -179,6 +247,10 @@ main(int argc, char *argv[])
 
        setup_cancel_handler();
 
+       /* Avoid opening extra connections. */
+       if (tbl_count && (concurrentCons > tbl_count))
+               concurrentCons = tbl_count;
+
        if (alldb)
        {
                if (dbname)
@@ -194,9 +266,12 @@ main(int argc, char *argv[])
                        exit(1);
                }
 
-               vacuum_all_databases(full, verbose, and_analyze, analyze_only, analyze_in_stages, freeze,
-                                                        maintenance_db, host, port, username,
-                                                        prompt_password, progname, echo, quiet);
+               vacuum_all_databases(&vacopts,
+                                                        analyze_in_stages,
+                                                        maintenance_db,
+                                                        host, port, username, prompt_password,
+                                                        concurrentCons,
+                                                        progname, echo, quiet);
        }
        else
        {
@@ -210,213 +285,628 @@ main(int argc, char *argv[])
                                dbname = get_user_name_or_exit(progname);
                }
 
-               if (tables.head != NULL)
+               if (analyze_in_stages)
                {
-                       SimpleStringListCell *cell;
+                       int                     stage;
 
-                       for (cell = tables.head; cell; cell = cell->next)
+                       for (stage = 0; stage < ANALYZE_NUM_STAGES; stage++)
                        {
-                               vacuum_one_database(dbname, full, verbose, and_analyze,
-                                                                       analyze_only, analyze_in_stages, -1,
-                                                                       freeze, cell->val,
+                               vacuum_one_database(dbname, &vacopts,
+                                                                       stage,
+                                                                       &tables,
                                                                        host, port, username, prompt_password,
+                                                                       concurrentCons,
                                                                        progname, echo, quiet);
                        }
                }
                else
-                       vacuum_one_database(dbname, full, verbose, and_analyze,
-                                                               analyze_only, analyze_in_stages, -1,
-                                                               freeze, NULL,
+                       vacuum_one_database(dbname, &vacopts,
+                                                               ANALYZE_NO_STAGE,
+                                                               &tables,
                                                                host, port, username, prompt_password,
+                                                               concurrentCons,
                                                                progname, echo, quiet);
        }
 
        exit(0);
 }
 
-
+/*
+ * vacuum_one_database
+ *
+ * Process tables in the given database.  If the 'tables' list is empty,
+ * process all tables in the database.
+ *
+ * Note that this function is only concerned with running exactly one stage
+ * when in analyze-in-stages mode; caller must iterate on us if necessary.
+ *
+ * If concurrentCons is > 1, multiple connections are used to vacuum tables
+ * in parallel.  In this case and if the table list is empty, we first obtain
+ * a list of tables from the database.
+ *
+ * If cancellation is requested, no idle slot can be obtained, or
+ * GetQueryResult() returns false while collecting the outstanding parallel
+ * results, all connections are closed and the program exits with status 1.
+ */
 static void
-run_vacuum_command(PGconn *conn, const char *sql, bool echo, const char *dbname, const char *table, const char *progname)
+vacuum_one_database(const char *dbname, vacuumingOptions *vacopts,
+                                       int stage,
+                                       SimpleStringList *tables,
+                                       const char *host, const char *port,
+                                       const char *username, enum trivalue prompt_password,
+                                       int concurrentCons,
+                                       const char *progname, bool echo, bool quiet)
 {
-       if (!executeMaintenanceCommand(conn, sql, echo))
+       PQExpBufferData sql;
+       PGconn     *conn;
+       SimpleStringListCell *cell;
+       ParallelSlot *slots = NULL;
+       SimpleStringList dbtables = {NULL, NULL};
+       int                     i;
+       /* a genuine boolean: storing -1 in a bool cannot be tested portably */
+       bool            failed = false;
+       bool            parallel = concurrentCons > 1;
+       const char *stage_commands[] = {
+               "SET default_statistics_target=1; SET vacuum_cost_delay=0;",
+               "SET default_statistics_target=10; RESET vacuum_cost_delay;",
+               "RESET default_statistics_target;"
+       };
+       const char *stage_messages[] = {
+               gettext_noop("Generating minimal optimizer statistics (1 target)"),
+               gettext_noop("Generating medium optimizer statistics (10 targets)"),
+               gettext_noop("Generating default (full) optimizer statistics")
+       };
+
+       Assert(stage == ANALYZE_NO_STAGE ||
+                  (stage >= 0 && stage < ANALYZE_NUM_STAGES));
+
+       if (!quiet)
        {
-               if (table)
-                       fprintf(stderr, _("%s: vacuuming of table \"%s\" in database \"%s\" failed: %s"),
-                                       progname, table, dbname, PQerrorMessage(conn));
+               if (stage != ANALYZE_NO_STAGE)
+                       printf(_("%s: processing database \"%s\": %s\n"), progname, dbname,
+                                  stage_messages[stage]);
                else
-                       fprintf(stderr, _("%s: vacuuming of database \"%s\" failed: %s"),
-                                       progname, dbname, PQerrorMessage(conn));
-               PQfinish(conn);
-               exit(1);
+                       printf(_("%s: vacuuming database \"%s\"\n"), progname, dbname);
+               fflush(stdout);
        }
-}
 
+       conn = connectDatabase(dbname, host, port, username, prompt_password,
+                                                  progname, false);
+
+       initPQExpBuffer(&sql);
+
+       /*
+        * If a table list is not provided and we're using multiple connections,
+        * prepare the list of tables by querying the catalogs.
+        */
+       if (parallel && (!tables || !tables->head))
+       {
+               PQExpBufferData buf;
+               PGresult   *res;
+               int                     ntups;
+               int                     i;
+
+               initPQExpBuffer(&buf);
 
+               res = executeQuery(conn,
+                       "SELECT c.relname, ns.nspname FROM pg_class c, pg_namespace ns\n"
+                        " WHERE relkind IN (\'r\', \'m\') AND c.relnamespace = ns.oid\n"
+                                                  " ORDER BY c.relpages DESC;",
+                                                  progname, echo);
+
+               ntups = PQntuples(res);
+               for (i = 0; i < ntups; i++)
+               {
+                       appendPQExpBuffer(&buf, "%s",
+                                                         fmtQualifiedId(PQserverVersion(conn),
+                                                                                        PQgetvalue(res, i, 1),
+                                                                                        PQgetvalue(res, i, 0)));
+
+                       simple_string_list_append(&dbtables, buf.data);
+                       resetPQExpBuffer(&buf);
+               }
+
+               termPQExpBuffer(&buf);
+               tables = &dbtables;
+
+               /*
+                * If there are more connections than vacuumable relations, we don't
+                * need to use them all.
+                */
+               if (concurrentCons > ntups)
+                       concurrentCons = ntups;
+               if (concurrentCons <= 1)
+                       parallel = false;
+       }
+
+       /*
+        * Setup the database connections. We reuse the connection we already have
+        * for the first slot.  If not in parallel mode, the first slot in the
+        * array contains the connection.
+        */
+       slots = (ParallelSlot *) pg_malloc(sizeof(ParallelSlot) * concurrentCons);
+       init_slot(slots, conn);
+       if (parallel)
+       {
+               for (i = 1; i < concurrentCons; i++)
+               {
+                       conn = connectDatabase(dbname, host, port, username, prompt_password,
+                                                                  progname, false);
+                       init_slot(slots + i, conn);
+               }
+       }
+
+       /*
+        * Prepare all the connections to run the appropriate analyze stage, if
+        * caller requested that mode.
+        */
+       if (stage != ANALYZE_NO_STAGE)
+       {
+               int                     j;
+
+               /* We already emitted the message above */
+
+               for (j = 0; j < concurrentCons; j++)
+                       executeCommand((slots + j)->connection,
+                                                  stage_commands[stage], progname, echo);
+       }
+
+       cell = tables ? tables->head : NULL;
+       do
+       {
+               ParallelSlot *free_slot;
+               const char *tabname = cell ? cell->val : NULL;
+
+               prepare_vacuum_command(&sql, conn, vacopts, tabname);
+
+               if (CancelRequested)
+               {
+                       failed = true;
+                       goto finish;
+               }
+
+               /*
+                * Get the connection slot to use.  If in parallel mode, here we wait
+                * for one connection to become available if none already is.  In
+                * non-parallel mode we simply use the only slot we have, which we
+                * know to be free.
+                */
+               if (parallel)
+               {
+                       /*
+                        * Get a free slot, waiting until one becomes free if none
+                        * currently is.
+                        */
+                       free_slot = GetIdleSlot(slots, concurrentCons, dbname, progname);
+                       if (!free_slot)
+                       {
+                               failed = true;
+                               goto finish;
+                       }
+
+                       free_slot->isFree = false;
+               }
+               else
+                       free_slot = slots;
+
+               run_vacuum_command(free_slot->connection, sql.data,
+                                                  echo, dbname, tabname, progname, parallel);
+
+               if (cell)
+                       cell = cell->next;
+       } while (cell != NULL);
+
+       if (parallel)
+       {
+               int                     j;
+
+               for (j = 0; j < concurrentCons; j++)
+               {
+                       /* wait for all connections to return their results */
+                       if (!GetQueryResult((slots + j)->connection, dbname, progname))
+                       {
+                               /* record the failure so that we exit with status 1 */
+                               failed = true;
+                               goto finish;
+                       }
+
+                       (slots + j)->isFree = true;
+               }
+       }
+
+finish:
+       for (i = 0; i < concurrentCons; i++)
+               DisconnectDatabase(slots + i);
+       pfree(slots);
+
+       termPQExpBuffer(&sql);
+
+       /* exit only after every connection has been closed above */
+       if (failed)
+               exit(1);
+}
+
+/*
+ * Vacuum/analyze all connectable databases.
+ *
+ * In analyze-in-stages mode, we process all databases in one stage before
+ * moving on to the next stage.  That ensures minimal stats are available
+ * quickly everywhere before generating more detailed ones.
+ *
+ * The list of databases is read through a connection to the maintenance
+ * database, which is closed before any database is processed.
+ */
 static void
-vacuum_one_database(const char *dbname, bool full, bool verbose, bool and_analyze,
-       bool analyze_only, bool analyze_in_stages, int stage, bool freeze, const char *table,
-                                       const char *host, const char *port,
-                                       const char *username, enum trivalue prompt_password,
-                                       const char *progname, bool echo, bool quiet)
+vacuum_all_databases(vacuumingOptions *vacopts,
+                                        bool analyze_in_stages,
+                                        const char *maintenance_db, const char *host,
+                                        const char *port, const char *username,
+                                        enum trivalue prompt_password,
+                                        int concurrentCons,
+                                        const char *progname, bool echo, bool quiet)
 {
-       PQExpBufferData sql;
-
        PGconn     *conn;
+       PGresult   *result;
+       int                     stage;
+       int                     i;
 
-       initPQExpBuffer(&sql);
+       /* Fetch the names of all databases that accept connections. */
+       conn = connectMaintenanceDatabase(maintenance_db, host, port,
+                                                                         username, prompt_password, progname);
+       result = executeQuery(conn,
+                       "SELECT datname FROM pg_database WHERE datallowconn ORDER BY 1;",
+                                                 progname, echo);
+       /* Only the query result is used from here on. */
+       PQfinish(conn);
 
-       conn = connectDatabase(dbname, host, port, username, prompt_password,
-                                                  progname, false);
+       if (analyze_in_stages)
+       {
+               /*
+                * When analyzing all databases in stages, we analyze them all in the
+                * fastest stage first, so that initial statistics become available
+                * for all of them as soon as possible.
+                *
+                * This means we establish several times as many connections, but
+                * that's a secondary consideration.
+                */
+               for (stage = 0; stage < ANALYZE_NUM_STAGES; stage++)
+               {
+                       for (i = 0; i < PQntuples(result); i++)
+                       {
+                               const char *dbname;
+
+                               dbname = PQgetvalue(result, i, 0);
+                               /* NULL table list: process all tables of the database */
+                               vacuum_one_database(dbname, vacopts,
+                                                                       stage,
+                                                                       NULL,
+                                                                       host, port, username, prompt_password,
+                                                                       concurrentCons,
+                                                                       progname, echo, quiet);
+                       }
+               }
+       }
+       else
+       {
+               /* Single pass over the databases, without analyze stages. */
+               for (i = 0; i < PQntuples(result); i++)
+               {
+                       const char *dbname;
 
-       if (analyze_only)
+                       dbname = PQgetvalue(result, i, 0);
+                       /* NULL table list: process all tables of the database */
+                       vacuum_one_database(dbname, vacopts,
+                                                               ANALYZE_NO_STAGE,
+                                                               NULL,
+                                                               host, port, username, prompt_password,
+                                                               concurrentCons,
+                                                               progname, echo, quiet);
+               }
+       }
+
+       PQclear(result);
+}
+
+/*
+ * Construct a vacuum/analyze command to run based on the given options, in the
+ * given string buffer, which may contain previous garbage.
+ *
+ * An optional table name can be passed; this must already be properly
+ * quoted.  The command is semicolon-terminated.
+ */
+static void
+prepare_vacuum_command(PQExpBuffer sql, PGconn *conn, vacuumingOptions *vacopts,
+                                          const char *table)
+{
+       resetPQExpBuffer(sql);
+
+       if (vacopts->analyze_only)
        {
-               appendPQExpBufferStr(&sql, "ANALYZE");
-               if (verbose)
-                       appendPQExpBufferStr(&sql, " VERBOSE");
+               appendPQExpBufferStr(sql, "ANALYZE");
+               if (vacopts->verbose)
+                       appendPQExpBufferStr(sql, " VERBOSE");
        }
        else
        {
-               appendPQExpBufferStr(&sql, "VACUUM");
+               appendPQExpBufferStr(sql, "VACUUM");
                if (PQserverVersion(conn) >= 90000)
                {
                        const char *paren = " (";
                        const char *comma = ", ";
                        const char *sep = paren;
 
-                       if (full)
+                       if (vacopts->full)
                        {
-                               appendPQExpBuffer(&sql, "%sFULL", sep);
+                               appendPQExpBuffer(sql, "%sFULL", sep);
                                sep = comma;
                        }
-                       if (freeze)
+                       if (vacopts->freeze)
                        {
-                               appendPQExpBuffer(&sql, "%sFREEZE", sep);
+                               appendPQExpBuffer(sql, "%sFREEZE", sep);
                                sep = comma;
                        }
-                       if (verbose)
+                       if (vacopts->verbose)
                        {
-                               appendPQExpBuffer(&sql, "%sVERBOSE", sep);
+                               appendPQExpBuffer(sql, "%sVERBOSE", sep);
                                sep = comma;
                        }
-                       if (and_analyze)
+                       if (vacopts->and_analyze)
                        {
-                               appendPQExpBuffer(&sql, "%sANALYZE", sep);
+                               appendPQExpBuffer(sql, "%sANALYZE", sep);
                                sep = comma;
                        }
                        if (sep != paren)
-                               appendPQExpBufferStr(&sql, ")");
+                               appendPQExpBufferStr(sql, ")");
                }
                else
                {
-                       if (full)
-                               appendPQExpBufferStr(&sql, " FULL");
-                       if (freeze)
-                               appendPQExpBufferStr(&sql, " FREEZE");
-                       if (verbose)
-                               appendPQExpBufferStr(&sql, " VERBOSE");
-                       if (and_analyze)
-                               appendPQExpBufferStr(&sql, " ANALYZE");
+                       if (vacopts->full)
+                               appendPQExpBufferStr(sql, " FULL");
+                       if (vacopts->freeze)
+                               appendPQExpBufferStr(sql, " FREEZE");
+                       if (vacopts->verbose)
+                               appendPQExpBufferStr(sql, " VERBOSE");
+                       if (vacopts->and_analyze)
+                               appendPQExpBufferStr(sql, " ANALYZE");
                }
        }
+
        if (table)
-               appendPQExpBuffer(&sql, " %s", table);
-       appendPQExpBufferStr(&sql, ";");
+               appendPQExpBuffer(sql, " %s", table);
+       appendPQExpBufferChar(sql, ';');
+}
 
-       if (analyze_in_stages)
+/*
+ * Send a vacuum/analyze command to the server.
+ *
+ * conn is the connection to use and sql the complete command text; if echo
+ * is true the command is also printed.  dbname and table (which may be NULL)
+ * are used only to build the error message.
+ *
+ * If 'async' is false, the command is run to completion; on failure an error
+ * is reported, the connection is closed and the program exits.
+ *
+ * If 'async' is true, the command is only dispatched with PQsendQuery() and
+ * its result is not collected here.  A failure to dispatch the command is
+ * reported, but the connection is left open and the program does not exit.
+ */
+static void
+run_vacuum_command(PGconn *conn, const char *sql, bool echo,
+                                  const char *dbname, const char *table,
+                                  const char *progname, bool async)
+{
+       bool            status;
+
+       if (async)
+       {
+               if (echo)
+                       printf("%s\n", sql);
+
+               /* PQsendQuery returns 1 if the command was dispatched, else 0 */
+               status = PQsendQuery(conn, sql) == 1;
+       }
+       else
+               status = executeMaintenanceCommand(conn, sql, echo);
+
+       if (!status)
+       {
+               if (table)
+                       fprintf(stderr,
+                       _("%s: vacuuming of table \"%s\" in database \"%s\" failed: %s"),
+                                       progname, table, dbname, PQerrorMessage(conn));
+               else
+                       fprintf(stderr, _("%s: vacuuming of database \"%s\" failed: %s"),
+                                       progname, dbname, PQerrorMessage(conn));
+
+               /* Only the synchronous path owns the decision to bail out. */
+               if (!async)
+               {
+                       PQfinish(conn);
+                       exit(1);
+               }
+       }
+}
+
+/*
+ * GetIdleSlot
+ *             Return a connection slot that is ready to execute a command.
+ *
+ * We return the first slot we find that is marked isFree, if one is;
+ * otherwise, we loop on select() until one socket becomes available.  When
+ * this happens, we read the whole set and mark as free all sockets that become
+ * available.
+ *
+ * The descriptor set is rebuilt before every wait: select() overwrites it
+ * with just the descriptors that are ready, so reusing it would make later
+ * waits ignore connections that were not readable the previous time around.
+ *
+ * If an error occurs, NULL is returned.
+ */
+static ParallelSlot *
+GetIdleSlot(ParallelSlot slots[], int numslots, const char *dbname,
+                       const char *progname)
+{
+       int                     i;
+       int                     firstFree = -1;
+       fd_set          slotset;
+       pgsocket        maxFd;
+
+       for (i = 0; i < numslots; i++)
+               if ((slots + i)->isFree)
+                       return slots + i;
+
+       /*
+        * No free slot found, so wait until one of the connections has finished
+        * its task and return the available slot.
+        */
+       for (firstFree = -1; firstFree < 0;)
        {
-               const char *stage_commands[] = {
-                       "SET default_statistics_target=1; SET vacuum_cost_delay=0;",
-                       "SET default_statistics_target=10; RESET vacuum_cost_delay;",
-                       "RESET default_statistics_target;"
-               };
-               const char *stage_messages[] = {
-                       gettext_noop("Generating minimal optimizer statistics (1 target)"),
-                       gettext_noop("Generating medium optimizer statistics (10 targets)"),
-                       gettext_noop("Generating default (full) optimizer statistics")
-               };
-
-               if (stage == -1)
+               bool            aborting;
+
+               /* Watch every connection again, not just last round's ready ones */
+               FD_ZERO(&slotset);
+
+               maxFd = slots->sock;
+               for (i = 0; i < numslots; i++)
+               {
+                       FD_SET((slots + i)->sock, &slotset);
+                       if ((slots + i)->sock > maxFd)
+                               maxFd = (slots + i)->sock;
+               }
+
+               SetCancelConn(slots->connection);
+               i = select_loop(maxFd, &slotset, &aborting);
+               ResetCancelConn();
+
+               if (aborting)
                {
-                       int             i;
+                       /*
+                        * We set the cancel-receiving connection to the one in the zeroth
+                        * slot above, so fetch the error from there.
+                        */
+                       GetQueryResult(slots->connection, dbname, progname);
+                       return NULL;
+               }
+               Assert(i != 0);
 
-                       /* Run all stages. */
-                       for (i = 0; i < 3; i++)
-                       {
-                               if (!quiet)
-                               {
-                                       puts(gettext(stage_messages[i]));
-                                       fflush(stdout);
-                               }
-                               executeCommand(conn, stage_commands[i], progname, echo);
-                               run_vacuum_command(conn, sql.data, echo, dbname, table, progname);
-                       }
+               for (i = 0; i < numslots; i++)
+               {
+                       if (!FD_ISSET((slots + i)->sock, &slotset))
+                               continue;
+
+                       /* Readable does not mean finished; skip if still busy */
+                       PQconsumeInput((slots + i)->connection);
+                       if (PQisBusy((slots + i)->connection))
+                               continue;
+
+                       (slots + i)->isFree = true;
+
+                       if (!GetQueryResult((slots + i)->connection, dbname, progname))
+                               return NULL;
+
+                       if (firstFree < 0)
+                               firstFree = i;
                }
-               else
+       }
+
+       return slots + firstFree;
+}
+
+/*
+ * GetQueryResult
+ *
+ * Process the query result.  Returns true if there's no error, false
+ * otherwise -- but errors about trying to vacuum a missing relation are
+ * reported and subsequently ignored.
+ *
+ * The connection is registered as the cancel target while results are being
+ * read, and is unregistered again before returning on every path.
+ */
+static bool
+GetQueryResult(PGconn *conn, const char *dbname, const char *progname)
+{
+       PGresult   *result;
+
+       SetCancelConn(conn);
+       while ((result = PQgetResult(conn)) != NULL)
+       {
+               /*
+                * If errors are found, report them.  Errors about a missing table are
+                * harmless so we continue processing; but die for other errors.
+                */
+               if (PQresultStatus(result) != PGRES_COMMAND_OK)
                {
-                       /* Otherwise, we got a stage from vacuum_all_databases(), so run
-                        * only that one. */
-                       if (!quiet)
+                       char       *sqlState = PQresultErrorField(result, PG_DIAG_SQLSTATE);
+
+                       fprintf(stderr, _("%s: vacuuming of database \"%s\" failed: %s"),
+                                       progname, dbname, PQerrorMessage(conn));
+
+                       if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0)
                        {
-                               puts(gettext(stage_messages[stage]));
-                               fflush(stdout);
+                               PQclear(result);
+                               /* don't leave the cancel target set on failure */
+                               ResetCancelConn();
+                               return false;
                        }
-                       executeCommand(conn, stage_commands[stage], progname, echo);
-                       run_vacuum_command(conn, sql.data, echo, dbname, table, progname);
                }
 
+               PQclear(result);
        }
-       else
-               run_vacuum_command(conn, sql.data, echo, dbname, NULL, progname);
+       ResetCancelConn();
 
-       PQfinish(conn);
-       termPQExpBuffer(&sql);
+       return true;
 }
 
-
+/*
+ * DisconnectDatabase
+ *             Disconnect the connection associated with the given slot
+ *
+ * Does nothing if the slot has no connection.  If the connection reports an
+ * active transaction status, a cancel request is sent first; any error text
+ * that PQcancel() writes into the local buffer is not examined.  The
+ * connection is then closed and the slot's connection pointer set to NULL.
+ */
 static void
-vacuum_all_databases(bool full, bool verbose, bool and_analyze, bool analyze_only,
-                        bool analyze_in_stages, bool freeze, const char *maintenance_db,
-                                        const char *host, const char *port,
-                                        const char *username, enum trivalue prompt_password,
-                                        const char *progname, bool echo, bool quiet)
+DisconnectDatabase(ParallelSlot *slot)
 {
-       PGconn     *conn;
-       PGresult   *result;
-       int                     stage;
+       char            errbuf[256];
 
-       conn = connectMaintenanceDatabase(maintenance_db, host, port,
-                                                                         username, prompt_password, progname);
-       result = executeQuery(conn, "SELECT datname FROM pg_database WHERE datallowconn ORDER BY 1;", progname, echo);
-       PQfinish(conn);
+       if (!slot->connection)
+               return;
 
-       /* If analyzing in stages, then run through all stages.  Otherwise just
-        * run once, passing -1 as the stage. */
-       for (stage = (analyze_in_stages ? 0 : -1);
-                stage < (analyze_in_stages ? 3 : 0);
-                stage++)
+       if (PQtransactionStatus(slot->connection) == PQTRANS_ACTIVE)
        {
-               int                     i;
+               PGcancel   *cancel;
 
-               for (i = 0; i < PQntuples(result); i++)
+               if ((cancel = PQgetCancel(slot->connection)))
                {
-                       char       *dbname = PQgetvalue(result, i, 0);
+                       PQcancel(cancel, errbuf, sizeof(errbuf));
+                       PQfreeCancel(cancel);
+               }
+       }
 
-                       if (!quiet)
-                       {
-                               printf(_("%s: vacuuming database \"%s\"\n"), progname, dbname);
-                               fflush(stdout);
-                       }
+       PQfinish(slot->connection);
+       slot->connection = NULL;
+}
 
-                       vacuum_one_database(dbname, full, verbose, and_analyze, analyze_only,
-                                                               analyze_in_stages, stage,
-                                                       freeze, NULL, host, port, username, prompt_password,
-                                                               progname, echo, quiet);
+/*
+ * Loop on select() until a descriptor from the given set becomes readable.
+ *
+ * maxFd is the highest descriptor in *workerset.  On return, *workerset holds
+ * whatever the final select() call left in it, and the return value is that
+ * call's result.
+ *
+ * If we get a cancel request while we're waiting, we forego all further
+ * processing and set the *aborting flag to true.  The return value must be
+ * ignored in this case.  Otherwise, *aborting is set to false.
+ */
+static int
+select_loop(int maxFd, fd_set *workerset, bool *aborting)
+{
+       int                     i;
+       fd_set          saveSet = *workerset;
+
+       if (CancelRequested)
+       {
+               *aborting = true;
+               return -1;
+       }
+       else
+               *aborting = false;
+
+       for (;;)
+       {
+               /*
+                * On Windows, we need to check once in a while for cancel requests;
+                * on other platforms we rely on select() returning when interrupted.
+                */
+               struct timeval *tvp;
+#ifdef WIN32
+               struct timeval tv = {0, 1000000};
+
+               tvp = &tv;
+#else
+               tvp = NULL;
+#endif
+
+               /* select() clobbers the set, so start from the saved copy */
+               *workerset = saveSet;
+               i = select(maxFd + 1, workerset, NULL, NULL, tvp);
+
+#ifdef WIN32
+               if (i == SOCKET_ERROR)
+               {
+                       i = -1;
+
+                       /* translate the Winsock error so the EINTR test below works */
+                       if (WSAGetLastError() == WSAEINTR)
+                               errno = EINTR;
                }
+#endif
+
+               if (i < 0 && errno == EINTR)
+                       continue;                       /* ignore this */
+               if (i < 0 || CancelRequested)
+                       *aborting = true;       /* but not this */
+               if (i == 0)
+                       continue;                       /* timeout (Win32 only) */
+               break;
        }
 
-       PQclear(result);
+       return i;
 }
 
+/*
+ * init_slot
+ *             Set up a parallel slot to use the given connection.
+ *
+ * The slot records the connection and the socket that PQsocket() reports
+ * for it, and starts out marked as free.
+ */
+static void
+init_slot(ParallelSlot *slot, PGconn *conn)
+{
+       slot->sock = PQsocket(conn);
+       slot->connection = conn;
+       slot->isFree = true;
+}
 
 static void
 help(const char *progname)
@@ -436,6 +926,7 @@ help(const char *progname)
        printf(_("  -V, --version                   output version information, then exit\n"));
        printf(_("  -z, --analyze                   update optimizer statistics\n"));
        printf(_("  -Z, --analyze-only              only update optimizer statistics\n"));
+       printf(_("  -j, --jobs=NUM                  use this many concurrent connections to vacuum\n"));
        printf(_("      --analyze-in-stages         only update optimizer statistics, in multiple\n"
                   "                                  stages for faster results\n"));
        printf(_("  -?, --help                      show this help, then exit\n"));