Feature: Add new PCP command to invalidate query cache.
authorTatsuo Ishii <[email protected]>
Mon, 14 Oct 2024 03:56:15 +0000 (12:56 +0900)
committerTatsuo Ishii <[email protected]>
Mon, 14 Oct 2024 03:56:15 +0000 (12:56 +0900)
Previously it was not possible to invalidate query cache without
restarting pgpool.  This commit adds new PCP command
"pcp_invalidate_query_cache" to invalidate query cache without
restarting pgpool. Note this command only places a query cache
invalidate request on shared the shared memory. The actual
invalidation is performed by pgpool child process.

The reasons for the PCP process cannot remove cache directly are:
1) the connection handle to memcached server is not managed by PCP
process.

2) removing shared memory query cache needs an interlock using
pool_shmem_ock() which may not work well on PCP process. Also a
function used here (pool_clear_memory_cache()) uses PG_TRY, which is
only usable in pgpool child process.

If pgpool child process finds such a request, the process invalidates
all query cache on the shared memory. If the query cache storage is
memcached, then pgpool issues memcached_flush() so that all query
cache on memcached are flushed immediately.

Note that the timing for pgpool child process to check the
invalidation request is after processing current query or response
from backend. This means that if all pgpool child process sit idle,
the request will not be processed until any of them receives a
messages from either frontend or backend.

Another note is, about query cache statistics shown by "show
pool_cache" command. Since the cache invalidation does not clear the
statistics, some of them (num_cache_hits and num_selects) continue to
increase even after the cache invalidation. Initializing the
statistics at the same could be possible but I am not sure if all
users want to do it.

Discussion:https://www.pgpool.net/pipermail/pgpool-hackers/2024-October/004525.html

15 files changed:
doc.ja/src/sgml/ref/allfiles.sgml
doc.ja/src/sgml/reference.sgml
doc/src/sgml/ref/allfiles.sgml
doc/src/sgml/reference.sgml
src/include/pcp/libpcp_ext.h
src/include/pool.h
src/include/query_cache/pool_memqcache.h
src/libs/pcp/pcp.c
src/pcp_con/pcp_worker.c
src/protocol/CommandComplete.c
src/protocol/pool_process_query.c
src/query_cache/pool_memqcache.c
src/test/regression/tests/006.memqcache/test.sh
src/tools/pcp/Makefile.am
src/tools/pcp/pcp_frontend_client.c

index 69fbe1efe1f20ca1fcd794191e5caea88e1825a5..6764e8f9cd30e2c86632cf9055c3fda30e5b6097 100644 (file)
@@ -19,6 +19,7 @@ Complete list of usable sgml source files in this directory.
 <!ENTITY pcpRecoveryNode     SYSTEM "pcp_recovery_node.sgml">
 <!ENTITY pcpReloadConfig     SYSTEM "pcp_reload_config.sgml">
 <!ENTITY pcpLogRotate        SYSTEM "pcp_log_rotate.sgml">
+<!ENTITY pcpInvalidateCache  SYSTEM "pcp_invalidate_query_cache.sgml">
 <!ENTITY pgMd5               SYSTEM "pg_md5.sgml">
 <!ENTITY pgEnc               SYSTEM "pg_enc.sgml">
 <!ENTITY wdCli               SYSTEM "wd_cli.sgml">
index 05e4d87b5ca72322763992cd18d67a4c8c7eff12..548b05f24b17f7d8adef443e7cdad485f9f11a8a 100644 (file)
   &pcpReloadConfig;
   &pcpRecoveryNode;
   &pcpLogRotate;
+  &pcpInvalidateCache;
 
  </reference>
 
index 69fbe1efe1f20ca1fcd794191e5caea88e1825a5..6764e8f9cd30e2c86632cf9055c3fda30e5b6097 100644 (file)
@@ -19,6 +19,7 @@ Complete list of usable sgml source files in this directory.
 <!ENTITY pcpRecoveryNode     SYSTEM "pcp_recovery_node.sgml">
 <!ENTITY pcpReloadConfig     SYSTEM "pcp_reload_config.sgml">
 <!ENTITY pcpLogRotate        SYSTEM "pcp_log_rotate.sgml">
+<!ENTITY pcpInvalidateCache  SYSTEM "pcp_invalidate_query_cache.sgml">
 <!ENTITY pgMd5               SYSTEM "pg_md5.sgml">
 <!ENTITY pgEnc               SYSTEM "pg_enc.sgml">
 <!ENTITY wdCli               SYSTEM "wd_cli.sgml">
