Add support for to built-in logical replication.
authorAmit Kapila <[email protected]>
Thu, 3 Sep 2020 02:24:07 +0000 (07:54 +0530)
committerAmit Kapila <[email protected]>
Thu, 3 Sep 2020 02:24:07 +0000 (07:54 +0530)
To add support for  of in-progress transactions into the
built-in logical replication, we need to do three things:

* Extend the logical replication protocol, so identify in-progress
transactions, and allow adding additional bits of information (e.g.
XID of subtransactions).

* Modify the output plugin (pgoutput) to implement the new stream
API callbacks, by leveraging the extended replication protocol.

* Modify the replication apply worker, to properly handle streamed
in-progress transaction by spilling the data to disk and then
replaying them on commit.

We however must explicitly disable  replication during
replication slot creation, even if the plugin supports it. We
don't need to replicate the changes accumulated during this phase,
and moreover we don't have a replication connection open so we
don't have where to send the data anyway.

Author: Tomas Vondra, Dilip Kumar and Amit Kapila
Reviewed-by: Amit Kapila, Kuntal Ghosh and Ajin Cherian
Tested-by: Neha Sharma, Mahendra Singh Thalor and Ajin Cherian
Discussion: https://postgr.es/m/688b0b7f-2f6c-d827-c27b-216a8e3ea700@2ndquadrant.com

23 files changed:
doc/src/sgml/monitoring.sgml
doc/src/sgml/ref/alter_subscription.sgml
doc/src/sgml/ref/create_subscription.sgml
src/backend/catalog/pg_subscription.c
src/backend/catalog/system_views.sql
src/backend/commands/subscriptioncmds.c
src/backend/postmaster/pgstat.c
src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
src/backend/replication/logical/proto.c
src/backend/replication/logical/worker.c
src/backend/replication/pgoutput/pgoutput.c
src/bin/pg_dump/pg_dump.c
src/bin/pg_dump/pg_dump.h
src/bin/psql/describe.c
src/include/catalog/catversion.h
src/include/catalog/pg_subscription.h
src/include/pgstat.h
src/include/replication/logicalproto.h
src/include/replication/walreceiver.h
src/test/regress/expected/subscription.out
src/test/regress/sql/subscription.sql
src/test/subscription/t/015_stream.pl[new file with mode: 0644]
src/tools/pgindent/typedefs.list

index d973e1149aa9391e32b9badcd433bb6bb8833250..673a0e73e4534f69975ee7f8b605f1df32136487 100644 (file)
@@ -1509,6 +1509,22 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
       <entry><literal>WALWrite</literal></entry>
       <entry>Waiting for a write to a WAL file.</entry>
      </row>
+     <row>
+      <entry><literal>LogicalChangesRead</literal></entry>
+      <entry>Waiting for a read from a logical changes file.</entry>
+     </row>
+     <row>
+      <entry><literal>LogicalChangesWrite</literal></entry>
+      <entry>Waiting for a write to a logical changes file.</entry>
+     </row>
+     <row>
+      <entry><literal>LogicalSubxactRead</literal></entry>
+      <entry>Waiting for a read from a logical subxact file.</entry>
+     </row>
+     <row>
+      <entry><literal>LogicalSubxactWrite</literal></entry>
+      <entry>Waiting for a write to a logical subxact file.</entry>
+     </row>
     </tbody>
    </tgroup>
   </table>
index 81c4e70cdf45754c19b6733bf2b62e022719dd7f..a1666b370be9d738d4f3353048963b828feb95aa 100644 (file)
@@ -165,8 +165,9 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
       <xref linkend="sql-createsubscription"/>.  See there for more
       information.  The parameters that can be altered
       are <literal>slot_name</literal>,
-      <literal>synchronous_commit</literal>, and
-      <literal>binary</literal>.
+      <literal>synchronous_commit</literal>,
+      <literal>binary</literal>, and
+      <literal></literal>.
      </para>
     </listitem>
    </varlistentry>
index cdb22c54feabd41e6e48c80fde07cd3133306f57..b7d7457d004e983cec9346c5f9c6c21d21f5c84b 100644 (file)
@@ -228,6 +228,17 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
          </para>
         </listitem>
        </varlistentry>
+       <varlistentry>
+        <term><literal></literal> (<type>boolean</type>)</term>
+        <listitem>
+         <para>
+          Specifies whether  of in-progress transactions should
+          be enabled for this subscription.  By default, all transactions
+          are fully decoded on the publisher, and only then sent to the
+          subscriber as a whole.
+         </para>
+        </listitem>
+       </varlistentry>
       </variablelist></para>
     </listitem>
    </varlistentry>
index 90bf5cf0c6defa5ae2004a59f5f3ce1acf23e901..311d46225adf025ccc73d8c51e284ec29858e39e 100644 (file)
@@ -66,6 +66,7 @@ GetSubscription(Oid subid, bool missing_ok)
    sub->owner = subform->subowner;
    sub->enabled = subform->subenabled;
    sub->binary = subform->subbinary;
+   sub->stream = subform->substream;
 
    /* Get conninfo */
    datum = SysCacheGetAttr(SUBSCRIPTIONOID,
index a2d61302f9e82305fee05a21ac6110734d87746e..ed4f3f142d87d4ffc905f13ab8e3f967c22f4d7f 100644 (file)
@@ -1128,7 +1128,7 @@ REVOKE ALL ON pg_replication_origin_status FROM public;
 
 -- All columns of pg_subscription except subconninfo are readable.
 REVOKE ALL ON pg_subscription FROM public;
-GRANT SELECT (subdbid, subname, subowner, subenabled, subbinary, subslotname, subpublications)
+GRANT SELECT (subdbid, subname, subowner, subenabled, subbinary, substream, subslotname, subpublications)
     ON pg_subscription TO public;
 
 
index 40b6377a8522d29e557a0326118c323640a7c1f4..1696454c0bbb60a84a2aa0acb50c3631b0938382 100644 (file)
@@ -63,7 +63,8 @@ parse_subscription_options(List *options,
                           bool *copy_data,
                           char **synchronous_commit,
                           bool *refresh,
-                          bool *binary_given, bool *binary)
+                          bool *binary_given, bool *binary,
+                          bool *_given, bool *)
 {
    ListCell   *lc;
    bool        connect_given = false;
@@ -99,6 +100,11 @@ parse_subscription_options(List *options,
        *binary_given = false;
        *binary = false;
    }
+   if ()
+   {
+       *_given = false;
+       * = false;
+   }
 
    /* Parse options */
    foreach(lc, options)
@@ -194,6 +200,16 @@ parse_subscription_options(List *options,
            *binary_given = true;
            *binary = defGetBoolean(defel);
        }
+       else if (strcmp(defel->defname, "") == 0 && )
+       {
+           if (*_given)
+               ereport(ERROR,
+                       (errcode(ERRCODE_SYNTAX_ERROR),
+                        errmsg("conflicting or redundant options")));
+
+           *_given = true;
+           * = defGetBoolean(defel);
+       }
        else
            ereport(ERROR,
                    (errcode(ERRCODE_SYNTAX_ERROR),
@@ -337,6 +353,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
    bool        enabled_given;
    bool        enabled;
    bool        copy_data;
+   bool        ;
+   bool        _given;
    char       *synchronous_commit;
    char       *conninfo;
    char       *slotname;
@@ -360,7 +378,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
                               &copy_data,
                               &synchronous_commit,
                               NULL,    /* no "refresh" */
-                              &binary_given, &binary);
+                              &binary_given, &binary,
+                              &_given, &);
 
    /*
     * Since creating a replication slot is not transactional, rolling back
@@ -427,6 +446,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
    values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
    values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled);
    values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(binary);
+   values[Anum_pg_subscription_substream - 1] = BoolGetDatum();
    values[Anum_pg_subscription_subconninfo - 1] =
        CStringGetTextDatum(conninfo);
    if (slotname)
@@ -698,6 +718,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
                char       *synchronous_commit;
                bool        binary_given;
                bool        binary;
+               bool        _given;
+               bool        ;
 
                parse_subscription_options(stmt->options,
                                           NULL,    /* no "connect" */
@@ -707,7 +729,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
                                           NULL,    /* no "copy_data" */
                                           &synchronous_commit,
                                           NULL,    /* no "refresh" */
-                                          &binary_given, &binary);
+                                          &binary_given, &binary,
+                                          &_given, &);
 
                if (slotname_given)
                {
@@ -739,6 +762,13 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
                    replaces[Anum_pg_subscription_subbinary - 1] = true;
                }
 
+               if (_given)
+               {
+                   values[Anum_pg_subscription_substream - 1] =
+                       BoolGetDatum();
+                   replaces[Anum_pg_subscription_substream - 1] = true;
+               }
+
                update_tuple = true;
                break;
            }
@@ -756,7 +786,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
                                           NULL,    /* no "copy_data" */
                                           NULL,    /* no "synchronous_commit" */
                                           NULL,    /* no "refresh" */
-                                          NULL, NULL); /* no "binary" */
+                                          NULL, NULL,  /* no "binary" */
+                                          NULL, NULL); /* no  */
                Assert(enabled_given);
 
                if (!sub->slotname && enabled)
@@ -800,8 +831,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
                                           &copy_data,
                                           NULL,    /* no "synchronous_commit" */
                                           &refresh,
-                                          NULL, NULL); /* no "binary" */
-
+                                          NULL, NULL /* no "binary" */
+                                          NULL, NULL); /* no "" */
                values[Anum_pg_subscription_subpublications - 1] =
                    publicationListToArray(stmt->publication);
                replaces[Anum_pg_subscription_subpublications - 1] = true;
@@ -843,7 +874,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
                                           &copy_data,
                                           NULL,    /* no "synchronous_commit" */
                                           NULL,    /* no "refresh" */
-                                          NULL, NULL); /* no "binary" */
+                                          NULL, NULL,  /* no "binary" */
+                                          NULL, NULL); /* no "" */
 
                AlterSubscription_refresh(sub, copy_data);
 
index 8116b236143037fafcbaa285c7af0e8f3c8af845..5f4b168fd16bf438b022b76fc12be1801566e614 100644 (file)
@@ -4141,6 +4141,18 @@ pgstat_get_wait_io(WaitEventIO w)
        case WAIT_EVENT_WAL_WRITE:
            event_name = "WALWrite";
            break;
+       case WAIT_EVENT_LOGICAL_CHANGES_READ:
+           event_name = "LogicalChangesRead";
+           break;
+       case WAIT_EVENT_LOGICAL_CHANGES_WRITE:
+           event_name = "LogicalChangesWrite";
+           break;
+       case WAIT_EVENT_LOGICAL_SUBXACT_READ:
+           event_name = "LogicalSubxactRead";
+           break;
+       case WAIT_EVENT_LOGICAL_SUBXACT_WRITE:
+           event_name = "LogicalSubxactWrite";
+           break;
 
            /* no default case, so that compiler will warn */
    }
index 8afa5a29b484cf0a094f3365b6566b456309b5c4..ad574099ff7006d707126737c70537b68a4b67e3 100644 (file)
@@ -425,6 +425,10 @@ libpqrcv_start(WalReceiverConn *conn,
        appendStringInfo(&cmd, "proto_version '%u'",
                         options->proto.logical.proto_version);
 
+       if (options->proto.logical. &&
+           PQserverVersion(conn->streamConn) >= 140000)
+           appendStringInfo(&cmd, ",  'on'");
+
        pubnames = options->proto.logical.publication_names;
        pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames);
        if (!pubnames_str)
index 9ff8097bf5fda2d043042ee8cf437a73b8be4f23..eb19142b48659567d91a984fc09d912d20974865 100644 (file)
@@ -138,10 +138,15 @@ logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn)
  * Write INSERT to the output stream.
  */
 void
