Skip to content

Commit ad4dd06

Browse files
committed
Add a general limit on the DSM memory that the AQO extension can allocate to
store learning data. Also, use a common DSA area to hold both data and query texts. The default limit on DSM memory is 100 MB. TODO: remove the now-meaningless DSA variables.
1 parent 3bd2bb3 commit ad4dd06

File tree

5 files changed

+132
-31
lines changed

5 files changed

+132
-31
lines changed

‎aqo.c

+15-2
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ _PG_init(void)
231231
"Max number of feature spaces that AQO can operate with.",
232232
NULL,
233233
&fs_max_items,
234-
1000,
234+
10000,
235235
1, INT_MAX,
236236
PGC_SUSET,
237237
0,
@@ -244,7 +244,7 @@ _PG_init(void)
244244
"Max number of feature subspaces that AQO can operate with.",
245245
NULL,
246246
&fss_max_items,
247-
1000,
247+
100000,
248248
0, INT_MAX,
249249
PGC_SUSET,
250250
0,
@@ -266,6 +266,19 @@ _PG_init(void)
266266
NULL
267267
);
268268

269+
DefineCustomIntVariable("aqo.dsm_size_max",
270+
"Maximum size of dynamic shared memory which AQO could allocate to store learning data.",
271+
NULL,
272+
&dsm_size_max,
273+
100,
274+
0, INT_MAX,
275+
PGC_SUSET,
276+
0,
277+
NULL,
278+
NULL,
279+
NULL
280+
);
281+
269282
prev_shmem_startup_hook = shmem_startup_hook;
270283
shmem_startup_hook = aqo_init_shmem;
271284
prev_planner_hook = planner_hook;

‎aqo_shared.c

-2
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,6 @@ aqo_init_shmem(void)
198198
aqo_state->qtext_trancheid = LWLockNewTrancheId();
199199
aqo_state->qtexts_changed = false;
200200
aqo_state->data_dsa_handler = DSM_HANDLE_INVALID;
201-
aqo_state->data_trancheid = LWLockNewTrancheId();
202201
aqo_state->data_changed = false;
203202
aqo_state->queries_changed = false;
204203

@@ -244,7 +243,6 @@ aqo_init_shmem(void)
244243
LWLockRegisterTranche(aqo_state->qtexts_lock.tranche, "AQO QTexts Lock Tranche");
245244
LWLockRegisterTranche(aqo_state->qtext_trancheid, "AQO Query Texts Tranche");
246245
LWLockRegisterTranche(aqo_state->data_lock.tranche, "AQO Data Lock Tranche");
247-
LWLockRegisterTranche(aqo_state->data_trancheid, "AQO Data Tranche");
248246
LWLockRegisterTranche(aqo_state->queries_lock.tranche, "AQO Queries Lock Tranche");
249247

250248
if (!IsUnderPostmaster)

‎aqo_shared.h

-2
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ typedef struct AQOSharedState
3838

3939
LWLock data_lock; /* Lock for shared fields below */
4040
dsa_handle data_dsa_handler;
41-
int data_trancheid;
4241
bool data_changed;
4342

4443
LWLock queries_lock; /* lock for access to queries storage */
@@ -52,7 +51,6 @@ extern HTAB *fss_htab;
5251

5352
extern int fs_max_items; /* Max number of feature spaces that AQO can operate */
5453
extern int fss_max_items;
55-
extern int querytext_max_size;
5654

5755
extern Size aqo_memsize(void);
5856
extern void reset_dsm_cache(void);

‎storage.c

+116-25
Original file line numberDiff line numberDiff line change
@@ -61,10 +61,11 @@ typedef enum {
6161
} aqo_queries_cols;
6262

6363
typedef void* (*form_record_t) (void *ctx, size_t *size);
64-
typedef void (*deform_record_t) (void *data, size_t size);
64+
typedef bool (*deform_record_t) (void *data, size_t size);
6565

6666

6767
int querytext_max_size = 1000;
68+
int dsm_size_max = 100; /* in MB */
6869

6970
HTAB *stat_htab = NULL;
7071
HTAB *queries_htab = NULL;
@@ -642,7 +643,7 @@ data_store(const char *filename, form_record_t callback,
642643
return -1;
643644
}
644645