index 30be90bc5522446bb01a8993857f36f30b28b6a3..ffc7ddb8674be8f48600d8ea5017558cea627a28 100644 (file)
   &pcpReloadConfig;
   &pcpRecoveryNode;
   &pcpLogRotate;
+  &pcpInvalidateCache;
 
  </reference>
 
index 32dfcb0033790f3ab521efd0bf0936b45e752cff..232dc8143033f67aa76e517c8c614eb5b8aa1896 100644 (file)
@@ -4,7 +4,7 @@
  * pgpool: a language independent connection pool server for PostgreSQL
  * written by Tatsuo Ishii
  *
- * Copyright (c) 2003-2023     PgPool Global Development Group
+ * Copyright (c) 2003-2024     PgPool Global Development Group
  *
  * Permission to use, copy, modify, and distribute this software and
  * its documentation for any purpose and without fee is hereby
@@ -382,6 +382,7 @@ extern PCPResultInfo * pcp_process_count(PCPConnInfo * pcpConn);
 extern PCPResultInfo * pcp_process_info(PCPConnInfo * pcpConn, int pid);
 extern PCPResultInfo * pcp_reload_config(PCPConnInfo * pcpConn,char command_scope);
 extern PCPResultInfo * pcp_log_rotate(PCPConnInfo * pcpConn,char command_scope);
+extern PCPResultInfo * pcp_invalidate_query_cache(PCPConnInfo * pcpConn);
 
 extern PCPResultInfo * pcp_detach_node(PCPConnInfo * pcpConn, int nid);
 extern PCPResultInfo * pcp_detach_node_gracefully(PCPConnInfo * pcpConn, int nid);
index 8fa429fa70bf1eb6ec81c0e536df2b901e187863..42393f828d874e58e23dd41315b4502cff67f017 100644 (file)
@@ -454,6 +454,7 @@ typedef struct
        bool            follow_primary_lock_held_remotely; /* true when lock is held by
                                                                                                        watchdog coordinator*/
        bool            follow_primary_ongoing; /* true if follow primary command is ongoing */
+       bool            query_cache_invalidate_request; /* true if pcp_invalidate_query_cache requested */
 }                      POOL_REQUEST_INFO;
 
 /* description of row. corresponding to RowDescription message */
index 6e94d3479b5513cf1403efcdeed62eb93f8e61d1..5a0a1fce92573b701c494b7a55c281196e8e5392 100644 (file)
@@ -305,6 +305,6 @@ extern void InvalidateQueryCache(int tableoid, int dboid);
 
 extern void pool_init_whole_cache_blocks(void);
 
-extern int delete_all_cache_on_memcached(void);
+extern void clear_query_cache(void);
 
 #endif                                                 /* POOL_MEMQCACHE_H */
index 8b4b569d46c6e1944e44f6fbb791ed83e9f425da..01d822a9431ddada336c481f1aa43163021fe18b 100644 (file)
@@ -8,7 +8,7 @@
  * pgpool: a language independent connection pool server for PostgreSQL
  * written by Tatsuo Ishii
  *
- * Copyright (c) 2003-2021     PgPool Global Development Group
+ * Copyright (c) 2003-2024     PgPool Global Development Group
  *
  * Permission to use, copy, modify, and distribute this software and
  * its documentation for any purpose and without fee is hereby
