Collect statistics about conflicts in logical replication.
authorAmit Kapila <[email protected]>
Wed, 4 Sep 2024 03:25:21 +0000 (08:55 +0530)
committerAmit Kapila <[email protected]>
Wed, 4 Sep 2024 03:25:21 +0000 (08:55 +0530)
This commit adds columns in view pg_stat_subscription_stats to show the
number of times a particular conflict type has occurred during the
application of logical replication changes. The following columns are
added:

confl_insert_exists:
        Number of times a row insertion violated a NOT DEFERRABLE unique
        constraint.
confl_update_origin_differs:
        Number of times an update was performed on a row that was
        previously modified by another origin.
confl_update_exists:
        Number of times that the updated value of a row violates a
        NOT DEFERRABLE unique constraint.
confl_update_missing:
        Number of times that the tuple to be updated is missing.
confl_delete_origin_differs:
        Number of times a delete was performed on a row that was
        previously modified by another origin.
confl_delete_missing:
        Number of times that the tuple to be deleted is missing.

The update_origin_differs and delete_origin_differs conflicts can be
detected only when track_commit_timestamp is enabled.

Author: Hou Zhijie
Reviewed-by: Shveta Malik, Peter Smith, Anit Kapila
Discussion: https://postgr.es/m/OS0PR01MB57160A07BD575773045FC214948F2@OS0PR01MB5716.jpnprd01.prod.outlook.com

12 files changed:
doc/src/sgml/logical-replication.sgml
doc/src/sgml/monitoring.sgml
src/backend/catalog/system_views.sql
src/backend/replication/logical/conflict.c
src/backend/utils/activity/pgstat_subscription.c
src/backend/utils/adt/pgstatfuncs.c
src/include/catalog/catversion.h
src/include/catalog/pg_proc.dat
src/include/pgstat.h
src/include/replication/conflict.h
src/test/regress/expected/rules.out
src/test/subscription/t/026_stats.pl

index 46917f9f94f8db0362290140cccee871489ff7c3..df62eb45ff81bdc0e42b7a023a645c7092fb1a84 100644 (file)
@@ -1582,10 +1582,11 @@ test_sub=# SELECT * FROM t1 ORDER BY id;
   </para>
 
   <para>
-   Additional logging is triggered in the following <firstterm>conflict</firstterm>
-   cases:
+   Additional logging is triggered, and the conflict statistics are collected (displayed in the
+   <link linkend="monitoring-pg-stat-subscription-stats"><structname>pg_stat_subscription_stats</structname></link> view)
+   in the following <firstterm>conflict</firstterm> cases:
    <variablelist>
-    <varlistentry>
+    <varlistentry id="conflict-insert-exists" xreflabel="insert_exists">
      <term><literal>insert_exists</literal></term>
      <listitem>
       <para>
@@ -1598,7 +1599,7 @@ test_sub=# SELECT * FROM t1 ORDER BY id;
       </para>
      </listitem>
     </varlistentry>
-    <varlistentry>
+    <varlistentry id="conflict-update-origin-differs" xreflabel="update_origin_differs">
      <term><literal>update_origin_differs</literal></term>
      <listitem>
       <para>
@@ -1610,7 +1611,7 @@ test_sub=# SELECT * FROM t1 ORDER BY id;
       </para>
      </listitem>
     </varlistentry>
-    <varlistentry>
+    <varlistentry id="conflict-update-exists" xreflabel="update_exists">
      <term><literal>update_exists</literal></term>
      <listitem>
       <para>
@@ -1627,7 +1628,7 @@ test_sub=# SELECT * FROM t1 ORDER BY id;
       </para>
      </listitem>
     </varlistentry>
-    <varlistentry>
+    <varlistentry id="conflict-update-missing" xreflabel="update_missing">
      <term><literal>update_missing</literal></term>
      <listitem>
       <para>
@@ -1636,7 +1637,7 @@ test_sub=# SELECT * FROM t1 ORDER BY id;
       </para>
      </listitem>
     </varlistentry>
-    <varlistentry>
+    <varlistentry id="conflict-delete-origin-differs" xreflabel="delete_origin_differs">
      <term><literal>delete_origin_differs</literal></term>
      <listitem>
       <para>