645-
static void
646+
static bool
646647
_deform_stat_record_cb(void *data, size_t size)
647648
{
648649
bool found;
@@ -656,24 +657,35 @@ _deform_stat_record_cb(void *data, size_t size)
656657
entry = (StatEntry *) hash_search(stat_htab, &queryid, HASH_ENTER, &found);
657658
Assert(!found);
658659
memcpy(entry, data, sizeof(StatEntry));
660+
return true;
659661
}
660662

661663
void
662664
aqo_stat_load(void)
663665
{
664-
long entries;
665-
666666
Assert(!LWLockHeldByMe(&aqo_state->stat_lock));
667667

668668
LWLockAcquire(&aqo_state->stat_lock, LW_EXCLUSIVE);
669-
entries = hash_get_num_entries(stat_htab);
670-
Assert(entries == 0);
669+
670+
/* Loaded at postmaster startup, so no concurrent activity is possible here. */
671+
Assert(hash_get_num_entries(stat_htab) == 0);
672+
671673
data_load(PGAQO_STAT_FILE, _deform_stat_record_cb, NULL);
672674

673675
LWLockRelease(&aqo_state->stat_lock);
674676
}
675677

676-
static void
678+
static bool
679+
_check_dsa_validity(dsa_pointer ptr)
680+
{
681+
if (DsaPointerIsValid(ptr))
682+
return true;
683+
684+
elog(LOG, "[AQO] DSA Pointer isn't valid. Is the memory limit exceeded?");
685+
return false;
686+
}
687+
688+
static bool
677689
_deform_qtexts_record_cb(void *data, size_t size)
678690
{
679691
bool found;
@@ -690,9 +702,19 @@ _deform_qtexts_record_cb(void *data, size_t size)
690702
Assert(!found);
691703

692704
entry->qtext_dp = dsa_allocate(qtext_dsa, len);
693-
Assert(DsaPointerIsValid(entry->qtext_dp));
705+
if (!_check_dsa_validity(entry->qtext_dp))
706+
{
707+
/*
708+
* DSA stuck into problems. Rollback changes. Return false in belief
709+
* that caller recognize it and don't try to call us more.
710+
*/
711+
(void) hash_search(qtexts_htab, &queryid, HASH_REMOVE, NULL);
712+
return false;
713+
}
714+
694715
strptr = (char *) dsa_get_address(qtext_dsa, entry->qtext_dp);
695716
strlcpy(strptr, query_string, len);
717+
return true;
696718
}
697719

698720
void
@@ -705,7 +727,15 @@ aqo_qtexts_load(void)
705727
Assert(qtext_dsa != NULL);
706728

707729
LWLockAcquire(&aqo_state->qtexts_lock, LW_EXCLUSIVE);
708-
Assert(hash_get_num_entries(qtexts_htab) == 0);
730+
731+
if (hash_get_num_entries(qtexts_htab) != 0)
732+
{
733+
/* Someone has already done it concurrently. */
734+
elog(LOG, "[AQO] Another backend have loaded query texts concurrently.");
735+
LWLockRelease(&aqo_state->qtexts_lock);
736+
return;
737+
}
738+
709739
data_load(PGAQO_TEXT_FILE, _deform_qtexts_record_cb, NULL);
710740

711741
/* Check existence of default feature space */
@@ -725,7 +755,7 @@ aqo_qtexts_load(void)
725755
* Getting a data chunk from a caller, add a record into the 'ML data'
726756
* shmem hash table. Allocate and fill DSA chunk for variadic part of the data.
727757
*/
728-
static void
758+
static bool
729759
_deform_data_record_cb(void *data, size_t size)
730760
{
731761
bool found;
@@ -737,7 +767,7 @@ _deform_data_record_cb(void *data, size_t size)
737767

738768
Assert(LWLockHeldByMeInMode(&aqo_state->data_lock, LW_EXCLUSIVE));
739769
entry = (DataEntry *) hash_search(data_htab, &fentry->key,
740-
HASH_ENTER, &found);
770+
HASH_ENTER, &found);
741771
Assert(!found);
742772

