Refactor nbtree insertion scankeys.
authorPeter Geoghegan <[email protected]>
Wed, 20 Mar 2019 16:30:57 +0000 (09:30 -0700)
committerPeter Geoghegan <[email protected]>
Wed, 20 Mar 2019 16:30:57 +0000 (09:30 -0700)
Use dedicated struct to represent nbtree insertion scan keys.  Having a
dedicated struct makes the difference between search type scankeys and
insertion scankeys a lot clearer, and simplifies the signature of
several related functions.  This is based on a suggestion by Andrey
Lepikhov.

Streamline how unique index insertions cache binary search progress.
Cache the state of in-progress binary searches within _bt_check_unique()
for later instead of having callers avoid repeating the binary search in
an ad-hoc manner.  This makes it easy to add a new optimization:
_bt_check_unique() now falls out of its loop immediately in the common
case where it's already clear that there couldn't possibly be a
duplicate.

The new _bt_check_unique() scheme makes it a lot easier to manage cached
binary search effort afterwards, from within _bt_findinsertloc().  This
is needed for the upcoming  to make nbtree tuples unique by
treating heap TID as a final tiebreaker column.  Unique key binary
searches need to restore lower and upper bounds.  They cannot simply
continue to use the >= lower bound as the offset to insert at, because
the heap TID tiebreaker column must be used in comparisons for the
restored binary search (unlike the original _bt_check_unique() binary
search, where scankey's heap TID column must be omitted).

Author: Peter Geoghegan, Heikki Linnakangas
Reviewed-By: Heikki Linnakangas, Andrey Lepikhov
Discussion: https://postgr.es/m/CAH2-WzmE6AhUdk9NdWBf4K3HjWXZBX3+umC7mH7+WDrKcRtsOw@mail.gmail.com

contrib/amcheck/verify_nbtree.c
src/backend/access/nbtree/README
src/backend/access/nbtree/nbtinsert.c
src/backend/access/nbtree/nbtpage.c
src/backend/access/nbtree/nbtsearch.c
src/backend/access/nbtree/nbtsort.c
src/backend/access/nbtree/nbtutils.c
src/backend/utils/sort/tuplesort.c
src/include/access/nbtree.h

index bb6442de82dd37c21d79c535557cc3766c7068d6..5426bfd8d870e871ef2bdf48bf180543746429d5 100644 (file)
@@ -127,9 +127,9 @@ static void bt_check_every_level(Relation rel, Relation heaprel,
 static BtreeLevel bt_check_level_from_leftmost(BtreeCheckState *state,
                             BtreeLevel level);
 static void bt_target_page_check(BtreeCheckState *state);
-static ScanKey bt_right_page_check_scankey(BtreeCheckState *state);
-static void bt_downlink_check(BtreeCheckState *state, BlockNumber childblock,
-                 ScanKey targetkey);
+static BTScanInsert bt_right_page_check_scankey(BtreeCheckState *state);
+static void bt_downlink_check(BtreeCheckState *state, BTScanInsert targetkey,
+                 BlockNumber childblock);
 static void bt_downlink_missing_check(BtreeCheckState *state);
 static void bt_tuple_present_callback(Relation index, HeapTuple htup,
                          Datum *values, bool *isnull,
@@ -139,14 +139,14 @@ static IndexTuple bt_normalize_tuple(BtreeCheckState *state,
 static inline bool offset_is_negative_infinity(BTPageOpaque opaque,
                            OffsetNumber offset);
 static inline bool invariant_leq_offset(BtreeCheckState *state,
-                    ScanKey key,
+                    BTScanInsert key,
                     OffsetNumber upperbound);
 static inline bool invariant_geq_offset(BtreeCheckState *state,
-                    ScanKey key,
+                    BTScanInsert key,
                     OffsetNumber lowerbound);
 static inline bool invariant_leq_nontarget_offset(BtreeCheckState *state,
-                              Page other,
-                              ScanKey key,
+                              BTScanInsert key,
+                              Page nontarget,
                               OffsetNumber upperbound);
 static Page palloc_btree_page(BtreeCheckState *state, BlockNumber blocknum);
 
@@ -838,8 +838,8 @@ bt_target_page_check(BtreeCheckState *state)
    {
        ItemId      itemid;
        IndexTuple  itup;
-       ScanKey     skey;
        size_t      tupsize;
+       BTScanInsert skey;
 
        CHECK_FOR_INTERRUPTS();
 
@@ -1030,7 +1030,7 @@ bt_target_page_check(BtreeCheckState *state)
         */
        else if (offset == max)
        {
-           ScanKey     rightkey;
+           BTScanInsert    rightkey;
 
            /* Get item in next/right page */
            rightkey = bt_right_page_check_scankey(state);
@@ -1082,7 +1082,7 @@ bt_target_page_check(BtreeCheckState *state)
        {
            BlockNumber childblock = BTreeInnerTupleGetDownLink(itup);
 
-           bt_downlink_check(state, childblock, skey);
+           bt_downlink_check(state, skey, childblock);
        }
    }
 
@@ -1111,11 +1111,12 @@ bt_target_page_check(BtreeCheckState *state)
  * Note that !readonly callers must reverify that target page has not
  * been concurrently deleted.
  */
-static ScanKey
+static BTScanInsert
 bt_right_page_check_scankey(BtreeCheckState *state)
 {
    BTPageOpaque opaque;
    ItemId      rightitem;
+   IndexTuple  firstitup;
    BlockNumber targetnext;
    Page        rightpage;
    OffsetNumber nline;
@@ -1303,8 +1304,8 @@ bt_right_page_check_scankey(BtreeCheckState *state)
     * Return first real item scankey.  Note that this relies on right page
     * memory remaining allocated.
     */
-   return _bt_mkscankey(state->rel,
-                        (IndexTuple) PageGetItem(rightpage, rightitem));
+   firstitup = (IndexTuple) PageGetItem(rightpage, rightitem);
+   return _bt_mkscankey(state->rel, firstitup);
 }
 
 /*
@@ -1317,8 +1318,8 @@ bt_right_page_check_scankey(BtreeCheckState *state)
  * verification this way around is much more practical.
  */
 static void
-bt_downlink_check(BtreeCheckState *state, BlockNumber childblock,
-                 ScanKey targetkey)
+bt_downlink_check(BtreeCheckState *state, BTScanInsert targetkey,
+                 BlockNumber childblock)
 {
    OffsetNumber offset;
    OffsetNumber maxoffset;
@@ -1423,8 +1424,7 @@ bt_downlink_check(BtreeCheckState *state, BlockNumber childblock,
        if (offset_is_negative_infinity(copaque, offset))
            continue;
 
-       if (!invariant_leq_nontarget_offset(state, child,
-                                           targetkey, offset))
+       if (!invariant_leq_nontarget_offset(state, targetkey, child, offset))
            ereport(ERROR,
                    (errcode(ERRCODE_INDEX_CORRUPTED),
                     errmsg("down-link lower bound invariant violated for index \"%s\"",
@@ -1864,13 +1864,12 @@ offset_is_negative_infinity(BTPageOpaque opaque, OffsetNumber offset)
  * to corruption.
  */
 static inline bool
-invariant_leq_offset(BtreeCheckState *state, ScanKey key,
+invariant_leq_offset(BtreeCheckState *state, BTScanInsert key,
                     OffsetNumber upperbound)
 {
-   int16       nkeyatts = IndexRelationGetNumberOfKeyAttributes(state->rel);
    int32       cmp;
 
-   cmp = _bt_compare(state->rel, nkeyatts, key, state->target, upperbound);
+   cmp = _bt_compare(state->rel, key, state->target, upperbound);
 
    return cmp <= 0;
 }
@@ -1883,13 +1882,12 @@ invariant_leq_offset(BtreeCheckState *state, ScanKey key,
  * to corruption.
  */
 static inline bool
-invariant_geq_offset(BtreeCheckState *state, ScanKey key,
+invariant_geq_offset(BtreeCheckState *state, BTScanInsert key,
                     OffsetNumber lowerbound)
 {
-   int16       nkeyatts = IndexRelationGetNumberOfKeyAttributes(state->rel);
    int32       cmp;
 
-   cmp = _bt_compare(state->rel, nkeyatts, key, state->target, lowerbound);
+   cmp = _bt_compare(state->rel, key, state->target, lowerbound);
 
    return cmp >= 0;
 }
@@ -1905,14 +1903,12 @@ invariant_geq_offset(BtreeCheckState *state, ScanKey key,
  * to corruption.
  */
 static inline bool
-invariant_leq_nontarget_offset(BtreeCheckState *state,
-                              Page nontarget, ScanKey key,
-                              OffsetNumber upperbound)
+invariant_leq_nontarget_offset(BtreeCheckState *state, BTScanInsert key,
+                              Page nontarget, OffsetNumber upperbound)
 {
-   int16       nkeyatts = IndexRelationGetNumberOfKeyAttributes(state->rel);
    int32       cmp;
 
-   cmp = _bt_compare(state->rel, nkeyatts, key, nontarget, upperbound);
+   cmp = _bt_compare(state->rel, key, nontarget, upperbound);
 
    return cmp <= 0;
 }
index b0b4ab8b76600559c9058a89b5d45e51d6b56913..a295a7a286d444eb17f30768c4a12bbb5175bb17 100644 (file)
@@ -598,19 +598,22 @@ scankey point to comparison functions that return boolean, such as int4lt.
 There might be more than one scankey entry for a given index column, or
 none at all.  (We require the keys to appear in index column order, but
 the order of multiple keys for a given column is unspecified.)  An
-insertion scankey uses the same array-of-ScanKey data structure, but the
-sk_func pointers point to btree comparison support functions (ie, 3-way
-comparators that return int4 values interpreted as <0, =0, >0).  In an
-insertion scankey there is exactly one entry per index column.  Insertion
-scankeys are built within the btree code (eg, by _bt_mkscankey()) and are
-used to locate the starting point of a scan, as well as for locating the
-place to insert a new index tuple.  (Note: in the case of an insertion
-scankey built from a search scankey, there might be fewer keys than
-index columns, indicating that we have no constraints for the remaining
-index columns.)  After we have located the starting point of a scan, the
-original search scankey is consulted as each index entry is sequentially
-scanned to decide whether to return the entry and whether the scan can
-stop (see _bt_checkkeys()).
+insertion scankey ("BTScanInsert" data structure) uses a similar
+array-of-ScanKey data structure, but the sk_func pointers point to btree
+comparison support functions (ie, 3-way comparators that return int4 values
+interpreted as <0, =0, >0).  In an insertion scankey there is at most one
+entry per index column.  There is also other data about the rules used to
+locate where to begin the scan, such as whether or not the scan is a
+"nextkey" scan.  Insertion scankeys are built within the btree code (eg, by
+_bt_mkscankey()) and are used to locate the starting point of a scan, as
+well as for locating the place to insert a new index tuple.  (Note: in the
+case of an insertion scankey built from a search scankey or built from a
+truncated pivot tuple, there might be fewer keys than index columns,
+indicating that we have no constraints for the remaining index columns.)
+After we have located the starting point of a scan, the original search
+scankey is consulted as each index entry is sequentially scanned to decide
+whether to return the entry and whether the scan can stop (see
+_bt_checkkeys()).
 
 We use term "pivot" index tuples to distinguish tuples which don't point
 to heap tuples, but rather used for tree navigation.  Pivot tuples includes
index 2997b1111a24e105ac12b8ea5ae7d428bb3c5bb9..1facd0535d87e25c2dd4e13956ff9e3f221f310d 100644 (file)
@@ -51,19 +51,16 @@ typedef struct
 
 static Buffer _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf);
 
-static TransactionId _bt_check_unique(Relation rel, IndexTuple itup,
-                Relation heapRel, Buffer buf, OffsetNumber offset,
-                ScanKey itup_scankey,
+static TransactionId _bt_check_unique(Relation rel, BTInsertState insertstate,
+                Relation heapRel,
                 IndexUniqueCheck checkUnique, bool *is_unique,
                 uint32 *speculativeToken);
-static void _bt_findinsertloc(Relation rel,
-                 Buffer *bufptr,
-                 OffsetNumber *offsetptr,
-                 int keysz,
-                 ScanKey scankey,
-                 IndexTuple newtup,
+static OffsetNumber _bt_findinsertloc(Relation rel,
+                 BTInsertState insertstate,
+                 bool checkingunique,
                  BTStack stack,
                  Relation heapRel);
+static void _bt_stepright(Relation rel, BTInsertState insertstate, BTStack stack);
 static void _bt_insertonpg(Relation rel, Buffer buf, Buffer cbuf,
               BTStack stack,
               IndexTuple itup,
@@ -83,8 +80,8 @@ static void _bt_checksplitloc(FindSplitData *state,
                  int dataitemstoleft, Size firstoldonrightsz);
 static bool _bt_pgaddtup(Page page, Size itemsize, IndexTuple itup,
             OffsetNumber itup_off);
-static bool _bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
-           int keysz, ScanKey scankey);
+static bool _bt_isequal(TupleDesc itupdesc, BTScanInsert itup_key,
+           Page page, OffsetNumber offnum);
 static void _bt_vacuum_one_page(Relation rel, Buffer buffer, Relation heapRel);
 
 /*
@@ -110,18 +107,26 @@ _bt_doinsert(Relation rel, IndexTuple itup,
             IndexUniqueCheck checkUnique, Relation heapRel)
 {
    bool        is_unique = false;
-   int         indnkeyatts;
-   ScanKey     itup_scankey;
+   BTInsertStateData insertstate;
+   BTScanInsert itup_key;
    BTStack     stack = NULL;
    Buffer      buf;
-   OffsetNumber offset;
    bool        fastpath;
-
-   indnkeyatts = IndexRelationGetNumberOfKeyAttributes(rel);
-   Assert(indnkeyatts != 0);
+   bool        checkingunique = (checkUnique != UNIQUE_CHECK_NO);
 
    /* we need an insertion scan key to do our search, so build one */
-   itup_scankey = _bt_mkscankey(rel, itup);
+   itup_key = _bt_mkscankey(rel, itup);
+
+   /*
+    * Fill in the BTInsertState working area, to track the current page and
+    * position within the page to insert on
+    */
+   insertstate.itup = itup;
+   /* PageAddItem will MAXALIGN(), but be consistent */
+   insertstate.itemsz = MAXALIGN(IndexTupleSize(itup));
+   insertstate.itup_key = itup_key;
+   insertstate.bounds_valid = false;
+   insertstate.buf = InvalidBuffer;
 
    /*
     * It's very common to have an index on an auto-incremented or
@@ -144,10 +149,8 @@ _bt_doinsert(Relation rel, IndexTuple itup,
     */
 top:
    fastpath = false;
-   offset = InvalidOffsetNumber;
    if (RelationGetTargetBlock(rel) != InvalidBlockNumber)
    {
-       Size        itemsz;
        Page        page;
        BTPageOpaque lpageop;
 
@@ -166,9 +169,6 @@ top:
            page = BufferGetPage(buf);
 
            lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
-           itemsz = IndexTupleSize(itup);
-           itemsz = MAXALIGN(itemsz);  /* be safe, PageAddItem will do this
-                                        * but we need to be consistent */
 
            /*
             * Check if the page is still the rightmost leaf page, has enough
@@ -177,10 +177,9 @@ top:
             */
            if (P_ISLEAF(lpageop) && P_RIGHTMOST(lpageop) &&
                !P_IGNORE(lpageop) &&
-               (PageGetFreeSpace(page) > itemsz) &&
+               (PageGetFreeSpace(page) > insertstate.itemsz) &&
                PageGetMaxOffsetNumber(page) >= P_FIRSTDATAKEY(lpageop) &&
-               _bt_compare(rel, indnkeyatts, itup_scankey, page,
-                           P_FIRSTDATAKEY(lpageop)) > 0)
+               _bt_compare(rel, itup_key, page, P_FIRSTDATAKEY(lpageop)) > 0)
            {
                /*
                 * The right-most block should never have an incomplete split.
@@ -219,10 +218,12 @@ top:
         * Find the first page containing this key.  Buffer returned by
         * _bt_search() is locked in exclusive mode.
         */
-       stack = _bt_search(rel, indnkeyatts, itup_scankey, false, &buf, BT_WRITE,
-                          NULL);
+       stack = _bt_search(rel, itup_key, &buf, BT_WRITE, NULL);
    }
 
+   insertstate.buf = buf;
+   buf = InvalidBuffer;        /* insertstate.buf now owns the buffer */
+
    /*
     * If we're not allowing duplicates, make sure the key isn't already in
     * the index.
@@ -244,19 +245,19 @@ top:
     * let the tuple in and return false for possibly non-unique, or true for
     * definitely unique.
     */
-   if (checkUnique != UNIQUE_CHECK_NO)
+   if (checkingunique)
    {
        TransactionId xwait;
        uint32      speculativeToken;
 
-       offset = _bt_binsrch(rel, buf, indnkeyatts, itup_scankey, false);
-       xwait = _bt_check_unique(rel, itup, heapRel, buf, offset, itup_scankey,
-                                checkUnique, &is_unique, &speculativeToken);
+       xwait = _bt_check_unique(rel, &insertstate, heapRel, checkUnique,
+                                &is_unique, &speculativeToken);
 
        if (TransactionIdIsValid(xwait))
        {
            /* Have to wait for the other guy ... */
-           _bt_relbuf(rel, buf);
+           _bt_relbuf(rel, insertstate.buf);
+           insertstate.buf = InvalidBuffer;
 
            /*
             * If it's a speculative insertion, wait for it to finish (ie. to
@@ -277,6 +278,8 @@ top:
 
    if (checkUnique != UNIQUE_CHECK_EXISTING)
    {
+       OffsetNumber newitemoff;
+
        /*
         * The only conflict predicate locking cares about for indexes is when
         * an index tuple insert conflicts with an existing lock.  Since the
@@ -286,22 +289,28 @@ top:
         * This reasoning also applies to INCLUDE indexes, whose extra
         * attributes are not considered part of the key space.
         */
-       CheckForSerializableConflictIn(rel, NULL, buf);
-       /* do the insertion */
-       _bt_findinsertloc(rel, &buf, &offset, indnkeyatts, itup_scankey, itup,
-                         stack, heapRel);
-       _bt_insertonpg(rel, buf, InvalidBuffer, stack, itup, offset, false);
+       CheckForSerializableConflictIn(rel, NULL, insertstate.buf);
+
+       /*
+        * Do the insertion.  Note that insertstate contains cached binary
+        * search bounds established within _bt_check_unique when insertion is
+        * checkingunique.
+        */
+       newitemoff = _bt_findinsertloc(rel, &insertstate, checkingunique,
+                                      stack, heapRel);
+       _bt_insertonpg(rel, insertstate.buf, InvalidBuffer, stack, itup,
+                      newitemoff, false);
    }
    else
    {
        /* just release the buffer */
-       _bt_relbuf(rel, buf);
+       _bt_relbuf(rel, insertstate.buf);
    }
 
    /* be tidy */
    if (stack)
        _bt_freestack(stack);
-   _bt_freeskey(itup_scankey);
+   pfree(itup_key);
 
    return is_unique;
 }
@@ -309,10 +318,6 @@ top:
 /*
  * _bt_check_unique() -- Check for violation of unique index constraint
  *
- * offset points to the first possible item that could conflict. It can
- * also point to end-of-page, which means that the first tuple to check
- * is the first tuple on the next page.
- *
  * Returns InvalidTransactionId if there is no conflict, else an xact ID
  * we must wait for to see if it commits a conflicting tuple.   If an actual
  * conflict is detected, no return --- just ereport().  If an xact ID is
@@ -324,16 +329,21 @@ top:
  * InvalidTransactionId because we don't want to wait.  In this case we
  * set *is_unique to false if there is a potential conflict, and the
  * core code must redo the uniqueness check later.
+ *
+ * As a side-effect, sets state in insertstate that can later be used by
+ * _bt_findinsertloc() to reuse most of the binary search work we do
+ * here.
  */
 static TransactionId
-_bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
-                Buffer buf, OffsetNumber offset, ScanKey itup_scankey,
+_bt_check_unique(Relation rel, BTInsertState insertstate, Relation heapRel,
                 IndexUniqueCheck checkUnique, bool *is_unique,
                 uint32 *speculativeToken)
 {
    TupleDesc   itupdesc = RelationGetDescr(rel);
-   int         indnkeyatts = IndexRelationGetNumberOfKeyAttributes(rel);
+   IndexTuple  itup = insertstate->itup;
+   BTScanInsert itup_key = insertstate->itup_key;
    SnapshotData SnapshotDirty;
+   OffsetNumber offset;
    OffsetNumber maxoff;
    Page        page;
    BTPageOpaque opaque;
@@ -345,13 +355,22 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
 
    InitDirtySnapshot(SnapshotDirty);
 
-   page = BufferGetPage(buf);
+   page = BufferGetPage(insertstate->buf);
    opaque = (BTPageOpaque) PageGetSpecialPointer(page);
    maxoff = PageGetMaxOffsetNumber(page);
 
+   /*
+    * Find the first tuple with the same key.
+    *
+    * This also saves the binary search bounds in insertstate.  We use them
+    * in the fastpath below, but also in the _bt_findinsertloc() call later.
+    */
+   offset = _bt_binsrch_insert(rel, insertstate);
+
    /*
     * Scan over all equal tuples, looking for live conflicts.
     */
+   Assert(!insertstate->bounds_valid || insertstate->low == offset);
    for (;;)
    {
        ItemId      curitemid;
@@ -364,21 +383,40 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
         */
        if (offset <= maxoff)
        {
+           /*
+            * Fastpath: In most cases, we can use cached search bounds to
+            * limit our consideration to items that are definitely
+            * duplicates.  This fastpath doesn't apply when the original page
+            * is empty, or when initial offset is past the end of the
+            * original page, which may indicate that we need to examine a
+            * second or subsequent page.
+            *
+            * Note that this optimization avoids calling _bt_isequal()
+            * entirely when there are no duplicates, as long as the offset
+            * where the key will go is not at the end of the page.
+            */
+           if (nbuf == InvalidBuffer && offset == insertstate->stricthigh)
+           {
+               Assert(insertstate->bounds_valid);
+               Assert(insertstate->low >= P_FIRSTDATAKEY(opaque));
+               Assert(insertstate->low <= insertstate->stricthigh);
+               Assert(!_bt_isequal(itupdesc, itup_key, page, offset));
+               break;
+           }
+
            curitemid = PageGetItemId(page, offset);
 
            /*
             * We can skip items that are marked killed.
             *
-            * Formerly, we applied _bt_isequal() before checking the kill
-            * flag, so as to fall out of the item loop as soon as possible.
-            * However, in the presence of heavy update activity an index may
-            * contain many killed items with the same key; running
-            * _bt_isequal() on each killed item gets expensive. Furthermore
-            * it is likely that the non-killed version of each key appears
-            * first, so that we didn't actually get to exit any sooner
-            * anyway. So now we just advance over killed items as quickly as
-            * we can. We only apply _bt_isequal() when we get to a non-killed
-            * item or the end of the page.
+            * In the presence of heavy update activity an index may contain
+            * many killed items with the same key; running _bt_isequal() on
+            * each killed item gets expensive.  Just advance over killed
+            * items as quickly as we can.  We only apply _bt_isequal() when
+            * we get to a non-killed item.  Even those comparisons could be
+            * avoided (in the common case where there is only one page to
+            * visit) by reusing bounds, but just skipping dead items is fast
+            * enough.
             */
            if (!ItemIdIsDead(curitemid))
            {
@@ -391,7 +429,7 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
                 * in real comparison, but only for ordering/finding items on
                 * pages. - vadim 03/24/97
                 */
-               if (!_bt_isequal(itupdesc, page, offset, indnkeyatts, itup_scankey))
+               if (!_bt_isequal(itupdesc, itup_key, page, offset))
                    break;      /* we're past all the equal tuples */
 
                /* okay, we gotta fetch the heap tuple ... */
@@ -488,7 +526,7 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
                     * otherwise be masked by this unique constraint
                     * violation.
                     */
-                   CheckForSerializableConflictIn(rel, NULL, buf);
+                   CheckForSerializableConflictIn(rel, NULL, insertstate->buf);
 
                    /*
                     * This is a definite conflict.  Break the tuple down into
@@ -500,7 +538,8 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
                     */
                    if (nbuf != InvalidBuffer)
                        _bt_relbuf(rel, nbuf);
-                   _bt_relbuf(rel, buf);
+                   _bt_relbuf(rel, insertstate->buf);
+                   insertstate->buf = InvalidBuffer;
 
                    {
                        Datum       values[INDEX_MAX_KEYS];
@@ -540,7 +579,7 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
                    if (nbuf != InvalidBuffer)
                        MarkBufferDirtyHint(nbuf, true);
                    else
-                       MarkBufferDirtyHint(buf, true);
+                       MarkBufferDirtyHint(insertstate->buf, true);
                }
            }
        }
@@ -552,11 +591,14 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
            offset = OffsetNumberNext(offset);
        else
        {
+           int         highkeycmp;
+
            /* If scankey == hikey we gotta check the next page too */
            if (P_RIGHTMOST(opaque))
                break;
-           if (!_bt_isequal(itupdesc, page, P_HIKEY,
-                            indnkeyatts, itup_scankey))
+           highkeycmp = _bt_compare(rel, itup_key, page, P_HIKEY);
+           Assert(highkeycmp <= 0);
+           if (highkeycmp != 0)
                break;
            /* Advance to next non-dead page --- there must be one */
            for (;;)
@@ -600,57 +642,41 @@ _bt_check_unique(Relation rel, IndexTuple itup, Relation heapRel,
 /*
  * _bt_findinsertloc() -- Finds an insert location for a tuple
  *
+ *     On entry, insertstate buffer contains the first legal page the new
+ *     tuple could be inserted to.  It is exclusive-locked and pinned by the
+ *     caller.
+ *
  *     If the new key is equal to one or more existing keys, we can
  *     legitimately place it anywhere in the series of equal keys --- in fact,
  *     if the new key is equal to the page's "high key" we can place it on
  *     the next page.  If it is equal to the high key, and there's not room
  *     to insert the new tuple on the current page without splitting, then
  *     we can move right hoping to find more free space and avoid a split.
- *     (We should not move right indefinitely, however, since that leads to
- *     O(N^2) insertion behavior in the presence of many equal keys.)
- *     Once we have chosen the page to put the key on, we'll insert it before
- *     any existing equal keys because of the way _bt_binsrch() works.
- *
- *     If there's not enough room in the space, we try to make room by
- *     removing any LP_DEAD tuples.
+ *     Furthermore, if there's not enough room on a page, we try to make
+ *     room by removing any LP_DEAD tuples.
  *
- *     On entry, *bufptr and *offsetptr point to the first legal position
- *     where the new tuple could be inserted.  The caller should hold an
- *     exclusive lock on *bufptr.  *offsetptr can also be set to
- *     InvalidOffsetNumber, in which case the function will search for the
- *     right location within the page if needed.  On exit, they point to the
- *     chosen insert location.  If _bt_findinsertloc decides to move right,
- *     the lock and pin on the original page will be released and the new
- *     page returned to the caller is exclusively locked instead.
+ *     On exit, insertstate buffer contains the chosen insertion page, and
+ *     the offset within that page is returned.  If _bt_findinsertloc needed
+ *     to move right, the lock and pin on the original page are released, and
+ *     the new buffer is exclusively locked and pinned instead.
  *
- *     newtup is the new tuple we're inserting, and scankey is an insertion
- *     type scan key for it.
+ *     If insertstate contains cached binary search bounds, we will take
+ *     advantage of them.  This avoids repeating comparisons that we made in
+ *     _bt_check_unique() already.
  */
-static void
+static OffsetNumber
 _bt_findinsertloc(Relation rel,
-                 Buffer *bufptr,
-                 OffsetNumber *offsetptr,
-                 int keysz,
-                 ScanKey scankey,
-                 IndexTuple newtup,
+                 BTInsertState insertstate,
+                 bool checkingunique,
                  BTStack stack,
                  Relation heapRel)
 {
-   Buffer      buf = *bufptr;
-   Page        page = BufferGetPage(buf);
-   Size        itemsz;
+   BTScanInsert itup_key = insertstate->itup_key;
+   Page        page = BufferGetPage(insertstate->buf);
    BTPageOpaque lpageop;
-   bool        movedright,
-               vacuumed;
-   OffsetNumber newitemoff;
-   OffsetNumber firstlegaloff = *offsetptr;
 
    lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
 
-   itemsz = IndexTupleSize(newtup);
-   itemsz = MAXALIGN(itemsz);  /* be safe, PageAddItem will do this but we
-                                * need to be consistent */
-
    /*
     * Check whether the item can fit on a btree page at all. (Eventually, we
     * ought to try to apply TOAST methods if not.) We actually need to be
@@ -660,11 +686,11 @@ _bt_findinsertloc(Relation rel,
     *
     * NOTE: if you change this, see also the similar code in _bt_buildadd().
     */
-   if (itemsz > BTMaxItemSize(page))
+   if (insertstate->itemsz > BTMaxItemSize(page))
        ereport(ERROR,
                (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
                 errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
-                       itemsz, BTMaxItemSize(page),
+                       insertstate->itemsz, BTMaxItemSize(page),
                        RelationGetRelationName(rel)),
                 errhint("Values larger than 1/3 of a buffer page cannot be indexed.\n"
                         "Consider a function index of an MD5 hash of the value, "
@@ -690,100 +716,113 @@ _bt_findinsertloc(Relation rel,
     * excellent job of preventing O(N^2) behavior with many equal keys.
     *----------
     */
-   movedright = false;
-   vacuumed = false;
-   while (PageGetFreeSpace(page) < itemsz)
-   {
-       Buffer      rbuf;
-       BlockNumber rblkno;
+   Assert(P_ISLEAF(lpageop) && !P_INCOMPLETE_SPLIT(lpageop));
+   Assert(!insertstate->bounds_valid || checkingunique);
 
+   while (PageGetFreeSpace(page) < insertstate->itemsz)
+   {
        /*
         * before considering moving right, see if we can obtain enough space
         * by erasing LP_DEAD items
         */
-       if (P_ISLEAF(lpageop) && P_HAS_GARBAGE(lpageop))
+       if (P_HAS_GARBAGE(lpageop))
        {
-           _bt_vacuum_one_page(rel, buf, heapRel);
+           _bt_vacuum_one_page(rel, insertstate->buf, heapRel);
+           insertstate->bounds_valid = false;
 
-           /*
-            * remember that we vacuumed this page, because that makes the
-            * hint supplied by the caller invalid
-            */
-           vacuumed = true;
-
-           if (PageGetFreeSpace(page) >= itemsz)
+           if (PageGetFreeSpace(page) >= insertstate->itemsz)
                break;          /* OK, now we have enough space */
        }
 
        /*
-        * nope, so check conditions (b) and (c) enumerated above
+        * Nope, so check conditions (b) and (c) enumerated above
+        *
+        * The earlier _bt_check_unique() call may well have established a
+        * strict upper bound on the offset for the new item.  If it's not the
+        * last item of the page (i.e. if there is at least one tuple on the
+        * page that's greater than the tuple we're inserting to) then we know
+        * that the tuple belongs on this page.  We can skip the high key
+        * check.
         */
+       if (insertstate->bounds_valid &&
+           insertstate->low <= insertstate->stricthigh &&
+           insertstate->stricthigh <= PageGetMaxOffsetNumber(page))
+           break;
+
        if (P_RIGHTMOST(lpageop) ||
-           _bt_compare(rel, keysz, scankey, page, P_HIKEY) != 0 ||
+           _bt_compare(rel, itup_key, page, P_HIKEY) != 0 ||
            random() <= (MAX_RANDOM_VALUE / 100))
            break;
 
-       /*
-        * step right to next non-dead page
-        *
-        * must write-lock that page before releasing write lock on current
-        * page; else someone else's _bt_check_unique scan could fail to see
-        * our insertion.  write locks on intermediate dead pages won't do
-        * because we don't know when they will get de-linked from the tree.
-        */
-       rbuf = InvalidBuffer;
+       _bt_stepright(rel, insertstate, stack);
+       /* Update local state after stepping right */
+       page = BufferGetPage(insertstate->buf);
+       lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
+   }
 
-       rblkno = lpageop->btpo_next;
-       for (;;)
-       {
-           rbuf = _bt_relandgetbuf(rel, rbuf, rblkno, BT_WRITE);
-           page = BufferGetPage(rbuf);
-           lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
+   /*
+    * We should now be on the correct page.  Find the offset within the page
+    * for the new tuple. (Possibly reusing earlier search bounds.)
+    */
+   Assert(P_RIGHTMOST(lpageop) ||
+          _bt_compare(rel, itup_key, page, P_HIKEY) <= 0);
 
-           /*
-            * If this page was incompletely split, finish the split now. We
-            * do this while holding a lock on the left sibling, which is not
-            * good because finishing the split could be a fairly lengthy
-            * operation.  But this should happen very seldom.
-            */
-           if (P_INCOMPLETE_SPLIT(lpageop))
-           {
-               _bt_finish_split(rel, rbuf, stack);
-               rbuf = InvalidBuffer;
-               continue;
-           }
+   return _bt_binsrch_insert(rel, insertstate);
+}
 
-           if (!P_IGNORE(lpageop))
-               break;
-           if (P_RIGHTMOST(lpageop))
-               elog(ERROR, "fell off the end of index \"%s\"",
-                    RelationGetRelationName(rel));
+/*
+ * Step right to next non-dead page, during insertion.
+ *
+ * This is a bit more complicated than moving right in a search.  We must
+ * write-lock the target page before releasing write lock on current page;
+ * else someone else's _bt_check_unique scan could fail to see our insertion.
+ * Write locks on intermediate dead pages won't do because we don't know when
+ * they will get de-linked from the tree.
+ */
+static void
+_bt_stepright(Relation rel, BTInsertState insertstate, BTStack stack)
+{
+   Page        page;
+   BTPageOpaque lpageop;
+   Buffer      rbuf;
+   BlockNumber rblkno;
+
+   page = BufferGetPage(insertstate->buf);
+   lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
+
+   rbuf = InvalidBuffer;
+   rblkno = lpageop->btpo_next;
+   for (;;)
+   {
+       rbuf = _bt_relandgetbuf(rel, rbuf, rblkno, BT_WRITE);
+       page = BufferGetPage(rbuf);
+       lpageop = (BTPageOpaque) PageGetSpecialPointer(page);
 
-           rblkno = lpageop->btpo_next;
+       /*
+        * If this page was incompletely split, finish the split now.  We do
+        * this while holding a lock on the left sibling, which is not good
+        * because finishing the split could be a fairly lengthy operation.
+        * But this should happen very seldom.
+        */
+       if (P_INCOMPLETE_SPLIT(lpageop))
+       {
+           _bt_finish_split(rel, rbuf, stack);
+           rbuf = InvalidBuffer;
+           continue;
        }
-       _bt_relbuf(rel, buf);
-       buf = rbuf;
-       movedright = true;
-       vacuumed = false;
-   }
 
-   /*
-    * Now we are on the right page, so find the insert position. If we moved
-    * right at all, we know we should insert at the start of the page. If we
-    * didn't move right, we can use the firstlegaloff hint if the caller
-    * supplied one, unless we vacuumed the page which might have moved tuples
-    * around making the hint invalid. If we didn't move right or can't use
-    * the hint, find the position by searching.
-    */
-   if (movedright)
-       newitemoff = P_FIRSTDATAKEY(lpageop);
-   else if (firstlegaloff != InvalidOffsetNumber && !vacuumed)
-       newitemoff = firstlegaloff;
-   else
-       newitemoff = _bt_binsrch(rel, buf, keysz, scankey, false);
+       if (!P_IGNORE(lpageop))
+           break;
+       if (P_RIGHTMOST(lpageop))
+           elog(ERROR, "fell off the end of index \"%s\"",
+                RelationGetRelationName(rel));
 
-   *bufptr = buf;
-   *offsetptr = newitemoff;
+       rblkno = lpageop->btpo_next;
+   }
+   /* rbuf locked; unlock buf, update state for caller */
+   _bt_relbuf(rel, insertstate->buf);
+   insertstate->buf = rbuf;
+   insertstate->bounds_valid = false;
 }
 
 /*----------
@@ -2312,24 +2351,21 @@ _bt_pgaddtup(Page page,
  * Rule is simple: NOT_NULL not equal NULL, NULL not equal NULL too.
  */
 static bool
-_bt_isequal(TupleDesc itupdesc, Page page, OffsetNumber offnum,
-           int keysz, ScanKey scankey)
+_bt_isequal(TupleDesc itupdesc, BTScanInsert itup_key, Page page,
+           OffsetNumber offnum)
 {
    IndexTuple  itup;
+   ScanKey     scankey;
    int         i;
 
-   /* Better be comparing to a leaf item */
+   /* Better be comparing to a non-pivot item */
    Assert(P_ISLEAF((BTPageOpaque) PageGetSpecialPointer(page)));
+   Assert(offnum >= P_FIRSTDATAKEY((BTPageOpaque) PageGetSpecialPointer(page)));
 
+   scankey = itup_key->scankeys;
    itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offnum));
 
-   /*
-    * It's okay that we might perform a comparison against a truncated page
-    * high key when caller needs to determine if _bt_check_unique scan must
-    * continue on to the next page.  Caller never asks us to compare non-key
-    * attributes within an INCLUDE index.
-    */
-   for (i = 1; i <= keysz; i++)
+   for (i = 1; i <= itup_key->keysz; i++)
    {
        AttrNumber  attno;
        Datum       datum;
@@ -2377,6 +2413,8 @@ _bt_vacuum_one_page(Relation rel, Buffer buffer, Relation heapRel)
    Page        page = BufferGetPage(buffer);
    BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
 
+   Assert(P_ISLEAF(opaque));
+
    /*
     * Scan over all items to see which ones need to be deleted according to
     * LP_DEAD flags.
index 9c785bca95e35f956d9be55cb995923204ce84f8..56041c3d38322fad29c5a1b5aae22c57ba7eb00d 100644 (file)
@@ -1371,7 +1371,7 @@ _bt_pagedel(Relation rel, Buffer buf)
             */
            if (!stack)
            {
-               ScanKey     itup_scankey;
+               BTScanInsert itup_key;
                ItemId      itemid;
                IndexTuple  targetkey;
                Buffer      lbuf;
@@ -1421,12 +1421,10 @@ _bt_pagedel(Relation rel, Buffer buf)
                }
 
                /* we need an insertion scan key for the search, so build one */
-               itup_scankey = _bt_mkscankey(rel, targetkey);
-               /* find the leftmost leaf page containing this key */
-               stack = _bt_search(rel,
-                                  IndexRelationGetNumberOfKeyAttributes(rel),
-                                  itup_scankey, false, &lbuf, BT_READ, NULL);
-               /* don't need a pin on the page */
+               itup_key = _bt_mkscankey(rel, targetkey);
+               /* get stack to leaf page by searching index */
+               stack = _bt_search(rel, itup_key, &lbuf, BT_READ, NULL);
+               /* don't need a lock or second pin on the page */
                _bt_relbuf(rel, lbuf);
 
                /*
index a0d78b6d68209d7fd45385a1859db1a2fffdc824..5a5c30abc3a13e4e0b7907c63b1e1d5330fa99fa 100644 (file)
@@ -25,6 +25,7 @@
 
 
 static void _bt_drop_lock_and_maybe_pin(IndexScanDesc scan, BTScanPos sp);
+static OffsetNumber _bt_binsrch(Relation rel, BTScanInsert key, Buffer buf);
 static bool _bt_readpage(IndexScanDesc scan, ScanDirection dir,
             OffsetNumber offnum);
 static void _bt_saveitem(BTScanOpaque so, int itemIndex,
@@ -70,13 +71,9 @@ _bt_drop_lock_and_maybe_pin(IndexScanDesc scan, BTScanPos sp)
  * _bt_search() -- Search the tree for a particular scankey,
  *     or more precisely for the first leaf page it could be on.
  *
- * The passed scankey must be an insertion-type scankey (see nbtree/README),
+ * The passed scankey is an insertion-type scankey (see nbtree/README),
  * but it can omit the rightmost column(s) of the index.
  *
- * When nextkey is false (the usual case), we are looking for the first
- * item >= scankey.  When nextkey is true, we are looking for the first
- * item strictly greater than scankey.
- *
  * Return value is a stack of parent-page pointers.  *bufP is set to the
  * address of the leaf-page buffer, which is read-locked and pinned.
  * No locks are held on the parent pages, however!
@@ -92,8 +89,8 @@ _bt_drop_lock_and_maybe_pin(IndexScanDesc scan, BTScanPos sp)
  * during the search will be finished.
  */
 BTStack
-_bt_search(Relation rel, int keysz, ScanKey scankey, bool nextkey,
-          Buffer *bufP, int access, Snapshot snapshot)
+_bt_search(Relation rel, BTScanInsert key, Buffer *bufP, int access,
+          Snapshot snapshot)
 {
    BTStack     stack_in = NULL;
    int         page_access = BT_READ;
@@ -129,8 +126,7 @@ _bt_search(Relation rel, int keysz, ScanKey scankey, bool nextkey,
         * if the leaf page is split and we insert to the parent page).  But
         * this is a good opportunity to finish splits of internal pages too.
         */
-       *bufP = _bt_moveright(rel, *bufP, keysz, scankey, nextkey,
-                             (access == BT_WRITE), stack_in,
+       *bufP = _bt_moveright(rel, key, *bufP, (access == BT_WRITE), stack_in,
                              page_access, snapshot);
 
        /* if this is a leaf page, we're done */
@@ -143,7 +139,7 @@ _bt_search(Relation rel, int keysz, ScanKey scankey, bool nextkey,
         * Find the appropriate item on the internal page, and get the child
         * page that it points to.
         */
-       offnum = _bt_binsrch(rel, *bufP, keysz, scankey, nextkey);
+       offnum = _bt_binsrch(rel, key, *bufP);
        itemid = PageGetItemId(page, offnum);
        itup = (IndexTuple) PageGetItem(page, itemid);
        blkno = BTreeInnerTupleGetDownLink(itup);
@@ -197,8 +193,8 @@ _bt_search(Relation rel, int keysz, ScanKey scankey, bool nextkey,
         * need to move right in the tree.  See Lehman and Yao for an
         * excruciatingly precise description.
         */
-       *bufP = _bt_moveright(rel, *bufP, keysz, scankey, nextkey,
-                             true, stack_in, BT_WRITE, snapshot);
+       *bufP = _bt_moveright(rel, key, *bufP, true, stack_in, BT_WRITE,
+                             snapshot);
    }
 
    return stack_in;
@@ -214,16 +210,17 @@ _bt_search(Relation rel, int keysz, ScanKey scankey, bool nextkey,
  * or strictly to the right of it.
  *
  * This routine decides whether or not we need to move right in the
- * tree by examining the high key entry on the page.  If that entry
- * is strictly less than the scankey, or <= the scankey in the nextkey=true
- * case, then we followed the wrong link and we need to move right.
+ * tree by examining the high key entry on the page.  If that entry is
+ * strictly less than the scankey, or <= the scankey in the
+ * key.nextkey=true case, then we followed the wrong link and we need
+ * to move right.
  *
- * The passed scankey must be an insertion-type scankey (see nbtree/README),
- * but it can omit the rightmost column(s) of the index.
+ * The passed insertion-type scankey can omit the rightmost column(s) of the
+ * index. (see nbtree/README)
  *
- * When nextkey is false (the usual case), we are looking for the first
- * item >= scankey.  When nextkey is true, we are looking for the first
- * item strictly greater than scankey.
+ * When key.nextkey is false (the usual case), we are looking for the first
+ * item >= key.  When key.nextkey is true, we are looking for the first item
+ * strictly greater than key.
  *
  * If forupdate is true, we will attempt to finish any incomplete splits
  * that we encounter.  This is required when locking a target page for an
@@ -240,10 +237,8 @@ _bt_search(Relation rel, int keysz, ScanKey scankey, bool nextkey,
  */
 Buffer
 _bt_moveright(Relation rel,
+             BTScanInsert key,
              Buffer buf,
-             int keysz,
-             ScanKey scankey,
-             bool nextkey,
              bool forupdate,
              BTStack stack,
              int access,
@@ -268,7 +263,7 @@ _bt_moveright(Relation rel,
     * We also have to move right if we followed a link that brought us to a
     * dead page.
     */
-   cmpval = nextkey ? 0 : 1;
+   cmpval = key->nextkey ? 0 : 1;
 
    for (;;)
    {
@@ -303,7 +298,7 @@ _bt_moveright(Relation rel,
            continue;
        }
 
-       if (P_IGNORE(opaque) || _bt_compare(rel, keysz, scankey, page, P_HIKEY) >= cmpval)
+       if (P_IGNORE(opaque) || _bt_compare(rel, key, page, P_HIKEY) >= cmpval)
        {
            /* step right one page */
            buf = _bt_relandgetbuf(rel, buf, opaque->btpo_next, access);
@@ -323,13 +318,6 @@ _bt_moveright(Relation rel,
 /*
  * _bt_binsrch() -- Do a binary search for a key on a particular page.
  *
- * The passed scankey must be an insertion-type scankey (see nbtree/README),
- * but it can omit the rightmost column(s) of the index.
- *
- * When nextkey is false (the usual case), we are looking for the first
- * item >= scankey.  When nextkey is true, we are looking for the first
- * item strictly greater than scankey.
- *
  * On a leaf page, _bt_binsrch() returns the OffsetNumber of the first
  * key >= given scankey, or > scankey if nextkey is true.  (NOTE: in
  * particular, this means it is possible to return a value 1 greater than the
@@ -347,12 +335,10 @@ _bt_moveright(Relation rel,
  * the given page.  _bt_binsrch() has no lock or refcount side effects
  * on the buffer.
  */
-OffsetNumber
+static OffsetNumber
 _bt_binsrch(Relation rel,
-           Buffer buf,
-           int keysz,
-           ScanKey scankey,
-           bool nextkey)
+           BTScanInsert key,
+           Buffer buf)
 {
    Page        page;
    BTPageOpaque opaque;
@@ -374,7 +360,7 @@ _bt_binsrch(Relation rel,
     * This can never happen on an internal page, however, since they are
     * never empty (an internal page must have children).
     */
-   if (high < low)
+   if (unlikely(high < low))
        return low;
 
    /*
@@ -391,7 +377,7 @@ _bt_binsrch(Relation rel,
     */
    high++;                     /* establish the loop invariant for high */
 
-   cmpval = nextkey ? 0 : 1;   /* select comparison value */
+   cmpval = key->nextkey ? 0 : 1;  /* select comparison value */
 
    while (high > low)
    {
@@ -399,7 +385,7 @@ _bt_binsrch(Relation rel,
 
        /* We have low <= mid < high, so mid points at a real slot */
 
-       result = _bt_compare(rel, keysz, scankey, page, mid);
+       result = _bt_compare(rel, key, page, mid);
 
        if (result >= cmpval)
            low = mid + 1;
@@ -426,14 +412,120 @@ _bt_binsrch(Relation rel,
    return OffsetNumberPrev(low);
 }
 
-/*----------
- * _bt_compare() -- Compare scankey to a particular tuple on the page.
+/*
  *
- * The passed scankey must be an insertion-type scankey (see nbtree/README),
- * but it can omit the rightmost column(s) of the index.
+ * bt_binsrch_insert() -- Cacheable, incremental leaf page binary search.
+ *
+ * Like _bt_binsrch(), but with support for caching the binary search
+ * bounds.  Only used during insertion, and only on the leaf page that it
+ * looks like caller will insert tuple on.  Exclusive-locked and pinned
+ * leaf page is contained within insertstate.
+ *
+ * Caches the bounds fields in insertstate so that a subsequent call can
+ * reuse the low and strict high bounds of original binary search.  Callers
+ * that use these fields directly must be prepared for the case where low
+ * and/or stricthigh are not on the same page (one or both exceed maxoff
+ * for the page).  The case where there are no items on the page (high <
+ * low) makes bounds invalid.
+ *
+ * Caller is responsible for invalidating bounds when it modifies the page
+ * before calling here a second time.
+ */
+OffsetNumber
+_bt_binsrch_insert(Relation rel, BTInsertState insertstate)
+{
+   BTScanInsert key = insertstate->itup_key;
+   Page        page;
+   BTPageOpaque opaque;
+   OffsetNumber low,
+               high,
+               stricthigh;
+   int32       result,
+               cmpval;
+
+   page = BufferGetPage(insertstate->buf);
+   opaque = (BTPageOpaque) PageGetSpecialPointer(page);
+
+   Assert(P_ISLEAF(opaque));
+   Assert(!key->nextkey);
+
+   if (!insertstate->bounds_valid)
+   {
+       /* Start new binary search */
+       low = P_FIRSTDATAKEY(opaque);
+       high = PageGetMaxOffsetNumber(page);
+   }
+   else
+   {
+       /* Restore result of previous binary search against same page */
+       low = insertstate->low;
+       high = insertstate->stricthigh;
+   }
+
+   /* If there are no keys on the page, return the first available slot */
+   if (unlikely(high < low))
+   {
+       /* Caller can't reuse bounds */
+       insertstate->low = InvalidOffsetNumber;
+       insertstate->stricthigh = InvalidOffsetNumber;
+       insertstate->bounds_valid = false;
+       return low;
+   }
+
+   /*
+    * Binary search to find the first key on the page >= scan key. (nextkey
+    * is always false when inserting).
+    *
+    * The loop invariant is: all slots before 'low' are < scan key, all slots
+    * at or after 'high' are >= scan key.  'stricthigh' is > scan key, and is
+    * maintained to save additional search effort for caller.
+    *
+    * We can fall out when high == low.
+    */
+   if (!insertstate->bounds_valid)
+       high++;                 /* establish the loop invariant for high */
+   stricthigh = high;          /* high initially strictly higher */
+
+   cmpval = 1;                 /* !nextkey comparison value */
+
+   while (high > low)
+   {
+       OffsetNumber mid = low + ((high - low) / 2);
+
+       /* We have low <= mid < high, so mid points at a real slot */
+
+       result = _bt_compare(rel, key, page, mid);
+
+       if (result >= cmpval)
+           low = mid + 1;
+       else
+       {
+           high = mid;
+           if (result != 0)
+               stricthigh = high;
+       }
+   }
+
+   /*
+    * On a leaf page, a binary search always returns the first key >= scan
+    * key (at least in !nextkey case), which could be the last slot + 1. This
+    * is also the lower bound of cached search.
+    *
+    * stricthigh may also be the last slot + 1, which prevents caller from
+    * using bounds directly, but is still useful to us if we're called a
+    * second time with cached bounds (cached low will be < stricthigh when
+    * that happens).
+    */
+   insertstate->low = low;
+   insertstate->stricthigh = stricthigh;
+   insertstate->bounds_valid = true;
+
+   return low;
+}
+
+/*----------
+ * _bt_compare() -- Compare insertion-type scankey to tuple on a page.
  *
- * keysz: number of key conditions to be checked (might be less than the
- *     number of index columns!)
  * page/offnum: location of btree item to be compared to.
  *
  *     This routine returns:
@@ -446,25 +538,26 @@ _bt_binsrch(Relation rel,
  *
  * CRUCIAL NOTE: on a non-leaf page, the first data key is assumed to be
  * "minus infinity": this routine will always claim it is less than the
- * scankey.  The actual key value stored (if any, which there probably isn't)
- * does not matter.  This convention allows us to implement the Lehman and
- * Yao convention that the first down-link pointer is before the first key.
- * See backend/access/nbtree/README for details.
+ * scankey.  The actual key value stored is explicitly truncated to 0
+ * attributes (explicitly minus infinity) with version 3+ indexes, but
+ * that isn't relied upon.  This allows us to implement the Lehman and
+ * Yao convention that the first down-link pointer is before the first
+ * key.  See backend/access/nbtree/README for details.
  *----------
  */
 int32
 _bt_compare(Relation rel,
-           int keysz,
-           ScanKey scankey,
+           BTScanInsert key,
            Page page,
            OffsetNumber offnum)
 {
    TupleDesc   itupdesc = RelationGetDescr(rel);
    BTPageOpaque opaque = (BTPageOpaque) PageGetSpecialPointer(page);
    IndexTuple  itup;
-   int         i;
+   ScanKey     scankey;
 
    Assert(_bt_check_natts(rel, page, offnum));
+   Assert(key->keysz <= IndexRelationGetNumberOfKeyAttributes(rel));
 
    /*
     * Force result ">" if target item is first data item on an internal page
@@ -487,7 +580,8 @@ _bt_compare(Relation rel,
     * _bt_first).
     */
 
-   for (i = 1; i <= keysz; i++)
+   scankey = key->scankeys;
+   for (int i = 1; i <= key->keysz; i++)
    {
        Datum       datum;
        bool        isNull;
@@ -573,8 +667,8 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
    StrategyNumber strat;
    bool        nextkey;
    bool        goback;
+   BTScanInsertData inskey;
    ScanKey     startKeys[INDEX_MAX_KEYS];
-   ScanKeyData scankeys[INDEX_MAX_KEYS];
    ScanKeyData notnullkeys[INDEX_MAX_KEYS];
    int         keysCount = 0;
    int         i;
@@ -820,8 +914,9 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
    /*
     * We want to start the scan somewhere within the index.  Set up an
     * insertion scankey we can use to search for the boundary point we
-    * identified above.  The insertion scankey is built in the local
-    * scankeys[] array, using the keys identified by startKeys[].
+    * identified above.  The insertion scankey is built using the keys
+    * identified by startKeys[].  (Remaining insertion scankey fields are
+    * initialized after initial-positioning strategy is finalized.)
     */
    Assert(keysCount <= INDEX_MAX_KEYS);
    for (i = 0; i < keysCount; i++)
@@ -849,7 +944,7 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
                _bt_parallel_done(scan);
                return false;
            }
-           memcpy(scankeys + i, subkey, sizeof(ScanKeyData));
+           memcpy(inskey.scankeys + i, subkey, sizeof(ScanKeyData));
 
            /*
             * If the row comparison is the last positioning key we accepted,
@@ -881,7 +976,8 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
                    if (subkey->sk_flags & SK_ISNULL)
                        break;  /* can't use null keys */
                    Assert(keysCount < INDEX_MAX_KEYS);
-                   memcpy(scankeys + keysCount, subkey, sizeof(ScanKeyData));
+                   memcpy(inskey.scankeys + keysCount, subkey,
+                          sizeof(ScanKeyData));
                    keysCount++;
                    if (subkey->sk_flags & SK_ROW_END)
                    {
@@ -927,7 +1023,7 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
                FmgrInfo   *procinfo;
 
                procinfo = index_getprocinfo(rel, cur->sk_attno, BTORDER_PROC);
-               ScanKeyEntryInitializeWithInfo(scankeys + i,
+               ScanKeyEntryInitializeWithInfo(inskey.scankeys + i,
                                               cur->sk_flags,
                                               cur->sk_attno,
                                               InvalidStrategy,
@@ -948,7 +1044,7 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
                    elog(ERROR, "missing support function %d(%u,%u) for attribute %d of index \"%s\"",
                         BTORDER_PROC, rel->rd_opcintype[i], cur->sk_subtype,
                         cur->sk_attno, RelationGetRelationName(rel));
-               ScanKeyEntryInitialize(scankeys + i,
+               ScanKeyEntryInitialize(inskey.scankeys + i,
                                       cur->sk_flags,
                                       cur->sk_attno,
                                       InvalidStrategy,
@@ -1051,12 +1147,15 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
            return false;
    }
 
+   /* Initialize remaining insertion scan key fields */
+   inskey.nextkey = nextkey;
+   inskey.keysz = keysCount;
+
    /*
     * Use the manufactured insertion scan key to descend the tree and
     * position ourselves on the target leaf page.
     */
-   stack = _bt_search(rel, keysCount, scankeys, nextkey, &buf, BT_READ,
-                      scan->xs_snapshot);
+   stack = _bt_search(rel, &inskey, &buf, BT_READ, scan->xs_snapshot);
 
    /* don't need to keep the stack around... */
    _bt_freestack(stack);
@@ -1085,7 +1184,7 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
    _bt_initialize_more_data(so, dir);
 
    /* position to the precise item on the page */
-   offnum = _bt_binsrch(rel, buf, keysCount, scankeys, nextkey);
+   offnum = _bt_binsrch(rel, &inskey, buf);
 
    /*
     * If nextkey = false, we are positioned at the first item >= scan key, or
index 363dceb5b1c21be023e59cc4accfa055c8d61a5e..a0e2e70cefc92f2bd6eceb99f34185e602a189e6 100644 (file)
@@ -263,6 +263,7 @@ typedef struct BTWriteState
 {
    Relation    heap;
    Relation    index;
+   BTScanInsert inskey;        /* generic insertion scankey */
    bool        btws_use_wal;   /* dump pages to WAL? */
    BlockNumber btws_pages_alloced; /* # pages allocated */
    BlockNumber btws_pages_written; /* # pages written out */
@@ -540,6 +541,7 @@ _bt_leafbuild(BTSpool *btspool, BTSpool *btspool2)
 
    wstate.heap = btspool->heap;
    wstate.index = btspool->index;
+   wstate.inskey = _bt_mkscankey(wstate.index, NULL);
 
    /*
     * We need to log index creation in WAL iff WAL archiving/ is
@@ -1085,7 +1087,6 @@ _bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2)
    TupleDesc   tupdes = RelationGetDescr(wstate->index);
    int         i,
                keysz = IndexRelationGetNumberOfKeyAttributes(wstate->index);
-   ScanKey     indexScanKey = NULL;
    SortSupport sortKeys;
 
    if (merge)
@@ -1098,7 +1099,6 @@ _bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2)
        /* the preparation of merge */
        itup = tuplesort_getindextuple(btspool->sortstate, true);
        itup2 = tuplesort_getindextuple(btspool2->sortstate, true);
-       indexScanKey = _bt_mkscankey_nodata(wstate->index);
 
        /* Prepare SortSupport data for each column */
        sortKeys = (SortSupport) palloc0(keysz * sizeof(SortSupportData));
@@ -1106,7 +1106,7 @@ _bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2)
        for (i = 0; i < keysz; i++)
        {
            SortSupport sortKey = sortKeys + i;
-           ScanKey     scanKey = indexScanKey + i;
+           ScanKey     scanKey = wstate->inskey->scankeys + i;
            int16       strategy;
 
            sortKey->ssup_cxt = CurrentMemoryContext;
@@ -1125,8 +1125,6 @@ _bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2)
            PrepareSortSupportFromIndexRel(wstate->index, strategy, sortKey);
        }
 
-       _bt_freeskey(indexScanKey);
-
        for (;;)
        {
            load1 = true;       /* load BTSpool next ? */
index 2c05fb5e45121e3952dee85e19aeb38944076825..0250e089a654d2f6edbabc1dd77bb3b3afbf6df2 100644 (file)
@@ -56,34 +56,37 @@ static bool _bt_check_rowcompare(ScanKey skey,
  *     Build an insertion scan key that contains comparison data from itup
  *     as well as comparator routines appropriate to the key datatypes.
  *
- *     The result is intended for use with _bt_compare().
+ *     Result is intended for use with _bt_compare().  Callers that don't
+ *     need to fill out the insertion scankey arguments (e.g. they use an
+ *     ad-hoc comparison routine) can pass a NULL index tuple.
  */
-ScanKey
+BTScanInsert
 _bt_mkscankey(Relation rel, IndexTuple itup)
 {
+   BTScanInsert key;
    ScanKey     skey;
    TupleDesc   itupdesc;
-   int         indnatts PG_USED_FOR_ASSERTS_ONLY;
    int         indnkeyatts;
    int16      *indoption;
+   int         tupnatts;
    int         i;
 
    itupdesc = RelationGetDescr(rel);
-   indnatts = IndexRelationGetNumberOfAttributes(rel);
    indnkeyatts = IndexRelationGetNumberOfKeyAttributes(rel);
    indoption = rel->rd_indoption;
+   tupnatts = itup ? BTreeTupleGetNAtts(itup, rel) : 0;
 
-   Assert(indnkeyatts > 0);
-   Assert(indnkeyatts <= indnatts);
-   Assert(BTreeTupleGetNAtts(itup, rel) == indnatts ||
-          BTreeTupleGetNAtts(itup, rel) == indnkeyatts);
+   Assert(tupnatts <= IndexRelationGetNumberOfAttributes(rel));
 
    /*
     * We'll execute search using scan key constructed on key columns. Non-key
     * (INCLUDE index) columns are always omitted from scan keys.
     */
-   skey = (ScanKey) palloc(indnkeyatts * sizeof(ScanKeyData));
-
+   key = palloc(offsetof(BTScanInsertData, scankeys) +
+                sizeof(ScanKeyData) * indnkeyatts);
+   key->nextkey = false;
+   key->keysz = Min(indnkeyatts, tupnatts);
+   skey = key->scankeys;
    for (i = 0; i < indnkeyatts; i++)
    {
        FmgrInfo   *procinfo;
@@ -96,56 +99,20 @@ _bt_mkscankey(Relation rel, IndexTuple itup)
         * comparison can be needed.
         */
        procinfo = index_getprocinfo(rel, i + 1, BTORDER_PROC);
-       arg = index_getattr(itup, i + 1, itupdesc, &null);
-       flags = (null ? SK_ISNULL : 0) | (indoption[i] << SK_BT_INDOPTION_SHIFT);
-       ScanKeyEntryInitializeWithInfo(&skey[i],
-                                      flags,
-                                      (AttrNumber) (i + 1),
-                                      InvalidStrategy,
-                                      InvalidOid,
-                                      rel->rd_indcollation[i],
-                                      procinfo,
-                                      arg);
-   }
-
-   return skey;
-}
-
-/*
- * _bt_mkscankey_nodata
- *     Build an insertion scan key that contains 3-way comparator routines
- *     appropriate to the key datatypes, but no comparison data.  The
- *     comparison data ultimately used must match the key datatypes.
- *
- *     The result cannot be used with _bt_compare(), unless comparison
- *     data is first stored into the key entries.  Currently this
- *     routine is only called by nbtsort.c and tuplesort.c, which have
- *     their own comparison routines.
- */
-ScanKey
-_bt_mkscankey_nodata(Relation rel)
-{
-   ScanKey     skey;
-   int         indnkeyatts;
-   int16      *indoption;
-   int         i;
-
-   indnkeyatts = IndexRelationGetNumberOfKeyAttributes(rel);
-   indoption = rel->rd_indoption;
-
-   skey = (ScanKey) palloc(indnkeyatts * sizeof(ScanKeyData));
-
-   for (i = 0; i < indnkeyatts; i++)
-   {
-       FmgrInfo   *procinfo;
-       int         flags;
 
        /*
-        * We can use the cached (default) support procs since no cross-type
-        * comparison can be needed.
+        * Key arguments built when caller provides no tuple are
+        * defensively represented as NULL values.  They should never be
+        * used.
         */
-       procinfo = index_getprocinfo(rel, i + 1, BTORDER_PROC);
-       flags = SK_ISNULL | (indoption[i] << SK_BT_INDOPTION_SHIFT);
+       if (i < tupnatts)
+           arg = index_getattr(itup, i + 1, itupdesc, &null);
+       else
+       {
+           arg = (Datum) 0;
+           null = true;
+       }
+       flags = (null ? SK_ISNULL : 0) | (indoption[i] << SK_BT_INDOPTION_SHIFT);
        ScanKeyEntryInitializeWithInfo(&skey[i],
                                       flags,
                                       (AttrNumber) (i + 1),
@@ -153,19 +120,10 @@ _bt_mkscankey_nodata(Relation rel)
                                       InvalidOid,
                                       rel->rd_indcollation[i],
                                       procinfo,
-                                      (Datum) 0);
+                                      arg);
    }
 
-   return skey;
-}
-
-/*
- * free a scan key made by either _bt_mkscankey or _bt_mkscankey_nodata.
- */
-void
-_bt_freeskey(ScanKey skey)
-{
-   pfree(skey);
+   return key;
 }
 
 /*
index 2946b47b4651117d0a9e332569cefb2c79be1893..16bda5c586a6cd3214881e4fb9c92da5f02bfc62 100644 (file)
@@ -884,7 +884,7 @@ tuplesort_begin_cluster(TupleDesc tupDesc,
 {
    Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
                                                   randomAccess);
-   ScanKey     indexScanKey;
+   BTScanInsert indexScanKey;
    MemoryContext oldcontext;
    int         i;
 
@@ -919,7 +919,7 @@ tuplesort_begin_cluster(TupleDesc tupDesc,
 
    state->tupDesc = tupDesc;   /* assume we need not copy tupDesc */
 
-   indexScanKey = _bt_mkscankey_nodata(indexRel);
+   indexScanKey = _bt_mkscankey(indexRel, NULL);
 
    if (state->indexInfo->ii_Expressions != NULL)
    {
@@ -945,7 +945,7 @@ tuplesort_begin_cluster(TupleDesc tupDesc,
    for (i = 0; i < state->nKeys; i++)
    {
        SortSupport sortKey = state->sortKeys + i;
-       ScanKey     scanKey = indexScanKey + i;
+       ScanKey     scanKey = indexScanKey->scankeys + i;
        int16       strategy;
 
        sortKey->ssup_cxt = CurrentMemoryContext;
@@ -964,7 +964,7 @@ tuplesort_begin_cluster(TupleDesc tupDesc,
        PrepareSortSupportFromIndexRel(indexRel, strategy, sortKey);
    }
 
-   _bt_freeskey(indexScanKey);
+   pfree(indexScanKey);
 
    MemoryContextSwitchTo(oldcontext);
 
@@ -981,7 +981,7 @@ tuplesort_begin_index_btree(Relation heapRel,
 {
    Tuplesortstate *state = tuplesort_begin_common(workMem, coordinate,
                                                   randomAccess);
-   ScanKey     indexScanKey;
+   BTScanInsert indexScanKey;
    MemoryContext oldcontext;
    int         i;
 
@@ -1014,7 +1014,7 @@ tuplesort_begin_index_btree(Relation heapRel,
    state->indexRel = indexRel;
    state->enforceUnique = enforceUnique;
 
-   indexScanKey = _bt_mkscankey_nodata(indexRel);
+   indexScanKey = _bt_mkscankey(indexRel, NULL);
 
    /* Prepare SortSupport data for each column */
    state->sortKeys = (SortSupport) palloc0(state->nKeys *
@@ -1023,7 +1023,7 @@ tuplesort_begin_index_btree(Relation heapRel,
    for (i = 0; i < state->nKeys; i++)
    {
        SortSupport sortKey = state->sortKeys + i;
-       ScanKey     scanKey = indexScanKey + i;
+       ScanKey     scanKey = indexScanKey->scankeys + i;
        int16       strategy;
 
        sortKey->ssup_cxt = CurrentMemoryContext;
@@ -1042,7 +1042,7 @@ tuplesort_begin_index_btree(Relation heapRel,
        PrepareSortSupportFromIndexRel(indexRel, strategy, sortKey);
    }
 
-   _bt_freeskey(indexScanKey);
+   pfree(indexScanKey);
 
    MemoryContextSwitchTo(oldcontext);
 
index 60622ea7906a0c23397de99a4d9281da408a6e52..8b3c9dea25650ee9eadfd1e8bdbab6dfd8eee2a0 100644 (file)
@@ -319,6 +319,64 @@ typedef struct BTStackData
 
 typedef BTStackData *BTStack;
 
+/*
+ * BTScanInsert is the btree-private state needed to find an initial position
+ * for an indexscan, or to insert new tuples -- an "insertion scankey" (not to
+ * be confused with a search scankey).  It's used to descend a B-Tree using
+ * _bt_search.
+ *
+ * When nextkey is false (the usual case), _bt_search and _bt_binsrch will
+ * locate the first item >= scankey.  When nextkey is true, they will locate
+ * the first item > scan key.
+ *
+ * scankeys is an array of scan key entries for attributes that are compared.
+ * keysz is the size of the array.  During insertion, there must be a scan key
+ * for every attribute, but when starting a regular index scan some can be
+ * omitted.  The array is used as a flexible array member, though it's sized
+ * in a way that makes it possible to use stack allocations.  See
+ * nbtree/README for full details.
+ */
+typedef struct BTScanInsertData
+{
+   bool        nextkey;
+   int         keysz;          /* Size of scankeys array */
+   ScanKeyData scankeys[INDEX_MAX_KEYS];   /* Must appear last */
+} BTScanInsertData;
+
+typedef BTScanInsertData *BTScanInsert;
+
+/*
+ * BTInsertStateData is a working area used during insertion.
+ *
+ * This is filled in after descending the tree to the first leaf page the new
+ * tuple might belong on.  Tracks the current position while performing
+ * uniqueness check, before we have determined which exact page to insert
+ * to.
+ *
+ * (This should be private to nbtinsert.c, but it's also used by
+ * _bt_binsrch_insert)
+ */
+typedef struct BTInsertStateData
+{
+   IndexTuple  itup;           /* Item we're inserting */
+   Size        itemsz;         /* Size of itup -- should be MAXALIGN()'d */
+   BTScanInsert itup_key;      /* Insertion scankey */
+
+   /* Buffer containing leaf page we're likely to insert itup on */
+   Buffer      buf;
+
+   /*
+    * Cache of bounds within the current buffer.  Only used for insertions
+    * where _bt_check_unique is called.  See _bt_binsrch_insert and
+    * _bt_findinsertloc for details.
+    */
+   bool        bounds_valid;
+   OffsetNumber low;
+   OffsetNumber stricthigh;
+} BTInsertStateData;
+
+typedef BTInsertStateData *BTInsertState;
+
 /*
  * BTScanOpaqueData is the btree-private state needed for an indexscan.
  * This consists of preprocessed scan keys (see _bt_preprocess_keys() for
@@ -558,16 +616,12 @@ extern int    _bt_pagedel(Relation rel, Buffer buf);
 /*
  * s for functions in nbtsearch.c
  */
-extern BTStack _bt_search(Relation rel,
-          int keysz, ScanKey scankey, bool nextkey,
-          Buffer *bufP, int access, Snapshot snapshot);
-extern Buffer _bt_moveright(Relation rel, Buffer buf, int keysz,
-             ScanKey scankey, bool nextkey, bool forupdate, BTStack stack,
-             int access, Snapshot snapshot);
-extern OffsetNumber _bt_binsrch(Relation rel, Buffer buf, int keysz,
-           ScanKey scankey, bool nextkey);
-extern int32 _bt_compare(Relation rel, int keysz, ScanKey scankey,
-           Page page, OffsetNumber offnum);
+extern BTStack _bt_search(Relation rel, BTScanInsert key, Buffer *bufP,
+          int access, Snapshot snapshot);
+extern Buffer _bt_moveright(Relation rel, BTScanInsert key, Buffer buf,
+             bool forupdate, BTStack stack, int access, Snapshot snapshot);
+extern OffsetNumber _bt_binsrch_insert(Relation rel, BTInsertState insertstate);
+extern int32 _bt_compare(Relation rel, BTScanInsert key, Page page, OffsetNumber offnum);
 extern bool _bt_first(IndexScanDesc scan, ScanDirection dir);
 extern bool _bt_next(IndexScanDesc scan, ScanDirection dir);
 extern Buffer _bt_get_endpoint(Relation rel, uint32 level, bool rightmost,
@@ -576,9 +630,7 @@ extern Buffer _bt_get_endpoint(Relation rel, uint32 level, bool rightmost,
 /*
  * s for functions in nbtutils.c
  */
-extern ScanKey _bt_mkscankey(Relation rel, IndexTuple itup);
-extern ScanKey _bt_mkscankey_nodata(Relation rel);
-extern void _bt_freeskey(ScanKey skey);
+extern BTScanInsert _bt_mkscankey(Relation rel, IndexTuple itup);
 extern void _bt_freestack(BTStack stack);
 extern void _bt_preprocess_array_keys(IndexScanDesc scan);
 extern void _bt_start_array_keys(IndexScanDesc scan, ScanDirection dir);