@@ -302,7 +302,9 @@ aqo_stat_store(uint64 queryid, bool use_aqo,
302
302
entry -> exec_time [pos ] = exec_time ;
303
303
entry -> est_error [pos ] = est_error ;
304
304
}
305
+
305
306
entry = memcpy (palloc (sizeof (StatEntry )), entry , sizeof (StatEntry ));
307
+ aqo_state -> stat_changed = true;
306
308
LWLockRelease (& aqo_state -> stat_lock );
307
309
return entry ;
308
310
}
@@ -424,14 +426,24 @@ aqo_stat_flush(void)
424
426
int ret ;
425
427
long entries ;
426
428
427
- LWLockAcquire (& aqo_state -> stat_lock , LW_SHARED );
429
+ /* Use exclusive lock to prevent concurrent flushing in different backends. */
430
+ LWLockAcquire (& aqo_state -> stat_lock , LW_EXCLUSIVE );
431
+
432
+ if (!aqo_state -> stat_changed )
433
+ /* Hash table wasn't changed, meaningless to store it in permanent storage */
434
+ goto end ;
435
+
428
436
entries = hash_get_num_entries (stat_htab );
429
437
hash_seq_init (& hash_seq , stat_htab );
430
438
ret = data_store (PGAQO_STAT_FILE , _form_stat_record_cb , entries ,
431
439
(void * ) & hash_seq );
432
440
if (ret != 0 )
433
441
hash_seq_term (& hash_seq );
442
+ else
443
+ /* Hash table and disk storage are now consistent */
444
+ aqo_state -> stat_changed = false;
434
445
446
+ end :
435
447
LWLockRelease (& aqo_state -> stat_lock );
436
448
}
437
449
@@ -468,7 +480,7 @@ aqo_qtexts_flush(void)
468
480
long entries ;
469
481
470
482
dsa_init ();
471
- LWLockAcquire (& aqo_state -> qtexts_lock , LW_SHARED );
483
+ LWLockAcquire (& aqo_state -> qtexts_lock , LW_EXCLUSIVE );
472
484
473
485
if (!aqo_state -> qtexts_changed )
474
486
/* XXX: mull over forced mode. */
@@ -480,7 +492,9 @@ aqo_qtexts_flush(void)
480
492
(void * ) & hash_seq );
481
493
if (ret != 0 )
482
494
hash_seq_term (& hash_seq );
483
- aqo_state -> qtexts_changed = false;
495
+ else
496
+ /* Hash table and disk storage are now consistent */
497
+ aqo_state -> qtexts_changed = false;
484
498
485
499
end :
486
500
LWLockRelease (& aqo_state -> qtexts_lock );
@@ -530,7 +544,7 @@ aqo_data_flush(void)
530
544
long entries ;
531
545
532
546
dsa_init ();
533
- LWLockAcquire (& aqo_state -> data_lock , LW_SHARED );
547
+ LWLockAcquire (& aqo_state -> data_lock , LW_EXCLUSIVE );
534
548
535
549
if (!aqo_state -> data_changed )
536
550
/* XXX: mull over forced mode. */
@@ -547,6 +561,7 @@ aqo_data_flush(void)
547
561
*/
548
562
hash_seq_term (& hash_seq );
549
563
else
564
+ /* Hash table and disk storage are now consistent */
550
565
aqo_state -> data_changed = false;
551
566
end :
552
567
LWLockRelease (& aqo_state -> data_lock );
@@ -573,14 +588,22 @@ aqo_queries_flush(void)
573
588
int ret ;
574
589
long entries ;
575
590
576
- LWLockAcquire (& aqo_state -> queries_lock , LW_SHARED );
591
+ LWLockAcquire (& aqo_state -> queries_lock , LW_EXCLUSIVE );
592
+
593
+ if (!aqo_state -> queries_changed )
594
+ goto end ;
595
+
577
596
entries = hash_get_num_entries (queries_htab );
578
597
hash_seq_init (& hash_seq , queries_htab );
579
598
ret = data_store (PGAQO_QUERIES_FILE , _form_queries_record_cb , entries ,
580
599
(void * ) & hash_seq );
581
600
if (ret != 0 )
582
601
hash_seq_term (& hash_seq );
602
+ else
603
+ /* Hash table and disk storage are now consistent */
604
+ aqo_state -> queries_changed = false;
583
605
606
+ end :
584
607
LWLockRelease (& aqo_state -> queries_lock );
585
608
}
586
609
@@ -620,7 +643,8 @@ data_store(const char *filename, form_record_t callback,
620
643
goto error ;
621
644
}
622
645
623
- (void ) durable_rename (tmpfile , filename , LOG );
646
+ /* Parallel (re)writing into a file haven't happen. */
647
+ (void ) durable_rename (tmpfile , filename , PANIC );
624
648
elog (LOG , "[AQO] %d records stored in file %s." , counter , filename );
625
649
return 0 ;
626
650
@@ -838,7 +862,7 @@ aqo_queries_load(void)
838
862
839
863
LWLockAcquire (& aqo_state -> queries_lock , LW_EXCLUSIVE );
840
864
841
- /* Load on postmaster sturtup . So no any concurrent actions possible here. */
865
+ /* Load on postmaster startup . So no any concurrent actions possible here. */
842
866
Assert (hash_get_num_entries (queries_htab ) == 0 );
843
867
844
868
data_load (PGAQO_QUERIES_FILE , _deform_queries_record_cb , NULL );
@@ -925,6 +949,9 @@ data_load(const char *filename, deform_record_t callback, void *ctx)
925
949
static void
926
950
on_shmem_shutdown (int code , Datum arg )
927
951
{
952
+ /*
953
+ * XXX: It can be expensive to rewrite a file on each shutdown of a backend.
954
+ */
928
955
aqo_qtexts_flush ();
929
956
aqo_data_flush ();
930
957
}
@@ -1200,6 +1227,7 @@ _aqo_data_remove(data_key *key)
1200
1227
1201
1228
if (hash_search (data_htab , key , HASH_REMOVE , NULL ) == NULL )
1202
1229
elog (PANIC , "[AQO] Inconsistent data hash table" );
1230
+
1203
1231
aqo_state -> data_changed = true;
1204
1232
}
1205
1233
@@ -1269,8 +1297,9 @@ aqo_data_store(uint64 fs, int fss, OkNNrdata *data, List *reloids)
1269
1297
char * ptr ;
1270
1298
ListCell * lc ;
1271
1299
size_t size ;
1272
- bool tblOverflow ;
1273
- HASHACTION action ;
1300
+ bool tblOverflow ;
1301
+ HASHACTION action ;
1302
+ bool result ;
1274
1303
1275
1304
Assert (!LWLockHeldByMe (& aqo_state -> data_lock ));
1276
1305
@@ -1321,7 +1350,6 @@ aqo_data_store(uint64 fs, int fss, OkNNrdata *data, List *reloids)
1321
1350
}
1322
1351
1323
1352
Assert (DsaPointerIsValid (entry -> data_dp ));
1324
- Assert (entry -> rows <= data -> rows ); /* Reserved for the future features */
1325
1353
1326
1354
if (entry -> cols != data -> cols || entry -> nrels != list_length (reloids ))
1327
1355
{
@@ -1387,8 +1415,9 @@ aqo_data_store(uint64 fs, int fss, OkNNrdata *data, List *reloids)
1387
1415
1388
1416
aqo_state -> data_changed = true;
1389
1417
end :
1418
+ result = aqo_state -> data_changed ;
1390
1419
LWLockRelease (& aqo_state -> data_lock );
1391
- return aqo_state -> data_changed ;
1420
+ return result ;
1392
1421
}
1393
1422
1394
1423
static void
@@ -1496,7 +1525,7 @@ load_aqo_data(uint64 fs, int fss, OkNNrdata *data, List **reloids,
1496
1525
1497
1526
dsa_init ();
1498
1527
1499
- LWLockAcquire (& aqo_state -> data_lock , LW_EXCLUSIVE );
1528
+ LWLockAcquire (& aqo_state -> data_lock , LW_SHARED );
1500
1529
1501
1530
if (!wideSearch )
1502
1531
{
@@ -1631,7 +1660,8 @@ aqo_data(PG_FUNCTION_ARGS)
1631
1660
ptr += sizeof (data_key );
1632
1661
1633
1662
if (entry -> cols > 0 )
1634
- values [AD_FEATURES ] = PointerGetDatum (form_matrix ((double * )ptr , entry -> rows , entry -> cols ));
1663
+ values [AD_FEATURES ] = PointerGetDatum (form_matrix ((double * ) ptr ,
1664
+ entry -> rows , entry -> cols ));
1635
1665
else
1636
1666
nulls [AD_FEATURES ] = true;
1637
1667
@@ -1719,7 +1749,9 @@ aqo_data_reset(void)
1719
1749
elog (ERROR , "[AQO] hash table corrupted" );
1720
1750
num_remove ++ ;
1721
1751
}
1722
- aqo_state -> data_changed = true;
1752
+
1753
+ if (num_remove > 0 )
1754
+ aqo_state -> data_changed = true;
1723
1755
LWLockRelease (& aqo_state -> data_lock );
1724
1756
if (num_remove != num_entries )
1725
1757
elog (ERROR , "[AQO] Query ML memory storage is corrupted or parallel access without a lock has detected." );
@@ -1831,6 +1863,7 @@ aqo_queries_store(uint64 queryid,
1831
1863
entry -> use_aqo = use_aqo ;
1832
1864
entry -> auto_tuning = auto_tuning ;
1833
1865
1866
+ aqo_state -> queries_changed = true;
1834
1867
LWLockRelease (& aqo_state -> queries_lock );
1835
1868
return true;
1836
1869
}
@@ -1856,7 +1889,10 @@ aqo_queries_reset(void)
1856
1889
elog (ERROR , "[AQO] hash table corrupted" );
1857
1890
num_remove ++ ;
1858
1891
}
1859
- aqo_state -> queries_changed = true;
1892
+
1893
+ if (num_remove > 0 )
1894
+ aqo_state -> queries_changed = true;
1895
+
1860
1896
LWLockRelease (& aqo_state -> queries_lock );
1861
1897
1862
1898
if (num_remove != num_entries - 1 )
0 commit comments