-logicalrep_write_insert(StringInfo out, Relation rel, HeapTuple newtuple, bool binary)
+logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel,
+                       HeapTuple newtuple, bool binary)
 {
    pq_sendbyte(out, 'I');      /* action INSERT */
 
+   /* transaction ID (if not valid, we're not ) */
+   if (TransactionIdIsValid(xid))
+       pq_sendint32(out, xid);
+
    /* use Oid as relation identifier */
    pq_sendint32(out, RelationGetRelid(rel));
 
@@ -177,8 +182,8 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
  * Write UPDATE to the output stream.
  */
 void
-logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple,
-                       HeapTuple newtuple, bool binary)
+logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
+                       HeapTuple oldtuple, HeapTuple newtuple, bool binary)
 {
    pq_sendbyte(out, 'U');      /* action UPDATE */
 
@@ -186,6 +191,10 @@ logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple,
           rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
           rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
 
+   /* transaction ID (if not valid, we're not ) */
+   if (TransactionIdIsValid(xid))
+       pq_sendint32(out, xid);
+
    /* use Oid as relation identifier */
    pq_sendint32(out, RelationGetRelid(rel));
 
@@ -247,7 +256,8 @@ logicalrep_read_update(StringInfo in, bool *has_oldtuple,
  * Write DELETE to the output stream.
  */
 void
-logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple, bool binary)
+logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
+                       HeapTuple oldtuple, bool binary)
 {
    Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
           rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
@@ -255,6 +265,10 @@ logicalrep_write_delete(StringInfo out, Relation rel, HeapTuple oldtuple, bool b
 
    pq_sendbyte(out, 'D');      /* action DELETE */
 
+   /* transaction ID (if not valid, we're not ) */
+   if (TransactionIdIsValid(xid))
+       pq_sendint32(out, xid);
+
    /* use Oid as relation identifier */
    pq_sendint32(out, RelationGetRelid(rel));
 
@@ -295,6 +309,7 @@ logicalrep_read_delete(StringInfo in, LogicalRepTupleData *oldtup)
  */
 void
 logicalrep_write_truncate(StringInfo out,
+                         TransactionId xid,
                          int nrelids,
                          Oid relids[],
                          bool cascade, bool restart_seqs)
@@ -304,6 +319,10 @@ logicalrep_write_truncate(StringInfo out,
 
    pq_sendbyte(out, 'T');      /* action TRUNCATE */
 
+   /* transaction ID (if not valid, we're not ) */
+   if (TransactionIdIsValid(xid))
+       pq_sendint32(out, xid);
+
    pq_sendint32(out, nrelids);
 
    /* encode and send truncate flags */
@@ -346,12 +365,16 @@ logicalrep_read_truncate(StringInfo in,
  * Write relation description to the output stream.
  */
 void
-logicalrep_write_rel(StringInfo out, Relation rel)
+logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel)
 {
    char       *relname;
 
    pq_sendbyte(out, 'R');      /* sending RELATION */
 
+   /* transaction ID (if not valid, we're not ) */
+   if (TransactionIdIsValid(xid))
+       pq_sendint32(out, xid);
+
    /* use Oid as relation identifier */
    pq_sendint32(out, RelationGetRelid(rel));
 
@@ -396,7 +419,7 @@ logicalrep_read_rel(StringInfo in)
  * This function will always write base type info.
  */
 void
-logicalrep_write_typ(StringInfo out, Oid typoid)
+logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid)
 {
    Oid         basetypoid = getBaseType(typoid);
    HeapTuple   tup;
@@ -404,6 +427,10 @@ logicalrep_write_typ(StringInfo out, Oid typoid)
 
    pq_sendbyte(out, 'Y');      /* sending TYPE */
 
+   /* transaction ID (if not valid, we're not ) */
+   if (TransactionIdIsValid(xid))
+       pq_sendint32(out, xid);
+
    tup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(basetypoid));
    if (!HeapTupleIsValid(tup))
        elog(ERROR, "cache lookup failed for type %u", basetypoid);
@@ -720,3 +747,126 @@ logicalrep_read_namespace(StringInfo in)
 
    return nspname;
 }
+
+/*
+ * Write the information for the start stream message to the output stream.
+ */
+void
+logicalrep_write_stream_start(StringInfo out,
+                             TransactionId xid, bool first_segment)
+{
+   pq_sendbyte(out, 'S');      /* action STREAM START */
+
+   Assert(TransactionIdIsValid(xid));
+
+   /* transaction ID (we're starting to stream, so must be valid) */
+   pq_sendint32(out, xid);
+
+   /* 1 if this is the first  segment for this xid */
+   pq_sendbyte(out, first_segment ? 1 : 0);
+}
+
+/*
+ * Read the information about the start stream message from output stream.
+ */
+TransactionId
+logicalrep_read_stream_start(StringInfo in, bool *first_segment)
+{
+   TransactionId xid;
+
+   Assert(first_segment);
+
+   xid = pq_getmsgint(in, 4);
+   *first_segment = (pq_getmsgbyte(in) == 1);
+
+   return xid;
+}
+
+/*
+ * Write the stop stream message to the output stream.
+ */
+void
+logicalrep_write_stream_stop(StringInfo out)
+{
+   pq_sendbyte(out, 'E');      /* action STREAM END */
+}
+
+/*
+ * Write STREAM COMMIT to the output stream.
+ */
+void
+logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn,
+                              XLogRecPtr commit_lsn)
+{
+   uint8       flags = 0;
+
+   pq_sendbyte(out, 'c');      /* action STREAM COMMIT */
+
+   Assert(TransactionIdIsValid(txn->xid));
+
+   /* transaction ID */
+   pq_sendint32(out, txn->xid);
+
+   /* send the flags field (unused for now) */
+   pq_sendbyte(out, flags);
+
+   /* send fields */
+   pq_sendint64(out, commit_lsn);
+   pq_sendint64(out, txn->end_lsn);
+   pq_sendint64(out, txn->commit_time);
+}
+
+/*
+ * Read STREAM COMMIT from the output stream.
+ */
+TransactionId
+logicalrep_read_stream_commit(StringInfo in, LogicalRepCommitData *commit_data)
+{
+   TransactionId xid;
+   uint8       flags;
+
+   xid = pq_getmsgint(in, 4);
+
+   /* read flags (unused for now) */
+   flags = pq_getmsgbyte(in);
+
+   if (flags != 0)
+       elog(ERROR, "unrecognized flags %u in commit message", flags);
+
+   /* read fields */
+   commit_data->commit_lsn = pq_getmsgint64(in);
+   commit_data->end_lsn = pq_getmsgint64(in);
+   commit_data->committime = pq_getmsgint64(in);
+
+   return xid;
+}
+
+/*
+ * Write STREAM ABORT to the output stream. Note that xid and subxid will be
+ * same for the top-level transaction abort.
+ */
+void
+logicalrep_write_stream_abort(StringInfo out, TransactionId xid,
+                             TransactionId subxid)
+{
+   pq_sendbyte(out, 'A');      /* action STREAM ABORT */
+
+   Assert(TransactionIdIsValid(xid) && TransactionIdIsValid(subxid));
+
+   /* transaction ID */
+   pq_sendint32(out, xid);
+   pq_sendint32(out, subxid);
+}
+
+/*
+ * Read STREAM ABORT from the output stream.
+ */
+void
+logicalrep_read_stream_abort(StringInfo in, TransactionId *xid,
+                            TransactionId *subxid)
+{
+   Assert(xid && subxid);
+
+   *xid = pq_getmsgint(in, 4);
+   *subxid = pq_getmsgint(in, 4);
+}
index b576e342cb7d57f0d5dec7d3e83bf7823d5efcef..812aca80112cef2d2dcfd67f1344dad23f303674 100644 (file)
  *   This module includes server facing code and shares libpqwalreceiver
  *   module with walreceiver for providing the libpq specific functionality.
  *
+ *
+ * STREAMED TRANSACTIONS
+ * ---------------------
+ * Streamed transactions (large transactions exceeding a memory limit on the
+ * upstream) are not applied immediately, but instead, the data is written
+ * to temporary files and then applied at once when the final commit arrives.
+ *
+ * Unlike the regular (non-streamed) case, handling streamed transactions has
+ * to handle aborts of both the toplevel transaction and subtransactions. This
+ * is achieved by tracking offsets for subtransactions, which is then used
+ * to truncate the file with serialized changes.
+ *
+ * The files are placed in tmp file directory by default, and the filenames
+ * include both the XID of the toplevel transaction and OID of the
+ * subscription. This is necessary so that different workers processing a
+ * remote transaction with the same XID doesn't interfere.
+ *
+ * We use BufFiles instead of using normal temporary files because (a) the
+ * BufFile infrastructure supports temporary files that exceed the OS file size
+ * limit, (b) provides a way for automatic clean up on the error and (c) provides
+ * a way to survive these files across local transactions and allow to open and
+ * close at stream start and close. We decided to use SharedFileSet
+ * infrastructure as without that it deletes the files on the closure of the
+ * file and if we decide to keep stream files open across the start/stop stream
+ * then it will consume a lot of memory (more than 8K for each BufFile and
+ * there could be multiple such BufFiles as the subscriber could receive
+ * multiple start/stop streams for different transactions before getting the
+ * commit). Moreover, if we don't use SharedFileSet then we also need to invent
+ * a new way to pass filenames to BufFile APIs so that we are allowed to open
+ * the file we desired across multiple stream-open calls for the same
+ * transaction.
  *-------------------------------------------------------------------------
  */
 
 #include "postgres.h"
 
+#include <sys/stat.h>
+#include <unistd.h>
+
 #include "access/table.h"
 #include "access/tableam.h"
 #include "access/xact.h"
@@ -33,7 +67,9 @@
 #include "catalog/pg_inherits.h"
 #include "catalog/pg_subscription.h"
 #include "catalog/pg_subscription_rel.h"
+#include "catalog/pg_tablespace.h"
 #include "commands/tablecmds.h"
+#include "commands/tablespace.h"
 #include "commands/trigger.h"
 #include "executor/executor.h"
 #include "executor/execPartition.h"
@@ -63,7 +99,9 @@
 #include "replication/walreceiver.h"
 #include "replication/worker_internal.h"
 #include "rewrite/rewriteHandler.h"
+#include "storage/buffile.h"
 #include "storage/bufmgr.h"
+#include "storage/fd.h"
 #include "storage/ipc.h"
 #include "storage/lmgr.h"
 #include "storage/proc.h"
 #include "tcop/tcopprot.h"
 #include "utils/builtins.h"
 #include "utils/catcache.h"
+#include "utils/dynahash.h"
 #include "utils/datum.h"
 #include "utils/fmgroids.h"
 #include "utils/guc.h"
@@ -99,9 +138,26 @@ typedef struct SlotErrCallbackArg
    int         remote_attnum;
 } SlotErrCallbackArg;
 
+/*
+ * Stream xid hash entry. Whenever we see a new xid we create this entry in the
+ * xidhash and along with it create the  file and store the fileset handle.
+ * The subxact file is created iff there is any subxact info under this xid. This
+ * entry is used on the subsequent streams for the xid to get the corresponding
+ * fileset handles, so storing them in hash makes the search faster.
+ */
+typedef struct StreamXidHash
+{
+   TransactionId xid;          /* xid is the hash key and must be first */
+   SharedFileSet *stream_fileset;  /* shared file set for stream data */
+   SharedFileSet *subxact_fileset; /* shared file set for subxact info */
+} StreamXidHash;
+
 static MemoryContext ApplyMessageContext = NULL;
 MemoryContext ApplyContext = NULL;
 