@@ -390,132 +390,140 @@ static PCPResultInfo * process_pcp_response(PCPConnInfo * pcpConn, char sentMsg)
 
                switch (toc)
                {
-                       case 'r':                       /* Authentication Response */
-                               {
-                                       if (sentMsg != 'R')
-                                       {
-                                               setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
-                                       }
-                                       else if (strcmp(buf, "AuthenticationOK") == 0)
-                                       {
-                                               pcpConn->connState = PCP_CONNECTION_OK;
-                                               setResultStatus(pcpConn, PCP_RES_COMMAND_OK);
-                                       }
-                                       else
-                                       {
-                                               pcp_internal_error(pcpConn,
-                                                                                  "ERROR: authentication failed. reason=\"%s\"", buf);
-                                               setResultStatus(pcpConn, PCP_RES_BACKEND_ERROR);
-                                       }
-                               }
+                       case 'a':                       /* set configuration parameter */
+                               if (sentMsg != 'A')
+                                       setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
+                               else
+                                       process_command_complete_response(pcpConn, buf, rsize);
                                break;
-                       case 'm':
-                               if (sentMsg != 'M')
+
+                       case 'b':                       /* status request */
+                               if (sentMsg != 'B')
                                        setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
                                else
-                                       process_salt_info_response(pcpConn, buf, rsize);
+                                       process_pool_status_response(pcpConn, buf, rsize);
                                break;
 
-                       case 'E':
-                               setResultStatus(pcpConn, PCP_RES_BACKEND_ERROR);
-                               process_error_response(pcpConn, toc, buf);
+                       case 'c':                       /* attach node */
+                               if (sentMsg != 'C' && sentMsg != 'O')
+                                       setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
+                               else
+                                       process_command_complete_response(pcpConn, buf, rsize);
                                break;
 
-                       case 'N':
+                       case 'd':                       /* detach node */
+                               if (sentMsg != 'D' && sentMsg != 'J')
+                                       setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
+                               else
+                                       process_command_complete_response(pcpConn, buf, rsize);
+                               break;
+
+                       case 'E':                       /* error */
+                               setResultStatus(pcpConn, PCP_RES_BACKEND_ERROR);
                                process_error_response(pcpConn, toc, buf);
-                               pfree(buf);
-                               continue;
                                break;
 
-                       case 'i':
-                               if (sentMsg != 'I')
+                       case 'g':                       /* invalidate query cache */
+                               if (sentMsg != 'G')
                                        setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
                                else
-                                       process_node_info_response(pcpConn, buf, rsize);
+                                       process_command_complete_response(pcpConn, buf, rsize);
                                break;
 
-                       case 'h':
+                       case 'h':                       /* health check stats */
                                if (sentMsg != 'H')
                                        setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
                                else
                                        process_health_check_stats_response(pcpConn, buf, rsize);
                                break;
 
-                       case 'l':
-                               if (sentMsg != 'L')
+                       case 'i':                       /* node info */
+                               if (sentMsg != 'I')
                                        setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
                                else
-                                       process_pcp_node_count_response(pcpConn, buf, rsize);
+                                       process_node_info_response(pcpConn, buf, rsize);
                                break;
 
-                       case 'c':
-                               if (sentMsg != 'C' && sentMsg != 'O')
+                       case 'l':                       /* node count */
+                               if (sentMsg != 'L')
                                        setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
                                else
-                                       process_command_complete_response(pcpConn, buf, rsize);
+                                       process_pcp_node_count_response(pcpConn, buf, rsize);
                                break;
 
-                       case 'd':
-                               if (sentMsg != 'D' && sentMsg != 'J')
+                       case 'm':                       /* salt info response */
+                               if (sentMsg != 'M')
                                        setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
                                else
-                                       process_command_complete_response(pcpConn, buf, rsize);
+                                       process_salt_info_response(pcpConn, buf, rsize);
                                break;
 
-                       case 'a':
-                               if (sentMsg != 'A')
-                                       setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
-                               else
-                                       process_command_complete_response(pcpConn, buf, rsize);
+                       case 'N':                       /* error response */
+                               process_error_response(pcpConn, toc, buf);
+                               pfree(buf);
+                               continue;
                                break;
 
-                       case 'z':
-                               if (sentMsg != 'Z')
+                       case 'n':                       /* process count */
+                               if (sentMsg != 'N')
                                        setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
                                else
-                                       process_command_complete_response(pcpConn, buf, rsize);
+                                       process_process_count_response(pcpConn, buf, rsize);
                                break;
 
-                       case 'v':                       /* pcp_log_rotate */
-                               if (sentMsg != 'V')
+                       case 'p':                       /* process info */
+                               if (sentMsg != 'P')
                                        setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
                                else
-                                       process_command_complete_response(pcpConn, buf, rsize);
+                                       process_process_info_response(pcpConn, buf, rsize);
                                break;
 
-                       case 'w':
-                               if (sentMsg != 'W')
-                                       setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
-                               else
-                                       process_watchdog_info_response(pcpConn, buf, rsize);
+                       case 'r':                       /* Authentication Response */
+                               {
+                                       if (sentMsg != 'R')
+                                       {
+                                               setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
+                                       }
+                                       else if (strcmp(buf, "AuthenticationOK") == 0)
+                                       {
+                                               pcpConn->connState = PCP_CONNECTION_OK;
+                                               setResultStatus(pcpConn, PCP_RES_COMMAND_OK);
+                                       }
+                                       else
+                                       {
+                                               pcp_internal_error(pcpConn,
+                                                                                  "ERROR: authentication failed. reason=\"%s\"", buf);
+                                               setResultStatus(pcpConn, PCP_RES_BACKEND_ERROR);
+                                       }
+                               }
                                break;
 
-                       case 'p':
-                               if (sentMsg != 'P')
+                       case 't':                       /* shutdown request */
+                               if (sentMsg != 'T')
                                        setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
                                else
-                                       process_process_info_response(pcpConn, buf, rsize);
+                                       setResultStatus(pcpConn, PCP_RES_COMMAND_OK);
                                break;
 
-                       case 'n':
-                               if (sentMsg != 'N')
+                       case 'v':                       /* pcp_log_rotate */
+                               if (sentMsg != 'V')
                                        setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
                                else
-                                       process_process_count_response(pcpConn, buf, rsize);
+                                       process_command_complete_response(pcpConn, buf, rsize);
                                break;
 
-                       case 'b':
-                               if (sentMsg != 'B')
+                       case 'w':                       /* watchdog info */
+                               if (sentMsg != 'W')
                                        setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
                                else
-                                       process_pool_status_response(pcpConn, buf, rsize);
+                                       process_watchdog_info_response(pcpConn, buf, rsize);
                                break;
 
-                       case 't':
-                               if (sentMsg != 'T')
+                       case 'z':                       /* command complete */
+                               if (sentMsg != 'Z')
                                        setResultStatus(pcpConn, PCP_RES_BAD_RESPONSE);
                                else
-                                       setResultStatus(pcpConn, PCP_RES_COMMAND_OK);
+                                       process_command_complete_response(pcpConn, buf, rsize);
                                break;
 
                        default:
@@ -959,6 +967,28 @@ pcp_log_rotate(PCPConnInfo * pcpConn,char command_scope)
        return process_pcp_response(pcpConn, 'V');
 }
 
