Prepare to support non-tables in publications
authorPeter Eisentraut <[email protected]>
Thu, 19 Mar 2020 07:17:50 +0000 (08:17 +0100)
committerPeter Eisentraut <[email protected]>
Thu, 19 Mar 2020 07:25:07 +0000 (08:25 +0100)
This by itself doesn't change any functionality but prepares the way
for having relations other than base tables in publications.

Make arrangements for COPY handling the initial table sync.  For
non-tables we have to use COPY (SELECT ...) instead of directly
copying from the table, but then we have to take care to omit
generated columns from the column list.

Also, remove a hardcoded reference to relkind = 'r' and rely on the
publisher to send only what it can actually publish, which will be
correct even in future cross-version scenarios.

Reviewed-by: Amit Langote <[email protected]>
Discussion: https://www.postgresql.org/message-id/flat/CA+HiwqH=Y85vRK3mOdjEkqFK+E=ST=eQiHdpj43L=_eJMOOznQ@mail.gmail.com

src/backend/replication/logical/tablesync.c
src/include/replication/logicalproto.h

index 98825f01e98663d377662bbde7fa8db868ae0376..a60c6661538a23e1b0a2add219daeec6d01510d5 100644 (file)
@@ -639,8 +639,8 @@ fetch_remote_table_info(char *nspname, char *relname,
    WalRcvExecResult *res;
    StringInfoData cmd;
    TupleTableSlot *slot;
-   Oid         tableRow[2] = {OIDOID, CHAROID};
-   Oid         attrRow[4] = {TEXTOID, OIDOID, INT4OID, BOOLOID};
+   Oid         tableRow[] = {OIDOID, CHAROID, CHAROID};
+   Oid         attrRow[] = {TEXTOID, OIDOID, INT4OID, BOOLOID};
    bool        isnull;
    int         natt;
 
@@ -649,16 +649,15 @@ fetch_remote_table_info(char *nspname, char *relname,
 
    /* First fetch Oid and replica identity. */
    initStringInfo(&cmd);
-   appendStringInfo(&cmd, "SELECT c.oid, c.relreplident"
+   appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind"
                     "  FROM pg_catalog.pg_class c"
                     "  INNER JOIN pg_catalog.pg_namespace n"
                     "        ON (c.relnamespace = n.oid)"
                     " WHERE n.nspname = %s"
-                    "   AND c.relname = %s"
-                    "   AND c.relkind = 'r'",
+                    "   AND c.relname = %s",
                     quote_literal_cstr(nspname),
                     quote_literal_cstr(relname));
-   res = walrcv_exec(wrconn, cmd.data, 2, tableRow);
+   res = walrcv_exec(wrconn, cmd.data, lengthof(tableRow), tableRow);
 
    if (res->status != WALRCV_OK_TUPLES)
        ereport(ERROR,
@@ -675,6 +674,8 @@ fetch_remote_table_info(char *nspname, char *relname,
    Assert(!isnull);
    lrel->replident = DatumGetChar(slot_getattr(slot, 2, &isnull));
    Assert(!isnull);
+   lrel->relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
+   Assert(!isnull);
 
    ExecDropSingleTupleTableSlot(slot);
    walrcv_clear_result(res);
@@ -696,7 +697,7 @@ fetch_remote_table_info(char *nspname, char *relname,
                     lrel->remoteid,
                     (walrcv_server_version(wrconn) >= 120000 ? "AND a.attgenerated = ''" : ""),
                     lrel->remoteid);
-   res = walrcv_exec(wrconn, cmd.data, 4, attrRow);
+   res = walrcv_exec(wrconn, cmd.data, lengthof(attrRow), attrRow);
 
    if (res->status != WALRCV_OK_TUPLES)
        ereport(ERROR,
@@ -765,8 +766,25 @@ copy_table(Relation rel)
 
    /* Start copy on the publisher. */
    initStringInfo(&cmd);
-   appendStringInfo(&cmd, "COPY %s TO STDOUT",
-                    quote_qualified_identifier(lrel.nspname, lrel.relname));
+   if (lrel.relkind == RELKIND_RELATION)
+       appendStringInfo(&cmd, "COPY %s TO STDOUT",
+                        quote_qualified_identifier(lrel.nspname, lrel.relname));
+   else
+   {
+       /*
+        * For non-tables, we need to do COPY (SELECT ...), but we can't just
+        * do SELECT * because we need to not copy generated columns.
+        */
+       appendStringInfo(&cmd, "COPY (SELECT ");
+       for (int i = 0; i < lrel.natts; i++)
+       {
+           appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
+           if (i < lrel.natts - 1)
+               appendStringInfoString(&cmd, ", ");
+       }
+       appendStringInfo(&cmd, " FROM %s) TO STDOUT",
+                        quote_qualified_identifier(lrel.nspname, lrel.relname));
+   }
    res = walrcv_exec(wrconn, cmd.data, 0, NULL);
    pfree(cmd.data);
    if (res->status != WALRCV_OK_COPY_OUT)
index 2cc2dc4db3ccd89bb4d3ea6849df4e9a9b5c2504..4860561be9f5de1f8cc4b05c8eda4c063c2009b0 100644 (file)
@@ -49,6 +49,7 @@ typedef struct LogicalRepRelation
    char      **attnames;       /* column names */
    Oid        *atttyps;        /* column types */
    char        replident;      /* replica identity */
+   char        relkind;        /* remote relation kind */
    Bitmapset  *attkeys;        /* Bitmap of key columns */
 } LogicalRepRelation;