queries.
</para>
</sect2>
+
+ <sect2 id="runtime-config-resource-async-behavior">
+ <title>Asynchronous Behavior</title>
+
+ <variablelist>
+ <varlistentry id="guc-effective-io-concurrency" xreflabel="effective_io_concurrency">
+ <term><varname>effective_io_concurrency</varname> (<type>integer</type>)</term>
+ <indexterm>
+ <primary><varname>effective_io_concurrency</> configuration parameter</primary>
+ </indexterm>
+ <listitem>
+ <para>
+ Sets the number of concurrent disk I/O operations that
+ <productname>PostgreSQL</> expects can be executed
+ simultaneously. Raising this value will increase the number of I/O
+ operations that any individual <productname>PostgreSQL</> session
+ attempts to initiate in parallel. The allowed range is 1 to 1000,
+ or zero to disable issuance of asynchronous I/O requests.
+ </para>
+
+ <para>
+ A good starting point for this setting is the number of separate
+ drives comprising a RAID 0 stripe or RAID 1 mirror being used for the
+ database. (For RAID 5 the parity drive should not be counted.)
+ However, if the database is often busy with multiple queries issued in
+ concurrent sessions, lower values may be sufficient to keep the disk
+ array busy. A value higher than needed to keep the disks busy will
+ only result in extra CPU overhead.
+ </para>
+
+ <para>
+ For more exotic systems, such as memory-based storage or a RAID array
+ that is limited by bus bandwidth, the correct value might be the
+ number of I/O paths available. Some experimentation may be needed
+ to find the best value.
+ </para>
+
+ <para>
+ Asynchronous I/O depends on an effective <function>posix_fadvise</>
+ function, which some operating systems lack. If the function is not
+ present then setting this parameter to anything but zero will result
+ in an error. On some operating systems the function is present but
+ does not actually do anything. On such systems setting a nonzero
+ value will add CPU overhead without improving performance.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ </sect2>
</sect1>
<sect1 id="runtime-config-wal">
TIDBitmap *tbm;
TBMIterator *tbmiterator;
TBMIterateResult *tbmres;
+ TBMIterator *prefetch_iterator;
OffsetNumber targoffset;
TupleTableSlot *slot;
tbm = node->tbm;
tbmiterator = node->tbmiterator;
tbmres = node->tbmres;
+ prefetch_iterator = node->prefetch_iterator;
/*
* Check if we are evaluating PlanQual for tuple of this relation.
/*
* If we haven't yet performed the underlying index scan, do it, and
* begin the iteration over the bitmap.
+ *
+ * For prefetching, we use *two* iterators, one for the pages we are
+ * actually scanning and another that runs ahead of the first for
+ * prefetching. node->prefetch_pages tracks exactly how many pages
+ * ahead the prefetch iterator is. Also, node->prefetch_target tracks
+ * the desired prefetch distance, which starts small and increases up
+ * to the GUC-controlled maximum, target_prefetch_pages. This is to
+ * avoid doing a lot of prefetching in a scan that stops after a few
+ * tuples because of a LIMIT.
*/
if (tbm == NULL)
{
node->tbm = tbm;
node->tbmiterator = tbmiterator = tbm_begin_iterate(tbm);
node->tbmres = tbmres = NULL;
+
+#ifdef USE_PREFETCH
+ if (target_prefetch_pages > 0)
+ {
+ node->prefetch_iterator = prefetch_iterator = tbm_begin_iterate(tbm);
+ node->prefetch_pages = 0;
+ node->prefetch_target = -1;
+ }
+#endif /* USE_PREFETCH */
}
for (;;)
break;
}
+#ifdef USE_PREFETCH
+ if (node->prefetch_pages > 0)
+ {
+ /* The main iterator has closed the distance by one page */
+ node->prefetch_pages--;
+ }
+ else if (prefetch_iterator)
+ {
+ /* Do not let the prefetch iterator get behind the main one */
+ TBMIterateResult *tbmpre = tbm_iterate(prefetch_iterator);
+
+ if (tbmpre == NULL || tbmpre->blockno != tbmres->blockno)
+ elog(ERROR, "prefetch and main iterators are out of sync");
+ }
+#endif /* USE_PREFETCH */
+
/*
* Ignore any claimed entries past what we think is the end of the
* relation. (This is probably not necessary given that we got at
* Set rs_cindex to first slot to examine
*/
scan->rs_cindex = 0;
+
+#ifdef USE_PREFETCH
+ /*
+ * Increase prefetch target if it's not yet at the max. Note
+ * that we will increase it to zero after fetching the very
+ * first page/tuple, then to one after the second tuple is
+ * fetched, then it doubles as later pages are fetched.
+ */
+ if (node->prefetch_target >= target_prefetch_pages)
+ /* don't increase any further */ ;
+ else if (node->prefetch_target >= target_prefetch_pages / 2)
+ node->prefetch_target = target_prefetch_pages;
+ else if (node->prefetch_target > 0)
+ node->prefetch_target *= 2;
+ else
+ node->prefetch_target++;
+#endif /* USE_PREFETCH */
}
else
{
* Continuing in previously obtained page; advance rs_cindex
*/
scan->rs_cindex++;
+
+#ifdef USE_PREFETCH
+ /*
+ * Try to prefetch at least a few pages even before we get to the
+ * second page if we don't stop reading after the first tuple.
+ */
+ if (node->prefetch_target < target_prefetch_pages)
+ node->prefetch_target++;
+#endif /* USE_PREFETCH */
+ }
+
+#ifdef USE_PREFETCH
+ /*
+ * We issue prefetch requests *after* fetching the current page
+ * to try to avoid having prefetching interfere with the main I/O.
+ */
+ if (prefetch_iterator)
+ {
+ while (node->prefetch_pages < node->prefetch_target)
+ {
+ TBMIterateResult *tbmpre = tbm_iterate(prefetch_iterator);
+
+ if (tbmpre == NULL)
+ {
+ /* No more pages to prefetch */
+ tbm_end_iterate(prefetch_iterator);
+ node->prefetch_iterator = prefetch_iterator = NULL;
+ break;
+ }
+ node->prefetch_pages++;
+ PrefetchBuffer(scan->rs_rd, MAIN_FORKNUM, tbmpre->blockno);
+ }
}
+#endif /* USE_PREFETCH */
/*
* Out of range? If so, nothing more to look at on this page
if (node->tbmiterator)
tbm_end_iterate(node->tbmiterator);
+ if (node->prefetch_iterator)
+ tbm_end_iterate(node->prefetch_iterator);
if (node->tbm)
tbm_free(node->tbm);
node->tbm = NULL;
node->tbmiterator = NULL;
node->tbmres = NULL;
+ node->prefetch_iterator = NULL;
/*
* Always rescan the input immediately, to ensure we can pass down any
*/
if (node->tbmiterator)
tbm_end_iterate(node->tbmiterator);
+ if (node->prefetch_iterator)
+ tbm_end_iterate(node->prefetch_iterator);
if (node->tbm)
tbm_free(node->tbm);
scanstate->tbm = NULL;
scanstate->tbmiterator = NULL;
scanstate->tbmres = NULL;
+ scanstate->prefetch_iterator = NULL;
+ scanstate->prefetch_pages = 0;
+ scanstate->prefetch_target = 0;
/*
* Miscellaneous initialization
int bgwriter_lru_maxpages = 100;
double bgwriter_lru_multiplier = 2.0;
+/*
+ * How many buffers PrefetchBuffer callers should try to stay ahead of their
+ * ReadBuffer calls by. This is maintained by the assign hook for
+ * effective_io_concurrency. Zero means "never prefetch".
+ */
+int target_prefetch_pages = 0;
+
/* local state for StartBufferIO and related functions */
static volatile BufferDesc *InProgressBuf = NULL;
static bool IsForInput;
static void AtProcExit_Buffers(int code, Datum arg);
+/*
+ * PrefetchBuffer -- initiate asynchronous read of a block of a relation
+ *
+ * This is named by analogy to ReadBuffer but doesn't actually allocate a
+ * buffer. Instead it tries to ensure that a future ReadBuffer for the given
+ * block will not be delayed by the I/O. Prefetching is optional.
+ * No-op if prefetching isn't compiled in.
+ *
+ * reln is the relation to prefetch from, forkNum selects the fork, and
+ * blockNum is the block to prefetch. Nothing is returned.
+ *
+ * Relations with rd_istemp set are handed to LocalPrefetchBuffer. For all
+ * others the block's tag is looked up in the buffer mapping table while
+ * holding the tag's partition lock in shared mode, and smgrprefetch is
+ * called only if the lookup finds no entry. Note that the partition lock
+ * is released before the prefetch is issued, so the lookup result may be
+ * out of date by the time it is acted on.
+ * NOTE(review): presumably harmless because the request is advisory only
+ * -- confirm.
+ */
+void
+PrefetchBuffer(Relation reln, ForkNumber forkNum, BlockNumber blockNum)
+{
+#ifdef USE_PREFETCH
+ Assert(RelationIsValid(reln));
+ Assert(BlockNumberIsValid(blockNum));
+
+ /* Open it at the smgr level if not already done */
+ RelationOpenSmgr(reln);
+
+ if (reln->rd_istemp)
+ {
+ /* pass it off to localbuf.c */
+ LocalPrefetchBuffer(reln->rd_smgr, forkNum, blockNum);
+ }
+ else
+ {
+ BufferTag newTag; /* identity of requested block */
+ uint32 newHash; /* hash value for newTag */
+ LWLockId newPartitionLock; /* buffer partition lock for it */
+ int buf_id; /* BufTableLookup result; negative if no entry */
+
+ /* create a tag so we can lookup the buffer */
+ INIT_BUFFERTAG(newTag, reln->rd_smgr->smgr_rnode, forkNum, blockNum);
+
+ /* determine its hash code and partition lock ID */
+ newHash = BufTableHashCode(&newTag);
+ newPartitionLock = BufMappingPartitionLock(newHash);
+
+ /* see if the block is in the buffer pool already */
+ LWLockAcquire(newPartitionLock, LW_SHARED);
+ buf_id = BufTableLookup(&newTag, newHash);
+ LWLockRelease(newPartitionLock);
+
+ /* If not in buffers, initiate prefetch */
+ if (buf_id < 0)
+ smgrprefetch(reln->rd_smgr, forkNum, blockNum);
+ }
+#endif /* USE_PREFETCH */
+}
+
+
/*
* ReadBuffer -- a shorthand for ReadBufferExtended, for reading from main
* fork with RBM_NORMAL mode and default strategy.
static Block GetLocalBufferStorage(void);
+/*
+ * LocalPrefetchBuffer -
+ * initiate asynchronous read of a block of a relation
+ *
+ * Do PrefetchBuffer's work for temporary relations.
+ * No-op if prefetching isn't compiled in.
+ *
+ * smgr identifies the relation's storage, forkNum selects the fork, and
+ * blockNum is the block to prefetch. Nothing is returned.
+ *
+ * The local buffer hash table is set up via InitLocalBuffers if it does
+ * not exist yet. If the table already has an entry for the block we
+ * return without doing anything; otherwise the request is passed on to
+ * smgrprefetch.
+ */
+void
+LocalPrefetchBuffer(SMgrRelation smgr, ForkNumber forkNum,
+ BlockNumber blockNum)
+{
+#ifdef USE_PREFETCH
+ BufferTag newTag; /* identity of requested block */
+ LocalBufferLookupEnt *hresult; /* hash entry for newTag, or NULL */
+
+ INIT_BUFFERTAG(newTag, smgr->smgr_rnode, forkNum, blockNum);
+
+ /* Initialize local buffers if first request in this session */
+ if (LocalBufHash == NULL)
+ InitLocalBuffers();
+
+ /* See if the desired buffer already exists */
+ hresult = (LocalBufferLookupEnt *)
+ hash_search(LocalBufHash, (void *) &newTag, HASH_FIND, NULL);
+
+ if (hresult)
+ {
+ /* Yes, so nothing to do */
+ return;
+ }
+
+ /* Not in buffers, so initiate prefetch */
+ smgrprefetch(smgr, forkNum, blockNum);
+#endif /* USE_PREFETCH */
+}
+
+
/*
* LocalBufferAlloc -
* Find or create a local buffer for the given page of the given relation.
FreeVfd(file);
}
+/*
+ * FilePrefetch - initiate asynchronous read of a given range of the file.
+ * The logical seek position is unaffected.
+ *
+ * Currently the only implementation of this function is using posix_fadvise
+ * which is the simplest standardized interface that accomplishes this.
+ * We could add an implementation using libaio in the future; but note that
+ * this API is inappropriate for libaio, which wants to have a buffer provided
+ * to read into.
+ *
+ * file is the File handle (used as an index into VfdCache); offset and
+ * amount give the start and length of the range to prefetch.
+ *
+ * If FileAccess reports failure (a negative result), that result is
+ * returned and no advice is issued. Otherwise the result of posix_fadvise
+ * is returned as-is. When USE_POSIX_FADVISE or POSIX_FADV_WILLNEED is not
+ * defined, this only asserts that the file is valid and returns 0.
+ *
+ * NOTE(review): POSIX specifies that posix_fadvise returns an error number
+ * on failure rather than -1 with errno set, so a failure at that step comes
+ * back as a positive value, unlike a FileAccess failure -- confirm that
+ * callers do not rely on a "< 0" test.
+ */
+int
+FilePrefetch(File file, off_t offset, int amount)
+{
+#if defined(USE_POSIX_FADVISE) && defined(POSIX_FADV_WILLNEED)
+ int returnCode;
+
+ Assert(FileIsValid(file));
+
+ DO_DB(elog(LOG, "FilePrefetch: %d (%s) " INT64_FORMAT " %d",
+ file, VfdCache[file].fileName,
+ (int64) offset, amount));
+
+ returnCode = FileAccess(file);
+ if (returnCode < 0)
+ return returnCode;
+
+ returnCode = posix_fadvise(VfdCache[file].fd, offset, amount,
+ POSIX_FADV_WILLNEED);
+
+ return returnCode;
+#else
+ Assert(FileIsValid(file));
+ return 0;
+#endif
+}
+
int
FileRead(File file, char *buffer, int amount)
{
}
}
+/*
+ * mdprefetch() -- Initiate asynchronous read of the specified block of a relation
+ *
+ * reln, forknum and blocknum identify the block. Nothing is returned.
+ *
+ * The MdfdVec for the block is obtained from _mdfd_getseg (called with
+ * EXTENSION_FAIL). The byte offset passed to FilePrefetch is the block's
+ * position modulo RELSEG_SIZE blocks, multiplied by BLCKSZ, and exactly
+ * BLCKSZ bytes are requested.
+ *
+ * The result of FilePrefetch is explicitly discarded, so a failed
+ * prefetch request is not reported to the caller.
+ *
+ * No-op unless USE_PREFETCH is defined.
+ */
+void
+mdprefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum)
+{
+#ifdef USE_PREFETCH
+ off_t seekpos; /* byte offset of the block within its file */
+ MdfdVec *v;
+
+ v = _mdfd_getseg(reln, forknum, blocknum, false, EXTENSION_FAIL);
+
+ seekpos = (off_t) BLCKSZ * (blocknum % ((BlockNumber) RELSEG_SIZE));
+ Assert(seekpos < (off_t) BLCKSZ * RELSEG_SIZE);
+
+ (void) FilePrefetch(v->mdfd_vfd, seekpos, BLCKSZ);
+#endif /* USE_PREFETCH */
+}
+
+
/*
* mdread() -- Read the specified block from a relation.
*/
bool isRedo);
void (*smgr_extend) (SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, char *buffer, bool isTemp);
+ void (*smgr_prefetch) (SMgrRelation reln, ForkNumber forknum,
+ BlockNumber blocknum);
void (*smgr_read) (SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, char *buffer);
void (*smgr_write) (SMgrRelation reln, ForkNumber forknum,
static const f_smgr smgrsw[] = {
/* magnetic disk */
{mdinit, NULL, mdclose, mdcreate, mdexists, mdunlink, mdextend,
- mdread, mdwrite, mdnblocks, mdtruncate, mdimmedsync,
+ mdprefetch, mdread, mdwrite, mdnblocks, mdtruncate, mdimmedsync,
mdpreckpt, mdsync, mdpostckpt
}
};
buffer, isTemp);
}
+/*
+ * smgrprefetch() -- Initiate asynchronous read of the specified block of a relation.
+ *
+ * reln, forknum and blocknum identify the block. Nothing is returned.
+ *
+ * This simply calls the smgr_prefetch routine of the smgrsw[] entry
+ * selected by reln->smgr_which, passing the arguments through unchanged.
+ */
+void
+smgrprefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum)
+{
+ (*(smgrsw[reln->smgr_which].smgr_prefetch)) (reln, forknum, blocknum);
+}
+
/*
* smgrread() -- read a particular block from a relation into the supplied
* buffer.
#include <ctype.h>
#include <float.h>
+#include <math.h>
#include <limits.h>
#include <unistd.h>
#include <sys/stat.h>
static const char *show_tcp_keepalives_idle(void);
static const char *show_tcp_keepalives_interval(void);
static const char *show_tcp_keepalives_count(void);
-static bool assign_autovacuum_max_workers(int newval, bool doit, GucSource source);
static bool assign_maxconnections(int newval, bool doit, GucSource source);
+static bool assign_autovacuum_max_workers(int newval, bool doit, GucSource source);
+static bool assign_effective_io_concurrency(int newval, bool doit, GucSource source);
static const char *assign_pgstat_temp_directory(const char *newval, bool doit, GucSource source);
static char *config_enum_get_options(struct config_enum *record,
static int wal_block_size;
static int wal_segment_size;
static bool integer_datetimes;
+static int effective_io_concurrency;
/* should be static, but commands/variable.c needs to get at these */
char *role_string;
100, 0, 1000, NULL, NULL
},
+ {
+ {"effective_io_concurrency", PGC_USERSET, RESOURCES,
+ gettext_noop("Number of simultaneous requests that can be handled efficiently by the disk subsystem."),
+ gettext_noop("For RAID arrays, this should be approximately the number of drive spindles in the array.")
+ },
+ &effective_io_concurrency,
+#ifdef USE_PREFETCH
+ 1, 0, 1000,
+#else
+ 0, 0, 0,
+#endif
+ assign_effective_io_concurrency, NULL
+ },
+
{
{"log_rotation_age", PGC_SIGHUP, LOGGING_WHERE,
gettext_noop("Automatic log file rotation will occur after N minutes."),
return true;
}
+static bool
+assign_effective_io_concurrency(int newval, bool doit, GucSource source)
+{
+#ifdef USE_PREFETCH
+ double new_prefetch_pages = 0.0;
+ int i;
+
+ /*----------
+ * Assign hook for effective_io_concurrency: translates newval into a
+ * value for target_prefetch_pages. Returns true if newval is acceptable
+ * and false if the computed target is out of range. The global
+ * target_prefetch_pages is written only when doit is true; source is
+ * not examined. Without USE_PREFETCH the hook just returns true.
+ *
+ * The user-visible GUC parameter is the number of drives (spindles),
+ * which we need to translate to a number-of-pages-to-prefetch target.
+ *
+ * The expected number of prefetch pages needed to keep N drives busy is:
+ *
+ * drives | I/O requests
+ * -------+----------------
+ * 1 | 1
+ * 2 | 2/1 + 2/2 = 3
+ * 3 | 3/1 + 3/2 + 3/3 = 5 1/2
+ * 4 | 4/1 + 4/2 + 4/3 + 4/4 = 8 1/3
+ * n | n * H(n)
+ *
+ * This is called the "coupon collector problem" and H(n) is the n'th
+ * harmonic number, 1/1 + 1/2 + ... + 1/n (a partial sum of the harmonic
+ * series). This could be approximated by n * ln(n), but for
+ * reasonable numbers of drives we might as well just compute the series.
+ *
+ * Alternatively we could set the target to the number of pages necessary
+ * so that the expected number of active spindles is some arbitrary
+ * percentage of the total. This sounds the same but is actually slightly
+ * different. The result ends up being ln(1-P)/ln((n-1)/n) where P is
+ * that desired fraction.
+ *
+ * Experimental results show that both of these formulas aren't aggressive
+ * enough, but we don't really have any better proposals.
+ *
+ * Note that if newval = 0 (disabled), we must set target = 0. The loop
+ * below then executes zero times, leaving the sum at its initial 0.0.
+ *----------
+ */
+
+ for (i = 1; i <= newval; i++)
+ new_prefetch_pages += (double) newval / (double) i;
+
+ /* This range check shouldn't fail, but let's be paranoid */
+ if (new_prefetch_pages >= 0.0 && new_prefetch_pages < (double) INT_MAX)
+ {
+ if (doit)
+ target_prefetch_pages = (int) rint(new_prefetch_pages);
+ return true;
+ }
+ else
+ return false;
+#else
+ return true; /* no prefetch support: nothing to compute */
+#endif /* USE_PREFETCH */
+}
+
static const char *
assign_pgstat_temp_directory(const char *newval, bool doit, GucSource source)
{
#bgwriter_lru_maxpages = 100 # 0-1000 max buffers written/round
#bgwriter_lru_multiplier = 2.0 # 0-10.0 multiplier on buffers scanned/round
+# - Asynchronous Behavior -
+
+#effective_io_concurrency = 1 # 1-1000, or 0 to disable prefetching
+
#------------------------------------------------------------------------------
# WRITE AHEAD LOG
* tbm bitmap obtained from child index scan(s)
* tbmiterator iterator for scanning current pages
* tbmres current-page data
+ * prefetch_iterator iterator for prefetching ahead of current page
+ * prefetch_pages # pages prefetch iterator is ahead of current
+ * prefetch_target target prefetch distance
* ----------------
*/
typedef struct BitmapHeapScanState
TIDBitmap *tbm;
TBMIterator *tbmiterator;
TBMIterateResult *tbmres;
+ TBMIterator *prefetch_iterator;
+ int prefetch_pages;
+ int prefetch_target;
} BitmapHeapScanState;
/* ----------------
#define USE_POSIX_FADVISE
#endif
+/*
+ * USE_PREFETCH code should be compiled only if we have a way to implement
+ * prefetching. (This is decoupled from USE_POSIX_FADVISE because there
+ * might in future be support for alternative low-level prefetch APIs.)
+ */
+#ifdef USE_POSIX_FADVISE
+#define USE_PREFETCH
+#endif
+
/*
* This is the default directory in which AF_UNIX socket files are
* placed. Caution: changing this risks breaking your existing client
extern void BufTableDelete(BufferTag *tagPtr, uint32 hashcode);
/* localbuf.c */
-extern BufferDesc *LocalBufferAlloc(SMgrRelation reln, ForkNumber forkNum,
+extern void LocalPrefetchBuffer(SMgrRelation smgr, ForkNumber forkNum,
+ BlockNumber blockNum);
+extern BufferDesc *LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum,
BlockNumber blockNum, bool *foundPtr);
extern void MarkLocalBufferDirty(Buffer buffer);
extern void DropRelFileNodeLocalBuffers(RelFileNode rnode, ForkNumber forkNum,
extern bool zero_damaged_pages;
extern int bgwriter_lru_maxpages;
extern double bgwriter_lru_multiplier;
+extern int target_prefetch_pages;
/* in buf_init.c */
extern PGDLLIMPORT char *BufferBlocks;
/*
* prototypes for functions in bufmgr.c
*/
+extern void PrefetchBuffer(Relation reln, ForkNumber forkNum,
+ BlockNumber blockNum);
extern Buffer ReadBuffer(Relation reln, BlockNumber blockNum);
extern Buffer ReadBufferExtended(Relation reln, ForkNumber forkNum,
BlockNumber blockNum, ReadBufferMode mode,
extern File PathNameOpenFile(FileName fileName, int fileFlags, int fileMode);
extern File OpenTemporaryFile(bool interXact);
extern void FileClose(File file);
+extern int FilePrefetch(File file, off_t offset, int amount);
extern int FileRead(File file, char *buffer, int amount);
extern int FileWrite(File file, char *buffer, int amount);
extern int FileSync(File file);
bool isTemp, bool isRedo);
extern void smgrextend(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, char *buffer, bool isTemp);
+extern void smgrprefetch(SMgrRelation reln, ForkNumber forknum,
+ BlockNumber blocknum);
extern void smgrread(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, char *buffer);
extern void smgrwrite(SMgrRelation reln, ForkNumber forknum,
extern void mdunlink(RelFileNode rnode, ForkNumber forknum, bool isRedo);
extern void mdextend(SMgrRelation reln, ForkNumber forknum,
BlockNumber blocknum, char *buffer, bool isTemp);
+extern void mdprefetch(SMgrRelation reln, ForkNumber forknum,
+ BlockNumber blocknum);
extern void mdread(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
char *buffer);
extern void mdwrite(SMgrRelation reln, ForkNumber forknum,