static void RemoveGXact(GlobalTransaction gxact);
static void XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len);
-static char *ProcessTwoPhaseBuffer(TransactionId xid,
+static char *ProcessTwoPhaseBuffer(FullTransactionId xid,
XLogRecPtr prepare_start_lsn,
bool fromdisk, bool setParent, bool setNextXid);
static void MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid,
const char *gid, TimestampTz prepared_at, Oid owner,
Oid databaseid);
-static void RemoveTwoPhaseFile(TransactionId xid, bool giveWarning);
+static void RemoveTwoPhaseFile(FullTransactionId fxid, bool giveWarning);
static void RecreateTwoPhaseFile(TransactionId xid, void *content, int len);
/*
/************************************************************************/
/*
- * Compute the FullTransactionId for the given TransactionId.
- *
- * The wrap logic is safe here because the span of active xids cannot exceed one
- * epoch at any given time.
+ * Compute FullTransactionId for the given TransactionId, using the current
+ * epoch.
*/
static inline FullTransactionId
-AdjustToFullTransactionId(TransactionId xid)
+FullTransactionIdFromCurrentEpoch(TransactionId xid)
{
+ FullTransactionId fxid;
FullTransactionId nextFullXid;
- TransactionId nextXid;
uint32 epoch;
- Assert(TransactionIdIsValid(xid));
-
- LWLockAcquire(XidGenLock, LW_SHARED);
- nextFullXid = TransamVariables->nextXid;
- LWLockRelease(XidGenLock);
-
- nextXid = XidFromFullTransactionId(nextFullXid);
+ nextFullXid = ReadNextFullTransactionId();
epoch = EpochFromFullTransactionId(nextFullXid);
- if (unlikely(xid > nextXid))
- {
- /* Wraparound occurred, must be from a prev epoch. */
- Assert(epoch > 0);
- epoch--;
- }
- return FullTransactionIdFromEpochAndXid(epoch, xid);
+ fxid = FullTransactionIdFromEpochAndXid(epoch, xid);
+ return fxid;
}
static inline int
-TwoPhaseFilePath(char *path, TransactionId xid)
+TwoPhaseFilePath(char *path, FullTransactionId fxid)
{
- FullTransactionId fxid = AdjustToFullTransactionId(xid);
-
return snprintf(path, MAXPGPATH, TWOPHASE_DIR "/%08X%08X",
EpochFromFullTransactionId(fxid),
XidFromFullTransactionId(fxid));
* If it looks OK (has a valid magic number and CRC), return the palloc'd
* contents of the file, issuing an error when finding corrupted data. If
* missing_ok is true, which indicates that missing files can be safely
- * ignored, then return NULL. This state can be reached when doing recovery.
+ * ignored, then return NULL. This state can be reached when doing recovery
+ * after discarding two-phase files from other epochs.
*/
static char *
ReadTwoPhaseFile(TransactionId xid, bool missing_ok)
pg_crc32c calc_crc,
file_crc;
int r;
+ FullTransactionId fxid;
- TwoPhaseFilePath(path, xid);
+ fxid = FullTransactionIdFromCurrentEpoch(xid);
+ TwoPhaseFilePath(path, fxid);
fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
if (fd < 0)
AtEOXact_PgStat(isCommit, false);
/*
- * And now we can clean up any files we may have left.
+ * And now we can clean up any files we may have left. These should be
+ * from the current epoch.
*/
if (ondisk)
- RemoveTwoPhaseFile(xid, true);
+ {
+ FullTransactionId fxid;
+
+ fxid = FullTransactionIdFromCurrentEpoch(xid);
+ RemoveTwoPhaseFile(fxid, true);
+ }
MyLockedGxact = NULL;
*
* If giveWarning is false, do not complain about file-not-present;
* this is an expected case during WAL replay.
+ *
+ * This routine is used at early stages at recovery where future and
+ * past orphaned files are checked, hence the FullTransactionId to build
+ * a complete file name fit for the removal.
*/
static void
-RemoveTwoPhaseFile(TransactionId xid, bool giveWarning)
+RemoveTwoPhaseFile(FullTransactionId fxid, bool giveWarning)
{
char path[MAXPGPATH];
- TwoPhaseFilePath(path, xid);
+ TwoPhaseFilePath(path, fxid);
if (unlink(path))
if (errno != ENOENT || giveWarning)
ereport(WARNING,
char path[MAXPGPATH];
pg_crc32c statefile_crc;
int fd;
+ FullTransactionId fxid;
/* Recompute CRC */
INIT_CRC32C(statefile_crc);
COMP_CRC32C(statefile_crc, content, len);
FIN_CRC32C(statefile_crc);
- TwoPhaseFilePath(path, xid);
+ /* Use current epoch */
+ fxid = FullTransactionIdFromCurrentEpoch(xid);
+ TwoPhaseFilePath(path, fxid);
fd = OpenTransientFile(path,
O_CREAT | O_TRUNC | O_WRONLY | PG_BINARY);
* Scan pg_twophase and fill TwoPhaseState depending on the on-disk data.
* This is called once at the beginning of recovery, saving any extra
* lookups in the future. Two-phase files that are newer than the
- * minimum XID horizon are discarded on the way.
+ * minimum XID horizon are discarded on the way. Two-phase files with
+ * an epoch older or newer than the current checkpoint's record epoch
+ * are also discarded.
*/
void
restoreTwoPhaseData(void)
if (strlen(clde->d_name) == 16 &&
strspn(clde->d_name, "0123456789ABCDEF") == 16)
{
- TransactionId xid;
FullTransactionId fxid;
char *buf;
fxid = FullTransactionIdFromU64(strtou64(clde->d_name, NULL, 16));
- xid = XidFromFullTransactionId(fxid);
-
- buf = ProcessTwoPhaseBuffer(xid, InvalidXLogRecPtr,
+ buf = ProcessTwoPhaseBuffer(fxid, InvalidXLogRecPtr,
true, false, false);
if (buf == NULL)
continue;
TransactionId origNextXid = XidFromFullTransactionId(nextXid);
TransactionId result = origNextXid;
TransactionId *xids = NULL;
+ uint32 epoch = EpochFromFullTransactionId(nextXid);
int nxids = 0;
int allocsize = 0;
int i;
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
{
TransactionId xid;
+ FullTransactionId fxid;
char *buf;
GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
xid = gxact->xid;
- buf = ProcessTwoPhaseBuffer(xid,
+ /*
+ * All two-phase files with past and future epoch in pg_twophase are
+ * gone at this point, so we're OK to rely on only the current epoch.
+ */
+ fxid = FullTransactionIdFromEpochAndXid(epoch, xid);
+ buf = ProcessTwoPhaseBuffer(fxid,
gxact->prepare_start_lsn,
gxact->ondisk, false, true);
StandbyRecoverPreparedTransactions(void)
{
int i;
+ uint32 epoch;
+ FullTransactionId nextFullXid;
+
+ /* get current epoch */
+ nextFullXid = ReadNextFullTransactionId();
+ epoch = EpochFromFullTransactionId(nextFullXid);
LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
{
TransactionId xid;
+ FullTransactionId fxid;
char *buf;
GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
xid = gxact->xid;
- buf = ProcessTwoPhaseBuffer(xid,
+ /*
+ * At this stage, we're OK to work with the current epoch as all past
+ * and future files have been already discarded.
+ */
+ fxid = FullTransactionIdFromEpochAndXid(epoch, xid);
+ buf = ProcessTwoPhaseBuffer(fxid,
gxact->prepare_start_lsn,
gxact->ondisk, true, false);
if (buf != NULL)
RecoverPreparedTransactions(void)
{
int i;
+ uint32 epoch;
+ FullTransactionId nextFullXid;
+
+ /* get current epoch */
+ nextFullXid = ReadNextFullTransactionId();
+ epoch = EpochFromFullTransactionId(nextFullXid);
LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
{
TransactionId xid;
+ FullTransactionId fxid;
char *buf;
GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
char *bufptr;
TransactionId *subxids;
const char *gid;
+ /*
+ * At this stage, we're OK to work with the current epoch as all past
+ * and future files have been already discarded.
+ */
xid = gxact->xid;
/*
* SubTransSetParent has been set before, if the prepared transaction
* generated xid assignment records.
*/
- buf = ProcessTwoPhaseBuffer(xid,
+ fxid = FullTransactionIdFromEpochAndXid(epoch, xid);
+ buf = ProcessTwoPhaseBuffer(fxid,
gxact->prepare_start_lsn,
gxact->ondisk, true, false);
if (buf == NULL)
/*
* ProcessTwoPhaseBuffer
*
- * Given a transaction id, read it either from disk or read it directly
+ * Given a FullTransactionId, read it either from disk or read it directly
* via shmem xlog record pointer using the provided "prepare_start_lsn".
*
* If setParent is true, set up subtransaction parent linkages.
* value scanned.
*/
static char *
-ProcessTwoPhaseBuffer(TransactionId xid,
+ProcessTwoPhaseBuffer(FullTransactionId fxid,
XLogRecPtr prepare_start_lsn,
bool fromdisk,
bool setParent, bool setNextXid)
{
FullTransactionId nextXid = TransamVariables->nextXid;
- TransactionId origNextXid = XidFromFullTransactionId(nextXid);
TransactionId *subxids;
char *buf;
TwoPhaseFileHeader *hdr;
int i;
+ TransactionId xid = XidFromFullTransactionId(fxid);
Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
if (!fromdisk)
Assert(prepare_start_lsn != InvalidXLogRecPtr);
- /* Reject XID if too new */
- if (TransactionIdFollowsOrEquals(xid, origNextXid))
+ /*
+ * Reject full XID if too new. Note that this discards files from future
+ * epochs.
+ */
+ if (FullTransactionIdFollowsOrEquals(fxid, nextXid))
{
if (fromdisk)
{
ereport(WARNING,
- (errmsg("removing future two-phase state file for transaction %u",
- xid)));
- RemoveTwoPhaseFile(xid, true);
+ (errmsg("removing future two-phase state file of epoch %u for transaction %u",
+ EpochFromFullTransactionId(fxid), xid)));
+ RemoveTwoPhaseFile(fxid, true);
}
else
{
return NULL;
}
+ /* Discard files from past epochs */
+ if (EpochFromFullTransactionId(fxid) < EpochFromFullTransactionId(nextXid))
+ {
+ if (fromdisk)
+ {
+ ereport(WARNING,
+ (errmsg("removing past two-phase state file of epoch %u for transaction %u",
+ EpochFromFullTransactionId(fxid), xid)));
+ RemoveTwoPhaseFile(fxid, true);
+ }
+ else
+ {
+ ereport(WARNING,
+ (errmsg("removing past two-phase state from memory for transaction %u",
+ xid)));
+ PrepareRedoRemove(xid, true);
+ }
+ return NULL;
+ }
+
/* Already processed? */
if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
{
ereport(WARNING,
(errmsg("removing stale two-phase state file for transaction %u",
xid)));
- RemoveTwoPhaseFile(xid, true);
+ RemoveTwoPhaseFile(fxid, true);
}
else
{
if (!XLogRecPtrIsInvalid(start_lsn))
{
char path[MAXPGPATH];
+ FullTransactionId fxid;
- TwoPhaseFilePath(path, hdr->xid);
+ /* Use current epoch */
+ fxid = FullTransactionIdFromCurrentEpoch(hdr->xid);
+ TwoPhaseFilePath(path, fxid);
if (access(path, F_OK) == 0)
{
*/
elog(DEBUG2, "removing 2PC data for transaction %u", xid);
if (gxact->ondisk)
- RemoveTwoPhaseFile(xid, giveWarning);
+ {
+ FullTransactionId fxid;
+
+ /*
+ * We should deal with a file at the current epoch here.
+ */
+ fxid = FullTransactionIdFromCurrentEpoch(xid);
+ RemoveTwoPhaseFile(fxid, giveWarning);
+ }
RemoveGXact(gxact);
}