Enable parallel query with SERIALIZABLE isolation.
authorThomas Munro <[email protected]>
Fri, 15 Mar 2019 03:23:46 +0000 (16:23 +1300)
committerThomas Munro <[email protected]>
Fri, 15 Mar 2019 04:47:04 +0000 (17:47 +1300)
Previously, the SERIALIZABLE isolation level prevented parallel query
from being used.  Allow the two features to be used together by
sharing the leader's SERIALIZABLEXACT with parallel workers.

An extra per-SERIALIZABLEXACT LWLock is introduced to make it safe to
share, and new logic is introduced to coordinate the early release
of the SERIALIZABLEXACT required for the SXACT_FLAG_RO_SAFE
optimization, as follows:

The first backend to observe the SXACT_FLAG_RO_SAFE flag (set by
some other transaction) will 'partially release' the SERIALIZABLEXACT,
meaning that the conflicts and locks it holds are released, but the
SERIALIZABLEXACT itself will remain active because other backends
might still have a pointer to it.

Whenever any backend notices the SXACT_FLAG_RO_SAFE flag, it clears
its own MySerializableXact variable and frees local resources so that
it can skip SSI checks for the rest of the transaction.  In the
special case of the leader process, it transfers the SERIALIZABLEXACT
to a new variable SavedSerializableXact, so that it can be completely
released at the end of the transaction after all workers have exited.

Remove the serializable_okay flag added to CreateParallelContext() by
commit 9da0cc35, because it's now redundant.

Author: Thomas Munro
Reviewed-by: Haribabu Kommi, Robert Haas, Masahiko Sawada, Kevin Grittner
Discussion: https://postgr.es/m/CAEepm=0gXGYhtrVDWOTHS8SQQy_=S9xo+8oCxGLWZAOoeJ=yzQ@mail.gmail.com

19 files changed:
doc/src/sgml/monitoring.sgml
doc/src/sgml/parallel.sgml
src/backend/access/nbtree/nbtsort.c
src/backend/access/transam/parallel.c
src/backend/access/transam/xact.c
src/backend/executor/execParallel.c
src/backend/optimizer/plan/planner.c
src/backend/storage/lmgr/lwlock.c
src/backend/storage/lmgr/predicate.c
src/backend/utils/resowner/resowner.c
src/include/access/parallel.h
src/include/storage/lwlock.h
src/include/storage/predicate.h
src/include/storage/predicate_internals.h
src/test/isolation/expected/serializable-parallel-2.out[new file with mode: 0644]
src/test/isolation/expected/serializable-parallel.out[new file with mode: 0644]
src/test/isolation/isolation_schedule
src/test/isolation/specs/serializable-parallel-2.spec[new file with mode: 0644]
src/test/isolation/specs/serializable-parallel.spec[new file with mode: 0644]

index 60b89356f709981827d86cce8dc7379aebc3d821..ac2721c8ad7a3e463ca51b1354274c224991ff68 100644 (file)
@@ -861,7 +861,7 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
 
       <tbody>
        <row>
-        <entry morerows="63"><literal>LWLock</literal></entry>
+        <entry morerows="64"><literal>LWLock</literal></entry>
         <entry><literal>ShmemIndexLock</literal></entry>
         <entry>Waiting to find or allocate space in shared memory.</entry>
        </row>
@@ -1121,6 +1121,11 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
          <entry><literal>predicate_lock_manager</literal></entry>
          <entry>Waiting to add or examine predicate lock information.</entry>
         </row>
+        <row>
+         <entry><literal>serializable_xact</literal></entry>
+         <entry>Waiting to perform an operation on a serializable transaction
+         in a parallel query.</entry>
+        </row>
         <row>
          <entry><literal>parallel_query_dsa</literal></entry>
          <entry>Waiting for parallel query dynamic shared memory allocation lock.</entry>
index 1005e9fef4db4a4a57362dbdc295442335ffb18f..b0b03c54e5f2ba6b72e1dbdb7567b5d69006e0fb 100644 (file)
@@ -184,13 +184,6 @@ EXPLAIN SELECT * FROM pgbench_accounts WHERE filler LIKE '%x%';
         using a very large number of processes.
       </para>
     </listitem>
-
-    <listitem>
-      <para>
-        The transaction isolation level is serializable.  This is
-        a limitation of the current implementation.
-      </para>
-    </listitem>
   </itemizedlist>
 
   <para>
@@ -233,16 +226,6 @@ EXPLAIN SELECT * FROM pgbench_accounts WHERE filler LIKE '%x%';
         that may be suboptimal when run serially.
       </para>
     </listitem>
-
-    <listitem>
-      <para>
-        The transaction isolation level is serializable.  This situation
-        does not normally arise, because parallel query plans are not
-        generated when the transaction isolation level is serializable.
-        However, it can happen if the transaction isolation level is changed to
-        serializable after the plan is generated and before it is executed.
-      </para>
-    </listitem>
   </itemizedlist>
  </sect1>
 
index 28c1aeefabb770547e9b8c3ccf2ffefca7abdbe8..363dceb5b1c21be023e59cc4accfa055c8d61a5e 100644 (file)
@@ -1265,7 +1265,7 @@ _bt_begin_parallel(BTBuildState *buildstate, bool isconcurrent, int request)
    EnterParallelMode();
    Assert(request > 0);
    pcxt = CreateParallelContext("postgres", "_bt_parallel_build_main",
-                                request, true);
+                                request);
    scantuplesortstates = leaderparticipates ? request + 1 : request;
 
    /*
index ce2b61631db56e481a2d819ec94ad5f05a7ef1b5..55d129a64f7f344b3136e79609377974e073006a 100644 (file)
@@ -31,6 +31,7 @@
 #include "optimizer/optimizer.h"
 #include "pgstat.h"
 #include "storage/ipc.h"
+#include "storage/predicate.h"
 #include "storage/sinval.h"
 #include "storage/spin.h"
 #include "tcop/tcopprot.h"
@@ -91,6 +92,7 @@ typedef struct FixedParallelState
    BackendId   parallel_master_backend_id;
    TimestampTz xact_ts;
    TimestampTz stmt_ts;
+   SerializableXactHandle serializable_xact_handle;
 
    /* Mutex protects remaining fields. */
    slock_t     mutex;
