#define WAIT_EVENT_CLASS_MASK 0xFF000000
#define WAIT_EVENT_ID_MASK 0x0000FFFF
+/*
+ * Hash tables for storing custom wait event ids and their names in
+ * shared memory.
+ *
+ * WaitEventExtensionHashById is used to find the name from a event id.
+ * Any backend can search it to find custom wait events.
+ *
+ * WaitEventExtensionHashByName is used to find the event ID from a name.
+ * It is used to ensure that no duplicated entries are registered.
+ *
+ * The size of the hash table is based on the assumption that
+ * WAIT_EVENT_EXTENSION_BASH_INIT_SIZE is enough for most cases, and it seems
+ * unlikely that the number of entries will reach
+ * WAIT_EVENT_EXTENSION_BASH_MAX_SIZE.
+ */
+static HTAB *WaitEventExtensionHashById; /* find names from IDs */
+static HTAB *WaitEventExtensionHashByName; /* find IDs from names */
+
+#define WAIT_EVENT_EXTENSION_HASH_INIT_SIZE 16
+#define WAIT_EVENT_EXTENSION_HASH_MAX_SIZE 128
+
+/* hash table entries */
+typedef struct WaitEventExtensionEntryById
+{
+ uint16 event_id; /* hash key */
+ char wait_event_name[NAMEDATALEN]; /* custom wait event name */
+} WaitEventExtensionEntryById;
+
+typedef struct WaitEventExtensionEntryByName
+{
+ char wait_event_name[NAMEDATALEN]; /* hash key */
+ uint16 event_id; /* wait event ID */
+} WaitEventExtensionEntryByName;
+
+
/* dynamic allocation counter for custom wait events in extensions */
typedef struct WaitEventExtensionCounterData
{
#define NUM_BUILTIN_WAIT_EVENT_EXTENSION \
(WAIT_EVENT_EXTENSION_FIRST_USER_DEFINED - WAIT_EVENT_EXTENSION)
-/*
- * This is indexed by event ID minus NUM_BUILTIN_WAIT_EVENT_EXTENSION, and
- * stores the names of all dynamically-created event IDs known to the current
- * process. Any unused entries in the array will contain NULL.
- */
-static const char **WaitEventExtensionNames = NULL;
-static int WaitEventExtensionNamesAllocated = 0;
+/* wait event info for extensions */
+#define WAIT_EVENT_EXTENSION_INFO(eventId) (PG_WAIT_EXTENSION | eventId)
static const char *GetWaitEventExtensionIdentifier(uint16 eventId);
/*
- * Return the space for dynamic allocation counter.
+ * Return the space for dynamic shared hash tables and dynamic allocation counter.
*/
Size
WaitEventExtensionShmemSize(void)
{
- return sizeof(WaitEventExtensionCounterData);
+ Size sz;
+
+ sz = MAXALIGN(sizeof(WaitEventExtensionCounterData));
+ sz = add_size(sz, hash_estimate_size(WAIT_EVENT_EXTENSION_HASH_MAX_SIZE,
+ sizeof(WaitEventExtensionEntryById)));
+ sz = add_size(sz, hash_estimate_size(WAIT_EVENT_EXTENSION_HASH_MAX_SIZE,
+ sizeof(WaitEventExtensionEntryByName)));
+ return sz;
}
/*
- * Allocate shmem space for dynamic allocation counter.
+ * Allocate shmem space for dynamic shared hash and dynamic allocation counter.
*/
void
WaitEventExtensionShmemInit(void)
{
bool found;
+ HASHCTL info;
WaitEventExtensionCounter = (WaitEventExtensionCounterData *)
ShmemInitStruct("WaitEventExtensionCounterData",
- WaitEventExtensionShmemSize(), &found);
+ sizeof(WaitEventExtensionCounterData), &found);
if (!found)
{
WaitEventExtensionCounter->nextId = NUM_BUILTIN_WAIT_EVENT_EXTENSION;
SpinLockInit(&WaitEventExtensionCounter->mutex);
}
+
+ /* initialize or attach the hash tables to store custom wait events */
+ info.keysize = sizeof(uint16);
+ info.entrysize = sizeof(WaitEventExtensionEntryById);
+ WaitEventExtensionHashById = ShmemInitHash("WaitEventExtension hash by id",
+ WAIT_EVENT_EXTENSION_HASH_INIT_SIZE,
+ WAIT_EVENT_EXTENSION_HASH_MAX_SIZE,
+ &info,
+ HASH_ELEM | HASH_BLOBS);
+
+ /* key is a NULL-terminated string */
+ info.keysize = sizeof(char[NAMEDATALEN]);
+ info.entrysize = sizeof(WaitEventExtensionEntryByName);
+ WaitEventExtensionHashByName = ShmemInitHash("WaitEventExtension hash by name",
+ WAIT_EVENT_EXTENSION_HASH_INIT_SIZE,
+ WAIT_EVENT_EXTENSION_HASH_MAX_SIZE,
+ &info,
+ HASH_ELEM | HASH_STRINGS);
}
/*
- * Allocate a new event ID and return the wait event.
+ * Allocate a new event ID and return the wait event info.
+ *
+ * If the wait event name is already defined, this does not allocate a new
+ * entry; it returns the wait event information associated to the name.
*/
uint32
-WaitEventExtensionNew(void)
+WaitEventExtensionNew(const char *wait_event_name)
{
uint16 eventId;
+ bool found;
+ WaitEventExtensionEntryByName *entry_by_name;
+ WaitEventExtensionEntryById *entry_by_id;
+
+ /* Check the limit of the length of the event name */
+ if (strlen(wait_event_name) >= NAMEDATALEN)
+ elog(ERROR,
+ "cannot use custom wait event string longer than %u characters",
+ NAMEDATALEN - 1);
+
+ /*
+ * Check if the wait event info associated to the name is already defined,
+ * and return it if so.
+ */
+ LWLockAcquire(WaitEventExtensionLock, LW_SHARED);
+ entry_by_name = (WaitEventExtensionEntryByName *)
+ hash_search(WaitEventExtensionHashByName, wait_event_name,
+ HASH_FIND, &found);
+ LWLockRelease(WaitEventExtensionLock);
+ if (found)
+ return WAIT_EVENT_EXTENSION_INFO(entry_by_name->event_id);
- Assert(LWLockHeldByMeInMode(AddinShmemInitLock, LW_EXCLUSIVE));
+ /*
+ * Allocate and register a new wait event. Recheck if the event name
+ * exists, as it could be possible that a concurrent process has inserted
+ * one with the same name since the LWLock acquired again here was
+ * previously released.
+ */
+ LWLockAcquire(WaitEventExtensionLock, LW_EXCLUSIVE);
+ entry_by_name = (WaitEventExtensionEntryByName *)
+ hash_search(WaitEventExtensionHashByName, wait_event_name,
+ HASH_FIND, &found);
+ if (found)
+ {
+ LWLockRelease(WaitEventExtensionLock);
+ return WAIT_EVENT_EXTENSION_INFO(entry_by_name->event_id);
+ }
+ /* Allocate a new event Id */
SpinLockAcquire(&WaitEventExtensionCounter->mutex);
- if (WaitEventExtensionCounter->nextId > PG_UINT16_MAX)
+ if (WaitEventExtensionCounter->nextId >= WAIT_EVENT_EXTENSION_HASH_MAX_SIZE)
{
SpinLockRelease(&WaitEventExtensionCounter->mutex);
ereport(ERROR,
SpinLockRelease(&WaitEventExtensionCounter->mutex);
- return PG_WAIT_EXTENSION | eventId;
-}
-
-/*
- * Register a dynamic wait event name for extension in the lookup table
- * of the current process.
- *
- * This routine will save a pointer to the wait event name passed as an argument,
- * so the name should be allocated in a backend-lifetime context
- * (shared memory, TopMemoryContext, static constant, or similar).
- *
- * The "wait_event_name" will be user-visible as a wait event name, so try to
- * use a name that fits the style for those.
- */
-void
-WaitEventExtensionRegisterName(uint32 wait_event_info,
- const char *wait_event_name)
-{
- uint32 classId;
- uint16 eventId;
-
- classId = wait_event_info & WAIT_EVENT_CLASS_MASK;
- eventId = wait_event_info & WAIT_EVENT_ID_MASK;
-
- /* Check the wait event class. */
- if (classId != PG_WAIT_EXTENSION)
- ereport(ERROR,
- errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("invalid wait event class %u", classId));
-
- /* This should only be called for user-defined wait event. */
- if (eventId < NUM_BUILTIN_WAIT_EVENT_EXTENSION)
- ereport(ERROR,
- errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("invalid wait event ID %u", eventId));
+ /* Register the new wait event */
+ entry_by_id = (WaitEventExtensionEntryById *)
+ hash_search(WaitEventExtensionHashById, &eventId,
+ HASH_ENTER, &found);
+ Assert(!found);
+ strlcpy(entry_by_id->wait_event_name, wait_event_name,
+ sizeof(entry_by_id->wait_event_name));
- /* Convert to array index. */
- eventId -= NUM_BUILTIN_WAIT_EVENT_EXTENSION;
+ entry_by_name = (WaitEventExtensionEntryByName *)
+ hash_search(WaitEventExtensionHashByName, wait_event_name,
+ HASH_ENTER, &found);
+ Assert(!found);
+ entry_by_name->event_id = eventId;
- /* If necessary, create or enlarge array. */
- if (eventId >= WaitEventExtensionNamesAllocated)
- {
- uint32 newalloc;
-
- newalloc = pg_nextpower2_32(Max(8, eventId + 1));
-
- if (WaitEventExtensionNames == NULL)
- WaitEventExtensionNames = (const char **)
- MemoryContextAllocZero(TopMemoryContext,
- newalloc * sizeof(char *));
- else
- WaitEventExtensionNames =
- repalloc0_array(WaitEventExtensionNames, const char *,
- WaitEventExtensionNamesAllocated, newalloc);
- WaitEventExtensionNamesAllocated = newalloc;
- }
+ LWLockRelease(WaitEventExtensionLock);
- WaitEventExtensionNames[eventId] = wait_event_name;
+ return WAIT_EVENT_EXTENSION_INFO(eventId);
}
/*
static const char *
GetWaitEventExtensionIdentifier(uint16 eventId)
{
+ bool found;
+ WaitEventExtensionEntryById *entry;
+
/* Built-in event? */
if (eventId < NUM_BUILTIN_WAIT_EVENT_EXTENSION)
return "Extension";
- /*
- * It is a user-defined wait event, so look at WaitEventExtensionNames[].
- * However, it is possible that the name has never been registered by
- * calling WaitEventExtensionRegisterName() in the current process, in
- * which case give up and return "extension".
- */
- eventId -= NUM_BUILTIN_WAIT_EVENT_EXTENSION;
+ /* It is a user-defined wait event, so lookup hash table. */
+ LWLockAcquire(WaitEventExtensionLock, LW_SHARED);
+ entry = (WaitEventExtensionEntryById *)
+ hash_search(WaitEventExtensionHashById, &eventId,
+ HASH_FIND, &found);
+ LWLockRelease(WaitEventExtensionLock);
- if (eventId >= WaitEventExtensionNamesAllocated ||
- WaitEventExtensionNames[eventId] == NULL)
- return "extension";
+ if (!entry)
+ elog(ERROR, "could not find custom wait event name for ID %u",
+ eventId);
- return WaitEventExtensionNames[eventId];
+ return entry->wait_event_name;
}
PG_MODULE_MAGIC;
-PG_FUNCTION_INFO_V1(worker_spi_init);
PG_FUNCTION_INFO_V1(worker_spi_launch);
PGDLLEXPORT void worker_spi_main(Datum main_arg) pg_attribute_noreturn();
-/* Shared memory state */
-typedef struct worker_spi_state
-{
- /* the wait event defined during initialization phase */
- uint32 wait_event;
-} worker_spi_state;
-
-static worker_spi_state *wsstate = NULL; /* pointer to shared memory */
-
-static shmem_request_hook_type prev_shmem_request_hook = NULL;
-static shmem_request_hook_type prev_shmem_startup_hook = NULL;
-
-static void worker_spi_shmem_request(void);
-static void worker_spi_shmem_startup(void);
-static void worker_spi_shmem_init(void);
-static Size worker_spi_memsize(void);
-
/* GUC variables */
static int worker_spi_naptime = 10;
static int worker_spi_total_workers = 2;
static char *worker_spi_database = NULL;
+/* value cached, fetched from shared memory */
+static uint32 worker_spi_wait_event_main = 0;
typedef struct worktable
{
const char *name;
} worktable;
-static void
-worker_spi_shmem_request(void)
-{
- if (prev_shmem_request_hook)
- prev_shmem_request_hook();
-
- RequestAddinShmemSpace(worker_spi_memsize());
-}
-
-static void
-worker_spi_shmem_startup(void)
-{
- if (prev_shmem_startup_hook)
- prev_shmem_startup_hook();
-
- worker_spi_shmem_init();
-}
-
-static Size
-worker_spi_memsize(void)
-{
- return MAXALIGN(sizeof(worker_spi_state));
-}
-
-/*
- * Initialize the shared memory state of worker_spi.
- *
- * This routine allocates a new wait event when called the first time.
- * On follow-up calls, the name of the wait event associated with the
- * existing shared memory state is registered.
- */
-static void
-worker_spi_shmem_init(void)
-{
- bool found;
-
- wsstate = NULL;
-
- /* Create or attach to the shared memory state */
- LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
- wsstate = ShmemInitStruct("worker_spi State",
- sizeof(worker_spi_state),
- &found);
-
- /* Define a new wait event */
- if (!found)
- wsstate->wait_event = WaitEventExtensionNew();
-
- LWLockRelease(AddinShmemInitLock);
-
- /*
- * Register the wait event in the lookup table of the current process.
- */
- WaitEventExtensionRegisterName(wsstate->wait_event, "worker_spi_main");
- return;
-}
-
/*
* Initialize workspace for a worker process: create the schema if it doesn't
* already exist.
/* We're now ready to receive signals */
BackgroundWorkerUnblockSignals();
- /* Create (if necessary) and attach to our shared memory area. */
- worker_spi_shmem_init();
-
/* Connect to our database */
BackgroundWorkerInitializeConnection(worker_spi_database, NULL, 0);
{
int ret;
+ /* First time, allocate or get the custom wait event */
+ if (worker_spi_wait_event_main == 0)
+ worker_spi_wait_event_main = WaitEventExtensionNew("worker_spi_main");
+
/*
* Background workers mustn't call usleep() or any direct equivalent:
* instead, they may wait on their process latch, which sleeps as
(void) WaitLatch(MyLatch,
WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
worker_spi_naptime * 1000L,
- wsstate->wait_event);
+ worker_spi_wait_event_main);
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
MarkGUCPrefixReserved("worker_spi");
- prev_shmem_request_hook = shmem_request_hook;
- shmem_request_hook = worker_spi_shmem_request;
- prev_shmem_startup_hook = shmem_startup_hook;
- shmem_startup_hook = worker_spi_shmem_startup;
-
/* set up common data for all our workers */
memset(&worker, 0, sizeof(worker));
worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
}
}
-/*
- * Wrapper to initialize a session with the shared memory state
- * used by this module. This is a convenience routine to be able to
- * see the custom wait event stored in shared memory without loading
- * through shared_preload_libraries.
- */
-Datum
-worker_spi_init(PG_FUNCTION_ARGS)
-{
- /* Create (if necessary) and attach to our shared memory area. */
- worker_spi_shmem_init();
-
- PG_RETURN_VOID();
-}
-
/*
* Dynamically launch an SPI worker.
*/
BgwHandleStatus status;
pid_t pid;
- /* Create (if necessary) and attach to our shared memory area. */
- worker_spi_shmem_init();
-
memset(&worker, 0, sizeof(worker));
worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
BGWORKER_BACKEND_DATABASE_CONNECTION;