nbtree: Allocate new pages in separate function.
authorPeter Geoghegan <[email protected]>
Sat, 10 Jun 2023 21:08:25 +0000 (14:08 -0700)
committerPeter Geoghegan <[email protected]>
Sat, 10 Jun 2023 21:08:25 +0000 (14:08 -0700)
Split nbtree's _bt_getbuf function is two: code that read locks or write
locks existing pages remains in _bt_getbuf, while code that deals with
allocating new pages is moved to a new, dedicated function called
_bt_allocbuf.  This simplifies most _bt_getbuf callers, since it is no
longer necessary for them to pass a heaprel argument.  Many of the
changes to nbtree from commit 61b313e4 can be reverted.  This minimizes
the divergence between HEAD/PostgreSQL 16 and earlier release branches.

_bt_allocbuf replaces the previous nbtree idiom of passing P_NEW to
_bt_getbuf.  There are only 3 affected call sites, all of which continue
to pass a heaprel for recovery conflict purposes.  Note that nbtree's
use of P_NEW was superficial; nbtree never actually relied on the P_NEW
code paths in bufmgr.c, so this change is strictly mechanical.

GiST already took the same approach; it has a dedicated function for
allocating new pages called gistNewBuffer().  That factor allowed commit
61b313e4 to make much more targeted changes to GiST.

Author: Peter Geoghegan <[email protected]>
Reviewed-By: Heikki Linnakangas <[email protected]>
Discussion: https://postgr.es/m/CAH2-Wz=8Z9qY58bjm_7TAHgtW6RzZ5Ke62q5emdCEy9BAzwhmg@mail.gmail.com

12 files changed:
contrib/amcheck/verify_nbtree.c
src/backend/access/heap/heapam_handler.c
src/backend/access/nbtree/nbtinsert.c
src/backend/access/nbtree/nbtpage.c
src/backend/access/nbtree/nbtree.c
src/backend/access/nbtree/nbtsearch.c
src/backend/access/nbtree/nbtsort.c
src/backend/access/nbtree/nbtutils.c
src/backend/optimizer/util/plancat.c
src/backend/utils/sort/tuplesortvariants.c
src/include/access/nbtree.h
src/include/utils/tuplesort.h

index 6979aff727be0dfd604183ec43f62545a6b4db90..94a9759322e43aa175b7f761ae1e8d352bbc2598 100644 (file)
@@ -183,7 +183,6 @@ static inline bool invariant_l_nontarget_offset(BtreeCheckState *state,
                                                OffsetNumber upperbound);
 static Page palloc_btree_page(BtreeCheckState *state, BlockNumber blocknum);
 static inline BTScanInsert bt_mkscankey_pivotsearch(Relation rel,
-                                                   Relation heaprel,
                                                    IndexTuple itup);
 static ItemId PageGetItemIdCareful(BtreeCheckState *state, BlockNumber block,
                                   Page page, OffsetNumber offset);
@@ -332,7 +331,7 @@ bt_index_check_internal(Oid indrelid, bool parentcheck, bool heapallindexed,
                            RelationGetRelationName(indrel))));
 
        /* Extract metadata from metapage, and sanitize it in passing */
-       _bt_metaversion(indrel, heaprel, &heapkeyspace, &allequalimage);
+       _bt_metaversion(indrel, &heapkeyspace, &allequalimage);
        if (allequalimage && !heapkeyspace)
            ereport(ERROR,
                    (errcode(ERRCODE_INDEX_CORRUPTED),
@@ -1259,7 +1258,7 @@ bt_target_page_check(BtreeCheckState *state)
        }
 
        /* Build insertion scankey for current page offset */
-       skey = bt_mkscankey_pivotsearch(state->rel, state->heaprel, itup);
+       skey = bt_mkscankey_pivotsearch(state->rel, itup);
 
        /*
         * Make sure tuple size does not exceed the relevant BTREE_VERSION
@@ -1769,7 +1768,7 @@ bt_right_page_check_scankey(BtreeCheckState *state)
     * memory remaining allocated.
     */
    firstitup = (IndexTuple) PageGetItem(rightpage, rightitem);
-   return bt_mkscankey_pivotsearch(state->rel, state->heaprel, firstitup);
+   return bt_mkscankey_pivotsearch(state->rel, firstitup);
 }
 
 /*
@@ -2682,7 +2681,7 @@ bt_rootdescend(BtreeCheckState *state, IndexTuple itup)
    Buffer      lbuf;
    bool        exists;
 
-   key = _bt_mkscankey(state->rel, state->heaprel, itup);
+   key = _bt_mkscankey(state->rel, itup);
    Assert(key->heapkeyspace && key->scantid != NULL);
 
    /*
@@ -2695,7 +2694,7 @@ bt_rootdescend(BtreeCheckState *state, IndexTuple itup)
     */
    Assert(state->readonly && state->rootdescend);
    exists = false;
