</para>
<para>
- Additional logging is triggered in the following <firstterm>conflict</firstterm>
- cases:
+ Additional logging is triggered, and the conflict statistics are collected (displayed in the
+ <link linkend="monitoring-pg-stat-subscription-stats"><structname>pg_stat_subscription_stats</structname></link> view)
+ in the following <firstterm>conflict</firstterm> cases:
<variablelist>
- <varlistentry>
+ <varlistentry id="conflict-insert-exists" xreflabel="insert_exists">
<term><literal>insert_exists</literal></term>
<listitem>
<para>
</para>
</listitem>
</varlistentry>
- <varlistentry>
+ <varlistentry id="conflict-update-origin-differs" xreflabel="update_origin_differs">
<term><literal>update_origin_differs</literal></term>
<listitem>
<para>
</para>
</listitem>
</varlistentry>
- <varlistentry>
+ <varlistentry id="conflict-update-exists" xreflabel="update_exists">
<term><literal>update_exists</literal></term>
<listitem>
<para>
</para>
</listitem>
</varlistentry>
- <varlistentry>
+ <varlistentry id="conflict-update-missing" xreflabel="update_missing">
<term><literal>update_missing</literal></term>
<listitem>
<para>
</para>
</listitem>
</varlistentry>
- <varlistentry>
+ <varlistentry id="conflict-delete-origin-differs" xreflabel="delete_origin_differs">
<term><literal>delete_origin_differs</literal></term>
<listitem>
<para>
</para>
</listitem>
</varlistentry>
- <varlistentry>
+ <varlistentry id="conflict-delete-missing" xreflabel="delete_missing">
<term><literal>delete_missing</literal></term>
<listitem>
<para>
<row>
<entry><structname>pg_stat_subscription_stats</structname><indexterm><primary>pg_stat_subscription_stats</primary></indexterm></entry>
- <entry>One row per subscription, showing statistics about errors.
+ <entry>One row per subscription, showing statistics about errors and conflicts.
See <link linkend="monitoring-pg-stat-subscription-stats">
<structname>pg_stat_subscription_stats</structname></link> for details.
</entry>
<structfield>apply_error_count</structfield> <type>bigint</type>
</para>
<para>
- Number of times an error occurred while applying changes
+ Number of times an error occurred while applying changes. Note that any
+ conflict resulting in an apply error will be counted in both
+ <literal>apply_error_count</literal> and the corresponding conflict
+ count (e.g., <literal>confl_*</literal>).
</para></entry>
</row>
</para></entry>
</row>
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>confl_insert_exists</structfield> <type>bigint</type>
+ </para>
+ <para>
+ Number of times a row insertion violated a
+ <literal>NOT DEFERRABLE</literal> unique constraint during the
+ application of changes. See <xref linkend="conflict-insert-exists"/>
+ for details about this conflict.
+ </para></entry>
+ </row>
+
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>confl_update_origin_differs</structfield> <type>bigint</type>
+ </para>
+ <para>
+ Number of times an update was applied to a row that had been previously
+ modified by another source during the application of changes. See
+ <xref linkend="conflict-update-origin-differs"/> for details about this
+ conflict.
+ </para></entry>
+ </row>
+
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>confl_update_exists</structfield> <type>bigint</type>
+ </para>
+ <para>
+ Number of times that an updated row value violated a
+ <literal>NOT DEFERRABLE</literal> unique constraint during the
+ application of changes. See <xref linkend="conflict-update-exists"/>
+ for details about this conflict.
+ </para></entry>
+ </row>
+
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>confl_update_missing</structfield> <type>bigint</type>
+ </para>
+ <para>
+ Number of times the tuple to be updated was not found during the
+ application of changes. See <xref linkend="conflict-update-missing"/>
+ for details about this conflict.
+ </para></entry>
+ </row>
+
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>confl_delete_origin_differs</structfield> <type>bigint</type>
+ </para>
+ <para>
+ Number of times a delete operation was applied to row that had been
+ previously modified by another source during the application of changes.
+ See <xref linkend="conflict-delete-origin-differs"/> for details about
+ this conflict.
+ </para></entry>
+ </row>
+
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>confl_delete_missing</structfield> <type>bigint</type>
+ </para>
+ <para>
+ Number of times the tuple to be deleted was not found during the application
+ of changes. See <xref linkend="conflict-delete-missing"/> for details
+ about this conflict.
+ </para></entry>
+ </row>
+
<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>stats_reset</structfield> <type>timestamp with time zone</type>
s.subname,
ss.apply_error_count,
ss.sync_error_count,
+ ss.confl_insert_exists,
+ ss.confl_update_origin_differs,
+ ss.confl_update_exists,
+ ss.confl_update_missing,
+ ss.confl_delete_origin_differs,
+ ss.confl_delete_missing,
ss.stats_reset
FROM pg_subscription as s,
pg_stat_get_subscription_stats(s.oid) as ss;
#include "access/commit_ts.h"
#include "access/tableam.h"
#include "executor/executor.h"
+#include "pgstat.h"
#include "replication/conflict.h"
-#include "replication/logicalrelation.h"
+#include "replication/worker_internal.h"
#include "storage/lmgr.h"
#include "utils/lsyscache.h"
Assert(!OidIsValid(indexoid) ||
CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true));
+ pgstat_report_subscription_conflict(MySubscription->oid, type);
+
ereport(elevel,
errcode_apply_conflict(type),
errmsg("conflict detected on relation \"%s.%s\": conflict=%s",
pending->sync_error_count++;
}
+/*
+ * Report a subscription conflict.
+ */
+void
+pgstat_report_subscription_conflict(Oid subid, ConflictType type)
+{
+ PgStat_EntryRef *entry_ref;
+ PgStat_BackendSubEntry *pending;
+
+ entry_ref = pgstat_prep_pending_entry(PGSTAT_KIND_SUBSCRIPTION,
+ InvalidOid, subid, NULL);
+ pending = entry_ref->pending;
+ pending->conflict_count[type]++;
+}
+
/*
* Report creating the subscription.
*/
#define SUB_ACC(fld) shsubent->stats.fld += localent->fld
SUB_ACC(apply_error_count);
SUB_ACC(sync_error_count);
+ for (int i = 0; i < CONFLICT_NUM_TYPES; i++)
+ SUB_ACC(conflict_count[i]);
#undef SUB_ACC
pgstat_unlock_entry(entry_ref);
Datum
pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
{
-#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS 4
+#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS 10
Oid subid = PG_GETARG_OID(0);
TupleDesc tupdesc;
Datum values[PG_STAT_GET_SUBSCRIPTION_STATS_COLS] = {0};
bool nulls[PG_STAT_GET_SUBSCRIPTION_STATS_COLS] = {0};
PgStat_StatSubEntry *subentry;
PgStat_StatSubEntry allzero;
+ int i = 0;
/* Get subscription stats */
subentry = pgstat_fetch_stat_subscription(subid);
INT8OID, -1, 0);
TupleDescInitEntry(tupdesc, (AttrNumber) 3, "sync_error_count",
INT8OID, -1, 0);
- TupleDescInitEntry(tupdesc, (AttrNumber) 4, "stats_reset",
+ TupleDescInitEntry(tupdesc, (AttrNumber) 4, "confl_insert_exists",
+ INT8OID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 5, "confl_update_origin_differs",
+ INT8OID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 6, "confl_update_exists",
+ INT8OID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 7, "confl_update_missing",
+ INT8OID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 8, "confl_delete_origin_differs",
+ INT8OID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 9, "confl_delete_missing",
+ INT8OID, -1, 0);
+ TupleDescInitEntry(tupdesc, (AttrNumber) 10, "stats_reset",
TIMESTAMPTZOID, -1, 0);
BlessTupleDesc(tupdesc);
}
/* subid */
- values[0] = ObjectIdGetDatum(subid);
+ values[i++] = ObjectIdGetDatum(subid);
/* apply_error_count */
- values[1] = Int64GetDatum(subentry->apply_error_count);
+ values[i++] = Int64GetDatum(subentry->apply_error_count);
/* sync_error_count */
- values[2] = Int64GetDatum(subentry->sync_error_count);
+ values[i++] = Int64GetDatum(subentry->sync_error_count);
+
+ /* conflict count */
+ for (int nconflict = 0; nconflict < CONFLICT_NUM_TYPES; nconflict++)
+ values[i++] = Int64GetDatum(subentry->conflict_count[nconflict]);
/* stats_reset */
if (subentry->stat_reset_timestamp == 0)
- nulls[3] = true;
+ nulls[i] = true;
else
- values[3] = TimestampTzGetDatum(subentry->stat_reset_timestamp);
+ values[i] = TimestampTzGetDatum(subentry->stat_reset_timestamp);
+
+ Assert(i + 1 == PG_STAT_GET_SUBSCRIPTION_STATS_COLS);
/* Returns the record as Datum */
PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
*/
/* yyyymmddN */
-#define CATALOG_VERSION_NO 202408301
+#define CATALOG_VERSION_NO 202409041
#endif
{ oid => '6231', descr => 'statistics: information about subscription stats',
proname => 'pg_stat_get_subscription_stats', provolatile => 's',
proparallel => 'r', prorettype => 'record', proargtypes => 'oid',
- proallargtypes => '{oid,oid,int8,int8,timestamptz}',
- proargmodes => '{i,o,o,o,o}',
- proargnames => '{subid,subid,apply_error_count,sync_error_count,stats_reset}',
+ proallargtypes => '{oid,oid,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
+ proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}',
+ proargnames => '{subid,subid,apply_error_count,sync_error_count,confl_insert_exists,confl_update_origin_differs,confl_update_exists,confl_update_missing,confl_delete_origin_differs,confl_delete_missing,stats_reset}',
prosrc => 'pg_stat_get_subscription_stats' },
{ oid => '6118', descr => 'statistics: information about subscription',
proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
#include "datatype/timestamp.h"
#include "portability/instr_time.h"
#include "postmaster/pgarch.h" /* for MAX_XFN_CHARS */
+#include "replication/conflict.h"
#include "utils/backend_progress.h" /* for backward compatibility */
#include "utils/backend_status.h" /* for backward compatibility */
#include "utils/relcache.h"
{
PgStat_Counter apply_error_count;
PgStat_Counter sync_error_count;
+ PgStat_Counter conflict_count[CONFLICT_NUM_TYPES];
} PgStat_BackendSubEntry;
/* ----------
{
PgStat_Counter apply_error_count;
PgStat_Counter sync_error_count;
+ PgStat_Counter conflict_count[CONFLICT_NUM_TYPES];
TimestampTz stat_reset_timestamp;
} PgStat_StatSubEntry;
*/
extern void pgstat_report_subscription_error(Oid subid, bool is_apply_error);
+extern void pgstat_report_subscription_conflict(Oid subid, ConflictType type);
extern void pgstat_create_subscription(Oid subid);
extern void pgstat_drop_subscription(Oid subid);
extern PgStat_StatSubEntry *pgstat_fetch_stat_subscription(Oid subid);
/*
* Conflict types that could occur while applying remote changes.
+ *
+ * This enum is used in statistics collection (see
+ * PgStat_StatSubEntry::conflict_count and
+ * PgStat_BackendSubEntry::conflict_count) as well, therefore, when adding new
+ * values or reordering existing ones, ensure to review and potentially adjust
+ * the corresponding statistics collection codes.
*/
typedef enum
{
*/
} ConflictType;
+#define CONFLICT_NUM_TYPES (CT_DELETE_MISSING + 1)
+
extern bool GetTupleTransactionInfo(TupleTableSlot *localslot,
TransactionId *xmin,
RepOriginId *localorigin,
s.subname,
ss.apply_error_count,
ss.sync_error_count,
+ ss.confl_insert_exists,
+ ss.confl_update_origin_differs,
+ ss.confl_update_exists,
+ ss.confl_update_missing,
+ ss.confl_delete_origin_differs,
+ ss.confl_delete_missing,
ss.stats_reset
FROM pg_subscription s,
- LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, stats_reset);
+ LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, confl_insert_exists, confl_update_origin_differs, confl_update_exists, confl_update_missing, confl_delete_origin_differs, confl_delete_missing, stats_reset);
pg_stat_sys_indexes| SELECT relid,
indexrelid,
schemaname,
qq[
BEGIN;
CREATE TABLE $table_name(a int);
+ ALTER TABLE $table_name REPLICA IDENTITY FULL;
INSERT INTO $table_name VALUES (1);
COMMIT;
]);
# subscriber due to violation of the unique constraint on test table.
$node_publisher->safe_psql($db, qq(INSERT INTO $table_name VALUES (1)));
- # Wait for the apply error to be reported.
+ # Wait for the subscriber to report both an apply error and an
+ # insert_exists conflict.
$node_subscriber->poll_query_until(
$db,
qq[
- SELECT apply_error_count > 0
+ SELECT apply_error_count > 0 AND confl_insert_exists > 0
FROM pg_stat_subscription_stats
WHERE subname = '$sub_name'
])
or die
- qq(Timed out while waiting for apply error for subscription '$sub_name');
+ qq(Timed out while waiting for apply error and insert_exists conflict for subscription '$sub_name');
# Truncate test table so that apply worker can continue.
$node_subscriber->safe_psql($db, qq(TRUNCATE $table_name));
+ # Delete data from the test table on the publisher. This delete operation
+ # should be skipped on the subscriber since the table is already empty.
+ $node_publisher->safe_psql($db, qq(DELETE FROM $table_name;));
+
+ # Wait for the subscriber to report a delete_missing conflict.
+ $node_subscriber->poll_query_until(
+ $db,
+ qq[
+ SELECT confl_delete_missing > 0
+ FROM pg_stat_subscription_stats
+ WHERE subname = '$sub_name'
+ ])
+ or die
+ qq(Timed out while waiting for delete_missing conflict for subscription '$sub_name');
+
return ($pub_name, $sub_name);
}
create_sub_pub_w_errors($node_publisher, $node_subscriber, $db,
$table1_name);
-# Apply and Sync errors are > 0 and reset timestamp is NULL
+# Apply errors, sync errors, and conflicts are > 0 and stats_reset timestamp is NULL
is( $node_subscriber->safe_psql(
$db,
qq(SELECT apply_error_count > 0,
sync_error_count > 0,
+ confl_insert_exists > 0,
+ confl_delete_missing > 0,
stats_reset IS NULL
FROM pg_stat_subscription_stats
WHERE subname = '$sub1_name')
),
- qq(t|t|t),
- qq(Check that apply errors and sync errors are both > 0 and stats_reset is NULL for subscription '$sub1_name'.)
+ qq(t|t|t|t|t),
+ qq(Check that apply errors, sync errors, and conflicts are > 0 and stats_reset is NULL for subscription '$sub1_name'.)
);
# Reset a single subscription
qq(SELECT pg_stat_reset_subscription_stats((SELECT subid FROM pg_stat_subscription_stats WHERE subname = '$sub1_name')))
);
-# Apply and Sync errors are 0 and stats reset is not NULL
+# Apply errors, sync errors, and conflicts are 0 and stats_reset timestamp is not NULL
is( $node_subscriber->safe_psql(
$db,
qq(SELECT apply_error_count = 0,
sync_error_count = 0,
+ confl_insert_exists = 0,
+ confl_delete_missing = 0,
stats_reset IS NOT NULL
FROM pg_stat_subscription_stats
WHERE subname = '$sub1_name')
),
- qq(t|t|t),
- qq(Confirm that apply errors and sync errors are both 0 and stats_reset is not NULL after reset for subscription '$sub1_name'.)
+ qq(t|t|t|t|t),
+ qq(Confirm that apply errors, sync errors, and conflicts are 0 and stats_reset is not NULL after reset for subscription '$sub1_name'.)
);
# Get reset timestamp
create_sub_pub_w_errors($node_publisher, $node_subscriber, $db,
$table2_name);
-# Apply and Sync errors are > 0 and reset timestamp is NULL
+# Apply errors, sync errors, and conflicts are > 0 and stats_reset timestamp is NULL
is( $node_subscriber->safe_psql(
$db,
qq(SELECT apply_error_count > 0,
sync_error_count > 0,
+ confl_insert_exists > 0,
+ confl_delete_missing > 0,
stats_reset IS NULL
FROM pg_stat_subscription_stats
WHERE subname = '$sub2_name')
),
- qq(t|t|t),
- qq(Confirm that apply errors and sync errors are both > 0 and stats_reset is NULL for sub '$sub2_name'.)
+ qq(t|t|t|t|t),
+ qq(Confirm that apply errors, sync errors, and conflicts are > 0 and stats_reset is NULL for sub '$sub2_name'.)
);
# Reset all subscriptions
$node_subscriber->safe_psql($db,
qq(SELECT pg_stat_reset_subscription_stats(NULL)));
-# Apply and Sync errors are 0 and stats reset is not NULL
+# Apply errors, sync errors, and conflicts are 0 and stats_reset timestamp is not NULL
is( $node_subscriber->safe_psql(
$db,
qq(SELECT apply_error_count = 0,
sync_error_count = 0,
+ confl_insert_exists = 0,
+ confl_delete_missing = 0,
stats_reset IS NOT NULL
FROM pg_stat_subscription_stats
WHERE subname = '$sub1_name')
),
- qq(t|t|t),
- qq(Confirm that apply errors and sync errors are both 0 and stats_reset is not NULL for sub '$sub1_name' after reset.)
+ qq(t|t|t|t|t),
+ qq(Confirm that apply errors, sync errors, and conflicts are 0 and stats_reset is not NULL for sub '$sub1_name' after reset.)
);
is( $node_subscriber->safe_psql(
$db,
qq(SELECT apply_error_count = 0,
sync_error_count = 0,
+ confl_insert_exists = 0,
+ confl_delete_missing = 0,
stats_reset IS NOT NULL
FROM pg_stat_subscription_stats
WHERE subname = '$sub2_name')
),
- qq(t|t|t),
- qq(Confirm that apply errors and sync errors are both 0 and stats_reset is not NULL for sub '$sub2_name' after reset.)
+ qq(t|t|t|t|t),
+ qq(Confirm that apply errors, sync errors, and conflicts are 0 and stats_reset is not NULL for sub '$sub2_name' after reset.)
);
$reset_time1 = $node_subscriber->safe_psql($db,