Skip to content

Commit 612f530

Browse files
Ivan Lazarevfunny-falcon
Ivan Lazarev
authored andcommitted
[PBCKP-287] fix. added cfs files grouping and processing cfs segment in single thread manner
1 parent 343c6a0 commit 612f530

File tree

5 files changed

+257
-79
lines changed

5 files changed

+257
-79
lines changed

‎src/backup.c

+207-77
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,11 @@ static bool pg_is_in_recovery(PGconn *conn);
6565
static bool pg_is_superuser(PGconn *conn);
6666
static void check_server_version(PGconn *conn, PGNodeInfo *nodeInfo);
6767
static void confirm_block_size(PGconn *conn, const char *name, int blcksz);
68-
static void set_cfs_datafiles(parray *files, const char *root, char *relative, size_t i);
68+
static size_t rewind_and_mark_cfs_datafiles(parray *files, const char *root, char *relative, size_t i);
69+
static void group_cfs_segments(parray *files, size_t first, size_t last);
70+
static bool remove_excluded_files_criterion(void *value, void *exclude_args);
71+
static void backup_cfs_segment(int i, pgFile *file, backup_files_arg *arguments);
72+
static void process_file(int i, pgFile *file, backup_files_arg *arguments);
6973

7074
static StopBackupCallbackParams stop_callback_params;
7175

@@ -2054,8 +2058,6 @@ static void *
20542058
backup_files(void *arg)
20552059
{
20562060
int i;
2057-
char from_fullpath[MAXPGPATH];
2058-
char to_fullpath[MAXPGPATH];
20592061
static time_t prev_time;
20602062

20612063
backup_files_arg *arguments = (backup_files_arg *) arg;
@@ -2067,7 +2069,6 @@ backup_files(void *arg)
20672069
for (i = 0; i < n_backup_files_list; i++)
20682070
{
20692071
pgFile *file = (pgFile *) parray_get(arguments->files_list, i);
2070-
pgFile *prev_file = NULL;
20712072

20722073
/* We have already copied all directories */
20732074
if (S_ISDIR(file->mode))
@@ -2087,6 +2088,9 @@ backup_files(void *arg)
20872088
}
20882089
}
20892090

2091+
if (file->skip_cfs_nested)
2092+
continue;
2093+
20902094
if (!pg_atomic_test_set_flag(&file->lock))
20912095
continue;
20922096

@@ -2097,89 +2101,146 @@ backup_files(void *arg)
20972101
elog(progress ? INFO : LOG, "Progress: (%d/%d). Process file \"%s\"",
20982102
i + 1, n_backup_files_list, file->rel_path);
20992103

