Fixed kind mismatch error when deadlock error occured.
authorYoshiyuki Asaba <y-asaba at pgfoundry.org>
Wed, 11 Jul 2007 01:40:17 +0000 (01:40 +0000)
committerYoshiyuki Asaba <y-asaba at pgfoundry.org>
Wed, 11 Jul 2007 01:40:17 +0000 (01:40 +0000)
The problem is the following senario.

  A: BEGIN:
  B: BEGIN;
  A: LOCK TABLE t1 IN SHARE ROW EXCLUSIVE MODE;
  B: LOCK TABLE t2 IN SHARE ROW EXCLUSIVE MODE;
  A: LOCK TABLE t2 IN SHARE ROW EXCLUSIVE MODE;
  B: LOCK TABLE t1 IN SHARE ROW EXCLUSIVE MODE;

Transaction "A" aborts on master node, but it completes on another
nodes. It causes wrong failover.

So pgpool checks deadlock error(code == '40P01') and sends error query
to another nodes.

pool.h
pool_process_query.c
pool_stream.c

diff --git a/pool.h b/pool.h
index e83b09fed71ae4ffbd4c203fea5bb13a8fc85b17..1a0d60b2baea3f7b76b474bef0b663d294707401 100644 (file)
--- a/pool.h
+++ b/pool.h
@@ -301,6 +301,7 @@ extern int pool_flush(POOL_CONNECTION *cp);
 extern int pool_flush_it(POOL_CONNECTION *cp);
 extern int pool_write_and_flush(POOL_CONNECTION *cp, void *buf, int len);
 extern char *pool_read_string(POOL_CONNECTION *cp, int *len, int line);
+extern int pool_unread(POOL_CONNECTION *cp, void *data, int len);
 
 extern int pool_do_auth(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend);
 extern int pool_do_reauth(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *cp);
index a8f4c8d6948ae4b0a135cbe84a699f9c84ab9c7d..e7b11fc91a7593a4e4a9416879906937f313b39b 100644 (file)
@@ -41,6 +41,9 @@
 
 #define INIT_STATEMENT_LIST_SIZE 8
 