@@ -155,7 +157,7 @@ static void ParallelWorkerShutdown(int code, Datum arg);
  */
 ParallelContext *
 CreateParallelContext(const char *library_name, const char *function_name,
-                     int nworkers, bool serializable_okay)
+                     int nworkers)
 {
    MemoryContext oldcontext;
    ParallelContext *pcxt;
@@ -166,16 +168,6 @@ CreateParallelContext(const char *library_name, const char *function_name,
    /* Number of workers should be non-negative. */
    Assert(nworkers >= 0);
 
-   /*
-    * If we are running under serializable isolation, we can't use parallel
-    * workers, at least not until somebody enhances that mechanism to be
-    * parallel-aware.  Utility statement callers may ask us to ignore this
-    * restriction because they're always able to safely ignore the fact that
-    * SIREAD locks do not work with parallelism.
-    */
-   if (IsolationIsSerializable() && !serializable_okay)
-       nworkers = 0;
-
    /* We might be running in a short-lived memory context. */
    oldcontext = MemoryContextSwitchTo(TopTransactionContext);
 
@@ -327,6 +319,7 @@ InitializeParallelDSM(ParallelContext *pcxt)
    fps->parallel_master_backend_id = MyBackendId;
    fps->xact_ts = GetCurrentTransactionStartTimestamp();
    fps->stmt_ts = GetCurrentStatementStartTimestamp();
+   fps->serializable_xact_handle = ShareSerializableXact();
    SpinLockInit(&fps->mutex);
    fps->last_xlog_end = 0;
    shm_toc_insert(pcxt->toc, PARALLEL_KEY_FIXED, fps);
@@ -1422,6 +1415,9 @@ ParallelWorkerMain(Datum main_arg)
                                        false);
    RestoreEnumBlacklist(enumblacklistspace);
 
+   /* Attach to the leader's serializable transaction, if SERIALIZABLE. */
+   AttachSerializableXact(fps->serializable_xact_handle);
+
    /*
     * We've initialized all of our state now; nothing should change
     * hereafter.
index e93262975d3d276c4c2e227162fb912bbb0943d1..6e5891749b4a1caf9c3a0a4de349484881faad4f 100644 (file)
@@ -2024,9 +2024,12 @@ CommitTransaction(void)
    /*
     * Mark serializable transaction as complete for predicate locking
     * purposes.  This should be done as late as we can put it and still allow
-    * errors to be raised for failure patterns found at commit.
+    * errors to be raised for failure patterns found at commit.  This is not
+    * appropriate in a parallel worker however, because we aren't committing
+    * the leader's transaction and its serializable state will live on.
     */
-   PreCommit_CheckForSerializationFailure();
+   if (!is_parallel_worker)
+       PreCommit_CheckForSerializationFailure();
 
    /*
     * Insert notifications sent by NOTIFY commands into the queue.  This
index b79be91655b19404d4ba40b9ad815cb222281fa0..3d4b01cb4d69ec32223a5462558a44d6e23d503c 100644 (file)
@@ -604,7 +604,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
    pstmt_data = ExecSerializePlan(planstate->plan, estate);
 
    /* Create a parallel context. */
-   pcxt = CreateParallelContext("postgres", "ParallelQueryMain", nworkers, false);
+   pcxt = CreateParallelContext("postgres", "ParallelQueryMain", nworkers);
    pei->pcxt = pcxt;
 
    /*
index 9bb068a52e9d69da8a0187c821f4679dc2e4e745..e408e77d6fbdc8aed9775c8c7ea9931417508a35 100644 (file)
@@ -337,22 +337,13 @@ standard_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
     * parallel worker.  We might eventually be able to relax this
     * restriction, but for now it seems best not to have parallel workers
     * trying to create their own parallel workers.
-    *
-    * We can't use parallelism in serializable mode because the predicate
-    * locking code is not parallel-aware.  It's not catastrophic if someone
-    * tries to run a parallel plan in serializable mode; it just won't get
-    * any workers and will run serially.  But it seems like a good heuristic
-    * to assume that the same serialization level will be in effect at plan
-    * time and execution time, so don't generate a parallel plan if we're in
-    * serializable mode.
     */
    if ((cursorOptions & CURSOR_OPT_PARALLEL_OK) != 0 &&
        IsUnderPostmaster &&
        parse->commandType == CMD_SELECT &&
        !parse->hasModifyingCTE &&
        max_parallel_workers_per_gather > 0 &&
-       !IsParallelWorker() &&
-       !IsolationIsSerializable())
+       !IsParallelWorker())
    {
        /* all the cheap tests pass, so scan the query tree */
        glob->maxParallelHazard = max_parallel_hazard(parse);
index 81dac45ae57f1e76bf556c2e43af74fa3093dc27..bc1aa88322b0c8fbf40676a8e4ce27d304ac0f02 100644 (file)
@@ -521,6 +521,7 @@ RegisterLWLockTranches(void)
    LWLockRegisterTranche(LWTRANCHE_TBM, "tbm");
    LWLockRegisterTranche(LWTRANCHE_PARALLEL_APPEND, "parallel_append");
    LWLockRegisterTranche(LWTRANCHE_PARALLEL_HASH_JOIN, "parallel_hash_join");
+   LWLockRegisterTranche(LWTRANCHE_SXACT, "serializable_xact");
 
    /* Register named tranches. */
    for (i = 0; i < NamedLWLockTrancheRequests; i++)
index 6fc11f26f0eb3daf81fc98da540c815c9d9ca3ac..92beaab5663bc4e5d0fea5340c4279ab582fe551 100644 (file)
@@ -97,7 +97,9 @@
  *     - All transactions share this single lock (with no partitioning).
  *     - There is never a need for a process other than the one running
  *         an active transaction to walk the list of locks held by that
- *         transaction.
+ *         transaction, except parallel query workers sharing the leader's
+ *         transaction.  In the parallel case, an extra per-sxact lock is
+ *         taken; see below.
  *     - It is relatively infrequent that another process needs to
  *         modify the list for a transaction, but it does happen for such
  *         things as index page splits for pages with predicate locks and
  *         than its own active transaction must acquire an exclusive
  *         lock.
  *
+ * SERIALIZABLEXACT's member 'predicateLockListLock'
+ *     - Protects the linked list of locks held by a transaction.  Only
+ *         needed for parallel mode, where multiple backends share the
+ *         same SERIALIZABLEXACT object.  Not needed if
+ *         SerializablePredicateLockListLock is held exclusively.
+ *
  * PredicateLockHashPartitionLock(hashcode)
  *     - The same lock protects a target, all locks on that target, and
  *         the linked list of locks on the target.
  *     PredicateLockPageCombine(Relation relation, BlockNumber oldblkno,
  *                              BlockNumber newblkno)
  *     TransferPredicateLocksToHeapRelation(Relation relation)
- *     ReleasePredicateLocks(bool isCommit)
+ *     ReleasePredicateLocks(bool isCommit, bool isReadOnlySafe)
  *
  * conflict detection (may also trigger rollback)
  *     CheckForSerializableConflictOut(bool visible, Relation relation,
 
 #include "access/heapam.h"
 #include "access/htup_details.h"
+#include "access/parallel.h"
 #include "access/slru.h"
 #include "access/subtrans.h"
 #include "access/transam.h"
 #define SxactIsDeferrableWaiting(sxact) (((sxact)->flags & SXACT_FLAG_DEFERRABLE_WAITING) != 0)
 #define SxactIsROSafe(sxact) (((sxact)->flags & SXACT_FLAG_RO_SAFE) != 0)
 #define SxactIsROUnsafe(sxact) (((sxact)->flags & SXACT_FLAG_RO_UNSAFE) != 0)
+#define SxactIsPartiallyReleased(sxact) (((sxact)->flags & SXACT_FLAG_PARTIALLY_RELEASED) != 0)
 
 /*
  * Compute the hash code associated with a PREDICATELOCKTARGETTAG.
@@ -409,6 +419,15 @@ static HTAB *LocalPredicateLockHash = NULL;
 static SERIALIZABLEXACT *MySerializableXact = InvalidSerializableXact;
 static bool MyXactDidWrite = false;
 
+/*
+ * The SXACT_FLAG_RO_UNSAFE optimization might lead us to release
+ * MySerializableXact early.  If that happens in a parallel query, the leader
+ * needs to defer the destruction of the SERIALIZABLEXACT until end of
+ * transaction, because the workers still have a reference to it.  In that
+ * case, the leader stores it here.
+ */
+static SERIALIZABLEXACT *SavedSerializableXact = InvalidSerializableXact;
+
 /* local functions */
 
 static SERIALIZABLEXACT *CreatePredXact(void);
@@ -465,6 +484,8 @@ static void CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag);
 static void FlagRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer);
 static void OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *reader,
                                        SERIALIZABLEXACT *writer);