+PCPResultInfo *
+pcp_invalidate_query_cache(PCPConnInfo * pcpConn)
+{
+       int                     wsize;
+
+       if (PCPConnectionStatus(pcpConn) != PCP_CONNECTION_OK)
+       {
+          pcp_internal_error(pcpConn, "invalid PCP connection");
+          return NULL;
+       }
+
+       pcp_write(pcpConn->pcpConn, "G", 1);
+       wsize = htonl(sizeof(int));
+       pcp_write(pcpConn->pcpConn, &wsize, sizeof(int));
+       if (PCPFlush(pcpConn) < 0)
+          return NULL;
+       if (pcpConn->Pfdebug)
+          fprintf(pcpConn->Pfdebug, "DEBUG: send: tos=\"G\", len=%d\n", ntohl(wsize));
+
+       return process_pcp_response(pcpConn, 'G');
+}
+
 /*
  * Process health check response from PCP server.
  * pcpConn: connection to the server
index 02f76eecb60ad62363ee3a220a99e5f3b2357654..de2658d5e2b657b75c37ecd3025fdaae4956d7e9 100644 (file)
@@ -4,7 +4,7 @@
  * pgpool: a language independent connection pool server for PostgreSQL
  * written by Tatsuo Ishii
  *
- * Copyright (c) 2003-2022     PgPool Global Development Group
+ * Copyright (c) 2003-2024     PgPool Global Development Group
  *
  * Permission to use, copy, modify, and distribute this software and
  * its documentation for any purpose and without fee is hereby
@@ -93,6 +93,7 @@ static void process_status_request(PCP_CONNECTION * frontend);
 static void process_promote_node(PCP_CONNECTION * frontend, char *buf, char tos);
 static void process_shutdown_request(PCP_CONNECTION * frontend, char mode, char tos);
 static void process_set_configuration_parameter(PCP_CONNECTION * frontend, char *buf, int len);
+static void process_invalidate_query_cache(PCP_CONNECTION * frontend);
 
 static void pcp_worker_will_go_down(int code, Datum arg);
 
@@ -268,9 +269,30 @@ pcp_process_command(char tos, char *buf, int buf_len)
                        process_set_configuration_parameter(pcp_frontend, buf, buf_len);
                        break;
 
-               case 'L':                               /* node count */
-                       set_ps_display("PCP: processing node count request", false);
-                       inform_node_count(pcp_frontend);
+               case 'B':                               /* status request */
+                       set_ps_display("PCP: processing status request request", false);
+                       process_status_request(pcp_frontend);
+                       break;
+
+               case 'C':                               /* attach node */
+                       set_ps_display("PCP: processing attach node request", false);
+                       process_attach_node(pcp_frontend, buf);
+                       break;
+
+               case 'D':                               /* detach node */
+               case 'd':                               /* detach node gracefully */
+                       set_ps_display("PCP: processing detach node request", false);
+                       process_detach_node(pcp_frontend, buf, tos);
+                       break;
+
+               case 'F':
+                       ereport(DEBUG1,
+                                       (errmsg("PCP processing request, stop online recovery")));
+                       break;
+
+               case 'G':                               /* invalidate query cache */
+                       set_ps_display("PCP: processing invalidate query cache request", false);
+                       process_invalidate_query_cache(pcp_frontend);
                        break;
 
                case 'H':                               /* health check stats */