@@ -1648,7 +1649,7 @@ test_sub=# SELECT * FROM t1 ORDER BY id;
       </para>
      </listitem>
     </varlistentry>
-    <varlistentry>
+    <varlistentry id="conflict-delete-missing" xreflabel="delete_missing">
      <term><literal>delete_missing</literal></term>
      <listitem>
       <para>
index 55417a6fa9d1981fab9be5142e077da39800cd77..933de6fe07f8786dc4a5f434b55c804315a6edf5 100644 (file)
@@ -507,7 +507,7 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
 
      <row>
       <entry><structname>pg_stat_subscription_stats</structname><indexterm><primary>pg_stat_subscription_stats</primary></indexterm></entry>
-      <entry>One row per subscription, showing statistics about errors.
+      <entry>One row per subscription, showing statistics about errors and conflicts.
       See <link linkend="monitoring-pg-stat-subscription-stats">
       <structname>pg_stat_subscription_stats</structname></link> for details.
       </entry>
@@ -2157,7 +2157,10 @@ description | Waiting for a newly initialized WAL file to reach durable storage
        <structfield>apply_error_count</structfield> <type>bigint</type>
       </para>
       <para>
-       Number of times an error occurred while applying changes
+       Number of times an error occurred while applying changes. Note that any
+       conflict resulting in an apply error will be counted in both
+       <literal>apply_error_count</literal> and the corresponding conflict
+       count (e.g., <literal>confl_*</literal>).
       </para></entry>
      </row>
 
@@ -2171,6 +2174,76 @@ description | Waiting for a newly initialized WAL file to reach durable storage
       </para></entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>confl_insert_exists</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of times a row insertion violated a
+       <literal>NOT DEFERRABLE</literal> unique constraint during the
+       application of changes. See <xref linkend="conflict-insert-exists"/>
+       for details about this conflict.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>confl_update_origin_differs</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of times an update was applied to a row that had been previously
+       modified by another source during the application of changes. See
+       <xref linkend="conflict-update-origin-differs"/> for details about this
+       conflict.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>confl_update_exists</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of times that an updated row value violated a
+       <literal>NOT DEFERRABLE</literal> unique constraint during the
+       application of changes. See <xref linkend="conflict-update-exists"/>
+       for details about this conflict.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>confl_update_missing</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of times the tuple to be updated was not found during the
+       application of changes. See <xref linkend="conflict-update-missing"/>
+       for details about this conflict.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>confl_delete_origin_differs</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of times a delete operation was applied to row that had been
+       previously modified by another source during the application of changes.
+       See <xref linkend="conflict-delete-origin-differs"/> for details about
+       this conflict.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>confl_delete_missing</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of times the tuple to be deleted was not found during the application
+       of changes. See <xref linkend="conflict-delete-missing"/> for details
+       about this conflict.
+      </para></entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>stats_reset</structfield> <type>timestamp with time zone</type>
index 19cabc9a47fa0872a5810fd8444dbf522dc7d595..7fd5d256a18cda1fd1a6949f4bdcbf607b599d77 100644 (file)
@@ -1365,6 +1365,12 @@ CREATE VIEW pg_stat_subscription_stats AS
         s.subname,
         ss.apply_error_count,
         ss.sync_error_count,
+        ss.confl_insert_exists,
+        ss.confl_update_origin_differs,
+        ss.confl_update_exists,
+        ss.confl_update_missing,
+        ss.confl_delete_origin_differs,
+        ss.confl_delete_missing,
         ss.stats_reset
     FROM pg_subscription as s,
          pg_stat_get_subscription_stats(s.oid) as ss;
index a1437d4f770861e133e31060b6459b3e3244053b..5d9ff626bdefd6eaafde878584a0cd2b1b634171 100644 (file)
@@ -17,8 +17,9 @@
 #include "access/commit_ts.h"
 #include "access/tableam.h"
 #include "executor/executor.h"
+#include "pgstat.h"
 #include "replication/conflict.h"
-#include "replication/logicalrelation.h"
+#include "replication/worker_internal.h"
 #include "storage/lmgr.h"
 #include "utils/lsyscache.h"
 
