Optimize WindowAgg's use of tuplestores
author David Rowley <[email protected]>
Thu, 5 Sep 2024 04:18:30 +0000 (16:18 +1200)
committer David Rowley <[email protected]>
Thu, 5 Sep 2024 04:18:30 +0000 (16:18 +1200)
When WindowAgg finished one partition of a PARTITION BY, it previously
would call tuplestore_end() to purge all the stored tuples before again
calling tuplestore_begin_heap() and carefully setting up all of the
tuplestore read pointers exactly as required for the given frameOptions.
Since the frameOptions don't change between partitions, this part does
not make much sense.  For queries that had very few rows per partition,
the overhead of this was very large.

It seems much better to create the tuplestore and the read pointers once
and simply call tuplestore_clear() at the end of each partition.
tuplestore_clear() moves all of the read pointers back to the start
position and deletes all the previously stored tuples.

A simple test query with 1 million partitions and 1 tuple per partition
has been shown to run around 40% faster with this change.  The extra
time taken without it seems to have mostly been spent in malloc/free.

Making this work required adding a new bool field to WindowAggState,
which had the unfortunate effect of being the 9th bool field in a group,
resulting in the struct being enlarged.  Here we shuffle the fields
around a little so that the two runcondition-related bool fields fit
into existing padding.  Also, move the "runcondition" field to be near
those.  This frees up enough space among the other bool fields so that
the newly added one fits into the padding bytes.  This was done to
address a very small but apparent performance regression with queries
containing a large number of rows per partition.

Reviewed-by: Ashutosh Bapat <[email protected]>
Reviewed-by: Tatsuo Ishii <[email protected]>
Discussion: https://postgr.es/m/CAHoyFK9n-QCXKTUWT_xxtXninSMEv%2BgbJN66-y6prM3f4WkEHw%40mail.gmail.com

src/backend/executor/nodeWindowAgg.c
src/include/nodes/execnodes.h

index 88a85f556b62ffe9fa6a1aa963daf4dd9a8e86e8..51a6708a3923f7f43cc4095ca899ce6c7802cb0d 100644 (file)
@@ -1074,57 +1074,24 @@ eval_windowfunction(WindowAggState *winstate, WindowStatePerFunc perfuncstate,
 }
 
 /*
- * begin_partition
- * Start buffering rows of the next partition.
+ * prepare_tuplestore
+ *     Prepare the tuplestore and all of the required read pointers for the
+ *     WindowAggState's frameOptions.
+ *
+ * Note: We use pg_noinline to avoid bloating the calling function with code
+ * which is only called once.
  */
-static void
-begin_partition(WindowAggState *winstate)
+static pg_noinline void
+prepare_tuplestore(WindowAggState *winstate)
 {
    WindowAgg  *node = (WindowAgg *) winstate->ss.ps.plan;
-   PlanState  *outerPlan = outerPlanState(winstate);
    int         frameOptions = winstate->frameOptions;
    int         numfuncs = winstate->numfuncs;
-   int         i;
-
-   winstate->partition_spooled = false;
-   winstate->framehead_valid = false;
-   winstate->frametail_valid = false;
-   winstate->grouptail_valid = false;
-   winstate->spooled_rows = 0;
-   winstate->currentpos = 0;
-   winstate->frameheadpos = 0;
-   winstate->frametailpos = 0;
-   winstate->currentgroup = 0;
-   winstate->frameheadgroup = 0;
-   winstate->frametailgroup = 0;
-   winstate->groupheadpos = 0;
-   winstate->grouptailpos = -1;    /* see update_grouptailpos */
-   ExecClearTuple(winstate->agg_row_slot);
-   if (winstate->framehead_slot)
-       ExecClearTuple(winstate->framehead_slot);
-   if (winstate->frametail_slot)
-       ExecClearTuple(winstate->frametail_slot);
-
-   /*
-    * If this is the very first partition, we need to fetch the first input
-    * row to store in first_part_slot.
-    */
-   if (TupIsNull(winstate->first_part_slot))
-   {
-       TupleTableSlot *outerslot = ExecProcNode(outerPlan);
 
-       if (!TupIsNull(outerslot))
-           ExecCopySlot(winstate->first_part_slot, outerslot);
-       else
-       {
-           /* outer plan is empty, so we have nothing to do */
-           winstate->partition_spooled = true;
-           winstate->more_partitions = false;
-           return;
-       }
-   }
+   /* we shouldn't be called if this was done already */
+   Assert(winstate->buffer == NULL);
 
-   /* Create new tuplestore for this partition */
+   /* Create new tuplestore */
    winstate->buffer = tuplestore_begin_heap(false, false, work_mem);
 
    /*
@@ -1158,16 +1125,10 @@ begin_partition(WindowAggState *winstate)
 
        agg_winobj->readptr = tuplestore_alloc_read_pointer(winstate->buffer,
                                                            readptr_flags);
-       agg_winobj->markpos = -1;
-       agg_winobj->seekpos = -1;
-
-       /* Also reset the row counters for aggregates */
-       winstate->aggregatedbase = 0;
-       winstate->aggregatedupto = 0;
    }
 
    /* create mark and read pointers for each real window function */
