Raise a warning if there is a possibility of data from multiple origins.
authorAmit Kapila <[email protected]>
Thu, 8 Sep 2022 01:24:13 +0000 (06:54 +0530)
committerAmit Kapila <[email protected]>
Thu, 8 Sep 2022 01:24:13 +0000 (06:54 +0530)
This commit raises a warning message for a combination of options
('copy_data = true' and 'origin = none') during CREATE/ALTER subscription
operations if the publication tables were also replicated from other
publishers.

During replication, we can skip the data from other origins as we have that
information in WAL but that is not possible during initial sync so we raise
a warning if there is such a possibility.

Author: Vignesh C
Reviewed-By: Peter Smith, Amit Kapila, Jonathan Katz, Shi yu, Wang wei
Discussion: https://www.postgresql.org/message-id/CALDaNm0gwjY_4HFxvvty01BOT01q_fJLKQ3pWP9=9orqubhjcQ@mail.gmail.com

doc/src/sgml/ref/alter_subscription.sgml
doc/src/sgml/ref/create_subscription.sgml
src/backend/commands/subscriptioncmds.c
src/test/subscription/t/030_origin.pl

index 64efc21f53769dd496155d5931ba3d7a88d65aa8..1e8d72062b3575d09171bb39f479bc9438ea96a3 100644 (file)
@@ -172,6 +172,11 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
           Previously subscribed tables are not copied, even if a table's row
           filter <literal>WHERE</literal> clause has since been modified.
          </para>
+         <para>
+          See <xref linkend="sql-createsubscription-notes"/> for details of
+          how <literal>copy_data = true</literal> can interact with the
+          <literal>origin</literal> parameter.
+         </para>
         </listitem>
        </varlistentry>
       </variablelist></para>
index 7390c715bc30b72cd53cd49008303504e7d2d835..4e001f811195b0dd73e337d72b7e7d84163be2dd 100644 (file)
@@ -213,6 +213,11 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
           will affect what data is copied. Refer to the
           <xref linkend="sql-createsubscription-notes" /> for details.
          </para>
+         <para>
+          See <xref linkend="sql-createsubscription-notes"/> for details of how
+          <literal>copy_data = true</literal> can interact with the
+          <literal>origin</literal> parameter.
+         </para>
         </listitem>
        </varlistentry>
 
@@ -315,6 +320,11 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
           to <literal>any</literal> means that the publisher sends changes
           regardless of their origin. The default is <literal>any</literal>.
          </para>
+         <para>
+          See <xref linkend="sql-createsubscription-notes"/> for details of how
+          <literal>copy_data = true</literal> can interact with the
+          <literal>origin</literal> parameter.
+         </para>
         </listitem>
        </varlistentry>
       </variablelist></para>
@@ -386,6 +396,31 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
    can have non-existent publications.
   </para>
 
+  <para>
+   When using a subscription parameter combination of
+   <literal>copy_data = true</literal> and <literal>origin = NONE</literal>,
+   the initial sync table data is copied directly from the publisher, meaning
+   that knowledge of the true origin of that data is not possible. If the
+   publisher also has subscriptions then the copied table data might have
+   originated from further upstream. This scenario is detected and a WARNING is
+   logged to the user, but the warning is only an indication of a potential
+   problem; it is the user's responsibility to make the necessary checks to
+   ensure the copied data origins are really as wanted or not.
+  </para>
+
+  <para>
+   To find which tables might potentially include non-local origins (due to
+   other subscriptions created on the publisher) try this SQL query:
+<programlisting>
+# substitute &lt;pub-names&gt; below with your publication name(s) to be queried
+SELECT DISTINCT N.nspname AS schemaname, C.relname AS tablename
+FROM pg_publication P,
+     LATERAL pg_get_publication_tables(P.pubname) GPT
+     JOIN pg_subscription_rel PS ON (GPT.relid = PS.srrelid),
+     pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)
+WHERE C.oid = GPT.relid AND P.pubname IN (&lt;pub-names&gt;);
+</programlisting></para>
+
  </refsect1>
 
  <refsect1>
