Introduce compact WAL record for the common case of commit (non-DDL).
authorSimon Riggs <[email protected]>
Tue, 28 Jun 2011 21:58:17 +0000 (22:58 +0100)
committerSimon Riggs <[email protected]>
Tue, 28 Jun 2011 21:58:17 +0000 (22:58 +0100)
XLOG_XACT_COMMIT_COMPACT leaves out invalidation messages and relfilenodes,
saving considerable space for the vast majority of transaction commits.
XLOG_XACT_COMMIT keeps same definition as XLOG_PAGE_MAGIC 0xD067 and earlier.

Leonardo Francalanci and Simon Riggs

src/backend/access/transam/xact.c
src/backend/access/transam/xlog.c
src/include/access/xact.h
src/include/access/xlog_internal.h

index 2ca1c1454993dbb9b7695ad41a96fa6c2932c503..7f01a83c4f9c9af1094c5d5bd8efd9b863c57050 100644 (file)
@@ -962,25 +962,9 @@ RecordTransactionCommit(void)
        /*
         * Begin commit critical section and insert the commit XLOG record.
         */
-       XLogRecData rdata[4];
-       int         lastrdata = 0;
-       xl_xact_commit xlrec;
-
        /* Tell bufmgr and smgr to prepare for commit */
        BufmgrCommit();
 
-       /*
-        * Set flags required for recovery processing of commits.
-        */
-       xlrec.xinfo = 0;
-       if (RelcacheInitFileInval)
-           xlrec.xinfo |= XACT_COMPLETION_UPDATE_RELCACHE_FILE;
-       if (forceSyncCommit)
-           xlrec.xinfo |= XACT_COMPLETION_FORCE_SYNC_COMMIT;
-
-       xlrec.dbId = MyDatabaseId;
-       xlrec.tsId = MyDatabaseTableSpace;
-
        /*
         * Mark ourselves as within our "commit critical section".  This
         * forces any concurrent checkpoint to wait until we've updated
@@ -1002,43 +986,88 @@ RecordTransactionCommit(void)
        MyProc->inCommit = true;
 
        SetCurrentTransactionStopTimestamp();
-       xlrec.xact_time = xactStopTimestamp;
-       xlrec.nrels = nrels;
-       xlrec.nsubxacts = nchildren;
-       xlrec.nmsgs = nmsgs;
-       rdata[0].data = (char *) (&xlrec);
-       rdata[0].len = MinSizeOfXactCommit;
-       rdata[0].buffer = InvalidBuffer;
-       /* dump rels to delete */
-       if (nrels > 0)
-       {
-           rdata[0].next = &(rdata[1]);
-           rdata[1].data = (char *) rels;
-           rdata[1].len = nrels * sizeof(RelFileNode);
-           rdata[1].buffer = InvalidBuffer;
-           lastrdata = 1;
-       }
-       /* dump committed child Xids */
-       if (nchildren > 0)
+
+       /*
+        * Do we need the long commit record? If not, use the compact format.
+        */
+       if (nrels > 0 || nmsgs > 0 || RelcacheInitFileInval || forceSyncCommit)
        {
-           rdata[lastrdata].next = &(rdata[2]);
-           rdata[2].data = (char *) children;
-           rdata[2].len = nchildren * sizeof(TransactionId);
-           rdata[2].buffer = InvalidBuffer;
-           lastrdata = 2;
+           XLogRecData rdata[4];
+           int         lastrdata = 0;
+           xl_xact_commit xlrec;
+           /*
+            * Set flags required for recovery processing of commits.
+            */
+           xlrec.xinfo = 0;
+           if (RelcacheInitFileInval)
+               xlrec.xinfo |= XACT_COMPLETION_UPDATE_RELCACHE_FILE;
+           if (forceSyncCommit)
+               xlrec.xinfo |= XACT_COMPLETION_FORCE_SYNC_COMMIT;
+
+           xlrec.dbId = MyDatabaseId;
+           xlrec.tsId = MyDatabaseTableSpace;
+
+           xlrec.xact_time = xactStopTimestamp;
+           xlrec.nrels = nrels;
+           xlrec.nsubxacts = nchildren;
+           xlrec.nmsgs = nmsgs;
+           rdata[0].data = (char *) (&xlrec);
+           rdata[0].len = MinSizeOfXactCommit;
+           rdata[0].buffer = InvalidBuffer;
+           /* dump rels to delete */
+           if (nrels > 0)
+           {
+               rdata[0].next = &(rdata[1]);
+               rdata[1].data = (char *) rels;
+               rdata[1].len = nrels * sizeof(RelFileNode);
+               rdata[1].buffer = InvalidBuffer;
+               lastrdata = 1;
+           }
+           /* dump committed child Xids */
+           if (nchildren > 0)
+           {
+               rdata[lastrdata].next = &(rdata[2]);
+               rdata[2].data = (char *) children;
+               rdata[2].len = nchildren * sizeof(TransactionId);
+               rdata[2].buffer = InvalidBuffer;
+               lastrdata = 2;
+           }
+           /* dump shared cache invalidation messages */
+           if (nmsgs > 0)
+           {
+               rdata[lastrdata].next = &(rdata[3]);
+               rdata[3].data = (char *) invalMessages;
+               rdata[3].len = nmsgs * sizeof(SharedInvalidationMessage);
+               rdata[3].buffer = InvalidBuffer;
+               lastrdata = 3;
+           }
+           rdata[lastrdata].next = NULL;
+
+           (void) XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT, rdata);
        }
