Improve the code to decide and process the apply action.
authorAmit Kapila <[email protected]>
Tue, 17 Jan 2023 05:58:22 +0000 (11:28 +0530)
committerAmit Kapila <[email protected]>
Tue, 17 Jan 2023 05:58:22 +0000 (11:28 +0530)
The code that decides the apply action missed to handle non-transactional
messages and we didn't catch it in our testing as currently such messages
are simply ignored by the apply worker. This was introduced by changes in
commit 216a784829.

While testing this, I noticed that we forgot to reset stream_xid after
processing the stream stop message which could also result in the wrong
apply action after the fix for non-transactional messages.

In passing, change assert to elog for unexpected apply action in some of
the routines so as to catch the problems in the production environment, if
any.

Reported-by: Tomas Vondra
Author: Amit Kapila
Reviewed-by: Tomas Vondra, Sawada Masahiko, Hou Zhijie
Discussion: https://postgr.es/m/984ff689-adde-9977-affe-cd6029e850be@enterprisedb.com
Discussion: https://postgr.es/m/CAA4eK1+wyN6zpaHUkCLorEWNx75MG0xhMwcFhvjqm2KURZEAGw@mail.gmail.com

src/backend/replication/logical/worker.c

index d8b8a374c6293618ccddd3a2f42c968ac5301eaf..a0084c7ef69d1cffca891eba1ec537d45d6ebac0 100644 (file)
@@ -247,8 +247,10 @@ typedef struct ApplyErrorCallbackArg
  * The action to be taken for the changes in the transaction.
  *
  * TRANS_LEADER_APPLY:
- * This action means that we are in the leader apply worker and changes of the
- * transaction are applied directly by the worker.
+ * This action means that we are in the leader apply worker or table sync
+ * worker. The changes of the transaction are either directly applied or
+ * are read from temporary files (for  transactions) and then
+ * applied by the worker.
  *
  * TRANS_LEADER_SERIALIZE:
  * This action means that we are in the leader apply worker or table sync
@@ -1004,6 +1006,9 @@ apply_handle_begin(StringInfo s)
 {
    LogicalRepBeginData begin_data;
 
+   /* There must not be an active  transaction. */
+   Assert(!TransactionIdIsValid(stream_xid));
+
    logicalrep_read_begin(s, &begin_data);
    set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);
 
@@ -1058,6 +1063,9 @@ apply_handle_begin_prepare(StringInfo s)
                (errcode(ERRCODE_PROTOCOL_VIOLATION),
                 errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
 
+   /* There must not be an active  transaction. */
+   Assert(!TransactionIdIsValid(stream_xid));
+
    logicalrep_read_begin_prepare(s, &begin_data);
    set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn);
 
@@ -1301,7 +1309,7 @@ apply_handle_stream_prepare(StringInfo s)
 
    switch (apply_action)
    {
-       case TRANS_LEADER_SERIALIZE:
+       case TRANS_LEADER_APPLY:
 
            /*
             * The transaction has been serialized to file, so replay all the
@@ -1384,7 +1392,7 @@ apply_handle_stream_prepare(StringInfo s)
            break;
 
        default:
-           Assert(false);
+           elog(ERROR, "unexpected apply action: %d", (int) apply_action);
            break;
    }
 
@@ -1484,6 +1492,9 @@ apply_handle_stream_start(StringInfo s)
                (errcode(ERRCODE_PROTOCOL_VIOLATION),
                 errmsg_internal("duplicate STREAM START message")));
 
+   /* There must not be an active  transaction. */
+   Assert(!TransactionIdIsValid(stream_xid));
+
    /* notify handle methods we're processing a remote transaction */
    in_streamed_transaction = true;
 
@@ -1589,7 +1600,7 @@ apply_handle_stream_start(StringInfo s)
            break;
 
        default:
-           Assert(false);
+           elog(ERROR, "unexpected apply action: %d", (int) apply_action);
            break;
    }
 
@@ -1705,11 +1716,12 @@ apply_handle_stream_stop(StringInfo s)
            break;
 
        default:
-           Assert(false);
+           elog(ERROR, "unexpected apply action: %d", (int) apply_action);
            break;
    }
 
    in_streamed_transaction = false;
+   stream_xid = InvalidTransactionId;
 
    /*
     * The parallel apply worker could be in a transaction in which case we
@@ -1842,7 +1854,7 @@ apply_handle_stream_abort(StringInfo s)
 
    switch (apply_action)
    {
-       case TRANS_LEADER_SERIALIZE:
+       case TRANS_LEADER_APPLY:
 
            /*
             * We are in the leader apply worker and the transaction has been
@@ -1957,7 +1969,7 @@ apply_handle_stream_abort(StringInfo s)
            break;
 
        default:
-           Assert(false);
+           elog(ERROR, "unexpected apply action: %d", (int) apply_action);
            break;
    }
 
@@ -2154,7 +2166,7 @@ apply_handle_stream_commit(StringInfo s)
 
    switch (apply_action)
    {
-       case TRANS_LEADER_SERIALIZE:
+       case TRANS_LEADER_APPLY:
 
            /*
             * The transaction has been serialized to file, so replay all the
@@ -2226,7 +2238,7 @@ apply_handle_stream_commit(StringInfo s)
            break;
 
        default:
-           Assert(false);
+           elog(ERROR, "unexpected apply action: %d", (int) apply_action);
            break;
    }
 
@@ -4204,7 +4216,6 @@ stream_close_file(void)
 
    BufFileClose(stream_fd);
 
-   stream_xid = InvalidTransactionId;
    stream_fd = NULL;
 }
 
@@ -4977,10 +4988,12 @@ set_apply_error_context_origin(char *originname)
 }
 
 /*
- * Return the action to be taken for the given transaction. *winfo is
- * assigned to the destination parallel worker info when the leader apply
- * worker has to pass all the transaction's changes to the parallel apply
- * worker.
+ * Return the action to be taken for the given transaction. See
+ * TransApplyAction for information on each of the actions.
+ *
+ * *winfo is assigned to the destination parallel worker info when the leader
+ * apply worker has to pass all the transaction's changes to the parallel
+ * apply worker.
  */
 static TransApplyAction
 get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
@@ -4991,27 +5004,35 @@ get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
    {
        return TRANS_PARALLEL_APPLY;
    }
-   else if (in_remote_transaction)
-   {
-       return TRANS_LEADER_APPLY;
-   }
 
    /*
-    * Check if we are processing this transaction using a parallel apply
-    * worker.
+    * If we are processing this transaction using a parallel apply worker then
+    * either we send the changes to the parallel worker or if the worker is busy
+    * then serialize the changes to the file which will later be processed by
+    * the parallel worker.
     */
    *winfo = pa_find_worker(xid);
 
-   if (!*winfo)
+   if (*winfo && (*winfo)->serialize_changes)
    {
-       return TRANS_LEADER_SERIALIZE;
+       return TRANS_LEADER_PARTIAL_SERIALIZE;
    }
-   else if ((*winfo)->serialize_changes)
+   else if (*winfo)
    {
-       return TRANS_LEADER_PARTIAL_SERIALIZE;
+       return TRANS_LEADER_SEND_TO_PARALLEL;
+   }
+
+   /*
+    * If there is no parallel worker involved to process this transaction then
+    * we either directly apply the change or serialize it to a file which will
+    * later be applied when the transaction finish message is processed.
+    */
+   else if (in_streamed_transaction)
+   {
+       return TRANS_LEADER_SERIALIZE;
    }
    else
    {
-       return TRANS_LEADER_SEND_TO_PARALLEL;
+       return TRANS_LEADER_APPLY;
    }
 }