+/* per stream context for  transactions */
+static MemoryContext LogicalContext = NULL;
+
 WalReceiverConn *wrconn = NULL;
 
 Subscription *MySubscription = NULL;
@@ -110,12 +166,66 @@ bool      MySubscriptionValid = false;
 bool       in_remote_transaction = false;
 static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
 
+/* fields valid only when processing streamed transaction */
+bool       in_streamed_transaction = false;
+
+static TransactionId stream_xid = InvalidTransactionId;
+
+/*
+ * Hash table for storing the  xid information along with shared file
+ * set for  and subxact files.
+ */
+static HTAB *xidhash = NULL;
+
+/* BufFile handle of the current  file */
+static BufFile *stream_fd = NULL;
+
+typedef struct SubXactInfo
+{
+   TransactionId xid;          /* XID of the subxact */
+   int         fileno;         /* file number in the buffile */
+   off_t       offset;         /* offset in the file */
+} SubXactInfo;
+
+/* Sub-transaction data for the current  transaction */
+typedef struct ApplySubXactData
+{
+   uint32      nsubxacts;      /* number of sub-transactions */
+   uint32      nsubxacts_max;  /* current capacity of subxacts */
+   TransactionId subxact_last; /* xid of the last sub-transaction */
+   SubXactInfo *subxacts;      /* sub-xact offset in changes file */
+} ApplySubXactData;
+
+static ApplySubXactData subxact_data = {0, 0, InvalidTransactionId, NULL};
+
+static void subxact_filename(char *path, Oid subid, TransactionId xid);
+static void changes_filename(char *path, Oid subid, TransactionId xid);
+
+/*
+ * Information about subtransactions of a given toplevel transaction.
+ */
+static void subxact_info_write(Oid subid, TransactionId xid);
+static void subxact_info_read(Oid subid, TransactionId xid);
+static void subxact_info_add(TransactionId xid);
+static inline void cleanup_subxact_info(void);
+
+/*
+ * Serialize and deserialize changes for a toplevel transaction.
+ */
+static void stream_cleanup_files(Oid subid, TransactionId xid);
+static void stream_open_file(Oid subid, TransactionId xid, bool first);
+static void stream_write_change(char action, StringInfo s);
+static void stream_close_file(void);
+
 static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
 
 static void store_flush_position(XLogRecPtr remote_lsn);
 
 static void maybe_reread_subscription(void);
 
+/*  needed because of stream_commit */
+static void apply_dis(StringInfo s);
+
 static void apply_handle_insert_internal(ResultRelInfo *relinfo,
                                         EState *estate, TupleTableSlot *remoteslot);
 static void apply_handle_update_internal(ResultRelInfo *relinfo,
@@ -187,6 +297,42 @@ ensure_transaction(void)
    return true;
 }
 
+/*
+ * Handle streamed transactions.
+ *
+ * If in  mode (receiving a block of streamed transaction), we
+ * simply redirect it to a file for the proper toplevel transaction.
+ *
+ * Returns true for streamed transactions, false otherwise (regular mode).
+ */
+static bool
+handle_streamed_transaction(const char action, StringInfo s)
+{
+   TransactionId xid;
+
+   /* not in  mode */
+   if (!in_streamed_transaction)
+       return false;
+
+   Assert(stream_fd != NULL);
+   Assert(TransactionIdIsValid(stream_xid));
+
+   /*
+    * We should have received XID of the subxact as the first part of the
+    * message, so extract it.
+    */
+   xid = pq_getmsgint(s, 4);
+
+   Assert(TransactionIdIsValid(xid));
+
+   /* Add the new subxact to the array (unless already there). */
+   subxact_info_add(xid);
+
+   /* write the change to the current file */
+   stream_write_change(action, s);
+
+   return true;
+}
 
 /*
  * Executor state preparation for evaluation of constraint expressions,
@@ -612,16 +758,335 @@ static void
 apply_handle_origin(StringInfo s)
 {
    /*
-    * ORIGIN message can only come inside remote transaction and before any
-    * actual writes.
+    * ORIGIN message can only come inside  transaction or inside
+    * remote transaction and before any actual writes.
     */
-   if (!in_remote_transaction ||
-       (IsTransactionState() && !am_tablesync_worker()))
+   if (!in_streamed_transaction &&
+       (!in_remote_transaction ||
+        (IsTransactionState() && !am_tablesync_worker())))
        ereport(ERROR,
                (errcode(ERRCODE_PROTOCOL_VIOLATION),
                 errmsg("ORIGIN message sent out of order")));
 }
 
