#define INIT_STATEMENT_LIST_SIZE 8
+#define DEADLOCK_ERROR_CODE "40P01"
+#define POOL_ERROR_QUERY "send invalid query from pgpool to abort transaction"
+
typedef struct {
char *statement_name;
char *portal_name;
static PreparedStatement *unnamed_portal = NULL;
static int is_drop_database(char *query); /* returns non 0 if this is a DROP DATABASE command */
static void query_ps_status(char *query, POOL_CONNECTION_POOL *backend); /* show ps status */
+static int detect_deadlock_error(POOL_CONNECTION *master, int major);
POOL_STATUS pool_process_query(POOL_CONNECTION *frontend,
POOL_CONNECTION_POOL *backend,
int len;
static char *sq = "show pool_status";
POOL_STATUS status;
+ int deadlock_detected = 0;
if (query == NULL) /* need to read query from frontend? */
{
pool_debug("waiting for master completing the query");
if (synchronize(MASTER(backend)))
return POOL_END;
+
+ /*
+ * We must check deadlock error because a aborted transaction
+ * by detecting deadlock isn't same on all nodes.
+ * If a transaction is aborted on master node, pgpool send a
+ * error query to another nodes.
+ */
+ deadlock_detected = detect_deadlock_error(MASTER(backend), MAJOR(backend));
+ if (deadlock_detected < 0)
+ return POOL_END;
+ else if (deadlock_detected)
+ {
+ string = POOL_ERROR_QUERY;
+ len = strlen(string) + 1;
+ }
}
#define SEQUENCE_DEBUG
char kind;
int status;
PreparedStatement *stmt;
+ int deadlock_detected = 0;
/* read Execute packet */
if (pool_read(frontend, &len, sizeof(len)) < 0)
{
POOL_CONNECTION *cp = backend->slots[i]->con;
- /* forward the query to the backend */
- pool_write(cp, "E", 1);
- sendlen = htonl(len + 4);
- pool_write(cp, &sendlen, sizeof(sendlen));
- pool_write(cp, string, len);
+ if (deadlock_detected)
+ {
+ pool_write(cp, "Q", 1);
+ len = strlen(POOL_ERROR_QUERY) + 1;
+ sendlen = htonl(len + 4);
+ pool_write(cp, &sendlen, sizeof(sendlen));
+ pool_write(cp, POOL_ERROR_QUERY, len);
+ }
+ else
+ {
+ /* forward the query to the backend */
+ pool_write(cp, "E", 1);
+ sendlen = htonl(len + 4);
+ pool_write(cp, &sendlen, sizeof(sendlen));
+ pool_write(cp, string, len);
- /*
- * send "Flush" message so that backend notices us
- * the completion of the command
- */
- pool_write(cp, "H", 1);
- sendlen = htonl(4);
+ /*
+ * send "Flush" message so that backend notices us
+ * the completion of the command
+ */
+ pool_write(cp, "H", 1);
+ sendlen = htonl(4);
+ }
if (pool_write_and_flush(cp, &sendlen, sizeof(sendlen)) < 0)
{
return POOL_ERROR;
pool_debug("waiting for backend completing the query");
if (synchronize(cp))
return POOL_END;
+
+ /*
+ * We must check deadlock error because a aborted transaction
+ * by detecting deadlock isn't same on all nodes.
+ * If a transaction is aborted on master node, pgpool send a
+ * error query to another nodes.
+ */
+ deadlock_detected = detect_deadlock_error(MASTER(backend), MAJOR(backend));
+ if (deadlock_detected < 0)
+ return POOL_END;
}
}
else
return POOL_ERROR;
}
+
+static int detect_deadlock_error(POOL_CONNECTION *master, int major)
+{
+ int deadlock = 0;
+ char kind;
+ int readlen = 0, len;
+ char *buf;
+ char *p, *str;
+
+ if ((buf = malloc(1024)) == NULL)
+ {
+ pool_error("detect_deadlock_error: malloc failed");
+ return -1;
+ }
+
+ if (pool_read(master, &kind, sizeof(kind)))
+ return POOL_END;
+ readlen += sizeof(kind);
+ p = buf;
+ memcpy(p, &kind, sizeof(kind));
+ p += sizeof(kind);
+
+ if (kind == 'E') /* deadlock error? */
+ {
+ /* read actual query */
+ if (major == PROTO_MAJOR_V3)
+ {
+ char *error_code;
+
+ if (pool_read(master, &len, sizeof(len)) < 0)
+ return POOL_END;
+ readlen += sizeof(len);
+ memcpy(p, &len, sizeof(len));
+ p += sizeof(len);
+
+ len = ntohl(len) - 4;
+ str = malloc(len);
+ pool_read(master, str, len);
+ readlen += len;
+ if (readlen > 1024)
+ {
+ buf = realloc(buf, readlen);
+ if (buf == NULL)
+ {
+ pool_error("detect_deadlock_error: malloc failed");
+ return -1;
+ }
+ }
+ memcpy(p, str, len);
+
+ error_code = str;
+ while (*error_code)
+ {
+ if (*error_code == 'C')
+ {
+ if (strcmp(error_code+1, DEADLOCK_ERROR_CODE) == 0) /* deadlock error */
+ {
+ pool_debug("SimpleQuery: receive deadlock error from master node.");
+ deadlock = 1;
+ }
+ break;
+ }
+ else
+ error_code = error_code + strlen(error_code) + 1;
+ }
+ free(str);
+ }
+ else
+ {
+ str = pool_read_string(master, &len, 0);
+ readlen += len;
+ if (readlen > 1024)
+ {
+ buf = realloc(buf, readlen);
+ if (buf == NULL)
+ {
+ pool_error("detect_deadlock_error: malloc failed");
+ return -1;
+ }
+ }
+ memcpy(p, str, len);
+ }
+ }
+ if (pool_unread(master, buf, readlen) != 0)
+ deadlock = -1;
+ free(buf);
+ return deadlock;
+}