743773
/* Copy fixed-size part of entry byte-by-byte even with caves */
@@ -747,9 +777,20 @@ _deform_data_record_cb(void *data, size_t size)
747777
sz = _compute_data_dsa(entry);
748778
Assert(sz + offsetof(DataEntry, data_dp) == size);
749779
entry->data_dp = dsa_allocate(data_dsa, sz);
750-
Assert(DsaPointerIsValid(entry->data_dp));
780+
781+
if (!_check_dsa_validity(entry->data_dp))
782+
{
783+
/*
784+
* DSA stuck into problems. Rollback changes. Return false in belief
785+
* that caller recognize it and don't try to call us more.
786+
*/
787+
(void) hash_search(data_htab, &fentry->key, HASH_REMOVE, NULL);
788+
return false;
789+
}
790+
751791
dsa_ptr = (char *) dsa_get_address(data_dsa, entry->data_dp);
752792
memcpy(dsa_ptr, ptr, sz);
793+
return true;
753794
}
754795

755796
void
@@ -759,14 +800,22 @@ aqo_data_load(void)
759800
Assert(data_dsa != NULL);
760801

761802
LWLockAcquire(&aqo_state->data_lock, LW_EXCLUSIVE);
762-
Assert(hash_get_num_entries(data_htab) == 0);
803+
804+
if (hash_get_num_entries(data_htab) != 0)
805+
{
806+
/* Someone has already done it concurrently. */
807+
elog(LOG, "[AQO] Another backend have loaded query data concurrently.");
808+
LWLockRelease(&aqo_state->data_lock);
809+
return;
810+
}
811+
763812
data_load(PGAQO_DATA_FILE, _deform_data_record_cb, NULL);
764813

765814
aqo_state->data_changed = false; /* mem data is consistent with disk */
766815
LWLockRelease(&aqo_state->data_lock);
767816
}
768817

769-
static void
818+
static bool
770819
_deform_queries_record_cb(void *data, size_t size)
771820
{
772821
bool found;
@@ -780,20 +829,22 @@ _deform_queries_record_cb(void *data, size_t size)
780829
entry = (QueriesEntry *) hash_search(queries_htab, &queryid, HASH_ENTER, &found);
781830
Assert(!found);
782831
memcpy(entry, data, sizeof(QueriesEntry));
832+
return true;
783833
}
784834