+/*
+ * Handle STREAM START message.
+ */
+static void
+apply_handle_stream_start(StringInfo s)
+{
+   bool        first_segment;
+   HASHCTL     hash_ctl;
+
+   Assert(!in_streamed_transaction);
+
+   /*
+    * Start a transaction on stream start, this transaction will be committed
+    * on the stream stop. We need the transaction for handling the buffile,
+    * used for serializing the  data and subxact info.
+    */
+   ensure_transaction();
+
+   /* notify handle methods we're processing a remote transaction */
+   in_streamed_transaction = true;
+
+   /* extract XID of the top-level transaction */
+   stream_xid = logicalrep_read_stream_start(s, &first_segment);
+
+   /*
+    * Initialize the xidhash table if we haven't yet. This will be used for
+    * the entire duration of the apply worker so create it in permanent
+    * context.
+    */
+   if (xidhash == NULL)
+   {
+       hash_ctl.keysize = sizeof(TransactionId);
+       hash_ctl.entrysize = sizeof(StreamXidHash);
+       hash_ctl.hcxt = ApplyContext;
+       xidhash = hash_create("StreamXidHash", 1024, &hash_ctl,
+                             HASH_ELEM | HASH_CONTEXT);
+   }
+
+   /* open the spool file for this transaction */
+   stream_open_file(MyLogicalRepWorker->subid, stream_xid, first_segment);
+
+   /* if this is not the first segment, open existing subxact file */
+   if (!first_segment)
+       subxact_info_read(MyLogicalRepWorker->subid, stream_xid);
+
+   pgstat_report_activity(STATE_RUNNING, NULL);
+}
+
+/*
+ * Handle STREAM STOP message.
+ */
+static void
+apply_handle_stream_stop(StringInfo s)
+{
+   Assert(in_streamed_transaction);
+
+   /*
+    * Close the file with serialized changes, and serialize information about
+    * subxacts for the toplevel transaction.
+    */
+   subxact_info_write(MyLogicalRepWorker->subid, stream_xid);
+   stream_close_file();
+
+   /* We must be in a valid transaction state */
+   Assert(IsTransactionState());
+
+   /* Commit the per-stream transaction */
+   CommitTransactionCommand();
+
+   in_streamed_transaction = false;
+
+   /* Reset per-stream context */
+   MemoryContextReset(LogicalContext);
+
+   pgstat_report_activity(STATE_IDLE, NULL);
+}
+
+/*
+ * Handle STREAM abort message.
+ */
+static void
+apply_handle_stream_abort(StringInfo s)
+{
+   TransactionId xid;
+   TransactionId subxid;
+
+   Assert(!in_streamed_transaction);
+
+   logicalrep_read_stream_abort(s, &xid, &subxid);
+
+   /*
+    * If the two XIDs are the same, it's in fact abort of toplevel xact, so
+    * just delete the files with serialized info.
+    */
+   if (xid == subxid)
+       stream_cleanup_files(MyLogicalRepWorker->subid, xid);
+   else
+   {
+       /*
+        * OK, so it's a subxact. We need to read the subxact file for the
+        * toplevel transaction, determine the offset tracked for the subxact,
+        * and truncate the file with changes. We also remove the subxacts
+        * with higher offsets (or rather higher XIDs).
+        *
+        * We intentionally scan the array from the tail, because we're likely
+        * aborting a change for the most recent subtransactions.
+        *
+        * We can't use the binary search here as subxact XIDs won't
+        * necessarily arrive in sorted order, consider the case where we have
+        * released the savepoint for multiple subtransactions and then
+        * performed rollback to savepoint for one of the earlier
+        * sub-transaction.
+        */
+
+       int64       i;
+       int64       subidx;
+       BufFile    *fd;
+       bool        found = false;
+       char        path[MAXPGPATH];
+       StreamXidHash *ent;
+
+       subidx = -1;
+       ensure_transaction();
+       subxact_info_read(MyLogicalRepWorker->subid, xid);
+
+       for (i = subxact_data.nsubxacts; i > 0; i--)
+       {
+           if (subxact_data.subxacts[i - 1].xid == subxid)
+           {
+               subidx = (i - 1);
+               found = true;
+               break;
+           }
+       }
+
+       /*
+        * If it's an empty sub-transaction then we will not find the subxid
+        * here so just cleanup the subxact info and return.
+        */
+       if (!found)
+       {
+           /* Cleanup the subxact info */
+           cleanup_subxact_info();
+           CommitTransactionCommand();
+           return;
+       }
+
+       Assert((subidx >= 0) && (subidx < subxact_data.nsubxacts));
+
+       ent = (StreamXidHash *) hash_search(xidhash,
+                                           (void *) &xid,
+                                           HASH_FIND,
+                                           &found);
+       Assert(found);
+
+       /* open the changes file */
+       changes_filename(path, MyLogicalRepWorker->subid, xid);
+       fd = BufFileOpenShared(ent->stream_fileset, path, O_RDWR);
+
+       /* OK, truncate the file at the right offset */
+       BufFileTruncateShared(fd, subxact_data.subxacts[subidx].fileno,
+                             subxact_data.subxacts[subidx].offset);
+       BufFileClose(fd);
+
+       /* discard the subxacts added later */
+       subxact_data.nsubxacts = subidx;
+
+       /* write the updated subxact list */
+       subxact_info_write(MyLogicalRepWorker->subid, xid);
+       CommitTransactionCommand();
+   }
+}
+
+/*
+ * Handle STREAM COMMIT message.
+ */
+static void
+apply_handle_stream_commit(StringInfo s)
+{
+   TransactionId xid;
+   StringInfoData s2;
+   int         nchanges;
+   char        path[MAXPGPATH];
+   char       *buffer = NULL;
+   bool        found;
+   LogicalRepCommitData commit_data;
+   StreamXidHash *ent;
+   MemoryContext oldcxt;
+   BufFile    *fd;
+
+   Assert(!in_streamed_transaction);
+
+   xid = logicalrep_read_stream_commit(s, &commit_data);
+
+   elog(DEBUG1, "received commit for streamed transaction %u", xid);
+
+   ensure_transaction();
+
+   /*
+    * Allocate file handle and memory required to process all the messages in
+    * TopTransactionContext to avoid them getting reset after each message is
+    * processed.
+    */
+   oldcxt = MemoryContextSwitchTo(TopTransactionContext);
+
+   /* open the spool file for the committed transaction */
+   changes_filename(path, MyLogicalRepWorker->subid, xid);
+   elog(DEBUG1, "replaying changes from file \"%s\"", path);
+   ent = (StreamXidHash *) hash_search(xidhash,
+                                       (void *) &xid,
+                                       HASH_FIND,
+                                       &found);
+   Assert(found);
+   fd = BufFileOpenShared(ent->stream_fileset, path, O_RDONLY);
+
+   buffer = palloc(BLCKSZ);
+   initStringInfo(&s2);
+
+   MemoryContextSwitchTo(oldcxt);
+
+   remote_final_lsn = commit_data.commit_lsn;
+
+   /*
+    * Make sure the handle apply_dis methods are aware we're in a remote
+    * transaction.
+    */
+   in_remote_transaction = true;
+   pgstat_report_activity(STATE_RUNNING, NULL);
+
+   /*
+    * Read the entries one by one and pass them through the same logic as in
+    * apply_dis.
+    */
+   nchanges = 0;
+   while (true)
+   {
+       int         nbytes;
+       int         len;
+
+       CHECK_FOR_INTERRUPTS();
+
+       /* read length of the on-disk record */
+       nbytes = BufFileRead(fd, &len, sizeof(len));
+
+       /* have we reached end of the file? */
+       if (nbytes == 0)
+           break;
+
+       /* do we have a correct length? */
+       if (nbytes != sizeof(len))
+           ereport(ERROR,
+                   (errcode_for_file_access(),
+                    errmsg("could not read from  transaction's changes file \"%s\": %m",
+                           path)));
+
+       Assert(len > 0);
+
+       /* make sure we have sufficiently large buffer */
+       buffer = repalloc(buffer, len);
+
+       /* and finally read the data into the buffer */
+       if (BufFileRead(fd, buffer, len) != len)
+           ereport(ERROR,
+                   (errcode_for_file_access(),
+                    errmsg("could not read from  transaction's changes file \"%s\": %m",
+                           path)));
+
+       /* copy the buffer to the stringinfo and call apply_dis */
+       resetStringInfo(&s2);
+       appendBinaryStringInfo(&s2, buffer, len);
+
+       /* Ensure we are reading the data into our memory context. */
+       oldcxt = MemoryContextSwitchTo(ApplyMessageContext);
+
+       apply_dis(&s2);
+
+       MemoryContextReset(ApplyMessageContext);
+
+       MemoryContextSwitchTo(oldcxt);
+
+       nchanges++;
+
+       if (nchanges % 1000 == 0)
+           elog(DEBUG1, "replayed %d changes from file '%s'",
+                nchanges, path);
+   }
+
+   BufFileClose(fd);
+
+   /*
+    * Update origin state so we can restart  from correct position
+    * in case of crash.
+    */
+   replorigin_session_origin_lsn = commit_data.end_lsn;
+   replorigin_session_origin_timestamp = commit_data.committime;
+
+   pfree(buffer);
+   pfree(s2.data);
+
+   CommitTransactionCommand();
+   pgstat_report_stat(false);
+
+   store_flush_position(commit_data.end_lsn);
+
+   elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
+        nchanges, path);
+
+   in_remote_transaction = false;
+
+   /* Process any tables that are being synchronized in parallel. */
+   process_syncing_tables(commit_data.end_lsn);
+
+   /* unlink the files with serialized changes and subxact info */
+   stream_cleanup_files(MyLogicalRepWorker->subid, xid);
+
+   pgstat_report_activity(STATE_IDLE, NULL);
+}
+
 /*
  * Handle RELATION message.
  *
@@ -635,6 +1100,9 @@ apply_handle_relation(StringInfo s)
 {
    LogicalRepRelation *rel;
 
+   if (handle_streamed_transaction('R', s))
+       return;
+
    rel = logicalrep_read_rel(s);
    logicalrep_relmap_update(rel);
 }
@@ -650,6 +1118,9 @@ apply_handle_type(StringInfo s)
 {
    LogicalRepTyp typ;
 
+   if (handle_streamed_transaction('Y', s))
+       return;
+
    logicalrep_read_typ(s, &typ);
    logicalrep_typmap_update(&typ);
 }
@@ -686,6 +1157,9 @@ apply_handle_insert(StringInfo s)
    TupleTableSlot *remoteslot;
    MemoryContext oldctx;
 
+   if (handle_streamed_transaction('I', s))
+       return;
+
    ensure_transaction();
 
    relid = logicalrep_read_insert(s, &newtup);
@@ -801,6 +1275,9 @@ apply_handle_update(StringInfo s)
    RangeTblEntry *target_rte;
    MemoryContext oldctx;
 
+   if (handle_streamed_transaction('U', s))
+       return;
+
    ensure_transaction();
 
    relid = logicalrep_read_update(s, &has_oldtup, &oldtup,
@@ -950,6 +1427,9 @@ apply_handle_delete(StringInfo s)
    TupleTableSlot *remoteslot;
    MemoryContext oldctx;
 
+   if (handle_streamed_transaction('D', s))
+       return;
+
    ensure_transaction();
 
    relid = logicalrep_read_delete(s, &oldtup);
@@ -1320,6 +1800,9 @@ apply_handle_truncate(StringInfo s)
    List       *relids_logged = NIL;
    ListCell   *lc;
 
+   if (handle_streamed_transaction('T', s))
+       return;
+
    ensure_transaction();
 
    remote_relids = logicalrep_read_truncate(s, &cascade, &restart_seqs);
@@ -1458,6 +1941,22 @@ apply_dis(StringInfo s)
        case 'O':
            apply_handle_origin(s);
            break;
+           /* STREAM START */
+       case 'S':
+           apply_handle_stream_start(s);
+           break;
+           /* STREAM END */
+       case 'E':
+           apply_handle_stream_stop(s);
+           break;
+           /* STREAM ABORT */
+       case 'A':
+           apply_handle_stream_abort(s);
+           break;
+           /* STREAM COMMIT */
+       case 'c':
+           apply_handle_stream_commit(s);
+           break;
        default:
            ereport(ERROR,
                    (errcode(ERRCODE_PROTOCOL_VIOLATION),
@@ -1570,6 +2069,14 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
                                                "ApplyMessageContext",
                                                ALLOCSET_DEFAULT_SIZES);
 
+   /*
+    * This memory context is used for per-stream data when the  mode
+    * is enabled. This context is reset on each stream stop.
+    */
+   LogicalContext = AllocSetContextCreate(ApplyContext,
+                                                   "LogicalContext",
+                                                   ALLOCSET_DEFAULT_SIZES);
+
    /* mark as idle, before starting to loop */
    pgstat_report_activity(STATE_IDLE, NULL);
 
@@ -1674,7 +2181,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
        /* confirm all writes so far */
        send_feedback(last_received, false, false);
 
-       if (!in_remote_transaction)
+       if (!in_remote_transaction && !in_streamed_transaction)
        {
            /*
             * If we didn't get any transactions for a while there might be
@@ -1938,6 +2445,7 @@ maybe_reread_subscription(void)
        strcmp(newsub->name, MySubscription->name) != 0 ||
        strcmp(newsub->slotname, MySubscription->slotname) != 0 ||
        newsub->binary != MySubscription->binary ||
+       newsub->stream != MySubscription->stream ||
        !equal(newsub->publications, MySubscription->publications))
    {
        ereport(LOG,
@@ -1979,6 +2487,439 @@ subscription_change_cb(Datum arg, int cacheid, uint32 hashvalue)
    MySubscriptionValid = false;
 }
 
+/*
+ * subxact_info_write
+ *   Store information about subxacts for a toplevel transaction.
+ *
+ * For each subxact we store offset of it's first change in the main file.
+ * The file is always over-written as a whole.
+ *
+ * XXX We should only store subxacts that were not aborted yet.
+ */
+static void
+subxact_info_write(Oid subid, TransactionId xid)
+{
+   char        path[MAXPGPATH];
+   bool        found;
+   Size        len;
+   StreamXidHash *ent;
+   BufFile    *fd;
+
+   Assert(TransactionIdIsValid(xid));
+
+   /* find the xid entry in the xidhash */
+   ent = (StreamXidHash *) hash_search(xidhash,
+                                       (void *) &xid,
+                                       HASH_FIND,
+                                       &found);
+   /* we must found the entry for its top transaction by this time */
+   Assert(found);
+
+   /*
+    * If there is no subtransaction then nothing to do, but if already have
+    * subxact file then delete that.
+    */
+   if (subxact_data.nsubxacts == 0)
+   {
+       if (ent->subxact_fileset)
+       {
+           cleanup_subxact_info();
+           SharedFileSetDeleteAll(ent->subxact_fileset);
+           pfree(ent->subxact_fileset);
+           ent->subxact_fileset = NULL;
+       }
+       return;
+   }
+
+   subxact_filename(path, subid, xid);
+
+   /*
+    * Create the subxact file if it not already created, otherwise open the
+    * existing file.
+    */
+   if (ent->subxact_fileset == NULL)
+   {
+       MemoryContext oldctx;
+
+       /*
+        * We need to maintain shared fileset across multiple stream
+        * start/stop calls.  So, need to allocate it in a persistent context.
+        */
+       oldctx = MemoryContextSwitchTo(ApplyContext);
+       ent->subxact_fileset = palloc(sizeof(SharedFileSet));
+       SharedFileSetInit(ent->subxact_fileset, NULL);
+       MemoryContextSwitchTo(oldctx);
+
+       fd = BufFileCreateShared(ent->subxact_fileset, path);
+   }
+   else
+       fd = BufFileOpenShared(ent->subxact_fileset, path, O_RDWR);
+
+   len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
+
+   /* Write the subxact count and subxact info */
+   BufFileWrite(fd, &subxact_data.nsubxacts, sizeof(subxact_data.nsubxacts));
+   BufFileWrite(fd, subxact_data.subxacts, len);
+
+   BufFileClose(fd);
+
+   /* free the memory allocated for subxact info */
+   cleanup_subxact_info();
+}
+
+/*
+ * subxact_info_read
+ *   Restore information about subxacts of a streamed transaction.
+ *
+ * Read information about subxacts into the structure subxact_data that can be
+ * used later.
+ */
+static void
+subxact_info_read(Oid subid, TransactionId xid)
+{
+   char        path[MAXPGPATH];
+   bool        found;
+   Size        len;
+   BufFile    *fd;
+   StreamXidHash *ent;
+   MemoryContext oldctx;
+
+   Assert(TransactionIdIsValid(xid));
+   Assert(!subxact_data.subxacts);
+   Assert(subxact_data.nsubxacts == 0);
+   Assert(subxact_data.nsubxacts_max == 0);
+
+   /* Find the stream xid entry in the xidhash */
+   ent = (StreamXidHash *) hash_search(xidhash,
+                                       (void *) &xid,
+                                       HASH_FIND,
+                                       &found);
+
+   /*
+    * If subxact_fileset is not valid that mean we don't have any subxact
+    * info
+    */
+   if (ent->subxact_fileset == NULL)
+       return;
+
+   subxact_filename(path, subid, xid);
+
+   fd = BufFileOpenShared(ent->subxact_fileset, path, O_RDONLY);
+
+   /* read number of subxact items */
+   if (BufFileRead(fd, &subxact_data.nsubxacts,
+                   sizeof(subxact_data.nsubxacts)) !=
+       sizeof(subxact_data.nsubxacts))
+       ereport(ERROR,
+               (errcode_for_file_access(),
+                errmsg("could not read from  transaction's subxact file \"%s\": %m",
+                       path)));
+
+   len = sizeof(SubXactInfo) * subxact_data.nsubxacts;
+
+   /* we keep the maximum as a power of 2 */
+   subxact_data.nsubxacts_max = 1 << my_log2(subxact_data.nsubxacts);
+
+   /*
+    * Allocate subxact information in the logical  context. We need
+    * this information during the complete stream so that we can add the sub
+    * transaction info to this. On stream stop we will flush this information
+    * to the subxact file and reset the logical  context.
+    */
+   oldctx = MemoryContextSwitchTo(LogicalContext);
+   subxact_data.subxacts = palloc(subxact_data.nsubxacts_max *
+                                  sizeof(SubXactInfo));
+   MemoryContextSwitchTo(oldctx);
+
+   if ((len > 0) && ((BufFileRead(fd, subxact_data.subxacts, len)) != len))
+       ereport(ERROR,
+               (errcode_for_file_access(),
+                errmsg("could not read from  transaction's subxact file \"%s\": %m",
+                       path)));
+
+   BufFileClose(fd);
+}
+
+/*
+ * subxact_info_add
+ *   Add information about a subxact (offset in the main file).
+ */
+static void
+subxact_info_add(TransactionId xid)
+{
+   SubXactInfo *subxacts = subxact_data.subxacts;
+   int64       i;
+
+   /* We must have a valid top level stream xid and a stream fd. */
+   Assert(TransactionIdIsValid(stream_xid));
+   Assert(stream_fd != NULL);
+
+   /*
+    * If the XID matches the toplevel transaction, we don't want to add it.
+    */
+   if (stream_xid == xid)
+       return;
+
+   /*
+    * In most cases we're checking the same subxact as we've already seen in
+    * the last call, so make sure to ignore it (this change comes later).
+    */
+   if (subxact_data.subxact_last == xid)
+       return;
+
+   /* OK, remember we're processing this XID. */
+   subxact_data.subxact_last = xid;
+
+   /*
+    * Check if the transaction is already present in the array of subxact. We
+    * intentionally scan the array from the tail, because we're likely adding
+    * a change for the most recent subtransactions.
+    *
+    * XXX Can we rely on the subxact XIDs arriving in sorted order? That
+    * would allow us to use binary search here.
+    */
+   for (i = subxact_data.nsubxacts; i > 0; i--)
+   {
+       /* found, so we're done */
+       if (subxacts[i - 1].xid == xid)
+           return;
+   }
+
+   /* This is a new subxact, so we need to add it to the array. */
+   if (subxact_data.nsubxacts == 0)
+   {
+       MemoryContext oldctx;
+
+       subxact_data.nsubxacts_max = 128;
+
+       /*
+        * Allocate this memory for subxacts in per-stream context, see
+        * subxact_info_read.
+        */
+       oldctx = MemoryContextSwitchTo(LogicalContext);
+       subxacts = palloc(subxact_data.nsubxacts_max * sizeof(SubXactInfo));
+       MemoryContextSwitchTo(oldctx);
+   }
+   else if (subxact_data.nsubxacts == subxact_data.nsubxacts_max)
+   {
+       subxact_data.nsubxacts_max *= 2;
+       subxacts = repalloc(subxacts,
+                           subxact_data.nsubxacts_max * sizeof(SubXactInfo));
+   }
+
+   subxacts[subxact_data.nsubxacts].xid = xid;
+
+   /*
+    * Get the current offset of the stream file and store it as offset of
+    * this subxact.
+    */
+   BufFileTell(stream_fd,
+               &subxacts[subxact_data.nsubxacts].fileno,
+               &subxacts[subxact_data.nsubxacts].offset);
+
+   subxact_data.nsubxacts++;
+   subxact_data.subxacts = subxacts;
+}
+
+/* format filename for file containing the info about subxacts */
+static void
+subxact_filename(char *path, Oid subid, TransactionId xid)
+{
+   snprintf(path, MAXPGPATH, "%u-%u.subxacts", subid, xid);
+}
+
+/* format filename for file containing serialized changes */
+static inline void
+changes_filename(char *path, Oid subid, TransactionId xid)
+{
+   snprintf(path, MAXPGPATH, "%u-%u.changes", subid, xid);
+}
+
+/*
+ * stream_cleanup_files
+ *   Cleanup files for a subscription / toplevel transaction.
+ *
+ * Remove files with serialized changes and subxact info for a particular
+ * toplevel transaction. Each subscription has a separate set of files.
+ */
+static void
+stream_cleanup_files(Oid subid, TransactionId xid)
+{
+   char        path[MAXPGPATH];
+   StreamXidHash *ent;
+
+   /* Remove the xid entry from the stream xid hash */
+   ent = (StreamXidHash *) hash_search(xidhash,
+                                       (void *) &xid,
+                                       HASH_REMOVE,
+                                       NULL);
+   /* By this time we must have created the transaction entry */
+   Assert(ent != NULL);
+
+   /* Delete the change file and release the stream fileset memory */
+   changes_filename(path, subid, xid);
+   SharedFileSetDeleteAll(ent->stream_fileset);
+   pfree(ent->stream_fileset);
+   ent->stream_fileset = NULL;
+
+   /* Delete the subxact file and release the memory, if it exist */
+   if (ent->subxact_fileset)
+   {
+       subxact_filename(path, subid, xid);
+       SharedFileSetDeleteAll(ent->subxact_fileset);
+       pfree(ent->subxact_fileset);
+       ent->subxact_fileset = NULL;
+   }
+}
+
+/*
+ * stream_open_file
+ *   Open a file that we'll use to serialize changes for a toplevel
+ * transaction.
+ *
+ * Open a file for streamed changes from a toplevel transaction identified
+ * by stream_xid (global variable). If it's the first chunk of streamed
+ * changes for this transaction, initialize the shared fileset and create the
+ * buffile, otherwise open the previously created file.
+ *
+ * This can only be called at the beginning of a "" block, i.e.
+ * between stream_start/stream_stop messages from the upstream.
+ */
+static void
+stream_open_file(Oid subid, TransactionId xid, bool first_segment)
+{
+   char        path[MAXPGPATH];
+   bool        found;
+   MemoryContext oldcxt;
+   StreamXidHash *ent;
+
+   Assert(in_streamed_transaction);
+   Assert(OidIsValid(subid));
+   Assert(TransactionIdIsValid(xid));
+   Assert(stream_fd == NULL);
+
+   /* create or find the xid entry in the xidhash */
+   ent = (StreamXidHash *) hash_search(xidhash,
+                                       (void *) &xid,
+                                       HASH_ENTER | HASH_FIND,
+                                       &found);
+   Assert(first_segment || found);
+   changes_filename(path, subid, xid);
+   elog(DEBUG1, "opening file \"%s\" for streamed changes", path);
+
+   /*
+    * Create/open the buffiles under the logical  context so that we
+    * have those files until stream stop.
+    */
+   oldcxt = MemoryContextSwitchTo(LogicalContext);
+
+   /*
+    * If this is the first streamed segment, the file must not exist, so make
+    * sure we're the ones creating it. Otherwise just open the file for
+    * writing, in append mode.
+    */
+   if (first_segment)
+   {
+       MemoryContext savectx;
+       SharedFileSet *fileset;
+
+       /*
+        * We need to maintain shared fileset across multiple stream
+        * start/stop calls. So, need to allocate it in a persistent context.
+        */
+       savectx = MemoryContextSwitchTo(ApplyContext);
+       fileset = palloc(sizeof(SharedFileSet));
+
+       SharedFileSetInit(fileset, NULL);
+       MemoryContextSwitchTo(savectx);
+
+       stream_fd = BufFileCreateShared(fileset, path);
+
+       /* Remember the fileset for the next stream of the same transaction */
+       ent->xid = xid;
+       ent->stream_fileset = fileset;
+       ent->subxact_fileset = NULL;
+   }
+   else
+   {
+       /*
+        * Open the file and seek to the end of the file because we always
+        * append the changes file.
+        */
+       stream_fd = BufFileOpenShared(ent->stream_fileset, path, O_RDWR);
+       BufFileSeek(stream_fd, 0, 0, SEEK_END);
+   }
+
+   MemoryContextSwitchTo(oldcxt);
+}
+
+/*
+ * stream_close_file
+ *   Close the currently open file with streamed changes.
+ *
+ * This can only be called at the end of a  block, i.e. at stream_stop
+ * message from the upstream.
+ */
+static void
+stream_close_file(void)
+{
+   Assert(in_streamed_transaction);
+   Assert(TransactionIdIsValid(stream_xid));
+   Assert(stream_fd != NULL);
+
+   BufFileClose(stream_fd);
+
+   stream_xid = InvalidTransactionId;
+   stream_fd = NULL;
+}
+
+/*
+ * stream_write_change
+ *   Serialize a change to a file for the current toplevel transaction.
+ *
+ * The change is serialized in a simple format, with length (not including
+ * the length), action code (identifying the message type) and message
+ * contents (without the subxact TransactionId value).
+ */
+static void
+stream_write_change(char action, StringInfo s)
+{
+   int         len;
+
+   Assert(in_streamed_transaction);
+   Assert(TransactionIdIsValid(stream_xid));
+   Assert(stream_fd != NULL);
+
+   /* total on-disk size, including the action type character */
+   len = (s->len - s->cursor) + sizeof(char);
+
+   /* first write the size */
+   BufFileWrite(stream_fd, &len, sizeof(len));
+
+   /* then the action */
+   BufFileWrite(stream_fd, &action, sizeof(action));
+
+   /* and finally the remaining part of the buffer (after the XID) */
+   len = (s->len - s->cursor);
+
+   BufFileWrite(stream_fd, &s->data[s->cursor], len);
+}
+
+/*
+ * Cleanup the memory for subxacts and reset the related variables.
+ */
+static inline void
+cleanup_subxact_info()
+{
+   if (subxact_data.subxacts)
+       pfree(subxact_data.subxacts);
+
+   subxact_data.subxacts = NULL;
+   subxact_data.subxact_last = InvalidTransactionId;
+   subxact_data.nsubxacts = 0;
+   subxact_data.nsubxacts_max = 0;
+}
+
 /* Logical Replication Apply worker entry point */
 void
 ApplyWorkerMain(Datum main_arg)
@@ -2151,6 +3092,7 @@ ApplyWorkerMain(Datum main_arg)
    options.proto.logical.proto_version = LOGICALREP_PROTO_VERSION_NUM;
    options.proto.logical.publication_names = MySubscription->publications;
    options.proto.logical.binary = MySubscription->binary;
+   options.proto.logical. = MySubscription->stream;
 
    /* Start normal logical  replication. */
    walrcv_start(wrconn, &options);
index 81ef7dc4c1a33c1380cbcb5705c4be4faee3d786..c29c0888133af40eabcf7078f5ade43fe7cbd824 100644 (file)
@@ -47,17 +47,40 @@ static void pgoutput_truncate(LogicalDecodingContext *ctx,
                              ReorderBufferChange *change);
 static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
                                   RepOriginId origin_id);
+static void pgoutput_stream_start(struct LogicalDecodingContext *ctx,
+                                 ReorderBufferTXN *txn);
+static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
+                                ReorderBufferTXN *txn);
+static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
+                                 ReorderBufferTXN *txn,
+                                 XLogRecPtr abort_lsn);
+static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
+                                  ReorderBufferTXN *txn,
+                                  XLogRecPtr commit_lsn);
 
 static bool publications_valid;
