<para>
Next, the following message part appears for each column included in
- the publication (except generated columns):
+ the publication:
</para>
<variablelist>
</variablelist>
<para>
- Next, one of the following submessages appears for each column (except generated columns):
+ Next, one of the following submessages appears for each column:
<variablelist>
<varlistentry>
<para>
When a column list is specified, only the named columns are replicated.
- If no column list is specified, all columns of the table are replicated
+ The column list can contain generated columns as well. If no column list
+ is specified, all table columns (except generated columns) are replicated
through this publication, including any columns added later. It has no
effect on <literal>TRUNCATE</literal> commands. See
<xref linkend="logical-replication-col-lists"/> for details about column
* pub_collist_validate
* Process and validate the 'columns' list and ensure the columns are all
* valid to use for a publication. Checks for and raises an ERROR for
- * any; unknown columns, system columns, duplicate columns or generated
- * columns.
+ * any unknown columns, system columns, or duplicate columns.
*
* Looks up each column's attnum and returns a 0-based Bitmapset of the
* corresponding attnums.
{
Bitmapset *set = NULL;
ListCell *lc;
- TupleDesc tupdesc = RelationGetDescr(targetrel);
foreach(lc, columns)
{
errmsg("cannot use system column \"%s\" in publication column list",
colname));
- if (TupleDescAttr(tupdesc, attnum - 1)->attgenerated)
- ereport(ERROR,
- errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
- errmsg("cannot use generated column \"%s\" in publication column list",
- colname));
-
if (bms_is_member(attnum, set))
ereport(ERROR,
errcode(ERRCODE_DUPLICATE_OBJECT),
static void logicalrep_write_namespace(StringInfo out, Oid nspid);
static const char *logicalrep_read_namespace(StringInfo in);
-/*
- * Check if a column is covered by a column list.
- *
- * Need to be careful about NULL, which is treated as a column list covering
- * all columns.
- */
-static bool
-column_in_column_list(int attnum, Bitmapset *columns)
-{
- return (columns == NULL || bms_is_member(attnum, columns));
-}
-
-
/*
* Write BEGIN to the output stream.
*/
{
Form_pg_attribute att = TupleDescAttr(desc, i);
- if (att->attisdropped || att->attgenerated)
- continue;
-
- if (!column_in_column_list(att->attnum, columns))
+ if (!logicalrep_should_publish_column(att, columns))
continue;
nliveatts++;
Form_pg_type typclass;
Form_pg_attribute att = TupleDescAttr(desc, i);
- if (att->attisdropped || att->attgenerated)
- continue;
-
- if (!column_in_column_list(att->attnum, columns))
+ if (!logicalrep_should_publish_column(att, columns))
continue;
if (isnull[i])
{
Form_pg_attribute att = TupleDescAttr(desc, i);
- if (att->attisdropped || att->attgenerated)
- continue;
-
- if (!column_in_column_list(att->attnum, columns))
+ if (!logicalrep_should_publish_column(att, columns))
continue;
nliveatts++;
Form_pg_attribute att = TupleDescAttr(desc, i);
uint8 flags = 0;
- if (att->attisdropped || att->attgenerated)
- continue;
-
- if (!column_in_column_list(att->attnum, columns))
+ if (!logicalrep_should_publish_column(att, columns))
continue;
/* REPLICA IDENTITY FULL means all columns are sent as part of key. */
return err_unknown;
}
+
+/*
+ * Check if the column 'att' of a table should be published.
+ *
+ * 'columns' represents the column list specified for that table in the
+ * publication.
+ *
+ * Note that generated columns can be present only in 'columns' list.
+ */
+bool
+logicalrep_should_publish_column(Form_pg_attribute att, Bitmapset *columns)
+{
+ if (att->attisdropped)
+ return false;
+
+ /*
+ * Skip publishing generated columns if they are not included in the
+ * column list.
+ */
+ if (!columns && att->attgenerated)
+ return false;
+
+ /*
+ * Check if a column is covered by a column list.
+ */
+ if (columns && !bms_is_member(att->attnum, columns))
+ return false;
+
+ return true;
+}
/*
* Get information about remote relation in similar fashion the RELATION
- * message provides during replication. This function also returns the relation
- * qualifications to be used in the COPY command.
+ * message provides during replication.
+ *
+ * This function also returns (a) the relation qualifications to be used in
+ * the COPY command, and (b) whether the remote relation has published any
+ * generated column.
*/
static void
-fetch_remote_table_info(char *nspname, char *relname,
- LogicalRepRelation *lrel, List **qual)
+fetch_remote_table_info(char *nspname, char *relname, LogicalRepRelation *lrel,
+ List **qual, bool *gencol_published)
{
WalRcvExecResult *res;
StringInfoData cmd;
TupleTableSlot *slot;
Oid tableRow[] = {OIDOID, CHAROID, CHAROID};
- Oid attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID};
+ Oid attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID, BOOLOID};
Oid qualRow[] = {TEXTOID};
bool isnull;
int natt;
StringInfo pub_names = NULL;
Bitmapset *included_cols = NULL;
+ int server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
lrel->nspname = nspname;
lrel->relname = relname;
* We need to do this before fetching info about column names and types,
* so that we can skip columns that should not be replicated.
*/
- if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 150000)
+ if (server_version >= 150000)
{
WalRcvExecResult *pubres;
TupleTableSlot *tslot;
"SELECT a.attnum,"
" a.attname,"
" a.atttypid,"
- " a.attnum = ANY(i.indkey)"
+ " a.attnum = ANY(i.indkey)");
+
+ /* Generated columns can be replicated since version 18. */
+ if (server_version >= 180000)
+ appendStringInfo(&cmd, ", a.attgenerated != ''");
+
+ appendStringInfo(&cmd,
" FROM pg_catalog.pg_attribute a"
" LEFT JOIN pg_catalog.pg_index i"
" ON (i.indexrelid = pg_get_replica_identity_index(%u))"
" AND a.attrelid = %u"
" ORDER BY a.attnum",
lrel->remoteid,
- (walrcv_server_version(LogRepWorkerWalRcvConn) >= 120000 ?
+ (server_version >= 120000 && server_version < 180000 ?
"AND a.attgenerated = ''" : ""),
lrel->remoteid);
res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
- lengthof(attrRow), attrRow);
+ server_version >= 180000 ? lengthof(attrRow) : lengthof(attrRow) - 1, attrRow);
if (res->status != WALRCV_OK_TUPLES)
ereport(ERROR,
if (DatumGetBool(slot_getattr(slot, 4, &isnull)))
lrel->attkeys = bms_add_member(lrel->attkeys, natt);
+ /* Remember if the remote table has published any generated column. */
+ if (server_version >= 180000 && !(*gencol_published))
+ {
+ *gencol_published = DatumGetBool(slot_getattr(slot, 5, &isnull));
+ Assert(!isnull);
+ }
+
/* Should never happen. */
if (++natt >= MaxTupleAttributeNumber)
elog(ERROR, "too many columns in remote table \"%s.%s\"",
* 3) one of the subscribed publications is declared as TABLES IN SCHEMA
* that includes this relation
*/
- if (walrcv_server_version(LogRepWorkerWalRcvConn) >= 150000)
+ if (server_version >= 150000)
{
/* Reuse the already-built pub_names. */
Assert(pub_names != NULL);
List *attnamelist;
ParseState *pstate;
List *options = NIL;
+ bool gencol_published = false;
/* Get the publisher relation info. */
fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)),
- RelationGetRelationName(rel), &lrel, &qual);
+ RelationGetRelationName(rel), &lrel, &qual,
+ &gencol_published);
/* Put the relation into relmap. */
logicalrep_relmap_update(&lrel);
/* Start copy on the publisher. */
initStringInfo(&cmd);
- /* Regular table with no row filter */
- if (lrel.relkind == RELKIND_RELATION && qual == NIL)
+ /* Regular table with no row filter or generated columns */
+ if (lrel.relkind == RELKIND_RELATION && qual == NIL && !gencol_published)
{
appendStringInfo(&cmd, "COPY %s",
quote_qualified_identifier(lrel.nspname, lrel.relname));
{
/*
* For non-tables and tables with row filters, we need to do COPY
- * (SELECT ...), but we can't just do SELECT * because we need to not
- * copy generated columns. For tables with any row filters, build a
- * SELECT query with OR'ed row filters for COPY.
+ * (SELECT ...), but we can't just do SELECT * because we may need to
+ * copy only subset of columns including generated columns. For tables
+ * with any row filters, build a SELECT query with OR'ed row filters
+ * for COPY.
+ *
+ * We also need to use this same COPY (SELECT ...) syntax when
+ * generated columns are published, because copy of generated columns
+ * is not supported by the normal COPY.
*/
appendStringInfoString(&cmd, "COPY (SELECT ");
for (int i = 0; i < lrel.natts; i++)
{
Form_pg_attribute att = TupleDescAttr(desc, i);
- if (att->attisdropped || att->attgenerated)
+ if (!logicalrep_should_publish_column(att, columns))
continue;
if (att->atttypid < FirstGenbkiObjectId)
continue;
- /* Skip this attribute if it's not present in the column list */
- if (columns != NULL && !bms_is_member(att->attnum, columns))
- continue;
-
OutputPluginPrepareWrite(ctx, false);
logicalrep_write_typ(ctx->out, xid, att->atttypid);
OutputPluginWrite(ctx, false);
int i;
int nliveatts = 0;
TupleDesc desc = RelationGetDescr(relation);
+ bool att_gen_present = false;
pgoutput_ensure_entry_cxt(data, entry);
{
Form_pg_attribute att = TupleDescAttr(desc, i);
- if (att->attisdropped || att->attgenerated)
+ if (att->attisdropped)
continue;
+ if (att->attgenerated)
+ {
+ /*
+ * Generated cols are skipped unless they are
+ * present in a column list.
+ */
+ if (!bms_is_member(att->attnum, cols))
+ continue;
+
+ att_gen_present = true;
+ }
+
nliveatts++;
}
/*
- * If column list includes all the columns of the table,
- * set it to NULL.
+ * Generated attributes are published only when they are
+ * present in the column list. Otherwise, a NULL column
+ * list means publish all columns.
*/
- if (bms_num_members(cols) == nliveatts)
+ if (!att_gen_present && bms_num_members(cols) == nliveatts)
{
bms_free(cols);
cols = NULL;
LogicalRepStreamAbortData *abort_data,
bool read_abort_info);
extern const char *logicalrep_message_type(LogicalRepMsgType action);
+extern bool logicalrep_should_publish_column(Form_pg_attribute att,
+ Bitmapset *columns);
#endif /* LOGICAL_PROTO_H */
ERROR: cannot update table "testpub_tbl5"
DETAIL: Column list used by the publication does not cover the replica identity.
ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5;
--- error: generated column "d" can't be in list
-ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, d);
-ERROR: cannot use generated column "d" in publication column list
-- error: system attributes "ctid" not allowed in column list
ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, ctid);
ERROR: cannot use system column "ctid" in publication column list
ERROR: cannot update table "testpub_tbl5"
DETAIL: Column list used by the publication does not cover the replica identity.
ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5;
+-- ok: generated column "d" can be in the list too
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, d);
+ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5;
-- error: change the replica identity to "b", and column list to (a, c)
-- then update fails, because (a, c) does not cover replica identity
ALTER TABLE testpub_tbl5 REPLICA IDENTITY USING INDEX testpub_tbl5_b_key;
ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (b, c);
UPDATE testpub_tbl5 SET a = 1;
ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5;
--- error: generated column "d" can't be in list
-ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, d);
-- error: system attributes "ctid" not allowed in column list
ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, ctid);
ALTER PUBLICATION testpub_fortable SET TABLE testpub_tbl1 (id, ctid);
UPDATE testpub_tbl5 SET a = 1;
ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5;
+-- ok: generated column "d" can be in the list too
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, d);
+ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5;
+
-- error: change the replica identity to "b", and column list to (a, c)
-- then update fails, because (a, c) does not cover replica identity
ALTER TABLE testpub_tbl5 REPLICA IDENTITY USING INDEX testpub_tbl5_b_key;
is( $result, qq(t
t), 'check the number of columns in the old tuple');
-# TEST: Generated and dropped columns are not considered for the column list.
-# So, the publication having a column list except for those columns and a
-# publication without any column (aka all columns as part of the columns
+# TEST: Dropped columns are not considered for the column list, and generated
+# columns are not replicated if they are not explicitly included in the column
+# list. So, the publication having a column list except for those columns and a
+# publication without any column list (aka all columns as part of the columns
# list) are considered to have the same column list.
$node_publisher->safe_psql(
'postgres', qq(
qr/cannot use different column lists for table "public.test_mix_1" in different publications/,
'different column lists detected');
+# TEST: Generated columns are considered for the column list.
+$node_publisher->safe_psql(
+ 'postgres', qq(
+ CREATE TABLE test_gen (a int PRIMARY KEY, b int GENERATED ALWAYS AS (a + 1) STORED);
+ INSERT INTO test_gen VALUES (0);
+ CREATE PUBLICATION pub_gen FOR TABLE test_gen (a, b);
+));
+
+$node_subscriber->safe_psql(
+ 'postgres', qq(
+ CREATE TABLE test_gen (a int PRIMARY KEY, b int);
+ CREATE SUBSCRIPTION sub_gen CONNECTION '$publisher_connstr' PUBLICATION pub_gen;
+));
+
+$node_subscriber->wait_for_subscription_sync;
+
+is( $node_subscriber->safe_psql(
+ 'postgres', "SELECT * FROM test_gen ORDER BY a"),
+ qq(0|1),
+ 'initial replication with generated columns in column list');
+
+$node_publisher->safe_psql(
+ 'postgres', qq(
+ INSERT INTO test_gen VALUES (1);
+));
+
+$node_publisher->wait_for_catchup('sub_gen');
+
+is( $node_subscriber->safe_psql(
+ 'postgres', "SELECT * FROM test_gen ORDER BY a"),
+ qq(0|1
+1|2),
+ 'replication with generated columns in column list');
+
# TEST: If the column list is changed after creating the subscription, we
# should catch the error reported by walsender.