index f87796e5afefde307ab8af3805c6a94f278843ff..66d800f0cff11aa27ca75363d80d8442ec49e0f0 100644 (file)
@@ -92,6 +92,10 @@ typedef struct SubOpts
 } SubOpts;
 
 static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
+static void check_publications_origin(WalReceiverConn *wrconn,
+                                     List *publications, bool copydata,
+                                     char *origin, Oid *subrel_local_oids,
+                                     int subrel_count, char *subname);
 static void check_duplicates_in_publist(List *publist, Datum *datums);
 static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
 static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
@@ -680,6 +684,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
        PG_TRY();
        {
            check_publications(wrconn, publications);
+           check_publications_origin(wrconn, publications, opts.copy_data,
+                                     opts.origin, NULL, 0, stmt->subname);
 
            /*
             * Set sync state based on if we were asked to do data copy or
@@ -786,6 +792,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
    ListCell   *lc;
    int         off;
    int         remove_rel_len;
+   int         subrel_count;
    Relation    rel = NULL;
    typedef struct SubRemoveRels
    {
@@ -815,13 +822,14 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 
        /* Get local table list. */
        subrel_states = GetSubscriptionRelations(sub->oid, false);
+       subrel_count = list_length(subrel_states);
 
        /*
         * Build qsorted array of local table oids for faster lookup. This can
         * potentially contain all tables in the database so speed of lookup
         * is important.
         */
-       subrel_local_oids = palloc(list_length(subrel_states) * sizeof(Oid));
+       subrel_local_oids = palloc(subrel_count * sizeof(Oid));
        off = 0;
        foreach(lc, subrel_states)
        {
@@ -829,14 +837,18 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 
            subrel_local_oids[off++] = relstate->relid;
        }
-       qsort(subrel_local_oids, list_length(subrel_states),
+       qsort(subrel_local_oids, subrel_count,
              sizeof(Oid), oid_cmp);
 
+       check_publications_origin(wrconn, sub->publications, copy_data,
+                                 sub->origin, subrel_local_oids,
+                                 subrel_count, sub->name);
+
        /*
         * Rels that we want to remove from subscription and drop any slots
         * and origins corresponding to them.
         */
-       sub_remove_rels = palloc(list_length(subrel_states) * sizeof(SubRemoveRels));
+       sub_remove_rels = palloc(subrel_count * sizeof(SubRemoveRels));
 
        /*
         * Walk over the remote tables and try to match them to locally known
@@ -862,7 +874,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
            pubrel_local_oids[off++] = relid;
 
            if (!bsearch(&relid, subrel_local_oids,
-                        list_length(subrel_states), sizeof(Oid), oid_cmp))
+                        subrel_count, sizeof(Oid), oid_cmp))
            {
                AddSubscriptionRelState(sub->oid, relid,
                                        copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
@@ -881,7 +893,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
              sizeof(Oid), oid_cmp);
 
        remove_rel_len = 0;
-       for (off = 0; off < list_length(subrel_states); off++)
+       for (off = 0; off < subrel_count; off++)
        {
            Oid         relid = subrel_local_oids[off];
 
@@ -1784,6 +1796,117 @@ AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId)
    table_close(rel, RowExclusiveLock);
 }
 
+/*
+ * Check and log a warning if the publisher has subscribed to the same table
+ * from some other publisher. This check is required only if "copy_data = true"
+ * and "origin = none" for CREATE SUBSCRIPTION and
+ * ALTER SUBSCRIPTION ... REFRESH statements to notify the user that data
+ * having origin might have been copied.
+ *
+ * This check need not be performed on the tables that are already added
+ * because incremental sync for those tables will happen through WAL and the
+ * origin of the data can be identified from the WAL records.
+ *
+ * subrel_local_oids contains the list of relation oids that are already
+ * present on the subscriber.
+ */
+static void
+check_publications_origin(WalReceiverConn *wrconn, List *publications,
+                         bool copydata, char *origin, Oid *subrel_local_oids,
+                         int subrel_count, char *subname)
+{
+   WalRcvExecResult *res;
+   StringInfoData cmd;
+   TupleTableSlot *slot;
+   Oid         tableRow[1] = {TEXTOID};
+   List       *publist = NIL;
+   int         i;
+
+   if (!copydata || !origin ||
+       (pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) != 0))
+       return;
+
+   initStringInfo(&cmd);
+   appendStringInfoString(&cmd,
+                          "SELECT DISTINCT P.pubname AS pubname\n"
+                          "FROM pg_publication P,\n"
+                          "     LATERAL pg_get_publication_tables(P.pubname) GPT\n"
+                          "     JOIN pg_subscription_rel PS ON (GPT.relid = PS.srrelid),\n"
+                          "     pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
+                          "WHERE C.oid = GPT.relid AND P.pubname IN (");
+   get_publications_str(publications, &cmd, true);
+   appendStringInfoString(&cmd, ")\n");
+
+   /*
+    * In case of ALTER SUBSCRIPTION ... REFRESH, subrel_local_oids contains
+    * the list of relation oids that are already present on the subscriber.
+    * This check should be skipped for these tables.
+    */
+   for (i = 0; i < subrel_count; i++)
+   {
+       Oid         relid = subrel_local_oids[i];
+       char       *schemaname = get_namespace_name(get_rel_namespace(relid));
+       char       *tablename = get_rel_name(relid);
+
+       appendStringInfo(&cmd, "AND NOT (N.nspname = '%s' AND C.relname = '%s')\n",
+                        schemaname, tablename);
+   }
+
+   res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
+   pfree(cmd.data);
+
+   if (res->status != WALRCV_OK_TUPLES)
+       ereport(ERROR,
+               (errcode(ERRCODE_CONNECTION_FAILURE),
+                errmsg("could not receive list of replicated tables from the publisher: %s",
+                       res->err)));
+
+   /* Process tables. */
+   slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+   while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+   {
+       char       *pubname;
+       bool        isnull;
+
+       pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
+       Assert(!isnull);
+
+       ExecClearTuple(slot);
+       publist = list_append_unique(publist, makeString(pubname));
+   }
+
+   /*
+    * Log a warning if the publisher has subscribed to the same table from
+    * some other publisher. We cannot know the origin of data during the
+    * initial sync. Data origins can be found only from the WAL by looking at
+    * the origin id.
+    *
+    * XXX: For simplicity, we don't check whether the table has any data or
+    * not. If the table doesn't have any data then we don't need to
+    * distinguish between data having origin and data not having origin so we
+    * can avoid logging a warning in that case.
+    */
+   if (publist)
+   {
+       StringInfo  pubnames = makeStringInfo();
+
+       /* Prepare the list of publication(s) for warning message. */
+       get_publications_str(publist, pubnames, false);
+       ereport(WARNING,
+               errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+               errmsg("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin",
+                      subname),
+               errdetail_plural("Subscribed publication %s is subscribing to other publications.",
+                                "Subscribed publications %s are subscribing to other publications.",
+                                list_length(publist), pubnames->data),
+               errhint("Verify that initial data copied from the publisher tables did not come from other origins."));
+   }
+
+   ExecDropSingleTupleTableSlot(slot);
+
+   walrcv_clear_result(res);
+}
+
 /*
  * Get the list of tables which belong to specified publications on the
  * publisher connection.
index b297a51f7c7288496fdd2806a41b700da3164914..0a5cc4503bd6a47917b2705b2bf19124d90d76bb 100644 (file)
@@ -1,13 +1,23 @@
 
 # Copyright (c) 2021-2022, PostgreSQL Global Development Group
 
-# Test the CREATE SUBSCRIPTION 'origin' parameter.
+# Test the CREATE SUBSCRIPTION 'origin' parameter and its interaction with
+# 'copy_data' parameter.
 use strict;
 use warnings;
 use PostgreSQL::Test::Cluster;
 use PostgreSQL::Test::Utils;
 use Test::More;
 
+my $subname_AB  = 'tap_sub_A_B';
+my $subname_AB2 = 'tap_sub_A_B_2';
+my $subname_BA  = 'tap_sub_B_A';
+my $subname_BC  = 'tap_sub_B_C';
+
+my $result;
+my $stdout;
+my $stderr;
+
 ###############################################################################
 # Setup a bidirectional logical replication between node_A & node_B
 ###############################################################################
@@ -32,33 +42,29 @@ $node_B->safe_psql('postgres', "CREATE TABLE tab (a int PRIMARY KEY)");
 # node_A (pub) -> node_B (sub)
 my $node_A_connstr = $node_A->connstr . ' dbname=postgres';
 $node_A->safe_psql('postgres', "CREATE PUBLICATION tap_pub_A FOR TABLE tab");
-my $appname_B1 = 'tap_sub_B1';
 $node_B->safe_psql(
    'postgres', "
-   CREATE SUBSCRIPTION tap_sub_B1
-   CONNECTION '$node_A_connstr application_name=$appname_B1'
+   CREATE SUBSCRIPTION $subname_BA
+   CONNECTION '$node_A_connstr application_name=$subname_BA'
    PUBLICATION tap_pub_A
    WITH (origin = none)");
 
 # node_B (pub) -> node_A (sub)
 my $node_B_connstr = $node_B->connstr . ' dbname=postgres';
 $node_B->safe_psql('postgres', "CREATE PUBLICATION tap_pub_B FOR TABLE tab");
-my $appname_A = 'tap_sub_A';
 $node_A->safe_psql(
    'postgres', "
-   CREATE SUBSCRIPTION tap_sub_A
-   CONNECTION '$node_B_connstr application_name=$appname_A'
+   CREATE SUBSCRIPTION $subname_AB
+   CONNECTION '$node_B_connstr application_name=$subname_AB'
    PUBLICATION tap_pub_B
    WITH (origin = none, copy_data = off)");
 
 # Wait for initial table sync to finish
-$node_A->wait_for_subscription_sync($node_B, $appname_A);
-$node_B->wait_for_subscription_sync($node_A, $appname_B1);
+$node_A->wait_for_subscription_sync($node_B, $subname_AB);
+$node_B->wait_for_subscription_sync($node_A, $subname_BA);
 
 is(1, 1, 'Bidirectional replication setup is complete');
 
-my $result;
-
 ###############################################################################
 # Check that bidirectional logical replication setup does not cause infinite
 # recursive insertion.
@@ -68,8 +74,8 @@ my $result;
 $node_A->safe_psql('postgres', "INSERT INTO tab VALUES (11);");
 $node_B->safe_psql('postgres', "INSERT INTO tab VALUES (21);");
 
-$node_A->wait_for_catchup($appname_B1);
-$node_B->wait_for_catchup($appname_A);
+$node_A->wait_for_catchup($subname_BA);
+$node_B->wait_for_catchup($subname_AB);
 
 # check that transaction was committed on subscriber(s)
 $result = $node_A->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;");
@@ -85,8 +91,8 @@ is( $result, qq(11
 
 $node_A->safe_psql('postgres', "DELETE FROM tab;");
 
-$node_A->wait_for_catchup($appname_B1);
-$node_B->wait_for_catchup($appname_A);
+$node_A->wait_for_catchup($subname_BA);
+$node_B->wait_for_catchup($subname_AB);
 
 ###############################################################################
 # Check that remote data of node_B (that originated from node_C) is not
@@ -109,23 +115,20 @@ $node_C->safe_psql('postgres', "CREATE TABLE tab (a int PRIMARY KEY)");
 # node_C (pub) -> node_B (sub)
 my $node_C_connstr = $node_C->connstr . ' dbname=postgres';
 $node_C->safe_psql('postgres', "CREATE PUBLICATION tap_pub_C FOR TABLE tab");
-
-my $appname_B2 = 'tap_sub_B2';
 $node_B->safe_psql(
    'postgres', "
-   CREATE SUBSCRIPTION tap_sub_B2
-   CONNECTION '$node_C_connstr application_name=$appname_B2'
+   CREATE SUBSCRIPTION $subname_BC
+   CONNECTION '$node_C_connstr application_name=$subname_BC'
    PUBLICATION tap_pub_C
    WITH (origin = none)");
-
-$node_B->wait_for_subscription_sync($node_C, $appname_B2);
+$node_B->wait_for_subscription_sync($node_C, $subname_BC);
 
 # insert a record
 $node_C->safe_psql('postgres', "INSERT INTO tab VALUES (32);");
 
-$node_C->wait_for_catchup($appname_B2);
-$node_B->wait_for_catchup($appname_A);
-$node_A->wait_for_catchup($appname_B1);
+$node_C->wait_for_catchup($subname_BC);
+$node_B->wait_for_catchup($subname_AB);
+$node_A->wait_for_catchup($subname_BA);
 
 $result = $node_B->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;");
 is($result, qq(32), 'The node_C data replicated to node_B');
@@ -136,6 +139,69 @@ is($result, qq(),
    'Remote data originating from another node (not the publisher) is not replicated when origin parameter is none'
 );
 
+###############################################################################
+# Specifying origin = NONE indicates that the publisher should only replicate the
+# changes that are generated locally from node_B, but in this case since the
+# node_B is also subscribing data from node_A, node_B can have remotely
+# originated data from node_A. We log a warning, in this case, to draw
+# attention to there being possible remote data.
+###############################################################################
+($result, $stdout, $stderr) = $node_A->psql(
+   'postgres', "
+        CREATE SUBSCRIPTION $subname_AB2
+        CONNECTION '$node_B_connstr application_name=$subname_AB2'
+        PUBLICATION tap_pub_B
+        WITH (origin = none, copy_data = on)");
+like(
+   $stderr,
+   qr/WARNING: ( [A-Z0-9]+:)? subscription "tap_sub_a_b_2" requested copy_data with origin = NONE but might copy data that had a different origin/,
+   "Create subscription with origin = none and copy_data when the publisher has subscribed same table"
+);
+
+$node_A->wait_for_subscription_sync($node_B, $subname_AB2);
+
+# Alter subscription ... refresh publication should be successful when no new
+# table is added
+$node_A->safe_psql(
+   'postgres', "
+        ALTER SUBSCRIPTION $subname_AB2 REFRESH PUBLICATION");
+
+# Check Alter subscription ... refresh publication when there is a new
+# table that is subscribing data from a different publication
+$node_A->safe_psql('postgres', "CREATE TABLE tab_new (a int PRIMARY KEY)");
+$node_B->safe_psql('postgres', "CREATE TABLE tab_new (a int PRIMARY KEY)");
+
+# add a new table to the publication
+$node_A->safe_psql('postgres',
+   "ALTER PUBLICATION tap_pub_A ADD TABLE tab_new");
+$node_B->safe_psql(
+   'postgres', "
+        ALTER SUBSCRIPTION $subname_BA REFRESH PUBLICATION");
+
+$node_B->wait_for_subscription_sync($node_A, $subname_BA);
+
+# add a new table to the publication
+$node_B->safe_psql('postgres',
+   "ALTER PUBLICATION tap_pub_B ADD TABLE tab_new");
+
+# Alter subscription ... refresh publication should log a warning when a new
+# table in the publisher is subscribing data from a different publication
+($result, $stdout, $stderr) = $node_A->psql(
+   'postgres', "
+        ALTER SUBSCRIPTION $subname_AB2 REFRESH PUBLICATION");
+like(
+   $stderr,
+   qr/WARNING: ( [A-Z0-9]+:)? subscription "tap_sub_a_b_2" requested copy_data with origin = NONE but might copy data that had a different origin/,
+   "Refresh publication when the publisher has subscribed for the new table, but the subscriber-side wants origin = none"
+);
+
+$node_A->wait_for_subscription_sync($node_B, $subname_AB2);
+
+# clear the operations done by this test
+$node_A->safe_psql('postgres', "DROP TABLE tab_new");
+$node_B->safe_psql('postgres', "DROP TABLE tab_new");
+$node_A->safe_psql('postgres', "DROP SUBSCRIPTION $subname_AB2");
+
 # shutdown
 $node_B->stop('fast');
 $node_A->stop('fast');