+static bool in_;
 
 static List *LoadPublications(List *pubnames);
 static void publication_invalidation_cb(Datum arg, int cacheid,
                                        uint32 hashvalue);
-static void send_relation_and_attrs(Relation relation, LogicalDecodingContext *ctx);
+static void send_relation_and_attrs(Relation relation, TransactionId xid,
+                                   LogicalDecodingContext *ctx);
 
 /*
  * Entry in the map used to remember which relation schemas we sent.
  *
+ * The schema_sent flag determines if the current schema record was already
+ * sent to the subscriber (in which case we don't need to send it again).
+ *
+ * The schema cache on downstream is however updated only at commit time,
+ * and with streamed transactions the commit order may be different from
+ * the order the transactions are sent in. Also, the (sub) transactions
+ * might get aborted so we need to send the schema for each (sub) transaction
+ * so that we don't loose the schema information on abort. For handling this,
+ * we maintain the list of xids (streamed_txns) for those we have already sent
+ * the schema.
+ *
  * For partitions, 'pubactions' considers not only the table's own
  * publications, but also those of all of its ancestors.
  */
@@ -70,6 +93,8 @@ typedef struct RelationSyncEntry
     * have been sent for this to be true.
     */
    bool        schema_sent;
+   List       *streamed_txns;  /* streamed toplevel transactions with this
+                                * schema */
 
    bool        replicate_valid;
    PublicationActions pubactions;