+static void CreateLocalPredicateLockHash(void);
+static void ReleasePredicateLocksLocal(void);
 
 
 /*------------------------------------------------------------------------*/
@@ -521,7 +542,7 @@ SerializationNeededForRead(Relation relation, Snapshot snapshot)
     */
    if (SxactIsROSafe(MySerializableXact))
    {
-       ReleasePredicateLocks(false);
+       ReleasePredicateLocks(false, true);
        return false;
    }
 
@@ -1168,6 +1189,8 @@ InitPredicateLocks(void)
        memset(PredXact->element, 0, requestSize);
        for (i = 0; i < max_table_size; i++)
        {
+           LWLockInitialize(&PredXact->element[i].sxact.predicateLockListLock,
+                            LWTRANCHE_SXACT);
            SHMQueueInsertBefore(&(PredXact->availableList),
                                 &(PredXact->element[i].link));
        }
@@ -1513,14 +1536,14 @@ GetSafeSnapshot(Snapshot origSnapshot)
        ereport(DEBUG2,
                (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
                 errmsg("deferrable snapshot was unsafe; trying a new one")));
-       ReleasePredicateLocks(false);
+       ReleasePredicateLocks(false, false);
    }
 
    /*
     * Now we have a safe snapshot, so we don't need to do any further checks.
     */
    Assert(SxactIsROSafe(MySerializableXact));
-   ReleasePredicateLocks(false);
+   ReleasePredicateLocks(false, true);
 
    return snapshot;
 }