-   stack = _bt_search(state->rel, state->heaprel, key, &lbuf, BT_READ, NULL);
+   stack = _bt_search(state->rel, NULL, key, &lbuf, BT_READ, NULL);
 
    if (BufferIsValid(lbuf))
    {
@@ -3134,11 +3133,11 @@ palloc_btree_page(BtreeCheckState *state, BlockNumber blocknum)
  * the scankey is greater.
  */
 static inline BTScanInsert
-bt_mkscankey_pivotsearch(Relation rel, Relation heaprel, IndexTuple itup)
+bt_mkscankey_pivotsearch(Relation rel, IndexTuple itup)
 {
    BTScanInsert skey;
 
-   skey = _bt_mkscankey(rel, heaprel, itup);
+   skey = _bt_mkscankey(rel, itup);
    skey->pivotsearch = true;
 
    return skey;
index 646135cc21c5a22603b654f087b89e3b052c4203..0755be8390112287c77e49ce7075bf9d94672c76 100644 (file)
@@ -731,14 +731,9 @@ heapam_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
                                 *multi_cutoff);
 
 
-   /*
-    * Set up sorting if wanted. NewHeap is being passed to
-    * tuplesort_begin_cluster(), it could have been OldHeap too. It does not
-    * really matter, as the goal is to have a heap relation being passed to
-    * _bt_log_reuse_page() (which should not be called from this code path).
-    */
+   /* Set up sorting if wanted */
    if (use_sort)
-       tuplesort = tuplesort_begin_cluster(oldTupDesc, OldIndex, NewHeap,
+       tuplesort = tuplesort_begin_cluster(oldTupDesc, OldIndex,
                                            maintenance_work_mem,
                                            NULL, TUPLESORT_NONE);
    else
index 84bbd78d59ca13435d0c691642aa583a40e1eac6..d33f814a938acb282f9cd7c8841ad511a75469da 100644 (file)
@@ -59,7 +59,7 @@ static Buffer _bt_split(Relation rel, Relation heaprel, BTScanInsert itup_key,
                        IndexTuple nposting, uint16 postingoff);
 static void _bt_insert_parent(Relation rel, Relation heaprel, Buffer buf,
                              Buffer rbuf, BTStack stack, bool isroot, bool isonly);
-static Buffer _bt_newroot(Relation rel, Relation heaprel, Buffer lbuf, Buffer rbuf);
+static Buffer _bt_newlevel(Relation rel, Relation heaprel, Buffer lbuf, Buffer rbuf);
 static inline bool _bt_pgaddtup(Page page, Size itemsize, IndexTuple itup,
                                OffsetNumber itup_off, bool newfirstdataitem);
 static void _bt_delete_or_dedup_one_page(Relation rel, Relation heapRel,
@@ -110,7 +110,7 @@ _bt_doinsert(Relation rel, IndexTuple itup,
    bool        checkingunique = (checkUnique != UNIQUE_CHECK_NO);
 
    /* we need an insertion scan key to do our search, so build one */
-   itup_key = _bt_mkscankey(rel, heapRel, itup);
+   itup_key = _bt_mkscankey(rel, itup);
 
    if (checkingunique)
    {
@@ -1024,13 +1024,15 @@ _bt_findinsertloc(Relation rel,
  * indexes.
  */
 static void
-_bt_stepright(Relation rel, Relation heaprel, BTInsertState insertstate, BTStack stack)
+_bt_stepright(Relation rel, Relation heaprel, BTInsertState insertstate,
+             BTStack stack)
 {
    Page        page;
    BTPageOpaque opaque;
    Buffer      rbuf;
    BlockNumber rblkno;
 
+   Assert(heaprel != NULL);
    page = BufferGetPage(insertstate->buf);
    opaque = BTPageGetOpaque(page);
 
@@ -1145,7 +1147,7 @@ _bt_insertonpg(Relation rel,
 
    /*
     * Every internal page should have exactly one negative infinity item at
-    * all times.  Only _bt_split() and _bt_newroot() should add items that
+    * all times.  Only _bt_split() and _bt_newlevel() should add items that
     * become negative infinity items through truncation, since they're the
     * only routines that allocate new internal pages.
     */
@@ -1250,14 +1252,14 @@ _bt_insertonpg(Relation rel,
         * only one on its tree level, but was not the root, it may have been
         * the "fast root".  We need to ensure that the fast root link points
         * at or above the current page.  We can safely acquire a lock on the
-        * metapage here --- see comments for _bt_newroot().
+        * metapage here --- see comments for _bt_newlevel().
         */
        if (unlikely(split_only_page))
        {
            Assert(!isleaf);
            Assert(BufferIsValid(cbuf));
 
-           metabuf = _bt_getbuf(rel, heaprel, BTREE_METAPAGE, BT_WRITE);
+           metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
            metapg = BufferGetPage(metabuf);
            metad = BTPageGetMeta(metapg);
 
@@ -1421,7 +1423,7 @@ _bt_insertonpg(Relation rel,
         * call _bt_getrootheight while holding a buffer lock.
         */
        if (BlockNumberIsValid(blockcache) &&
-           _bt_getrootheight(rel, heaprel) >= BTREE_FASTPATH_MIN_LEVEL)
+           _bt_getrootheight(rel) >= BTREE_FASTPATH_MIN_LEVEL)
            RelationSetTargetBlock(rel, blockcache);
    }
 
@@ -1715,7 +1717,7 @@ _bt_split(Relation rel, Relation heaprel, BTScanInsert itup_key, Buffer buf,
     * way because it avoids an unnecessary PANIC when either origpage or its
     * existing sibling page are corrupt.
     */
-   rbuf = _bt_getbuf(rel, heaprel, P_NEW, BT_WRITE);
+   rbuf = _bt_allocbuf(rel, heaprel);
    rightpage = BufferGetPage(rbuf);
    rightpagenumber = BufferGetBlockNumber(rbuf);
    /* rightpage was initialized by _bt_getbuf */
@@ -1888,7 +1890,7 @@ _bt_split(Relation rel, Relation heaprel, BTScanInsert itup_key, Buffer buf,
     */
    if (!isrightmost)
    {
-       sbuf = _bt_getbuf(rel, heaprel, oopaque->btpo_next, BT_WRITE);
+       sbuf = _bt_getbuf(rel, oopaque->btpo_next, BT_WRITE);
        spage = BufferGetPage(sbuf);
        sopaque = BTPageGetOpaque(spage);
        if (sopaque->btpo_prev != origpagenumber)
@@ -2102,6 +2104,8 @@ _bt_insert_parent(Relation rel,
                  bool isroot,
                  bool isonly)
 {
+   Assert(heaprel != NULL);
+
    /*
     * Here we have to do something Lehman and Yao don't talk about: deal with
     * a root split and construction of a new root.  If our stack is empty
@@ -2121,8 +2125,8 @@ _bt_insert_parent(Relation rel,
 
        Assert(stack == NULL);
        Assert(isonly);
-       /* create a new root node and update the metapage */
-       rootbuf = _bt_newroot(rel, heaprel, buf, rbuf);
+       /* create a new root node one level up and update the metapage */
+       rootbuf = _bt_newlevel(rel, heaprel, buf, rbuf);
        /* release the split buffers */
        _bt_relbuf(rel, rootbuf);
        _bt_relbuf(rel, rbuf);
@@ -2161,8 +2165,7 @@ _bt_insert_parent(Relation rel,
                     BlockNumberIsValid(RelationGetTargetBlock(rel))));
 
            /* Find the leftmost page at the next level up */
-           pbuf = _bt_get_endpoint(rel, heaprel, opaque->btpo_level + 1, false,
-                                   NULL);
+           pbuf = _bt_get_endpoint(rel, opaque->btpo_level + 1, false, NULL);
            /* Set up a phony stack entry pointing there */
            stack = &fakestack;
            stack->bts_blkno = BufferGetBlockNumber(pbuf);
@@ -2230,6 +2233,9 @@ _bt_insert_parent(Relation rel,
  *
  * On entry, 'lbuf' must be locked in write-mode.  On exit, it is unlocked
  * and unpinned.
+ *
+ * Caller must provide a valid heaprel, since finishing a page split requires
+ * allocating a new page if and when the parent page splits in turn.
  */
 void
 _bt_finish_split(Relation rel, Relation heaprel, Buffer lbuf, BTStack stack)
@@ -2243,9 +2249,10 @@ _bt_finish_split(Relation rel, Relation heaprel, Buffer lbuf, BTStack stack)
    bool        wasonly;
 
    Assert(P_INCOMPLETE_SPLIT(lpageop));
+   Assert(heaprel != NULL);
 
    /* Lock right sibling, the one missing the downlink */
-   rbuf = _bt_getbuf(rel, heaprel, lpageop->btpo_next, BT_WRITE);
+   rbuf = _bt_getbuf(rel, lpageop->btpo_next, BT_WRITE);
    rpage = BufferGetPage(rbuf);
    rpageop = BTPageGetOpaque(rpage);
 
@@ -2257,7 +2264,7 @@ _bt_finish_split(Relation rel, Relation heaprel, Buffer lbuf, BTStack stack)
        BTMetaPageData *metad;
 
        /* acquire lock on the metapage */
-       metabuf = _bt_getbuf(rel, heaprel, BTREE_METAPAGE, BT_WRITE);
+       metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
        metapg = BufferGetPage(metabuf);
        metad = BTPageGetMeta(metapg);
 
@@ -2323,10 +2330,11 @@ _bt_getstackbuf(Relation rel, Relation heaprel, BTStack stack, BlockNumber child
        Page        page;
        BTPageOpaque opaque;
 
-       buf = _bt_getbuf(rel, heaprel, blkno, BT_WRITE);
+       buf = _bt_getbuf(rel, blkno, BT_WRITE);
        page = BufferGetPage(buf);
        opaque = BTPageGetOpaque(page);
 
+       Assert(heaprel != NULL);
        if (P_INCOMPLETE_SPLIT(opaque))
        {
            _bt_finish_split(rel, heaprel, buf, stack->bts_parent);
@@ -2415,7 +2423,7 @@ _bt_getstackbuf(Relation rel, Relation heaprel, BTStack stack, BlockNumber child
 }
 
 /*
- * _bt_newroot() -- Create a new root page for the index.
+ * _bt_newlevel() -- Create a new level above root page.
  *
  *     We've just split the old root page and need to create a new one.
  *     In order to do this, we add a new root page to the file, then lock
@@ -2433,7 +2441,7 @@ _bt_getstackbuf(Relation rel, Relation heaprel, BTStack stack, BlockNumber child
  *     lbuf, rbuf & rootbuf.
  */
 static Buffer
-_bt_newroot(Relation rel, Relation heaprel, Buffer lbuf, Buffer rbuf)
+_bt_newlevel(Relation rel, Relation heaprel, Buffer lbuf, Buffer rbuf)
 {
    Buffer      rootbuf;
    Page        lpage,
@@ -2459,12 +2467,12 @@ _bt_newroot(Relation rel, Relation heaprel, Buffer lbuf, Buffer rbuf)
    lopaque = BTPageGetOpaque(lpage);
 
    /* get a new root page */
-   rootbuf = _bt_getbuf(rel, heaprel, P_NEW, BT_WRITE);
+   rootbuf = _bt_allocbuf(rel, heaprel);
    rootpage = BufferGetPage(rootbuf);
    rootblknum = BufferGetBlockNumber(rootbuf);
 
    /* acquire lock on the metapage */
-   metabuf = _bt_getbuf(rel, heaprel, BTREE_METAPAGE, BT_WRITE);
+   metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
    metapg = BufferGetPage(metabuf);
    metad = BTPageGetMeta(metapg);
 
index 486d1563d8a479cc106c99f3ec898c58f33ce81c..c2050656e43a87be583594d9a3fda4a1a20b1933 100644 (file)
 #include "utils/snapmgr.h"
 
 static BTMetaPageData *_bt_getmeta(Relation rel, Buffer metabuf);
-static void _bt_log_reuse_page(Relation rel, Relation heaprel, BlockNumber blkno,
-                              FullTransactionId safexid);
-static void _bt_delitems_delete(Relation rel, Relation heaprel, Buffer buf,
+static void _bt_delitems_delete(Relation rel, Buffer buf,
                                TransactionId snapshotConflictHorizon,
+                               bool isCatalogRel,
                                OffsetNumber *deletable, int ndeletable,
                                BTVacuumPosting *updatable, int nupdatable);
 static char *_bt_delitems_update(BTVacuumPosting *updatable, int nupdatable,
@@ -177,7 +176,7 @@ _bt_getmeta(Relation rel, Buffer metabuf)
  * index tuples needed to be deleted.
  */
 bool
-_bt_vacuum_needs_cleanup(Relation rel, Relation heaprel)
+_bt_vacuum_needs_cleanup(Relation rel)
 {
    Buffer      metabuf;
    Page        metapg;
@@ -190,7 +189,7 @@ _bt_vacuum_needs_cleanup(Relation rel, Relation heaprel)
     *
     * Note that we deliberately avoid using cached version of metapage here.
     */
-   metabuf = _bt_getbuf(rel, heaprel, BTREE_METAPAGE, BT_READ);
+   metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_READ);
    metapg = BufferGetPage(metabuf);
    metad = BTPageGetMeta(metapg);
    btm_version = metad->btm_version;
@@ -230,7 +229,7 @@ _bt_vacuum_needs_cleanup(Relation rel, Relation heaprel)
  * finalized.
  */
 void
-_bt_set_cleanup_info(Relation rel, Relation heaprel, BlockNumber num_delpages)
+_bt_set_cleanup_info(Relation rel, BlockNumber num_delpages)
 {
    Buffer      metabuf;
    Page        metapg;
@@ -254,7 +253,7 @@ _bt_set_cleanup_info(Relation rel, Relation heaprel, BlockNumber num_delpages)
     * no longer used as of PostgreSQL 14.  We set it to -1.0 on rewrite, just
     * to be consistent.
     */
-   metabuf = _bt_getbuf(rel, heaprel, BTREE_METAPAGE, BT_READ);
+   metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_READ);
    metapg = BufferGetPage(metabuf);
    metad = BTPageGetMeta(metapg);
 
@@ -326,6 +325,9 @@ _bt_set_cleanup_info(Relation rel, Relation heaprel, BlockNumber num_delpages)
  *     NOTE that the returned root page will have only a read lock set
  *     on it even if access = BT_WRITE!
  *
+ *     If access = BT_WRITE, heaprel must be set; otherwise caller can just
+ *     pass NULL.  See _bt_allocbuf for an explanation.
+ *
  *     The returned page is not necessarily the true root --- it could be
  *     a "fast root" (a page that is alone in its level due to deletions).
  *     Also, if the root page is split while we are "in flight" to it,
@@ -349,6 +351,8 @@ _bt_getroot(Relation rel, Relation heaprel, int access)
    uint32      rootlevel;
    BTMetaPageData *metad;
 
+   Assert(access == BT_READ || heaprel != NULL);
+
    /*
     * Try to use previously-cached metapage data to find the root.  This
     * normally saves one buffer access per index search, which is a very
@@ -369,7 +373,7 @@ _bt_getroot(Relation rel, Relation heaprel, int access)
        Assert(rootblkno != P_NONE);
        rootlevel = metad->btm_fastlevel;
 
-       rootbuf = _bt_getbuf(rel, heaprel, rootblkno, BT_READ);
+       rootbuf = _bt_getbuf(rel, rootblkno, BT_READ);
        rootpage = BufferGetPage(rootbuf);
        rootopaque = BTPageGetOpaque(rootpage);
 
@@ -395,7 +399,7 @@ _bt_getroot(Relation rel, Relation heaprel, int access)
        rel->rd_amcache = NULL;
    }
 
-   metabuf = _bt_getbuf(rel, heaprel, BTREE_METAPAGE, BT_READ);
+   metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_READ);
    metad = _bt_getmeta(rel, metabuf);
 
    /* if no root page initialized yet, do it */
@@ -436,7 +440,7 @@ _bt_getroot(Relation rel, Relation heaprel, int access)
         * the new root page.  Since this is the first page in the tree, it's
         * a leaf as well as the root.
         */
-       rootbuf = _bt_getbuf(rel, heaprel, P_NEW, BT_WRITE);
+       rootbuf = _bt_allocbuf(rel, heaprel);
        rootblkno = BufferGetBlockNumber(rootbuf);
        rootpage = BufferGetPage(rootbuf);
        rootopaque = BTPageGetOpaque(rootpage);
@@ -573,7 +577,7 @@ _bt_getroot(Relation rel, Relation heaprel, int access)
  * moving to the root --- that'd deadlock against any concurrent root split.)
  */
 Buffer
-_bt_gettrueroot(Relation rel, Relation heaprel)
+_bt_gettrueroot(Relation rel)
 {
    Buffer      metabuf;
    Page        metapg;
@@ -595,7 +599,7 @@ _bt_gettrueroot(Relation rel, Relation heaprel)
        pfree(rel->rd_amcache);
    rel->rd_amcache = NULL;
 
-   metabuf = _bt_getbuf(rel, heaprel, BTREE_METAPAGE, BT_READ);
+   metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_READ);
    metapg = BufferGetPage(metabuf);
    metaopaque = BTPageGetOpaque(metapg);
    metad = BTPageGetMeta(metapg);
@@ -668,7 +672,7 @@ _bt_gettrueroot(Relation rel, Relation heaprel)
  *     about updating previously cached data.
  */
 int
-_bt_getrootheight(Relation rel, Relation heaprel)
+_bt_getrootheight(Relation rel)
 {
    BTMetaPageData *metad;
 
@@ -676,7 +680,7 @@ _bt_getrootheight(Relation rel, Relation heaprel)
    {
        Buffer      metabuf;
 
-       metabuf = _bt_getbuf(rel, heaprel, BTREE_METAPAGE, BT_READ);
+       metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_READ);
        metad = _bt_getmeta(rel, metabuf);
 
        /*
@@ -732,7 +736,7 @@ _bt_getrootheight(Relation rel, Relation heaprel)
  *     pg_upgrade'd from Postgres 12.
  */
 void
-_bt_metaversion(Relation rel, Relation heaprel, bool *heapkeyspace, bool *allequalimage)
+_bt_metaversion(Relation rel, bool *heapkeyspace, bool *allequalimage)
 {
    BTMetaPageData *metad;
 
@@ -740,7 +744,7 @@ _bt_metaversion(Relation rel, Relation heaprel, bool *heapkeyspace, bool *allequ
    {
        Buffer      metabuf;
 
-       metabuf = _bt_getbuf(rel, heaprel, BTREE_METAPAGE, BT_READ);
+       metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_READ);
        metad = _bt_getmeta(rel, metabuf);
 
        /*
@@ -821,37 +825,7 @@ _bt_checkpage(Relation rel, Buffer buf)
 }
 
 /*
- * Log the reuse of a page from the FSM.
- */
-static void
-_bt_log_reuse_page(Relation rel, Relation heaprel, BlockNumber blkno,
-                  FullTransactionId safexid)
-{
-   xl_btree_reuse_page xlrec_reuse;
-
-   /*
-    * Note that we don't register the buffer with the record, because this
-    * operation doesn't modify the page. This record only exists to provide a
-    * conflict point for Hot Standby.
-    */
-
-   /* XLOG stuff */
-   xlrec_reuse.isCatalogRel = RelationIsAccessibleInLogicalDecoding(heaprel);
-   xlrec_reuse.locator = rel->rd_locator;
-   xlrec_reuse.block = blkno;
-   xlrec_reuse.snapshotConflictHorizon = safexid;
-
-   XLogBeginInsert();
-   XLogRegisterData((char *) &xlrec_reuse, SizeOfBtreeReusePage);
-
-   XLogInsert(RM_BTREE_ID, XLOG_BTREE_REUSE_PAGE);
-}
-
-/*
- * _bt_getbuf() -- Get a buffer by block number for read or write.
- *
- *     blkno == P_NEW means to get an unallocated index page.  The page
- *     will be initialized before returning it.
+ * _bt_getbuf() -- Get an existing block in a buffer, for read or write.
  *
  *     The general rule in nbtree is that it's never okay to access a
  *     page without holding both a buffer pin and a buffer lock on
@@ -860,136 +834,165 @@ _bt_log_reuse_page(Relation rel, Relation heaprel, BlockNumber blkno,
  *     When this routine returns, the appropriate lock is set on the
  *     requested buffer and its reference count has been incremented
  *     (ie, the buffer is "locked and pinned").  Also, we apply
- *     _bt_checkpage to sanity-check the page (except in P_NEW case),
- *     and perform Valgrind client requests that help Valgrind detect
- *     unsafe page accesses.
+ *     _bt_checkpage to sanity-check the page, and perform Valgrind
+ *     client requests that help Valgrind detect unsafe page accesses.
  *
  *     Note: raw LockBuffer() calls are disallowed in nbtree; all
  *     buffer lock requests need to go through wrapper functions such
  *     as _bt_lockbuf().
  */
 Buffer
-_bt_getbuf(Relation rel, Relation heaprel, BlockNumber blkno, int access)
+_bt_getbuf(Relation rel, BlockNumber blkno, int access)
 {
    Buffer      buf;
 
-   if (blkno != P_NEW)
-   {
-       /* Read an existing block of the relation */
-       buf = ReadBuffer(rel, blkno);
-       _bt_lockbuf(rel, buf, access);
-       _bt_checkpage(rel, buf);
-   }
-   else
-   {
-       Page        page;
+   Assert(BlockNumberIsValid(blkno));
 
-       Assert(access == BT_WRITE);
+   /* Read an existing block of the relation */
+   buf = ReadBuffer(rel, blkno);
+   _bt_lockbuf(rel, buf, access);
+   _bt_checkpage(rel, buf);
 
-       /*
-        * First see if the FSM knows of any free pages.
-        *
-        * We can't trust the FSM's report unreservedly; we have to check that
-        * the page is still free.  (For example, an already-free page could
-        * have been re-used between the time the last VACUUM scanned it and
-        * the time the VACUUM made its FSM updates.)
-        *
-        * In fact, it's worse than that: we can't even assume that it's safe
-        * to take a lock on the reported page.  If somebody else has a lock
-        * on it, or even worse our own caller does, we could deadlock.  (The
-        * own-caller scenario is actually not improbable. Consider an index
-        * on a serial or timestamp column.  Nearly all splits will be at the
-        * rightmost page, so it's entirely likely that _bt_split will call us
-        * while holding a lock on the page most recently acquired from FSM. A
-        * VACUUM running concurrently with the previous split could well have
-        * placed that page back in FSM.)
-        *
-        * To get around that, we ask for only a conditional lock on the
-        * reported page.  If we fail, then someone else is using the page,
-        * and we may reasonably assume it's not free.  (If we happen to be
-        * wrong, the worst consequence is the page will be lost to use till
-        * the next VACUUM, which is no big problem.)
-        */
-       for (;;)
+   return buf;
+}
+
+/*
+ * _bt_allocbuf() -- Allocate a new block/page.
+ *
+ * Returns a write-locked buffer containing an unallocated nbtree page.
+ *
+ * Callers are required to pass a valid heaprel.  We need heaprel so that we
+ * can handle generating a snapshotConflictHorizon that makes reusing a page
+ * from the FSM safe for queries that may be running on standbys.
+ */
+Buffer
+_bt_allocbuf(Relation rel, Relation heaprel)
+{
+   Buffer      buf;
+   BlockNumber blkno;
+   Page        page;
+
+   Assert(heaprel != NULL);
+
+   /*
+    * First see if the FSM knows of any free pages.
+    *
+    * We can't trust the FSM's report unreservedly; we have to check that the
+    * page is still free.  (For example, an already-free page could have been
+    * re-used between the time the last VACUUM scanned it and the time the
+    * VACUUM made its FSM updates.)
+    *
+    * In fact, it's worse than that: we can't even assume that it's safe to
+    * take a lock on the reported page.  If somebody else has a lock on it,
+    * or even worse our own caller does, we could deadlock.  (The own-caller
+    * scenario is actually not improbable. Consider an index on a serial or
+    * timestamp column.  Nearly all splits will be at the rightmost page, so
+    * it's entirely likely that _bt_split will call us while holding a lock
+    * on the page most recently acquired from FSM. A VACUUM running
+    * concurrently with the previous split could well have placed that page
+    * back in FSM.)
+    *
+    * To get around that, we ask for only a conditional lock on the reported
+    * page.  If we fail, then someone else is using the page, and we may
+    * reasonably assume it's not free.  (If we happen to be wrong, the worst
+    * consequence is the page will be lost to use till the next VACUUM, which
+    * is no big problem.)
+    */
+   for (;;)
+   {
+       blkno = GetFreeIndexPage(rel);
+       if (blkno == InvalidBlockNumber)
+           break;
+       buf = ReadBuffer(rel, blkno);
+       if (_bt_conditionallockbuf(rel, buf))
        {
-           blkno = GetFreeIndexPage(rel);
-           if (blkno == InvalidBlockNumber)
-               break;
-           buf = ReadBuffer(rel, blkno);
-           if (_bt_conditionallockbuf(rel, buf))
+           page = BufferGetPage(buf);
+
+           /*
+            * It's possible to find an all-zeroes page in an index.  For
+            * example, a backend might successfully extend the relation one
+            * page and then crash before it is able to make a WAL entry for
+            * adding the page.  If we find a zeroed page then reclaim it
+            * immediately.
+            */
+           if (PageIsNew(page))
            {
-               page = BufferGetPage(buf);
+               /* Okay to use page.  Initialize and return it. */
+               _bt_pageinit(page, BufferGetPageSize(buf));
+               return buf;
+           }
 
+           if (BTPageIsRecyclable(page, heaprel))
+           {
                /*
-                * It's possible to find an all-zeroes page in an index.  For
-                * example, a backend might successfully extend the relation
-                * one page and then crash before it is able to make a WAL
-                * entry for adding the page.  If we find a zeroed page then
-                * reclaim it immediately.
+                * If we are generating WAL for Hot Standby then create a WAL
+                * record that will allow us to conflict with queries running
+                * on standby, in case they have snapshots older than safexid
+                * value
                 */
-               if (PageIsNew(page))
+               if (RelationNeedsWAL(rel) && XLogStandbyInfoActive())
                {
-                   /* Okay to use page.  Initialize and return it. */
-                   _bt_pageinit(page, BufferGetPageSize(buf));
-                   return buf;
-               }
+                   xl_btree_reuse_page xlrec_reuse;
 
-               if (BTPageIsRecyclable(page, heaprel))
-               {
                    /*
-                    * If we are generating WAL for Hot Standby then create a
-                    * WAL record that will allow us to conflict with queries
-                    * running on standby, in case they have snapshots older
-                    * than safexid value
+                    * Note that we don't register the buffer with the record,
+                    * because this operation doesn't modify the page (that
+                    * already happened, back when VACUUM deleted the page).
+                    * This record only exists to provide a conflict point for
+                    * Hot Standby.  See record REDO routine comments.
                     */
-                   if (XLogStandbyInfoActive() && RelationNeedsWAL(rel))
-                       _bt_log_reuse_page(rel, heaprel, blkno,
-                                          BTPageGetDeleteXid(page));
+                   xlrec_reuse.locator = rel->rd_locator;
+                   xlrec_reuse.block = blkno;
+                   xlrec_reuse.snapshotConflictHorizon = BTPageGetDeleteXid(page);
+                   xlrec_reuse.isCatalogRel =
+                       RelationIsAccessibleInLogicalDecoding(heaprel);
+
+                   XLogBeginInsert();
+                   XLogRegisterData((char *) &xlrec_reuse, SizeOfBtreeReusePage);
 
-                   /* Okay to use page.  Re-initialize and return it. */
-                   _bt_pageinit(page, BufferGetPageSize(buf));
-                   return buf;
+                   XLogInsert(RM_BTREE_ID, XLOG_BTREE_REUSE_PAGE);
                }
-               elog(DEBUG2, "FSM returned nonrecyclable page");
-               _bt_relbuf(rel, buf);
-           }
-           else
-           {
-               elog(DEBUG2, "FSM returned nonlockable page");
-               /* couldn't get lock, so just drop pin */
-               ReleaseBuffer(buf);
+
+               /* Okay to use page.  Re-initialize and return it. */
+               _bt_pageinit(page, BufferGetPageSize(buf));
+               return buf;
            }
+           elog(DEBUG2, "FSM returned nonrecyclable page");
+           _bt_relbuf(rel, buf);
        }
+       else
+       {
+           elog(DEBUG2, "FSM returned nonlockable page");
+           /* couldn't get lock, so just drop pin */
+           ReleaseBuffer(buf);
+       }
+   }
 
-       /*
-        * Extend the relation by one page. Need to use RBM_ZERO_AND_LOCK or
-        * we risk a race condition against btvacuumscan --- see comments
-        * therein. This forces us to repeat the valgrind request that
-        * _bt_lockbuf() otherwise would make, as we can't use _bt_lockbuf()
-        * without introducing a race.
-        */
-       buf = ExtendBufferedRel(EB_REL(rel), MAIN_FORKNUM, NULL,
-                               EB_LOCK_FIRST);
-       if (!RelationUsesLocalBuffers(rel))
-           VALGRIND_MAKE_MEM_DEFINED(BufferGetPage(buf), BLCKSZ);
+   /*
+    * Extend the relation by one page. Need to use RBM_ZERO_AND_LOCK or we
+    * risk a race condition against btvacuumscan --- see comments therein.
+    * This forces us to repeat the valgrind request that _bt_lockbuf()
+    * otherwise would make, as we can't use _bt_lockbuf() without introducing
+    * a race.
+    */
+   buf = ExtendBufferedRel(EB_REL(rel), MAIN_FORKNUM, NULL, EB_LOCK_FIRST);
+   if (!RelationUsesLocalBuffers(rel))
+       VALGRIND_MAKE_MEM_DEFINED(BufferGetPage(buf), BLCKSZ);
 
-       /* Initialize the new page before returning it */
-       page = BufferGetPage(buf);
-       Assert(PageIsNew(page));
-       _bt_pageinit(page, BufferGetPageSize(buf));
-   }
+   /* Initialize the new page before returning it */
+   page = BufferGetPage(buf);
+   Assert(PageIsNew(page));
+   _bt_pageinit(page, BufferGetPageSize(buf));
 
-   /* ref count and lock type are correct */
    return buf;
 }
 
 /*
  * _bt_relandgetbuf() -- release a locked buffer and get another one.
  *
- * This is equivalent to _bt_relbuf followed by _bt_getbuf, with the
- * exception that blkno may not be P_NEW.  Also, if obuf is InvalidBuffer
- * then it reduces to just _bt_getbuf; allowing this case simplifies some
- * callers.
+ * This is equivalent to _bt_relbuf followed by _bt_getbuf.  Also, if obuf is
+ * InvalidBuffer then it reduces to just _bt_getbuf; allowing this case
+ * simplifies some callers.
  *
  * The original motivation for using this was to avoid two entries to the
  * bufmgr when one would do.  However, now it's mainly just a notational
@@ -1001,7 +1004,7 @@ _bt_relandgetbuf(Relation rel, Buffer obuf, BlockNumber blkno, int access)
 {
    Buffer      buf;
 
-   Assert(blkno != P_NEW);
+   Assert(BlockNumberIsValid(blkno));
    if (BufferIsValid(obuf))
        _bt_unlockbuf(rel, obuf);
    buf = ReleaseAndReadBuffer(obuf, rel, blkno);
@@ -1272,14 +1275,14 @@ _bt_delitems_vacuum(Relation rel, Buffer buf,
  * (a version that lacks the TIDs that are to be deleted).
  *
  * This is nearly the same as _bt_delitems_vacuum as far as what it does to
- * the page, but it needs its own snapshotConflictHorizon (caller gets this
- * from tableam).  This is used by the REDO routine to generate recovery
+ * the page, but it needs its own snapshotConflictHorizon and isCatalogRel
+ * (from the tableam).  This is used by the REDO routine to generate recovery
  * conflicts.  The other difference is that only _bt_delitems_vacuum will
  * clear page's VACUUM cycle ID.
  */
 static void
-_bt_delitems_delete(Relation rel, Relation heaprel, Buffer buf,
-                   TransactionId snapshotConflictHorizon,
+_bt_delitems_delete(Relation rel, Buffer buf,
+                   TransactionId snapshotConflictHorizon, bool isCatalogRel,
                    OffsetNumber *deletable, int ndeletable,
                    BTVacuumPosting *updatable, int nupdatable)
 {
@@ -1343,10 +1346,10 @@ _bt_delitems_delete(Relation rel, Relation heaprel, Buffer buf,
        XLogRecPtr  recptr;
        xl_btree_delete xlrec_delete;
 
-       xlrec_delete.isCatalogRel = RelationIsAccessibleInLogicalDecoding(heaprel);
        xlrec_delete.snapshotConflictHorizon = snapshotConflictHorizon;
        xlrec_delete.ndeleted = ndeletable;
        xlrec_delete.nupdated = nupdatable;
+       xlrec_delete.isCatalogRel = isCatalogRel;
 
        XLogBeginInsert();
        XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
@@ -1517,6 +1520,7 @@ _bt_delitems_delete_check(Relation rel, Buffer buf, Relation heapRel,
 {
    Page        page = BufferGetPage(buf);
    TransactionId snapshotConflictHorizon;
+   bool        isCatalogRel;
    OffsetNumber postingidxoffnum = InvalidOffsetNumber;
    int         ndeletable = 0,
                nupdatable = 0;
@@ -1525,6 +1529,7 @@ _bt_delitems_delete_check(Relation rel, Buffer buf, Relation heapRel,
 
    /* Use tableam interface to determine which tuples to delete first */
    snapshotConflictHorizon = table_index_delete_tuples(heapRel, delstate);
+   isCatalogRel = RelationIsAccessibleInLogicalDecoding(heapRel);
 
    /* Should not WAL-log snapshotConflictHorizon unless it's required */
    if (!XLogStandbyInfoActive())
@@ -1670,8 +1675,8 @@ _bt_delitems_delete_check(Relation rel, Buffer buf, Relation heapRel,
    }
 
    /* Physically delete tuples (or TIDs) using deletable (or updatable) */
-   _bt_delitems_delete(rel, heapRel, buf, snapshotConflictHorizon, deletable,
-                       ndeletable, updatable, nupdatable);
+   _bt_delitems_delete(rel, buf, snapshotConflictHorizon, isCatalogRel,
+                       deletable, ndeletable, updatable, nupdatable);
 
    /* be tidy */
    for (int i = 0; i < nupdatable; i++)
@@ -1692,8 +1697,7 @@ _bt_delitems_delete_check(Relation rel, Buffer buf, Relation heapRel,
  * same level must always be locked left to right to avoid deadlocks.
  */
 static bool
-_bt_leftsib_splitflag(Relation rel, Relation heaprel, BlockNumber leftsib,
-                     BlockNumber target)
+_bt_leftsib_splitflag(Relation rel, BlockNumber leftsib, BlockNumber target)
 {
    Buffer      buf;
    Page        page;
@@ -1704,7 +1708,7 @@ _bt_leftsib_splitflag(Relation rel, Relation heaprel, BlockNumber leftsib,
    if (leftsib == P_NONE)
        return false;
 
-   buf = _bt_getbuf(rel, heaprel, leftsib, BT_READ);
+   buf = _bt_getbuf(rel, leftsib, BT_READ);
    page = BufferGetPage(buf);
    opaque = BTPageGetOpaque(page);
 
@@ -1750,7 +1754,7 @@ _bt_leftsib_splitflag(Relation rel, Relation heaprel, BlockNumber leftsib,
  * to-be-deleted subtree.)
  */
 static bool
-_bt_rightsib_halfdeadflag(Relation rel, Relation heaprel, BlockNumber leafrightsib)
+_bt_rightsib_halfdeadflag(Relation rel, BlockNumber leafrightsib)
 {
    Buffer      buf;
    Page        page;
@@ -1759,7 +1763,7 @@ _bt_rightsib_halfdeadflag(Relation rel, Relation heaprel, BlockNumber leafrights
 
    Assert(leafrightsib != P_NONE);
 
-   buf = _bt_getbuf(rel, heaprel, leafrightsib, BT_READ);
+   buf = _bt_getbuf(rel, leafrightsib, BT_READ);
    page = BufferGetPage(buf);
    opaque = BTPageGetOpaque(page);
 
@@ -1948,18 +1952,18 @@ _bt_pagedel(Relation rel, Buffer leafbuf, BTVacState *vstate)
                 * marked with INCOMPLETE_SPLIT flag before proceeding
                 */
                Assert(leafblkno == scanblkno);
-               if (_bt_leftsib_splitflag(rel, vstate->info->heaprel, leftsib, leafblkno))
+               if (_bt_leftsib_splitflag(rel, leftsib, leafblkno))
                {
                    ReleaseBuffer(leafbuf);
                    return;
                }
 
                /* we need an insertion scan key for the search, so build one */
-               itup_key = _bt_mkscankey(rel, vstate->info->heaprel, targetkey);
+               itup_key = _bt_mkscankey(rel, targetkey);
                /* find the leftmost leaf page with matching pivot/high key */
                itup_key->pivotsearch = true;
-               stack = _bt_search(rel, vstate->info->heaprel, itup_key,
-                                  &sleafbuf, BT_READ, NULL);
+               stack = _bt_search(rel, NULL, itup_key, &sleafbuf, BT_READ,
+                                  NULL);
                /* won't need a second lock or pin on leafbuf */
                _bt_relbuf(rel, sleafbuf);
 
@@ -1990,7 +1994,8 @@ _bt_pagedel(Relation rel, Buffer leafbuf, BTVacState *vstate)
             * leafbuf page half-dead.
             */
            Assert(P_ISLEAF(opaque) && !P_IGNORE(opaque));
-           if (!_bt_mark_page_halfdead(rel, vstate->info->heaprel, leafbuf, stack))
+           if (!_bt_mark_page_halfdead(rel, vstate->info->heaprel, leafbuf,
+                                       stack))
            {
                _bt_relbuf(rel, leafbuf);
                return;
@@ -2053,7 +2058,7 @@ _bt_pagedel(Relation rel, Buffer leafbuf, BTVacState *vstate)
        if (!rightsib_empty)
            break;
 
-       leafbuf = _bt_getbuf(rel, vstate->info->heaprel, rightsib, BT_WRITE);
+       leafbuf = _bt_getbuf(rel, rightsib, BT_WRITE);
    }
 }
 
@@ -2066,6 +2071,10 @@ _bt_pagedel(Relation rel, Buffer leafbuf, BTVacState *vstate)
  * but may include additional internal pages (at most one per level of the
  * tree below the root).
  *
+ * Caller must pass a valid heaprel, since it's just about possible that our
+ * call to _bt_lock_subtree_parent will need to allocate a new index page to
+ * complete a page split.  Every call to _bt_allocbuf needs to pass a heaprel.
+ *
  * Returns 'false' if leafbuf is unsafe to delete, usually because leafbuf is
  * the rightmost child of its parent (and parent has more than one downlink).
  * Returns 'true' when the first stage of page deletion completed
@@ -2094,6 +2103,7 @@ _bt_mark_page_halfdead(Relation rel, Relation heaprel, Buffer leafbuf,
    Assert(!P_RIGHTMOST(opaque) && !P_ISROOT(opaque) &&
           P_ISLEAF(opaque) && !P_IGNORE(opaque) &&
           P_FIRSTDATAKEY(opaque) > PageGetMaxOffsetNumber(page));
+   Assert(heaprel != NULL);
 
    /*
     * Save info about the leaf page.
@@ -2108,7 +2118,7 @@ _bt_mark_page_halfdead(Relation rel, Relation heaprel, Buffer leafbuf,
     * delete the downlink.  It would fail the "right sibling of target page
     * is also the next child in parent page" cross-check below.
     */
-   if (_bt_rightsib_halfdeadflag(rel, heaprel, leafrightsib))
+   if (_bt_rightsib_halfdeadflag(rel, leafrightsib))
    {
        elog(DEBUG1, "could not delete page %u because its right sibling %u is half-dead",
             leafblkno, leafrightsib);
@@ -2352,7 +2362,7 @@ _bt_unlink_halfdead_page(Relation rel, Buffer leafbuf, BlockNumber scanblkno,
        Assert(target != leafblkno);
 
        /* Fetch the block number of the target's left sibling */
-       buf = _bt_getbuf(rel, vstate->info->heaprel, target, BT_READ);
+       buf = _bt_getbuf(rel, target, BT_READ);
        page = BufferGetPage(buf);
        opaque = BTPageGetOpaque(page);
        leftsib = opaque->btpo_prev;
@@ -2379,7 +2389,7 @@ _bt_unlink_halfdead_page(Relation rel, Buffer leafbuf, BlockNumber scanblkno,
        _bt_lockbuf(rel, leafbuf, BT_WRITE);
    if (leftsib != P_NONE)
    {
-       lbuf = _bt_getbuf(rel, vstate->info->heaprel, leftsib, BT_WRITE);
+       lbuf = _bt_getbuf(rel, leftsib, BT_WRITE);
        page = BufferGetPage(lbuf);
        opaque = BTPageGetOpaque(page);
        while (P_ISDELETED(opaque) || opaque->btpo_next != target)
@@ -2427,7 +2437,7 @@ _bt_unlink_halfdead_page(Relation rel, Buffer leafbuf, BlockNumber scanblkno,
            CHECK_FOR_INTERRUPTS();
 
            /* step right one page */
-           lbuf = _bt_getbuf(rel, vstate->info->heaprel, leftsib, BT_WRITE);
+           lbuf = _bt_getbuf(rel, leftsib, BT_WRITE);
            page = BufferGetPage(lbuf);
            opaque = BTPageGetOpaque(page);
        }
@@ -2491,7 +2501,7 @@ _bt_unlink_halfdead_page(Relation rel, Buffer leafbuf, BlockNumber scanblkno,
     * And next write-lock the (current) right sibling.
     */
    rightsib = opaque->btpo_next;
-   rbuf = _bt_getbuf(rel, vstate->info->heaprel, rightsib, BT_WRITE);
+   rbuf = _bt_getbuf(rel, rightsib, BT_WRITE);
    page = BufferGetPage(rbuf);
    opaque = BTPageGetOpaque(page);
 
@@ -2538,7 +2548,7 @@ _bt_unlink_halfdead_page(Relation rel, Buffer leafbuf, BlockNumber scanblkno,
     * of doing so are slim, and the locking considerations daunting.)
     *
     * We can safely acquire a lock on the metapage here --- see comments for
-    * _bt_newroot().
+    * _bt_newlevel().
     */
    if (leftsib == P_NONE && rightsib_is_rightmost)
    {
@@ -2547,8 +2557,7 @@ _bt_unlink_halfdead_page(Relation rel, Buffer leafbuf, BlockNumber scanblkno,
        if (P_RIGHTMOST(opaque))
        {
            /* rightsib will be the only one left on the level */
-           metabuf = _bt_getbuf(rel, vstate->info->heaprel, BTREE_METAPAGE,
-                                BT_WRITE);
+           metabuf = _bt_getbuf(rel, BTREE_METAPAGE, BT_WRITE);
            metapg = BufferGetPage(metabuf);
            metad = BTPageGetMeta(metapg);
 
@@ -2906,7 +2915,7 @@ _bt_lock_subtree_parent(Relation rel, Relation heaprel, BlockNumber child,
     *
     * Note: We deliberately avoid completing incomplete splits here.
     */
-   if (_bt_leftsib_splitflag(rel, heaprel, leftsibparent, parent))
+   if (_bt_leftsib_splitflag(rel, leftsibparent, parent))
        return false;
 
    /* Recurse to examine child page's grandparent page */
@@ -2976,6 +2985,7 @@ _bt_pendingfsm_finalize(Relation rel, BTVacState *vstate)
    Relation    heaprel = vstate->info->heaprel;
 
    Assert(stats->pages_newly_deleted >= vstate->npendingpages);
+   Assert(heaprel != NULL);
 
    if (vstate->npendingpages == 0)
    {
index 1ce5b15199a9a101464c7d331877275ffb230135..4553aaee53187805d2a2477ea1fd525ffa52e52d 100644 (file)
@@ -835,7 +835,7 @@ btvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
    if (stats == NULL)
    {
        /* Check if VACUUM operation can entirely avoid btvacuumscan() call */
-       if (!_bt_vacuum_needs_cleanup(info->index, info->heaprel))
+       if (!_bt_vacuum_needs_cleanup(info->index))
            return NULL;
 
        /*
@@ -871,7 +871,7 @@ btvacuumcleanup(IndexVacuumInfo *info, IndexBulkDeleteResult *stats)
     */
    Assert(stats->pages_deleted >= stats->pages_free);
    num_delpages = stats->pages_deleted - stats->pages_free;
-   _bt_set_cleanup_info(info->index, info->heaprel, num_delpages);
+   _bt_set_cleanup_info(info->index, num_delpages);
 
    /*
     * It's quite possible for us to be fooled by concurrent page splits into
index 263f75fce95382bef54da0a3cc11a47ffd6c40b1..7e05e58676895baee69c0fc31b46fb671f756d1c 100644 (file)
@@ -42,8 +42,7 @@ static bool _bt_steppage(IndexScanDesc scan, ScanDirection dir);
 static bool _bt_readnextpage(IndexScanDesc scan, BlockNumber blkno, ScanDirection dir);
 static bool _bt_parallel_readpage(IndexScanDesc scan, BlockNumber blkno,
                                  ScanDirection dir);
-static Buffer _bt_walk_left(Relation rel, Relation heaprel, Buffer buf,
-                           Snapshot snapshot);
+static Buffer _bt_walk_left(Relation rel, Buffer buf, Snapshot snapshot);
 static bool _bt_endpoint(IndexScanDesc scan, ScanDirection dir);
 static inline void _bt_initialize_more_data(BTScanOpaque so, ScanDirection dir);
 
@@ -92,6 +91,9 @@ _bt_drop_lock_and_maybe_pin(IndexScanDesc scan, BTScanPos sp)
  * When access = BT_READ, an empty index will result in *bufP being set to
  * InvalidBuffer.  Also, in BT_WRITE mode, any incomplete splits encountered
  * during the search will be finished.
+ *
+ * heaprel must be provided by callers that pass access = BT_WRITE, since we
+ * might need to allocate a new root page for caller -- see _bt_allocbuf.
  */
 BTStack
 _bt_search(Relation rel, Relation heaprel, BTScanInsert key, Buffer *bufP,
@@ -100,6 +102,10 @@ _bt_search(Relation rel, Relation heaprel, BTScanInsert key, Buffer *bufP,
    BTStack     stack_in = NULL;
    int         page_access = BT_READ;
 
+   /* heaprel must be set whenever _bt_allocbuf is reachable */
+   Assert(access == BT_READ || access == BT_WRITE);
+   Assert(access == BT_READ || heaprel != NULL);
+
    /* Get the root page to start with */
    *bufP = _bt_getroot(rel, heaprel, access);
 
@@ -222,8 +228,8 @@ _bt_search(Relation rel, Relation heaprel, BTScanInsert key, Buffer *bufP,
  *
  * If forupdate is true, we will attempt to finish any incomplete splits
  * that we encounter.  This is required when locking a target page for an
- * insertion, because we don't allow inserting on a page before the split
- * is completed.  'stack' is only used if forupdate is true.
+ * insertion, because we don't allow inserting on a page before the split is
+ * completed.  'heaprel' and 'stack' are only used if forupdate is true.
  *
  * On entry, we have the buffer pinned and a lock of the type specified by
  * 'access'.  If we move right, we release the buffer and lock and acquire
@@ -247,6 +253,8 @@ _bt_moveright(Relation rel,
    BTPageOpaque opaque;
    int32       cmpval;
 
+   Assert(!forupdate || heaprel != NULL);
+
    /*
     * When nextkey = false (normal case): if the scan key that brought us to
     * this page is > the high key stored on the page, then the page has split
@@ -295,7 +303,7 @@ _bt_moveright(Relation rel,
                _bt_relbuf(rel, buf);
 
            /* re-acquire the lock in the right mode, and re-check */
-           buf = _bt_getbuf(rel, heaprel, blkno, access);
+           buf = _bt_getbuf(rel, blkno, access);
            continue;
        }
 
@@ -862,7 +870,6 @@ bool
 _bt_first(IndexScanDesc scan, ScanDirection dir)
 {
    Relation    rel = scan->indexRelation;
-   Relation    heaprel = scan->heapRelation;
    BTScanOpaque so = (BTScanOpaque) scan->opaque;
    Buffer      buf;
    BTStack     stack;
@@ -1355,7 +1362,7 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
    }
 
    /* Initialize remaining insertion scan key fields */
-   _bt_metaversion(rel, heaprel, &inskey.heapkeyspace, &inskey.allequalimage);
+   _bt_metaversion(rel, &inskey.heapkeyspace, &inskey.allequalimage);
    inskey.anynullkeys = false; /* unused */
    inskey.nextkey = nextkey;
    inskey.pivotsearch = false;
@@ -1366,7 +1373,7 @@ _bt_first(IndexScanDesc scan, ScanDirection dir)
     * Use the manufactured insertion scan key to descend the tree and
     * position ourselves on the target leaf page.
     */
-   stack = _bt_search(rel, heaprel, &inskey, &buf, BT_READ, scan->xs_snapshot);
+   stack = _bt_search(rel, NULL, &inskey, &buf, BT_READ, scan->xs_snapshot);
 
    /* don't need to keep the stack around... */
    _bt_freestack(stack);
@@ -2007,7 +2014,7 @@ _bt_readnextpage(IndexScanDesc scan, BlockNumber blkno, ScanDirection dir)
            /* check for interrupts while we're not holding any buffer lock */
            CHECK_FOR_INTERRUPTS();
            /* step right one page */
-           so->currPos.buf = _bt_getbuf(rel, scan->heapRelation, blkno, BT_READ);
+           so->currPos.buf = _bt_getbuf(rel, blkno, BT_READ);
            page = BufferGetPage(so->currPos.buf);
            TestForOldSnapshot(scan->xs_snapshot, rel, page);
            opaque = BTPageGetOpaque(page);
@@ -2081,8 +2088,7 @@ _bt_readnextpage(IndexScanDesc scan, BlockNumber blkno, ScanDirection dir)
        if (BTScanPosIsPinned(so->currPos))
            _bt_lockbuf(rel, so->currPos.buf, BT_READ);
        else
-           so->currPos.buf = _bt_getbuf(rel, scan->heapRelation,
-                                        so->currPos.currPage, BT_READ);
+           so->currPos.buf = _bt_getbuf(rel, so->currPos.currPage, BT_READ);
 
        for (;;)
        {
@@ -2096,8 +2102,8 @@ _bt_readnextpage(IndexScanDesc scan, BlockNumber blkno, ScanDirection dir)
            }
 
            /* Step to next physical page */
-           so->currPos.buf = _bt_walk_left(rel, scan->heapRelation,
-                                           so->currPos.buf, scan->xs_snapshot);
+           so->currPos.buf = _bt_walk_left(rel, so->currPos.buf,
+                                           scan->xs_snapshot);
 
            /* if we're physically at end of index, return failure */
            if (so->currPos.buf == InvalidBuffer)
@@ -2144,8 +2150,7 @@ _bt_readnextpage(IndexScanDesc scan, BlockNumber blkno, ScanDirection dir)
                    BTScanPosInvalidate(so->currPos);
                    return false;
                }
-               so->currPos.buf = _bt_getbuf(rel, scan->heapRelation, blkno,
-                                            BT_READ);
+               so->currPos.buf = _bt_getbuf(rel, blkno, BT_READ);
            }
        }
    }
@@ -2190,7 +2195,7 @@ _bt_parallel_readpage(IndexScanDesc scan, BlockNumber blkno, ScanDirection dir)
  * again if it's important.
  */
 static Buffer
-_bt_walk_left(Relation rel, Relation heaprel, Buffer buf, Snapshot snapshot)
+_bt_walk_left(Relation rel, Buffer buf, Snapshot snapshot)
 {
    Page        page;
    BTPageOpaque opaque;
@@ -2218,7 +2223,7 @@ _bt_walk_left(Relation rel, Relation heaprel, Buffer buf, Snapshot snapshot)
        _bt_relbuf(rel, buf);
        /* check for interrupts while we're not holding any buffer lock */
        CHECK_FOR_INTERRUPTS();
-       buf = _bt_getbuf(rel, heaprel, blkno, BT_READ);
+       buf = _bt_getbuf(rel, blkno, BT_READ);
        page = BufferGetPage(buf);
        TestForOldSnapshot(snapshot, rel, page);
        opaque = BTPageGetOpaque(page);
@@ -2309,7 +2314,7 @@ _bt_walk_left(Relation rel, Relation heaprel, Buffer buf, Snapshot snapshot)
  * The returned buffer is pinned and read-locked.
  */
 Buffer
-_bt_get_endpoint(Relation rel, Relation heaprel, uint32 level, bool rightmost,
+_bt_get_endpoint(Relation rel, uint32 level, bool rightmost,
                 Snapshot snapshot)
 {
    Buffer      buf;
@@ -2325,9 +2330,9 @@ _bt_get_endpoint(Relation rel, Relation heaprel, uint32 level, bool rightmost,
     * smarter about intermediate levels.)
     */
    if (level == 0)
-       buf = _bt_getroot(rel, heaprel, BT_READ);
+       buf = _bt_getroot(rel, NULL, BT_READ);
    else
-       buf = _bt_gettrueroot(rel, heaprel);
+       buf = _bt_gettrueroot(rel);
 
    if (!BufferIsValid(buf))
        return InvalidBuffer;
@@ -2408,8 +2413,7 @@ _bt_endpoint(IndexScanDesc scan, ScanDirection dir)
     * version of _bt_search().  We don't maintain a stack since we know we
     * won't need it.
     */
-   buf = _bt_get_endpoint(rel, scan->heapRelation, 0,
-                          ScanDirectionIsBackward(dir), scan->xs_snapshot);
+   buf = _bt_get_endpoint(rel, 0, ScanDirectionIsBackward(dir), scan->xs_snapshot);
 
    if (!BufferIsValid(buf))
    {
index 6ad3f3c54d5fb861ca34647f02a5ee3dc61bff84..c2665fce411af94913aaeb7b23ea78849656724a 100644 (file)
@@ -566,7 +566,7 @@ _bt_leafbuild(BTSpool *btspool, BTSpool *btspool2)
 
    wstate.heap = btspool->heap;
    wstate.index = btspool->index;
-   wstate.inskey = _bt_mkscankey(wstate.index, btspool->heap, NULL);
+   wstate.inskey = _bt_mkscankey(wstate.index, NULL);
    /* _bt_mkscankey() won't set allequalimage without metapage */
    wstate.inskey->allequalimage = _bt_allequalimage(wstate.index, true);
    wstate.btws_use_wal = RelationNeedsWAL(wstate.index);
index 05abf36032efb51eba7c6a94712cfb7aa9e0eadb..7da499c4dd55da75cbf5c681a30b01459ec57b21 100644 (file)
@@ -87,7 +87,7 @@ static int    _bt_keep_natts(Relation rel, IndexTuple lastleft,
  *     field themselves.
  */
 BTScanInsert
-_bt_mkscankey(Relation rel, Relation heaprel, IndexTuple itup)
+_bt_mkscankey(Relation rel, IndexTuple itup)
 {
    BTScanInsert key;
    ScanKey     skey;
@@ -112,7 +112,7 @@ _bt_mkscankey(Relation rel, Relation heaprel, IndexTuple itup)
    key = palloc(offsetof(BTScanInsertData, scankeys) +
                 sizeof(ScanKeyData) * indnkeyatts);
    if (itup)
-       _bt_metaversion(rel, heaprel, &key->heapkeyspace, &key->allequalimage);
+       _bt_metaversion(rel, &key->heapkeyspace, &key->allequalimage);
    else
    {
        /* Utility statement callers can set these fields themselves */
@@ -1761,8 +1761,7 @@ _bt_killitems(IndexScanDesc scan)
 
        droppedpin = true;
        /* Attempt to re-read the buffer, getting pin and lock. */
-       buf = _bt_getbuf(scan->indexRelation, scan->heapRelation,
-                        so->currPos.currPage, BT_READ);
+       buf = _bt_getbuf(scan->indexRelation, so->currPos.currPage, BT_READ);
 
        page = BufferGetPage(buf);
        if (BufferGetLSNAtomic(buf) == so->currPos.lsn)
index 65adf04c4eb3b890d62af74951ecf16364dfe8bb..39932d3c2d27c21a2cddf55d4ee86b67bdf2a92e 100644 (file)
@@ -462,7 +462,7 @@ get_relation_info(PlannerInfo *root, Oid relationObjectId, bool inhparent,
                     * For btrees, get tree height while we have the index
                     * open
                     */
-                   info->tree_height = _bt_getrootheight(indexRelation, relation);
+                   info->tree_height = _bt_getrootheight(indexRelation);
                }
                else
                {
index 01881069257bb9d100cd0a4b199c58af3ca32d9f..eb6cfcfd002134a228800df9e5a2111219ec66a2 100644 (file)
@@ -207,7 +207,6 @@ tuplesort_begin_heap(TupleDesc tupDesc,
 Tuplesortstate *
 tuplesort_begin_cluster(TupleDesc tupDesc,
                        Relation indexRel,
-                       Relation heaprel,
                        int workMem,
                        SortCoordinate coordinate, int sortopt)
 {
@@ -261,7 +260,7 @@ tuplesort_begin_cluster(TupleDesc tupDesc,
 
    arg->tupDesc = tupDesc;     /* assume we need not copy tupDesc */
 
-   indexScanKey = _bt_mkscankey(indexRel, heaprel, NULL);
+   indexScanKey = _bt_mkscankey(indexRel, NULL);
 
    if (arg->indexInfo->ii_Expressions != NULL)
    {
@@ -362,7 +361,7 @@ tuplesort_begin_index_btree(Relation heapRel,
    arg->enforceUnique = enforceUnique;
    arg->uniqueNullsNotDistinct = uniqueNullsNotDistinct;
 
-   indexScanKey = _bt_mkscankey(indexRel, heapRel, NULL);
+   indexScanKey = _bt_mkscankey(indexRel, NULL);
 
    /* Prepare SortSupport data for each column */
    base->sortKeys = (SortSupport) palloc0(base->nKeys *
index d6847860959eadad3b7f185b21b8b206160c9edc..8891fa79734c0e15f4625b6aaa08024a177a9fb7 100644 (file)
@@ -293,11 +293,14 @@ BTPageIsRecyclable(Page page, Relation heaprel)
    BTPageOpaque opaque;
 
    Assert(!PageIsNew(page));
+   Assert(heaprel != NULL);
 
    /* Recycling okay iff page is deleted and safexid is old enough */
    opaque = BTPageGetOpaque(page);
    if (P_ISDELETED(opaque))
    {
+       FullTransactionId safexid = BTPageGetDeleteXid(page);
+
        /*
         * The page was deleted, but when? If it was just deleted, a scan
         * might have seen the downlink to it, and will read the page later.
@@ -308,7 +311,7 @@ BTPageIsRecyclable(Page page, Relation heaprel)
         * anyone. If not, then no scan that's still in progress could have
         * seen its downlink, and we can recycle it.
         */
-       return GlobalVisCheckRemovableFullXid(heaprel, BTPageGetDeleteXid(page));
+       return GlobalVisCheckRemovableFullXid(heaprel, safexid);
    }
 
    return false;
@@ -1194,18 +1197,17 @@ extern OffsetNumber _bt_findsplitloc(Relation rel, Page origpage,
  */
 extern void _bt_initmetapage(Page page, BlockNumber rootbknum, uint32 level,
                             bool allequalimage);
-extern bool _bt_vacuum_needs_cleanup(Relation rel, Relation heaprel);
-extern void _bt_set_cleanup_info(Relation rel, Relation heaprel,
-                                BlockNumber num_delpages);
+extern bool _bt_vacuum_needs_cleanup(Relation rel);
+extern void _bt_set_cleanup_info(Relation rel, BlockNumber num_delpages);
 extern void _bt_upgrademetapage(Page page);
 extern Buffer _bt_getroot(Relation rel, Relation heaprel, int access);
-extern Buffer _bt_gettrueroot(Relation rel, Relation heaprel);
-extern int _bt_getrootheight(Relation rel, Relation heaprel);
-extern void _bt_metaversion(Relation rel, Relation heaprel, bool *heapkeyspace,
+extern Buffer _bt_gettrueroot(Relation rel);
+extern int _bt_getrootheight(Relation rel);
+extern void _bt_metaversion(Relation rel, bool *heapkeyspace,
                            bool *allequalimage);
 extern void _bt_checkpage(Relation rel, Buffer buf);
-extern Buffer _bt_getbuf(Relation rel, Relation heaprel, BlockNumber blkno,
-                        int access);
+extern Buffer _bt_getbuf(Relation rel, BlockNumber blkno, int access);
+extern Buffer _bt_allocbuf(Relation rel, Relation heaprel);
 extern Buffer _bt_relandgetbuf(Relation rel, Buffer obuf,
                               BlockNumber blkno, int access);
 extern void _bt_relbuf(Relation rel, Buffer buf);
@@ -1237,13 +1239,13 @@ extern OffsetNumber _bt_binsrch_insert(Relation rel, BTInsertState insertstate);
 extern int32 _bt_compare(Relation rel, BTScanInsert key, Page page, OffsetNumber offnum);
 extern bool _bt_first(IndexScanDesc scan, ScanDirection dir);
 extern bool _bt_next(IndexScanDesc scan, ScanDirection dir);
-extern Buffer _bt_get_endpoint(Relation rel, Relation heaprel, uint32 level,
-                              bool rightmost, Snapshot snapshot);
+extern Buffer _bt_get_endpoint(Relation rel, uint32 level, bool rightmost,
+                              Snapshot snapshot);
 
 /*
  * s for functions in nbtutils.c
  */
-extern BTScanInsert _bt_mkscankey(Relation rel, Relation heaprel, IndexTuple itup);
+extern BTScanInsert _bt_mkscankey(Relation rel, IndexTuple itup);
 extern void _bt_freestack(BTStack stack);
 extern void _bt_preprocess_array_keys(IndexScanDesc scan);
 extern void _bt_start_array_keys(IndexScanDesc scan, ScanDirection dir);
index 656a2e98976223039666aaeee9cc6abb5ea742c6..af057b6358efa81135168110e58f039e2c43bace 100644 (file)
@@ -399,9 +399,7 @@ extern Tuplesortstate *tuplesort_begin_heap(TupleDesc tupDesc,
                                            int workMem, SortCoordinate coordinate,
                                            int sortopt);
 extern Tuplesortstate *tuplesort_begin_cluster(TupleDesc tupDesc,
-                                              Relation indexRel,
-                                              Relation heaprel,
-                                              int workMem,
+                                              Relation indexRel, int workMem,
                                               SortCoordinate coordinate,
                                               int sortopt);
 extern Tuplesortstate *tuplesort_begin_index_btree(Relation heapRel,