static void RemoveGXact(GlobalTransaction gxact);
static void XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len);
-static char *ProcessTwoPhaseBuffer(FullTransactionId xid,
+static char *ProcessTwoPhaseBuffer(TransactionId xid,
XLogRecPtr prepare_start_lsn,
bool fromdisk, bool setParent, bool setNextXid);
static void MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid,
const char *gid, TimestampTz prepared_at, Oid owner,
Oid databaseid);
-static void RemoveTwoPhaseFile(FullTransactionId fxid, bool giveWarning);
+static void RemoveTwoPhaseFile(TransactionId xid, bool giveWarning);
static void RecreateTwoPhaseFile(TransactionId xid, void *content, int len);
/*
/************************************************************************/
/*
- * Compute FullTransactionId for the given TransactionId, using the current
- * epoch.
+ * Compute the FullTransactionId for the given TransactionId.
+ *
+ * The wrap logic is safe here because the span of active xids cannot exceed one
+ * epoch at any given time.
*/
static inline FullTransactionId
-FullTransactionIdFromCurrentEpoch(TransactionId xid)
+AdjustToFullTransactionId(TransactionId xid)
{
- FullTransactionId fxid;
FullTransactionId nextFullXid;
+ TransactionId nextXid;
uint32 epoch;
- nextFullXid = ReadNextFullTransactionId();
+ Assert(TransactionIdIsValid(xid));
+
+ LWLockAcquire(XidGenLock, LW_SHARED);
+ nextFullXid = TransamVariables->nextXid;
+ LWLockRelease(XidGenLock);
+
+ nextXid = XidFromFullTransactionId(nextFullXid);
epoch = EpochFromFullTransactionId(nextFullXid);
+ if (unlikely(xid > nextXid))
+ {
+ /* Wraparound occurred, must be from a prev epoch. */
+ Assert(epoch > 0);
+ epoch--;
+ }
- fxid = FullTransactionIdFromEpochAndXid(epoch, xid);
- return fxid;
+ return FullTransactionIdFromEpochAndXid(epoch, xid);
}
static inline int
-TwoPhaseFilePath(char *path, FullTransactionId fxid)
+TwoPhaseFilePath(char *path, TransactionId xid)
{
+ FullTransactionId fxid = AdjustToFullTransactionId(xid);
+
return snprintf(path, MAXPGPATH, TWOPHASE_DIR "/%08X%08X",
EpochFromFullTransactionId(fxid),
XidFromFullTransactionId(fxid));
* If it looks OK (has a valid magic number and CRC), return the palloc'd
* contents of the file, issuing an error when finding corrupted data. If
* missing_ok is true, which indicates that missing files can be safely
- * ignored, then return NULL. This state can be reached when doing recovery
- * after discarding two-phase files from other epochs.
+ * ignored, then return NULL. This state can be reached when doing recovery.
*/
static char *
ReadTwoPhaseFile(TransactionId xid, bool missing_ok)
pg_crc32c calc_crc,
file_crc;
int r;
- FullTransactionId fxid;
- fxid = FullTransactionIdFromCurrentEpoch(xid);
- TwoPhaseFilePath(path, fxid);
+ TwoPhaseFilePath(path, xid);
fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
if (fd < 0)
AtEOXact_PgStat(isCommit, false);
/*
- * And now we can clean up any files we may have left. These should be
- * from the current epoch.
+ * And now we can clean up any files we may have left.
*/
if (ondisk)
- {
- FullTransactionId fxid;
-
- fxid = FullTransactionIdFromCurrentEpoch(xid);
- RemoveTwoPhaseFile(fxid, true);
- }
+ RemoveTwoPhaseFile(xid, true);
MyLockedGxact = NULL;
*
* If giveWarning is false, do not complain about file-not-present;
* this is an expected case during WAL replay.
- *
- * This routine is used at early stages at recovery where future and
- * past orphaned files are checked, hence the FullTransactionId to build
- * a complete file name fit for the removal.
*/
static void
-RemoveTwoPhaseFile(FullTransactionId fxid, bool giveWarning)
+RemoveTwoPhaseFile(TransactionId xid, bool giveWarning)
{
char path[MAXPGPATH];
- TwoPhaseFilePath(path, fxid);
+ TwoPhaseFilePath(path, xid);
if (unlink(path))
if (errno != ENOENT || giveWarning)
ereport(WARNING,
char path[MAXPGPATH];
pg_crc32c statefile_crc;
int fd;
- FullTransactionId fxid;
/* Recompute CRC */
INIT_CRC32C(statefile_crc);
COMP_CRC32C(statefile_crc, content, len);
FIN_CRC32C(statefile_crc);
- /* Use current epoch */
- fxid = FullTransactionIdFromCurrentEpoch(xid);
- TwoPhaseFilePath(path, fxid);
+ TwoPhaseFilePath(path, xid);
fd = OpenTransientFile(path,
O_CREAT | O_TRUNC | O_WRONLY | PG_BINARY);
* Scan pg_twophase and fill TwoPhaseState depending on the on-disk data.
* This is called once at the beginning of recovery, saving any extra
* lookups in the future. Two-phase files that are newer than the
- * minimum XID horizon are discarded on the way. Two-phase files with
- * an epoch older or newer than the current checkpoint's record epoch
- * are also discarded.
+ * minimum XID horizon are discarded on the way.
*/
void
restoreTwoPhaseData(void)
if (strlen(clde->d_name) == 16 &&
strspn(clde->d_name, "0123456789ABCDEF") == 16)
{
+ TransactionId xid;
FullTransactionId fxid;
char *buf;
fxid = FullTransactionIdFromU64(strtou64(clde->d_name, NULL, 16));
- buf = ProcessTwoPhaseBuffer(fxid, InvalidXLogRecPtr,
+ xid = XidFromFullTransactionId(fxid);
+
+ buf = ProcessTwoPhaseBuffer(xid, InvalidXLogRecPtr,
true, false, false);
if (buf == NULL)
continue;
TransactionId origNextXid = XidFromFullTransactionId(nextXid);
TransactionId result = origNextXid;
TransactionId *xids = NULL;
- uint32 epoch = EpochFromFullTransactionId(nextXid);
int nxids = 0;
int allocsize = 0;
int i;
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
{
TransactionId xid;
- FullTransactionId fxid;
char *buf;
GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
xid = gxact->xid;
- /*
- * All two-phase files with past and future epoch in pg_twophase are
- * gone at this point, so we're OK to rely on only the current epoch.
- */
- fxid = FullTransactionIdFromEpochAndXid(epoch, xid);
- buf = ProcessTwoPhaseBuffer(fxid,
+ buf = ProcessTwoPhaseBuffer(xid,
gxact->prepare_start_lsn,
gxact->ondisk, false, true);
StandbyRecoverPreparedTransactions(void)
{
int i;
- uint32 epoch;
- FullTransactionId nextFullXid;
-
- /* get current epoch */
- nextFullXid = ReadNextFullTransactionId();
- epoch = EpochFromFullTransactionId(nextFullXid);
LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
{
TransactionId xid;
- FullTransactionId fxid;
char *buf;
GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
xid = gxact->xid;
- /*
- * At this stage, we're OK to work with the current epoch as all past
- * and future files have been already discarded.
- */
- fxid = FullTransactionIdFromEpochAndXid(epoch, xid);
- buf = ProcessTwoPhaseBuffer(fxid,
+ buf = ProcessTwoPhaseBuffer(xid,
gxact->prepare_start_lsn,
gxact->ondisk, true, false);
if (buf != NULL)
RecoverPreparedTransactions(void)
{
int i;
- uint32 epoch;
- FullTransactionId nextFullXid;
-
- /* get current epoch */
- nextFullXid = ReadNextFullTransactionId();
- epoch = EpochFromFullTransactionId(nextFullXid);
LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
{
TransactionId xid;
- FullTransactionId fxid;
char *buf;
GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
char *bufptr;
TransactionId *subxids;
const char *gid;
- /*
- * At this stage, we're OK to work with the current epoch as all past
- * and future files have been already discarded.
- */
xid = gxact->xid;
/*
* SubTransSetParent has been set before, if the prepared transaction
* generated xid assignment records.
*/
- fxid = FullTransactionIdFromEpochAndXid(epoch, xid);
- buf = ProcessTwoPhaseBuffer(fxid,
+ buf = ProcessTwoPhaseBuffer(xid,
gxact->prepare_start_lsn,
gxact->ondisk, true, false);
if (buf == NULL)
/*
* ProcessTwoPhaseBuffer
*
- * Given a FullTransactionId, read it either from disk or read it directly
+ * Given a transaction id, read it either from disk or read it directly
* via shmem xlog record pointer using the provided "prepare_start_lsn".
*
* If setParent is true, set up subtransaction parent linkages.
* value scanned.
*/
static char *
-ProcessTwoPhaseBuffer(FullTransactionId fxid,
+ProcessTwoPhaseBuffer(TransactionId xid,
XLogRecPtr prepare_start_lsn,
bool fromdisk,
bool setParent, bool setNextXid)
{
FullTransactionId nextXid = TransamVariables->nextXid;
+ TransactionId origNextXid = XidFromFullTransactionId(nextXid);
TransactionId *subxids;
char *buf;
TwoPhaseFileHeader *hdr;
int i;
- TransactionId xid = XidFromFullTransactionId(fxid);
Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
if (!fromdisk)
Assert(prepare_start_lsn != InvalidXLogRecPtr);
- /*
- * Reject full XID if too new. Note that this discards files from future
- * epochs.
- */
- if (FullTransactionIdFollowsOrEquals(fxid, nextXid))
+ /* Already processed? */
+ if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
{
if (fromdisk)
{
ereport(WARNING,
- (errmsg("removing future two-phase state file of epoch %u for transaction %u",
- EpochFromFullTransactionId(fxid), xid)));
- RemoveTwoPhaseFile(fxid, true);
- }
- else
- {
- ereport(WARNING,
- (errmsg("removing future two-phase state from memory for transaction %u",
+ (errmsg("removing stale two-phase state file for transaction %u",
xid)));
- PrepareRedoRemove(xid, true);
- }
- return NULL;
- }
-
- /* Discard files from past epochs */
- if (EpochFromFullTransactionId(fxid) < EpochFromFullTransactionId(nextXid))
- {
- if (fromdisk)
- {
- ereport(WARNING,
- (errmsg("removing past two-phase state file of epoch %u for transaction %u",
- EpochFromFullTransactionId(fxid), xid)));
- RemoveTwoPhaseFile(fxid, true);
+ RemoveTwoPhaseFile(xid, true);
}
else
{
ereport(WARNING,
- (errmsg("removing past two-phase state from memory for transaction %u",
+ (errmsg("removing stale two-phase state from memory for transaction %u",
xid)));
PrepareRedoRemove(xid, true);
}
return NULL;
}
- /* Already processed? */
- if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
+ /* Reject XID if too new */
+ if (TransactionIdFollowsOrEquals(xid, origNextXid))
{
if (fromdisk)
{
ereport(WARNING,
- (errmsg("removing stale two-phase state file for transaction %u",
+ (errmsg("removing future two-phase state file for transaction %u",
xid)));
- RemoveTwoPhaseFile(fxid, true);
+ RemoveTwoPhaseFile(xid, true);
}
else
{
ereport(WARNING,
- (errmsg("removing stale two-phase state from memory for transaction %u",
+ (errmsg("removing future two-phase state from memory for transaction %u",
xid)));
PrepareRedoRemove(xid, true);
}
if (!XLogRecPtrIsInvalid(start_lsn))
{
char path[MAXPGPATH];
- FullTransactionId fxid;
- /* Use current epoch */
- fxid = FullTransactionIdFromCurrentEpoch(hdr->xid);
- TwoPhaseFilePath(path, fxid);
+ TwoPhaseFilePath(path, hdr->xid);
if (access(path, F_OK) == 0)
{
*/
elog(DEBUG2, "removing 2PC data for transaction %u", xid);
if (gxact->ondisk)
- {
- FullTransactionId fxid;
-
- /*
- * We should deal with a file at the current epoch here.
- */
- fxid = FullTransactionIdFromCurrentEpoch(xid);
- RemoveTwoPhaseFile(fxid, giveWarning);
- }
+ RemoveTwoPhaseFile(xid, giveWarning);
RemoveGXact(gxact);
}