785835
void
786836
aqo_queries_load(void)
787837
{
788-
long entries;
789838
bool found;
790839
uint64 queryid = 0;
791840

792841
Assert(!LWLockHeldByMe(&aqo_state->queries_lock));
793842

794843
LWLockAcquire(&aqo_state->queries_lock, LW_EXCLUSIVE);
795-
entries = hash_get_num_entries(queries_htab);
796-
Assert(entries == 0);
844+
845+
/* Loaded at postmaster startup, so no concurrent activity is possible here. */
846+
Assert(hash_get_num_entries(queries_htab) == 0);
847+
797848
data_load(PGAQO_QUERIES_FILE, _deform_queries_record_cb, NULL);
798849

799850
/* Check existence of default feature space */
@@ -836,14 +887,23 @@ data_load(const char *filename, deform_record_t callback, void *ctx)
836887
{
837888
void *data;
838889
size_t size;
890+
bool res;
839891

840892
if (fread(&size, sizeof(size), 1, file) != 1)
841893
goto read_error;
842894
data = palloc(size);
843895
if (fread(data, size, 1, file) != 1)
844896
goto read_error;
845-
callback(data, size);
897+
res = callback(data, size);
846898
pfree(data);
899+
900+
if (!res)
901+
{
902+
/* Error detected. Do not try to read tails of the storage. */
903+
elog(LOG, "[AQO] Because of an error skip %ld storage records.",
904+
num - i);
905+
break;
906+
}
847907
}
848908

849909
FreeFile(file);
@@ -896,11 +956,15 @@ dsa_init()
896956
Assert(aqo_state->data_dsa_handler == DSM_HANDLE_INVALID);
897957

898958
qtext_dsa = dsa_create(aqo_state->qtext_trancheid);
959+
Assert(qtext_dsa != NULL);
960+
961+
if (dsm_size_max > 0)
962+
dsa_set_size_limit(qtext_dsa, dsm_size_max * 1024 * 1024);
963+
899964
dsa_pin(qtext_dsa);
900965
aqo_state->qtexts_dsa_handler = dsa_get_handle(qtext_dsa);
901966

902-
data_dsa = dsa_create(aqo_state->data_trancheid);
903-
dsa_pin(data_dsa);
967+
data_dsa = qtext_dsa;
904968
aqo_state->data_dsa_handler = dsa_get_handle(data_dsa);
905969

906970
/* Load and initialize query texts hash table */
@@ -910,11 +974,10 @@ dsa_init()
910974
else
911975
{
912976
qtext_dsa = dsa_attach(aqo_state->qtexts_dsa_handler);
913-
data_dsa = dsa_attach(aqo_state->data_dsa_handler);
977+
data_dsa = qtext_dsa;
914978
}
915979

916980
dsa_pin_mapping(qtext_dsa);
917-
dsa_pin_mapping(data_dsa);
918981
MemoryContextSwitchTo(old_context);
919982
LWLockRelease(&aqo_state->lock);
920983

@@ -973,7 +1036,17 @@ aqo_qtext_store(uint64 queryid, const char *query_string)
9731036
entry->queryid = queryid;
9741037
size = size > querytext_max_size ? querytext_max_size : size;
9751038
entry->qtext_dp = dsa_allocate(qtext_dsa, size);
976-
Assert(DsaPointerIsValid(entry->qtext_dp));
1039+
1040+
if (!_check_dsa_validity(entry->qtext_dp))
1041+
{
1042+
/*
1043+
* DSA stuck into problems. Rollback changes. Return false in belief
1044+
* that caller recognize it and don't try to call us more.
1045+
*/
1046+
(void) hash_search(qtexts_htab, &queryid, HASH_REMOVE, NULL);
1047+
return false;
1048+
}
1049+
9771050
strptr = (char *) dsa_get_address(qtext_dsa, entry->qtext_dp);
9781051
strlcpy(strptr, query_string, size);
9791052
aqo_state->qtexts_changed = true;
@@ -1173,7 +1246,16 @@ aqo_data_store(uint64 fs, int fss, OkNNrdata *data, List *reloids)
11731246

11741247
size = _compute_data_dsa(entry);
11751248
entry->data_dp = dsa_allocate0(data_dsa, size);
1176-
Assert(DsaPointerIsValid(entry->data_dp));
1249+
1250+
if (!_check_dsa_validity(entry->data_dp))
1251+
{
1252+
/*
1253+
* DSA stuck into problems. Rollback changes. Return false in belief
1254+
* that caller recognize it and don't try to call us more.
1255+
*/
1256+
(void) hash_search(data_htab, &key, HASH_REMOVE, NULL);
1257+
return false;
1258+
}
11771259
}
11781260

11791261
Assert(DsaPointerIsValid(entry->data_dp));
@@ -1195,7 +1277,16 @@ aqo_data_store(uint64 fs, int fss, OkNNrdata *data, List *reloids)
11951277
/* Need to re-allocate DSA chunk */
11961278
dsa_free(data_dsa, entry->data_dp);
11971279
entry->data_dp = dsa_allocate0(data_dsa, size);
1198-
Assert(DsaPointerIsValid(entry->data_dp));
1280+
1281+
if (!_check_dsa_validity(entry->data_dp))
1282+
{
1283+
/*
1284+
* DSA stuck into problems. Rollback changes. Return false in belief
1285+
* that caller recognize it and don't try to call us more.
1286+
*/
1287+
(void) hash_search(data_htab, &key, HASH_REMOVE, NULL);
1288+
return false;
1289+
}
11991290
}
12001291
ptr = (char *) dsa_get_address(data_dsa, entry->data_dp);
12011292

‎storage.h

+1
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ typedef struct QueriesEntry
8383
} QueriesEntry;
8484

8585
extern int querytext_max_size;
86+
extern int dsm_size_max;
8687

8788
extern HTAB *stat_htab;
8889
extern HTAB *qtexts_htab;

0 commit comments

Comments
 (0)