-       /* dump shared cache invalidation messages */
-       if (nmsgs > 0)
+       else
        {
-           rdata[lastrdata].next = &(rdata[3]);
-           rdata[3].data = (char *) invalMessages;
-           rdata[3].len = nmsgs * sizeof(SharedInvalidationMessage);
-           rdata[3].buffer = InvalidBuffer;
-           lastrdata = 3;
-       }
-       rdata[lastrdata].next = NULL;
+           XLogRecData rdata[2];
+           int         lastrdata = 0;
+           xl_xact_commit_compact  xlrec;
+           xlrec.xact_time = xactStopTimestamp;
+           xlrec.nsubxacts = nchildren;
+           rdata[0].data = (char *) (&xlrec);
+           rdata[0].len = MinSizeOfXactCommitCompact;
+           rdata[0].buffer = InvalidBuffer;
+           /* dump committed child Xids */
+           if (nchildren > 0)
+           {
+               rdata[0].next = &(rdata[1]);
+               rdata[1].data = (char *) children;
+               rdata[1].len = nchildren * sizeof(TransactionId);
+               rdata[1].buffer = InvalidBuffer;
+               lastrdata = 1;
+           }
+           rdata[lastrdata].next = NULL;
 
-       (void) XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT, rdata);
+           (void) XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT_COMPACT, rdata);
+       }
    }
 
    /*
@@ -4441,19 +4470,17 @@ xactGetCommittedChildren(TransactionId **ptr)
  * actions for which the order of execution is critical.
  */
 static void
