* Returns true on success, false on failure.
*/
bool
-logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid,
+logicalrep_worker_launch(LogicalRepWorkerType wtype,
+ Oid dbid, Oid subid, const char *subname, Oid userid,
Oid relid, dsm_handle subworker_dsm)
{
BackgroundWorker bgw;
int nsyncworkers;
int nparallelapplyworkers;
TimestampTz now;
- bool is_parallel_apply_worker = (subworker_dsm != DSM_HANDLE_INVALID);
-
- /* Sanity check - tablesync worker cannot be a subworker */
- Assert(!(is_parallel_apply_worker && OidIsValid(relid)));
+ bool is_tablesync_worker = (wtype == WORKERTYPE_TABLESYNC);
+ bool is_parallel_apply_worker = (wtype == WORKERTYPE_PARALLEL_APPLY);
+
+ /*----------
+ * Sanity checks:
+ * - must be valid worker type
+ * - tablesync workers are only ones to have relid
+ * - parallel apply worker is the only kind of subworker
+ */
+ Assert(wtype != WORKERTYPE_UNKNOWN);
+ Assert(is_tablesync_worker == OidIsValid(relid));
+ Assert(is_parallel_apply_worker == (subworker_dsm != DSM_HANDLE_INVALID));
ereport(DEBUG1,
(errmsg_internal("starting logical replication worker for subscription \"%s\"",
* sync worker limit per subscription. So, just return silently as we
* might get here because of an otherwise harmless race condition.
*/
- if (OidIsValid(relid) && nsyncworkers >= max_sync_workers_per_subscription)
+ if (is_tablesync_worker && nsyncworkers >= max_sync_workers_per_subscription)
{
LWLockRelease(LogicalRepWorkerLock);
return false;
}
/* Prepare the worker slot. */
+ worker->type = wtype;
worker->launch_time = now;
worker->in_use = true;
worker->generation++;
subid);
snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker");
}
- else if (OidIsValid(relid))
+ else if (is_tablesync_worker)
{
snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain");
snprintf(bgw.bgw_name, BGW_MAXLEN,
{
LogicalRepWorker *w = &LogicalRepCtx->workers[i];
- if (w->subid == subid && OidIsValid(w->relid))
+ if (w->subid == subid && isTablesyncWorker(w))
res++;
}
(elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval)
{
ApplyLauncherSetWorkerStartTime(sub->oid, now);
- logicalrep_worker_launch(sub->dbid, sub->oid, sub->name,
+ logicalrep_worker_launch(WORKERTYPE_APPLY,
+ sub->dbid, sub->oid, sub->name,
sub->owner, InvalidOid,
DSM_HANDLE_INVALID);
}
worker_pid = worker.proc->pid;
values[0] = ObjectIdGetDatum(worker.subid);
- if (OidIsValid(worker.relid))
+ if (isTablesyncWorker(&worker))
values[1] = ObjectIdGetDatum(worker.relid);
else
nulls[1] = true;
#include "storage/shm_toc.h"
#include "storage/spin.h"
+/* Different types of worker */
+typedef enum LogicalRepWorkerType
+{
+ WORKERTYPE_UNKNOWN = 0,
+ WORKERTYPE_TABLESYNC,
+ WORKERTYPE_APPLY,
+ WORKERTYPE_PARALLEL_APPLY
+} LogicalRepWorkerType;
typedef struct LogicalRepWorker
{
+ /* What type of worker is this? */
+ LogicalRepWorkerType type;
+
/* Time at which this worker was launched. */
TimestampTz launch_time;
extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
bool only_running);
extern List *logicalrep_workers_find(Oid subid, bool only_running);
-extern bool logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
+extern bool logicalrep_worker_launch(LogicalRepWorkerType wtype,
+ Oid dbid, Oid subid, const char *subname,
Oid userid, Oid relid,
dsm_handle subworker_dsm);
extern void logicalrep_worker_stop(Oid subid, Oid relid);
extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
XLogRecPtr remote_lsn);
-#define isParallelApplyWorker(worker) ((worker)->leader_pid != InvalidPid)
+#define isParallelApplyWorker(worker) ((worker)->type == WORKERTYPE_PARALLEL_APPLY)
+#define isTablesyncWorker(worker) ((worker)->type == WORKERTYPE_TABLESYNC)
static inline bool
am_tablesync_worker(void)
{
- return OidIsValid(MyLogicalRepWorker->relid);
+ return isTablesyncWorker(MyLogicalRepWorker);
}
static inline bool
am_leader_apply_worker(void)
{
- return (!am_tablesync_worker() &&
- !isParallelApplyWorker(MyLogicalRepWorker));
+ return (MyLogicalRepWorker->type == WORKERTYPE_APPLY);
}
static inline bool