@@ -95,10 +120,15 @@ typedef struct RelationSyncEntry
 static HTAB *RelationSyncCache = NULL;
 
 static void init_rel_sync_cache(MemoryContext decoding_context);
+static void cleanup_rel_sync_cache(TransactionId xid, bool is_commit);
 static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data, Oid relid);
 static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
 static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
                                          uint32 hashvalue);
+static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
+                                           TransactionId xid);
+static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
+                                           TransactionId xid);
 
 /*
  * Specify output plugin callbacks
@@ -115,16 +145,26 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
    cb->commit_cb = pgoutput_commit_txn;
    cb->filter_by_origin_cb = pgoutput_origin_filter;
    cb->shutdown_cb = pgoutput_shutdown;
+
+   /* transaction  */
+   cb->stream_start_cb = pgoutput_stream_start;
+   cb->stream_stop_cb = pgoutput_stream_stop;
+   cb->stream_abort_cb = pgoutput_stream_abort;
+   cb->stream_commit_cb = pgoutput_stream_commit;
+   cb->stream_change_cb = pgoutput_change;
+   cb->stream_truncate_cb = pgoutput_truncate;
 }
 
 static void
 parse_output_parameters(List *options, uint32 *protocol_version,
-                       List **publication_names, bool *binary)
+                       List **publication_names, bool *binary,
+                       bool *enable_)
 {
    ListCell   *lc;
    bool        protocol_version_given = false;
    bool        publication_names_given = false;
    bool        binary_option_given = false;
+   bool        _given = false;
 
    *binary = false;
 
@@ -182,6 +222,16 @@ parse_output_parameters(List *options, uint32 *protocol_version,
 
            *binary = defGetBoolean(defel);
        }
+       else if (strcmp(defel->defname, "") == 0)
+       {
+           if (_given)
+               ereport(ERROR,
+                       (errcode(ERRCODE_SYNTAX_ERROR),
+                        errmsg("conflicting or redundant options")));
+           _given = true;
+
+           *enable_ = defGetBoolean(defel);
+       }
        else
            elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
    }
@@ -194,6 +244,7 @@ static void
 pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
                 bool is_init)
 {
+   bool        enable_ = false;
    PGOutputData *data = palloc0(sizeof(PGOutputData));
 
    /* Create our memory context for private allocations. */
@@ -217,7 +268,8 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
        parse_output_parameters(ctx->output_plugin_options,
                                &data->protocol_version,
                                &data->publication_names,
-                               &data->binary);
+                               &data->binary,
+                               &enable_);
 
        /* Check if we support requested protocol */
        if (data->protocol_version > LOGICALREP_PROTO_VERSION_NUM)
@@ -237,6 +289,27 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
                    (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                     errmsg("publication_names parameter missing")));
 
+       /*
+        * Decide whether to enable . It is disabled by default, in
+        * which case we just update the flag in decoding context. Otherwise
+        * we only allow it with sufficient version of the protocol, and when
+        * the output plugin supports it.
+        */
+       if (!enable_)
+           ctx-> = false;
+       else if (data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM)
+           ereport(ERROR,
+                   (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                    errmsg("requested proto_version=%d does not support , need %d or higher",
+                           data->protocol_version, LOGICALREP_PROTO_STREAM_VERSION_NUM)));
+       else if (!ctx->)
+           ereport(ERROR,
+                   (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                    errmsg(" requested, but not supported by output plugin")));
+
+       /* Also remember we're currently not  any transaction. */
+       in_ = false;
+
        /* Init publication state. */
        data->publications = NIL;
        publications_valid = false;
@@ -247,6 +320,11 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
        /* Initialize relation schema cache. */
        init_rel_sync_cache(CacheMemoryContext);
    }
+   else
+   {
+       /* Disable the  during the slot initialization mode. */
+       ctx-> = false;
+   }
 }
 
 /*
@@ -305,9 +383,47 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
  */
 static void
 maybe_send_schema(LogicalDecodingContext *ctx,
+                 ReorderBufferTXN *txn, ReorderBufferChange *change,
                  Relation relation, RelationSyncEntry *relentry)
 {
-   if (relentry->schema_sent)
+   bool        schema_sent;
+   TransactionId xid = InvalidTransactionId;
+   TransactionId topxid = InvalidTransactionId;
+
+   /*
+    * Remember XID of the (sub)transaction for the change. We don't care if
+    * it's top-level transaction or not (we have already sent that XID in
+    * start of the current  block).
+    *
+    * If we're not in a  block, just use InvalidTransactionId and
+    * the write methods will not include it.
+    */
+   if (in_)
+       xid = change->txn->xid;
+
+   if (change->txn->toptxn)
+       topxid = change->txn->toptxn->xid;
+   else
+       topxid = xid;
+
+   /*
+    * Do we need to send the schema? We do track streamed transactions
+    * separately, because those may be applied later (and the regular
+    * transactions won't see their effects until then) and in an order that
+    * we don't know at this point.
+    *
+    * XXX There is a scope of optimization here. Currently, we always send
+    * the schema first time in a  transaction but we can probably
+    * avoid that by checking 'relentry->schema_sent' flag. However, before
+    * doing that we need to study its impact on the case where we have a mix
+    * of  and non- transactions.
+    */
+   if (in_)
+       schema_sent = get_schema_sent_in_streamed_txn(relentry, topxid);
+   else
+       schema_sent = relentry->schema_sent;
+
+   if (schema_sent)
        return;
 
    /* If needed, send the ancestor's schema first. */
@@ -323,19 +439,24 @@ maybe_send_schema(LogicalDecodingContext *ctx,
        relentry->map = convert_tuples_by_name(CreateTupleDescCopy(indesc),
                                               CreateTupleDescCopy(outdesc));
        MemoryContextSwitchTo(oldctx);
-       send_relation_and_attrs(ancestor, ctx);
+       send_relation_and_attrs(ancestor, xid, ctx);
        RelationClose(ancestor);
    }
 
-   send_relation_and_attrs(relation, ctx);
-   relentry->schema_sent = true;
+   send_relation_and_attrs(relation, xid, ctx);
+
+   if (in_)
+       set_schema_sent_in_streamed_txn(relentry, topxid);
+   else
+       relentry->schema_sent = true;
 }
 
 /*
  * Sends a relation
  */
 static void
-send_relation_and_attrs(Relation relation, LogicalDecodingContext *ctx)
+send_relation_and_attrs(Relation relation, TransactionId xid,
+                       LogicalDecodingContext *ctx)
 {
    TupleDesc   desc = RelationGetDescr(relation);
    int         i;
@@ -359,17 +480,19 @@ send_relation_and_attrs(Relation relation, LogicalDecodingContext *ctx)
            continue;
 
        OutputPluginPrepareWrite(ctx, false);
-       logicalrep_write_typ(ctx->out, att->atttypid);
+       logicalrep_write_typ(ctx->out, xid, att->atttypid);
        OutputPluginWrite(ctx, false);
    }
 
    OutputPluginPrepareWrite(ctx, false);
-   logicalrep_write_rel(ctx->out, relation);
+   logicalrep_write_rel(ctx->out, xid, relation);
    OutputPluginWrite(ctx, false);
 }
 
 /*
  * Sends the decoded DML over wire.
+ *
+ * This is called both in  and non- modes.
  */
 static void
 pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
@@ -378,10 +501,20 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
    PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
    MemoryContext old;
    RelationSyncEntry *relentry;
+   TransactionId xid = InvalidTransactionId;
 
    if (!is_publishable_relation(relation))
        return;
 
+   /*
+    * Remember the xid for the change in  mode. We need to send xid
+    * with each change in the  mode so that subscriber can make
+    * their association and on aborts, it can discard the corresponding
+    * changes.
+    */
+   if (in_)
+       xid = change->txn->xid;
+
    relentry = get_rel_sync_entry(data, RelationGetRelid(relation));
 
    /* First check the table filter */
@@ -406,7 +539,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
    /* Avoid ing memory by using and resetting our own context */
    old = MemoryContextSwitchTo(data->context);
 
-   maybe_send_schema(ctx, relation, relentry);
+   maybe_send_schema(ctx, txn, change, relation, relentry);
 
    /* Send the data */
    switch (change->action)
@@ -426,7 +559,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                }
 
                OutputPluginPrepareWrite(ctx, true);
-               logicalrep_write_insert(ctx->out, relation, tuple,
+               logicalrep_write_insert(ctx->out, xid, relation, tuple,
                                        data->binary);
                OutputPluginWrite(ctx, true);
                break;
@@ -451,8 +584,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                }
 
                OutputPluginPrepareWrite(ctx, true);
-               logicalrep_write_update(ctx->out, relation, oldtuple, newtuple,
-                                       data->binary);
+               logicalrep_write_update(ctx->out, xid, relation, oldtuple,
+                                       newtuple, data->binary);
                OutputPluginWrite(ctx, true);
                break;
            }
@@ -472,7 +605,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                }
 
                OutputPluginPrepareWrite(ctx, true);
-               logicalrep_write_delete(ctx->out, relation, oldtuple,
+               logicalrep_write_delete(ctx->out, xid, relation, oldtuple,
                                        data->binary);
                OutputPluginWrite(ctx, true);
            }
@@ -498,6 +631,11 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
    int         i;
    int         nrelids;
    Oid        *relids;
+   TransactionId xid = InvalidTransactionId;
+
+   /* Remember the xid for the change in  mode. See pgoutput_change. */
+   if (in_)
+       xid = change->txn->xid;
 
    old = MemoryContextSwitchTo(data->context);
 
@@ -526,13 +664,14 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
            continue;
 
        relids[nrelids++] = relid;
-       maybe_send_schema(ctx, relation, relentry);
+       maybe_send_schema(ctx, txn, change, relation, relentry);
    }
 
    if (nrelids > 0)
    {
        OutputPluginPrepareWrite(ctx, true);
        logicalrep_write_truncate(ctx->out,
+                                 xid,
                                  nrelids,
                                  relids,
                                  change->data.truncate.cascade,
@@ -605,6 +744,118 @@ publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue)
    rel_sync_cache_publication_cb(arg, cacheid, hashvalue);
 }
 