-   for (i = 0; i < numfuncs; i++)
+   for (int i = 0; i < numfuncs; i++)
    {
        WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
 
@@ -1179,8 +1140,6 @@ begin_partition(WindowAggState *winstate)
                                                            0);
            winobj->readptr = tuplestore_alloc_read_pointer(winstate->buffer,
                                                            EXEC_FLAG_BACKWARD);
-           winobj->markpos = -1;
-           winobj->seekpos = -1;
        }
    }
 
@@ -1224,6 +1183,88 @@ begin_partition(WindowAggState *winstate)
        winstate->grouptail_ptr =
            tuplestore_alloc_read_pointer(winstate->buffer, 0);
    }
+}
+
+/*
+ * begin_partition
+ * Start buffering rows of the next partition.
+ */
+static void
+begin_partition(WindowAggState *winstate)
+{
+   PlanState  *outerPlan = outerPlanState(winstate);
+   int         numfuncs = winstate->numfuncs;
+
+   winstate->partition_spooled = false;
+   winstate->framehead_valid = false;
+   winstate->frametail_valid = false;
+   winstate->grouptail_valid = false;
+   winstate->spooled_rows = 0;
+   winstate->currentpos = 0;
+   winstate->frameheadpos = 0;
+   winstate->frametailpos = 0;
+   winstate->currentgroup = 0;
+   winstate->frameheadgroup = 0;
+   winstate->frametailgroup = 0;
+   winstate->groupheadpos = 0;
+   winstate->grouptailpos = -1;    /* see update_grouptailpos */
+   ExecClearTuple(winstate->agg_row_slot);
+   if (winstate->framehead_slot)
+       ExecClearTuple(winstate->framehead_slot);
+   if (winstate->frametail_slot)
+       ExecClearTuple(winstate->frametail_slot);
+
+   /*
+    * If this is the very first partition, we need to fetch the first input
+    * row to store in first_part_slot.
+    */
+   if (TupIsNull(winstate->first_part_slot))
+   {
+       TupleTableSlot *outerslot = ExecProcNode(outerPlan);
+
+       if (!TupIsNull(outerslot))
+           ExecCopySlot(winstate->first_part_slot, outerslot);
+       else
+       {
+           /* outer plan is empty, so we have nothing to do */
+           winstate->partition_spooled = true;
+           winstate->more_partitions = false;
+           return;
+       }
+   }
+
+   /* Create new tuplestore if not done already. */
+   if (unlikely(winstate->buffer == NULL))
+       prepare_tuplestore(winstate);
+
+   winstate->next_partition = false;
+
+   if (winstate->numaggs > 0)
+   {
+       WindowObject agg_winobj = winstate->agg_winobj;
+
+       /* reset mark and seek positions for aggregate functions */
+       agg_winobj->markpos = -1;
+       agg_winobj->seekpos = -1;
+
+       /* Also reset the row counters for aggregates */
+       winstate->aggregatedbase = 0;
+       winstate->aggregatedupto = 0;
+   }
+
+   /* reset mark and seek positions for each real window function */
+   for (int i = 0; i < numfuncs; i++)
+   {
+       WindowStatePerFunc perfuncstate = &(winstate->perfunc[i]);
+
+       if (!perfuncstate->plain_agg)
+       {
+           WindowObject winobj = perfuncstate->winobj;
+
+           winobj->markpos = -1;
+           winobj->seekpos = -1;
+       }
+   }
 
    /*
     * Store the first tuple into the tuplestore (it's always available now;
@@ -1360,9 +1401,9 @@ release_partition(WindowAggState *winstate)
    }
 
    if (winstate->buffer)
-       tuplestore_end(winstate->buffer);
-   winstate->buffer = NULL;
+       tuplestore_clear(winstate->buffer);
    winstate->partition_spooled = false;
+   winstate->next_partition = true;
 }
 
 /*
@@ -2143,7 +2184,7 @@ ExecWindowAgg(PlanState *pstate)
    /* We need to loop as the runCondition or qual may filter out tuples */
    for (;;)
    {
-       if (winstate->buffer == NULL)
+       if (winstate->next_partition)
        {
            /* Initialize for first partition and set current row = 0 */
            begin_partition(winstate);
@@ -2686,6 +2727,7 @@ ExecInitWindowAgg(WindowAgg *node, EState *estate, int eflags)
    winstate->all_first = true;
    winstate->partition_spooled = false;
    winstate->more_partitions = false;
+   winstate->next_partition = true;
 
    return winstate;
 }
@@ -2700,6 +2742,14 @@ ExecEndWindowAgg(WindowAggState *node)
    PlanState  *outerPlan;
    int         i;
 
+   if (node->buffer != NULL)
+   {
+       tuplestore_end(node->buffer);
+
+       /* nullify so that release_partition skips the tuplestore_clear() */
+       node->buffer = NULL;
+   }
+
    release_partition(node);
 
    for (i = 0; i < node->numaggs; i++)
index af7d8fd1e72a14eeb20e521f4aae476c470faf33..627f99c13d1c5d25ac24b6d89f48a799001ac190 100644 (file)
@@ -2619,6 +2619,17 @@ typedef struct WindowAggState
    bool        inRangeAsc;     /* use ASC sort order for in_range tests? */
    bool        inRangeNullsFirst;  /* nulls sort first for in_range tests? */
 
+   /* fields relating to runconditions */
+   bool        use_pass_through;   /* When false, stop execution when
+                                    * runcondition is no longer true.  Else
+                                    * just stop evaluating window funcs. */
+   bool        top_window;     /* true if this is the top-most WindowAgg or
+                                * the only WindowAgg in this query level */
+   ExprState  *runcondition;   /* Condition which must remain true otherwise
+                                * execution of the WindowAgg will finish or
+                                * go into pass-through mode.  NULL when there
+                                * is no such condition. */
+
    /* these fields are used in GROUPS mode: */
    int64       currentgroup;   /* peer group # of current row in partition */
    int64       frameheadgroup; /* peer group # of frame head row */
@@ -2631,19 +2642,10 @@ typedef struct WindowAggState
    MemoryContext curaggcontext;    /* current aggregate's working data */
    ExprContext *tmpcontext;    /* short-term evaluation context */
 
-   ExprState  *runcondition;   /* Condition which must remain true otherwise
-                                * execution of the WindowAgg will finish or
-                                * go into pass-through mode.  NULL when there
-                                * is no such condition. */
-
-   bool        use_pass_through;   /* When false, stop execution when
-                                    * runcondition is no longer true.  Else
-                                    * just stop evaluating window funcs. */
-   bool        top_window;     /* true if this is the top-most WindowAgg or
-                                * the only WindowAgg in this query level */
    bool        all_first;      /* true if the scan is starting */
    bool        partition_spooled;  /* true if all tuples in current partition
                                     * have been spooled into tuplestore */
+   bool        next_partition; /* true if begin_partition needs to be called */
    bool        more_partitions;    /* true if there's more partitions after
                                     * this one */
    bool        framehead_valid;    /* true if frameheadpos is known up to