@@ -283,30 +305,30 @@ pcp_process_command(char tos, char *buf, int buf_len)
                        inform_node_info(pcp_frontend, buf);
                        break;
 
-               case 'N':                               /* process count */
-                       set_ps_display("PCP: processing process count request", false);
-                       inform_process_count(pcp_frontend);
+               case 'J':                               /* promote node */
+               case 'j':                               /* promote node gracefully */
+                       set_ps_display("PCP: processing promote node request", false);
+                       process_promote_node(pcp_frontend, buf, tos);
                        break;
 
-               case 'P':                               /* process info */
-                       set_ps_display("PCP: processing process info request", false);
-                       inform_process_info(pcp_frontend, buf);
+               case 'L':                               /* node count */
+                       set_ps_display("PCP: processing node count request", false);
+                       inform_node_count(pcp_frontend);
                        break;
 
-               case 'W':                               /* watchdog info */
-                       set_ps_display("PCP: processing watchdog info request", false);
-                       inform_watchdog_info(pcp_frontend, buf);
+               case 'N':                               /* process count */
+                       set_ps_display("PCP: processing process count request", false);
+                       inform_process_count(pcp_frontend);
                        break;
 
-               case 'D':                               /* detach node */
-               case 'd':                               /* detach node gracefully */
-                       set_ps_display("PCP: processing detach node request", false);
-                       process_detach_node(pcp_frontend, buf, tos);
+               case 'O':                               /* recovery request */
+                       set_ps_display("PCP: processing recovery request", false);
+                       process_recovery_request(pcp_frontend, buf);
                        break;
 
-               case 'C':                               /* attach node */
-                       set_ps_display("PCP: processing attach node request", false);
-                       process_attach_node(pcp_frontend, buf);
+               case 'P':                               /* process info */
+                       set_ps_display("PCP: processing process info request", false);
+                       inform_process_info(pcp_frontend, buf);
                        break;
 
                case 'T':
@@ -315,35 +337,19 @@ pcp_process_command(char tos, char *buf, int buf_len)
                        process_shutdown_request(pcp_frontend, buf[0], tos);
                        break;
 
-               case 'O':                               /* recovery request */
-                       set_ps_display("PCP: processing recovery request", false);
-                       process_recovery_request(pcp_frontend, buf);
-                       break;
-
-               case 'B':                               /* status request */
-                       set_ps_display("PCP: processing status request request", false);
-                       process_status_request(pcp_frontend);
-                       break;
-
-               case 'Z':                               /*reload config file */
-                       set_ps_display("PCP: processing reload config request", false);
-                       process_reload_config(pcp_frontend, buf[0]);
-                       break;
-
                case 'V':                               /* log rotate */
                        set_ps_display("PCP: processing log rotation request", false);
                        process_log_rotate(pcp_frontend, buf[0]);
                        break;
 
-               case 'J':                               /* promote node */
-               case 'j':                               /* promote node gracefully */
-                       set_ps_display("PCP: processing promote node request", false);
-                       process_promote_node(pcp_frontend, buf, tos);
+               case 'W':                               /* watchdog info */
+                       set_ps_display("PCP: processing watchdog info request", false);
+                       inform_watchdog_info(pcp_frontend, buf);
                        break;
 