@@ -114,6 +115,8 @@ ReportApplyConflict(EState *estate, ResultRelInfo *relinfo, int elevel,
    Assert(!OidIsValid(indexoid) ||
           CheckRelationOidLockedByMe(indexoid, RowExclusiveLock, true));
 
+   pgstat_report_subscription_conflict(MySubscription->oid, type);
+
    ereport(elevel,
            errcode_apply_conflict(type),
            errmsg("conflict detected on relation \"%s.%s\": conflict=%s",
index d9af8de6587e89650e22297308ae483289d5b881..e06c92727e990abaf06ad4f6b9b08be840f2c6d8 100644 (file)
@@ -39,6 +39,21 @@ pgstat_report_subscription_error(Oid subid, bool is_apply_error)
        pending->sync_error_count++;
 }
 
+/*
+ * Report a subscription conflict.
+ */
+void
+pgstat_report_subscription_conflict(Oid subid, ConflictType type)
+{
+   PgStat_EntryRef *entry_ref;
+   PgStat_BackendSubEntry *pending;
+
+   entry_ref = pgstat_prep_pending_entry(PGSTAT_KIND_SUBSCRIPTION,
+                                         InvalidOid, subid, NULL);
+   pending = entry_ref->pending;
+   pending->conflict_count[type]++;
+}
+
 /*
  * Report creating the subscription.
  */
@@ -101,6 +116,8 @@ pgstat_subscription_flush_cb(PgStat_EntryRef *entry_ref, bool nowait)
 #define SUB_ACC(fld) shsubent->stats.fld += localent->fld
    SUB_ACC(apply_error_count);
    SUB_ACC(sync_error_count);
+   for (int i = 0; i < CONFLICT_NUM_TYPES; i++)
+       SUB_ACC(conflict_count[i]);
 #undef SUB_ACC
 
    pgstat_unlock_entry(entry_ref);
index 3221137123703c5d3c0a874b7ddb5bd161e13398..97dc09ac0d919dc79ba86580898676eff7377c63 100644 (file)
@@ -1966,13 +1966,14 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS)
 Datum
 pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS    4
+#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS    10
    Oid         subid = PG_GETARG_OID(0);
    TupleDesc   tupdesc;
    Datum       values[PG_STAT_GET_SUBSCRIPTION_STATS_COLS] = {0};
    bool        nulls[PG_STAT_GET_SUBSCRIPTION_STATS_COLS] = {0};
    PgStat_StatSubEntry *subentry;
    PgStat_StatSubEntry allzero;
+   int         i = 0;
 
    /* Get subscription stats */
    subentry = pgstat_fetch_stat_subscription(subid);
@@ -1985,7 +1986,19 @@ pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
                       INT8OID, -1, 0);
    TupleDescInitEntry(tupdesc, (AttrNumber) 3, "sync_error_count",
                       INT8OID, -1, 0);
-   TupleDescInitEntry(tupdesc, (AttrNumber) 4, "stats_reset",
+   TupleDescInitEntry(tupdesc, (AttrNumber) 4, "confl_insert_exists",
+                      INT8OID, -1, 0);
+   TupleDescInitEntry(tupdesc, (AttrNumber) 5, "confl_update_origin_differs",
+                      INT8OID, -1, 0);
+   TupleDescInitEntry(tupdesc, (AttrNumber) 6, "confl_update_exists",
+                      INT8OID, -1, 0);
+   TupleDescInitEntry(tupdesc, (AttrNumber) 7, "confl_update_missing",
+                      INT8OID, -1, 0);
+   TupleDescInitEntry(tupdesc, (AttrNumber) 8, "confl_delete_origin_differs",
+                      INT8OID, -1, 0);
+   TupleDescInitEntry(tupdesc, (AttrNumber) 9, "confl_delete_missing",
+                      INT8OID, -1, 0);
+   TupleDescInitEntry(tupdesc, (AttrNumber) 10, "stats_reset",
                       TIMESTAMPTZOID, -1, 0);
    BlessTupleDesc(tupdesc);
 
@@ -1997,19 +2010,25 @@ pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
    }
 
    /* subid */
-   values[0] = ObjectIdGetDatum(subid);
+   values[i++] = ObjectIdGetDatum(subid);
 
    /* apply_error_count */
-   values[1] = Int64GetDatum(subentry->apply_error_count);
+   values[i++] = Int64GetDatum(subentry->apply_error_count);
 
    /* sync_error_count */