-xact_redo_commit(xl_xact_commit *xlrec, TransactionId xid, XLogRecPtr lsn)
+xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn,
+                   TransactionId *sub_xids, int nsubxacts,
+                   SharedInvalidationMessage *inval_msgs, int nmsgs,
+                   RelFileNode *xnodes, int nrels,
+                   Oid dbId, Oid tsId,
+                   uint32 xinfo)
 {
-   TransactionId *sub_xids;
-   SharedInvalidationMessage *inval_msgs;
    TransactionId max_xid;
    int         i;
 
-   /* subxid array follows relfilenodes */
-   sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
-   /* invalidation messages array follows subxids */
-   inval_msgs = (SharedInvalidationMessage *) &(sub_xids[xlrec->nsubxacts]);
-
-   max_xid = TransactionIdLatest(xid, xlrec->nsubxacts, sub_xids);
+   max_xid = TransactionIdLatest(xid, nsubxacts, sub_xids);
 
    /*
     * Make sure nextXid is beyond any XID mentioned in the record.
@@ -4476,7 +4503,7 @@ xact_redo_commit(xl_xact_commit *xlrec, TransactionId xid, XLogRecPtr lsn)
        /*
         * Mark the transaction committed in pg_clog.
         */
-       TransactionIdCommitTree(xid, xlrec->nsubxacts, sub_xids);
+       TransactionIdCommitTree(xid, nsubxacts, sub_xids);
    }
    else
    {
@@ -4500,41 +4527,41 @@ xact_redo_commit(xl_xact_commit *xlrec, TransactionId xid, XLogRecPtr lsn)
         * bits set on changes made by transactions that haven't yet
         * recovered. It's unlikely but it's good to be safe.
         */
-       TransactionIdAsyncCommitTree(xid, xlrec->nsubxacts, sub_xids, lsn);
+       TransactionIdAsyncCommitTree(xid, nsubxacts, sub_xids, lsn);
 
        /*
         * We must mark clog before we update the ProcArray.
         */
-       ExpireTreeKnownAssignedTransactionIds(xid, xlrec->nsubxacts, sub_xids, max_xid);
+       ExpireTreeKnownAssignedTransactionIds(xid, nsubxacts, sub_xids, max_xid);
 
        /*
         * Send any cache invalidations attached to the commit. We must
         * maintain the same order of invalidation then release locks as
         * occurs in CommitTransaction().
         */
-       ProcessCommittedInvalidationMessages(inval_msgs, xlrec->nmsgs,
-                                 XactCompletionRelcacheInitFileInval(xlrec),
-                                            xlrec->dbId, xlrec->tsId);
+       ProcessCommittedInvalidationMessages(inval_msgs, nmsgs,
+                                 XactCompletionRelcacheInitFileInval(xinfo),
+                                            dbId, tsId);
 
        /*
         * Release locks, if any. We do this for both two phase and normal one
         * phase transactions. In effect we are ignoring the prepare phase and
         * just going straight to lock release.
         */
-       StandbyReleaseLockTree(xid, xlrec->nsubxacts, sub_xids);
+       StandbyReleaseLockTree(xid, nsubxacts, sub_xids);
    }
 
    /* Make sure files supposed to be dropped are dropped */
-   for (i = 0; i < xlrec->nrels; i++)
+   for (i = 0; i < nrels; i++)
    {
-       SMgrRelation srel = smgropen(xlrec->xnodes[i], InvalidBackendId);
+       SMgrRelation srel = smgropen(xnodes[i], InvalidBackendId);
        ForkNumber  fork;
 
        for (fork = 0; fork <= MAX_FORKNUM; fork++)
        {
            if (smgrexists(srel, fork))
            {
-               XLogDropRelation(xlrec->xnodes[i], fork);
+               XLogDropRelation(xnodes[i], fork);
                smgrdounlink(srel, fork, true);
            }
        }
@@ -4553,8 +4580,46 @@ xact_redo_commit(xl_xact_commit *xlrec, TransactionId xid, XLogRecPtr lsn)
     * to reduce that problem window, for any user that requested
     * ForceSyncCommit().
     */
-   if (XactCompletionForceSyncCommit(xlrec))
+   if (XactCompletionForceSyncCommit(xinfo))
        XLogFlush(lsn);
+
+}
+/*
+ * Utility function to call xact_redo_commit_internal after breaking down xlrec
+ */
+static void
+xact_redo_commit(xl_xact_commit *xlrec,
+                           TransactionId xid, XLogRecPtr lsn)
+{
+   TransactionId *subxacts;
+   SharedInvalidationMessage *inval_msgs;
+
+   /* subxid array follows relfilenodes */
+   subxacts = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
+   /* invalidation messages array follows subxids */
+   inval_msgs = (SharedInvalidationMessage *) &(subxacts[xlrec->nsubxacts]);
+
+   xact_redo_commit_internal(xid, lsn, subxacts, xlrec->nsubxacts,
+                               inval_msgs, xlrec->nmsgs,
+                               xlrec->xnodes, xlrec->nrels,
+                               xlrec->dbId,
+                               xlrec->tsId,
+                               xlrec->xinfo);
+}
+
+/*
+ * Utility function to call xact_redo_commit_internal  for compact form of message.
+ */
+static void
+xact_redo_commit_compact(xl_xact_commit_compact *xlrec,
+                           TransactionId xid, XLogRecPtr lsn)
+{
+   xact_redo_commit_internal(xid, lsn, xlrec->subxacts, xlrec->nsubxacts,
+                               NULL, 0,        /* inval msgs */
+                               NULL, 0,        /* relfilenodes */
+                               InvalidOid,     /* dbId */
+                               InvalidOid,     /* tsId */
+                               0);             /* xinfo */
 }
 
 /*
@@ -4655,7 +4720,13 @@ xact_redo(XLogRecPtr lsn, XLogRecord *record)
    /* Backup blocks are not used in xact records */
    Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
 
