else
mode = Min(SyncRepWaitMode, SYNC_REP_WAIT_FLUSH);
- Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
+ Assert(dlist_node_is_detached(&MyProc->syncRepLinks));
Assert(WalSndCtl != NULL);
LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
* assertions, but better safe than sorry).
*/
pg_read_barrier();
- Assert(SHMQueueIsDetached(&(MyProc->syncRepLinks)));
+ Assert(dlist_node_is_detached(&MyProc->syncRepLinks));
MyProc->syncRepState = SYNC_REP_NOT_WAITING;
MyProc->waitLSN = 0;
static void
SyncRepQueueInsert(int mode)
{
- PGPROC *proc;
+ dlist_head *queue;
+ dlist_iter iter;
Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
- proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]),
- &(WalSndCtl->SyncRepQueue[mode]),
- offsetof(PGPROC, syncRepLinks));
+ queue = &WalSndCtl->SyncRepQueue[mode];
- while (proc)
+ dlist_reverse_foreach(iter, queue)
{
+ PGPROC *proc = dlist_container(PGPROC, syncRepLinks, iter.cur);
+
/*
- * Stop at the queue element that we should after to ensure the queue
- * is ordered by LSN.
+ * Stop at the queue element that we should insert after to ensure the
+ * queue is ordered by LSN.
*/
if (proc->waitLSN < MyProc->waitLSN)
- break;
-
- proc = (PGPROC *) SHMQueuePrev(&(WalSndCtl->SyncRepQueue[mode]),
- &(proc->syncRepLinks),
- offsetof(PGPROC, syncRepLinks));
+ {
+ dlist_insert_after(&proc->syncRepLinks, &MyProc->syncRepLinks);
+ return;
+ }
}
- if (proc)
- SHMQueueInsertAfter(&(proc->syncRepLinks), &(MyProc->syncRepLinks));
- else
- SHMQueueInsertAfter(&(WalSndCtl->SyncRepQueue[mode]), &(MyProc->syncRepLinks));
+ /*
+ * If we get here, the list was either empty, or this process needs to be
+ * at the head.
+ */
+ dlist_push_head(queue, &MyProc->syncRepLinks);
}
/*
SyncRepCancelWait(void)
{
LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
- if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
- SHMQueueDelete(&(MyProc->syncRepLinks));
+ if (!dlist_node_is_detached(&MyProc->syncRepLinks))
+ dlist_delete_thoroughly(&MyProc->syncRepLinks);
MyProc->syncRepState = SYNC_REP_NOT_WAITING;
LWLockRelease(SyncRepLock);
}
* First check if we are removed from the queue without the lock to not
* slow down backend exit.
*/
- if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
+ if (!dlist_node_is_detached(&MyProc->syncRepLinks))
{
LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
/* maybe we have just been removed, so recheck */
- if (!SHMQueueIsDetached(&(MyProc->syncRepLinks)))
- SHMQueueDelete(&(MyProc->syncRepLinks));
+ if (!dlist_node_is_detached(&MyProc->syncRepLinks))
+ dlist_delete_thoroughly(&MyProc->syncRepLinks);
LWLockRelease(SyncRepLock);
}
SyncRepWakeQueue(bool all, int mode)
{
volatile WalSndCtlData *walsndctl = WalSndCtl;
- PGPROC *proc = NULL;
- PGPROC *thisproc = NULL;
int numprocs = 0;
+ dlist_mutable_iter iter;
Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
Assert(LWLockHeldByMeInMode(SyncRepLock, LW_EXCLUSIVE));
Assert(SyncRepQueueIsOrderedByLSN(mode));
- proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
- &(WalSndCtl->SyncRepQueue[mode]),
- offsetof(PGPROC, syncRepLinks));
-
- while (proc)
+ dlist_foreach_modify(iter, &WalSndCtl->SyncRepQueue[mode])
{
+ PGPROC *proc = dlist_container(PGPROC, syncRepLinks, iter.cur);
+
/*
* Assume the queue is ordered by LSN
*/
return numprocs;
/*
- * Move to next proc, so we can delete thisproc from the queue.
- * thisproc is valid, proc may be NULL after this.
- */
- thisproc = proc;
- proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
- &(proc->syncRepLinks),
- offsetof(PGPROC, syncRepLinks));
-
- /*
- * Remove thisproc from queue.
+ * Remove from queue.
*/
- SHMQueueDelete(&(thisproc->syncRepLinks));
+ dlist_delete_thoroughly(&proc->syncRepLinks);
/*
* SyncRepWaitForLSN() reads syncRepState without holding the lock, so
* Set state to complete; see SyncRepWaitForLSN() for discussion of
* the various states.
*/
- thisproc->syncRepState = SYNC_REP_WAIT_COMPLETE;
+ proc->syncRepState = SYNC_REP_WAIT_COMPLETE;
/*
* Wake only when we have set state and removed from queue.
*/
- SetLatch(&(thisproc->procLatch));
+ SetLatch(&(proc->procLatch));
numprocs++;
}
static bool
SyncRepQueueIsOrderedByLSN(int mode)
{
- PGPROC *proc = NULL;
XLogRecPtr lastLSN;
+ dlist_iter iter;
Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE);
lastLSN = 0;
- proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
- &(WalSndCtl->SyncRepQueue[mode]),
- offsetof(PGPROC, syncRepLinks));
-
- while (proc)
+ dlist_foreach(iter, &WalSndCtl->SyncRepQueue[mode])
{
+ PGPROC *proc = dlist_container(PGPROC, syncRepLinks, iter.cur);
+
/*
* Check the queue is ordered by LSN and that multiple procs don't
* have matching LSNs
return false;
lastLSN = proc->waitLSN;
-
- proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),
- &(proc->syncRepLinks),
- offsetof(PGPROC, syncRepLinks));
}
return true;