-   values[2] = Int64GetDatum(subentry->sync_error_count);
+   values[i++] = Int64GetDatum(subentry->sync_error_count);
+
+   /* conflict count */
+   for (int nconflict = 0; nconflict < CONFLICT_NUM_TYPES; nconflict++)
+       values[i++] = Int64GetDatum(subentry->conflict_count[nconflict]);
 
    /* stats_reset */
    if (subentry->stat_reset_timestamp == 0)
-       nulls[3] = true;
+       nulls[i] = true;
    else
-       values[3] = TimestampTzGetDatum(subentry->stat_reset_timestamp);
+       values[i] = TimestampTzGetDatum(subentry->stat_reset_timestamp);
+
+   Assert(i + 1 == PG_STAT_GET_SUBSCRIPTION_STATS_COLS);
 
    /* Returns the record as Datum */
    PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
index 1980d492c3dba3b5904d80970655bd0feb41be43..be6815593b29063d68b18dab14804e0f29a063f8 100644 (file)
@@ -57,6 +57,6 @@
  */
 
 /*                         yyyymmddN */
-#define CATALOG_VERSION_NO 202408301
+#define CATALOG_VERSION_NO 202409041
 
 #endif
index 85f42be1b3cdfc65a852e8ff02dcd9bf32e3024e..ff5436acacfaf30a20e67082e3381c99b2d577d9 100644 (file)
 { oid => '6231', descr => 'statistics: information about subscription stats',
   proname => 'pg_stat_get_subscription_stats', provolatile => 's',
   proparallel => 'r', prorettype => 'record', proargtypes => 'oid',
-  proallargtypes => '{oid,oid,int8,int8,timestamptz}',
-  proargmodes => '{i,o,o,o,o}',
-  proargnames => '{subid,subid,apply_error_count,sync_error_count,stats_reset}',
+  proallargtypes => '{oid,oid,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
+  proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{subid,subid,apply_error_count,sync_error_count,confl_insert_exists,confl_update_origin_differs,confl_update_exists,confl_update_missing,confl_delete_origin_differs,confl_delete_missing,stats_reset}',
   prosrc => 'pg_stat_get_subscription_stats' },
 { oid => '6118', descr => 'statistics: information about subscription',
   proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
index f63159c55ca631b447a92ccf466abee480244599..be2c91168a1b6ef8efdc78270d34b7487abca4ce 100644 (file)
@@ -15,6 +15,7 @@
 #include "datatype/timestamp.h"
 #include "portability/instr_time.h"
 #include "postmaster/pgarch.h" /* for MAX_XFN_CHARS */
+#include "replication/conflict.h"
 #include "utils/backend_progress.h" /* for backward compatibility */
 #include "utils/backend_status.h"  /* for backward compatibility */
 #include "utils/relcache.h"
@@ -165,6 +166,7 @@ typedef struct PgStat_BackendSubEntry
 {
    PgStat_Counter apply_error_count;
    PgStat_Counter sync_error_count;
+   PgStat_Counter conflict_count[CONFLICT_NUM_TYPES];
 } PgStat_BackendSubEntry;
 
 /* ----------
@@ -423,6 +425,7 @@ typedef struct PgStat_StatSubEntry
 {
    PgStat_Counter apply_error_count;
    PgStat_Counter sync_error_count;
+   PgStat_Counter conflict_count[CONFLICT_NUM_TYPES];
    TimestampTz stat_reset_timestamp;
 } PgStat_StatSubEntry;
 
@@ -725,6 +728,7 @@ extern PgStat_SLRUStats *pgstat_fetch_slru(void);
  */
 
 extern void pgstat_report_subscription_error(Oid subid, bool is_apply_error);
+extern void pgstat_report_subscription_conflict(Oid subid, ConflictType type);
 extern void pgstat_create_subscription(Oid subid);
 extern void pgstat_drop_subscription(Oid subid);
 extern PgStat_StatSubEntry *pgstat_fetch_stat_subscription(Oid subid);
index ca797fb41c64af8e5812758097966b117ad45a3a..c759677ff54db3ea8399f26fb79a5885089472d2 100644 (file)
 
 /*
  * Conflict types that could occur while applying remote changes.
+ *
+ * This enum is used in statistics collection (see
+ * PgStat_StatSubEntry::conflict_count and
+ * PgStat_BackendSubEntry::conflict_count) as well, therefore, when adding new
+ * values or reordering existing ones, ensure to review and potentially adjust
+ * the corresponding statistics collection codes.
  */
 typedef enum
 {
@@ -42,6 +48,8 @@ typedef enum
     */
 } ConflictType;
 
+#define CONFLICT_NUM_TYPES (CT_DELETE_MISSING + 1)
+
 extern bool GetTupleTransactionInfo(TupleTableSlot *localslot,
                                    TransactionId *xmin,
                                    RepOriginId *localorigin,
index 862433ee52bab59c688523605e258c7752306f67..a1626f3fae91521988a8d44f93a7f94b2020f59f 100644 (file)
@@ -2139,9 +2139,15 @@ pg_stat_subscription_stats| SELECT ss.subid,
     s.subname,
     ss.apply_error_count,
     ss.sync_error_count,
+    ss.confl_insert_exists,
+    ss.confl_update_origin_differs,
+    ss.confl_update_exists,
+    ss.confl_update_missing,
+    ss.confl_delete_origin_differs,
+    ss.confl_delete_missing,
     ss.stats_reset
    FROM pg_subscription s,
-    LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, stats_reset);
+    LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, confl_insert_exists, confl_update_origin_differs, confl_update_exists, confl_update_missing, confl_delete_origin_differs, confl_delete_missing, stats_reset);
 pg_stat_sys_indexes| SELECT relid,
     indexrelid,
     schemaname,