2100-
/* Handle zero sized files */
2101-
if (file->size == 0)
2104+
if (file->is_cfs)
21022105
{
2103-
file->write_size = 0;
2104-
continue;
2105-
}
2106-
2107-
/* construct destination filepath */
2108-
if (file->external_dir_num == 0)
2109-
{
2110-
join_path_components(from_fullpath, arguments->from_root, file->rel_path);
2111-
join_path_components(to_fullpath, arguments->to_root, file->rel_path);
2106+
backup_cfs_segment(i, file, arguments);
21122107
}
21132108
else
21142109
{
2115-
char external_dst[MAXPGPATH];
2116-
char *external_path = parray_get(arguments->external_dirs,
2117-
file->external_dir_num - 1);
2110+
process_file(i, file, arguments);
2111+
}
2112+
}
2113+
2114+
/* ssh connection is no longer needed */
2115+
fio_disconnect();
2116+
2117+
/* Data files transferring is successful */
2118+
arguments->ret = 0;
2119+
2120+
return NULL;
2121+
}
2122+
2123+
static void
2124+
process_file(int i, pgFile *file, backup_files_arg *arguments)
2125+
{
2126+
char from_fullpath[MAXPGPATH];
2127+
char to_fullpath[MAXPGPATH];
2128+
pgFile *prev_file = NULL;
2129+
2130+
elog(progress ? INFO : LOG, "Progress: (%d/%zu). Process file \"%s\"",
2131+
i + 1, parray_num(arguments->files_list), file->rel_path);
21182132

2119-
makeExternalDirPathByNum(external_dst,
2133+
/* Handle zero sized files */
2134+
if (file->size == 0)
2135+
{
2136+
file->write_size = 0;
2137+
return;
2138+
}
2139+
2140+
/* construct from_fullpath & to_fullpath */
2141+
if (file->external_dir_num == 0)
2142+
{
2143+
join_path_components(from_fullpath, arguments->from_root, file->rel_path);
2144+
join_path_components(to_fullpath, arguments->to_root, file->rel_path);
2145+
}
2146+
else
2147+
{
2148+
char external_dst[MAXPGPATH];
2149+
char *external_path = parray_get(arguments->external_dirs,
2150+
file->external_dir_num - 1);
2151+
2152+
makeExternalDirPathByNum(external_dst,
21202153
arguments->external_prefix,
21212154
file->external_dir_num);
21222155

2123-
join_path_components(to_fullpath, external_dst, file->rel_path);
2124-
join_path_components(from_fullpath, external_path, file->rel_path);
2125-
}
2126-
2127-
/* Encountered some strange beast */
2128-
if (!S_ISREG(file->mode))
2129-
elog(WARNING, "Unexpected type %d of file \"%s\", skipping",
2130-
file->mode, from_fullpath);
2156+
join_path_components(to_fullpath, external_dst, file->rel_path);
2157+
join_path_components(from_fullpath, external_path, file->rel_path);
2158+
}
21312159

2132-
/* Check that file exist in previous backup */
2133-
if (current.backup_mode != BACKUP_MODE_FULL)
2134-
{
2135-
pgFile **prev_file_tmp = NULL;
2136-
prev_file_tmp = (pgFile **) parray_bsearch(arguments->prev_filelist,
2137-
file, pgFileCompareRelPathWithExternal);
2138-
if (prev_file_tmp)
2139-
{
2140-
/* File exists in previous backup */
2141-
file->exists_in_prev = true;
2142-
prev_file = *prev_file_tmp;
2143-
}
2144-
}
2160+
/* Encountered some strange beast */
2161+
if (!S_ISREG(file->mode))
2162+
{
2163+
elog(WARNING, "Unexpected type %d of file \"%s\", skipping",
2164+
file->mode, from_fullpath);
2165+
return;
2166+
}
21452167

2146-
/* backup file */
2147-
if (file->is_datafile && !file->is_cfs)
2148-
{
2149-
backup_data_file(file, from_fullpath, to_fullpath,
2150-
arguments->prev_start_lsn,
2151-
current.backup_mode,
2152-
instance_config.compress_alg,
2153-
instance_config.compress_level,
2154-
arguments->nodeInfo->checksum_version,
2155-
arguments->hdr_map, false);
2156-
}
2157-
else
2168+
/* Check that file exist in previous backup */
2169+
if (current.backup_mode != BACKUP_MODE_FULL)
2170+
{
2171+
pgFile **prevFileTmp = NULL;
2172+
prevFileTmp = (pgFile **) parray_bsearch(arguments->prev_filelist,
2173+
file, pgFileCompareRelPathWithExternal);
2174+
if (prevFileTmp)
21582175
{
2159-
backup_non_data_file(file, prev_file, from_fullpath, to_fullpath,
2160-
current.backup_mode, current.parent_backup, true);
2176+
/* File exists in previous backup */
2177+
file->exists_in_prev = true;
2178+
prev_file = *prevFileTmp;
21612179
}
2180+
}
21622181

2163-
if (file->write_size == FILE_NOT_FOUND)
2164-
continue;
2182+
/* backup file */
2183+
if (file->is_datafile && !file->is_cfs)
2184+
{
2185+
backup_data_file(file, from_fullpath, to_fullpath,
2186+
arguments->prev_start_lsn,
2187+
current.backup_mode,
2188+
instance_config.compress_alg,
2189+
instance_config.compress_level,
2190+
arguments->nodeInfo->checksum_version,
2191+
arguments->hdr_map, false);
2192+
}
2193+
else
2194+
{
2195+
backup_non_data_file(file, prev_file, from_fullpath, to_fullpath,
2196+
current.backup_mode, current.parent_backup, true);
2197+
}
21652198

2166-
if (file->write_size == BYTES_INVALID)
2167-
{
2168-
elog(LOG, "Skipping the unchanged file: \"%s\"", from_fullpath);
2169-
continue;
2170-
}
2199+
if (file->write_size == FILE_NOT_FOUND)
2200+
return;
21712201

2172-
elog(LOG, "File \"%s\". Copied "INT64_FORMAT " bytes",
2173-
from_fullpath, file->write_size);
2202+
if (file->write_size == BYTES_INVALID)
2203+
{
2204+
elog(LOG, "Skipping the unchanged file: \"%s\"", from_fullpath);
2205+
return;
21742206
}
21752207

2176-
/* ssh connection to longer needed */
2177-
fio_disconnect();
2208+
elog(LOG, "File \"%s\". Copied "INT64_FORMAT " bytes",
2209+
from_fullpath, file->write_size);
21782210

2179-
/* Data files transferring is successful */
2180-
arguments->ret = 0;
2211+
}
21812212