-               case 'F':
-                       ereport(DEBUG1,
-                                       (errmsg("PCP processing request, stop online recovery")));
+               case 'Z':                               /*reload config file */
+                       set_ps_display("PCP: processing reload config request", false);
+                       process_reload_config(pcp_frontend, buf[0]);
                        break;
 
                case 'X':                               /* disconnect */
@@ -1375,6 +1381,32 @@ process_promote_node(PCP_CONNECTION * frontend, char *buf, char tos)
        do_pcp_flush(frontend);
 }
 
+/*
+ * Process pcp_invalidate_query_cache
+ */
+static void
+process_invalidate_query_cache(PCP_CONNECTION * frontend)
+{
+       int                     wsize;
+       char            code[] = "CommandComplete";
+
+       if (!pool_config->memory_cache_enabled)
+               ereport(ERROR,
+                               (errmsg("query cache is not enabled")));
+
+       ereport(DEBUG1,
+                       (errmsg("PCP: processing invalidate query cache")));
+
+       /* Set query cache invalidation request flag */
+       Req_info->query_cache_invalidate_request = true;
+
+       pcp_write(frontend, "g", 1);
+       wsize = htonl(sizeof(code) + sizeof(int));
+       pcp_write(frontend, &wsize, sizeof(int));
+       pcp_write(frontend, code, sizeof(code));
+       do_pcp_flush(frontend);
+}
+
 static void
 process_authentication(PCP_CONNECTION * frontend, char *buf, char *salt, int *random_salt)
 {
index 92e1e99a9a2a96808ce1ee0ec279dc93cf2965d3..4b02b793d26aef6afb11f3a1ab069b3e8e20db51 100644 (file)
@@ -45,7 +45,6 @@ static int    forward_command_complete(POOL_CONNECTION * frontend, char *packet, in
 static int     forward_empty_query(POOL_CONNECTION * frontend, char *packet, int packetlen);
 static int     forward_packet_to_frontend(POOL_CONNECTION * frontend, char kind, char *packet, int packetlen);
 static void process_clear_cache(POOL_CONNECTION_POOL * backend);
-static void clear_query_cache(void);
 
 POOL_STATUS
 CommandComplete(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, bool command_complete)
@@ -721,37 +720,3 @@ process_clear_cache(POOL_CONNECTION_POOL * backend)
                }
        }
 }
-
-/*
- * Clear query cache on shmem or memcached
- */
-static
-void clear_query_cache(void)
-{
-       /*
-        * Clear all the shared memory cache and oid maps.
-        */
-       if (pool_is_shmem_cache())
-       {
-               pool_clear_memory_cache();
-               ereport(LOG,
-                               (errmsg("all query cache in shared memory deleted")));
-       }
-       else
-#ifdef USE_MEMCACHED
-       {
-               /*
-                * Clear all the memcached cache and oid maps.
-                */
-               delete_all_cache_on_memcached();
-               pool_discard_oid_maps();
-               ereport(LOG,
-                               (errmsg("all query cache in memcached deleted")));
-       }
-#else
-       {
-               ereport(WARNING,
-                               (errmsg("failed to clear cache on memcached, memcached support is not enabled")));
-       }
-#endif
-}
index a17475ea2ea99f5802fb5830b82d29663e72fadb..d40f21fad6a9797a53b0d5bbf429552f53771d62 100644 (file)
@@ -466,6 +466,22 @@ pool_process_query(POOL_CONNECTION * frontend,
                                load_hba(get_hba_file_name());
                        got_sighup = 0;
                }
+
+               /*
+                * Process query cache invalidation request if any.
+                */
+               if (pool_config->memory_cache_enabled)
+               {
+                       volatile bool invalidate_request = Req_info->query_cache_invalidate_request;
+                       if (invalidate_request)
+                       {
+                               /*
+                                * Delete all query cache in shared memory or memcached.
+                                */
+                               clear_query_cache();
+                               Req_info->query_cache_invalidate_request = false;
+                       }
+               }
        }
 }
 
index 7ff8a80d972c37c942cabb647bcd531e3c23a776..de50729d1e488f0d1dd312c9a5e3f6305b9d20a8 100644 (file)
@@ -125,6 +125,7 @@ static volatile POOL_HASH_ELEMENT *get_new_hash_element(void);
 static void put_back_hash_element(volatile POOL_HASH_ELEMENT * element);
 static bool is_free_hash_element(void);
 static void inject_cached_message(POOL_CONNECTION * backend, char *qcache, int qcachelen);
