WalRcvExecResult *res;
StringInfoData cmd;
TupleTableSlot *slot;
- Oid tableRow[2] = {OIDOID, CHAROID};
- Oid attrRow[4] = {TEXTOID, OIDOID, INT4OID, BOOLOID};
+ Oid tableRow[] = {OIDOID, CHAROID, CHAROID};
+ Oid attrRow[] = {TEXTOID, OIDOID, INT4OID, BOOLOID};
bool isnull;
int natt;
/* First fetch Oid and replica identity. */
initStringInfo(&cmd);
- appendStringInfo(&cmd, "SELECT c.oid, c.relreplident"
+ appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind"
" FROM pg_catalog.pg_class c"
" INNER JOIN pg_catalog.pg_namespace n"
" ON (c.relnamespace = n.oid)"
" WHERE n.nspname = %s"
- " AND c.relname = %s"
- " AND c.relkind = 'r'",
+ " AND c.relname = %s",
quote_literal_cstr(nspname),
quote_literal_cstr(relname));
- res = walrcv_exec(wrconn, cmd.data, 2, tableRow);
+ res = walrcv_exec(wrconn, cmd.data, lengthof(tableRow), tableRow);
if (res->status != WALRCV_OK_TUPLES)
ereport(ERROR,
Assert(!isnull);
lrel->replident = DatumGetChar(slot_getattr(slot, 2, &isnull));
Assert(!isnull);
+ lrel->relkind = DatumGetChar(slot_getattr(slot, 3, &isnull));
+ Assert(!isnull);
ExecDropSingleTupleTableSlot(slot);
walrcv_clear_result(res);
lrel->remoteid,
(walrcv_server_version(wrconn) >= 120000 ? "AND a.attgenerated = ''" : ""),
lrel->remoteid);
- res = walrcv_exec(wrconn, cmd.data, 4, attrRow);
+ res = walrcv_exec(wrconn, cmd.data, lengthof(attrRow), attrRow);
if (res->status != WALRCV_OK_TUPLES)
ereport(ERROR,
/* Start copy on the publisher. */
initStringInfo(&cmd);
- appendStringInfo(&cmd, "COPY %s TO STDOUT",
- quote_qualified_identifier(lrel.nspname, lrel.relname));
+ if (lrel.relkind == RELKIND_RELATION)
+ appendStringInfo(&cmd, "COPY %s TO STDOUT",
+ quote_qualified_identifier(lrel.nspname, lrel.relname));
+ else
+ {
+ /*
+ * For non-tables, we need to do COPY (SELECT ...), but we can't just
+ * do SELECT * because we need to not copy generated columns.
+ */
+ appendStringInfo(&cmd, "COPY (SELECT ");
+ for (int i = 0; i < lrel.natts; i++)
+ {
+ appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i]));
+ if (i < lrel.natts - 1)
+ appendStringInfoString(&cmd, ", ");
+ }
+ appendStringInfo(&cmd, " FROM %s) TO STDOUT",
+ quote_qualified_identifier(lrel.nspname, lrel.relname));
+ }
res = walrcv_exec(wrconn, cmd.data, 0, NULL);
pfree(cmd.data);
if (res->status != WALRCV_OK_COPY_OUT)