2182-
return NULL;
2213+
static void
2214+
backup_cfs_segment(int i, pgFile *file, backup_files_arg *arguments) {
2215+
pgFile *data_file = file;
2216+
pgFile *cfm_file = NULL;
2217+
pgFile *data_bck_file = NULL;
2218+
pgFile *cfm_bck_file = NULL;
2219+
2220+
while (data_file->cfs_chain)
2221+
{
2222+
data_file = data_file->cfs_chain;
2223+
if (data_file->forkName == cfm)
2224+
cfm_file = data_file;
2225+
if (data_file->forkName == cfs_bck)
2226+
data_bck_file = data_file;
2227+
if (data_file->forkName == cfm_bck)
2228+
cfm_bck_file = data_file;
2229+
}
2230+
data_file = file;
2231+
Assert(cfm_file); /* ensure we always have cfm exist */
2232+
2233+
elog(LOG, "backup CFS segment %s, data_file=%s, cfm_file=%s, data_bck_file=%s, cfm_bck_file=%s",
2234+
data_file->name, data_file->name, cfm_file->name, data_bck_file == NULL? "NULL": data_bck_file->name, cfm_bck_file == NULL? "NULL": cfm_bck_file->name);
2235+
2236+
/* storing cfs in order data_bck_file -> cfm_bck -> data_file -> map */
2237+
if (cfm_bck_file)
2238+
process_file(i, cfm_bck_file, arguments);
2239+
if (data_bck_file)
2240+
process_file(i, data_bck_file, arguments);
2241+
process_file(i, cfm_file, arguments);
2242+
process_file(i, data_file, arguments);
2243+
elog(LOG, "Backup CFS segment %s done", data_file->name);
21832244
}
21842245

21852246
/*
@@ -2209,11 +2270,12 @@ parse_filelist_filenames(parray *files, const char *root)
22092270
*/
22102271
if (strcmp(file->name, "pg_compression") == 0)
22112272
{
2273+
/* processing potential cfs tablespace */
22122274
Oid tblspcOid;
22132275
Oid dbOid;
22142276
char tmp_rel_path[MAXPGPATH];
22152277
/*
2216-
* Check that the file is located under
2278+
* Check that pg_compression is located under
22172279
* TABLESPACE_VERSION_DIRECTORY
22182280
*/
22192281
sscanf_result = sscanf(file->rel_path, PG_TBLSPC_DIR "/%u/%s/%u",
@@ -2222,8 +2284,12 @@ parse_filelist_filenames(parray *files, const char *root)
22222284
/* Yes, it is */
22232285
if (sscanf_result == 2 &&
22242286
strncmp(tmp_rel_path, TABLESPACE_VERSION_DIRECTORY,
2225-
strlen(TABLESPACE_VERSION_DIRECTORY)) == 0)
2226-
set_cfs_datafiles(files, root, file->rel_path, i);
2287+
strlen(TABLESPACE_VERSION_DIRECTORY)) == 0) {
2288+
/* rewind index to the beginning of cfs tablespace */
2289+
size_t start = rewind_and_mark_cfs_datafiles(files, root, file->rel_path, i);
2290+
/* group the files of every cfs segment into chains */
2291+
group_cfs_segments(files, start, i);
2292+
}
22272293
}
22282294
}
22292295

@@ -2238,19 +2304,18 @@ parse_filelist_filenames(parray *files, const char *root)
22382304
*/
22392305
int unlogged_file_num = i - 1;
22402306
pgFile *unlogged_file = (pgFile *) parray_get(files,
2241-
unlogged_file_num);
2307+
unlogged_file_num);
22422308

22432309
unlogged_file_reloid = file->relOid;
22442310