+static int delete_all_cache_on_memcached(void);
 
 /*
  * if true, shared memory is locked in this process now.
@@ -2160,7 +2161,7 @@ pool_clear_memory_cache(void)
 /*
  * delete all query cache on memcached
  */
-int
+static int
 delete_all_cache_on_memcached(void)
 {
        memcached_return rc;
@@ -2178,6 +2179,39 @@ delete_all_cache_on_memcached(void)
 }
 #endif
 
+/*
+ * Clear query cache on shmem or memcached
+ */
+void clear_query_cache(void)
+{
+       /*
+        * Clear all the shared memory cache and oid maps.
+        */
+       if (pool_is_shmem_cache())
+       {
+               pool_clear_memory_cache();
+               ereport(LOG,
+                               (errmsg("all query cache in shared memory deleted")));
+       }
+       else
+#ifdef USE_MEMCACHED
+       {
+               /*
+                * Clear all the memcached cache and oid maps.
+                */
+               delete_all_cache_on_memcached();
+               pool_discard_oid_maps();
+               ereport(LOG,
+                               (errmsg("all query cache in memcached deleted")));
+       }
+#else
+       {
+               ereport(WARNING,
+                               (errmsg("failed to clear cache on memcached, memcached support is not enabled")));
+       }
+#endif
+}
+
 /*
  * Return shared memory cache address
  */
index e7e2353b99b34afc4ca1e9d08a17f18b2b8baade..e915d8c5dc4431b1154d0df2ce584ecff59eb252 100755 (executable)
@@ -10,6 +10,7 @@ TESTDIR=testdir
 
 PSQL=$PGBIN/psql
 PGPROTO=$PGPOOL_INSTALL_DIR/bin/pgproto
+PCP_INVALIDATE_QUERY_CACHE=$PGPOOL_INSTALL_DIR/bin/pcp_invalidate_query_cache
 
 # remove error/notice details (message and so on) from
 # ErrorResponse or NoticeResponse messages.
@@ -459,6 +460,37 @@ EOF
            exit 1
        fi
        rm $log
+
+       cd $TESTDIR
+       ./startall
+       wait_for_pgpool_startup
+
+       # test for pcp_invalidate_query_cache
+       res1=`$PSQL -t -c "/*FORCE QUERY CACHE*/SELECT current_timestamp" test`
+       res2=`$PSQL -t -c "/*FORCE QUERY CACHE*/SELECT current_timestamp" test`
+       # make sure query cache created
+       if [ "$res1" != "$res2" ];then
+           echo "query cache was not created in pcp_invalidate_query_cache test"
+           ./shutdownall
+           exit 1
+       fi
+       # remove query cache
+       $PCP_INVALIDATE_QUERY_CACHE -p $PCP_PORT
+       if [ $? != 0 ];then
+           echo "pcp_invalidate_query_cache failed"
+           ./shutdownall
+           exit 1
+       fi
+       # make sure query cache has gone
+       res1=`$PSQL -t -c "/*FORCE QUERY CACHE*/SELECT current_timestamp" test`
+       if [ "$res1" = "$res2" ];then
+           echo "query cache was not invalidated"
+           ./shutdownall
+           exit 1
+       fi
+       ./shutdownall
+
+       cd ..
 done
 
 exit 0
index 53152f34c0446e6fa0f24776bf1616aace0c7927..3f82234b08eaa26a9f0eb74c8c7df18ef67499a1 100644 (file)
@@ -14,7 +14,8 @@ bin_PROGRAMS =  pcp_stop_pgpool \
                                pcp_pool_status \
                                pcp_watchdog_info \
                                pcp_reload_config \
-                               pcp_log_rotate
+                               pcp_log_rotate \
+                               pcp_invalidate_query_cache
 
 client_sources = pcp_frontend_client.c ../fe_memutils.c ../../utils/sprompt.c ../../utils/pool_path.c
 
@@ -46,3 +47,5 @@ pcp_reload_config_SOURCES = $(client_sources)
 pcp_reload_config_LDADD = $(libs_dir)/pcp/libpcp.la
 pcp_log_rotate_SOURCES = $(client_sources)
 pcp_log_rotate_LDADD = $(libs_dir)/pcp/libpcp.la
