Simplify determining logical replication worker types.
authorAmit Kapila <[email protected]>
Mon, 14 Aug 2023 03:08:03 +0000 (08:38 +0530)
committerAmit Kapila <[email protected]>
Mon, 14 Aug 2023 03:08:03 +0000 (08:38 +0530)
We deduce a LogicalRepWorker's type from the values of several different
fields ('relid' and 'leader_pid') whenever logic needs to know it.

In fact, the logical replication worker type is already known at the time
of launching the LogicalRepWorker and it never changes for the lifetime of
that process. Instead of deducing the type, it is simpler to just store it
one time, and access it directly thereafter.

Author: Peter Smith
Reviewed-by: Amit Kapila, Bharath Rupireddy
Discussion: http://postgr.es/m/CAHut+PttPSuP0yoZ=9zLDXKqTJ=d0bhxwKaEaNcaym1XqcvDEg@mail.gmail.com

src/backend/replication/logical/applyparallelworker.c
src/backend/replication/logical/launcher.c
src/backend/replication/logical/tablesync.c
src/include/replication/worker_internal.h
src/tools/pgindent/typedefs.list

index 1d4e83c4c1f59c24801b95bffde4628172d82e30..4e8ee2973e0a115caae16625df3376860d05ee7e 100644 (file)
@@ -435,7 +435,8 @@ pa_launch_parallel_worker(void)
        return NULL;
    }
 