22452311
while (unlogged_file_num >= 0 &&
22462312
(unlogged_file_reloid != 0) &&
22472313
(unlogged_file->relOid == unlogged_file_reloid))
22482314
{
2249-
pgFileFree(unlogged_file);
2250-
parray_remove(files, unlogged_file_num);
2315+
/* flagged to remove from list on stage 2 */
2316+
unlogged_file->remove_from_list = true;
22512317

22522318
unlogged_file_num--;
2253-
i--;
22542319

22552320
unlogged_file = (pgFile *) parray_get(files,
22562321
unlogged_file_num);
@@ -2260,6 +2325,68 @@ parse_filelist_filenames(parray *files, const char *root)
22602325

22612326
i++;
22622327
}
2328+
2329+
/* stage 2. clean up from temporary tables */
2330+
parray_remove_if(files, remove_excluded_files_criterion, NULL, pgFileFree);
2331+
}
2332+
2333+
static bool
2334+
remove_excluded_files_criterion(void *value, void *exclude_args) {
2335+
pgFile *file = (pgFile*)value;
2336+
return file->remove_from_list;
2337+
}
2338+
2339+
/*
2340+
* For every cfs segment do group its files to linked list, datafile on the head.
2341+
* All non data files of segment moved to linked list and marked to skip in backup processing threads.
2342+
* @param first - first index of cfs tablespace files
2343+
* @param last - last index of cfs tablespace files
2344+
*/
2345+
void group_cfs_segments(parray *files, size_t first, size_t last) {/* grouping cfs files by relOid.segno, removing leafs of group */
2346+
2347+
for (;first <= last; first++)
2348+
{
2349+
pgFile *file = parray_get(files, first);
2350+
2351+
if (file->is_cfs)
2352+
{
2353+
pgFile *cfs_file = file;
2354+
size_t counter = first + 1;
2355+
pgFile *chain_file = parray_get(files, counter);
2356+
2357+
bool has_cfm = false; /* flag for later assertion the cfm file also exist */
2358+
2359+
elog(LOG, "Preprocessing cfs file %s, %u.%d", cfs_file->name, cfs_file->relOid, cfs_file->segno);
2360+
2361+
elog(LOG, "Checking file %s, %u.%d as cfs chain", chain_file->name, chain_file->relOid, chain_file->segno);
2362+
2363+
/* scanning cfs segment files */
2364+
while (cfs_file->relOid == chain_file->relOid &&
2365+
cfs_file->segno == chain_file->segno)
2366+
{
2367+
elog(LOG, "Grouping cfs chain file %s, %d.%d", chain_file->name, chain_file->relOid, chain_file->segno);
2368+
chain_file->skip_cfs_nested = true;
2369+
cfs_file->cfs_chain = chain_file; /* adding to cfs group */
2370+
cfs_file = chain_file;
2371+
2372+
/* next file */
2373+
counter++;
2374+
chain_file = parray_get(files, counter);
2375+
elog(LOG, "Checking file %s, %u.%d as cfs chain", chain_file->name, chain_file->relOid, chain_file->segno);
2376+
}
2377+
2378+
/* assertion - we always have cfs data + cfs map files */
2379+
cfs_file = file;
2380+
for (; cfs_file; cfs_file = cfs_file->cfs_chain) {
2381+
elog(LOG, "searching cfm in %s, chain is %s", cfs_file->name, cfs_file->cfs_chain == NULL? "NULL": cfs_file->cfs_chain->name);
2382+
has_cfm = cfs_file->forkName == cfm;
2383+
}
2384+
Assert(has_cfm);
2385+
2386+
/* shifting to last cfs segment file */
2387+
first = counter-1;
2388+
}
2389+
}
22632390
}
22642391

22652392
/* If file is equal to pg_compression, then we consider this tablespace as
@@ -2273,9 +2400,11 @@ parse_filelist_filenames(parray *files, const char *root)
22732400
* tblspcOid/TABLESPACE_VERSION_DIRECTORY/dboid/1
22742401
* tblspcOid/TABLESPACE_VERSION_DIRECTORY/dboid/1.cfm
22752402
* tblspcOid/TABLESPACE_VERSION_DIRECTORY/pg_compression
2403+
*
2404+
* @returns index of first tablespace entry, i.e tblspcOid/TABLESPACE_VERSION_DIRECTORY
22762405
*/
2277-
static void
2278-
set_cfs_datafiles(parray *files, const char *root, char *relative, size_t i)
2406+
static size_t
2407+
rewind_and_mark_cfs_datafiles(parray *files, const char *root, char *relative, size_t i)
22792408
{
22802409
int len;
22812410
int p;
@@ -2311,6 +2440,7 @@ set_cfs_datafiles(parray *files, const char *root, char *relative, size_t i)
23112440
}
23122441
}
23132442
free(cfs_tblspc_path);
2443+
return p+1;
23142444
}
23152445

23162446
/*

0 commit comments

Comments
 (0)