+/*
+ * START STREAM callback
+ */
+static void
+pgoutput_stream_start(struct LogicalDecodingContext *ctx,
+                     ReorderBufferTXN *txn)
+{
+   bool        send_replication_origin = txn->origin_id != InvalidRepOriginId;
+
+   /* we can't nest  of transactions */
+   Assert(!in_);
+
+   /*
+    * If we already sent the first stream for this transaction then don't
+    * send the origin id in the subsequent streams.
+    */
+   if (rbtxn_is_streamed(txn))
+       send_replication_origin = false;
+
+   OutputPluginPrepareWrite(ctx, !send_replication_origin);
+   logicalrep_write_stream_start(ctx->out, txn->xid, !rbtxn_is_streamed(txn));
+
+   if (send_replication_origin)
+   {
+       char       *origin;
+
+       /* Message boundary */
+       OutputPluginWrite(ctx, false);
+       OutputPluginPrepareWrite(ctx, true);
+
+       if (replorigin_by_oid(txn->origin_id, true, &origin))
+           logicalrep_write_origin(ctx->out, origin, InvalidXLogRecPtr);
+   }
+
+   OutputPluginWrite(ctx, true);
+
+   /* we're  a chunk of transaction now */
+   in_ = true;
+}
+
+/*
+ * STOP STREAM callback
+ */
+static void
+pgoutput_stream_stop(struct LogicalDecodingContext *ctx,
+                    ReorderBufferTXN *txn)
+{
+   /* we should be  a trasanction */
+   Assert(in_);
+
+   OutputPluginPrepareWrite(ctx, true);
+   logicalrep_write_stream_stop(ctx->out);
+   OutputPluginWrite(ctx, true);
+
+   /* we've stopped  a transaction */
+   in_ = false;
+}
+
+/*
+ * Notify downstream to discard the streamed transaction (along with all
+ * it's subtransactions, if it's a toplevel transaction).
+ */
+static void
+pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
+                     ReorderBufferTXN *txn,
+                     XLogRecPtr abort_lsn)
+{
+   ReorderBufferTXN *toptxn;
+
+   /*
+    * The abort should happen outside  block, even for streamed
+    * transactions. The transaction has to be marked as streamed, though.
+    */
+   Assert(!in_);
+
+   /* determine the toplevel transaction */
+   toptxn = (txn->toptxn) ? txn->toptxn : txn;
+
+   Assert(rbtxn_is_streamed(toptxn));
+
+   OutputPluginPrepareWrite(ctx, true);
+   logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid);
+   OutputPluginWrite(ctx, true);
+
+   cleanup_rel_sync_cache(toptxn->xid, false);
+}
+
+/*
+ * Notify downstream to apply the streamed transaction (along with all
+ * it's subtransactions).
+ */
+static void
+pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
+                      ReorderBufferTXN *txn,
+                      XLogRecPtr commit_lsn)
+{
+   /*
+    * The commit should happen outside  block, even for streamed
+    * transactions. The transaction has to be marked as streamed, though.
+    */
+   Assert(!in_);
+   Assert(rbtxn_is_streamed(txn));
+
+   OutputPluginUpdateProgress(ctx);
+
+   OutputPluginPrepareWrite(ctx, true);
+   logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
+   OutputPluginWrite(ctx, true);
+
+   cleanup_rel_sync_cache(txn->xid, true);
+}
+
 /*
  * Initialize the relation schema sync cache for a decoding session.
  *
@@ -641,6 +892,39 @@ init_rel_sync_cache(MemoryContext cachectx)
                                  (Datum) 0);
 }
 
+/*
+ * We expect relatively small number of streamed transactions.
+ */
+static bool
+get_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
+{
+   ListCell   *lc;
+
+   foreach(lc, entry->streamed_txns)
+   {
+       if (xid == (uint32) lfirst_int(lc))
+           return true;
+   }
+
+   return false;
+}
+
+/*
+ * Add the xid in the rel sync entry for which we have already sent the schema
+ * of the relation.
+ */
+static void
+set_schema_sent_in_streamed_txn(RelationSyncEntry *entry, TransactionId xid)
+{
+   MemoryContext oldctx;
+
+   oldctx = MemoryContextSwitchTo(CacheMemoryContext);
+
+   entry->streamed_txns = lappend_int(entry->streamed_txns, xid);
+
+   MemoryContextSwitchTo(oldctx);
+}
+
 /*
  * Find or create entry in the relation schema cache.
  *
@@ -771,11 +1055,58 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
    }
 
    if (!found)
+   {
        entry->schema_sent = false;
+       entry->streamed_txns = NULL;
+   }
 
    return entry;
 }
 
+/*
+ * Cleanup list of streamed transactions and update the schema_sent flag.
+ *
+ * When a streamed transaction commits or aborts, we need to remove the
+ * toplevel XID from the schema cache. If the transaction aborted, the
+ * subscriber will simply throw away the schema records we streamed, so
+ * we don't need to do anything else.
+ *
+ * If the transaction is committed, the subscriber will update the relation
+ * cache - so tweak the schema_sent flag accordingly.
+ */
+static void
+cleanup_rel_sync_cache(TransactionId xid, bool is_commit)
+{
+   HASH_SEQ_STATUS hash_seq;
+   RelationSyncEntry *entry;
+   ListCell   *lc;
+
+   Assert(RelationSyncCache != NULL);
+
+   hash_seq_init(&hash_seq, RelationSyncCache);
+   while ((entry = hash_seq_search(&hash_seq)) != NULL)
+   {
+       /*
+        * We can set the schema_sent flag for an entry that has committed xid
+        * in the list as that ensures that the subscriber would have the
+        * corresponding schema and we don't need to send it unless there is
+        * any invalidation for that relation.
+        */
+       foreach(lc, entry->streamed_txns)
+       {
+           if (xid == (uint32) lfirst_int(lc))
+           {
+               if (is_commit)
+                   entry->schema_sent = true;
+
+               entry->streamed_txns =
+                   foreach_delete_current(entry->streamed_txns, lc);
+               break;
+           }
+       }
+   }
+}
+
 /*
  * Relcache invalidation callback
  */
@@ -811,7 +1142,11 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
     * Reset schema sent status as the relation definition may have changed.
     */
    if (entry != NULL)
+   {
        entry->schema_sent = false;
+       list_free(entry->streamed_txns);
+       entry->streamed_txns = NULL;
+   }
 }
 
 /*
index 2cb3f9b083ecebdf19acf8824e4d869269f4b4d9..d3ca54e4dc6a490ad315acb9ce0e662520f00438 100644 (file)
@@ -4202,6 +4202,7 @@ getSubscriptions(Archive *fout)
    int         i_oid;
    int         i_subname;
    int         i_rolname;
+   int         i_substream;
    int         i_subconninfo;
    int         i_subslotname;
    int         i_subsynccommit;
@@ -4241,10 +4242,17 @@ getSubscriptions(Archive *fout)
 
    if (fout->remoteVersion >= 140000)
        appendPQExpBuffer(query,
-                         " s.subbinary\n");
+                         " s.subbinary,\n");
    else
        appendPQExpBuffer(query,
-                         " false AS subbinary\n");
+                         " false AS subbinary,\n");
+
+   if (fout->remoteVersion >= 140000)
+       appendPQExpBuffer(query,
+                         " s.substream\n");
+   else
+       appendPQExpBuffer(query,
+                         " false AS substream\n");
 
    appendPQExpBuffer(query,
                      "FROM pg_subscription s\n"
@@ -4264,6 +4272,7 @@ getSubscriptions(Archive *fout)
    i_subsynccommit = PQfnumber(res, "subsynccommit");
    i_subpublications = PQfnumber(res, "subpublications");
    i_subbinary = PQfnumber(res, "subbinary");
+   i_substream = PQfnumber(res, "substream");
 
    subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo));
 
@@ -4287,6 +4296,8 @@ getSubscriptions(Archive *fout)
            pg_strdup(PQgetvalue(res, i, i_subpublications));
        subinfo[i].subbinary =
            pg_strdup(PQgetvalue(res, i, i_subbinary));
+       subinfo[i].substream =
+           pg_strdup(PQgetvalue(res, i, i_substream));
 
        if (strlen(subinfo[i].rolname) == 0)
            pg_log_warning("owner of subscription \"%s\" appears to be invalid",
@@ -4358,6 +4369,9 @@ dumpSubscription(Archive *fout, SubscriptionInfo *subinfo)
    if (strcmp(subinfo->subbinary, "t") == 0)
        appendPQExpBuffer(query, ", binary = true");
 
+   if (strcmp(subinfo->substream, "f") != 0)
+       appendPQExpBuffer(query, ",  = on");
+
    if (strcmp(subinfo->subsynccommit, "off") != 0)
        appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit));
 
index 2f051b83d9117856d1a0ec86c51e91bbf514d94f..e0b42e83912fab74c1ca8e20a640d0f1c85686f1 100644 (file)
@@ -626,6 +626,7 @@ typedef struct _SubscriptionInfo
    char       *subconninfo;
    char       *subslotname;
    char       *subbinary;
+   char       *substream;
    char       *subsynccommit;
    char       *subpublications;
 } SubscriptionInfo;
index 0266fc5fa85832edde195ae0a53634c918174c74..0861d74a6fe0ca95d36fbb3729871144caf52f09 100644 (file)
@@ -5979,7 +5979,7 @@ describeSubscriptions(const char *pattern, bool verbose)
    PGresult   *res;
    printQueryOpt myopt = pset.popt;
    static const bool translate_columns[] = {false, false, false, false,
-   false, false, false};
+   false, false, false, false};
 
    if (pset.sversion < 100000)
    {
@@ -6005,11 +6005,13 @@ describeSubscriptions(const char *pattern, bool verbose)
 
    if (verbose)
    {
-       /* Binary mode is only supported in v14 and higher */
+       /* Binary mode and  are only supported in v14 and higher */
        if (pset.sversion >= 140000)
            appendPQExpBuffer(&buf,
-                             ", subbinary AS \"%s\"\n",
-                             gettext_noop("Binary"));
+                             ", subbinary AS \"%s\"\n"
+                             ", substream AS \"%s\"\n",
+                             gettext_noop("Binary"),
+                             gettext_noop(""));
 
        appendPQExpBuffer(&buf,
                          ",  subsynccommit AS \"%s\"\n"
index c807f83baddd211c6cda5309801cc0556b2bd543..0bbe0a122afd3c0cd79579023fe82e09e3642e90 100644 (file)
@@ -53,6 +53,6 @@
  */
 
 /*                         yyyymmddN */
-#define CATALOG_VERSION_NO 202009021
+#define CATALOG_VERSION_NO 202009031
 
 #endif
index 9795c35000d89d870d711338995c3402de232d67..9ebec7bf0bff03878e7064ba6f95a278cf3b7e91 100644 (file)
@@ -51,6 +51,8 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW
    bool        subbinary;      /* True if the subscription wants the
                                 * publisher to send data in binary */
 
+   bool        substream;      /* Stream in-progress transactions. */
+
 #ifdef CATALOG_VARLEN          /* variable-length fields start here */
    /* Connection string to the publisher */
    text        subconninfo BKI_FORCE_NOT_NULL;
@@ -78,6 +80,7 @@ typedef struct Subscription
    bool        enabled;        /* Indicates if the subscription is enabled */
    bool        binary;         /* Indicates if the subscription wants data in
                                 * binary format */
+   bool        stream;         /* Allow  in-progress transactions. */
    char       *conninfo;       /* Connection string to the publisher */
    char       *slotname;       /* Name of the replication slot */
    char       *synccommit;     /* Synchronous commit setting for worker */
index 807a9c1edf6e8caf0f3e9c3b08763e7059285127..0dfbac46b4b06b82ee43e5981794fb3ab553dcf1 100644 (file)
@@ -982,7 +982,11 @@ typedef enum
    WAIT_EVENT_WAL_READ,
    WAIT_EVENT_WAL_SYNC,
    WAIT_EVENT_WAL_SYNC_METHOD_ASSIGN,
-   WAIT_EVENT_WAL_WRITE
+   WAIT_EVENT_WAL_WRITE,
+   WAIT_EVENT_LOGICAL_CHANGES_READ,
+   WAIT_EVENT_LOGICAL_CHANGES_WRITE,
+   WAIT_EVENT_LOGICAL_SUBXACT_READ,
+   WAIT_EVENT_LOGICAL_SUBXACT_WRITE
 } WaitEventIO;
 
 /* ----------
index 60a76bc85cf9618b2abc3c5d2185ca72c1da52b9..53905ee6080f4bd88c537b47a3adc2f2cac1d444 100644 (file)
  * we can support. LOGICALREP_PROTO_MIN_VERSION_NUM is the oldest version we
  * have backwards compatibility for. The client requests protocol version at
  * connect time.
+ *
+ * LOGICALREP_PROTO_STREAM_VERSION_NUM is the minimum protocol version with
+ * support for  large transactions.
  */
 #define LOGICALREP_PROTO_MIN_VERSION_NUM 1