index fb3e5629b3c4370be59ea78199f34cc85e7c6581..6b6a5b0b1b6251450d35c3ed5cd2b9214564f6c0 100644 (file)
@@ -30,6 +30,7 @@ sub create_sub_pub_w_errors
        qq[
    BEGIN;
    CREATE TABLE $table_name(a int);
+   ALTER TABLE $table_name REPLICA IDENTITY FULL;
    INSERT INTO $table_name VALUES (1);
    COMMIT;
    ]);
@@ -91,20 +92,36 @@ sub create_sub_pub_w_errors
    # subscriber due to violation of the unique constraint on test table.
    $node_publisher->safe_psql($db, qq(INSERT INTO $table_name VALUES (1)));
 
-   # Wait for the apply error to be reported.
+   # Wait for the subscriber to report both an apply error and an
+   # insert_exists conflict.
    $node_subscriber->poll_query_until(
        $db,
        qq[
-   SELECT apply_error_count > 0
+   SELECT apply_error_count > 0 AND confl_insert_exists > 0
    FROM pg_stat_subscription_stats
    WHERE subname = '$sub_name'
    ])
      or die
-     qq(Timed out while waiting for apply error for subscription '$sub_name');
+     qq(Timed out while waiting for apply error and insert_exists conflict for subscription '$sub_name');
 
    # Truncate test table so that apply worker can continue.
    $node_subscriber->safe_psql($db, qq(TRUNCATE $table_name));
 
+   # Delete data from the test table on the publisher. This delete operation
+   # should be skipped on the subscriber since the table is already empty.
+   $node_publisher->safe_psql($db, qq(DELETE FROM $table_name;));
+
+   # Wait for the subscriber to report a delete_missing conflict.
+   $node_subscriber->poll_query_until(
+       $db,
+       qq[
+   SELECT confl_delete_missing > 0
+   FROM pg_stat_subscription_stats
+   WHERE subname = '$sub_name'
+   ])
+     or die
+     qq(Timed out while waiting for delete_missing conflict for subscription '$sub_name');
+
    return ($pub_name, $sub_name);
 }
 
@@ -123,17 +140,19 @@ my ($pub1_name, $sub1_name) =
   create_sub_pub_w_errors($node_publisher, $node_subscriber, $db,
    $table1_name);
 
-# Apply and Sync errors are > 0 and reset timestamp is NULL
+# Apply errors, sync errors, and conflicts are > 0 and stats_reset timestamp is NULL
 is( $node_subscriber->safe_psql(
        $db,
        qq(SELECT apply_error_count > 0,
    sync_error_count > 0,
+   confl_insert_exists > 0,
+   confl_delete_missing > 0,
    stats_reset IS NULL
    FROM pg_stat_subscription_stats
    WHERE subname = '$sub1_name')
    ),
-   qq(t|t|t),
-   qq(Check that apply errors and sync errors are both > 0 and stats_reset is NULL for subscription '$sub1_name'.)
+   qq(t|t|t|t|t),
+   qq(Check that apply errors, sync errors, and conflicts are > 0 and stats_reset is NULL for subscription '$sub1_name'.)
 );
 
 # Reset a single subscription
