Improve ALTER PUBLICATION validation and error messages
authorDavid Rowley <[email protected]>
Thu, 15 Aug 2024 01:10:25 +0000 (13:10 +1200)
committerDavid Rowley <[email protected]>
Thu, 15 Aug 2024 01:10:25 +0000 (13:10 +1200)
Attempting to add a system column for a table to an existing publication
would result in the not very intuitive error message of:

ERROR:  negative bitmapset member not allowed

Here we improve that to have it display the same error message as a user
would see if they tried adding a system column for a table when adding
it to the publication in the first place.

Doing this requires making the function which validates the list of
columns an extern function.  The signature of the static function wasn't
an ideal external API as it made the code more complex than it needed to be.
Here we adjust the function to have it populate a Bitmapset of attribute
numbers.  Doing it this way allows code simplification.

There was no particular bug here other than the weird error message, so
no back.

Bug: #18558
Reported-by: Alexander Lakhin <[email protected]>
Author: Peter Smith, David Rowley
Discussion: https://postgr.es/m/18558-411bc81b03592125@postgresql.org

src/backend/catalog/pg_publication.c
src/backend/commands/publicationcmds.c
src/backend/commands/subscriptioncmds.c
src/include/catalog/pg_publication.h
src/test/regress/expected/publication.out
src/test/regress/sql/publication.sql

index 0602398a54546e63eef2a18b97fb7eb92f4456ad..7fe5fe2b86cfdc02640af2c1229529e490d7abe3 100644 (file)
@@ -48,9 +48,6 @@ typedef struct
                                                                 * table. */
 } published_rel;
 
-static void publication_translate_columns(Relation targetrel, List *columns,
-                                                                                 int *natts, AttrNumber **attrs);
-
 /*
  * Check if relation can be in given publication and throws appropriate
  * error if not.
@@ -351,6 +348,33 @@ GetTopMostAncestorInPublication(Oid puboid, List *ancestors, int *ancestor_level
        return topmost_relid;
 }
 
+/*
+ * attnumstoint2vector
+ *             Convert a Bitmapset of AttrNumbers into an int2vector.
+ *
+ * AttrNumber numbers are 0-based, i.e., not offset by
+ * FirstLowInvalidHeapAttributeNumber.
+ */
+static int2vector *
+attnumstoint2vector(Bitmapset *attrs)
+{
+       int2vector *result;
+       int                     n = bms_num_members(attrs);
+       int                     i = -1;
+       int                     j = 0;
+
+       result = buildint2vector(NULL, n);
+
+       while ((i = bms_next_member(attrs, i)) >= 0)
+       {
+               Assert(i <= PG_INT16_MAX);
+
+               result->values[j++] = (int16) i;
+       }
+
+       return result;
+}
+
 /*
  * Insert new publication / relation mapping.
  */
