/* Time to sleep between reconnection attempts */
#define RECONNECT_SLEEP_TIME 5
+typedef enum
+{
+ STREAM_STOP_NONE,
+ STREAM_STOP_END_OF_WAL,
+ STREAM_STOP_KEEPALIVE,
+ STREAM_STOP_SIGNAL
+} StreamStopReason;
+
/* Global Options */
static char *outfile = NULL;
static int verbose = 0;
/* Global State */
static int outfd = -1;
static volatile sig_atomic_t time_to_abort = false;
+static volatile sig_atomic_t stop_reason = STREAM_STOP_NONE;
static volatile sig_atomic_t output_reopen = false;
static bool output_isfile;
static TimestampTz output_last_fsync = -1;
static void StreamLogicalLog(void);
static bool flushAndSendFeedback(PGconn *conn, TimestampTz *now);
static void prepareToTerminate(PGconn *conn, XLogRecPtr endpos,
- bool keepalive, XLogRecPtr lsn);
+ StreamStopReason reason,
+ XLogRecPtr lsn);
static void
usage(void)
TimestampTz last_status = -1;
int i;
PQExpBuffer query;
+ XLogRecPtr cur_record_lsn;
output_written_lsn = InvalidXLogRecPtr;
output_fsync_lsn = InvalidXLogRecPtr;
+ cur_record_lsn = InvalidXLogRecPtr;
/*
* Connect in replication mode to the server
int bytes_written;
TimestampTz now;
int hdr_len;
- XLogRecPtr cur_record_lsn = InvalidXLogRecPtr;
+
+ cur_record_lsn = InvalidXLogRecPtr;
if (copybuf != NULL)
{
if (endposReached)
{
- prepareToTerminate(conn, endpos, true, InvalidXLogRecPtr);
+ stop_reason = STREAM_STOP_KEEPALIVE;
time_to_abort = true;
break;
}
*/
if (!flushAndSendFeedback(conn, &now))
goto error;
- prepareToTerminate(conn, endpos, false, cur_record_lsn);
+ stop_reason = STREAM_STOP_END_OF_WAL;
time_to_abort = true;
break;
}
/* endpos was exactly the record we just processed, we're done */
if (!flushAndSendFeedback(conn, &now))
goto error;
- prepareToTerminate(conn, endpos, false, cur_record_lsn);
+ stop_reason = STREAM_STOP_END_OF_WAL;
time_to_abort = true;
break;
}
}
+ /* Clean up connection state if stream has been aborted */
+ if (time_to_abort)
+ prepareToTerminate(conn, endpos, stop_reason, cur_record_lsn);
+
res = PQgetResult(conn);
if (PQresultStatus(res) == PGRES_COPY_OUT)
{
static void
sigexit_handler(SIGNAL_ARGS)
{
+ stop_reason = STREAM_STOP_SIGNAL;
time_to_abort = true;
}
* retry on failure.
*/
static void
-prepareToTerminate(PGconn *conn, XLogRecPtr endpos, bool keepalive, XLogRecPtr lsn)
+prepareToTerminate(PGconn *conn, XLogRecPtr endpos, StreamStopReason reason,
+ XLogRecPtr lsn)
{
(void) PQputCopyEnd(conn, NULL);
(void) PQflush(conn);
if (verbose)
{
- if (keepalive)
- pg_log_info("end position %X/%X reached by keepalive",
- LSN_FORMAT_ARGS(endpos));
- else
- pg_log_info("end position %X/%X reached by WAL record at %X/%X",
- LSN_FORMAT_ARGS(endpos), LSN_FORMAT_ARGS(lsn));
+ switch (reason)
+ {
+ case STREAM_STOP_SIGNAL:
+ pg_log_info("received interrupt signal, exiting");
+ break;
+ case STREAM_STOP_KEEPALIVE:
+ pg_log_info("end position %X/%X reached by keepalive",
+ LSN_FORMAT_ARGS(endpos));
+ break;
+ case STREAM_STOP_END_OF_WAL:
+ Assert(!XLogRecPtrIsInvalid(lsn));
+ pg_log_info("end position %X/%X reached by WAL record at %X/%X",
+ LSN_FORMAT_ARGS(endpos), LSN_FORMAT_ARGS(lsn));
+ break;
+ case STREAM_STOP_NONE:
+ Assert(false);
+ break;
+ }
}
}