+#define DEADLOCK_ERROR_CODE "40P01"
+#define POOL_ERROR_QUERY "send invalid query from pgpool to abort transaction"
+
 typedef struct {
        char *statement_name;
        char *portal_name;
@@ -155,6 +158,7 @@ static PreparedStatement *unnamed_statement = NULL;
 static PreparedStatement *unnamed_portal = NULL;
 static int is_drop_database(char *query);              /* returns non 0 if this is a DROP DATABASE command */
 static void query_ps_status(char *query, POOL_CONNECTION_POOL *backend);               /* show ps status */
+static int detect_deadlock_error(POOL_CONNECTION *master, int major);
 
 POOL_STATUS pool_process_query(POOL_CONNECTION *frontend, 
                                                           POOL_CONNECTION_POOL *backend,
@@ -519,6 +523,7 @@ static POOL_STATUS Query(POOL_CONNECTION *frontend,
        int len;
        static char *sq = "show pool_status";
        POOL_STATUS status;
+       int deadlock_detected = 0;
 
        if (query == NULL)      /* need to read query from frontend? */
        {
@@ -747,6 +752,21 @@ static POOL_STATUS Query(POOL_CONNECTION *frontend,
                        pool_debug("waiting for master completing the query");
                        if (synchronize(MASTER(backend)))
                                return POOL_END;
+
+                       /*
+                        * We must check deadlock error because a aborted transaction
+                        * by detecting deadlock isn't same on all nodes.
+                        * If a transaction is aborted on master node, pgpool send a
+                        * error query to another nodes.
+                        */
+                       deadlock_detected = detect_deadlock_error(MASTER(backend), MAJOR(backend));
+                       if (deadlock_detected < 0)
+                               return POOL_END;
+                       else if (deadlock_detected)
+                       {
+                               string = POOL_ERROR_QUERY;
+                               len = strlen(string) + 1;
+                       }
                }
 
 #define SEQUENCE_DEBUG
@@ -792,6 +812,7 @@ static POOL_STATUS Execute(POOL_CONNECTION *frontend,
        char kind;
        int status;
        PreparedStatement *stmt;
+       int deadlock_detected = 0;
 
        /* read Execute packet */
        if (pool_read(frontend, &len, sizeof(len)) < 0)
@@ -834,18 +855,29 @@ static POOL_STATUS Execute(POOL_CONNECTION *frontend,
        {
                POOL_CONNECTION *cp = backend->slots[i]->con;
 
-               /* forward the query to the backend */
-               pool_write(cp, "E", 1);
-               sendlen = htonl(len + 4);
-               pool_write(cp, &sendlen, sizeof(sendlen));
-               pool_write(cp, string, len);
+               if (deadlock_detected)
+               {
+                       pool_write(cp, "Q", 1);
+                       len = strlen(POOL_ERROR_QUERY) + 1;
+                       sendlen = htonl(len + 4);
+                       pool_write(cp, &sendlen, sizeof(sendlen));
+                       pool_write(cp, POOL_ERROR_QUERY, len);
+               }
+               else
+               {
+                       /* forward the query to the backend */
+                       pool_write(cp, "E", 1);
+                       sendlen = htonl(len + 4);
+                       pool_write(cp, &sendlen, sizeof(sendlen));
+                       pool_write(cp, string, len);
 
-               /*
-                * send "Flush" message so that backend notices us
-                * the completion of the command
-                */
-               pool_write(cp, "H", 1);
-               sendlen = htonl(4);
+                       /*
+                        * send "Flush" message so that backend notices us
+                        * the completion of the command
+                        */
+                       pool_write(cp, "H", 1);
+                       sendlen = htonl(4);
+               }
                if (pool_write_and_flush(cp, &sendlen, sizeof(sendlen)) < 0)
                {
                        return POOL_ERROR;
@@ -858,6 +890,16 @@ static POOL_STATUS Execute(POOL_CONNECTION *frontend,
                        pool_debug("waiting for backend completing the query");
                        if (synchronize(cp))
                                return POOL_END;
+
+                       /*
+                        * We must check deadlock error because a aborted transaction
+                        * by detecting deadlock isn't same on all nodes.
+                        * If a transaction is aborted on master node, pgpool send a
+                        * error query to another nodes.
+                        */
+                       deadlock_detected = detect_deadlock_error(MASTER(backend), MAJOR(backend));
+                       if (deadlock_detected < 0)
+                               return POOL_END;
                }
        }
 
@@ -3821,3 +3863,91 @@ static POOL_STATUS error_kind_mismatch(POOL_CONNECTION *frontend, POOL_CONNECTIO
        else
                return POOL_ERROR;
 }
+
+static int detect_deadlock_error(POOL_CONNECTION *master, int major)
+{
+       int deadlock = 0;
+       char kind;
+       int readlen = 0, len;
+       char *buf;
+       char *p, *str;
+
+       if ((buf = malloc(1024)) == NULL)
+       {
+               pool_error("detect_deadlock_error: malloc failed");
+               return -1;
+       }
+
+       if (pool_read(master, &kind, sizeof(kind)))
+               return POOL_END;
+       readlen += sizeof(kind);
+       p = buf;
+       memcpy(p, &kind, sizeof(kind));
+       p += sizeof(kind);
+
+       if (kind == 'E') /* deadlock error? */
+       {
+               /* read actual query */
+               if (major == PROTO_MAJOR_V3)
+               {
+                       char *error_code;
+                       
+                       if (pool_read(master, &len, sizeof(len)) < 0)
+                               return POOL_END;
+                       readlen += sizeof(len);
+                       memcpy(p, &len, sizeof(len));
+                       p += sizeof(len);
+                       
+                       len = ntohl(len) - 4;
+                       str = malloc(len);
+                       pool_read(master, str, len);
+                       readlen += len;
+                       if (readlen > 1024)
+                       {
+                               buf = realloc(buf, readlen);
+                               if (buf == NULL)
+                               {
+                                       pool_error("detect_deadlock_error: malloc failed");
+                                       return -1;
+                               }
+                       }
+                       memcpy(p, str, len);
+
+                       error_code = str;
+                       while (*error_code)
+                       {
+                               if (*error_code == 'C')
+                               {
+                                       if (strcmp(error_code+1, DEADLOCK_ERROR_CODE) == 0) /* deadlock error */
+                                       {
+                                               pool_debug("SimpleQuery: receive deadlock error from master node.");
+                                               deadlock = 1;
+                                       }
+                                       break;
+                               }
+                               else
+                                       error_code = error_code + strlen(error_code) + 1;
+                       }
+                       free(str);
+               }
+               else
+               {
+                       str = pool_read_string(master, &len, 0);
+                       readlen += len;
+                       if (readlen > 1024)
+                       {
+                               buf = realloc(buf, readlen);
+                               if (buf == NULL)
+                               {
+                                       pool_error("detect_deadlock_error: malloc failed");
+                                       return -1;
+                               }
+                       }
+                       memcpy(p, str, len);
+               }
+       }
+       if (pool_unread(master, buf, readlen) != 0)
+               deadlock = -1;
+       free(buf);
+       return deadlock;
+}
index 6634091793f5db6831957da04933f891bffbe2dd..04f3d1185c107724a55650195e2db35ef21d8f7e 100644 (file)
@@ -764,3 +764,31 @@ static int consume_pending_data(POOL_CONNECTION *cp, void *data, int len)
 
        return consume_size;
 }
+
+/*
+ * pool_unread: Put back data to input buffer
+ */
+int pool_unread(POOL_CONNECTION *cp, void *data, int len)
+{
+       void *p = cp->hp;
+       int n = cp->len + len;
+       int realloc_size;
+       
+       if (cp->bufsz < n)
+       {
+               realloc_size = (n/READBUFSZ+1)*READBUFSZ;
+               p = realloc(cp->hp, realloc_size);
+               if (p == NULL)
+               {
+                       pool_error("pool_unread: realloc failed");
+                       return -1;
+               }
+               cp->hp = p;
+       }
+       if (cp->len != 0)
+               memmove(p + len, cp->hp + cp->po, cp->len);
+       memmove(p, data, len);
+       cp->len = n;
+       cp->po = 0;
+       return 0;
+}