certain additional requirements) can also be set to be the replica
identity. If the table does not have any suitable key, then it can be set
to replica identity <quote>full</quote>, which means the entire row becomes
- the key. This, however, is very inefficient and should only be used as a
+ the key. When replica identity <quote>full</quote> is specified,
+ indexes can be used on the subscriber side for searching the rows. Candidate
+ indexes must be btree, non-partial, and have at least one column reference
+ (i.e. cannot consist of only expressions). These restrictions
+ on the non-unique index properties adhere to some of the restrictions that
+ are enforced for primary keys. If there are no such suitable indexes,
+ the search on the subscriber side can be very inefficient, therefore
+ replica identity <quote>full</quote> should only be used as a
fallback if no other solution is possible. If a replica identity other
than <quote>full</quote> is set on the publisher side, a replica identity
comprising the same or fewer columns must also be set on the subscriber
#include "nodes/nodeFuncs.h"
#include "parser/parse_relation.h"
#include "parser/parsetree.h"
+#include "replication/logicalrelation.h"
#include "storage/bufmgr.h"
#include "storage/lmgr.h"
#include "utils/builtins.h"
#include "utils/typcache.h"
+static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
+ TypeCacheEntry **eq);
+
/*
* Setup a ScanKey for a search in the relation 'rel' for a tuple 'key' that
* is setup to match 'rel' (*NOT* idxrel!).
*
- * Returns whether any column contains NULLs.
+ * Returns how many columns to use for the index scan.
+ *
+ * This is not generic routine, it expects the idxrel to be a btree, non-partial
+ * and have at least one column reference (i.e. cannot consist of only
+ * expressions).
*
- * This is not generic routine, it expects the idxrel to be replication
- * identity of a rel and meet all limitations associated with that.
+ * By definition, replication identity of a rel meets all limitations associated
+ * with that. Note that any other index could also meet these limitations.
*/
-static bool
+static int
build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
TupleTableSlot *searchslot)
{
- int attoff;
+ int index_attoff;
+ int skey_attoff = 0;
bool isnull;
Datum indclassDatum;
oidvector *opclass;
int2vector *indkey = &idxrel->rd_index->indkey;
- bool hasnulls = false;
-
- Assert(RelationGetReplicaIndex(rel) == RelationGetRelid(idxrel) ||
- RelationGetPrimaryKeyIndex(rel) == RelationGetRelid(idxrel));
indclassDatum = SysCacheGetAttr(INDEXRELID, idxrel->rd_indextuple,
Anum_pg_index_indclass, &isnull);
Assert(!isnull);
opclass = (oidvector *) DatumGetPointer(indclassDatum);
- /* Build scankey for every attribute in the index. */
- for (attoff = 0; attoff < IndexRelationGetNumberOfKeyAttributes(idxrel); attoff++)
+ /* Build scankey for every non-expression attribute in the index. */
+ for (index_attoff = 0; index_attoff < IndexRelationGetNumberOfKeyAttributes(idxrel);
+ index_attoff++)
{
Oid operator;
+ Oid optype;
Oid opfamily;
RegProcedure regop;
- int pkattno = attoff + 1;
- int mainattno = indkey->values[attoff];
- Oid optype = get_opclass_input_type(opclass->values[attoff]);
+ int table_attno = indkey->values[index_attoff];
+
+ if (!AttributeNumberIsValid(table_attno))
+ {
+ /*
+ * XXX: Currently, we don't support expressions in the scan key,
+ * see code below.
+ */
+ continue;
+ }
/*
* Load the operator info. We need this to get the equality operator
* function for the scan key.
*/
- opfamily = get_opclass_family(opclass->values[attoff]);
+ optype = get_opclass_input_type(opclass->values[index_attoff]);
+ opfamily = get_opclass_family(opclass->values[index_attoff]);
operator = get_opfamily_member(opfamily, optype,
optype,
regop = get_opcode(operator);
/* Initialize the scankey. */
- ScanKeyInit(&skey[attoff],
- pkattno,
+ ScanKeyInit(&skey[skey_attoff],
+ index_attoff + 1,
BTEqualStrategyNumber,
regop,
- searchslot->tts_values[mainattno - 1]);
+ searchslot->tts_values[table_attno - 1]);
- skey[attoff].sk_collation = idxrel->rd_indcollation[attoff];
+ skey[skey_attoff].sk_collation = idxrel->rd_indcollation[index_attoff];
/* Check for null value. */
- if (searchslot->tts_isnull[mainattno - 1])
- {
- hasnulls = true;
- skey[attoff].sk_flags |= SK_ISNULL;
- }
+ if (searchslot->tts_isnull[table_attno - 1])
+ skey[skey_attoff].sk_flags |= (SK_ISNULL | SK_SEARCHNULL);
+
+ skey_attoff++;
}
- return hasnulls;
+ /* There must always be at least one attribute for the index scan. */
+ Assert(skey_attoff > 0);
+
+ return skey_attoff;
}
/*
TupleTableSlot *outslot)
{
ScanKeyData skey[INDEX_MAX_KEYS];
+ int skey_attoff;
IndexScanDesc scan;
SnapshotData snap;
TransactionId xwait;
Relation idxrel;
bool found;
+ TypeCacheEntry **eq = NULL;
+ bool isIdxSafeToSkipDuplicates;
/* Open the index. */
idxrel = index_open(idxoid, RowExclusiveLock);
- /* Start an index scan. */
+ isIdxSafeToSkipDuplicates = (GetRelationIdentityOrPK(rel) == idxoid);
+
InitDirtySnapshot(snap);
- scan = index_beginscan(rel, idxrel, &snap,
- IndexRelationGetNumberOfKeyAttributes(idxrel),
- 0);
/* Build scan key. */
- build_replindex_scan_key(skey, rel, idxrel, searchslot);
+ skey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot);
+
+ /* Start an index scan. */
+ scan = index_beginscan(rel, idxrel, &snap, skey_attoff, 0);
retry:
found = false;
- index_rescan(scan, skey, IndexRelationGetNumberOfKeyAttributes(idxrel), NULL, 0);
+ index_rescan(scan, skey, skey_attoff, NULL, 0);
/* Try to find the tuple */
- if (index_getnext_slot(scan, ForwardScanDirection, outslot))
+ while (index_getnext_slot(scan, ForwardScanDirection, outslot))
{
- found = true;
+ /*
+ * Avoid expensive equality check if the index is primary key or
+ * replica identity index.
+ */
+ if (!isIdxSafeToSkipDuplicates)
+ {
+ if (eq == NULL)
+ {
+#ifdef USE_ASSERT_CHECKING
+ /* apply assertions only once for the input idxoid */
+ IndexInfo *indexInfo = BuildIndexInfo(idxrel);
+
+ Assert(IsIndexUsableForReplicaIdentityFull(indexInfo));
+#endif
+
+ eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);
+ }
+
+ if (!tuples_equal(outslot, searchslot, eq))
+ continue;
+ }
+
ExecMaterializeSlot(outslot);
xwait = TransactionIdIsValid(snap.xmin) ?
XactLockTableWait(xwait, NULL, NULL, XLTW_None);
goto retry;
}
+
+ /* Found our tuple and it's not locked */
+ found = true;
+ break;
}
/* Found tuple, try to lock it in the lockmode. */
#include "postgres.h"
+#include "access/genam.h"
#include "access/table.h"
#include "catalog/namespace.h"
+#include "catalog/pg_am_d.h"
#include "catalog/pg_subscription_rel.h"
#include "executor/executor.h"
#include "nodes/makefuncs.h"
LogicalRepRelMapEntry relmapentry;
} LogicalRepPartMapEntry;
+static Oid FindLogicalRepLocalIndex(Relation localrel, LogicalRepRelation *remoterel,
+ AttrMap *attrMap);
+
/*
* Relcache invalidation callback for our relation map cache.
*/
*/
logicalrep_rel_mark_updatable(entry);
+ /*
+ * Finding a usable index is an infrequent task. It occurs when an
+ * operation is first performed on the relation, or after invalidation
+ * of the relation cache entry (such as ANALYZE or CREATE/DROP index
+ * on the relation).
+ */
+ entry->localindexoid = FindLogicalRepLocalIndex(entry->localrel, remoterel,
+ entry->attrmap);
+
entry->localrelvalid = true;
}
/* Set if the table's replica identity is enough to apply update/delete. */
logicalrep_rel_mark_updatable(entry);
- entry->localrelvalid = true;
-
/* state and statelsn are left set to 0. */
MemoryContextSwitchTo(oldctx);
+ /*
+ * Finding a usable index is an infrequent task. It occurs when an
+ * operation is first performed on the relation, or after invalidation of
+ * the relation cache entry (such as ANALYZE or CREATE/DROP index on the
+ * relation).
+ *
+ * We also prefer to run this code on the oldctx so that we do not
+ * anything in the LogicalRepPartMapContext (hence CacheMemoryContext).
+ */
+ entry->localindexoid = FindLogicalRepLocalIndex(partrel, remoterel,
+ entry->attrmap);
+
+ entry->localrelvalid = true;
+
return entry;
}
+
+/*
+ * Returns true if the given index consists only of expressions such as:
+ * CREATE INDEX idx ON table(foo(col));
+ *
+ * Returns false even if there is one column reference:
+ * CREATE INDEX idx ON table(foo(col), col_2);
+ */
+static bool
+IsIndexOnlyOnExpression(IndexInfo *indexInfo)
+{
+ for (int i = 0; i < indexInfo->ii_NumIndexKeyAttrs; i++)
+ {
+ AttrNumber attnum = indexInfo->ii_IndexAttrNumbers[i];
+
+ if (AttributeNumberIsValid(attnum))
+ return false;
+ }
+
+ return true;
+}
+
+/*
+ * Returns true if the attrmap contains the leftmost column of the index.
+ * Otherwise returns false.
+ *
+ * attrmap is a map of local attributes to remote ones. We can consult this
+ * map to check whether the local index attribute has a corresponding remote
+ * attribute.
+ */
+static bool
+RemoteRelContainsLeftMostColumnOnIdx(IndexInfo *indexInfo, AttrMap *attrmap)
+{
+ AttrNumber keycol;
+
+ Assert(indexInfo->ii_NumIndexAttrs >= 1);
+
+ keycol = indexInfo->ii_IndexAttrNumbers[0];
+ if (!AttributeNumberIsValid(keycol))
+ return false;
+
+ if (attrmap->maplen <= AttrNumberGetAttrOffset(keycol))
+ return false;
+
+ return attrmap->attnums[AttrNumberGetAttrOffset(keycol)] >= 0;
+}
+
+/*
+ * Returns the oid of an index that can be used by the apply worker to scan
+ * the relation. The index must be btree, non-partial, and have at least
+ * one column reference (i.e. cannot consist of only expressions). These
+ * limitations help to keep the index scan similar to PK/RI index scans.
+ *
+ * Note that the limitations of index scans for replica identity full only
+ * adheres to a subset of the limitations of PK/RI. For example, we support
+ * columns that are marked as [NULL] or we are not interested in the [NOT
+ * DEFERRABLE] aspect of constraints here. It works for us because we always
+ * compare the tuples for non-PK/RI index scans. See
+ * RelationFindReplTupleByIndex().
+ *
+ * XXX: There are no fundamental problems for supporting non-btree indexes.
+ * We mostly need to relax the limitations in RelationFindReplTupleByIndex().
+ * For partial indexes, the required changes are likely to be larger. If
+ * none of the tuples satisfy the expression for the index scan, we fall-back
+ * to sequential execution, which might not be a good idea in some cases.
+ *
+ * We also skip indexes if the remote relation does not contain the leftmost
+ * column of the index. This is because in most such cases sequential scan is
+ * favorable over index scan.
+ *
+ * We expect to call this function when REPLICA IDENTITY FULL is defined for
+ * the remote relation.
+ *
+ * If no suitable index is found, returns InvalidOid.
+ */
+static Oid
+FindUsableIndexForReplicaIdentityFull(Relation localrel, AttrMap *attrmap)
+{
+ List *idxlist = RelationGetIndexList(localrel);
+ ListCell *lc;
+
+ foreach(lc, idxlist)
+ {
+ Oid idxoid = lfirst_oid(lc);
+ bool isUsableIdx;
+ bool containsLeftMostCol;
+ Relation idxRel;
+ IndexInfo *idxInfo;
+
+ idxRel = index_open(idxoid, AccessShareLock);
+ idxInfo = BuildIndexInfo(idxRel);
+ isUsableIdx = IsIndexUsableForReplicaIdentityFull(idxInfo);
+ containsLeftMostCol =
+ RemoteRelContainsLeftMostColumnOnIdx(idxInfo, attrmap);
+ index_close(idxRel, AccessShareLock);
+
+ /* Return the first eligible index found */
+ if (isUsableIdx && containsLeftMostCol)
+ return idxoid;
+ }
+
+ return InvalidOid;
+}
+
+/*
+ * Returns true if the index is usable for replica identity full. For details,
+ * see FindUsableIndexForReplicaIdentityFull.
+ */
+bool
+IsIndexUsableForReplicaIdentityFull(IndexInfo *indexInfo)
+{
+ bool is_btree = (indexInfo->ii_Am == BTREE_AM_OID);
+ bool is_partial = (indexInfo->ii_Predicate != NIL);
+ bool is_only_on_expression = IsIndexOnlyOnExpression(indexInfo);
+
+ return is_btree && !is_partial && !is_only_on_expression;
+}
+
+/*
+ * Get replica identity index or if it is not defined a primary key.
+ *
+ * If neither is defined, returns InvalidOid
+ */
+Oid
+GetRelationIdentityOrPK(Relation rel)
+{
+ Oid idxoid;
+
+ idxoid = RelationGetReplicaIndex(rel);
+
+ if (!OidIsValid(idxoid))
+ idxoid = RelationGetPrimaryKeyIndex(rel);
+
+ return idxoid;
+}
+
+/*
+ * Returns the index oid if we can use an index for subscriber. Otherwise,
+ * returns InvalidOid.
+ */
+static Oid
+FindLogicalRepLocalIndex(Relation localrel, LogicalRepRelation *remoterel,
+ AttrMap *attrMap)
+{
+ Oid idxoid;
+
+ /*
+ * We never need index oid for partitioned tables, always rely on leaf
+ * partition's index.
+ */
+ if (localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+ return InvalidOid;
+
+ /*
+ * Simple case, we already have a primary key or a replica identity index.
+ */
+ idxoid = GetRelationIdentityOrPK(localrel);
+ if (OidIsValid(idxoid))
+ return idxoid;
+
+ if (remoterel->replident == REPLICA_IDENTITY_FULL)
+ {
+ /*
+ * We are looking for one more opportunity for using an index. If
+ * there are any indexes defined on the local relation, try to pick a
+ * suitable index.
+ *
+ * The index selection safely assumes that all the columns are going
+ * to be available for the index scan given that remote relation has
+ * replica identity full.
+ *
+ * Note that we are not using the planner to find the cheapest method
+ * to scan the relation as that would require us to either use lower
+ * level planner functions which would be a maintenance burden in the
+ * long run or use the full-fledged planner which could cause
+ * overhead.
+ */
+ return FindUsableIndexForReplicaIdentityFull(localrel, attrMap);
+ }
+
+ return InvalidOid;
+}
static void apply_handle_update_internal(ApplyExecutionData *edata,
ResultRelInfo *relinfo,
TupleTableSlot *remoteslot,
- LogicalRepTupleData *newtup);
+ LogicalRepTupleData *newtup,
+ Oid localindexoid);
static void apply_handle_delete_internal(ApplyExecutionData *edata,
ResultRelInfo *relinfo,
- TupleTableSlot *remoteslot);
+ TupleTableSlot *remoteslot,
+ Oid localindexoid);
static bool FindReplTupleInLocalRel(EState *estate, Relation localrel,
LogicalRepRelation *remoterel,
+ Oid localidxoid,
TupleTableSlot *remoteslot,
TupleTableSlot **localslot);
static void apply_handle_tuple_routing(ApplyExecutionData *edata,
logicalrep_read_typ(s, &typ);
}
-/*
- * Get replica identity index or if it is not defined a primary key.
- *
- * If neither is defined, returns InvalidOid
- */
-static Oid
-GetRelationIdentityOrPK(Relation rel)
-{
- Oid idxoid;
-
- idxoid = RelationGetReplicaIndex(rel);
-
- if (!OidIsValid(idxoid))
- idxoid = RelationGetPrimaryKeyIndex(rel);
-
- return idxoid;
-}
-
/*
* Check that we (the subscription owner) have sufficient privileges on the
* target relation to perform the given operation.
remoteslot, &newtup, CMD_UPDATE);
else
apply_handle_update_internal(edata, edata->targetRelInfo,
- remoteslot, &newtup);
+ remoteslot, &newtup, rel->localindexoid);
finish_edata(edata);
apply_handle_update_internal(ApplyExecutionData *edata,
ResultRelInfo *relinfo,
TupleTableSlot *remoteslot,
- LogicalRepTupleData *newtup)
+ LogicalRepTupleData *newtup,
+ Oid localindexoid)
{
EState *estate = edata->estate;
LogicalRepRelMapEntry *relmapentry = edata->targetRel;
found = FindReplTupleInLocalRel(estate, localrel,
&relmapentry->remoterel,
+ localindexoid,
remoteslot, &localslot);
ExecClearTuple(remoteslot);
remoteslot, NULL, CMD_DELETE);
else
apply_handle_delete_internal(edata, edata->targetRelInfo,
- remoteslot);
+ remoteslot, rel->localindexoid);
finish_edata(edata);
static void
apply_handle_delete_internal(ApplyExecutionData *edata,
ResultRelInfo *relinfo,
- TupleTableSlot *remoteslot)
+ TupleTableSlot *remoteslot,
+ Oid localindexoid)
{
EState *estate = edata->estate;
Relation localrel = relinfo->ri_RelationDesc;
EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
ExecOpenIndices(relinfo, false);
- found = FindReplTupleInLocalRel(estate, localrel, remoterel,
+ found = FindReplTupleInLocalRel(estate, localrel, remoterel, localindexoid,
remoteslot, &localslot);
/* If found delete it. */
/*
* Try to find a tuple received from the publication side (in 'remoteslot') in
* the corresponding local relation using either replica identity index,
- * primary key or if needed, sequential scan.
+ * primary key, index or if needed, sequential scan.
*
* Local tuple, if found, is returned in '*localslot'.
*/
static bool
FindReplTupleInLocalRel(EState *estate, Relation localrel,
LogicalRepRelation *remoterel,
+ Oid localidxoid,
TupleTableSlot *remoteslot,
TupleTableSlot **localslot)
{
- Oid idxoid;
bool found;
/*
*localslot = table_slot_create(localrel, &estate->es_tupleTable);
- idxoid = GetRelationIdentityOrPK(localrel);
- Assert(OidIsValid(idxoid) ||
+ Assert(OidIsValid(localidxoid) ||
(remoterel->replident == REPLICA_IDENTITY_FULL));
- if (OidIsValid(idxoid))
- found = RelationFindReplTupleByIndex(localrel, idxoid,
+ if (OidIsValid(localidxoid))
+ found = RelationFindReplTupleByIndex(localrel, localidxoid,
LockTupleExclusive,
remoteslot, *localslot);
else
case CMD_DELETE:
apply_handle_delete_internal(edata, partrelinfo,
- remoteslot_part);
+ remoteslot_part,
+ part_entry->localindexoid);
break;
case CMD_UPDATE:
/* Get the matching local tuple from the partition. */
found = FindReplTupleInLocalRel(estate, partrel,
&part_entry->remoterel,
+ part_entry->localindexoid,
remoteslot_part, &localslot);
if (!found)
{
/* DELETE old tuple found in the old partition. */
apply_handle_delete_internal(edata, partrelinfo,
- localslot);
+ localslot,
+ part_entry->localindexoid);
/* INSERT new tuple into the new partition. */
#define LOGICALRELATION_H
#include "access/attmap.h"
+#include "catalog/index.h"
#include "replication/logicalproto.h"
typedef struct LogicalRepRelMapEntry
Relation localrel; /* relcache entry (NULL when closed) */
AttrMap *attrmap; /* map of local attributes to remote ones */
bool updatable; /* Can apply updates/deletes? */
+ Oid localindexoid; /* which index to use, or InvalidOid if none */
/* Sync state. */
char state;
Relation partrel, AttrMap *map);
extern void logicalrep_rel_close(LogicalRepRelMapEntry *rel,
LOCKMODE lockmode);
+extern bool IsIndexUsableForReplicaIdentityFull(IndexInfo *indexInfo);
+extern Oid GetRelationIdentityOrPK(Relation rel);
#endif /* LOGICALRELATION_H */
't/029_on_error.pl',
't/030_origin.pl',
't/031_column_list.pl',
+ 't/032_subscribe_use_index.pl',
't/100_bugs.pl',
],
},