-   if (info == XLOG_XACT_COMMIT)
+   if (info == XLOG_XACT_COMMIT_COMPACT)
+   {
+       xl_xact_commit_compact *xlrec = (xl_xact_commit_compact *) XLogRecGetData(record);
+
+       xact_redo_commit_compact(xlrec, record->xl_xid, lsn);
+   }
+   else if (info == XLOG_XACT_COMMIT)
    {
        xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record);
 
@@ -4703,9 +4774,9 @@ static void
 xact_desc_commit(StringInfo buf, xl_xact_commit *xlrec)
 {
    int         i;
-   TransactionId *xacts;
+   TransactionId *subxacts;
 
-   xacts = (TransactionId *) &xlrec->xnodes[xlrec->nrels];
+   subxacts = (TransactionId *) &xlrec->xnodes[xlrec->nrels];
 
    appendStringInfoString(buf, timestamptz_to_str(xlrec->xact_time));
 
@@ -4724,15 +4795,15 @@ xact_desc_commit(StringInfo buf, xl_xact_commit *xlrec)
    {
        appendStringInfo(buf, "; subxacts:");
        for (i = 0; i < xlrec->nsubxacts; i++)
-           appendStringInfo(buf, " %u", xacts[i]);
+           appendStringInfo(buf, " %u", subxacts[i]);
    }
    if (xlrec->nmsgs > 0)
    {
        SharedInvalidationMessage *msgs;
 
-       msgs = (SharedInvalidationMessage *) &xacts[xlrec->nsubxacts];
+       msgs = (SharedInvalidationMessage *) &subxacts[xlrec->nsubxacts];
 
-       if (XactCompletionRelcacheInitFileInval(xlrec))
+       if (XactCompletionRelcacheInitFileInval(xlrec->xinfo))
            appendStringInfo(buf, "; relcache init file inval dbid %u tsid %u",
                             xlrec->dbId, xlrec->tsId);
 
@@ -4758,6 +4829,21 @@ xact_desc_commit(StringInfo buf, xl_xact_commit *xlrec)
    }
 }
 