-   launched = logicalrep_worker_launch(MyLogicalRepWorker->dbid,
+   launched = logicalrep_worker_launch(WORKERTYPE_PARALLEL_APPLY,
+                                       MyLogicalRepWorker->dbid,
                                        MySubscription->oid,
                                        MySubscription->name,
                                        MyLogicalRepWorker->userid,
index e231fa7f9516215c1054c247fd78f8f8f8211767..7cc0a16d3bc29ebd917b7761c3f15b29cd219923 100644 (file)
@@ -303,7 +303,8 @@ logicalrep_workers_find(Oid subid, bool only_running)
  * Returns true on success, false on failure.
  */
 bool
-logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
+logicalrep_worker_launch(LogicalRepWorkerType wtype,
+                        Oid dbid, Oid subid, const char *subname, Oid userid,
                         Oid relid, dsm_handle subworker_dsm)
 {
    BackgroundWorker bgw;
@@ -315,10 +316,18 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
    int         nsyncworkers;
    int         nparallelapplyworkers;
    TimestampTz now;
-   bool        is_parallel_apply_worker = (subworker_dsm != DSM_HANDLE_INVALID);
-
-   /* Sanity check - tablesync worker cannot be a subworker */
-   Assert(!(is_parallel_apply_worker && OidIsValid(relid)));
+   bool        is_tablesync_worker = (wtype == WORKERTYPE_TABLESYNC);
+   bool        is_parallel_apply_worker = (wtype == WORKERTYPE_PARALLEL_APPLY);
+
+   /*----------
+    * Sanity checks:
+    * - must be valid worker type
+    * - tablesync workers are only ones to have relid
+    * - parallel apply worker is the only kind of subworker
+    */
+   Assert(wtype != WORKERTYPE_UNKNOWN);
+   Assert(is_tablesync_worker == OidIsValid(relid));
+   Assert(is_parallel_apply_worker == (subworker_dsm != DSM_HANDLE_INVALID));
 
    ereport(DEBUG1,
            (errmsg_internal("starting logical replication worker for subscription \"%s\"",
@@ -393,7 +402,7 @@ retry:
     * sync worker limit per subscription. So, just return silently as we
     * might get here because of an otherwise harmless race condition.
     */
-   if (OidIsValid(relid) && nsyncworkers >= max_sync_workers_per_subscription)
+   if (is_tablesync_worker && nsyncworkers >= max_sync_workers_per_subscription)
    {
        LWLockRelease(LogicalRepWorkerLock);
        return false;
@@ -427,6 +436,7 @@ retry:
    }
 
    /* Prepare the worker slot. */
+   worker->type = wtype;
    worker->launch_time = now;
    worker->in_use = true;
    worker->generation++;
@@ -466,7 +476,7 @@ retry:
                 subid);
        snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
    }
-   else if (OidIsValid(relid))
+   else if (is_tablesync_worker)
    {
        snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain");
        snprintf(bgw.bgw_name, BGW_MAXLEN,
@@ -847,7 +857,7 @@ logicalrep_sync_worker_count(Oid subid)
    {
        LogicalRepWorker *w = &LogicalRepCtx->workers[i];
 
-       if (w->subid == subid && OidIsValid(w->relid))
+       if (w->subid == subid && isTablesyncWorker(w))
            res++;
    }
 
@@ -1180,7 +1190,8 @@ ApplyLauncherMain(Datum main_arg)
                (elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval)
            {
                ApplyLauncherSetWorkerStartTime(sub->oid, now);
-               logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
+               logicalrep_worker_launch(WORKERTYPE_APPLY,
+                                        sub->dbid, sub->oid, sub->name,
                                         sub->owner, InvalidOid,
                                         DSM_HANDLE_INVALID);
            }
@@ -1290,7 +1301,7 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS)
        worker_pid = worker.proc->pid;
 
        values[0] = ObjectIdGetDatum(worker.subid);
-       if (OidIsValid(worker.relid))
+       if (isTablesyncWorker(&worker))
            values[1] = ObjectIdGetDatum(worker.relid);
        else
            nulls[1] = true;
index 651a7750653e90f0a914bd5b2e0b7ccc3c34c9a5..67bdd14095e2d15611d19a9757d7d21661b9ad8d 100644 (file)
@@ -587,7 +587,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
                        TimestampDifferenceExceeds(hentry->last_start_time, now,
                                                   wal_retrieve_retry_interval))
                    {
-                       logicalrep_worker_launch(MyLogicalRepWorker->dbid,
+                       logicalrep_worker_launch(WORKERTYPE_TABLESYNC,
+                                                MyLogicalRepWorker->dbid,
                                                 MySubscription->oid,
                                                 MySubscription->name,
                                                 MyLogicalRepWorker->userid,
index 672a7117c0c4f774a9227be7bcf7e041624e66a5..a428663859b04bb338c672ffc8f68e698b59ad8a 100644 (file)
 #include "storage/shm_toc.h"
 #include "storage/spin.h"
 
+/* Different types of worker */
+typedef enum LogicalRepWorkerType
+{
+   WORKERTYPE_UNKNOWN = 0,
+   WORKERTYPE_TABLESYNC,
+   WORKERTYPE_APPLY,
+   WORKERTYPE_PARALLEL_APPLY
+} LogicalRepWorkerType;
 
 typedef struct LogicalRepWorker
 {
+   /* What type of worker is this? */
+   LogicalRepWorkerType type;
+
    /* Time at which this worker was launched. */
    TimestampTz launch_time;
 
@@ -232,7 +243,8 @@ extern void logicalrep_worker_attach(int slot);
 extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
                                                bool only_running);
 extern List *logicalrep_workers_find(Oid subid, bool only_running);
-extern bool logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
+extern bool logicalrep_worker_launch(LogicalRepWorkerType wtype,
+                                    Oid dbid, Oid subid, const char *subname,
                                     Oid userid, Oid relid,
                                     dsm_handle subworker_dsm);
 extern void logicalrep_worker_stop(Oid subid, Oid relid);
@@ -315,19 +327,19 @@ extern void pa_decr_and_wait_stream_block(void);
 extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
                           XLogRecPtr remote_lsn);
 
-#define isParallelApplyWorker(worker) ((worker)->leader_pid != InvalidPid)
+#define isParallelApplyWorker(worker) ((worker)->type == WORKERTYPE_PARALLEL_APPLY)
+#define isTablesyncWorker(worker) ((worker)->type == WORKERTYPE_TABLESYNC)
 
 static inline bool
 am_tablesync_worker(void)
 {
-   return OidIsValid(MyLogicalRepWorker->relid);
+   return isTablesyncWorker(MyLogicalRepWorker);
 }
 
 static inline bool
 am_leader_apply_worker(void)
 {
-   return (!am_tablesync_worker() &&
-           !isParallelApplyWorker(MyLogicalRepWorker));
+   return (MyLogicalRepWorker->type == WORKERTYPE_APPLY);
 }
 
 static inline bool
index 66823bc2a771bf9c4dfab6a84362b131816f5f3f..52a8789cc4d64ce7635c91d6624c4746bba3acc7 100644 (file)
@@ -1500,6 +1500,7 @@ LogicalRepStreamAbortData
 LogicalRepTupleData
 LogicalRepTyp
 LogicalRepWorker
+LogicalRepWorkerType
 LogicalRewriteMappingData
 LogicalTape
 LogicalTapeSet