Use dlists instead of SHM_QUEUE for syncrep queue
authorAndres Freund <[email protected]>
Wed, 18 Jan 2023 20:15:05 +0000 (12:15 -0800)
committerAndres Freund <[email protected]>
Wed, 18 Jan 2023 20:15:05 +0000 (12:15 -0800)
Part of a series to remove SHM_QUEUE. ilist.h style lists are more widely used
and have an easier to use interface.

Reviewed-by: Thomas Munro <[email protected]> (in an older version)
Discussion: https://postgr.es/m/20221120055930[email protected]
Discussion: https://postgr.es/m/20200211042229[email protected]

src/backend/replication/syncrep.c
src/backend/replication/walsender.c
src/backend/storage/lmgr/proc.c
src/include/replication/walsender_private.h
src/include/storage/proc.h

index 51e92fc8c712509e0e33d602ec86619c2a8cec7f..80d681b71c84c97ee3bab8dcc3649169e670ec54 100644 (file)
@@ -182,7 +182,7 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
    else
        mode = Min(SyncRepWaitMode, SYNC_REP_WAIT_FLUSH);
 
-   Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
+   Assert(dlist_node_is_detached(&MyProc->syncRepLinks));
    Assert(WalSndCtl != NULL);
 
    LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
@@ -318,7 +318,7 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
     * assertions, but better safe than sorry).
     */
    pg_read_barrier();
-   Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
+   Assert(dlist_node_is_detached(&MyProc->syncRepLinks));
    MyProc->syncRepState = SYNC_REP_NOT_WAITING;
    MyProc->waitLSN = 0;
 
@@ -339,31 +339,32 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit)
 static void
 SyncRepQueueInsert(int mode)
 {
-   PGPROC     *proc;
+   dlist_head *queue;
+   dlist_iter iter;
 
    Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
-   proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]),
-                                  &(WalSndCtl->SyncRepQueue[mode]),
-                                  offsetof(PGPROC, syncRepLinks));
+   queue = &WalSndCtl->SyncRepQueue[mode];
 
-   while (proc)
+   dlist_reverse_foreach(iter, queue)
    {
+       PGPROC     *proc = dlist_container(PGPROC, syncRepLinks, iter.cur);
+
        /*
-        * Stop at the queue element that we should after to ensure the queue
-        * is ordered by LSN.
+        * Stop at the queue element that we should insert after to ensure the
+        * queue is ordered by LSN.
         */
        if (proc->waitLSN < MyProc->waitLSN)
-           break;
-
-       proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]),
-                                      &(proc->syncRepLinks),
-                                      offsetof(PGPROC, syncRepLinks));
+       {
+           dlist_insert_after(&proc->syncRepLinks, &MyProc->syncRepLinks);
+           return;
+       }
    }
 
-   if (proc)
-       SHMQueueInsertAfter(&(proc->syncRepLinks), &(MyProc->syncRepLinks));
-   else
-       SHMQueueInsertAfter(&(WalSndCtl->SyncRepQueue[mode]), &(MyProc->syncRepLinks));
+   /*
+    * If we get here, the list was either empty, or this process needs to be
+    * at the head.
+    */
+   dlist_push_head(queue, &MyProc->syncRepLinks);
 }
 
 /*
@@ -373,8 +374,8 @@ static void
 SyncRepCancelWait(void)
 {
    LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
-   if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
-       SHMQueueDelete(&(MyProc->syncRepLinks));
+   if (!dlist_node_is_detached(&MyProc->syncRepLinks))
+       dlist_delete_thoroughly(&MyProc->syncRepLinks);
    MyProc->syncRepState = SYNC_REP_NOT_WAITING;
    LWLockRelease(SyncRepLock);
 }
@@ -386,13 +387,13 @@ SyncRepCleanupAtProcExit(void)
     * First check if we are removed from the queue without the lock to not
     * slow down backend exit.
     */
-   if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
+   if (!dlist_node_is_detached(&MyProc->syncRepLinks))
    {
        LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
 
        /* maybe we have just been removed, so recheck */
-       if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
-           SHMQueueDelete(&(MyProc->syncRepLinks));
+       if (!dlist_node_is_detached(&MyProc->syncRepLinks))
+           dlist_delete_thoroughly(&MyProc->syncRepLinks);
 
        LWLockRelease(SyncRepLock);
    }
@@ -879,20 +880,17 @@ static int
 SyncRepWakeQueue(bool all, int mode)
 {
    volatile WalSndCtlData *walsndctl = WalSndCtl;
-   PGPROC     *proc = NULL;
-   PGPROC     *thisproc = NULL;
    int         numprocs = 0;
+   dlist_mutable_iter iter;
 
    Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
    Assert(LWLockHeldByMeInMode(SyncRepLock, LW_EXCLUSIVE));
    Assert(SyncRepQueueIsOrderedByLSN(mode));
 
-   proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
-                                  &(WalSndCtl->SyncRepQueue[mode]),
-                                  offsetof(PGPROC, syncRepLinks));
-
-   while (proc)
+   dlist_foreach_modify(iter, &WalSndCtl->SyncRepQueue[mode])
    {
+       PGPROC *proc = dlist_container(PGPROC, syncRepLinks, iter.cur);
+
        /*
         * Assume the queue is ordered by LSN
         */
@@ -900,18 +898,9 @@ SyncRepWakeQueue(bool all, int mode)
            return numprocs;
 
        /*
-        * Move to next proc, so we can delete thisproc from the queue.
-        * thisproc is valid, proc may be NULL after this.
-        */
-       thisproc = proc;
-       proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
-                                      &(proc->syncRepLinks),
-                                      offsetof(PGPROC, syncRepLinks));
-
-       /*
-        * Remove thisproc from queue.
+        * Remove from queue.
         */