+static void
+xact_desc_commit_compact(StringInfo buf, xl_xact_commit_compact *xlrec)
+{
+   int         i;
+
+   appendStringInfoString(buf, timestamptz_to_str(xlrec->xact_time));
+
+   if (xlrec->nsubxacts > 0)
+   {
+       appendStringInfo(buf, "; subxacts:");
+       for (i = 0; i < xlrec->nsubxacts; i++)
+           appendStringInfo(buf, " %u", xlrec->subxacts[i]);
+   }
+}
+
 static void
 xact_desc_abort(StringInfo buf, xl_xact_abort *xlrec)
 {
@@ -4802,7 +4888,14 @@ xact_desc(StringInfo buf, uint8 xl_info, char *rec)
 {
    uint8       info = xl_info & ~XLR_INFO_MASK;
 
-   if (info == XLOG_XACT_COMMIT)
+   if (info == XLOG_XACT_COMMIT_COMPACT)
+   {
+       xl_xact_commit_compact *xlrec = (xl_xact_commit_compact *) rec;
+
+       appendStringInfo(buf, "commit: ");
+       xact_desc_commit_compact(buf, xlrec);
+   }
+   else if (info == XLOG_XACT_COMMIT)
    {
        xl_xact_commit *xlrec = (xl_xact_commit *) rec;
 
index 4952d223cdf7e6fe69c7f8b1b89df57d6949077f..178a458eb787c1efa88742825272a793a485d2c4 100644 (file)
@@ -5593,7 +5593,14 @@ recoveryStopsHere(XLogRecord *record, bool *includeThis)
    if (record->xl_rmid != RM_XACT_ID && record->xl_rmid != RM_XLOG_ID)
        return false;
    record_info = record->xl_info & ~XLR_INFO_MASK;
-   if (record->xl_rmid == RM_XACT_ID && record_info == XLOG_XACT_COMMIT)
+   if (record->xl_rmid == RM_XACT_ID && record_info == XLOG_XACT_COMMIT_COMPACT)
+   {
+       xl_xact_commit_compact *recordXactCommitData;
+
+       recordXactCommitData = (xl_xact_commit_compact *) XLogRecGetData(record);
+       recordXtime = recordXactCommitData->xact_time;
+   }
+   else if (record->xl_rmid == RM_XACT_ID && record_info == XLOG_XACT_COMMIT)
    {
        xl_xact_commit *recordXactCommitData;
 
@@ -5680,7 +5687,7 @@ recoveryStopsHere(XLogRecord *record, bool *includeThis)
        recoveryStopTime = recordXtime;
        recoveryStopAfter = *includeThis;
 
-       if (record_info == XLOG_XACT_COMMIT)
+       if (record_info == XLOG_XACT_COMMIT_COMPACT || record_info == XLOG_XACT_COMMIT)
        {
            if (recoveryStopAfter)
                ereport(LOG,
index cb440d41f1401f69698481e2c175eb903f879fed..c585752b0043023480545bc0fc8de6a721021547 100644 (file)
@@ -106,6 +106,7 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid,
 #define XLOG_XACT_COMMIT_PREPARED  0x30
 #define XLOG_XACT_ABORT_PREPARED   0x40
 #define XLOG_XACT_ASSIGNMENT       0x50
+#define XLOG_XACT_COMMIT_COMPACT   0x60
 
 typedef struct xl_xact_assignment
 {
@@ -116,6 +117,16 @@ typedef struct xl_xact_assignment
 
 #define MinSizeOfXactAssignment offsetof(xl_xact_assignment, xsub)
 
+typedef struct xl_xact_commit_compact
+{
+   TimestampTz xact_time;      /* time of commit */
+   int         nsubxacts;      /* number of subtransaction XIDs */
+   /* ARRAY OF COMMITTED SUBTRANSACTION XIDs FOLLOWS */
+   TransactionId subxacts[1];  /* VARIABLE LENGTH ARRAY */
+} xl_xact_commit_compact;
+
+#define MinSizeOfXactCommitCompact offsetof(xl_xact_commit_compact, subxacts)
+
 typedef struct xl_xact_commit
 {
    TimestampTz xact_time;      /* time of commit */
@@ -145,8 +156,8 @@ typedef struct xl_xact_commit
 #define XACT_COMPLETION_FORCE_SYNC_COMMIT      0x02
 
 /* Access macros for above flags */
-#define XactCompletionRelcacheInitFileInval(xlrec) ((xlrec)->xinfo & XACT_COMPLETION_UPDATE_RELCACHE_FILE)
-#define XactCompletionForceSyncCommit(xlrec)       ((xlrec)->xinfo & XACT_COMPLETION_FORCE_SYNC_COMMIT)
+#define XactCompletionRelcacheInitFileInval(xinfo) (xinfo & XACT_COMPLETION_UPDATE_RELCACHE_FILE)
+#define XactCompletionForceSyncCommit(xinfo)       (xinfo & XACT_COMPLETION_FORCE_SYNC_COMMIT)
 
 typedef struct xl_xact_abort
 {
index 34316fffeba0342637b84b4e9a865a1d29ab1354..4eaa243948be5023707e9d35d04504a0ebdf6416 100644 (file)
@@ -71,7 +71,7 @@ typedef struct XLogContRecord
 /*
  * Each page of XLOG file has a header like this:
  */
-#define XLOG_PAGE_MAGIC 0xD067 /* can be used as WAL version indicator */
+#define XLOG_PAGE_MAGIC 0xD068 /* can be used as WAL version indicator */
 
 typedef struct XLogPageHeaderData
 {