Previously subscribed tables are not copied, even if a table's row
filter <literal>WHERE</literal> clause has since been modified.
</para>
+ <para>
+ See <xref linkend="sql-createsubscription-notes"/> for details of
+ how <literal>copy_data = true</literal> can interact with the
+ <literal>origin</literal> parameter.
+ </para>
</listitem>
</varlistentry>
</variablelist></para>
will affect what data is copied. Refer to the
<xref linkend="sql-createsubscription-notes" /> for details.
</para>
+ <para>
+ See <xref linkend="sql-createsubscription-notes"/> for details of how
+ <literal>copy_data = true</literal> can interact with the
+ <literal>origin</literal> parameter.
+ </para>
</listitem>
</varlistentry>
to <literal>any</literal> means that the publisher sends changes
regardless of their origin. The default is <literal>any</literal>.
</para>
+ <para>
+ See <xref linkend="sql-createsubscription-notes"/> for details of how
+ <literal>copy_data = true</literal> can interact with the
+ <literal>origin</literal> parameter.
+ </para>
</listitem>
</varlistentry>
</variablelist></para>
can have non-existent publications.
</para>
+ <para>
+ When using a subscription parameter combination of
+ <literal>copy_data = true</literal> and <literal>origin = NONE</literal>,
+ the initial sync table data is copied directly from the publisher, so
+ the true origin of that data cannot be known. If the publisher also has
+ subscriptions, then the copied table data might have originated from
+ further upstream. This scenario is detected and a WARNING is issued to
+ the user, but the warning is only an indication of a potential problem;
+ it is the user's responsibility to make the necessary checks to ensure
+ that the copied data really has the wanted origins.
+ </para>
+
+ <para>
+ To find which tables might potentially include non-local origins (due to
+ other subscriptions created on the publisher) try this SQL query:
+<programlisting>
+# substitute &lt;pub-names&gt; below with your publication name(s) to be queried
+SELECT DISTINCT N.nspname AS schemaname, C.relname AS tablename
+FROM pg_publication P,
+ LATERAL pg_get_publication_tables(P.pubname) GPT
+ JOIN pg_subscription_rel PS ON (GPT.relid = PS.srrelid),
+ pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)
+WHERE C.oid = GPT.relid AND P.pubname IN (&lt;pub-names&gt;);
+</programlisting></para>
+
</refsect1>
<refsect1>
} SubOpts;
static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
+static void check_publications_origin(WalReceiverConn *wrconn,
+ List *publications, bool copydata,
+ char *origin, Oid *subrel_local_oids,
+ int subrel_count, char *subname);
static void check_duplicates_in_publist(List *publist, Datum *datums);
static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
PG_TRY();
{
check_publications(wrconn, publications);
+ check_publications_origin(wrconn, publications, opts.copy_data,
+ opts.origin, NULL, 0, stmt->subname);
/*
* Set sync state based on if we were asked to do data copy or
ListCell *lc;
int off;
int remove_rel_len;
+ int subrel_count;
Relation rel = NULL;
typedef struct SubRemoveRels
{
/* Get local table list. */
subrel_states = GetSubscriptionRelations(sub->oid, false);
+ subrel_count = list_length(subrel_states);
/*
* Build qsorted array of local table oids for faster lookup. This can
* potentially contain all tables in the database so speed of lookup
* is important.
*/
- subrel_local_oids = palloc(list_length(subrel_states) * sizeof(Oid));
+ subrel_local_oids = palloc(subrel_count * sizeof(Oid));
off = 0;
foreach(lc, subrel_states)
{
subrel_local_oids[off++] = relstate->relid;
}
- qsort(subrel_local_oids, list_length(subrel_states),
+ qsort(subrel_local_oids, subrel_count,
sizeof(Oid), oid_cmp);
+ check_publications_origin(wrconn, sub->publications, copy_data,
+ sub->origin, subrel_local_oids,
+ subrel_count, sub->name);
+
/*
* Rels that we want to remove from subscription and drop any slots
* and origins corresponding to them.
*/
- sub_remove_rels = palloc(list_length(subrel_states) * sizeof(SubRemoveRels));
+ sub_remove_rels = palloc(subrel_count * sizeof(SubRemoveRels));
/*
* Walk over the remote tables and try to match them to locally known
pubrel_local_oids[off++] = relid;
if (!bsearch(&relid, subrel_local_oids,
- list_length(subrel_states), sizeof(Oid), oid_cmp))
+ subrel_count, sizeof(Oid), oid_cmp))
{
AddSubscriptionRelState(sub->oid, relid,
copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
sizeof(Oid), oid_cmp);
remove_rel_len = 0;
- for (off = 0; off < list_length(subrel_states); off++)
+ for (off = 0; off < subrel_count; off++)
{
Oid relid = subrel_local_oids[off];
table_close(rel, RowExclusiveLock);
}
+/*
+ * check_publications_origin
+ *
+ * Check and log a warning if the publisher has subscribed to the same table
+ * from some other publisher. This check is required only if "copy_data = true"
+ * and "origin = none" for CREATE SUBSCRIPTION and
+ * ALTER SUBSCRIPTION ... REFRESH statements, to notify the user that data
+ * which did not originate on the publisher might have been copied.
+ *
+ * This check need not be performed on the tables that are already added
+ * because incremental sync for those tables will happen through WAL and the
+ * origin of the data can be identified from the WAL records.
+ *
+ * Parameters:
+ *	wrconn				connection to the publisher on which the query is run
+ *	publications		publication names to be checked
+ *	copydata			the check is skipped unless this is true
+ *	origin				the check is skipped unless this is "none"
+ *						(compared case-insensitively); may be NULL
+ *	subrel_local_oids	relation oids that are already present on the
+ *						subscriber; these are excluded from the check.
+ *						Not dereferenced when subrel_count is 0.
+ *	subrel_count		number of entries in subrel_local_oids
+ *	subname				subscription name, used only in the warning message
+ *
+ * Raises an ERROR if the query on the publisher does not return tuples.
+ */
+static void
+check_publications_origin(WalReceiverConn *wrconn, List *publications,
+						  bool copydata, char *origin, Oid *subrel_local_oids,
+						  int subrel_count, char *subname)
+{
+	WalRcvExecResult *res;
+	StringInfoData cmd;
+	TupleTableSlot *slot;
+	Oid			tableRow[1] = {TEXTOID};
+	List	   *publist = NIL;
+	int			i;
+
+	if (!copydata || !origin ||
+		(pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) != 0))
+		return;
+
+	initStringInfo(&cmd);
+	appendStringInfoString(&cmd,
+						   "SELECT DISTINCT P.pubname AS pubname\n"
+						   "FROM pg_publication P,\n"
+						   " LATERAL pg_get_publication_tables(P.pubname) GPT\n"
+						   " JOIN pg_subscription_rel PS ON (GPT.relid = PS.srrelid),\n"
+						   " pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n"
+						   "WHERE C.oid = GPT.relid AND P.pubname IN (");
+	get_publications_str(publications, &cmd, true);
+	appendStringInfoString(&cmd, ")\n");
+
+	/*
+	 * In case of ALTER SUBSCRIPTION ... REFRESH, subrel_local_oids contains
+	 * the list of relation oids that are already present on the subscriber.
+	 * This check should be skipped for these tables.
+	 */
+	for (i = 0; i < subrel_count; i++)
+	{
+		Oid			relid = subrel_local_oids[i];
+		char	   *schemaname = get_namespace_name(get_rel_namespace(relid));
+		char	   *tablename = get_rel_name(relid);
+		char	   *schemalit;
+		char	   *tablelit;
+
+		/*
+		 * The lookups return NULL if the relation has vanished meanwhile;
+		 * there is then nothing meaningful to exclude.
+		 */
+		if (schemaname == NULL || tablename == NULL)
+			continue;
+
+		/*
+		 * The names are embedded in SQL text sent to the publisher, so they
+		 * must be quoted as literals: identifiers may legally contain single
+		 * quotes or backslashes.
+		 */
+		schemalit = quote_literal_cstr(schemaname);
+		tablelit = quote_literal_cstr(tablename);
+
+		appendStringInfo(&cmd, "AND NOT (N.nspname = %s AND C.relname = %s)\n",
+						 schemalit, tablelit);
+
+		/* This loop can run once per subscribed table; don't accumulate. */
+		pfree(schemalit);
+		pfree(tablelit);
+		pfree(schemaname);
+		pfree(tablename);
+	}
+
+	res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
+	pfree(cmd.data);
+
+	if (res->status != WALRCV_OK_TUPLES)
+		ereport(ERROR,
+				(errcode(ERRCODE_CONNECTION_FAILURE),
+				 errmsg("could not receive list of replicated tables from the publisher: %s",
+						res->err)));
+
+	/* Collect the names of the publications returned by the publisher. */
+	slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+	while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+	{
+		char	   *pubname;
+		bool		isnull;
+
+		pubname = TextDatumGetCString(slot_getattr(slot, 1, &isnull));
+		Assert(!isnull);
+
+		ExecClearTuple(slot);
+		publist = list_append_unique(publist, makeString(pubname));
+	}
+
+	/*
+	 * Log a warning if the publisher has subscribed to the same table from
+	 * some other publisher. We cannot know the origin of data during the
+	 * initial sync. Data origins can be found only from the WAL by looking at
+	 * the origin id.
+	 *
+	 * XXX: For simplicity, we don't check whether the table has any data or
+	 * not. If the table doesn't have any data then we don't need to
+	 * distinguish between data having origin and data not having origin so we
+	 * can avoid logging a warning in that case.
+	 */
+	if (publist)
+	{
+		StringInfo	pubnames = makeStringInfo();
+
+		/* Prepare the list of publication(s) for warning message. */
+		get_publications_str(publist, pubnames, false);
+		ereport(WARNING,
+				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				errmsg("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin",
+					   subname),
+				errdetail_plural("Subscribed publication %s is subscribing to other publications.",
+								 "Subscribed publications %s are subscribing to other publications.",
+								 list_length(publist), pubnames->data),
+				errhint("Verify that initial data copied from the publisher tables did not come from other origins."));
+	}
+
+	ExecDropSingleTupleTableSlot(slot);
+
+	walrcv_clear_result(res);
+}
+
+
/*
* Get the list of tables which belong to specified publications on the
* publisher connection.
# Copyright (c) 2021-2022, PostgreSQL Global Development Group
-# Test the CREATE SUBSCRIPTION 'origin' parameter.
+# Test the CREATE SUBSCRIPTION 'origin' parameter and its interaction with
+# 'copy_data' parameter.
use strict;
use warnings;
use PostgreSQL::Test::Cluster;
use PostgreSQL::Test::Utils;
use Test::More;
+my $subname_AB = 'tap_sub_A_B';
+my $subname_AB2 = 'tap_sub_A_B_2';
+my $subname_BA = 'tap_sub_B_A';
+my $subname_BC = 'tap_sub_B_C';
+
+my $result;
+my $stdout;
+my $stderr;
+
###############################################################################
# Setup a bidirectional logical replication between node_A & node_B
###############################################################################
# node_A (pub) -> node_B (sub)
my $node_A_connstr = $node_A->connstr . ' dbname=postgres';
$node_A->safe_psql('postgres', "CREATE PUBLICATION tap_pub_A FOR TABLE tab");
-my $appname_B1 = 'tap_sub_B1';
$node_B->safe_psql(
'postgres', "
- CREATE SUBSCRIPTION tap_sub_B1
- CONNECTION '$node_A_connstr application_name=$appname_B1'
+ CREATE SUBSCRIPTION $subname_BA
+ CONNECTION '$node_A_connstr application_name=$subname_BA'
PUBLICATION tap_pub_A
WITH (origin = none)");
# node_B (pub) -> node_A (sub)
my $node_B_connstr = $node_B->connstr . ' dbname=postgres';
$node_B->safe_psql('postgres', "CREATE PUBLICATION tap_pub_B FOR TABLE tab");
-my $appname_A = 'tap_sub_A';
$node_A->safe_psql(
'postgres', "
- CREATE SUBSCRIPTION tap_sub_A
- CONNECTION '$node_B_connstr application_name=$appname_A'
+ CREATE SUBSCRIPTION $subname_AB
+ CONNECTION '$node_B_connstr application_name=$subname_AB'
PUBLICATION tap_pub_B
WITH (origin = none, copy_data = off)");
# Wait for initial table sync to finish
-$node_A->wait_for_subscription_sync($node_B, $appname_A);
-$node_B->wait_for_subscription_sync($node_A, $appname_B1);
+$node_A->wait_for_subscription_sync($node_B, $subname_AB);
+$node_B->wait_for_subscription_sync($node_A, $subname_BA);
is(1, 1, 'Bidirectional replication setup is complete');
-my $result;
-
###############################################################################
# Check that bidirectional logical replication setup does not cause infinite
# recursive insertion.
$node_A->safe_psql('postgres', "INSERT INTO tab VALUES (11);");
$node_B->safe_psql('postgres', "INSERT INTO tab VALUES (21);");
-$node_A->wait_for_catchup($appname_B1);
-$node_B->wait_for_catchup($appname_A);
+$node_A->wait_for_catchup($subname_BA);
+$node_B->wait_for_catchup($subname_AB);
# check that transaction was committed on subscriber(s)
$result = $node_A->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;");
$node_A->safe_psql('postgres', "DELETE FROM tab;");
-$node_A->wait_for_catchup($appname_B1);
-$node_B->wait_for_catchup($appname_A);
+$node_A->wait_for_catchup($subname_BA);
+$node_B->wait_for_catchup($subname_AB);
###############################################################################
# Check that remote data of node_B (that originated from node_C) is not
# node_C (pub) -> node_B (sub)
my $node_C_connstr = $node_C->connstr . ' dbname=postgres';
$node_C->safe_psql('postgres', "CREATE PUBLICATION tap_pub_C FOR TABLE tab");
-
-my $appname_B2 = 'tap_sub_B2';
$node_B->safe_psql(
'postgres', "
- CREATE SUBSCRIPTION tap_sub_B2
- CONNECTION '$node_C_connstr application_name=$appname_B2'
+ CREATE SUBSCRIPTION $subname_BC
+ CONNECTION '$node_C_connstr application_name=$subname_BC'
PUBLICATION tap_pub_C
WITH (origin = none)");
-
-$node_B->wait_for_subscription_sync($node_C, $appname_B2);
+$node_B->wait_for_subscription_sync($node_C, $subname_BC);
# insert a record
$node_C->safe_psql('postgres', "INSERT INTO tab VALUES (32);");
-$node_C->wait_for_catchup($appname_B2);
-$node_B->wait_for_catchup($appname_A);
-$node_A->wait_for_catchup($appname_B1);
+$node_C->wait_for_catchup($subname_BC);
+$node_B->wait_for_catchup($subname_AB);
+$node_A->wait_for_catchup($subname_BA);
$result = $node_B->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;");
is($result, qq(32), 'The node_C data replicated to node_B');
'Remote data originating from another node (not the publisher) is not replicated when origin parameter is none'
);
+###############################################################################
+# With origin = NONE the subscriber asks the publisher (node_B) to send only
+# changes that were generated locally on node_B. However, node_B is itself
+# subscribing to data from node_A, so the tables copied from node_B during the
+# initial sync can contain data that actually originated on node_A. A warning
+# is logged in this case to draw attention to the possible remote data.
+###############################################################################
+($result, $stdout, $stderr) = $node_A->psql(
+ 'postgres', "
+ CREATE SUBSCRIPTION $subname_AB2
+ CONNECTION '$node_B_connstr application_name=$subname_AB2'
+ PUBLICATION tap_pub_B
+ WITH (origin = none, copy_data = on)");
+# The warning is expected on psql's stderr. The subscription name is matched
+# in lower case, presumably because the unquoted identifier in the statement
+# above is folded to lower case by the server.
+like(
+ $stderr,
+ qr/WARNING: ( [A-Z0-9]+:)? subscription "tap_sub_a_b_2" requested copy_data with origin = NONE but might copy data that had a different origin/,
+ "Create subscription with origin = none and copy_data when the publisher has subscribed same table"
+);
+
+$node_A->wait_for_subscription_sync($node_B, $subname_AB2);
+
+# ALTER SUBSCRIPTION ... REFRESH PUBLICATION should be successful when no new
+# table is added (safe_psql is used, so any failure aborts the test)
+$node_A->safe_psql(
+ 'postgres', "
+ ALTER SUBSCRIPTION $subname_AB2 REFRESH PUBLICATION");
+
+# Check ALTER SUBSCRIPTION ... REFRESH PUBLICATION when there is a new
+# table that is subscribing data from a different publication.
+# First create the new table on both nodes.
+$node_A->safe_psql('postgres', "CREATE TABLE tab_new (a int PRIMARY KEY)");
+$node_B->safe_psql('postgres', "CREATE TABLE tab_new (a int PRIMARY KEY)");
+
+# add the new table to node_A's publication and refresh node_B's subscription
+# to it, so that node_B now subscribes to tab_new from node_A
+$node_A->safe_psql('postgres',
+ "ALTER PUBLICATION tap_pub_A ADD TABLE tab_new");
+$node_B->safe_psql(
+ 'postgres', "
+ ALTER SUBSCRIPTION $subname_BA REFRESH PUBLICATION");
+
+$node_B->wait_for_subscription_sync($node_A, $subname_BA);
+
+# add the new table to node_B's publication as well
+$node_B->safe_psql('postgres',
+ "ALTER PUBLICATION tap_pub_B ADD TABLE tab_new");
+
+# ALTER SUBSCRIPTION ... REFRESH PUBLICATION should log a warning when a new
+# table in the publisher is subscribing data from a different publication
+($result, $stdout, $stderr) = $node_A->psql(
+ 'postgres', "
+ ALTER SUBSCRIPTION $subname_AB2 REFRESH PUBLICATION");
+like(
+ $stderr,
+ qr/WARNING: ( [A-Z0-9]+:)? subscription "tap_sub_a_b_2" requested copy_data with origin = NONE but might copy data that had a different origin/,
+ "Refresh publication when the publisher has subscribed for the new table, but the subscriber-side wants origin = none"
+);
+
+$node_A->wait_for_subscription_sync($node_B, $subname_AB2);
+
+# clean up the objects created by this part of the test
+$node_A->safe_psql('postgres', "DROP TABLE tab_new");
+$node_B->safe_psql('postgres', "DROP TABLE tab_new");
+$node_A->safe_psql('postgres', "DROP SUBSCRIPTION $subname_AB2");
+
# shutdown
$node_B->stop('fast');
$node_A->stop('fast');