-       SHMQueueDelete(&(thisproc->syncRepLinks));
+       dlist_delete_thoroughly(&proc->syncRepLinks);
 
        /*
         * SyncRepWaitForLSN() reads syncRepState without holding the lock, so
@@ -924,12 +913,12 @@ SyncRepWakeQueue(bool all, int mode)
         * Set state to complete; see SyncRepWaitForLSN() for discussion of
         * the various states.
         */
-       thisproc->syncRepState = SYNC_REP_WAIT_COMPLETE;
+       proc->syncRepState = SYNC_REP_WAIT_COMPLETE;
 
        /*
         * Wake only when we have set state and removed from queue.
         */
-       SetLatch(&(thisproc->procLatch));
+       SetLatch(&(proc->procLatch));
 
        numprocs++;
    }
@@ -983,19 +972,17 @@ SyncRepUpdateSyncStandbysDefined(void)
 static bool
 SyncRepQueueIsOrderedByLSN(int mode)
 {
-   PGPROC     *proc = NULL;
    XLogRecPtr  lastLSN;
+   dlist_iter  iter;
 
    Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
 
    lastLSN = 0;
 
-   proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
-                                  &(WalSndCtl->SyncRepQueue[mode]),
-                                  offsetof(PGPROC, syncRepLinks));
-
-   while (proc)
+   dlist_foreach(iter, &WalSndCtl->SyncRepQueue[mode])
    {
+       PGPROC     *proc = dlist_container(PGPROC, syncRepLinks, iter.cur);
+
        /*
         * Check the queue is ordered by LSN and that multiple procs don't
         * have matching LSNs
@@ -1004,10 +991,6 @@ SyncRepQueueIsOrderedByLSN(int mode)
            return false;
 
        lastLSN = proc->waitLSN;
-
-       proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
-                                      &(proc->syncRepLinks),
-                                      offsetof(PGPROC, syncRepLinks));
    }
 
    return true;
index 015ae2995d5371461652643bb14533c7e8ef84b6..4ed3747e3f9a15b3d683e359214c30c8ea8ec8f2 100644 (file)
@@ -3275,7 +3275,7 @@ WalSndShmemInit(void)
        MemSet(WalSndCtl, 0, WalSndShmemSize());
 
        for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
-           SHMQueueInit(&(WalSndCtl->SyncRepQueue[i]));
+           dlist_init(&(WalSndCtl->SyncRepQueue[i]));
 
        for (i = 0; i < max_wal_senders; i++)
        {
index e2adf493ca920ce88aec69f7e3a99c151d8abc05..f8ac4edd6ffe00450603bd3c02aa6d613f82ecc4 100644 (file)
@@ -410,7 +410,7 @@ InitProcess(void)
    /* Initialize fields for sync rep */
    MyProc->waitLSN = 0;
    MyProc->syncRepState = SYNC_REP_NOT_WAITING;
-   SHMQueueElemInit(&(MyProc->syncRepLinks));
+   dlist_node_init(&MyProc->syncRepLinks);
 
    /* Initialize fields for group XID clearing. */
    MyProc->procArrayGroupMember = false;
index e5e779aeb7dc55d4a2ae53cef8cdc8b65449ca08..5310e054c488777f12ea9d3e60967401d8584f93 100644 (file)
@@ -13,6 +13,7 @@
 #define _WALSENDER_PRIVATE_H
 
 #include "access/xlog.h"
+#include "lib/ilist.h"
 #include "nodes/nodes.h"
 #include "replication/syncrep.h"
 #include "storage/latch.h"
@@ -89,7 +90,7 @@ typedef struct
     * Synchronous replication queue with one queue per request type.
     * Protected by SyncRepLock.
     */
-   SHM_QUEUE   SyncRepQueue[NUM_SYNC_REP_WAIT_MODE];
+   dlist_head  SyncRepQueue[NUM_SYNC_REP_WAIT_MODE];
 
    /*
     * Current location of the head of the queue. All waiters should have a
index 6eb46a7ee8e04cdbb31a9d14134b5dbd3d3195e7..dd45b8ee9b7424a9b56d08fa19054352c5753f9e 100644 (file)
@@ -248,7 +248,7 @@ struct PGPROC
     */
    XLogRecPtr  waitLSN;        /* waiting for this LSN or higher */
    int         syncRepState;   /* wait state for sync rep */
-   SHM_QUEUE   syncRepLinks;   /* list link if process is in syncrep queue */
+   dlist_node  syncRepLinks;   /* list link if process is in syncrep queue */
 
    /*
     * All PROCLOCK objects for locks held or awaited by this backend are