Add two attributes to pg_stat_database for parallel workers activity
author Michael Paquier <[email protected]>
Mon, 11 Nov 2024 01:40:48 +0000 (10:40 +0900)
committer Michael Paquier <[email protected]>
Mon, 11 Nov 2024 01:40:48 +0000 (10:40 +0900)
Two attributes are added to pg_stat_database:
* parallel_workers_to_launch, counting the total number of parallel
workers that were planned to be launched.
* parallel_workers_launched, counting the total number of parallel
workers actually launched.

The ratio of both fields can hint that not enough slots are available
when launching parallel workers.  This is also useful when
pg_stat_statements is not deployed on an instance (see cf54a2c00254).

This commit relies on de3a2ea3b264, which added two fields to EState
that get incremented when executing Gather or GatherMerge nodes.

A test is added in select_parallel, where parallel workers are spawned.

Bump catalog version.

Author: Benoit Lobréau
Discussion: https://postgr.es/m/783bc7f7-659a-42fa-99dd-ee0565644e25@dalibo.com

doc/src/sgml/monitoring.sgml
src/backend/catalog/system_views.sql
src/backend/executor/execMain.c
src/backend/utils/activity/pgstat_database.c
src/backend/utils/adt/pgstatfuncs.c
src/include/catalog/catversion.h
src/include/catalog/pg_proc.dat
src/include/pgstat.h
src/test/regress/expected/rules.out
src/test/regress/expected/select_parallel.out
src/test/regress/sql/select_parallel.sql

index 331315f8d3c17660d47340bc4b09e89a0fb60ede..840d7f81615e28d8f4514d14157ab03c92e5f980 100644 (file)
@@ -3611,6 +3611,24 @@ description | Waiting for a newly initialized WAL file to reach durable storage
       </para></entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>parallel_workers_to_launch</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of parallel workers planned to be launched by queries on this database
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>parallel_workers_launched</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of parallel workers launched by queries on this database
+      </para></entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>stats_reset</structfield> <type>timestamp with time zone</type>
index 3456b821bc537c6833341538ab4bcce0ffb8a9c5..da9a8fe99f24879bdb5b272e0dd39423e27bb32e 100644 (file)
@@ -1073,6 +1073,8 @@ CREATE VIEW pg_stat_database AS
             pg_stat_get_db_sessions_abandoned(D.oid) AS sessions_abandoned,
             pg_stat_get_db_sessions_fatal(D.oid) AS sessions_fatal,
             pg_stat_get_db_sessions_killed(D.oid) AS sessions_killed,
