* - All transactions share this single lock (with no partitioning).
* - There is never a need for a process other than the one running
* an active transaction to walk the list of locks held by that
- * transaction.
+ * transaction, except parallel query workers sharing the leader's
+ * transaction. In the parallel case, an extra per-sxact lock is
+ * taken; see below.
* - It is relatively infrequent that another process needs to
* modify the list for a transaction, but it does happen for such
* things as index page splits for pages with predicate locks and
* than its own active transaction must acquire an exclusive
* lock.
*
+ * SERIALIZABLEXACT's member 'predicateLockListLock'
+ * - Protects the linked list of locks held by a transaction. Only
+ * needed for parallel mode, where multiple backends share the
+ * same SERIALIZABLEXACT object. Not needed if
+ * SerializablePredicateLockListLock is held exclusively.
+ *
* PredicateLockHashPartitionLock(hashcode)
* - The same lock protects a target, all locks on that target, and
* the linked list of locks on the target.
* PredicateLockPageCombine(Relation relation, BlockNumber oldblkno,
* BlockNumber newblkno)
* TransferPredicateLocksToHeapRelation(Relation relation)
- * ReleasePredicateLocks(bool isCommit)
+ * ReleasePredicateLocks(bool isCommit, bool isReadOnlySafe)
*
* conflict detection (may also trigger rollback)
* CheckForSerializableConflictOut(bool visible, Relation relation,
#include "access/heapam.h"
#include "access/htup_details.h"
+#include "access/parallel.h"
#include "access/slru.h"
#include "access/subtrans.h"
#include "access/transam.h"
#define SxactIsDeferrableWaiting(sxact) (((sxact)->flags & SXACT_FLAG_DEFERRABLE_WAITING) != 0)
#define SxactIsROSafe(sxact) (((sxact)->flags & SXACT_FLAG_RO_SAFE) != 0)
#define SxactIsROUnsafe(sxact) (((sxact)->flags & SXACT_FLAG_RO_UNSAFE) != 0)
+#define SxactIsPartiallyReleased(sxact) (((sxact)->flags & SXACT_FLAG_PARTIALLY_RELEASED) != 0)
/*
* Compute the hash code associated with a PREDICATELOCKTARGETTAG.
static SERIALIZABLEXACT *MySerializableXact = InvalidSerializableXact;
static bool MyXactDidWrite = false;
+/*
+ * The SXACT_FLAG_RO_UNSAFE optimization might lead us to release
+ * MySerializableXact early. If that happens in a parallel query, the leader
+ * needs to defer the destruction of the SERIALIZABLEXACT until end of
+ * transaction, because the workers still have a reference to it. In that
+ * case, the leader stores it here.
+ */
+static SERIALIZABLEXACT *SavedSerializableXact = InvalidSerializableXact;
+
/* local functions */
static SERIALIZABLEXACT *CreatePredXact(void);
static void FlagRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer);
static void OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *reader,
SERIALIZABLEXACT *writer);
+static void CreateLocalPredicateLockHash(void);
+static void ReleasePredicateLocksLocal(void);
/*------------------------------------------------------------------------*/
*/
if (SxactIsROSafe(MySerializableXact))
{
- ReleasePredicateLocks(false);
+ ReleasePredicateLocks(false, true);
return false;
}
memset(PredXact->element, 0, requestSize);
for (i = 0; i < max_table_size; i++)
{
+ LWLockInitialize(&PredXact->element[i].sxact.predicateLockListLock,
+ LWTRANCHE_SXACT);
SHMQueueInsertBefore(&(PredXact->availableList),
&(PredXact->element[i].link));
}
ereport(DEBUG2,
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
errmsg("deferrable snapshot was unsafe; trying a new one")));
- ReleasePredicateLocks(false);
+ ReleasePredicateLocks(false, false);
}
/*
* Now we have a safe snapshot, so we don't need to do any further checks.
*/
Assert(SxactIsROSafe(MySerializableXact));
- ReleasePredicateLocks(false);
+ ReleasePredicateLocks(false, true);
return snapshot;
}
{
Assert(IsolationIsSerializable());
+ /*
+ * If this is called by parallel.c in a parallel worker, we don't want to
+ * create a SERIALIZABLEXACT just yet because the leader's
+ * SERIALIZABLEXACT will be installed with AttachSerializableXact(). We
+ * also don't want to reject SERIALIZABLE READ ONLY DEFERRABLE in this
+ * case, because the leader has already determined that the snapshot it
+ * has passed us is safe. So there is nothing for us to do.
+ */
+ if (IsParallelWorker())
+ return;
+
/*
* We do not allow SERIALIZABLE READ ONLY DEFERRABLE transactions to
* import snapshots, since there's no way to wait for a safe snapshot when
VirtualTransactionId vxid;
SERIALIZABLEXACT *sxact,
*othersxact;
- HASHCTL hash_ctl;
/* We only do this for serializable transactions. Once. */
Assert(MySerializableXact == InvalidSerializableXact);
LWLockRelease(SerializableXactHashLock);
+ CreateLocalPredicateLockHash();
+
+ return snapshot;
+}
+
+static void
+CreateLocalPredicateLockHash(void)
+{
+ HASHCTL hash_ctl;
+
/* Initialize the backend-local hash table of parent locks */
Assert(LocalPredicateLockHash == NULL);
MemSet(&hash_ctl, 0, sizeof(hash_ctl));
max_predicate_locks_per_xact,
&hash_ctl,
HASH_ELEM | HASH_BLOBS);
-
- return snapshot;
}
/*
* This implementation is assuming that the usage of each target tag field
* is uniform. No need to make this hard if we don't have to.
*
- * We aren't acquiring lightweight locks for the predicate lock or lock
+ * We acquire an LWLock in the case of parallel mode, because worker
+ * backends have access to the leader's SERIALIZABLEXACT. Otherwise,
+ * we aren't acquiring LWLocks for the predicate lock or lock
* target structures associated with this transaction unless we're going
* to modify them, because no other process is permitted to modify our
* locks.
LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
sxact = MySerializableXact;
+ if (IsInParallelMode())
+ LWLockAcquire(&sxact->predicateLockListLock, LW_EXCLUSIVE);
predlock = (PREDICATELOCK *)
SHMQueueNext(&(sxact->predicateLocks),
&(sxact->predicateLocks),
predlock = nextpredlock;
}
+ if (IsInParallelMode())
+ LWLockRelease(&sxact->predicateLockListLock);
LWLockRelease(SerializablePredicateLockListLock);
}
partitionLock = PredicateLockHashPartitionLock(targettaghash);
LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
+ if (IsInParallelMode())
+ LWLockAcquire(&sxact->predicateLockListLock, LW_EXCLUSIVE);
LWLockAcquire(partitionLock, LW_EXCLUSIVE);
/* Make sure that the target is represented. */
}
LWLockRelease(partitionLock);
+ if (IsInParallelMode())
+ LWLockRelease(&sxact->predicateLockListLock);
LWLockRelease(SerializablePredicateLockListLock);
}
PREDICATELOCK *nextpredlock;
bool found;
- Assert(LWLockHeldByMe(SerializablePredicateLockListLock));
+ Assert(LWLockHeldByMeInMode(SerializablePredicateLockListLock,
+ LW_EXCLUSIVE));
Assert(LWLockHeldByMe(PredicateLockHashPartitionLock(targettaghash)));
predlock = (PREDICATELOCK *)
* covers it, or if we are absolutely certain that no one will need to
* refer to that lock in the future.
*
- * Caller must hold SerializablePredicateLockListLock.
+ * Caller must hold SerializablePredicateLockListLock exclusively.
*/
static bool
TransferPredicateLocksToNewTarget(PREDICATELOCKTARGETTAG oldtargettag,
bool found;
bool outOfShmem = false;
- Assert(LWLockHeldByMe(SerializablePredicateLockListLock));
+ Assert(LWLockHeldByMeInMode(SerializablePredicateLockListLock,
+ LW_EXCLUSIVE));
oldtargettaghash = PredicateLockTargetTagHashCode(&oldtargettag);
newtargettaghash = PredicateLockTargetTagHashCode(&newtargettag);
* If this transaction is committing and is holding any predicate locks,
* it must be added to a list of completed serializable transactions still
* holding locks.
+ *
+ * If isReadOnlySafe is true, then predicate locks are being released before
+ * the end of the transaction because MySerializableXact has been determined
+ * to be RO_SAFE. In non-parallel mode we can release it completely, but it
+ * in parallel mode we partially release the SERIALIZABLEXACT and keep it
+ * around until the end of the transaction, allowing each backend to clear its
+ * MySerializableXact variable and benefit from the optimization in its own
+ * time.
*/
void
-ReleasePredicateLocks(bool isCommit)
+ReleasePredicateLocks(bool isCommit, bool isReadOnlySafe)
{
bool needToClear;
RWConflict conflict,
*/
bool topLevelIsDeclaredReadOnly;
+ /* We can't be both committing and releasing early due to RO_SAFE. */
+ Assert(!(isCommit && isReadOnlySafe));
+
+ /* Are we at the end of a transaction, that is, a commit or abort? */
+ if (!isReadOnlySafe)
+ {
+ /*
+ * Parallel workers mustn't release predicate locks at the end of
+ * their transaction. The leader will do that at the end of its
+ * transaction.
+ */
+ if (IsParallelWorker())
+ {
+ ReleasePredicateLocksLocal();
+ return;
+ }
+
+ /*
+ * By the time the leader in a parallel query reaches end of
+ * transaction, it has waited for all workers to exit.
+ */
+ Assert(!ParallelContextActive());
+
+ /*
+ * If the leader in a parallel query earlier stashed a partially
+ * released SERIALIZABLEXACT for final clean-up at end of transaction
+ * (because workers might still have been accessing it), then it's
+ * time to restore it.
+ */
+ if (SavedSerializableXact != InvalidSerializableXact)
+ {
+ Assert(MySerializableXact == InvalidSerializableXact);
+ MySerializableXact = SavedSerializableXact;
+ SavedSerializableXact = InvalidSerializableXact;
+ Assert(SxactIsPartiallyReleased(MySerializableXact));
+ }
+ }
+
if (MySerializableXact == InvalidSerializableXact)
{
Assert(LocalPredicateLockHash == NULL);
LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+ /*
+ * If the transaction is committing, but it has been partially released
+ * already, then treat this as a roll back. It was marked as rolled back.
+ */
+ if (isCommit && SxactIsPartiallyReleased(MySerializableXact))
+ isCommit = false;
+
+ /*
+ * If we're called in the middle of a transaction because we discovered
+ * that the SXACT_FLAG_RO_SAFE flag was set, then we'll partially release
+ * it (that is, release the predicate locks and conflicts, but not the
+ * SERIALIZABLEXACT itself) if we're the first backend to have noticed.
+ */
+ if (isReadOnlySafe && IsInParallelMode())
+ {
+ /*
+ * The leader needs to stash a pointer to it, so that it can
+ * completely release it at end-of-transaction.
+ */
+ if (!IsParallelWorker())
+ SavedSerializableXact = MySerializableXact;
+
+ /*
+ * The first backend to reach this condition will partially release
+ * the SERIALIZABLEXACT. All others will just clear their
+ * backend-local state so that they stop doing SSI checks for the rest
+ * of the transaction.
+ */
+ if (SxactIsPartiallyReleased(MySerializableXact))
+ {
+ LWLockRelease(SerializableXactHashLock);
+ ReleasePredicateLocksLocal();
+ return;
+ }
+ else
+ {
+ MySerializableXact->flags |= SXACT_FLAG_PARTIALLY_RELEASED;
+ /* ... and proceed to perform the partial release below. */
+ }
+ }
Assert(!isCommit || SxactIsPrepared(MySerializableXact));
Assert(!isCommit || !SxactIsDoomed(MySerializableXact));
Assert(!SxactIsCommitted(MySerializableXact));
- Assert(!SxactIsRolledBack(MySerializableXact));
+ Assert(SxactIsPartiallyReleased(MySerializableXact)
+ || !SxactIsRolledBack(MySerializableXact));
/* may not be serializable during COMMIT/ROLLBACK PREPARED */
Assert(MySerializableXact->pid == 0 || IsolationIsSerializable());
MySerializableXact->finishedBefore = ShmemVariableCache->nextXid;
/*
- * If it's not a commit it's a rollback, and we can clear our locks
- * immediately.
+ * If it's not a commit it's either a rollback or a read-only transaction
+ * flagged SXACT_FLAG_RO_SAFE, and we can clear our locks immediately.
*/
if (isCommit)
{
* cleanup. This means it should not be considered when calculating
* SxactGlobalXmin.
*/
- MySerializableXact->flags |= SXACT_FLAG_DOOMED;
+ if (!isReadOnlySafe)
+ MySerializableXact->flags |= SXACT_FLAG_DOOMED;
MySerializableXact->flags |= SXACT_FLAG_ROLLED_BACK;
/*
* was launched.
*/
needToClear = false;
- if (TransactionIdEquals(MySerializableXact->xmin, PredXact->SxactGlobalXmin))
+ if (!isReadOnlySafe &&
+ TransactionIdEquals(MySerializableXact->xmin, PredXact->SxactGlobalXmin))
{
Assert(PredXact->SxactGlobalXminCount > 0);
if (--(PredXact->SxactGlobalXminCount) == 0)
SHMQueueInsertBefore(FinishedSerializableTransactions,
&MySerializableXact->finishedLink);
+ /*
+ * If we're releasing a RO_SAFE transaction in parallel mode, we'll only
+ * partially release it. That's necessary because other backends may have
+ * a reference to it. The leader will release the SERIALIZABLEXACT itself
+ * at the end of the transaction after workers have stopped running.
+ */
if (!isCommit)
- ReleaseOneSerializableXact(MySerializableXact, false, false);
+ ReleaseOneSerializableXact(MySerializableXact,
+ isReadOnlySafe && IsInParallelMode(),
+ false);
LWLockRelease(SerializableFinishedListLock);
if (needToClear)
ClearOldPredicateLocks();
+ ReleasePredicateLocksLocal();
+}
+
+static void
+ReleasePredicateLocksLocal(void)
+{
MySerializableXact = InvalidSerializableXact;
MyXactDidWrite = false;
* them to OldCommittedSxact if summarize is true)
*/
LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
+ if (IsInParallelMode())
+ LWLockAcquire(&sxact->predicateLockListLock, LW_EXCLUSIVE);
predlock = (PREDICATELOCK *)
SHMQueueNext(&(sxact->predicateLocks),
&(sxact->predicateLocks),
*/
SHMQueueInit(&sxact->predicateLocks);
+ if (IsInParallelMode())
+ LWLockRelease(&sxact->predicateLockListLock);
LWLockRelease(SerializablePredicateLockListLock);
sxidtag.xid = sxact->topXid;
PREDICATELOCK *rmpredlock;
LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
+ if (IsInParallelMode())
+ LWLockAcquire(&MySerializableXact->predicateLockListLock, LW_EXCLUSIVE);
LWLockAcquire(partitionLock, LW_EXCLUSIVE);
LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
LWLockRelease(SerializableXactHashLock);
LWLockRelease(partitionLock);
+ if (IsInParallelMode())
+ LWLockRelease(&MySerializableXact->predicateLockListLock);
LWLockRelease(SerializablePredicateLockListLock);
if (rmpredlock != NULL)
/* Check if someone else has already decided that we need to die */
if (SxactIsDoomed(MySerializableXact))
{
+ Assert(!SxactIsPartiallyReleased(MySerializableXact));
LWLockRelease(SerializableXactHashLock);
ereport(ERROR,
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
*/
LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
+ /*
+ * No need to take sxact->predicateLockListLock in parallel mode because
+ * there cannot be any parallel workers running while we are preparing a
+ * transaction.
+ */
+ Assert(!IsParallelWorker() && !ParallelContextActive());
+
predlock = (PREDICATELOCK *)
SHMQueueNext(&(sxact->predicateLocks),
&(sxact->predicateLocks),
MySerializableXact = sxid->myXact;
MyXactDidWrite = true; /* conservatively assume that we wrote
* something */
- ReleasePredicateLocks(isCommit);
+ ReleasePredicateLocks(isCommit, false);
}
/*
CreatePredicateLock(&lockRecord->target, targettaghash, sxact);
}
}
+
+/*
+ * Prepare to share the current SERIALIZABLEXACT with parallel workers.
+ * Return a handle object that can be used by AttachSerializableXact() in a
+ * parallel worker.
+ */
+SerializableXactHandle
+ShareSerializableXact(void)
+{
+ return MySerializableXact;
+}
+
+/*
+ * Allow parallel workers to import the leader's SERIALIZABLEXACT.
+ */
+void
+AttachSerializableXact(SerializableXactHandle handle)
+{
+
+ Assert(MySerializableXact == InvalidSerializableXact);
+
+ MySerializableXact = (SERIALIZABLEXACT *) handle;
+ if (MySerializableXact != InvalidSerializableXact)
+ CreateLocalPredicateLockHash();
+}