@@ -1633,6 +1656,17 @@ SetSerializableTransactionSnapshot(Snapshot snapshot,
 {
    Assert(IsolationIsSerializable());
 
+   /*
+    * If this is called by parallel.c in a parallel worker, we don't want to
+    * create a SERIALIZABLEXACT just yet because the leader's
+    * SERIALIZABLEXACT will be installed with AttachSerializableXact().  We
+    * also don't want to reject SERIALIZABLE READ ONLY DEFERRABLE in this
+    * case, because the leader has already determined that the snapshot it
+    * has passed us is safe.  So there is nothing for us to do.
+    */
+   if (IsParallelWorker())
+       return;
+
    /*
     * We do not allow SERIALIZABLE READ ONLY DEFERRABLE transactions to
     * import snapshots, since there's no way to wait for a safe snapshot when
@@ -1666,7 +1700,6 @@ GetSerializableTransactionSnapshotInt(Snapshot snapshot,
    VirtualTransactionId vxid;
    SERIALIZABLEXACT *sxact,
               *othersxact;
-   HASHCTL     hash_ctl;
 
    /* We only do this for serializable transactions.  Once. */
    Assert(MySerializableXact == InvalidSerializableXact);
@@ -1813,6 +1846,16 @@ GetSerializableTransactionSnapshotInt(Snapshot snapshot,
 
    LWLockRelease(SerializableXactHashLock);
 
+   CreateLocalPredicateLockHash();
+
+   return snapshot;
+}
+
+static void
+CreateLocalPredicateLockHash(void)
+{
+   HASHCTL     hash_ctl;
+
    /* Initialize the backend-local hash table of parent locks */
    Assert(LocalPredicateLockHash == NULL);
    MemSet(&hash_ctl, 0, sizeof(hash_ctl));
@@ -1822,8 +1865,6 @@ GetSerializableTransactionSnapshotInt(Snapshot snapshot,
                                         max_predicate_locks_per_xact,
                                         &hash_ctl,
                                         HASH_ELEM | HASH_BLOBS);
-
-   return snapshot;
 }
 
 /*
@@ -2078,7 +2119,9 @@ RemoveTargetIfNoLongerUsed(PREDICATELOCKTARGET *target, uint32 targettaghash)
  * This implementation is assuming that the usage of each target tag field
  * is uniform.  No need to make this hard if we don't have to.
  *
- * We aren't acquiring lightweight locks for the predicate lock or lock
+ * We acquire an LWLock in the case of parallel mode, because worker
+ * backends have access to the leader's SERIALIZABLEXACT.  Otherwise,
+ * we aren't acquiring LWLocks for the predicate lock or lock
  * target structures associated with this transaction unless we're going
  * to modify them, because no other process is permitted to modify our
  * locks.
@@ -2091,6 +2134,8 @@ DeleteChildTargetLocks(const PREDICATELOCKTARGETTAG *newtargettag)
 
    LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
    sxact = MySerializableXact;
+   if (IsInParallelMode())
+       LWLockAcquire(&sxact->predicateLockListLock, LW_EXCLUSIVE);
    predlock = (PREDICATELOCK *)
        SHMQueueNext(&(sxact->predicateLocks),
                     &(sxact->predicateLocks),
@@ -2144,6 +2189,8 @@ DeleteChildTargetLocks(const PREDICATELOCKTARGETTAG *newtargettag)
 
        predlock = nextpredlock;
    }
+   if (IsInParallelMode())
+       LWLockRelease(&sxact->predicateLockListLock);
    LWLockRelease(SerializablePredicateLockListLock);
 }
 
@@ -2342,6 +2389,8 @@ CreatePredicateLock(const PREDICATELOCKTARGETTAG *targettag,
    partitionLock = PredicateLockHashPartitionLock(targettaghash);
 
    LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
+   if (IsInParallelMode())
+       LWLockAcquire(&sxact->predicateLockListLock, LW_EXCLUSIVE);
    LWLockAcquire(partitionLock, LW_EXCLUSIVE);
 
    /* Make sure that the target is represented. */
@@ -2379,6 +2428,8 @@ CreatePredicateLock(const PREDICATELOCKTARGETTAG *targettag,
    }
 
    LWLockRelease(partitionLock);
+   if (IsInParallelMode())
+       LWLockRelease(&sxact->predicateLockListLock);
    LWLockRelease(SerializablePredicateLockListLock);
 }
 
@@ -2566,7 +2617,8 @@ DeleteLockTarget(PREDICATELOCKTARGET *target, uint32 targettaghash)
    PREDICATELOCK *nextpredlock;
    bool        found;
 
-   Assert(LWLockHeldByMe(SerializablePredicateLockListLock));
+   Assert(LWLockHeldByMeInMode(SerializablePredicateLockListLock,
+                               LW_EXCLUSIVE));
    Assert(LWLockHeldByMe(PredicateLockHashPartitionLock(targettaghash)));
 
    predlock = (PREDICATELOCK *)
@@ -2626,7 +2678,7 @@ DeleteLockTarget(PREDICATELOCKTARGET *target, uint32 targettaghash)
  * covers it, or if we are absolutely certain that no one will need to
  * refer to that lock in the future.
  *
- * Caller must hold SerializablePredicateLockListLock.
+ * Caller must hold SerializablePredicateLockListLock exclusively.
  */
 static bool
 TransferPredicateLocksToNewTarget(PREDICATELOCKTARGETTAG oldtargettag,
@@ -2641,7 +2693,8 @@ TransferPredicateLocksToNewTarget(PREDICATELOCKTARGETTAG oldtargettag,
    bool        found;
    bool        outOfShmem = false;
 
-   Assert(LWLockHeldByMe(SerializablePredicateLockListLock));
+   Assert(LWLockHeldByMeInMode(SerializablePredicateLockListLock,
+                               LW_EXCLUSIVE));
 
    oldtargettaghash = PredicateLockTargetTagHashCode(&oldtargettag);
    newtargettaghash = PredicateLockTargetTagHashCode(&newtargettag);
@@ -3217,9 +3270,17 @@ SetNewSxactGlobalXmin(void)
  * If this transaction is committing and is holding any predicate locks,
  * it must be added to a list of completed serializable transactions still
  * holding locks.
+ *
+ * If isReadOnlySafe is true, then predicate locks are being released before
+ * the end of the transaction because MySerializableXact has been determined
+ * to be RO_SAFE.  In non-parallel mode we can release it completely, but it
+ * in parallel mode we partially release the SERIALIZABLEXACT and keep it
+ * around until the end of the transaction, allowing each backend to clear its
+ * MySerializableXact variable and benefit from the optimization in its own
+ * time.
  */
 void
-ReleasePredicateLocks(bool isCommit)
+ReleasePredicateLocks(bool isCommit, bool isReadOnlySafe)
 {
    bool        needToClear;
    RWConflict  conflict,
@@ -3238,6 +3299,44 @@ ReleasePredicateLocks(bool isCommit)
     */
    bool        topLevelIsDeclaredReadOnly;
 
+   /* We can't be both committing and releasing early due to RO_SAFE. */
+   Assert(!(isCommit && isReadOnlySafe));
+
+   /* Are we at the end of a transaction, that is, a commit or abort? */
+   if (!isReadOnlySafe)
+   {
+       /*
+        * Parallel workers mustn't release predicate locks at the end of
+        * their transaction.  The leader will do that at the end of its
+        * transaction.
+        */
+       if (IsParallelWorker())
+       {
+           ReleasePredicateLocksLocal();
+           return;
+       }
+
+       /*
+        * By the time the leader in a parallel query reaches end of
+        * transaction, it has waited for all workers to exit.
+        */
+       Assert(!ParallelContextActive());
+
+       /*
+        * If the leader in a parallel query earlier stashed a partially
+        * released SERIALIZABLEXACT for final clean-up at end of transaction
+        * (because workers might still have been accessing it), then it's
+        * time to restore it.
+        */
+       if (SavedSerializableXact != InvalidSerializableXact)
+       {
+           Assert(MySerializableXact == InvalidSerializableXact);
+           MySerializableXact = SavedSerializableXact;
+           SavedSerializableXact = InvalidSerializableXact;
+           Assert(SxactIsPartiallyReleased(MySerializableXact));
+       }
+   }
+
    if (MySerializableXact == InvalidSerializableXact)
    {
        Assert(LocalPredicateLockHash == NULL);
@@ -3246,10 +3345,51 @@ ReleasePredicateLocks(bool isCommit)
 
    LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
 
+   /*
+    * If the transaction is committing, but it has been partially released
+    * already, then treat this as a roll back.  It was marked as rolled back.
+    */
+   if (isCommit && SxactIsPartiallyReleased(MySerializableXact))
+       isCommit = false;
+
+   /*
+    * If we're called in the middle of a transaction because we discovered
+    * that the SXACT_FLAG_RO_SAFE flag was set, then we'll partially release
+    * it (that is, release the predicate locks and conflicts, but not the
+    * SERIALIZABLEXACT itself) if we're the first backend to have noticed.
+    */
+   if (isReadOnlySafe && IsInParallelMode())
+   {
+       /*
+        * The leader needs to stash a pointer to it, so that it can
+        * completely release it at end-of-transaction.
+        */
+       if (!IsParallelWorker())
+           SavedSerializableXact = MySerializableXact;
+
+       /*
+        * The first backend to reach this condition will partially release
+        * the SERIALIZABLEXACT.  All others will just clear their
+        * backend-local state so that they stop doing SSI checks for the rest
+        * of the transaction.
+        */
+       if (SxactIsPartiallyReleased(MySerializableXact))
+       {
+           LWLockRelease(SerializableXactHashLock);
+           ReleasePredicateLocksLocal();
+           return;
+       }
+       else
+       {
+           MySerializableXact->flags |= SXACT_FLAG_PARTIALLY_RELEASED;
+           /* ... and proceed to perform the partial release below. */
+       }
+   }
    Assert(!isCommit || SxactIsPrepared(MySerializableXact));
    Assert(!isCommit || !SxactIsDoomed(MySerializableXact));
    Assert(!SxactIsCommitted(MySerializableXact));
-   Assert(!SxactIsRolledBack(MySerializableXact));
+   Assert(SxactIsPartiallyReleased(MySerializableXact)
+          || !SxactIsRolledBack(MySerializableXact));
 
    /* may not be serializable during COMMIT/ROLLBACK PREPARED */
    Assert(MySerializableXact->pid == 0 || IsolationIsSerializable());
@@ -3273,8 +3413,8 @@ ReleasePredicateLocks(bool isCommit)
    MySerializableXact->finishedBefore = ShmemVariableCache->nextXid;
 
    /*
-    * If it's not a commit it's a rollback, and we can clear our locks
-    * immediately.
+    * If it's not a commit it's either a rollback or a read-only transaction
+    * flagged SXACT_FLAG_RO_SAFE, and we can clear our locks immediately.
     */
    if (isCommit)
    {
@@ -3298,7 +3438,8 @@ ReleasePredicateLocks(bool isCommit)
         * cleanup. This means it should not be considered when calculating
         * SxactGlobalXmin.
         */
-       MySerializableXact->flags |= SXACT_FLAG_DOOMED;
+       if (!isReadOnlySafe)
+           MySerializableXact->flags |= SXACT_FLAG_DOOMED;
        MySerializableXact->flags |= SXACT_FLAG_ROLLED_BACK;
 
        /*
@@ -3494,7 +3635,8 @@ ReleasePredicateLocks(bool isCommit)
     * was launched.
     */
    needToClear = false;
-   if (TransactionIdEquals(MySerializableXact->xmin, PredXact->SxactGlobalXmin))
+   if (!isReadOnlySafe &&
+       TransactionIdEquals(MySerializableXact->xmin, PredXact->SxactGlobalXmin))
    {
        Assert(PredXact->SxactGlobalXminCount > 0);
        if (--(PredXact->SxactGlobalXminCount) == 0)
@@ -3513,14 +3655,28 @@ ReleasePredicateLocks(bool isCommit)
        SHMQueueInsertBefore(FinishedSerializableTransactions,
                             &MySerializableXact->finishedLink);
 
+   /*
+    * If we're releasing a RO_SAFE transaction in parallel mode, we'll only
+    * partially release it.  That's necessary because other backends may have
+    * a reference to it.  The leader will release the SERIALIZABLEXACT itself
+    * at the end of the transaction after workers have stopped running.
+    */
    if (!isCommit)
-       ReleaseOneSerializableXact(MySerializableXact, false, false);
+       ReleaseOneSerializableXact(MySerializableXact,
+                                  isReadOnlySafe && IsInParallelMode(),
+                                  false);
 
    LWLockRelease(SerializableFinishedListLock);
 
    if (needToClear)
        ClearOldPredicateLocks();
 
+   ReleasePredicateLocksLocal();
+}
+
+static void
+ReleasePredicateLocksLocal(void)
+{
    MySerializableXact = InvalidSerializableXact;
    MyXactDidWrite = false;
 
@@ -3712,6 +3868,8 @@ ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial,
     * them to OldCommittedSxact if summarize is true)
     */
    LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
+   if (IsInParallelMode())
+       LWLockAcquire(&sxact->predicateLockListLock, LW_EXCLUSIVE);
    predlock = (PREDICATELOCK *)
        SHMQueueNext(&(sxact->predicateLocks),
                     &(sxact->predicateLocks),
@@ -3791,6 +3949,8 @@ ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial,
     */
    SHMQueueInit(&sxact->predicateLocks);
 
+   if (IsInParallelMode())
+       LWLockRelease(&sxact->predicateLockListLock);
    LWLockRelease(SerializablePredicateLockListLock);
 
    sxidtag.xid = sxact->topXid;
@@ -4213,6 +4373,8 @@ CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag)
        PREDICATELOCK *rmpredlock;
 
        LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
+       if (IsInParallelMode())
+           LWLockAcquire(&MySerializableXact->predicateLockListLock, LW_EXCLUSIVE);
        LWLockAcquire(partitionLock, LW_EXCLUSIVE);
        LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
 
@@ -4247,6 +4409,8 @@ CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag)
 
        LWLockRelease(SerializableXactHashLock);
        LWLockRelease(partitionLock);
+       if (IsInParallelMode())
+           LWLockRelease(&MySerializableXact->predicateLockListLock);
        LWLockRelease(SerializablePredicateLockListLock);
 
        if (rmpredlock != NULL)
@@ -4677,6 +4841,7 @@ PreCommit_CheckForSerializationFailure(void)
    /* Check if someone else has already decided that we need to die */
    if (SxactIsDoomed(MySerializableXact))
    {
+       Assert(!SxactIsPartiallyReleased(MySerializableXact));
        LWLockRelease(SerializableXactHashLock);
        ereport(ERROR,
                (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
@@ -4795,6 +4960,13 @@ AtPrepare_PredicateLocks(void)
     */
    LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
 
+   /*
+    * No need to take sxact->predicateLockListLock in parallel mode because
+    * there cannot be any parallel workers running while we are preparing a
+    * transaction.
+    */
+   Assert(!IsParallelWorker() && !ParallelContextActive());
+
    predlock = (PREDICATELOCK *)
        SHMQueueNext(&(sxact->predicateLocks),
                     &(sxact->predicateLocks),
@@ -4867,7 +5039,7 @@ PredicateLockTwoPhaseFinish(TransactionId xid, bool isCommit)
    MySerializableXact = sxid->myXact;
    MyXactDidWrite = true;      /* conservatively assume that we wrote
                                 * something */
-   ReleasePredicateLocks(isCommit);
+   ReleasePredicateLocks(isCommit, false);
 }
 
 /*
@@ -5003,3 +5175,28 @@ predicatelock_twophase_recover(TransactionId xid, uint16 info,
        CreatePredicateLock(&lockRecord->target, targettaghash, sxact);
    }
 }
+
+/*
+ * Prepare to share the current SERIALIZABLEXACT with parallel workers.
+ * Return a handle object that can be used by AttachSerializableXact() in a
+ * parallel worker.
+ */
+SerializableXactHandle
+ShareSerializableXact(void)
+{
+   return MySerializableXact;
+}
+
+/*
+ * Allow parallel workers to import the leader's SERIALIZABLEXACT.
+ */
+void
+AttachSerializableXact(SerializableXactHandle handle)
+{
+
+   Assert(MySerializableXact == InvalidSerializableXact);
+
+   MySerializableXact = (SERIALIZABLEXACT *) handle;
+   if (MySerializableXact != InvalidSerializableXact)
+       CreateLocalPredicateLockHash();
+}
index f7597b0991bb60933d4822060ed5a3496a9bb4d2..64aafef311409df29394f43104318a5d9ec06301 100644 (file)
@@ -566,7 +566,7 @@ ResourceOwnerReleaseInternal(ResourceOwner owner,
            if (owner == TopTransactionResourceOwner)
            {
                ProcReleaseLocks(isCommit);
-               ReleasePredicateLocks(isCommit);
+               ReleasePredicateLocks(isCommit, false);
            }
        }
        else
index fc220df533d0f7e6533021471d91007c3801f6ea..e650bb2eef19d350a44587635daccc046b60a065 100644 (file)
@@ -60,8 +60,7 @@ extern PGDLLIMPORT bool InitializingParallelWorker;
 #define        IsParallelWorker()      (ParallelWorkerNumber >= 0)
 
 extern ParallelContext *CreateParallelContext(const char *library_name,
-                     const char *function_name, int nworkers,
-                     bool serializable_okay);
+                     const char *function_name, int nworkers);
 extern void InitializeParallelDSM(ParallelContext *pcxt);
 extern void ReinitializeParallelDSM(ParallelContext *pcxt);
 extern void LaunchParallelWorkers(ParallelContext *pcxt);
index 96c773200667e775922270dd7027e9da5b3b88da..08e0dc8144bb947d8b8f7f241b12638cc351d043 100644 (file)
@@ -219,6 +219,7 @@ typedef enum BuiltinTrancheIds
    LWTRANCHE_SHARED_TUPLESTORE,
    LWTRANCHE_TBM,
    LWTRANCHE_PARALLEL_APPEND,
+   LWTRANCHE_SXACT,
    LWTRANCHE_FIRST_USER_DEFINED
 }          BuiltinTrancheIds;
 
index 3d87a631db9e2e04d1495cc2a326147b12f0c962..23980c6ede75f8d1c00f867055dd02b5f4c8b731 100644 (file)
@@ -30,6 +30,11 @@ extern int   max_predicate_locks_per_page;
 /* Number of SLRU buffers to use for predicate locking */
 #define NUM_OLDSERXID_BUFFERS  16
 
+/*
+ * A handle used for sharing SERIALIZABLEXACT objects between the participants
+ * in a parallel query.
+ */
+typedef void *SerializableXactHandle;
 
 /*
  * function s
@@ -56,7 +61,7 @@ extern void PredicateLockTuple(Relation relation, HeapTuple tuple, Snapshot snap
 extern void PredicateLockPageSplit(Relation relation, BlockNumber oldblkno, BlockNumber newblkno);
 extern void PredicateLockPageCombine(Relation relation, BlockNumber oldblkno, BlockNumber newblkno);
 extern void TransferPredicateLocksToHeapRelation(Relation relation);
-extern void ReleasePredicateLocks(bool isCommit);
+extern void ReleasePredicateLocks(bool isCommit, bool isReadOnlySafe);
 
 /* conflict detection (may also trigger rollback) */
 extern void CheckForSerializableConflictOut(bool valid, Relation relation, HeapTuple tuple,
@@ -74,4 +79,8 @@ extern void PredicateLockTwoPhaseFinish(TransactionId xid, bool isCommit);
 extern void predicatelock_twophase_recover(TransactionId xid, uint16 info,
                               void *recdata, uint32 len);
 
+/* parallel query support */
+extern SerializableXactHandle ShareSerializableXact(void);
+extern void AttachSerializableXact(SerializableXactHandle handle);
+
 #endif                         /* PREDICATE_H */
index 7814d7e72191cf8b10b6ef279b5acc023ef73e6f..451ec0eecdf71a0952e0ca9e2d0104e74eaf828c 100644 (file)
@@ -15,6 +15,7 @@
 #define PREDICATE_INTERNALS_H
 
 #include "storage/lock.h"
+#include "storage/lwlock.h"
 
 /*
  * Commit number.
@@ -91,6 +92,9 @@ typedef struct SERIALIZABLEXACT
    SHM_QUEUE   finishedLink;   /* list link in
                                 * FinishedSerializableTransactions */
 
+   LWLock      predicateLockListLock;  /* protects predicateLocks in parallel
+                                        * mode */
+
    /*
     * for r/o transactions: list of concurrent r/w transactions that we could
     * potentially have conflicts with, and vice versa for r/w transactions
@@ -123,6 +127,12 @@ typedef struct SERIALIZABLEXACT
 #define SXACT_FLAG_RO_UNSAFE           0x00000100
 #define SXACT_FLAG_SUMMARY_CONFLICT_IN 0x00000200
 #define SXACT_FLAG_SUMMARY_CONFLICT_OUT 0x00000400
+/*
+ * The following flag means the transaction has been partially released
+ * already, but is being preserved because parallel workers might have a
+ * reference to it.  It'll be recycled by the leader at end-of-transaction.
+ */
+#define SXACT_FLAG_PARTIALLY_RELEASED  0x00000800
 
 /*
  * The following types are used to provide an ad hoc list for holding
diff --git a/src/test/isolation/expected/serializable-parallel-2.out b/src/test/isolation/expected/serializable-parallel-2.out
new file mode 100644 (file)
index 0000000..9a693c4
--- /dev/null
@@ -0,0 +1,44 @@
+Parsed test spec with 2 sessions
+
+starting permutation: s1r s2r1 s1c s2r2 s2c
+step s1r: SELECT * FROM foo;
+a              
+
+1              
+2              
+3              
+4              
+5              
+6              
+7              
+8              
+9              
+10             
+step s2r1: SELECT * FROM foo;
+a              
+
+1              
+2              
+3              
+4              
+5              
+6              
+7              
+8              
+9              
+10             
+step s1c: COMMIT;
+step s2r2: SELECT * FROM foo;
+a              
+
+1              
+2              
+3              
+4              
+5              
+6              
+7              
+8              
+9              
+10             
+step s2c: COMMIT;
diff --git a/src/test/isolation/expected/serializable-parallel.out b/src/test/isolation/expected/serializable-parallel.out
new file mode 100644 (file)
index 0000000..f43aa6a
--- /dev/null
@@ -0,0 +1,44 @@
+Parsed test spec with 3 sessions
+
+starting permutation: s2rx s2ry s1ry s1wy s1c s2wx s2c s3c
+step s2rx: SELECT balance FROM bank_account WHERE id = 'X';
+balance        
+
+0              
+step s2ry: SELECT balance FROM bank_account WHERE id = 'Y';
+balance        
+
+0              
+step s1ry: SELECT balance FROM bank_account WHERE id = 'Y';
+balance        
+
+0              
+step s1wy: UPDATE bank_account SET balance = 20 WHERE id = 'Y';
+step s1c: COMMIT;
+step s2wx: UPDATE bank_account SET balance = -11 WHERE id = 'X';
+step s2c: COMMIT;
+step s3c: COMMIT;
+
+starting permutation: s2rx s2ry s1ry s1wy s1c s3r s3c s2wx
+step s2rx: SELECT balance FROM bank_account WHERE id = 'X';
+balance        
+
+0              
+step s2ry: SELECT balance FROM bank_account WHERE id = 'Y';
+balance        
+
+0              
+step s1ry: SELECT balance FROM bank_account WHERE id = 'Y';
+balance        
+
+0              
+step s1wy: UPDATE bank_account SET balance = 20 WHERE id = 'Y';
+step s1c: COMMIT;
+step s3r: SELECT id, balance FROM bank_account WHERE id IN ('X', 'Y') ORDER BY id;
+id             balance        
+
+X              0              
+Y              20             
+step s3c: COMMIT;
+step s2wx: UPDATE bank_account SET balance = -11 WHERE id = 'X';
+ERROR:  could not serialize access due to read/write dependencies among transactions
index 91d9d90135b512bd579dff1f7e5a98414d5f4c17..70d47b3e687b81def01a70e10d18dc58d7362b74 100644 (file)
@@ -78,3 +78,5 @@ test: partition-key-update-3
 test: partition-key-update-4
 test: plpgsql-toast
 test: truncate-conflict
+test: serializable-parallel
+test: serializable-parallel-2
diff --git a/src/test/isolation/specs/serializable-parallel-2.spec b/src/test/isolation/specs/serializable-parallel-2.spec
new file mode 100644 (file)
index 0000000..7f90f75
--- /dev/null
@@ -0,0 +1,30 @@
+# Exercise the case where a read-only serializable transaction has
+# SXACT_FLAG_RO_SAFE set in a parallel query.
+
+setup
+{
+   CREATE TABLE foo AS SELECT generate_series(1, 10)::int a;
+   ALTER TABLE foo SET (parallel_workers = 2);
+}
+
+teardown
+{
+   DROP TABLE foo;
+}
+
+session "s1"
+setup      { BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE; }
+step "s1r" { SELECT * FROM foo; }
+step "s1c"     { COMMIT; }
+
+session "s2"
+setup      {
+             BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE READ ONLY;
+             SET parallel_setup_cost = 0;
+             SET parallel_tuple_cost = 0;
+           }
+step "s2r1"    { SELECT * FROM foo; }
+step "s2r2"    { SELECT * FROM foo; }
+step "s2c" { COMMIT; }
+
+permutation "s1r" "s2r1" "s1c" "s2r2" "s2c"
diff --git a/src/test/isolation/specs/serializable-parallel.spec b/src/test/isolation/specs/serializable-parallel.spec
new file mode 100644 (file)
index 0000000..a4f488a
--- /dev/null
@@ -0,0 +1,47 @@
+# The example from the paper "A read-only transaction anomaly under snapshot
+# isolation"[1].
+#
+# Here we test that serializable snapshot isolation (SERIALIZABLE) doesn't
+# suffer from the anomaly, because s2 is aborted upon detection of a cycle.
+# In this case the read only query s3 happens to be running in a parallel
+# worker.
+#
+# [1] http://www.cs.umb.edu/~poneil/ROAnom.pdf
+
+setup
+{
+   CREATE TABLE bank_account (id TEXT PRIMARY KEY, balance DECIMAL NOT NULL);
+   INSERT INTO bank_account (id, balance) VALUES ('X', 0), ('Y', 0);
+}
+
+teardown
+{
+   DROP TABLE bank_account;
+}
+
+session "s1"
+setup      { BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE; }
+step "s1ry"    { SELECT balance FROM bank_account WHERE id = 'Y'; }
+step "s1wy"    { UPDATE bank_account SET balance = 20 WHERE id = 'Y'; }
+step "s1c"     { COMMIT; }
+
+session "s2"
+setup      { BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE; }
+step "s2rx"    { SELECT balance FROM bank_account WHERE id = 'X'; }
+step "s2ry"    { SELECT balance FROM bank_account WHERE id = 'Y'; }
+step "s2wx"    { UPDATE bank_account SET balance = -11 WHERE id = 'X'; }
+step "s2c" { COMMIT; }
+
+session "s3"
+setup      {
+             BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE;
+             SET force_parallel_mode = on;
+           }
+step "s3r" { SELECT id, balance FROM bank_account WHERE id IN ('X', 'Y') ORDER BY id; }
+step "s3c" { COMMIT; }
+
+# without s3, s1 and s2 commit
+permutation "s2rx" "s2ry" "s1ry" "s1wy" "s1c" "s2wx" "s2c" "s3c"
+
+# once s3 observes the data committed by s1, a cycle is created and s2 aborts
+permutation "s2rx" "s2ry" "s1ry" "s1wy" "s1c" "s3r" "s3c" "s2wx"