+            pg_stat_get_db_parallel_workers_to_launch(D.oid) as parallel_workers_to_launch,
+            pg_stat_get_db_parallel_workers_launched(D.oid) as parallel_workers_launched,
             pg_stat_get_db_stat_reset_time(D.oid) AS stats_reset
     FROM (
         SELECT 0 AS oid, NULL::name AS datname
index cc9a594cba5772bf568e29bcc375bd85fd4c32d2..5ca856fd279a3e2ca0b3e668d372e90eb1373445 100644 (file)
@@ -52,6 +52,7 @@
 #include "miscadmin.h"
 #include "nodes/queryjumble.h"
 #include "parser/parse_relation.h"
+#include "pgstat.h"
 #include "rewrite/rewriteHandler.h"
 #include "tcop/utility.h"
 #include "utils/acl.h"
@@ -483,6 +484,10 @@ standard_ExecutorEnd(QueryDesc *queryDesc)
 
    Assert(estate != NULL);
 
+   if (estate->es_parallel_workers_to_launch > 0)
+       pgstat_update_parallel_workers_stats((PgStat_Counter) estate->es_parallel_workers_to_launch,
+                                            (PgStat_Counter) estate->es_parallel_workers_launched);
+
    /*
     * Check that ExecutorFinish was called, unless in EXPLAIN-only mode. This
     * Assert is needed because ExecutorFinish is new as of 9.1, and callers
index 29bc0909748030fc982626e1391356813bd94a0b..7757d2ace748b855926ed4915926e0b66556c598 100644 (file)
@@ -262,6 +262,23 @@ AtEOXact_PgStat_Database(bool isCommit, bool parallel)
    }
 }
 
+/*
+ * Notify the stats system about parallel worker information.
+ *
+ * Adds workers_to_launch and workers_launched to the
+ * parallel_workers_to_launch and parallel_workers_launched counters of the
+ * entry returned by pgstat_prep_database_pending() for MyDatabaseId.  Does
+ * nothing if MyDatabaseId is not a valid OID.
+ */
+void
+pgstat_update_parallel_workers_stats(PgStat_Counter workers_to_launch,
+                                    PgStat_Counter workers_launched)
+{
+   PgStat_StatDBEntry *dbentry;
+
+   if (!OidIsValid(MyDatabaseId))
+       return;
+
+   dbentry = pgstat_prep_database_pending(MyDatabaseId);
+   dbentry->parallel_workers_to_launch += workers_to_launch;
+   dbentry->parallel_workers_launched += workers_launched;
+}
+
+
 /*
  * Subroutine for pgstat_report_stat(): Handle xact commit/rollback and I/O
  * timings.
@@ -425,6 +442,8 @@ pgstat_database_flush_cb(PgStat_EntryRef *entry_ref, bool nowait)
    PGSTAT_ACCUM_DBCOUNT(sessions_abandoned);
    PGSTAT_ACCUM_DBCOUNT(sessions_fatal);
    PGSTAT_ACCUM_DBCOUNT(sessions_killed);
+   PGSTAT_ACCUM_DBCOUNT(parallel_workers_to_launch);
+   PGSTAT_ACCUM_DBCOUNT(parallel_workers_launched);
 #undef PGSTAT_ACCUM_DBCOUNT
 
    pgstat_unlock_entry(entry_ref);
index f7b50e0b5af6a20a8d80ebb3d8b0417069459ac8..60a397dc56120fd60eafb35c1f04343815dc2bf9 100644 (file)
@@ -1039,6 +1039,12 @@ PG_STAT_GET_DBENTRY_INT64(sessions_fatal)
 /* pg_stat_get_db_sessions_killed */
 PG_STAT_GET_DBENTRY_INT64(sessions_killed)
 
+/* pg_stat_get_db_parallel_workers_to_launch */
+PG_STAT_GET_DBENTRY_INT64(parallel_workers_to_launch)
+
+/* pg_stat_get_db_parallel_workers_launched */
+PG_STAT_GET_DBENTRY_INT64(parallel_workers_launched)
+
 /* pg_stat_get_db_temp_bytes */
 PG_STAT_GET_DBENTRY_INT64(temp_bytes)
 
index 86436e0356641138713affefa7c9283c0e7a7209..5dd91e190ae6ba76c2b9552e140ee87ff20c677b 100644 (file)
@@ -57,6 +57,6 @@
  */
 
 /*                         yyyymmddN */
-#define CATALOG_VERSION_NO 202411081
+#define CATALOG_VERSION_NO 202411111
 
 #endif
index f23321a41f1b20b1a50691d9fa1bf1d77d50755c..cbbe8acd3826e6ed9f86721ace110802571027ab 100644 (file)
   proname => 'pg_stat_get_db_sessions_killed', provolatile => 's',
   proparallel => 'r', prorettype => 'int8', proargtypes => 'oid',
   prosrc => 'pg_stat_get_db_sessions_killed' },
+{ oid => '8403',
+  descr => 'statistics: number of parallel workers planned to be launched by queries',
+  proname => 'pg_stat_get_db_parallel_workers_to_launch', provolatile => 's',
+  proparallel => 'r', prorettype => 'int8', proargtypes => 'oid',
+  prosrc => 'pg_stat_get_db_parallel_workers_to_launch' },
+{ oid => '8404',
+  descr => 'statistics: number of parallel workers effectively launched by queries',
+  proname => 'pg_stat_get_db_parallel_workers_launched', provolatile => 's',
+  proparallel => 'r', prorettype => 'int8', proargtypes => 'oid',
+  prosrc => 'pg_stat_get_db_parallel_workers_launched' },
 { oid => '3195', descr => 'statistics: information about WAL archiver',
   proname => 'pg_stat_get_archiver', proisstrict => 'f', provolatile => 's',
   proparallel => 'r', prorettype => 'record', proargtypes => '',
index df53fa2d4f94250be89c37adff90949f331bc622..59c28b4aca8cfc5165cf7fa8befc40469e673e66 100644 (file)
@@ -386,6 +386,8 @@ typedef struct PgStat_StatDBEntry
    PgStat_Counter sessions_abandoned;
    PgStat_Counter sessions_fatal;
    PgStat_Counter sessions_killed;
+   PgStat_Counter parallel_workers_to_launch;
+   PgStat_Counter parallel_workers_launched;
 
    TimestampTz stat_reset_timestamp;
 } PgStat_StatDBEntry;