+pcp_invalidate_query_cache_SOURCES = $(client_sources)
+pcp_invalidate_query_cache_LDADD = $(libs_dir)/pcp/libpcp.la
index 930e2a811f02d013f980f26b2d708f7255f47c9d..a72b1327f74999667e26f427da3cc84eabe2ec63 100644 (file)
@@ -4,7 +4,7 @@
  * pgpool: a language independent connection pool server for PostgreSQL
  * written by Tatsuo Ishii
  *
- * Copyright (c) 2003-2022     PgPool Global Development Group
+ * Copyright (c) 2003-2024     PgPool Global Development Group
  *
  * Permission to use, copy, modify, and distribute this software and
  * its documentation for any purpose and without fee is hereby
@@ -58,18 +58,19 @@ typedef enum
 {
        PCP_ATTACH_NODE,
        PCP_DETACH_NODE,
+       PCP_HEALTH_CHECK_STATS,
+       PCP_INVALIDATE_QUERY_CACHE,
+       PCP_LOG_ROTATE,
        PCP_NODE_COUNT,
        PCP_NODE_INFO,
-       PCP_HEALTH_CHECK_STATS,
        PCP_POOL_STATUS,
        PCP_PROC_COUNT,
        PCP_PROC_INFO,
        PCP_PROMOTE_NODE,
        PCP_RECOVERY_NODE,
-       PCP_STOP_PGPOOL,
-       PCP_WATCHDOG_INFO,
        PCP_RELOAD_CONFIG,
-       PCP_LOG_ROTATE,
+       PCP_WATCHDOG_INFO,
+       PCP_STOP_PGPOOL,
        UNKNOWN,
 }                      PCP_UTILITIES;
 
@@ -97,6 +98,7 @@ struct AppTypes AllAppTypes[] =
        {"pcp_watchdog_info", PCP_WATCHDOG_INFO, "n:h:p:U:wWvd", "display a pgpool-II watchdog's information"},
        {"pcp_reload_config",PCP_RELOAD_CONFIG,"h:p:U:s:wWvd", "reload a pgpool-II config file"},
        {"pcp_log_rotate",PCP_LOG_ROTATE,"h:p:U:s:wWvd", "rotate the Pgpool-II's log file"},
+       {"pcp_invalidate_query_cache",PCP_INVALIDATE_QUERY_CACHE,"h:p:U:s:wWvd", "invalidate query cache"},
        {NULL, UNKNOWN, NULL, NULL},
 };
 struct AppTypes *current_app_type;
@@ -395,6 +397,11 @@ main(int argc, char **argv)
                        pcpResInfo = pcp_detach_node(pcpConn, nodeID);
        }
 
+       else if (current_app_type->app_type == PCP_LOG_ROTATE)
+       {
+               pcpResInfo = pcp_log_rotate(pcpConn,command_scope);
+       }
+
        else if (current_app_type->app_type == PCP_NODE_COUNT)
        {
                pcpResInfo = pcp_node_count(pcpConn);
@@ -410,6 +417,11 @@ main(int argc, char **argv)
                pcpResInfo = pcp_health_check_stats(pcpConn, nodeID);
        }
 
+       else if (current_app_type->app_type == PCP_INVALIDATE_QUERY_CACHE)
+       {
+               pcpResInfo = pcp_invalidate_query_cache(pcpConn);
+       }
+
        else if (current_app_type->app_type == PCP_POOL_STATUS)
        {
                pcpResInfo = pcp_pool_status(pcpConn);
@@ -438,6 +450,11 @@ main(int argc, char **argv)
                pcpResInfo = pcp_recovery_node(pcpConn, nodeID);
        }
 
+       else if (current_app_type->app_type == PCP_RELOAD_CONFIG)
+       {
+               pcpResInfo = pcp_reload_config(pcpConn,command_scope);
+       }
+
        else if (current_app_type->app_type == PCP_STOP_PGPOOL)
        {
                pcpResInfo = pcp_terminate_pgpool(pcpConn, shutdown_mode, command_scope);
@@ -448,16 +465,6 @@ main(int argc, char **argv)
                pcpResInfo = pcp_watchdog_info(pcpConn, nodeID);
        }
 
-       else if (current_app_type->app_type == PCP_RELOAD_CONFIG)
-       {
-               pcpResInfo = pcp_reload_config(pcpConn,command_scope);
-       }
-
-       else if (current_app_type->app_type == PCP_LOG_ROTATE)
-       {
-               pcpResInfo = pcp_log_rotate(pcpConn,command_scope);
-       }
-
        else
        {
                /* should never happen */