@@ -365,12 +389,12 @@ publication_add_relation(Oid pubid, PublicationRelInfo *pri,
        Relation        targetrel = pri->relation;
        Oid                     relid = RelationGetRelid(targetrel);
        Oid                     pubreloid;
+       Bitmapset  *attnums;
        Publication *pub = GetPublication(pubid);
-       AttrNumber *attarray = NULL;
-       int                     natts = 0;
        ObjectAddress myself,
                                referenced;
        List       *relids = NIL;
+       int                     i;
 
        rel = table_open(PublicationRelRelationId, RowExclusiveLock);
 
@@ -395,13 +419,8 @@ publication_add_relation(Oid pubid, PublicationRelInfo *pri,
 
        check_publication_add_relation(targetrel);
 
-       /*
-        * Translate column names to attnums and make sure the column list
-        * contains only allowed elements (no system or generated columns etc.).
-        * Also build an array of attnums, for storing in the catalog.
-        */
-       publication_translate_columns(pri->relation, pri->columns,
-                                                                 &natts, &attarray);
+       /* Validate and translate column names into a Bitmapset of attnums. */
+       attnums = pub_collist_validate(pri->relation, pri->columns);
 
        /* Form a tuple. */
        memset(values, 0, sizeof(values));
@@ -423,7 +442,7 @@ publication_add_relation(Oid pubid, PublicationRelInfo *pri,
 
        /* Add column list, if available */
        if (pri->columns)
-               values[Anum_pg_publication_rel_prattrs - 1] = PointerGetDatum(buildint2vector(attarray, natts));
+               values[Anum_pg_publication_rel_prattrs - 1] = PointerGetDatum(attnumstoint2vector(attnums));
        else
                nulls[Anum_pg_publication_rel_prattrs - 1] = true;
 
@@ -451,9 +470,10 @@ publication_add_relation(Oid pubid, PublicationRelInfo *pri,
                                                                                false);
 
        /* Add dependency on the columns, if any are listed */
-       for (int i = 0; i < natts; i++)
+       i = -1;
+       while ((i = bms_next_member(attnums, i)) >= 0)
        {
-               ObjectAddressSubSet(referenced, RelationRelationId, relid, attarray[i]);
+               ObjectAddressSubSet(referenced, RelationRelationId, relid, i);
                recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
        }
 
@@ -476,47 +496,23 @@ publication_add_relation(Oid pubid, PublicationRelInfo *pri,
        return myself;
 }
 
-/* qsort comparator for attnums */
-static int
-compare_int16(const void *a, const void *b)
-{
-       int                     av = *(const int16 *) a;
-       int                     bv = *(const int16 *) b;
-
-       /* this can't overflow if int is wider than int16 */
-       return (av - bv);
-}
-
 /*
- * Translate a list of column names to an array of attribute numbers
- * and a Bitmapset with them; verify that each attribute is appropriate
- * to have in a publication column list (no system or generated attributes,
- * no duplicates).  Additional checks with replica identity are done later;
- * see pub_collist_contains_invalid_column.
+ * pub_collist_validate
+ *             Process and validate the 'columns' list and ensure the columns are all
+ *             valid to use for a publication.  Checks for and raises an ERROR for
+ *             any; unknown columns, system columns, duplicate columns or generated
+ *             columns.
  *
- * Note that the attribute numbers are *not* offset by
- * FirstLowInvalidHeapAttributeNumber; system columns are forbidden so this
- * is okay.
+ * Looks up each column's attnum and returns a 0-based Bitmapset of the
+ * corresponding attnums.
  */
-static void
-publication_translate_columns(Relation targetrel, List *columns,
-                                                         int *natts, AttrNumber **attrs)
+Bitmapset *
+pub_collist_validate(Relation targetrel, List *columns)
 {
-       AttrNumber *attarray = NULL;
        Bitmapset  *set = NULL;
        ListCell   *lc;
-       int                     n = 0;
        TupleDesc       tupdesc = RelationGetDescr(targetrel);
 
-       /* Bail out when no column list defined. */
-       if (!columns)
-               return;
-
-       /*
-        * Translate list of columns to attnums. We prohibit system attributes and
-        * make sure there are no duplicate columns.
-        */
-       attarray = palloc(sizeof(AttrNumber) * list_length(columns));
        foreach(lc, columns)
        {
                char       *colname = strVal(lfirst(lc));
@@ -547,16 +543,9 @@ publication_translate_columns(Relation targetrel, List *columns,
                                                   colname));
 
                set = bms_add_member(set, attnum);
-               attarray[n++] = attnum;
        }
 
-       /* Be tidy, so that the catalog representation is always sorted */
-       qsort(attarray, n, sizeof(AttrNumber), compare_int16);
-
-       *natts = n;
-       *attrs = attarray;
-
-       bms_free(set);
+       return set;
 }
 
 /*
@@ -569,19 +558,12 @@ publication_translate_columns(Relation targetrel, List *columns,
 Bitmapset *
 pub_collist_to_bitmapset(Bitmapset *columns, Datum pubcols, MemoryContext mcxt)
 {
-       Bitmapset  *result = NULL;
+       Bitmapset  *result = columns;
        ArrayType  *arr;
        int                     nelems;
        int16      *elems;
        MemoryContext oldcxt = NULL;
 
-       /*
-        * If an existing bitmap was provided, use it. Otherwise just use NULL and
-        * build a new bitmap.
-        */
-       if (columns)
-               result = columns;
-
        arr = DatumGetArrayTypeP(pubcols);
        nelems = ARR_DIMS(arr)[0];
        elems = (int16 *) ARR_DATA_PTR(arr);
index 6ea709988ee7d7d5b8543906e978f99c51f4e970..d6ffef374ea35399571d5a0b12d5f678026c6b88 100644 (file)
@@ -1176,21 +1176,13 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
                                newrelid = RelationGetRelid(newpubrel->relation);
 
                                /*
-                                * If the new publication has column list, transform it to a
-                                * bitmap too.
+                                * Validate the column list.  If the column list or WHERE
+                                * clause changes, then the validation done here will be
+                                * duplicated inside PublicationAddTables().  The validation
+                                * is cheap enough that that seems harmless.
                                 */
-                               if (newpubrel->columns)
-                               {
-                                       ListCell   *lc;
-
-                                       foreach(lc, newpubrel->columns)
-                                       {
-                                               char       *colname = strVal(lfirst(lc));
-                                               AttrNumber      attnum = get_attnum(newrelid, colname);
-
-                                               newcolumns = bms_add_member(newcolumns, attnum);
-                                       }
-                               }
+                               newcolumns = pub_collist_validate(newpubrel->relation,
+                                                                                                 newpubrel->columns);
 
                                /*
                                 * Check if any of the new set of relations matches with the
@@ -1199,7 +1191,7 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup,
                                 * expressions also match. Same for the column list. Drop the
                                 * rest.
                                 */
-                               if (RelationGetRelid(newpubrel->relation) == oldrelid)
+                               if (newrelid == oldrelid)
                                {
                                        if (equal(oldrelwhereclause, newpubrel->whereClause) &&
                                                bms_equal(oldcolumns, newcolumns))
index d124bfe55caaaacef01f53bc5d93715cbf6ff6da..b925c464ae6249746e980229569b0c91525c39f3 100644 (file)
@@ -2266,7 +2266,7 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications)
                 *
                 * Note that attrs are always stored in sorted order so we don't need
                 * to worry if different publications have specified them in a
-                * different order. See publication_translate_columns.
+                * different order. See pub_collist_validate.
                 */
                appendStringInfo(&cmd, "SELECT DISTINCT n.nspname, c.relname, gpt.attrs\n"
                                                 "       FROM pg_class c\n"
index 2f1b6abbfa7c68ad194ce93880e3a0f34f30382c..d9518a58b008c285ca4c39dc7770b87c0c669d53 100644 (file)
@@ -152,6 +152,7 @@ extern bool is_publishable_relation(Relation rel);
 extern bool is_schema_publication(Oid pubid);
 extern ObjectAddress publication_add_relation(Oid pubid, PublicationRelInfo *pri,
                                                                                          bool if_not_exists);
+extern Bitmapset *pub_collist_validate(Relation targetrel, List *columns);
 extern ObjectAddress publication_add_schema(Oid pubid, Oid schemaid,
                                                                                        bool if_not_exists);
 
index 30b637113404f9bfb64d896a689b7ba15c54f03d..660245ed0c4a138f6aa600a22d31ffdc29915dbe 100644 (file)
@@ -693,6 +693,13 @@ ERROR:  cannot use generated column "d" in publication column list
 -- error: system attributes "ctid" not allowed in column list
 ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, ctid);
 ERROR:  cannot use system column "ctid" in publication column list
+ALTER PUBLICATION testpub_fortable SET TABLE testpub_tbl1 (id, ctid);
+ERROR:  cannot use system column "ctid" in publication column list
+-- error: duplicates not allowed in column list
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, a);
+ERROR:  duplicate column "a" in publication column list
+ALTER PUBLICATION testpub_fortable SET TABLE testpub_tbl5 (a, a);
+ERROR:  duplicate column "a" in publication column list
 -- ok
 ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, c);
 ALTER TABLE testpub_tbl5 DROP COLUMN c;                -- no dice
index 479d4f3264472dde452c3c5cdb85c32d1a5d743c..f68a5b59862a4b43f1f01fb4cdfd1ed8eb63227c 100644 (file)
@@ -417,6 +417,10 @@ ALTER PUBLICATION testpub_fortable DROP TABLE testpub_tbl5;
 ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, d);
 -- error: system attributes "ctid" not allowed in column list
 ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, ctid);
+ALTER PUBLICATION testpub_fortable SET TABLE testpub_tbl1 (id, ctid);
+-- error: duplicates not allowed in column list
+ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, a);
+ALTER PUBLICATION testpub_fortable SET TABLE testpub_tbl5 (a, a);
 -- ok
 ALTER PUBLICATION testpub_fortable ADD TABLE testpub_tbl5 (a, c);
 ALTER TABLE testpub_tbl5 DROP COLUMN c;                -- no dice