@@ -141,17 +160,19 @@ $node_subscriber->safe_psql($db,
    qq(SELECT pg_stat_reset_subscription_stats((SELECT subid FROM pg_stat_subscription_stats WHERE subname = '$sub1_name')))
 );
 
-# Apply and Sync errors are 0 and stats reset is not NULL
+# Apply errors, sync errors, and conflicts are 0 and stats_reset timestamp is not NULL
 is( $node_subscriber->safe_psql(
        $db,
        qq(SELECT apply_error_count = 0,
    sync_error_count = 0,
+   confl_insert_exists = 0,
+   confl_delete_missing = 0,
    stats_reset IS NOT NULL
    FROM pg_stat_subscription_stats
    WHERE subname = '$sub1_name')
    ),
-   qq(t|t|t),
-   qq(Confirm that apply errors and sync errors are both 0 and stats_reset is not NULL after reset for subscription '$sub1_name'.)
+   qq(t|t|t|t|t),
+   qq(Confirm that apply errors, sync errors, and conflicts are 0 and stats_reset is not NULL after reset for subscription '$sub1_name'.)
 );
 
 # Get reset timestamp
@@ -181,46 +202,52 @@ my ($pub2_name, $sub2_name) =
   create_sub_pub_w_errors($node_publisher, $node_subscriber, $db,
    $table2_name);
 
-# Apply and Sync errors are > 0 and reset timestamp is NULL
+# Apply errors, sync errors, and conflicts are > 0 and stats_reset timestamp is NULL
 is( $node_subscriber->safe_psql(
        $db,
        qq(SELECT apply_error_count > 0,
    sync_error_count > 0,
+   confl_insert_exists > 0,
+   confl_delete_missing > 0,
    stats_reset IS NULL
    FROM pg_stat_subscription_stats
    WHERE subname = '$sub2_name')
    ),
-   qq(t|t|t),
-   qq(Confirm that apply errors and sync errors are both > 0 and stats_reset is NULL for sub '$sub2_name'.)
+   qq(t|t|t|t|t),
+   qq(Confirm that apply errors, sync errors, and conflicts are > 0 and stats_reset is NULL for sub '$sub2_name'.)
 );
 
 # Reset all subscriptions
 $node_subscriber->safe_psql($db,
    qq(SELECT pg_stat_reset_subscription_stats(NULL)));
 
-# Apply and Sync errors are 0 and stats reset is not NULL
+# Apply errors, sync errors, and conflicts are 0 and stats_reset timestamp is not NULL
 is( $node_subscriber->safe_psql(
        $db,
        qq(SELECT apply_error_count = 0,
    sync_error_count = 0,
+   confl_insert_exists = 0,
+   confl_delete_missing = 0,
    stats_reset IS NOT NULL
    FROM pg_stat_subscription_stats
    WHERE subname = '$sub1_name')
    ),
-   qq(t|t|t),
-   qq(Confirm that apply errors and sync errors are both 0 and stats_reset is not NULL for sub '$sub1_name' after reset.)
+   qq(t|t|t|t|t),
+   qq(Confirm that apply errors, sync errors, and conflicts are 0 and stats_reset is not NULL for sub '$sub1_name' after reset.)
 );
 
 is( $node_subscriber->safe_psql(
        $db,
        qq(SELECT apply_error_count = 0,
    sync_error_count = 0,
+   confl_insert_exists = 0,
+   confl_delete_missing = 0,
    stats_reset IS NOT NULL
    FROM pg_stat_subscription_stats
    WHERE subname = '$sub2_name')
    ),
-   qq(t|t|t),
-   qq(Confirm that apply errors and sync errors are both 0 and stats_reset is not NULL for sub '$sub2_name' after reset.)
+   qq(t|t|t|t|t),
+   qq(Confirm that apply errors, sync errors, and conflicts are 0 and stats_reset is not NULL for sub '$sub2_name' after reset.)
 );
 
 $reset_time1 = $node_subscriber->safe_psql($db,