#define PARALLEL_VACUUM_KEY_DEAD_TUPLES 2
#define PARALLEL_VACUUM_KEY_QUERY_TEXT 3
#define PARALLEL_VACUUM_KEY_BUFFER_USAGE 4
+#define PARALLEL_VACUUM_KEY_WAL_USAGE 5
/*
 * Macro to check if we are in a parallel vacuum. If true, we are in the
 * parallel mode and the DSM segment is valid.
 */
/* Points to buffer usage area in DSM */
BufferUsage *buffer_usage;
+ /* Points to WAL usage area in DSM */
+ WalUsage *wal_usage;
+
/*
* The number of indexes that support parallel index bulk-deletion and
* parallel index cleanup respectively.
vacrelstats->dead_tuples, nindexes, vacrelstats);
/*
- * Next, accumulate buffer usage. (This must wait for the workers to
- * finish, or we might get incomplete data.)
+ * Next, accumulate buffer and WAL usage. (This must wait for the workers
+ * to finish, or we might get incomplete data.)
*/
if (nworkers > 0)
{
WaitForParallelWorkersToFinish(lps->pcxt);
for (i = 0; i < lps->pcxt->nworkers_launched; i++)
- InstrAccumParallelQuery(&lps->buffer_usage[i]);
+ InstrAccumParallelQuery(&lps->buffer_usage[i], &lps->wal_usage[i]);
}
/*
LVShared *shared;
LVDeadTuples *dead_tuples;
BufferUsage *buffer_usage;
+ WalUsage *wal_usage;
bool *can_parallel_vacuum;
long maxtuples;
char *sharedquery;
shm_toc_estimate_keys(&pcxt->estimator, 1);
/*
- * Estimate space for BufferUsage -- PARALLEL_VACUUM_KEY_BUFFER_USAGE.
+ * Estimate space for BufferUsage and WalUsage --
+ * PARALLEL_VACUUM_KEY_BUFFER_USAGE and PARALLEL_VACUUM_KEY_WAL_USAGE.
*
* If there are no extensions loaded that care, we could skip this. We
- * have no way of knowing whether anyone's looking at pgBufferUsage, so do
- * it unconditionally.
+ * have no way of knowing whether anyone's looking at pgBufferUsage or
+ * pgWalUsage, so do it unconditionally.
*/
shm_toc_estimate_chunk(&pcxt->estimator,
mul_size(sizeof(BufferUsage), pcxt->nworkers));
shm_toc_estimate_keys(&pcxt->estimator, 1);
+ shm_toc_estimate_chunk(&pcxt->estimator,
+ mul_size(sizeof(WalUsage), pcxt->nworkers));
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
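The two new estimator calls follow the standard shm_toc protocol: every chunk reserved with shm_toc_estimate_chunk() and shm_toc_estimate_keys() before InitializeParallelDSM() must be matched by a shm_toc_allocate()/shm_toc_insert() pair afterwards, as done further down. A minimal sketch of that pattern for a per-worker array (the toc key name is hypothetical):

    /* Sketch: publish a per-worker WalUsage array in the DSM segment.
     * MY_KEY_WAL_USAGE is a made-up key; real keys must be unique per toc. */

    /* Phase 1 -- while sizing the segment: */
    shm_toc_estimate_chunk(&pcxt->estimator,
                           mul_size(sizeof(WalUsage), pcxt->nworkers));
    shm_toc_estimate_keys(&pcxt->estimator, 1);

    /* Phase 2 -- after InitializeParallelDSM(pcxt): */
    {
        WalUsage   *wal_usage;

        wal_usage = shm_toc_allocate(pcxt->toc,
                                     mul_size(sizeof(WalUsage),
                                              pcxt->nworkers));
        shm_toc_insert(pcxt->toc, MY_KEY_WAL_USAGE, wal_usage);
        /* workers find it with shm_toc_lookup(toc, MY_KEY_WAL_USAGE, false) */
    }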
/* Finally, estimate PARALLEL_VACUUM_KEY_QUERY_TEXT space */
querylen = strlen(debug_query_string);
shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_DEAD_TUPLES, dead_tuples);
vacrelstats->dead_tuples = dead_tuples;
- /* Allocate space for each worker's BufferUsage; no need to initialize */
+ /*
+ * Allocate space for each worker's BufferUsage and WalUsage; no need to
+ * initialize
+ */
buffer_usage = shm_toc_allocate(pcxt->toc,
mul_size(sizeof(BufferUsage), pcxt->nworkers));
shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, buffer_usage);
lps->buffer_usage = buffer_usage;
+ wal_usage = shm_toc_allocate(pcxt->toc,
+ mul_size(sizeof(WalUsage), pcxt->nworkers));
+ shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_WAL_USAGE, wal_usage);
+ lps->wal_usage = wal_usage;
/* Store query string for workers */
sharedquery = (char *) shm_toc_allocate(pcxt->toc, querylen + 1);
LVShared *lvshared;
LVDeadTuples *dead_tuples;
BufferUsage *buffer_usage;
+ WalUsage *wal_usage;
int nindexes;
char *sharedquery;
IndexBulkDeleteResult **stats;
parallel_vacuum_index(indrels, stats, lvshared, dead_tuples, nindexes,
&vacrelstats);
- /* Report buffer usage during parallel execution */
+ /* Report buffer/WAL usage during parallel execution */
buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false);
- InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber]);
+ wal_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_WAL_USAGE, false);
+ InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
+ &wal_usage[ParallelWorkerNumber]);
/* Pop the error context stack */
error_context_stack = errcallback.previous;
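The worker side is the mirror image: snapshot the backend-local counters before the real work, then write the deltas into this worker's slot of the shared array, indexed by ParallelWorkerNumber. A hedged sketch of that bracket, assuming the patched two-argument InstrEndParallelQuery() (the work function and toc keys are hypothetical):

    BufferUsage *buffer_usage;
    WalUsage   *wal_usage;

    InstrStartParallelQuery();  /* snapshots pgBufferUsage and pgWalUsage */

    do_assigned_work();         /* hypothetical: the worker's actual job */

    buffer_usage = shm_toc_lookup(toc, MY_KEY_BUFFER_USAGE, false);
    wal_usage = shm_toc_lookup(toc, MY_KEY_WAL_USAGE, false);
    InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
                          &wal_usage[ParallelWorkerNumber]);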
#include "access/xloginsert.h"
#include "catalog/index.h"
#include "commands/progress.h"
+#include "executor/instrument.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "storage/smgr.h"
#define PARALLEL_KEY_TUPLESORT UINT64CONST(0xA000000000000002)
#define PARALLEL_KEY_TUPLESORT_SPOOL2 UINT64CONST(0xA000000000000003)
#define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xA000000000000004)
+#define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xA000000000000005)
/*
* DISABLE_LEADER_PARTICIPATION disables the leader's participation in
Sharedsort *sharedsort;
Sharedsort *sharedsort2;
Snapshot snapshot;
+ WalUsage *walusage;
} BTLeader;
/*
Sharedsort *sharedsort2;
BTSpool *btspool = buildstate->spool;
BTLeader *btleader = (BTLeader *) palloc0(sizeof(BTLeader));
+ WalUsage *walusage;
bool leaderparticipates = true;
char *sharedquery;
int querylen;
shm_toc_estimate_keys(&pcxt->estimator, 3);
}
+ /*
+ * Estimate space for WalUsage -- PARALLEL_KEY_WAL_USAGE.
+ *
+ * WalUsage during execution of a maintenance command can be used by an
+ * extension that reports WAL usage, such as pg_stat_statements. We have
+ * no way of knowing whether anyone's looking at pgWalUsage, so do it
+ * unconditionally.
+ */
+ shm_toc_estimate_chunk(&pcxt->estimator,
+ mul_size(sizeof(WalUsage), pcxt->nworkers));
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+
/* Finally, estimate PARALLEL_KEY_QUERY_TEXT space */
querylen = strlen(debug_query_string);
shm_toc_estimate_chunk(&pcxt->estimator, querylen + 1);
memcpy(sharedquery, debug_query_string, querylen + 1);
shm_toc_insert(pcxt->toc, PARALLEL_KEY_QUERY_TEXT, sharedquery);
+ /* Allocate space for each worker's WalUsage; no need to initialize */
+ walusage = shm_toc_allocate(pcxt->toc,
+ mul_size(sizeof(WalUsage), pcxt->nworkers));
+ shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage);
+
/* Launch workers, saving status for leader/caller */
LaunchParallelWorkers(pcxt);
btleader->pcxt = pcxt;
btleader->sharedsort = sharedsort;
btleader->sharedsort2 = sharedsort2;
btleader->snapshot = snapshot;
+ btleader->walusage = walusage;
/* If no workers were successfully launched, back out (do serial build) */
if (pcxt->nworkers_launched == 0)
static void
_bt_end_parallel(BTLeader *btleader)
{
+ int i;
+
/* Shutdown worker processes */
WaitForParallelWorkersToFinish(btleader->pcxt);
+
+ /*
+ * Next, accumulate WAL usage. (This must wait for the workers to finish,
+ * or we might get incomplete data.)
+ */
+ for (i = 0; i < btleader->pcxt->nworkers_launched; i++)
+ InstrAccumParallelQuery(NULL, &btleader->walusage[i]);
+
/* Free last reference to MVCC snapshot, if one was used */
if (IsMVCCSnapshot(btleader->snapshot))
UnregisterSnapshot(btleader->snapshot);
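The NULL first argument works because the reworked InstrAccumParallelQuery() below guards bufusage before touching it; the btree build leader tracks only WAL, while ExecParallelFinish() passes both. Side by side, under the patched signature:

    /* Query execution: accumulate both counters from worker i ... */
    InstrAccumParallelQuery(&pei->buffer_usage[i], &pei->wal_usage[i]);

    /* ... btree build: WAL only, so buffer accounting is skipped. */
    InstrAccumParallelQuery(NULL, &btleader->walusage[i]);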
Relation indexRel;
LOCKMODE heapLockmode;
LOCKMODE indexLockmode;
+ WalUsage *walusage;
int sortmem;
#ifdef BTREE_BUILD_STATS
tuplesort_attach_shared(sharedsort2, seg);
}
+ /* Prepare to track WAL usage during parallel execution */
+ InstrStartParallelQuery();
+
/* Perform sorting of spool, and possibly a spool2 */
sortmem = maintenance_work_mem / btshared->scantuplesortstates;
_bt_parallel_scan_and_sort(btspool, btspool2, btshared, sharedsort,
sharedsort2, sortmem, false);
+ /* Report WAL usage during parallel execution */
+ walusage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false);
+ InstrEndParallelQuery(NULL, &walusage[ParallelWorkerNumber]);
+
#ifdef BTREE_BUILD_STATS
if (log_btree_build_stats)
{
#include "commands/progress.h"
#include "commands/tablespace.h"
#include "common/controldata_utils.h"
+#include "executor/instrument.h"
#include "miscadmin.h"
#include "pg_trace.h"
#include "pgstat.h"
XLogRecPtr
XLogInsertRecord(XLogRecData *rdata,
XLogRecPtr fpw_lsn,
- uint8 flags)
+ uint8 flags,
+ int num_fpw)
{
XLogCtlInsert *Insert = &XLogCtl->Insert;
pg_crc32c rdata_crc;
ProcLastRecPtr = StartPos;
XactLastRecEnd = EndPos;
+ /* Report WAL traffic to the instrumentation. */
+ if (inserted)
+ {
+ pgWalUsage.wal_bytes += rechdr->xl_tot_len;
+ pgWalUsage.wal_records++;
+ pgWalUsage.wal_num_fpw += num_fpw;
+ }
+
return EndPos;
}
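Since XLogInsertRecord() now bumps pgWalUsage on every successful insertion, any backend code can measure the WAL cost of an operation by snapshotting the global and diffing afterwards -- the same before/after pattern the instrumentation layer uses. A sketch using only this patch's additions (the work function and the elog() call are illustrative):

    WalUsage    startusage = pgWalUsage;
    WalUsage    delta;

    perform_wal_logged_work();  /* hypothetical */

    memset(&delta, 0, sizeof(delta));
    WalUsageAccumDiff(&delta, &pgWalUsage, &startusage);
    elog(DEBUG1, "WAL: %ld records, %ld fpw, " UINT64_FORMAT " bytes",
         delta.wal_records, delta.wal_num_fpw, delta.wal_bytes);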
#include "access/xloginsert.h"
#include "catalog/pg_control.h"
#include "common/pg_lzcompress.h"
+#include "executor/instrument.h"
#include "miscadmin.h"
#include "pg_trace.h"
#include "replication/origin.h"
static XLogRecData *XLogRecordAssemble(RmgrId rmid, uint8 info,
XLogRecPtr RedoRecPtr, bool doPageWrites,
- XLogRecPtr *fpw_lsn);
+ XLogRecPtr *fpw_lsn, int *num_fpw);
static bool XLogCompressBackupBlock(char *page, uint16 hole_offset,
uint16 hole_length, char *dest, uint16 *dlen);
bool doPageWrites;
XLogRecPtr fpw_lsn;
XLogRecData *rdt;
+ int num_fpw = 0;
/*
* Get values needed to decide whether to do full-page writes. Since
GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites);
rdt = XLogRecordAssemble(rmid, info, RedoRecPtr, doPageWrites,
- &fpw_lsn);
+ &fpw_lsn, &num_fpw);
- EndPos = XLogInsertRecord(rdt, fpw_lsn, curinsert_flags);
+ EndPos = XLogInsertRecord(rdt, fpw_lsn, curinsert_flags, num_fpw);
} while (EndPos == InvalidXLogRecPtr);
XLogResetInsertion();
static XLogRecData *
XLogRecordAssemble(RmgrId rmid, uint8 info,
XLogRecPtr RedoRecPtr, bool doPageWrites,
- XLogRecPtr *fpw_lsn)
+ XLogRecPtr *fpw_lsn, int *num_fpw)
{
XLogRecData *rdt;
uint32 total_len = 0;
*/
bkpb.fork_flags |= BKPBLOCK_HAS_IMAGE;
+ /* Report a full page image constructed for the WAL record */
+ *num_fpw += 1;
+
/*
* Construct XLogRecData entries for the page content.
*/
* workers and ensuring that their state generally matches that of the
* leader; see src/backend/access/transam/README.parallel for details.
* However, we must save and restore relevant executor state, such as
- * any ParamListInfo associated with the query, buffer usage info, and
+ * any ParamListInfo associated with the query, buffer/WAL usage info, and
* the actual plan to be passed down to the worker.
*
* IDENTIFICATION
#define PARALLEL_KEY_DSA UINT64CONST(0xE000000000000007)
#define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000008)
#define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009)
+#define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xE00000000000000A)
#define PARALLEL_TUPLE_QUEUE_SIZE 65536
char *pstmt_space;
char *paramlistinfo_space;
BufferUsage *bufusage_space;
+ WalUsage *walusage_space;
SharedExecutorInstrumentation *instrumentation = NULL;
SharedJitInstrumentation *jit_instrumentation = NULL;
int pstmt_len;
mul_size(sizeof(BufferUsage), pcxt->nworkers));
shm_toc_estimate_keys(&pcxt->estimator, 1);
+ /*
+ * Same thing for WalUsage.
+ */
+ shm_toc_estimate_chunk(&pcxt->estimator,
+ mul_size(sizeof(WalUsage), pcxt->nworkers));
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+
/* Estimate space for tuple queues. */
shm_toc_estimate_chunk(&pcxt->estimator,
mul_size(PARALLEL_TUPLE_QUEUE_SIZE, pcxt->nworkers));
shm_toc_insert(pcxt->toc, PARALLEL_KEY_BUFFER_USAGE, bufusage_space);
pei->buffer_usage = bufusage_space;
+ /* Same for WalUsage. */
+ walusage_space = shm_toc_allocate(pcxt->toc,
+ mul_size(sizeof(WalUsage), pcxt->nworkers));
+ shm_toc_insert(pcxt->toc, PARALLEL_KEY_WAL_USAGE, walusage_space);
+ pei->wal_usage = walusage_space;
+
/* Set up the tuple queues that the workers will write into. */
pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
/*
* Finish parallel execution. We wait for parallel workers to finish, and
- * accumulate their buffer usage.
+ * accumulate their buffer/WAL usage.
*/
void
ExecParallelFinish(ParallelExecutorInfo *pei)
WaitForParallelWorkersToFinish(pei->pcxt);
/*
- * Next, accumulate buffer usage. (This must wait for the workers to
+ * Next, accumulate buffer/WAL usage. (This must wait for the workers to
* finish, or we might get incomplete data.)
*/
for (i = 0; i < nworkers; i++)
- InstrAccumParallelQuery(&pei->buffer_usage[i]);
+ InstrAccumParallelQuery(&pei->buffer_usage[i], &pei->wal_usage[i]);
pei->finished = true;
}
{
FixedParallelExecutorState *fpes;
BufferUsage *buffer_usage;
+ WalUsage *wal_usage;
DestReceiver *receiver;
QueryDesc *queryDesc;
SharedExecutorInstrumentation *instrumentation;
ExecSetTupleBound(fpes->tuples_needed, queryDesc->planstate);
/*
- * Prepare to track buffer usage during query execution.
+ * Prepare to track buffer/WAL usage during query execution.
*
* We do this after starting up the executor to match what happens in the
- * leader, which also doesn't count buffer accesses that occur during
- * executor startup.
+ * leader, which also doesn't count buffer accesses and WAL activity that
+ * occur during executor startup.
*/
InstrStartParallelQuery();
/* Shut down the executor */
ExecutorFinish(queryDesc);
- /* Report buffer usage during parallel execution. */
+ /* Report buffer/WAL usage during parallel execution. */
buffer_usage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);
- InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber]);
+ wal_usage = shm_toc_lookup(toc, PARALLEL_KEY_WAL_USAGE, false);
+ InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber],
+ &wal_usage[ParallelWorkerNumber]);
/* Report instrumentation data if any instrumentation options are set. */
if (instrumentation != NULL)
BufferUsage pgBufferUsage;
static BufferUsage save_pgBufferUsage;
+WalUsage pgWalUsage;
+static WalUsage save_pgWalUsage;
static void BufferUsageAdd(BufferUsage *dst, const BufferUsage *add);
+static void WalUsageAdd(WalUsage *dst, WalUsage *add);
/* Allocate new instrumentation structure(s) */
/* initialize all fields to zeroes, then modify as needed */
instr = palloc0(n * sizeof(Instrumentation));
- if (instrument_options & (INSTRUMENT_BUFFERS | INSTRUMENT_TIMER))
+ if (instrument_options & (INSTRUMENT_BUFFERS | INSTRUMENT_TIMER | INSTRUMENT_WAL))
{
bool need_buffers = (instrument_options & INSTRUMENT_BUFFERS) != 0;
+ bool need_wal = (instrument_options & INSTRUMENT_WAL) != 0;
bool need_timer = (instrument_options & INSTRUMENT_TIMER) != 0;
int i;
for (i = 0; i < n; i++)
{
instr[i].need_bufusage = need_buffers;
+ instr[i].need_walusage = need_wal;
instr[i].need_timer = need_timer;
}
}
{
memset(instr, 0, sizeof(Instrumentation));
instr->need_bufusage = (instrument_options & INSTRUMENT_BUFFERS) != 0;
+ instr->need_walusage = (instrument_options & INSTRUMENT_WAL) != 0;
instr->need_timer = (instrument_options & INSTRUMENT_TIMER) != 0;
}
/* save buffer usage totals at node entry, if needed */
if (instr->need_bufusage)
instr->bufusage_start = pgBufferUsage;
+
+ if (instr->need_walusage)
+ instr->walusage_start = pgWalUsage;
}
/* Exit from a plan node */
BufferUsageAccumDiff(&instr->bufusage,
&pgBufferUsage, &instr->bufusage_start);
+ if (instr->need_walusage)
+ WalUsageAccumDiff(&instr->walusage,
+ &pgWalUsage, &instr->walusage_start);
+
/* Is this the first tuple of this cycle? */
if (!instr->running)
{
/* Add delta of buffer usage since entry to node's totals */
if (dst->need_bufusage)
BufferUsageAdd(&dst->bufusage, &add->bufusage);
+
+ if (dst->need_walusage)
+ WalUsageAdd(&dst->walusage, &add->walusage);
}
/* note current values during parallel executor startup */
InstrStartParallelQuery(void)
{
save_pgBufferUsage = pgBufferUsage;
+ save_pgWalUsage = pgWalUsage;
}
/* report usage after parallel executor shutdown */
void
-InstrEndParallelQuery(BufferUsage *result)
+InstrEndParallelQuery(BufferUsage *bufusage, WalUsage *walusage)
{
- memset(result, 0, sizeof(BufferUsage));
- BufferUsageAccumDiff(result, &pgBufferUsage, &save_pgBufferUsage);
+ if (bufusage)
+ {
+ memset(bufusage, 0, sizeof(BufferUsage));
+ BufferUsageAccumDiff(bufusage, &pgBufferUsage, &save_pgBufferUsage);
+ }
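+ /* walusage, unlike bufusage, must be supplied by every caller */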
+ memset(walusage, 0, sizeof(WalUsage));
+ WalUsageAccumDiff(walusage, &pgWalUsage, &save_pgWalUsage);
}
/* accumulate work done by workers in leader's stats */
void
-InstrAccumParallelQuery(BufferUsage *result)
+InstrAccumParallelQuery(BufferUsage *bufusage, WalUsage *walusage)
{
- BufferUsageAdd(&pgBufferUsage, result);
+ if (bufusage)
+ BufferUsageAdd(&pgBufferUsage, bufusage);
+ WalUsageAdd(&pgWalUsage, walusage);
}
/* dst += add */
INSTR_TIME_ACCUM_DIFF(dst->blk_write_time,
add->blk_write_time, sub->blk_write_time);
}
+
+/* helper functions for WAL usage accumulation */
+static void
+WalUsageAdd(WalUsage *dst, WalUsage *add)
+{
+ dst->wal_bytes += add->wal_bytes;
+ dst->wal_records += add->wal_records;
+ dst->wal_num_fpw += add->wal_num_fpw;
+}
+
+void
+WalUsageAccumDiff(WalUsage *dst, const WalUsage *add, const WalUsage *sub)
+{
+ dst->wal_bytes += add->wal_bytes - sub->wal_bytes;
+ dst->wal_records += add->wal_records - sub->wal_records;
+ dst->wal_num_fpw += add->wal_num_fpw - sub->wal_num_fpw;
+}
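Both helpers accumulate rather than assign: WalUsageAccumDiff() computes dst += (add - sub) field by field, which is why InstrEndParallelQuery() zeroes its target first to obtain a plain delta, while InstrStopNode() relies on the accumulation to total WAL across repeated plan cycles. A self-contained illustration of those semantics (standalone C, not PostgreSQL code):

    #include <stdio.h>
    #include <string.h>
    #include <stdint.h>

    typedef struct WalUsage
    {
        long        wal_records;
        long        wal_num_fpw;
        uint64_t    wal_bytes;
    } WalUsage;

    static void
    WalUsageAccumDiff(WalUsage *dst, const WalUsage *add, const WalUsage *sub)
    {
        dst->wal_records += add->wal_records - sub->wal_records;
        dst->wal_num_fpw += add->wal_num_fpw - sub->wal_num_fpw;
        dst->wal_bytes += add->wal_bytes - sub->wal_bytes;
    }

    int
    main(void)
    {
        WalUsage    before = {10, 2, 1000}; /* snapshot at node entry */
        WalUsage    after = {15, 3, 1800};  /* counters at node exit */
        WalUsage    total;

        memset(&total, 0, sizeof(total));
        WalUsageAccumDiff(&total, &after, &before); /* first cycle */
        WalUsageAccumDiff(&total, &after, &before); /* second, accumulates */
        printf("%ld records, %ld fpw, %llu bytes\n",
               total.wal_records, total.wal_num_fpw,
               (unsigned long long) total.wal_bytes); /* 10, 2, 1600 */
        return 0;
    }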
extern XLogRecPtr XLogInsertRecord(struct XLogRecData *rdata,
XLogRecPtr fpw_lsn,
- uint8 flags);
+ uint8 flags,
+ int num_fpw);
extern void XLogFlush(XLogRecPtr RecPtr);
extern bool XLogBackgroundFlush(void);
extern bool XLogNeedsFlush(XLogRecPtr RecPtr);
PlanState *planstate; /* plan subtree we're running in parallel */
ParallelContext *pcxt; /* parallel context we're using */
BufferUsage *buffer_usage; /* points to bufusage area in DSM */
+ WalUsage *wal_usage; /* points to walusage area in DSM */
SharedExecutorInstrumentation *instrumentation; /* optional */
struct SharedJitInstrumentation *jit_instrumentation; /* optional */
dsa_area *area; /* points to DSA area in DSM */
instr_time blk_write_time; /* time spent writing */
} BufferUsage;
+typedef struct WalUsage
+{
+ long wal_records; /* # of WAL records produced */
+ long wal_num_fpw; /* # of WAL full-page images produced */
+ uint64 wal_bytes; /* size of WAL records produced */
+} WalUsage;
+
/* Flag bits included in InstrAlloc's instrument_options bitmask */
typedef enum InstrumentOption
{
INSTRUMENT_TIMER = 1 << 0, /* needs timer (and row counts) */
INSTRUMENT_BUFFERS = 1 << 1, /* needs buffer usage */
INSTRUMENT_ROWS = 1 << 2, /* needs row count */
+ INSTRUMENT_WAL = 1 << 3, /* needs WAL usage */
INSTRUMENT_ALL = PG_INT32_MAX
} InstrumentOption;
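A consumer opts into the new counters exactly as it would for buffers: OR INSTRUMENT_WAL into instrument_options, and InstrStartNode()/InstrStopNode() then maintain Instrumentation.walusage via the walusage_start snapshot. A sketch against the patched API (the plan-node call is hypothetical):

    Instrumentation *instr = InstrAlloc(1, INSTRUMENT_WAL | INSTRUMENT_ROWS);
    double      ntuples;

    InstrStartNode(instr);          /* saves pgWalUsage in walusage_start */
    ntuples = run_plan_node();      /* hypothetical: returns rows emitted */
    InstrStopNode(instr, ntuples);  /* adds the WAL delta to instr->walusage */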
/* Parameters set at node creation: */
bool need_timer; /* true if we need timer data */
bool need_bufusage; /* true if we need buffer usage data */
+ bool need_walusage; /* true if we need WAL usage data */
/* Info about current plan cycle: */
bool running; /* true if we've completed first tuple */
instr_time starttime; /* start time of current iteration of node */
double firsttuple; /* time for first tuple of this cycle */
double tuplecount; /* # of tuples emitted so far this cycle */
BufferUsage bufusage_start; /* buffer usage at start */
+ WalUsage walusage_start; /* WAL usage at start */
/* Accumulated statistics across all completed cycles: */
double startup; /* total startup time (in seconds) */
double total; /* total time (in seconds) */
double nfiltered1; /* # of tuples removed by scanqual or joinqual */
double nfiltered2; /* # of tuples removed by "other" quals */
BufferUsage bufusage; /* total buffer usage */
+ WalUsage walusage; /* total WAL usage */
} Instrumentation;
typedef struct WorkerInstrumentation
} WorkerInstrumentation;
extern PGDLLIMPORT BufferUsage pgBufferUsage;
+extern PGDLLIMPORT WalUsage pgWalUsage;
extern Instrumentation *InstrAlloc(int n, int instrument_options);
extern void InstrInit(Instrumentation *instr, int instrument_options);
extern void InstrEndLoop(Instrumentation *instr);
extern void InstrAggNode(Instrumentation *dst, Instrumentation *add);
extern void InstrStartParallelQuery(void);
-extern void InstrEndParallelQuery(BufferUsage *result);
-extern void InstrAccumParallelQuery(BufferUsage *result);
+extern void InstrEndParallelQuery(BufferUsage *bufusage, WalUsage *walusage);
+extern void InstrAccumParallelQuery(BufferUsage *bufusage, WalUsage *walusage);
extern void BufferUsageAccumDiff(BufferUsage *dst,
const BufferUsage *add, const BufferUsage *sub);
+extern void WalUsageAccumDiff(WalUsage *dst, const WalUsage *add,
+ const WalUsage *sub);
#endif /* INSTRUMENT_H */
WalSndSendDataCallback
WalSndState
WalTimeSample
+WalUsage
WalWriteMethod
Walfile
WindowAgg