@@ -583,6 +585,8 @@ extern void pgstat_report_deadlock(void);
 extern void pgstat_report_checksum_failures_in_db(Oid dboid, int failurecount);
 extern void pgstat_report_checksum_failure(void);
 extern void pgstat_report_connect(Oid dboid);
+extern void pgstat_update_parallel_workers_stats(PgStat_Counter workers_to_launch,
+                                                PgStat_Counter workers_launched);
 
 #define pgstat_count_buffer_read_time(n)                           \
    (pgStatBlockReadTime += (n))
index 2b47013f113e373e928d76197ba6cadcde79870e..3014d047fefc6488e60fa63d4908ee013cda5093 100644 (file)
@@ -1863,6 +1863,8 @@ pg_stat_database| SELECT oid AS datid,
     pg_stat_get_db_sessions_abandoned(oid) AS sessions_abandoned,
     pg_stat_get_db_sessions_fatal(oid) AS sessions_fatal,
     pg_stat_get_db_sessions_killed(oid) AS sessions_killed,
+    pg_stat_get_db_parallel_workers_to_launch(oid) AS parallel_workers_to_launch,
+    pg_stat_get_db_parallel_workers_launched(oid) AS parallel_workers_launched,
     pg_stat_get_db_stat_reset_time(oid) AS stats_reset
    FROM ( SELECT 0 AS oid,
             NULL::name AS datname
index d17ade278b28995c34a9b656e609add093b09f79..8c31f6460d339ddb66c7810113b171d55cf90630 100644 (file)
@@ -1,6 +1,17 @@
 --
 -- PARALLEL
 --
+-- Save parallel worker stats, used for comparison at the end
+select pg_stat_force_next_flush();
+ pg_stat_force_next_flush 
+--------------------------
+(1 row)
+
+select parallel_workers_to_launch as parallel_workers_to_launch_before,
+       parallel_workers_launched as parallel_workers_launched_before
+  from pg_stat_database
+  where datname = current_database() \gset
 create function sp_parallel_restricted(int) returns int as
   $$begin return $1; end$$ language plpgsql parallel restricted;
 begin;
@@ -1407,3 +1418,19 @@ CREATE UNIQUE INDEX parallel_hang_idx
 SET debug_parallel_query = on;
 DELETE FROM parallel_hang WHERE 380 <= i AND i <= 420;
 ROLLBACK;
+-- Check parallel worker stats
+select pg_stat_force_next_flush();
+ pg_stat_force_next_flush 
+--------------------------
+(1 row)
+
+select parallel_workers_to_launch > :'parallel_workers_to_launch_before'  AS wrk_to_launch,
+       parallel_workers_launched > :'parallel_workers_launched_before' AS wrk_launched
+  from pg_stat_database
+  where datname = current_database();
+ wrk_to_launch | wrk_launched 
+---------------+--------------
+ t             | t
+(1 row)
+
index 9ba1328fd2ef76eae6c9f1095fbc8555b1b691f5..5b4a6e1088fb6d7ae4cce2d9b7b6aad44a26e62c 100644 (file)
@@ -2,6 +2,13 @@
 -- PARALLEL
 --
 
+-- Save parallel worker stats, used for comparison at the end
+select pg_stat_force_next_flush();
+select parallel_workers_to_launch as parallel_workers_to_launch_before,
+       parallel_workers_launched as parallel_workers_launched_before
+  from pg_stat_database
+  where datname = current_database() \gset
+
 create function sp_parallel_restricted(int) returns int as
   $$begin return $1; end$$ language plpgsql parallel restricted;
 
@@ -574,3 +581,10 @@ SET debug_parallel_query = on;
 DELETE FROM parallel_hang WHERE 380 <= i AND i <= 420;
 
 ROLLBACK;
+
+-- Check parallel worker stats
+select pg_stat_force_next_flush();
+select parallel_workers_to_launch > :'parallel_workers_to_launch_before'  AS wrk_to_launch,
+       parallel_workers_launched > :'parallel_workers_launched_before' AS wrk_launched
+  from pg_stat_database
+  where datname = current_database();