* The action to be taken for the changes in the transaction.
*
* TRANS_LEADER_APPLY:
- * This action means that we are in the leader apply worker and changes of the
- * transaction are applied directly by the worker.
+ * This action means that we are in the leader apply worker or table sync
+ * worker. The changes of the transaction are either directly applied or
+ * are read from temporary files (for transactions) and then
+ * applied by the worker.
*
* TRANS_LEADER_SERIALIZE:
* This action means that we are in the leader apply worker or table sync
{
LogicalRepBeginData begin_data;
+ /* There must not be an active transaction. */
+ Assert(!TransactionIdIsValid(stream_xid));
+
logicalrep_read_begin(s, &begin_data);
set_apply_error_context_xact(begin_data.xid, begin_data.final_lsn);
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
+ /* There must not be an active transaction. */
+ Assert(!TransactionIdIsValid(stream_xid));
+
logicalrep_read_begin_prepare(s, &begin_data);
set_apply_error_context_xact(begin_data.xid, begin_data.prepare_lsn);
switch (apply_action)
{
- case TRANS_LEADER_SERIALIZE:
+ case TRANS_LEADER_APPLY:
/*
* The transaction has been serialized to file, so replay all the
break;
default:
- Assert(false);
+ elog(ERROR, "unexpected apply action: %d", (int) apply_action);
break;
}
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg_internal("duplicate STREAM START message")));
+ /* There must not be an active transaction. */
+ Assert(!TransactionIdIsValid(stream_xid));
+
/* notify handle methods we're processing a remote transaction */
in_streamed_transaction = true;
break;
default:
- Assert(false);
+ elog(ERROR, "unexpected apply action: %d", (int) apply_action);
break;
}
break;
default:
- Assert(false);
+ elog(ERROR, "unexpected apply action: %d", (int) apply_action);
break;
}
in_streamed_transaction = false;
+ stream_xid = InvalidTransactionId;
/*
* The parallel apply worker could be in a transaction in which case we
switch (apply_action)
{
- case TRANS_LEADER_SERIALIZE:
+ case TRANS_LEADER_APPLY:
/*
* We are in the leader apply worker and the transaction has been
break;
default:
- Assert(false);
+ elog(ERROR, "unexpected apply action: %d", (int) apply_action);
break;
}
switch (apply_action)
{
- case TRANS_LEADER_SERIALIZE:
+ case TRANS_LEADER_APPLY:
/*
* The transaction has been serialized to file, so replay all the
break;
default:
- Assert(false);
+ elog(ERROR, "unexpected apply action: %d", (int) apply_action);
break;
}
BufFileClose(stream_fd);
- stream_xid = InvalidTransactionId;
stream_fd = NULL;
}
}
/*
- * Return the action to be taken for the given transaction. *winfo is
- * assigned to the destination parallel worker info when the leader apply
- * worker has to pass all the transaction's changes to the parallel apply
- * worker.
+ * Return the action to be taken for the given transaction. See
+ * TransApplyAction for information on each of the actions.
+ *
+ * *winfo is assigned to the destination parallel worker info when the leader
+ * apply worker has to pass all the transaction's changes to the parallel
+ * apply worker.
*/
static TransApplyAction
get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo)
{
return TRANS_PARALLEL_APPLY;
}
- else if (in_remote_transaction)
- {
- return TRANS_LEADER_APPLY;
- }
/*
- * Check if we are processing this transaction using a parallel apply
- * worker.
+ * If we are processing this transaction using a parallel apply worker then
+ * either we send the changes to the parallel worker or if the worker is busy
+ * then serialize the changes to the file which will later be processed by
+ * the parallel worker.
*/
*winfo = pa_find_worker(xid);
- if (!*winfo)
+ if (*winfo && (*winfo)->serialize_changes)
{
- return TRANS_LEADER_SERIALIZE;
+ return TRANS_LEADER_PARTIAL_SERIALIZE;
}
- else if ((*winfo)->serialize_changes)
+ else if (*winfo)
{
- return TRANS_LEADER_PARTIAL_SERIALIZE;
+ return TRANS_LEADER_SEND_TO_PARALLEL;
+ }
+
+ /*
+ * If there is no parallel worker involved to process this transaction then
+ * we either directly apply the change or serialize it to a file which will
+ * later be applied when the transaction finish message is processed.
+ */
+ else if (in_streamed_transaction)
+ {
+ return TRANS_LEADER_SERIALIZE;
}
else
{
- return TRANS_LEADER_SEND_TO_PARALLEL;
+ return TRANS_LEADER_APPLY;
}
}