<entry><literal>WALWrite</literal></entry>
<entry>Waiting for a write to a WAL file.</entry>
</row>
+ <row>
+ <entry><literal>LogicalChangesRead</literal></entry>
+ <entry>Waiting for a read from a logical changes file.</entry>
+ </row>
+ <row>
+ <entry><literal>LogicalChangesWrite</literal></entry>
+ <entry>Waiting for a write to a logical changes file.</entry>
+ </row>
+ <row>
+ <entry><literal>LogicalSubxactRead</literal></entry>
+ <entry>Waiting for a read from a logical subxact file.</entry>
+ </row>
+ <row>
+ <entry><literal>LogicalSubxactWrite</literal></entry>
+ <entry>Waiting for a write to a logical subxact file.</entry>
+ </row>
</tbody>
</tgroup>
</table>
<xref linkend="sql-createsubscription"/>. See there for more
information. The parameters that can be altered
are <literal>slot_name</literal>,
- <literal>synchronous_commit</literal>, and
- <literal>binary</literal>.
+ <literal>synchronous_commit</literal>,
+ <literal>binary</literal>, and
+ <literal>streaming</literal>.
</para>
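+ <para>
+ For example, streaming could be enabled for a hypothetical
+ subscription <literal>mysub</literal> with:
+<programlisting>
+ALTER SUBSCRIPTION mysub SET (streaming = on);
+</programlisting>
+ </para>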
</listitem>
</varlistentry>
</para>
</listitem>
</varlistentry>
+ <varlistentry>
+ <term><literal>streaming</literal> (<type>boolean</type>)</term>
+ <listitem>
+ <para>
+ Specifies whether streaming of in-progress transactions should
+ be enabled for this subscription. By default, all transactions
+ are fully decoded on the publisher, and only then sent to the
+ subscriber as a whole.
+ </para>
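+ <para>
+ As a sketch (the connection string and names below are placeholders),
+ a subscription with streaming enabled could be created with:
+<programlisting>
+CREATE SUBSCRIPTION mysub
+    CONNECTION 'host=publisher dbname=postgres'
+    PUBLICATION mypub
+    WITH (streaming = on);
+</programlisting>
+ </para>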
+ </listitem>
+ </varlistentry>
</variablelist></para>
</listitem>
</varlistentry>
sub->owner = subform->subowner;
sub->enabled = subform->subenabled;
sub->binary = subform->subbinary;
+ sub->stream = subform->substream;
/* Get conninfo */
datum = SysCacheGetAttr(SUBSCRIPTIONOID,
-- All columns of pg_subscription except subconninfo are readable.
REVOKE ALL ON pg_subscription FROM public;
-GRANT SELECT (subdbid, subname, subowner, subenabled, subbinary, subslotname, subpublications)
+GRANT SELECT (subdbid, subname, subowner, subenabled, subbinary, substream, subslotname, subpublications)
ON pg_subscription TO public;
bool *copy_data,
char **synchronous_commit,
bool *refresh,
- bool *binary_given, bool *binary)
+ bool *binary_given, bool *binary,
+ bool *streaming_given, bool *streaming)
{
ListCell *lc;
bool connect_given = false;
*binary_given = false;
*binary = false;
}
+ if (streaming)
+ {
+ *streaming_given = false;
+ *streaming = false;
+ }
/* Parse options */
foreach(lc, options)
*binary_given = true;
*binary = defGetBoolean(defel);
}
+ else if (strcmp(defel->defname, "streaming") == 0 && streaming)
+ {
+ if (*streaming_given)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options")));
+
+ *streaming_given = true;
+ *streaming = defGetBoolean(defel);
+ }
else
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
bool enabled_given;
bool enabled;
bool copy_data;
+ bool streaming;
+ bool streaming_given;
char *synchronous_commit;
char *conninfo;
char *slotname;
&copy_data,
&synchronous_commit,
NULL, /* no "refresh" */
- &binary_given, &binary);
+ &binary_given, &binary,
+ &streaming_given, &streaming);
/*
* Since creating a replication slot is not transactional, rolling back
values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled);
values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(binary);
+ values[Anum_pg_subscription_substream - 1] = BoolGetDatum(streaming);
values[Anum_pg_subscription_subconninfo - 1] =
CStringGetTextDatum(conninfo);
if (slotname)
char *synchronous_commit;
bool binary_given;
bool binary;
+ bool streaming_given;
+ bool streaming;
parse_subscription_options(stmt->options,
NULL, /* no "connect" */
NULL, /* no "copy_data" */
&synchronous_commit,
NULL, /* no "refresh" */
- &binary_given, &binary);
+ &binary_given, &binary,
+ &streaming_given, &streaming);
if (slotname_given)
{
replaces[Anum_pg_subscription_subbinary - 1] = true;
}
+ if (streaming_given)
+ {
+ values[Anum_pg_subscription_substream - 1] =
+ BoolGetDatum(streaming);
+ replaces[Anum_pg_subscription_substream - 1] = true;
+ }
+
update_tuple = true;
break;
}
NULL, /* no "copy_data" */
NULL, /* no "synchronous_commit" */
NULL, /* no "refresh" */
- NULL, NULL); /* no "binary" */
+ NULL, NULL, /* no "binary" */
+ NULL, NULL); /* no "streaming" */
Assert(enabled_given);
if (!sub->slotname && enabled)
&copy_data,
NULL, /* no "synchronous_commit" */
&refresh,
- NULL, NULL); /* no "binary" */
-
+ NULL, NULL, /* no "binary" */
+ NULL, NULL); /* no "" */
values[Anum_pg_subscription_subpublications - 1] =
publicationListToArray(stmt->publication);
replaces[Anum_pg_subscription_subpublications - 1] = true;
&copy_data,
NULL, /* no "synchronous_commit" */
NULL, /* no "refresh" */
- NULL, NULL); /* no "binary" */
+ NULL, NULL, /* no "binary" */
+ NULL, NULL); /* no "" */
AlterSubscription_refresh(sub, copy_data);
case WAIT_EVENT_WAL_WRITE:
event_name = "WALWrite";
break;
+ case WAIT_EVENT_LOGICAL_CHANGES_READ:
+ event_name = "LogicalChangesRead";
+ break;
+ case WAIT_EVENT_LOGICAL_CHANGES_WRITE:
+ event_name = "LogicalChangesWrite";
+ break;
+ case WAIT_EVENT_LOGICAL_SUBXACT_READ:
+ event_name = "LogicalSubxactRead";
+ break;
+ case WAIT_EVENT_LOGICAL_SUBXACT_WRITE:
+ event_name = "LogicalSubxactWrite";
+ break;
/* no default case, so that compiler will warn */
}
appendStringInfo(&cmd, "proto_version '%u'",
options->proto.logical.proto_version);
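+
+ /*
+ * Illustrative sketch only: with streaming enabled against a v14 or
+ * newer server, the option string being built here ends up looking
+ * something like
+ *
+ * proto_version '2', streaming 'on', publication_names '"mypub"'
+ *
+ * where "mypub" stands in for the configured publication list
+ * appended below.
+ */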
+ if (options->proto.logical.streaming &&
+ PQserverVersion(conn->streamConn) >= 140000)
+ appendStringInfo(&cmd, ", streaming 'on'");
+
pubnames = options->proto.logical.publication_names;
pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames);
if (!pubnames_str)
* Write INSERT to the output stream.
*/
void
-logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple, bool binary)
+logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
+ HeapTuple newtuple, bool binary)
{
pq_sendbyte(out, 'I'); /* action INSERT */
+ /* transaction ID (if not valid, we're not streaming) */
+ if (TransactionIdIsValid(xid))
+ pq_sendint32(out, xid);
+
/* use Oid as relation identifier */
pq_sendint32(out, RelationGetRelid(rel));
* Write UPDATE to the output stream.
*/
void
-logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple,
- HeapTuple newtuple, bool binary)
+logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
+ HeapTuple oldtuple, HeapTuple newtuple, bool binary)
{
pq_sendbyte(out, 'U'); /* action UPDATE */
rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
+ /* transaction ID (if not valid, we're not streaming) */
+ if (TransactionIdIsValid(xid))
+ pq_sendint32(out, xid);
+
/* use Oid as relation identifier */
pq_sendint32(out, RelationGetRelid(rel));
* Write DELETE to the output stream.
*/
void
-logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple, bool binary)
+logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
+ HeapTuple oldtuple, bool binary)
{
Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
pq_sendbyte(out, 'D'); /* action DELETE */
+ /* transaction ID (if not valid, we're not streaming) */
+ if (TransactionIdIsValid(xid))
+ pq_sendint32(out, xid);
+
/* use Oid as relation identifier */
pq_sendint32(out, RelationGetRelid(rel));
*/
void
logicalrep_write_truncate(StringInfo out,
+ TransactionId xid,
int nrelids,
Oid relids[],
bool cascade, bool restart_seqs)
pq_sendbyte(out, 'T'); /* action TRUNCATE */
+ /* transaction ID (if not valid, we're not streaming) */
+ if (TransactionIdIsValid(xid))
+ pq_sendint32(out, xid);
+
pq_sendint32(out, nrelids);
/* encode and send truncate flags */
* Write relation description to the output stream.
*/
void
-logicalrep_write_rel(StringInfo out, Relation rel)
+logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel)
{
char *relname;
pq_sendbyte(out, 'R'); /* sending RELATION */
+ /* transaction ID (if not valid, we're not streaming) */
+ if (TransactionIdIsValid(xid))
+ pq_sendint32(out, xid);
+
/* use Oid as relation identifier */
pq_sendint32(out, RelationGetRelid(rel));
* This function will always write base type info.
*/
void
-logicalrep_write_typ(StringInfo out, Oid typoid)
+logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid)
{
Oid basetypoid = getBaseType(typoid);
HeapTuple tup;
pq_sendbyte(out, 'Y'); /* sending TYPE */
+ /* transaction ID (if not valid, we're not streaming) */
+ if (TransactionIdIsValid(xid))
+ pq_sendint32(out, xid);
+
tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(basetypoid));
if (!HeapTupleIsValid(tup))
elog(ERROR, "cache lookup failed for type %u", basetypoid);
return nspname;
}
+
+/*
+ * Write the information for the start stream message to the output stream.
+ */
+void
+logicalrep_write_stream_start(StringInfo out,
+ TransactionId xid, bool first_segment)
+{
+ pq_sendbyte(out, 'S'); /* action STREAM START */
+
+ Assert(TransactionIdIsValid(xid));
+
+ /* transaction ID (we're starting to stream, so must be valid) */
+ pq_sendint32(out, xid);
+
+ /* 1 if this is the first segment for this xid */
+ pq_sendbyte(out, first_segment ? 1 : 0);
+}
+
+/*
+ * Read the information about the start stream message from the output stream.
+ */
+TransactionId
+logicalrep_read_stream_start(StringInfo in, bool *first_segment)
+{
+ TransactionId xid;
+
+ Assert(first_segment);
+
+ xid = pq_getmsgint(in, 4);
+ *first_segment = (pq_getmsgbyte(in) == 1);
+
+ return xid;
+}
+
+/*
+ * Write the stop stream message to the output stream.
+ */
+void
+logicalrep_write_stream_stop(StringInfo out)
+{
+ pq_sendbyte(out, 'E'); /* action STREAM END */
+}
+
+/*
+ * Write STREAM COMMIT to the output stream.
+ */
+void
+logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn)
+{
+ uint8 flags = 0;
+
+ pq_sendbyte(out, 'c'); /* action STREAM COMMIT */
+
+ Assert(TransactionIdIsValid(txn->xid));
+
+ /* transaction ID */
+ pq_sendint32(out, txn->xid);
+
+ /* send the flags field (unused for now) */
+ pq_sendbyte(out, flags);
+
+ /* send fields */
+ pq_sendint64(out, commit_lsn);
+ pq_sendint64(out, txn->end_lsn);
+ pq_sendint64(out, txn->commit_time);
+}
+
+/*
+ * Read STREAM COMMIT from the output stream.
+ */
+TransactionId
+logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
+{
+ TransactionId xid;
+ uint8 flags;
+
+ xid = pq_getmsgint(in, 4);
+
+ /* read flags (unused for now) */
+ flags = pq_getmsgbyte(in);
+
+ if (flags != 0)
+ elog(ERROR, "unrecognized flags %u in commit message", flags);
+
+ /* read fields */
+ commit_data->commit_lsn = pq_getmsgint64(in);
+ commit_data->end_lsn = pq_getmsgint64(in);
+ commit_data->committime = pq_getmsgint64(in);
+
+ return xid;
+}
+
+/*
+ * Write STREAM ABORT to the output stream. Note that xid and subxid will be
+ * the same for a toplevel transaction abort.
+ */
+void
+logicalrep_write_stream_abort(StringInfo out, TransactionId xid,
+ TransactionId subxid)
+{
+ pq_sendbyte(out, 'A'); /* action STREAM ABORT */
+
+ Assert(TransactionIdIsValid(xid) && TransactionIdIsValid(subxid));
+
+ /* transaction ID */
+ pq_sendint32(out, xid);
+ pq_sendint32(out, subxid);
+}
+
+/*
+ * Read STREAM ABORT from the output stream.
+ */
+void
+logicalrep_read_stream_abort(StringInfo in, TransactionId *xid,
+ TransactionId *subxid)
+{
+ Assert(xid && subxid);
+
+ *xid = pq_getmsgint(in, 4);
+ *subxid = pq_getmsgint(in, 4);
+}
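+
+/*
+ * Quick reference for the streaming protocol messages defined above (the
+ * payload fields follow the single action byte):
+ *
+ * 'S' stream start: xid (int32), first_segment flag (int8)
+ * 'E' stream stop: no payload
+ * 'c' stream commit: xid, flags, commit_lsn, end_lsn, commit_time
+ * 'A' stream abort: xid, subxid
+ */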
* This module includes server facing code and shares libpqwalreceiver
* module with walreceiver for providing the libpq specific functionality.
*
+ *
+ * STREAMED TRANSACTIONS
+ * ---------------------
+ * Streamed transactions (large transactions exceeding a memory limit on the
+ * upstream) are not applied immediately, but instead, the data is written
+ * to temporary files and then applied at once when the final commit arrives.
+ *
+ * Unlike the regular (non-streamed) case, handling streamed transactions
+ * requires dealing with aborts of both the toplevel transaction and its
+ * subtransactions. This is achieved by tracking offsets for subtransactions,
+ * which are then used to truncate the file with serialized changes.
+ *
+ * The files are placed in the temporary-file directory by default, and the
+ * filenames include both the XID of the toplevel transaction and the OID of
+ * the subscription. This is necessary so that different workers processing a
+ * remote transaction with the same XID don't interfere.
+ *
+ * We use BufFiles instead of plain temporary files because (a) the BufFile
+ * infrastructure supports temporary files that exceed the OS file size
+ * limit, (b) it provides automatic cleanup on error, and (c) it allows the
+ * files to survive across local transactions so that they can be opened and
+ * closed at stream start and stop. We use the SharedFileSet infrastructure
+ * because without it the files would be deleted as soon as they are closed,
+ * and keeping the stream files open across start/stop streams would consume
+ * a lot of memory (more than 8K for each BufFile, and there can be multiple
+ * such BufFiles, as the subscriber may receive start/stop streams for
+ * several transactions before getting the commit). Moreover, without
+ * SharedFileSet we would also need to invent a new way to pass filenames to
+ * the BufFile APIs so that the desired file could be reopened across
+ * multiple stream-open calls for the same transaction.
*-------------------------------------------------------------------------
*/
#include "postgres.h"
+#include <sys/stat.h>
+#include <unistd.h>
+
#include "access/table.h"
#include "access/tableam.h"
#include "access/xact.h"
#include "catalog/pg_inherits.h"
#include "catalog/pg_subscription.h"
#include "catalog/pg_subscription_rel.h"
+#include "catalog/pg_tablespace.h"
#include "commands/tablecmds.h"
+#include "commands/tablespace.h"
#include "commands/trigger.h"
#include "executor/executor.h"
#include "executor/execPartition.h"
#include "replication/walreceiver.h"
#include "replication/worker_internal.h"
#include "rewrite/rewriteHandler.h"
+#include "storage/buffile.h"
#include "storage/bufmgr.h"
+#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/proc.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/catcache.h"
+#include "utils/dynahash.h"
#include "utils/datum.h"
#include "utils/fmgroids.h"
#include "utils/guc.h"
int remote_attnum;
} SlotErrCallbackArg;
+/*
+ * Stream xid hash entry. Whenever we see a new xid we create this entry in
+ * the xidhash and along with it create the streaming file and store the
+ * fileset handle. The subxact file is created iff there is any subxact info
+ * under this xid. This entry is used on the subsequent streams for the xid
+ * to get the corresponding fileset handles, so storing them in hash makes
+ * the search faster.
+ */
+typedef struct StreamXidHash
+{
+ TransactionId xid; /* xid is the hash key and must be first */
+ SharedFileSet *stream_fileset; /* shared file set for stream data */
+ SharedFileSet *subxact_fileset; /* shared file set for subxact info */
+} StreamXidHash;
+
static MemoryContext ApplyMessageContext = NULL;
MemoryContext ApplyContext = NULL;
+/* per-stream context for streaming transactions */
+static MemoryContext LogicalContext = NULL;
+
WalReceiverConn *wrconn = NULL;
Subscription *MySubscription = NULL;
bool in_remote_transaction = false;
static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
+/* fields valid only when processing streamed transaction */
+bool in_streamed_transaction = false;
+
+static TransactionId stream_xid = InvalidTransactionId;
+
+/*
+ * Hash table for storing the streaming xid information along with shared
+ * file set for streaming and subxact files.
+ */
+static HTAB *xidhash = NULL;
+
+/* BufFile handle of the current streaming file */
+static BufFile *stream_fd = NULL;
+
+typedef struct SubXactInfo
+{
+ TransactionId xid; /* XID of the subxact */
+ int fileno; /* file number in the buffile */
+ off_t offset; /* offset in the file */
+} SubXactInfo;
+
+/* Sub-transaction data for the current transaction */
+typedef struct ApplySubXactData
+{
+ uint32 nsubxacts; /* number of sub-transactions */
+ uint32 nsubxacts_max; /* current capacity of subxacts */
+ TransactionId subxact_last; /* xid of the last sub-transaction */
+ SubXactInfo *subxacts; /* sub-xact offset in changes file */
+} ApplySubXactData;
+
+static ApplySubXactData subxact_data = {0, 0, InvalidTransactionId, NULL};
+
+static void subxact_filename(char *path, Oid subid, TransactionId xid);
+static void changes_filename(char *path, Oid subid, TransactionId xid);
+
+/*
+ * Information about subtransactions of a given toplevel transaction.
+ */
+static void subxact_info_write(Oid subid, TransactionId xid);
+static void subxact_info_read(Oid subid, TransactionId xid);
+static void subxact_info_add(TransactionId xid);
+static inline void cleanup_subxact_info(void);
+
+/*
+ * Serialize and deserialize changes for a toplevel transaction.
+ */
+static void stream_cleanup_files(Oid subid, TransactionId xid);
+static void stream_open_file(Oid subid, TransactionId xid, bool first);
+static void stream_write_change(char action, StringInfo s);
+static void stream_close_file(void);
+
static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
static void store_flush_position(XLogRecPtr remote_lsn);
static void maybe_reread_subscription(void);
+/* needed because of stream_commit */
+static void apply_dispatch(StringInfo s);
+
static void apply_handle_insert_internal(ResultRelInfo *relinfo,
EState *estate, TupleTableSlot *remoteslot);
static void apply_handle_update_internal(ResultRelInfo *relinfo,
return true;
}
+/*
+ * Handle streamed transactions.
+ *
+ * If in streaming mode (receiving a block of a streamed transaction), we
+ * simply redirect it to a file for the proper toplevel transaction.
+ *
+ * Returns true for streamed transactions, false otherwise (regular mode).
+ */
+static bool
+handle_streamed_transaction(const char action, StringInfo s)
+{
+ TransactionId xid;
+
+ /* not in streaming mode */
+ if (!in_streamed_transaction)
+ return false;
+
+ Assert(stream_fd != NULL);
+ Assert(TransactionIdIsValid(stream_xid));
+
+ /*
+ * We should have received XID of the subxact as the first part of the
+ * message, so extract it.
+ */
+ xid = pq_getmsgint(s, 4);
+
+ Assert(TransactionIdIsValid(xid));
+
+ /* Add the new subxact to the array (unless already there). */
+ subxact_info_add(xid);
+
+ /* write the change to the current file */
+ stream_write_change(action, s);
+
+ return true;
+}
/*
* Executor state preparation for evaluation of constraint expressions,
apply_handle_origin(StringInfo s)
{
/*
- * ORIGIN message can only come inside remote transaction and before any
- * actual writes.
+ * ORIGIN message can only come inside a streaming transaction or inside a
+ * remote transaction, and before any actual writes.
*/
- if (!in_remote_transaction ||
- (IsTransactionState() && !am_tablesync_worker()))
+ if (!in_streamed_transaction &&
+ (!in_remote_transaction ||
+ (IsTransactionState() && !am_tablesync_worker())))
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg("ORIGIN message sent out of order")));
}
+/*
+ * Handle STREAM START message.
+ */
+static void
+apply_handle_stream_start(StringInfo s)
+{
+ bool first_segment;
+ HASHCTL hash_ctl;
+
+ Assert(!in_streamed_transaction);
+
+ /*
+ * Start a transaction on stream start; this transaction will be committed
+ * on stream stop. We need the transaction for handling the BufFile, used
+ * for serializing the data and subxact info.
+ */
+ ensure_transaction();
+
+ /* notify handle methods we're processing a remote transaction */
+ in_streamed_transaction = true;
+
+ /* extract XID of the top-level transaction */
+ stream_xid = logicalrep_read_stream_start(s, &first_segment);
+
+ /*
+ * Initialize the xidhash table if we haven't yet. This will be used for
+ * the entire duration of the apply worker so create it in permanent
+ * context.
+ */
+ if (xidhash == NULL)
+ {
+ hash_ctl.keysize = sizeof(TransactionId);
+ hash_ctl.entrysize = sizeof(StreamXidHash);
+ hash_ctl.hcxt = ApplyContext;
+ xidhash = hash_create("StreamXidHash", 1024, &hash_ctl,
+ HASH_ELEM | HASH_CONTEXT);
+ }
+
+ /* open the spool file for this transaction */
+ stream_open_file(MyLogicalRepWorker->subid, stream_xid, first_segment);
+
+ /* if this is not the first segment, open existing subxact file */
+ if (!first_segment)
+ subxact_info_read(MyLogicalRepWorker->subid, stream_xid);
+
+ pgstat_report_activity(STATE_RUNNING, NULL);
+}
+
+/*
+ * Handle STREAM STOP message.
+ */
+static void
+apply_handle_stream_stop(StringInfo s)
+{
+ Assert(in_streamed_transaction);
+
+ /*
+ * Close the file with serialized changes, and serialize information about
+ * subxacts for the toplevel transaction.
+ */
+ subxact_info_write(MyLogicalRepWorker->subid, stream_xid);
+ stream_close_file();
+
+ /* We must be in a valid transaction state */
+ Assert(IsTransactionState());
+
+ /* Commit the per-stream transaction */
+ CommitTransactionCommand();
+
+ in_streamed_transaction = false;
+
+ /* Reset per-stream context */
+ MemoryContextReset(LogicalContext);
+
+ pgstat_report_activity(STATE_IDLE, NULL);
+}
+
+/*
+ * Handle STREAM abort message.
+ */
+static void
+apply_handle_stream_abort(StringInfo s)
+{
+ TransactionId xid;
+ TransactionId subxid;
+
+ Assert(!in_streamed_transaction);
+
+ logicalrep_read_stream_abort(s, &xid, &subxid);
+
+ /*
+ * If the two XIDs are the same, it's in fact an abort of the toplevel
+ * xact, so just delete the files with serialized info.
+ */
+ if (xid == subxid)
+ stream_cleanup_files(MyLogicalRepWorker->subid, xid);
+ else
+ {
+ /*
+ * OK, so it's a subxact. We need to read the subxact file for the
+ * toplevel transaction, determine the offset tracked for the subxact,
+ * and truncate the file with changes. We also remove the subxacts
+ * with higher offsets (or rather higher XIDs).
+ *
+ * We intentionally scan the array from the tail, because we're likely
+ * aborting a change for the most recent subtransactions.
+ *
+ * We can't use a binary search here as subxact XIDs won't
+ * necessarily arrive in sorted order; consider the case where we
+ * have released the savepoint for multiple subtransactions and then
+ * performed a rollback to savepoint for one of the earlier
+ * sub-transactions.
+ */
+
+ int64 i;
+ int64 subidx;
+ BufFile *fd;
+ bool found = false;
+ char path[MAXPGPATH];
+ StreamXidHash *ent;
+
+ subidx = -1;
+ ensure_transaction();
+ subxact_info_read(MyLogicalRepWorker->subid, xid);
+
+ for (i = subxact_data.nsubxacts; i > 0; i--)
+ {
+ if (subxact_data.subxacts[i - 1].xid == subxid)
+ {
+ subidx = (i - 1);
+ found = true;
+ break;
+ }
+ }
+
+ /*
+ * If it's an empty sub-transaction then we will not find the subxid
+ * here so just cleanup the subxact info and return.
+ */
+ if (!found)
+ {
+ /* Cleanup the subxact info */
+ cleanup_subxact_info();
+ CommitTransactionCommand();
+ return;
+ }
+
+ Assert((subidx >= 0) && (subidx < subxact_data.nsubxacts));
+
+ ent = (StreamXidHash *) hash_search(xidhash,
+ (void *) &xid,
+ HASH_FIND,
+ &found);
+ Assert(found);
+
+ /* open the changes file */
+ changes_filename(path, MyLogicalRepWorker->subid, xid);
+ fd = BufFileOpenShared(ent->stream_fileset, path, O_RDWR);
+
+ /* OK, truncate the file at the right offset */
+ BufFileTruncateShared(fd, subxact_data.subxacts[subidx].fileno,
+ subxact_data.subxacts[subidx].offset);
+ BufFileClose(fd);
+
+ /* discard the subxacts added later */
+ subxact_data.nsubxacts = subidx;
+
+ /* write the updated subxact list */
+ subxact_info_write(MyLogicalRepWorker->subid, xid);
+ CommitTransactionCommand();
+ }
+}
+
+/*
+ * Handle STREAM COMMIT message.
+ */
+static void
+apply_handle_stream_commit(StringInfo s)
+{
+ TransactionId xid;
+ StringInfoData s2;
+ int nchanges;
+ char path[MAXPGPATH];
+ char *buffer = NULL;
+ bool found;
+ LogicalRepCommitData commit_data;
+ StreamXidHash *ent;
+ MemoryContext oldcxt;
+ BufFile *fd;
+
+ Assert(!in_streamed_transaction);
+
+ xid = logicalrep_read_stream_commit(s, &commit_data);
+
+ elog(DEBUG1, "received commit for streamed transaction %u", xid);
+
+ ensure_transaction();
+
+ /*
+ * Allocate file handle and memory required to process all the messages in
+ * TopTransactionContext to avoid them getting reset after each message is
+ * processed.
+ */
+ oldcxt = MemoryContextSwitchTo(TopTransactionContext);
+
+ /* open the spool file for the committed transaction */
+ changes_filename(path, MyLogicalRepWorker->subid, xid);
+ elog(DEBUG1, "replaying changes from file \"%s\"", path);
+ ent = (StreamXidHash *) hash_search(xidhash,
+ (void *) &xid,
+ HASH_FIND,
+ &found);
+ Assert(found);
+ fd = BufFileOpenShared(ent->stream_fileset, path, O_RDONLY);
+
+ buffer = palloc(BLCKSZ);
+ initStringInfo(&s2);
+
+ MemoryContextSwitchTo(oldcxt);
+
+ remote_final_lsn = commit_data.commit_lsn;
+
+ /*
+ * Make sure the apply_dispatch handler methods are aware we're in a
+ * remote transaction.
+ */
+ in_remote_transaction = true;
+ pgstat_report_activity(STATE_RUNNING, NULL);
+
+ /*
+ * Read the entries one by one and pass them through the same logic as in
+ * apply_dispatch.
+ */
+ nchanges = 0;
+ while (true)
+ {
+ int nbytes;
+ int len;
+
+ CHECK_FOR_INTERRUPTS();
+
+ /* read length of the on-disk record */
+ nbytes = BufFileRead(fd, &len, sizeof(len));
+
+ /* have we reached end of the file? */
+ if (nbytes == 0)
+ break;
+
+ /* do we have a correct length? */
+ if (nbytes != sizeof(len))
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not read from transaction's changes file \"%s\": %m",
+ path)));
+
+ Assert(len > 0);
+
+ /* make sure we have sufficiently large buffer */
+ buffer = repalloc(buffer, len);
+
+ /* and finally read the data into the buffer */
+ if (BufFileRead(fd, buffer, len) != len)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not read from transaction's changes file \"%s\": %m",
+ path)));
+
+ /* copy the buffer to the stringinfo and call apply_dispatch */
+ resetStringInfo(&s2);
+ appendBinaryStringInfo(&s2, buffer, len);
+
+ /* Ensure we are reading the data into our memory context. */
+ oldcxt = MemoryContextSwitchTo(ApplyMessageContext);
+
+ apply_dispatch(&s2);
+
+ MemoryContextReset(ApplyMessageContext);
+
+ MemoryContextSwitchTo(oldcxt);
+
+ nchanges++;
+
+ if (nchanges % 1000 == 0)
+ elog(DEBUG1, "replayed %d changes from file '%s'",
+ nchanges, path);
+ }
+
+ BufFileClose(fd);
+
+ /*
+ * Update origin state so we can restart from the correct position
+ * in case of crash.
+ */
+ replorigin_session_origin_lsn = commit_data.end_lsn;
+ replorigin_session_origin_timestamp = commit_data.committime;
+
+ pfree(buffer);
+ pfree(s2.data);
+
+ CommitTransactionCommand();
+ pgstat_report_stat(false);
+
+ store_flush_position(commit_data.end_lsn);
+
+ elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
+ nchanges, path);
+
+ in_remote_transaction = false;
+
+ /* Process any tables that are being synchronized in parallel. */
+ process_syncing_tables(commit_data.end_lsn);
+
+ /* unlink the files with serialized changes and subxact info */
+ stream_cleanup_files(MyLogicalRepWorker->subid, xid);
+
+ pgstat_report_activity(STATE_IDLE, NULL);
+}
+
/*
* Handle RELATION message.
*
{
LogicalRepRelation *rel;
+ if (handle_streamed_transaction('R', s))
+ return;
+
rel = logicalrep_read_rel(s);
logicalrep_relmap_update(rel);
}
{
LogicalRepTyp typ;
+ if (handle_streamed_transaction('Y', s))
+ return;
+
logicalrep_read_typ(s, &typ);
logicalrep_typmap_update(&typ);
}
TupleTableSlot *remoteslot;
MemoryContext oldctx;
+ if (handle_streamed_transaction('I', s))
+ return;
+
ensure_transaction();
relid = logicalrep_read_insert(s, &newtup);
RangeTblEntry *target_rte;
MemoryContext oldctx;
+ if (handle_streamed_transaction('U', s))
+ return;
+
ensure_transaction();
relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
TupleTableSlot *remoteslot;
MemoryContext oldctx;
+ if (handle_streamed_transaction('D', s))
+ return;
+
ensure_transaction();
relid = logicalrep_read_delete(s, &oldtup);
List *relids_logged = NIL;
ListCell *lc;
+ if (handle_streamed_transaction('T', s))
+ return;
+
ensure_transaction();
remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
case 'O':
apply_handle_origin(s);
break;
+ /* STREAM START */
+ case 'S':
+ apply_handle_stream_start(s);
+ break;
+ /* STREAM END */
+ case 'E':
+ apply_handle_stream_stop(s);
+ break;
+ /* STREAM ABORT */
+ case 'A':
+ apply_handle_stream_abort(s);
+ break;
+ /* STREAM COMMIT */
+ case 'c':
+ apply_handle_stream_commit(s);
+ break;
default:
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
"ApplyMessageContext",
ALLOCSET_DEFAULT_SIZES);
+ /*
+ * This memory context is used for per-stream data when streaming mode
+ * is enabled. This context is reset on each stream stop.
+ */
+ LogicalContext = AllocSetContextCreate(ApplyContext,
+ "LogicalContext",
+ ALLOCSET_DEFAULT_SIZES);
+
/* mark as idle, before starting to loop */
pgstat_report_activity(STATE_IDLE, NULL);
/* confirm all writes so far */
send_feedback(last_received, false, false);
- if (!in_remote_transaction)
+ if (!in_remote_transaction && !in_streamed_transaction)
{
/*
* If we didn't get any transactions for a while there might be
strcmp(newsub->name, MySubscription->name) != 0 ||
strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
newsub->binary != MySubscription->binary ||
+ newsub->stream != MySubscription->stream ||
!equal(newsub->publications, MySubscription->publications))
{
ereport(LOG,
MySubscriptionValid = false;
}
+/*
+ * subxact_info_write
+ * Store information about subxacts for a toplevel transaction.
+ *
+ * For each subxact we store the offset of its first change in the main file.
+ * The file is always over-written as a whole.
+ *
+ * XXX We should only store subxacts that were not aborted yet.
+ */
+static void
+subxact_info_write(Oid subid, TransactionId xid)
+{
+ char path[MAXPGPATH];
+ bool found;
+ Size len;
+ StreamXidHash *ent;
+ BufFile *fd;
+
+ Assert(TransactionIdIsValid(xid));
+
+ /* find the xid entry in the xidhash */
+ ent = (StreamXidHash *) hash_search(xidhash,
+ (void *) &xid,
+ HASH_FIND,
+ &found);
+ /* we must have found the entry for its top transaction by this time */
+ Assert(found);
+
+ /*
+ * If there are no subtransactions then there is nothing to do, but if a
+ * subxact file already exists then delete it.
+ */
+ if (subxact_data.nsubxacts == 0)
+ {
+ if (ent->subxact_fileset)
+ {
+ cleanup_subxact_info();
+ SharedFileSetDeleteAll(ent->subxact_fileset);
+ pfree(ent->subxact_fileset);
+ ent->subxact_fileset = NULL;
+ }
+ return;
+ }
+
+ subxact_filename(path, subid, xid);
+
+ /*
+ * Create the subxact file if it is not already created, otherwise open
+ * the existing file.
+ */
+ if (ent->subxact_fileset == NULL)
+ {
+ MemoryContext oldctx;
+
+ /*
+ * We need to maintain the shared fileset across multiple stream
+ * start/stop calls, so we need to allocate it in a persistent context.
+ */
+ oldctx = MemoryContextSwitchTo(ApplyContext);
+ ent->subxact_fileset = palloc(sizeof(SharedFileSet));
+ SharedFileSetInit(ent->subxact_fileset, NULL);
+ MemoryContextSwitchTo(oldctx);
+
+ fd = BufFileCreateShared(ent->subxact_fileset, path);
+ }
+ else
+ fd = BufFileOpenShared(ent->subxact_fileset, path, O_RDWR);
+
+ len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
+
+ /* Write the subxact count and subxact info */
+ BufFileWrite(fd, &subxact_data.nsubxacts, sizeof(subxact_data.nsubxacts));
+ BufFileWrite(fd, subxact_data.subxacts, len);
+
+ BufFileClose(fd);
+
+ /* free the memory allocated for subxact info */
+ cleanup_subxact_info();
+}
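+
+/*
+ * As a consequence, the on-disk subxact file contains (fields written by
+ * subxact_info_write and read back by subxact_info_read):
+ *
+ * uint32 nsubxacts, followed by nsubxacts SubXactInfo entries
+ */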
+
+/*
+ * subxact_info_read
+ * Restore information about subxacts of a streamed transaction.
+ *
+ * Read information about subxacts into the structure subxact_data that can be
+ * used later.
+ */
+static void
+subxact_info_read(Oid subid, TransactionId xid)
+{
+ char path[MAXPGPATH];
+ bool found;
+ Size len;
+ BufFile *fd;
+ StreamXidHash *ent;
+ MemoryContext oldctx;
+
+ Assert(TransactionIdIsValid(xid));
+ Assert(!subxact_data.subxacts);
+ Assert(subxact_data.nsubxacts == 0);
+ Assert(subxact_data.nsubxacts_max == 0);
+
+ /* Find the stream xid entry in the xidhash */
+ ent = (StreamXidHash *) hash_search(xidhash,
+ (void *) &xid,
+ HASH_FIND,
+ &found);
+
+ /*
+ * If subxact_fileset is not valid that means we don't have any subxact
+ * info.
+ */
+ if (ent->subxact_fileset == NULL)
+ return;
+
+ subxact_filename(path, subid, xid);
+
+ fd = BufFileOpenShared(ent->subxact_fileset, path, O_RDONLY);
+
+ /* read number of subxact items */
+ if (BufFileRead(fd, &subxact_data.nsubxacts,
+ sizeof(subxact_data.nsubxacts)) !=
+ sizeof(subxact_data.nsubxacts))
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not read from transaction's subxact file \"%s\": %m",
+ path)));
+
+ len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
+
+ /* we keep the maximum as a power of 2 */
+ subxact_data.nsubxacts_max = 1 << my_log2(subxact_data.nsubxacts);
+
+ /*
+ * Allocate subxact information in the logical streaming context. We need
+ * this information during the complete stream so that we can add the
+ * subtransaction info to it. On stream stop we will flush this information
+ * to the subxact file and reset the logical streaming context.
+ */
+ oldctx = MemoryContextSwitchTo(LogicalContext);
+ subxact_data.subxacts = palloc(subxact_data.nsubxacts_max *
+ sizeof(SubXactInfo));
+ MemoryContextSwitchTo(oldctx);
+
+ if ((len > 0) && ((BufFileRead(fd, subxact_data.subxacts, len)) != len))
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not read from transaction's subxact file \"%s\": %m",
+ path)));
+
+ BufFileClose(fd);
+}
+
+/*
+ * subxact_info_add
+ * Add information about a subxact (offset in the main file).
+ */
+static void
+subxact_info_add(TransactionId xid)
+{
+ SubXactInfo *subxacts = subxact_data.subxacts;
+ int64 i;
+
+ /* We must have a valid top level stream xid and a stream fd. */
+ Assert(TransactionIdIsValid(stream_xid));
+ Assert(stream_fd != NULL);
+
+ /*
+ * If the XID matches the toplevel transaction, we don't want to add it.
+ */
+ if (stream_xid == xid)
+ return;
+
+ /*
+ * In most cases we're checking the same subxact as we've already seen in
+ * the last call, so make sure to ignore it (this change comes later).
+ */
+ if (subxact_data.subxact_last == xid)
+ return;
+
+ /* OK, remember we're processing this XID. */
+ subxact_data.subxact_last = xid;
+
+ /*
+ * Check if the transaction is already present in the array of subxact. We
+ * intentionally scan the array from the tail, because we're likely adding
+ * a change for the most recent subtransactions.
+ *
+ * XXX Can we rely on the subxact XIDs arriving in sorted order? That
+ * would allow us to use binary search here.
+ */
+ for (i = subxact_data.nsubxacts; i > 0; i--)
+ {
+ /* found, so we're done */
+ if (subxacts[i - 1].xid == xid)
+ return;
+ }
+
+ /* This is a new subxact, so we need to add it to the array. */
+ if (subxact_data.nsubxacts == 0)
+ {
+ MemoryContext oldctx;
+
+ subxact_data.nsubxacts_max = 128;
+
+ /*
+ * Allocate this memory for subxacts in per-stream context, see
+ * subxact_info_read.
+ */
+ oldctx = MemoryContextSwitchTo(LogicalContext);
+ subxacts = palloc(subxact_data.nsubxacts_max * sizeof(SubXactInfo));
+ MemoryContextSwitchTo(oldctx);
+ }
+ else if (subxact_data.nsubxacts == subxact_data.nsubxacts_max)
+ {
+ subxact_data.nsubxacts_max *= 2;
+ subxacts = repalloc(subxacts,
+ subxact_data.nsubxacts_max * sizeof(SubXactInfo));
+ }
+
+ subxacts[subxact_data.nsubxacts].xid = xid;
+
+ /*
+ * Get the current offset of the stream file and store it as offset of
+ * this subxact.
+ */
+ BufFileTell(stream_fd,
+ &subxacts[subxact_data.nsubxacts].fileno,
+ &subxacts[subxact_data.nsubxacts].offset);
+
+ subxact_data.nsubxacts++;
+ subxact_data.subxacts = subxacts;
+}
+
+/* format filename for file containing the info about subxacts */
+static void
+subxact_filename(char *path, Oid subid, TransactionId xid)
+{
+ snprintf(path, MAXPGPATH, "%u-%u.subxacts", subid, xid);
+}
+
+/* format filename for file containing serialized changes */
+static inline void
+changes_filename(char *path, Oid subid, TransactionId xid)
+{
+ snprintf(path, MAXPGPATH, "%u-%u.changes", subid, xid);
+}
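+
+/*
+ * For example (values purely illustrative), subscription OID 16394 streaming
+ * toplevel xid 1029 would use the files "16394-1029.changes" and
+ * "16394-1029.subxacts".
+ */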
+
+/*
+ * stream_cleanup_files
+ * Cleanup files for a subscription / toplevel transaction.
+ *
+ * Remove files with serialized changes and subxact info for a particular
+ * toplevel transaction. Each subscription has a separate set of files.
+ */
+static void
+stream_cleanup_files(Oid subid, TransactionId xid)
+{
+ char path[MAXPGPATH];
+ StreamXidHash *ent;
+
+ /* Remove the xid entry from the stream xid hash */
+ ent = (StreamXidHash *) hash_search(xidhash,
+ (void *) &xid,
+ HASH_REMOVE,
+ NULL);
+ /* By this time we must have created the transaction entry */
+ Assert(ent != NULL);
+
+ /* Delete the change file and release the stream fileset memory */
+ changes_filename(path, subid, xid);
+ SharedFileSetDeleteAll(ent->stream_fileset);
+ pfree(ent->stream_fileset);
+ ent->stream_fileset = NULL;
+
+ /* Delete the subxact file and release the memory, if it exists */
+ if (ent->subxact_fileset)
+ {
+ subxact_filename(path, subid, xid);
+ SharedFileSetDeleteAll(ent->subxact_fileset);
+ pfree(ent->subxact_fileset);
+ ent->subxact_fileset = NULL;
+ }
+}
+
+/*
+ * stream_open_file
+ * Open a file that we'll use to serialize changes for a toplevel
+ * transaction.
+ *
+ * Open a file for streamed changes from a toplevel transaction identified
+ * by stream_xid (global variable). If it's the first chunk of streamed
+ * changes for this transaction, initialize the shared fileset and create the
+ * buffile, otherwise open the previously created file.
+ *
+ * This can only be called at the beginning of a "" block, i.e.
+ * between stream_start/stream_stop messages from the upstream.
+ */
+static void
+stream_open_file(Oid subid, TransactionId xid, bool first_segment)
+{
+ char path[MAXPGPATH];
+ bool found;
+ MemoryContext oldcxt;
+ StreamXidHash *ent;
+
+ Assert(in_streamed_transaction);
+ Assert(OidIsValid(subid));
+ Assert(TransactionIdIsValid(xid));
+ Assert(stream_fd == NULL);
+
+ /* create or find the xid entry in the xidhash */
+ ent = (StreamXidHash *) hash_search(xidhash,
+ (void *) &xid,
+ HASH_ENTER | HASH_FIND,
+ &found);
+ Assert(first_segment || found);
+ changes_filename(path, subid, xid);
+ elog(DEBUG1, "opening file \"%s\" for streamed changes", path);
+
+ /*
+ * Create/open the buffiles under the logical streaming context so that we
+ * have those files until stream stop.
+ */
+ oldcxt = MemoryContextSwitchTo(LogicalContext);
+
+ /*
+ * If this is the first streamed segment, the file must not exist, so make
+ * sure we're the ones creating it. Otherwise just open the file for
+ * writing, in append mode.
+ */
+ if (first_segment)
+ {
+ MemoryContext savectx;
+ SharedFileSet *fileset;
+
+ /*
+ * We need to maintain the shared fileset across multiple stream
+ * start/stop calls, so we need to allocate it in a persistent context.
+ */
+ savectx = MemoryContextSwitchTo(ApplyContext);
+ fileset = palloc(sizeof(SharedFileSet));
+
+ SharedFileSetInit(fileset, NULL);
+ MemoryContextSwitchTo(savectx);
+
+ stream_fd = BufFileCreateShared(fileset, path);
+
+ /* Remember the fileset for the next stream of the same transaction */
+ ent->xid = xid;
+ ent->stream_fileset = fileset;
+ ent->subxact_fileset = NULL;
+ }
+ else
+ {
+ /*
+ * Open the file and seek to the end of the file because we always
+ * append to the changes file.
+ */
+ stream_fd = BufFileOpenShared(ent->stream_fileset, path, O_RDWR);
+ BufFileSeek(stream_fd, 0, 0, SEEK_END);
+ }
+
+ MemoryContextSwitchTo(oldcxt);
+}
+
+/*
+ * stream_close_file
+ * Close the currently open file with streamed changes.
+ *
+ * This can only be called at the end of a streaming block, i.e. at stream_stop
+ * message from the upstream.
+ */
+static void
+stream_close_file(void)
+{
+ Assert(in_streamed_transaction);
+ Assert(TransactionIdIsValid(stream_xid));
+ Assert(stream_fd != NULL);
+
+ BufFileClose(stream_fd);
+
+ stream_xid = InvalidTransactionId;
+ stream_fd = NULL;
+}
+
+/*
+ * stream_write_change
+ * Serialize a change to a file for the current toplevel transaction.
+ *
+ * The change is serialized in a simple format, with length (not including
+ * the length), action code (identifying the message type) and message
+ * contents (without the subxact TransactionId value).
+ */
+static void
+stream_write_change(char action, StringInfo s)
+{
+ int len;
+
+ Assert(in_streamed_transaction);
+ Assert(TransactionIdIsValid(stream_xid));
+ Assert(stream_fd != NULL);
+
+ /* total on-disk size, including the action type character */
+ len = (s->len - s->cursor) + sizeof(char);
+
+ /* first write the size */
+ BufFileWrite(stream_fd, &len, sizeof(len));
+
+ /* then the action */
+ BufFileWrite(stream_fd, &action, sizeof(action));
+
+ /* and finally the remaining part of the buffer (after the XID) */
+ len = (s->len - s->cursor);
+
+ BufFileWrite(stream_fd, &s->data[s->cursor], len);
+}
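+
+/*
+ * The resulting on-disk record layout is therefore:
+ *
+ * int32 len | char action | (len - 1) bytes of message payload
+ *
+ * which is exactly what apply_handle_stream_commit() reads back when
+ * replaying the spooled changes.
+ */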
+
+/*
+ * Cleanup the memory for subxacts and reset the related variables.
+ */
+static inline void
+cleanup_subxact_info(void)
+{
+ if (subxact_data.subxacts)
+ pfree(subxact_data.subxacts);
+
+ subxact_data.subxacts = NULL;
+ subxact_data.subxact_last = InvalidTransactionId;
+ subxact_data.nsubxacts = 0;
+ subxact_data.nsubxacts_max = 0;
+}
+
/* Logical Replication Apply worker entry point */
void
ApplyWorkerMain(Datum main_arg)
options.proto.logical.proto_version = LOGICALREP_PROTO_VERSION_NUM;
options.proto.logical.publication_names = MySubscription->publications;
options.proto.logical.binary = MySubscription->binary;
+ options.proto.logical.streaming = MySubscription->stream;
/* Start normal logical replication. */
walrcv_start(wrconn, &options);
ReorderBufferChange *change);
static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
RepOriginId origin_id);
+static void pgoutput_stream_start(struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn);
+static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn);
+static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr abort_lsn);
+static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn);
static bool publications_valid;
+static bool in_streaming;
static List *LoadPublications(List *pubnames);
static void publication_invalidation_cb(Datum arg, int cacheid,
uint32 hashvalue);
-static void send_relation_and_attrs(Relation relation, LogicalDecodingContext *ctx);
+static void send_relation_and_attrs(Relation relation, TransactionId xid,
+ LogicalDecodingContext *ctx);
/*
* Entry in the map used to remember which relation schemas we sent.
*
+ * The schema_sent flag determines if the current schema record was already
+ * sent to the subscriber (in which case we don't need to send it again).
+ *
+ * The schema cache on the downstream is however updated only at commit
+ * time, and with streamed transactions the commit order may be different
+ * from the order the transactions are sent in. Also, the (sub) transactions
+ * might get aborted, so we need to send the schema for each (sub) transaction
+ * so that we don't lose the schema information on abort. For handling this,
+ * we maintain the list of xids (streamed_txns) for which we have already
+ * sent the schema.
+ *
* For partitions, 'pubactions' considers not only the table's own
* publications, but also those of all of its ancestors.
*/
* have been sent for this to be true.
*/
bool schema_sent;
+ List *streamed_txns; /* streamed toplevel transactions with this
+ * schema */
bool replicate_valid;
PublicationActions pubactions;
static HTAB *RelationSyncCache = NULL;
static void init_rel_sync_cache(MemoryContext decoding_context);
+static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit);
static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data, Oid relid);
static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
uint32 hashvalue);
+static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
+ TransactionId xid);
+static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
+ TransactionId xid);
/*
* Specify output plugin callbacks
cb->commit_cb = pgoutput_commit_txn;
cb->filter_by_origin_cb = pgoutput_origin_filter;
cb->shutdown_cb = pgoutput_shutdown;
+
+ /* transaction streaming */
+ cb->stream_start_cb = pgoutput_stream_start;
+ cb->stream_stop_cb = pgoutput_stream_stop;
+ cb->stream_abort_cb = pgoutput_stream_abort;
+ cb->stream_commit_cb = pgoutput_stream_commit;
+ cb->stream_change_cb = pgoutput_change;
+ cb->stream_truncate_cb = pgoutput_truncate;
}
static void
parse_output_parameters(List *options, uint32 *protocol_version,
- List **publication_names, bool *binary)
+ List **publication_names, bool *binary,
+ bool *enable_streaming)
{
ListCell *lc;
bool protocol_version_given = false;
bool publication_names_given = false;
bool binary_option_given = false;
+ bool streaming_given = false;
*binary = false;
*binary = defGetBoolean(defel);
}
+ else if (strcmp(defel->defname, "") == 0)
+ {
+ if (_given)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options")));
+ _given = true;
+
+ *enable_ = defGetBoolean(defel);
+ }
else
elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
}
pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
bool is_init)
{
+ bool enable_streaming = false;
PGOutputData *data = palloc0(sizeof(PGOutputData));
/* Create our memory context for private allocations. */
parse_output_parameters(ctx->output_plugin_options,
&data->protocol_version,
&data->publication_names,
- &data->binary);
+ &data->binary,
+ &enable_streaming);
/* Check if we support requested protocol */
if (data->protocol_version > LOGICALREP_PROTO_VERSION_NUM)
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("publication_names parameter missing")));
+ /*
+ * Decide whether to enable streaming. It is disabled by default, in
+ * which case we just update the flag in the decoding context. Otherwise
+ * we only allow it with a sufficient version of the protocol, and when
+ * the output plugin supports it.
+ */
+ if (!enable_streaming)
+ ctx->streaming = false;
+ else if (data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("requested proto_version=%d does not support streaming, need %d or higher",
+ data->protocol_version, LOGICALREP_PROTO_STREAM_VERSION_NUM)));
+ else if (!ctx->streaming)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("streaming requested, but not supported by output plugin")));
+
+ /* Also remember we're currently not streaming any transaction. */
+ in_streaming = false;
+
/* Init publication state. */
data->publications = NIL;
publications_valid = false;
/* Initialize relation schema cache. */
init_rel_sync_cache(CacheMemoryContext);
}
+ else
+ {
+ /* Disable streaming during the slot initialization mode. */
+ ctx->streaming = false;
+ }
}
/*
*/
static void
maybe_send_schema(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn, ReorderBufferChange *change,
Relation relation, RelationSyncEntry *relentry)
{
- if (relentry->schema_sent)
+ bool schema_sent;
+ TransactionId xid = InvalidTransactionId;
+ TransactionId topxid = InvalidTransactionId;
+
+ /*
+ * Remember the XID of the (sub)transaction for the change. We don't care
+ * if it's a top-level transaction or not (we have already sent that XID
+ * in the start of the current streaming block).
+ *
+ * If we're not in a streaming block, just use InvalidTransactionId and
+ * the write methods will not include it.
+ */
+ if (in_streaming)
+ xid = change->txn->xid;
+
+ if (change->txn->toptxn)
+ topxid = change->txn->toptxn->xid;
+ else
+ topxid = xid;
+
+ /*
+ * Do we need to send the schema? We do track streamed transactions
+ * separately, because those may be applied later (and the regular
+ * transactions won't see their effects until then) and in an order that
+ * we don't know at this point.
+ *
+ * XXX There is scope for optimization here. Currently, we always send
+ * the schema the first time in a streaming transaction, but we can
+ * probably avoid that by checking the 'relentry->schema_sent' flag.
+ * However, before doing that we need to study its impact on the case
+ * where we have a mix of streaming and non-streaming transactions.
+ */
+ if (in_streaming)
+ schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid);
+ else
+ schema_sent = relentry->schema_sent;
+
+ if (schema_sent)
return;
/* If needed, send the ancestor's schema first. */
relentry->map = convert_tuples_by_name(CreateTupleDescCopy(indesc),
CreateTupleDescCopy(outdesc));
MemoryContextSwitchTo(oldctx);
- send_relation_and_attrs(ancestor, ctx);
+ send_relation_and_attrs(ancestor, xid, ctx);
RelationClose(ancestor);
}
- send_relation_and_attrs(relation, ctx);
- relentry->schema_sent = true;
+ send_relation_and_attrs(relation, xid, ctx);
+
+ if (in_streaming)
+ set_schema_sent_in_streamed_txn(relentry, topxid);
+ else
+ relentry->schema_sent = true;
}
/*
* Sends a relation
*/
static void
-send_relation_and_attrs(Relation relation, LogicalDecodingContext *ctx)
+send_relation_and_attrs(Relation relation, TransactionId xid,
+ LogicalDecodingContext *ctx)
{
TupleDesc desc = RelationGetDescr(relation);
int i;
continue;
OutputPluginPrepareWrite(ctx, false);
- logicalrep_write_typ(ctx->out, att->atttypid);
+ logicalrep_write_typ(ctx->out, xid, att->atttypid);
OutputPluginWrite(ctx, false);
}
OutputPluginPrepareWrite(ctx, false);
- logicalrep_write_rel(ctx->out, relation);
+ logicalrep_write_rel(ctx->out, xid, relation);
OutputPluginWrite(ctx, false);
}
/*
* Sends the decoded DML over wire.
+ *
+ * This is called both in streaming and non-streaming modes.
*/
static void
pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
MemoryContext old;
RelationSyncEntry *relentry;
+ TransactionId xid = InvalidTransactionId;
if (!is_publishable_relation(relation))
return;
+ /*
+ * Remember the xid for the change in streaming mode. We need to send the
+ * xid with each change in streaming mode so that the subscriber can make
+ * the association and, on abort, discard the corresponding changes.
+ */
+ if (in_streaming)
+ xid = change->txn->xid;
+
relentry = get_rel_sync_entry(data, RelationGetRelid(relation));
/* First check the table filter */
/* Avoid leaking memory by using and resetting our own context */
old = MemoryContextSwitchTo(data->context);
- maybe_send_schema(ctx, relation, relentry);
+ maybe_send_schema(ctx, txn, change, relation, relentry);
/* Send the data */
switch (change->action)
}
OutputPluginPrepareWrite(ctx, true);
- logicalrep_write_insert(ctx->out, relation, tuple,
+ logicalrep_write_insert(ctx->out, xid, relation, tuple,
data->binary);
OutputPluginWrite(ctx, true);
break;
}
OutputPluginPrepareWrite(ctx, true);
- logicalrep_write_update(ctx->out, relation, oldtuple, newtuple,
- data->binary);
+ logicalrep_write_update(ctx->out, xid, relation, oldtuple,
+ newtuple, data->binary);
OutputPluginWrite(ctx, true);
break;
}
}
OutputPluginPrepareWrite(ctx, true);
- logicalrep_write_delete(ctx->out, relation, oldtuple,
+ logicalrep_write_delete(ctx->out, xid, relation, oldtuple,
data->binary);
OutputPluginWrite(ctx, true);
}
int i;
int nrelids;
Oid *relids;
+ TransactionId xid = InvalidTransactionId;
+
+ /* Remember the xid for the change in streaming mode. See pgoutput_change. */
+ if (in_streaming)
+ xid = change->txn->xid;
old = MemoryContextSwitchTo(data->context);
continue;
relids[nrelids++] = relid;
- maybe_send_schema(ctx, relation, relentry);
+ maybe_send_schema(ctx, txn, change, relation, relentry);
}
if (nrelids > 0)
{
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_truncate(ctx->out,
+ xid,
nrelids,
relids,
change->data.truncate.cascade,
rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
}
+/*
+ * START STREAM callback
+ */
+static void
+pgoutput_stream_start(struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn)
+{
+ bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
+
+ /* we can't nest streaming of transactions */
+ Assert(!in_streaming);
+
+ /*
+ * If we already sent the first stream for this transaction then don't
+ * send the origin id in the subsequent streams.
+ */
+ if (rbtxn_is_streamed(txn))
+ send_replication_origin = false;
+
+ OutputPluginPrepareWrite(ctx, !send_replication_origin);
+ logicalrep_write_stream_start(ctx->out, txn->xid, !rbtxn_is_streamed(txn));
+
+ if (send_replication_origin)
+ {
+ char *origin;
+
+ /* Message boundary */
+ OutputPluginWrite(ctx, false);
+ OutputPluginPrepareWrite(ctx, true);
+
+ if (replorigin_by_oid(txn->origin_id, true, &origin))
+ logicalrep_write_origin(ctx->out, origin, InvalidXLogRecPtr);
+ }
+
+ OutputPluginWrite(ctx, true);
+
+ /* we're streaming a chunk of transaction now */
+ in_streaming = true;
+}
+
+/*
+ * STOP STREAM callback
+ */
+static void
+pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn)
+{
+ /* we should be streaming a transaction */
+ Assert(in_streaming);
+
+ OutputPluginPrepareWrite(ctx, true);
+ logicalrep_write_stream_stop(ctx->out);
+ OutputPluginWrite(ctx, true);
+
+ /* we've stopped streaming a transaction */
+ in_streaming = false;
+}
+
+/*
+ * Notify downstream to discard the streamed transaction (along with all
+ * its subtransactions, if it's a toplevel transaction).
+ */
+static void
+pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr abort_lsn)
+{
+ ReorderBufferTXN *toptxn;
+
+ /*
+ * The abort should happen outside the streaming block, even for streamed
+ * transactions. The transaction has to be marked as streamed, though.
+ */
+ Assert(!in_streaming);
+
+ /* determine the toplevel transaction */
+ toptxn = (txn->toptxn) ? txn->toptxn : txn;
+
+ Assert(rbtxn_is_streamed(toptxn));
+
+ OutputPluginPrepareWrite(ctx, true);
+ logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid);
+ OutputPluginWrite(ctx, true);
+
+ cleanup_rel_sync_cache(toptxn->xid, false);
+}
+
+/*
+ * Notify downstream to apply the streamed transaction (along with all
+ * its subtransactions).
+ */
+static void
+pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn)
+{
+ /*
+ * The commit should happen outside the streaming block, even for streamed
+ * transactions. The transaction has to be marked as streamed, though.
+ */
+ Assert(!in_streaming);
+ Assert(rbtxn_is_streamed(txn));
+
+ OutputPluginUpdateProgress(ctx);
+
+ OutputPluginPrepareWrite(ctx, true);
+ logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
+ OutputPluginWrite(ctx, true);
+
+ cleanup_rel_sync_cache(txn->xid, true);
+}
+
/*
* Initialize the relation schema sync cache for a decoding session.
*
(Datum) 0);
}
+/*
+ * We expect a relatively small number of streamed transactions.
+ */
+static bool
+get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
+{
+ ListCell *lc;
+
+ foreach(lc, entry->streamed_txns)
+ {
+ if (xid == (uint32) lfirst_int(lc))
+ return true;
+ }
+
+ return false;
+}
+
+/*
+ * Add the xid in the rel sync entry for which we have already sent the schema
+ * of the relation.
+ */
+static void
+set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
+{
+ MemoryContext oldctx;
+
+ oldctx = MemoryContextSwitchTo(CacheMemoryContext);
+
+ entry->streamed_txns = lappend_int(entry->streamed_txns, xid);
+
+ MemoryContextSwitchTo(oldctx);
+}
+
/*
* Find or create entry in the relation schema cache.
*
}
if (!found)
+ {
entry->schema_sent = false;
+ entry->streamed_txns = NIL;
+ }
return entry;
}
+/*
+ * Cleanup list of streamed transactions and update the schema_sent flag.
+ *
+ * When a streamed transaction commits or aborts, we need to remove the
+ * toplevel XID from the schema cache. If the transaction aborted, the
+ * subscriber will simply throw away the schema records we streamed, so
+ * we don't need to do anything else.
+ *
+ * If the transaction is committed, the subscriber will update the relation
+ * cache - so tweak the schema_sent flag accordingly.
+ */
+static void
+cleanup_rel_sync_cache(TransactionId xid, bool is_commit)
+{
+ HASH_SEQ_STATUS hash_seq;
+ RelationSyncEntry *entry;
+ ListCell *lc;
+
+ Assert(RelationSyncCache != NULL);
+
+ hash_seq_init(&hash_seq, RelationSyncCache);
+ while ((entry = hash_seq_search(&hash_seq)) != NULL)
+ {
+ /*
+ * We can set the schema_sent flag for an entry that has committed xid
+ * in the list as that ensures that the subscriber would have the
+ * corresponding schema and we don't need to send it unless there is
+ * any invalidation for that relation.
+ */
+ foreach(lc, entry->streamed_txns)
+ {
+ if (xid == (uint32) lfirst_int(lc))
+ {
+ if (is_commit)
+ entry->schema_sent = true;
+
+ entry->streamed_txns =
+ foreach_delete_current(entry->streamed_txns, lc);
+ break;
+ }
+ }
+ }
+}
+
/*
* Relcache invalidation callback
*/
* Reset schema sent status as the relation definition may have changed.
*/
if (entry != NULL)
+ {
entry->schema_sent = false;
+ list_free(entry->streamed_txns);
+ entry->streamed_txns = NULL;
+ }
}
/*
int i_oid;
int i_subname;
int i_rolname;
+ int i_substream;
int i_subconninfo;
int i_subslotname;
int i_subsynccommit;
if (fout->remoteVersion >= 140000)
appendPQExpBuffer(query,
- " s.subbinary\n");
+ " s.subbinary,\n"
+ " s.substream\n");
else
appendPQExpBuffer(query,
- " false AS subbinary\n");
+ " false AS subbinary,\n"
+ " false AS substream\n");
appendPQExpBuffer(query,
"FROM pg_subscription s\n"
i_subsynccommit = PQfnumber(res, "subsynccommit");
i_subpublications = PQfnumber(res, "subpublications");
i_subbinary = PQfnumber(res, "subbinary");
+ i_substream = PQfnumber(res, "substream");
subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo));
pg_strdup(PQgetvalue(res, i, i_subpublications));
subinfo[i].subbinary =
pg_strdup(PQgetvalue(res, i, i_subbinary));
+ subinfo[i].substream =
+ pg_strdup(PQgetvalue(res, i, i_substream));
if (strlen(subinfo[i].rolname) == 0)
pg_log_warning("owner of subscription \"%s\" appears to be invalid",
if (strcmp(subinfo->subbinary, "t") == 0)
appendPQExpBuffer(query, ", binary = true");
+ if (strcmp(subinfo->substream, "f") != 0)
+ appendPQExpBuffer(query, ", = on");
+
if (strcmp(subinfo->subsynccommit, "off") != 0)
appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit));
char *subconninfo;
char *subslotname;
char *subbinary;
+ char *substream;
char *subsynccommit;
char *subpublications;
} SubscriptionInfo;
PGresult *res;
printQueryOpt myopt = pset.popt;
static const bool translate_columns[] = {false, false, false, false,
- false, false, false};
+ false, false, false, false};
if (pset.sversion < 100000)
{
if (verbose)
{
- /* Binary mode is only supported in v14 and higher */
+ /* Binary mode and streaming are only supported in v14 and higher */
if (pset.sversion >= 140000)
appendPQExpBuffer(&buf,
- ", subbinary AS \"%s\"\n",
- gettext_noop("Binary"));
+ ", subbinary AS \"%s\"\n"
+ ", substream AS \"%s\"\n",
+ gettext_noop("Binary"),
+ gettext_noop(""));
appendPQExpBuffer(&buf,
", subsynccommit AS \"%s\"\n"
*/
/* yyyymmddN */
-#define CATALOG_VERSION_NO 202009021
+#define CATALOG_VERSION_NO 202009031
#endif
bool subbinary; /* True if the subscription wants the
* publisher to send data in binary */
+ bool substream; /* Stream in-progress transactions. */
+
#ifdef CATALOG_VARLEN /* variable-length fields start here */
/* Connection string to the publisher */
text subconninfo BKI_FORCE_NOT_NULL;
bool enabled; /* Indicates if the subscription is enabled */
bool binary; /* Indicates if the subscription wants data in
* binary format */
+ bool stream; /* Allow streaming of in-progress transactions */
char *conninfo; /* Connection string to the publisher */
char *slotname; /* Name of the replication slot */
char *synccommit; /* Synchronous commit setting for worker */
WAIT_EVENT_WAL_READ,
WAIT_EVENT_WAL_SYNC,
WAIT_EVENT_WAL_SYNC_METHOD_ASSIGN,
- WAIT_EVENT_WAL_WRITE
+ WAIT_EVENT_WAL_WRITE,
+ WAIT_EVENT_LOGICAL_CHANGES_READ,
+ WAIT_EVENT_LOGICAL_CHANGES_WRITE,
+ WAIT_EVENT_LOGICAL_SUBXACT_READ,
+ WAIT_EVENT_LOGICAL_SUBXACT_WRITE
} WaitEventIO;
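The four new wait events cover reads and writes of the files into which streamed changes and subtransaction state are spilled on the subscriber. Reporting follows the usual pgstat bracket around the blocking call; a generic sketch of the pattern (read_changes_chunk is a hypothetical helper, not a function from this patch):

static size_t
read_changes_chunk(BufFile *file, void *buf, size_t len)
{
    size_t      nread;

    pgstat_report_wait_start(WAIT_EVENT_LOGICAL_CHANGES_READ);
    nread = BufFileRead(file, buf, len);
    pgstat_report_wait_end();

    return nread;
}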
/* ----------
* we can support. LOGICALREP_PROTO_MIN_VERSION_NUM is the oldest version we
* have backwards compatibility for. The client requests protocol version at
* connect time.
+ *
+ * LOGICALREP_PROTO_STREAM_VERSION_NUM is the minimum protocol version with
+ * support for streaming large in-progress transactions.
*/
#define LOGICALREP_PROTO_MIN_VERSION_NUM 1
-#define LOGICALREP_PROTO_VERSION_NUM 1
+#define LOGICALREP_PROTO_STREAM_VERSION_NUM 2
+#define LOGICALREP_PROTO_VERSION_NUM 2
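Keeping LOGICALREP_PROTO_MIN_VERSION_NUM at 1 while bumping LOGICALREP_PROTO_VERSION_NUM to 2 means existing subscribers keep working unchanged; streaming is used only when the client requests protocol 2 or newer and enables the option. A sketch of the corresponding startup check in the output plugin, assuming PGOutputData carries protocol_version and streaming fields:

if (data->streaming &&
    data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM)
    ereport(ERROR,
            (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
             errmsg("requested proto_version=%d does not support streaming, need %d or higher",
                    data->protocol_version,
                    LOGICALREP_PROTO_STREAM_VERSION_NUM)));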
/*
* This struct stores a tuple received via logical replication.
extern void logicalrep_write_origin(StringInfo out, const char *origin,
XLogRecPtr origin_lsn);
extern char *logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn);
-extern void logicalrep_write_insert(StringInfo out, Relation rel,
- HeapTuple newtuple, bool binary);
+extern void logicalrep_write_insert(StringInfo out, TransactionId xid,
+ Relation rel, HeapTuple newtuple,
+ bool binary);
extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup);
-extern void logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple,
+extern void logicalrep_write_update(StringInfo out, TransactionId xid,
+ Relation rel, HeapTuple oldtuple,
HeapTuple newtuple, bool binary);
extern LogicalRepRelId logicalrep_read_update(StringInfo in,
bool *has_oldtuple, LogicalRepTupleData *oldtup,
LogicalRepTupleData *newtup);
-extern void logicalrep_write_delete(StringInfo out, Relation rel,
- HeapTuple oldtuple, bool binary);
+extern void logicalrep_write_delete(StringInfo out, TransactionId xid,
+ Relation rel, HeapTuple oldtuple,
+ bool binary);
extern LogicalRepRelId logicalrep_read_delete(StringInfo in,
LogicalRepTupleData *oldtup);
-extern void logicalrep_write_truncate(StringInfo out, int nrelids, Oid relids[],
+extern void logicalrep_write_truncate(StringInfo out, TransactionId xid,
+ int nrelids, Oid relids[],
bool cascade, bool restart_seqs);
extern List *logicalrep_read_truncate(StringInfo in,
bool *cascade, bool *restart_seqs);
-extern void logicalrep_write_rel(StringInfo out, Relation rel);
+extern void logicalrep_write_rel(StringInfo out, TransactionId xid,
+ Relation rel);
extern LogicalRepRelation *logicalrep_read_rel(StringInfo in);
-extern void logicalrep_write_typ(StringInfo out, Oid typoid);
+extern void logicalrep_write_typ(StringInfo out, TransactionId xid,
+ Oid typoid);
extern void logicalrep_read_typ(StringInfo out, LogicalRepTyp *ltyp);
+extern void logicalrep_write_stream_start(StringInfo out, TransactionId xid,
+ bool first_segment);
+extern TransactionId logicalrep_read_stream_start(StringInfo in,
+ bool *first_segment);
+extern void logicalrep_write_stream_stop(StringInfo out);
+extern TransactionId logicalrep_read_stream_stop(StringInfo in);
+extern void logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn,
+ XLogRecPtr commit_lsn);
+extern TransactionId logicalrep_read_stream_commit(StringInfo in,
+ LogicalRepCommitData *commit_data);
+extern void logicalrep_write_stream_abort(StringInfo out, TransactionId xid,
+ TransactionId subxid);
+extern void logicalrep_read_stream_abort(StringInfo in, TransactionId *xid,
+ TransactionId *subxid);
#endif /* LOGICAL_PROTO_H */
uint32 proto_version; /* Logical protocol version */
List *publication_names; /* String list of publications */
bool binary; /* Ask publisher to use binary */
+ bool streaming; /* Streaming of large transactions */
} logical;
} proto;
} WalRcvStreamOptions;
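For completeness, a sketch of how an apply worker might populate these options before starting to stream; wrconn, myslotname, and origin_startpos are assumed locals, and MySubscription->stream is the catalog flag added above:

WalRcvStreamOptions options;

options.logical = true;
options.startpoint = origin_startpos;
options.slotname = myslotname;
options.proto.logical.proto_version = LOGICALREP_PROTO_VERSION_NUM;
options.proto.logical.publication_names = MySubscription->publications;
options.proto.logical.binary = MySubscription->binary;
options.proto.logical.streaming = MySubscription->stream;

walrcv_startstreaming(wrconn, &options);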
ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Synchronous commit | Conninfo
------------------+---------------------------+---------+-------------+--------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | off | dbname=regress_doesnotexist
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo
+-----------------+---------------------------+---------+-------------+--------+-----------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | f | off | dbname=regress_doesnotexist
(1 row)
ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
ALTER SUBSCRIPTION regress_testsub SET (create_slot = false);
ERROR: unrecognized subscription parameter: "create_slot"
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Synchronous commit | Conninfo
------------------+---------------------------+---------+---------------------+--------+--------------------+------------------------------
- regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | dbname=regress_doesnotexist2
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo
+-----------------+---------------------------+---------+---------------------+--------+-----------+--------------------+------------------------------
+ regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | off | dbname=regress_doesnotexist2
(1 row)
BEGIN;
ERROR: invalid value for parameter "synchronous_commit": "foobar"
HINT: Available values: local, remote_write, remote_apply, on, off.
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Synchronous commit | Conninfo
----------------------+---------------------------+---------+---------------------+--------+--------------------+------------------------------
- regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | local | dbname=regress_doesnotexist2
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo
+---------------------+---------------------------+---------+---------------------+--------+-----------+--------------------+------------------------------
+ regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | f | local | dbname=regress_doesnotexist2
(1 row)
-- rename back to keep the rest simple
CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, binary = true);
WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Synchronous commit | Conninfo
------------------+---------------------------+---------+-------------+--------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | t | off | dbname=regress_doesnotexist
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo
+-----------------+---------------------------+---------+-------------+--------+-----------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | t | f | off | dbname=regress_doesnotexist
(1 row)
ALTER SUBSCRIPTION regress_testsub SET (binary = false);
ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
\dRs+
- List of subscriptions
- Name | Owner | Enabled | Publication | Binary | Synchronous commit | Conninfo
------------------+---------------------------+---------+-------------+--------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f | {testpub} | f | off | dbname=regress_doesnotexist
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo
+-----------------+---------------------------+---------+-------------+--------+-----------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | f | off | dbname=regress_doesnotexist
+(1 row)
+
+DROP SUBSCRIPTION regress_testsub;
+-- fail - must be boolean
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = foo);
+ERROR: streaming requires a Boolean value
+-- now it works
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true);
+WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
+\dRs+
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo
+-----------------+---------------------------+---------+-------------+--------+-----------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | t | off | dbname=regress_doesnotexist
+(1 row)
+
+ALTER SUBSCRIPTION regress_testsub SET (streaming = false);
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+\dRs+
+ List of subscriptions
+ Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo
+-----------------+---------------------------+---------+-------------+--------+-----------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f | {testpub} | f | f | off | dbname=regress_doesnotexist
(1 row)
DROP SUBSCRIPTION regress_testsub;
DROP SUBSCRIPTION regress_testsub;
+-- fail - must be boolean
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = foo);
+
+-- now it works
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true);
+
+\dRs+
+
+ALTER SUBSCRIPTION regress_testsub SET (streaming = false);
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+
+\dRs+
+
+DROP SUBSCRIPTION regress_testsub;
+
RESET SESSION AUTHORIZATION;
DROP ROLE regress_subscription_user;
DROP ROLE regress_subscription_user2;
--- /dev/null
+# Test streaming of a simple large transaction
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 4;
+
+# Create publisher node
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf', 'logical_decoding_work_mem = 64kB');
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+# Create some preexisting content on publisher
+$node_publisher->safe_psql('postgres',
+ "CREATE TABLE test_tab (a int primary key, b varchar)");
+$node_publisher->safe_psql('postgres',
+ "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')");
+
+# Setup structure on subscriber
+$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)");
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE test_tab");
+
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql('postgres',
+"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH ( = on)"
+);
+
+$node_publisher->wait_for_catchup($appname);
+
+# Also wait for initial table sync to finish
+my $synced_query =
+"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+ or die "Timed out while waiting for subscriber to synchronize data";
+
+my $result =
+ $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(2|2|2), 'check initial data was copied to subscriber');
+
+# Insert, update and delete enough rows to exceed the 64kB limit.
+$node_publisher->safe_psql('postgres', q{
+BEGIN;
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
+UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+DELETE FROM test_tab WHERE mod(a,3) = 0;
+COMMIT;
+});
+
+$node_publisher->wait_for_catchup($appname);
+
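+# 2 preexisting rows + 4998 inserted = 5000; the DELETE removes the 1666
+# rows whose key is divisible by 3, leaving 3334.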
+$result =
+ $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(3334|3334|3334), 'check extra columns contain local defaults');
+
+# Test streaming in binary mode
+$node_subscriber->safe_psql('postgres',
+"ALTER SUBSCRIPTION tap_sub SET (binary = on)"
+);
+
+# Insert, update and delete enough rows to exceed the 64kB limit.
+$node_publisher->safe_psql('postgres', q{
+BEGIN;
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(5001, 10000) s(i);
+UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+DELETE FROM test_tab WHERE mod(a,3) = 0;
+COMMIT;
+});
+
+$node_publisher->wait_for_catchup($appname);
+
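+# 3334 rows carried over + 5000 inserted = 8334; the DELETE removes the
+# 1667 new rows whose key is divisible by 3, leaving 6667.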
+$result =
+ $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(6667|6667|6667), 'check extra columns contain local defaults');
+
+# Change the local values of the extra columns on the subscriber,
+# update publisher, and check that subscriber retains the expected
+# values. This is to ensure that non-streamed transactions behave
+# properly after a streamed transaction.
+$node_subscriber->safe_psql('postgres', "UPDATE test_tab SET c = 'epoch'::timestamptz + 987654321 * interval '1s'");
+$node_publisher->safe_psql('postgres', "UPDATE test_tab SET b = md5(a::text)");
+
+$node_publisher->wait_for_catchup($appname);
+
+$result =
+ $node_subscriber->safe_psql('postgres', "SELECT count(*), count(extract(epoch from c) = 987654321), count(d = 999) FROM test_tab");
+is($result, qq(6667|6667|6667), 'check extra columns contain locally changed data');
+
+$node_subscriber->stop;
+$node_publisher->stop;
AppendPath
AppendRelInfo
AppendState
+ApplySubXactData
Archive
ArchiveEntryPtrType
ArchiveFormat
StopWorkersData
StrategyNumber
StreamCtl
+StreamXidHash
StringInfo
StringInfoData
StripnullState
SubTransactionId
SubXactCallback
SubXactCallbackItem
SubXactEvent
+SubXactInfo
SubplanResultRelHashElem
SubqueryScan