Make use of initReadOnlyStringInfo() in more places
authorDavid Rowley <[email protected]>
Mon, 6 Nov 2023 22:16:43 +0000 (11:16 +1300)
committerDavid Rowley <[email protected]>
Mon, 6 Nov 2023 22:16:43 +0000 (11:16 +1300)
f0efa5aec introduced the concept of "read-only" StringInfos which makes
use of an existing, possibly not NUL terminated, buffer.

Here we adjust two places that make use of StringInfos to receive data
to avoid using appendBinaryStringInfo() in cases where a NUL termination
character is not required.  This saves a possible palloc() and saves
having to needlessly memcpy() from one buffer to another.

Here we adjust two places which were using appendBinaryStringInfo().
Neither of these cases seem particularly performance-critical.  In the
case of XLogWalRcvProcessMsg(), the appendBinaryStringInfo() was only
appending 24 bytes.  The change made here does mean that we can get rid
of the incoming_message global variable and make that local instead.

The apply_spooled_messages() case applies in logical decoding when
applying (possibly large) changes which have been serialized to a file.

Reviewed-by: Amit Kapila
Discussion: https://postgr.es/m/CAApHDvoxYUDHwqPf-ShvchsERf1RzmkGoLwg63JNvHCkDCuyKQ@mail.gmail.com

src/backend/replication/logical/worker.c
src/backend/replication/walreceiver.c

index ba67eb156f9255299b16054de9cb7d311856f810..52a9f136ab90456f874036aa1b50ccc66755d072 100644 (file)
@@ -2019,7 +2019,6 @@ void
 apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
                       XLogRecPtr lsn)
 {
-   StringInfoData s2;
    int         nchanges;
    char        path[MAXPGPATH];
    char       *buffer = NULL;
@@ -2057,7 +2056,6 @@ apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
    CurrentResourceOwner = oldowner;
 
    buffer = palloc(BLCKSZ);
-   initStringInfo(&s2);
 
    MemoryContextSwitchTo(oldcxt);
 
@@ -2079,6 +2077,7 @@ apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
    nchanges = 0;
    while (true)
    {
+       StringInfoData s2;
        size_t      nbytes;
        int         len;
 
@@ -2104,9 +2103,8 @@ apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
 
        BufFileTell(stream_fd, &fileno, &offset);
 
-       /* copy the buffer to the stringinfo and call apply_dis */
-       resetStringInfo(&s2);
-       appendBinaryStringInfo(&s2, buffer, len);
+       /* init a stringinfo using the buffer and call apply_dis */
+       initReadOnlyStringInfo(&s2, buffer, len);
 
        /* Ensure we are reading the data into our memory context. */
        oldcxt = MemoryContextSwitchTo(ApplyMessageContext);
index a3128874b2ec39751bd311b853063257ee0631b5..2398167f495aa06e8712ce8ea606874fd4c9ec15 100644 (file)
@@ -132,7 +132,6 @@ typedef enum WalRcvWakeupReason
 static TimestampTz wakeup[NUM_WALRCV_WAKEUPS];
 
 static StringInfoData reply_message;
-static StringInfoData incoming_message;
 
 /* s for private functions */
 static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last);
@@ -425,7 +424,6 @@ WalReceiverMain(void)
            /* Initialize LogstreamResult and buffers for processing messages */
            LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(NULL);
            initStringInfo(&reply_message);
-           initStringInfo(&incoming_message);
 
            /* Initialize nap wakeup times. */
            now = GetCurrentTimestamp();
@@ -843,19 +841,20 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli)
    TimestampTz sendTime;
    bool        replyRequested;
 
-   resetStringInfo(&incoming_message);
-
    switch (type)
    {
        case 'w':               /* WAL records */
            {
-               /* copy message to StringInfo */
+               StringInfoData incoming_message;
+
                hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64);
                if (len < hdrlen)
                    ereport(ERROR,
                            (errcode(ERRCODE_PROTOCOL_VIOLATION),
                             errmsg_internal("invalid WAL message received from primary")));
-               appendBinaryStringInfo(&incoming_message, buf, hdrlen);
+
+               /* initialize a StringInfo with the given buffer */
+               initReadOnlyStringInfo(&incoming_message, buf, hdrlen);
 
                /* read the fields */
                dataStart = pq_getmsgint64(&incoming_message);
@@ -870,13 +869,16 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli)
            }
        case 'k':               /* Keepalive */
            {
-               /* copy message to StringInfo */
+               StringInfoData incoming_message;
+
                hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char);
                if (len != hdrlen)
                    ereport(ERROR,
                            (errcode(ERRCODE_PROTOCOL_VIOLATION),
                             errmsg_internal("invalid keepalive message received from primary")));
-               appendBinaryStringInfo(&incoming_message, buf, hdrlen);
+
+               /* initialize a StringInfo with the given buffer */
+               initReadOnlyStringInfo(&incoming_message, buf, hdrlen);
 
                /* read the fields */
                walEnd = pq_getmsgint64(&incoming_message);