-#define LOGICALREP_PROTO_VERSION_NUM 1
+#define LOGICALREP_PROTO_STREAM_VERSION_NUM 2
+#define LOGICALREP_PROTO_VERSION_NUM 2
 
 /*
  * This struct stores a tuple received via logical replication.
@@ -98,25 +102,45 @@ extern void logicalrep_read_commit(StringInfo in,
 extern void logicalrep_write_origin(StringInfo out, const char *origin,
                                    XLogRecPtr origin_lsn);
 extern char *logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn);
-extern void logicalrep_write_insert(StringInfo out, Relation rel,
-                                   HeapTuple newtuple, bool binary);
+extern void logicalrep_write_insert(StringInfo out, TransactionId xid,
+                                   Relation rel, HeapTuple newtuple,
+                                   bool binary);
 extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup);
-extern void logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple,
+extern void logicalrep_write_update(StringInfo out, TransactionId xid,
+                                   Relation rel, HeapTuple oldtuple,
                                    HeapTuple newtuple, bool binary);
 extern LogicalRepRelId logicalrep_read_update(StringInfo in,
                                              bool *has_oldtuple, LogicalRepTupleData *oldtup,
                                              LogicalRepTupleData *newtup);
-extern void logicalrep_write_delete(StringInfo out, Relation rel,
-                                   HeapTuple oldtuple, bool binary);
+extern void logicalrep_write_delete(StringInfo out, TransactionId xid,
+                                   Relation rel, HeapTuple oldtuple,
+                                   bool binary);
 extern LogicalRepRelId logicalrep_read_delete(StringInfo in,
                                              LogicalRepTupleData *oldtup);
-extern void logicalrep_write_truncate(StringInfo out, int nrelids, Oid relids[],
+extern void logicalrep_write_truncate(StringInfo out, TransactionId xid,
+                                     int nrelids, Oid relids[],
                                      bool cascade, bool restart_seqs);
 extern List *logicalrep_read_truncate(StringInfo in,
                                      bool *cascade, bool *restart_seqs);
-extern void logicalrep_write_rel(StringInfo out, Relation rel);
+extern void logicalrep_write_rel(StringInfo out, TransactionId xid,
+                                Relation rel);
 extern LogicalRepRelation *logicalrep_read_rel(StringInfo in);
-extern void logicalrep_write_typ(StringInfo out, Oid typoid);
+extern void logicalrep_write_typ(StringInfo out, TransactionId xid,
+                                Oid typoid);
 extern void logicalrep_read_typ(StringInfo out, LogicalRepTyp *ltyp);
+extern void logicalrep_write_stream_start(StringInfo out, TransactionId xid,
+                                         bool first_segment);
+extern TransactionId logicalrep_read_stream_start(StringInfo in,
+                                                 bool *first_segment);
+extern void logicalrep_write_stream_stop(StringInfo out);
+extern TransactionId logicalrep_read_stream_stop(StringInfo in);
+extern void logicalrep_write_stream_commit(StringInfo out, ReorderBufferTXN *txn,
+                                          XLogRecPtr commit_lsn);
+extern TransactionId logicalrep_read_stream_commit(StringInfo out,
+                                                  LogicalRepCommitData *commit_data);
+extern void logicalrep_write_stream_abort(StringInfo out, TransactionId xid,
+                                         TransactionId subxid);
+extern void logicalrep_read_stream_abort(StringInfo in, TransactionId *xid,
+                                        TransactionId *subxid);
 
 #endif                         /* LOGICAL_PROTO_H */
index c2d5dbee5491622137b1dcbd4150096be644a893..1b05b39df4bd1a69a491d1ccf6653ec85a224a22 100644 (file)
@@ -178,6 +178,7 @@ typedef struct
            uint32      proto_version;  /* Logical protocol version */
            List       *publication_names;  /* String list of publications */
            bool        binary; /* Ask publisher to use binary */
+           bool        ;  /*  of large transactions */
        }           logical;
    }           proto;
 } WalRcvStreamOptions;
index d71db0d520748375037e522cc8ae009989766a27..2fa9bce66a422273316e22812708ee31c0d08a8e 100644 (file)
@@ -76,10 +76,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar';
 ERROR:  invalid connection string syntax: missing "=" after "foobar" in connection info string
 
 \dRs+
-                                                      List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off                | dbname=regress_doesnotexist
+                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | off                | dbname=regress_doesnotexist
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
@@ -91,10 +91,10 @@ ERROR:  subscription "regress_doesnotexist" does not exist
 ALTER SUBSCRIPTION regress_testsub SET (create_slot = false);
 ERROR:  unrecognized subscription parameter: "create_slot"
 \dRs+
-                                                          List of subscriptions
-      Name       |           Owner           | Enabled |     Publication     | Binary | Synchronous commit |           Conninfo           
------------------+---------------------------+---------+---------------------+--------+--------------------+------------------------------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | off                | dbname=regress_doesnotexist2
+                                                                List of subscriptions
+      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Synchronous commit |           Conninfo           
+-----------------+---------------------------+---------+---------------------+--------+-----------+--------------------+------------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | off                | dbname=regress_doesnotexist2
 (1 row)
 
 BEGIN;
@@ -126,10 +126,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar);
 ERROR:  invalid value for parameter "synchronous_commit": "foobar"
 HINT:  Available values: local, remote_write, remote_apply, on, off.
 \dRs+
-                                                            List of subscriptions
-        Name         |           Owner           | Enabled |     Publication     | Binary | Synchronous commit |           Conninfo           
----------------------+---------------------------+---------+---------------------+--------+--------------------+------------------------------
- regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | local              | dbname=regress_doesnotexist2
+                                                                  List of subscriptions
+        Name         |           Owner           | Enabled |     Publication     | Binary | Streaming | Synchronous commit |           Conninfo           
+---------------------+---------------------------+---------+---------------------+--------+-----------+--------------------+------------------------------
+ regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | f         | local              | dbname=regress_doesnotexist2
 (1 row)
 
 -- rename back to keep the rest simple
@@ -162,19 +162,42 @@ ERROR:  binary requires a Boolean value
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, binary = true);
 WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
 \dRs+
-                                                      List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | t      | off                | dbname=regress_doesnotexist
+                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | t      | f         | off                | dbname=regress_doesnotexist
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (binary = false);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 \dRs+
-                                                      List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Synchronous commit |          Conninfo           
------------------+---------------------------+---------+-------------+--------+--------------------+-----------------------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off                | dbname=regress_doesnotexist
+                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary |  | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | off                | dbname=regress_doesnotexist
+(1 row)
+
+DROP SUBSCRIPTION regress_testsub;
+-- fail -  must be boolean
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false,  = foo);
+ERROR:   requires a Boolean value
+-- now it works
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false,  = true);
+WARNING:  tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables
+\dRs+
+                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary |  | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | t         | off                | dbname=regress_doesnotexist
+(1 row)
+
+ALTER SUBSCRIPTION regress_testsub SET ( = false);
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+\dRs+
+                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary |  | Synchronous commit |          Conninfo           
+-----------------+---------------------------+---------+-------------+--------+-----------+--------------------+-----------------------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | f         | off                | dbname=regress_doesnotexist
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub;
index eeb2ec06ebed24f4fedee1d1b547a9b842e52310..14fa0b247e1b24e0f9d55ec9474b82cdeba11e52 100644 (file)
@@ -132,6 +132,21 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 
 DROP SUBSCRIPTION regress_testsub;
 
+-- fail -  must be boolean
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false,  = foo);
+
+-- now it works
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false,  = true);
+
+\dRs+
+
+ALTER SUBSCRIPTION regress_testsub SET ( = false);
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+
+\dRs+
+
+DROP SUBSCRIPTION regress_testsub;
+
 RESET SESSION AUTHORIZATION;
 DROP ROLE regress_subscription_user;
 DROP ROLE regress_subscription_user2;
diff --git a/src/test/subscription/t/015_stream.pl b/src/test/subscription/t/015_stream.pl
new file mode 100644 (file)
index 0000000..fffe001
--- /dev/null
@@ -0,0 +1,98 @@
+# Test  of simple large transaction
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 4;
+
+# Create publisher node
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_ => 'logical');
+$node_publisher->append_conf('postgresql.conf', 'logical_decoding_work_mem = 64kB');
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_ => 'logical');
+$node_subscriber->start;
+
+# Create some preexisting content on publisher
+$node_publisher->safe_psql('postgres',
+   "CREATE TABLE test_tab (a int primary key, b varchar)");
+$node_publisher->safe_psql('postgres',
+   "INSERT INTO test_tab VALUES (1, 'foo'), (2, 'bar')");
+
+# Setup structure on subscriber
+$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)");
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE test_tab");
+
+my $appname = 'tap_sub';
+$node_subscriber->safe_psql('postgres',
+"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH ( = on)"
+);
+
+$node_publisher->wait_for_catchup($appname);
+
+# Also wait for initial table sync to finish
+my $synced_query =
+"SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+my $result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(2|2|2), 'check initial data was copied to subscriber');
+
+# Insert, update and delete enough rows to exceed the 64kB limit.
+$node_publisher->safe_psql('postgres', q{
+BEGIN;
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
+UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+DELETE FROM test_tab WHERE mod(a,3) = 0;
+COMMIT;
+});
+
+$node_publisher->wait_for_catchup($appname);
+
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(3334|3334|3334), 'check extra columns contain local defaults');
+
+# Test the  in binary mode
+$node_subscriber->safe_psql('postgres',
+"ALTER SUBSCRIPTION tap_sub SET (binary = on)"
+);
+
+# Insert, update and delete enough rows to exceed the 64kB limit.
+$node_publisher->safe_psql('postgres', q{
+BEGIN;
+INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(5001, 10000) s(i);
+UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+DELETE FROM test_tab WHERE mod(a,3) = 0;
+COMMIT;
+});
+
+$node_publisher->wait_for_catchup($appname);
+
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab");
+is($result, qq(6667|6667|6667), 'check extra columns contain local defaults');
+
+# Change the local values of the extra columns on the subscriber,
+# update publisher, and check that subscriber retains the expected
+# values. This is to ensure that non- transactions behave
+# properly after a  transaction.
+$node_subscriber->safe_psql('postgres', "UPDATE test_tab SET c = 'epoch'::timestamptz + 987654321 * interval '1s'");
+$node_publisher->safe_psql('postgres', "UPDATE test_tab SET b = md5(a::text)");
+
+$node_publisher->wait_for_catchup($appname);
+
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*), count(extract(epoch from c) = 987654321), count(d = 999) FROM test_tab");
+is($result, qq(6667|6667|6667), 'check extra columns contain locally changed data');
+
+$node_subscriber->stop;
+$node_publisher->stop;
index 3d990463ce9cfae97b17ddee0a366a559730f113..500623e230199d2ffb01301046b26e073fd2d9bc 100644 (file)
@@ -111,6 +111,7 @@ Append
 AppendPath
 AppendRelInfo
 AppendState
+ApplySubXactData
 Archive
 ArchiveEntryPtrType
 ArchiveFormat
@@ -2370,6 +2371,7 @@ StopList
 StopWorkersData
 StrategyNumber
 StreamCtl
+StreamXidHash
 StringInfo
 StringInfoData
 StripnullState
@@ -2380,6 +2382,7 @@ SubPlanState
 SubTransactionId
 SubXactCallback
 SubXactCallbackItem
+SubXactInfo
 SubXactEvent
 SubplanResultRelHashElem
 SubqueryScan