@@ -22,30 +22,15 @@ and catch these removals, processing them appropriately.
22
22
import Queue from 'queue' ;
23
23
import config from 'config' ;
24
24
import logger from 'common-files/utils/logger.mjs' ;
25
+ import { web3 } from 'common-files/utils/contract.mjs' ;
25
26
26
- const { MAX_QUEUE } = config ;
27
+ const { MAX_QUEUE , CONFIRMATION_POLL_TIME , CONFIRMATIONS } = config ;
27
28
const fastQueue = new Queue ( { autostart : true , concurrency : 1 } ) ;
28
29
const slowQueue = new Queue ( { autostart : true , concurrency : 1 } ) ;
30
+ const removed = { } ; // singleton holding transaction hashes of any removed events
29
31
const stopQueue = new Queue ( { autostart : false , concurrency : 1 } ) ;
30
32
export const queues = [ fastQueue , slowQueue , stopQueue ] ;
31
33
32
- /**
33
- This function will return a promise that resolves to true when the next highest
34
- priority queue is empty (priority goes in reverse order, prioity 0 is highest
35
- priority)
36
- */
37
- function nextHigherPriorityQueueHasEmptied ( priority ) {
38
- return new Promise ( resolve => {
39
- const listener = ( ) => resolve ( ) ;
40
- if ( priority === 0 ) resolve ( ) ; // resolve if we're the highest priority queue
41
- queues [ priority - 1 ] . once ( 'end' , listener ) ; // or when the higher priority queue empties
42
- if ( queues [ priority - 1 ] . length === 0 ) {
43
- queues [ priority - 1 ] . removeListener ( 'end' , listener ) ;
44
- resolve ( ) ; // or if it's already empty
45
- }
46
- } ) ;
47
- }
48
-
49
34
/**
50
35
This function will wait until all the functions currently in a queue have been
51
36
processed. It's useful if you want to ensure that Nightfall has had an opportunity
@@ -63,17 +48,64 @@ function flushQueue(priority) {
63
48
64
49
async function enqueueEvent ( callback , priority , args ) {
65
50
queues [ priority ] . push ( async ( ) => {
66
- // await nextHigherPriorityQueueHasEmptied(priority);
67
- // prevent conditionalmakeblock from running until fastQueue is emptied
68
51
return callback ( args ) ;
69
52
} ) ;
70
53
}
71
54
55
+ /**
56
+ This function will return when the event has been on chain for <confirmations> blocks
57
+ It's useful to call this if you want to be sure that your event has been confirmed
58
+ multiple times before you go ahead and process it.
59
+ */
60
+
61
+ function waitForConfirmation ( eventObject ) {
62
+ logger . debug ( `Confirming event ${ eventObject . event } ` ) ;
63
+ const { transactionHash, blockNumber } = eventObject ;
64
+ return new Promise ( ( resolve , reject ) => {
65
+ let confirmedBlocks = 0 ;
66
+ const id = setInterval ( async ( ) => {
67
+ // get the transaction that caused the event
68
+ // if it's been in a chain reorg then it will have been removed.
69
+ if ( removed [ transactionHash ] > 0 ) {
70
+ clearInterval ( id ) ;
71
+ removed [ eventObject . transactionHash ] -- ;
72
+ reject (
73
+ new Error (
74
+ `Event removed; probable chain reorg. Event was ${ eventObject . event } , transaction hash was ${ transactionHash } ` ,
75
+ ) ,
76
+ ) ;
77
+ }
78
+ const currentBlock = await web3 . eth . getBlock ( 'latest' ) ;
79
+ if ( currentBlock . number - blockNumber > confirmedBlocks ) {
80
+ confirmedBlocks = currentBlock . number - blockNumber ;
81
+ }
82
+ if ( confirmedBlocks >= CONFIRMATIONS ) {
83
+ clearInterval ( id ) ;
84
+ logger . debug (
85
+ `Event ${ eventObject . event } has been confirmed ${
86
+ currentBlock . number - blockNumber
87
+ } times`,
88
+ ) ;
89
+ resolve ( eventObject ) ;
90
+ }
91
+ } , CONFIRMATION_POLL_TIME ) ;
92
+ } ) ;
93
+ }
94
+
72
95
async function dequeueEvent ( priority ) {
73
96
queues [ priority ] . shift ( ) ;
74
97
}
75
98
76
99
async function queueManager ( eventObject , eventArgs ) {
100
+ if ( eventObject . removed ) {
101
+ // in this model we don't queue removals but we can use them to reject the event
102
+ // Note the event object and its removal have the same transactionHash.
103
+ // Also note that we can get more than one removal because the event could be re-mined
104
+ // and removed again - so we need to keep count of the removals.
105
+ if ( ! removed [ eventObject . transactionHash ] ) removed [ eventObject . transactionHash ] = 0 ;
106
+ removed [ eventObject . transactionHash ] ++ ; // store the removal; waitForConfirmation will read this and reject.
107
+ return ;
108
+ }
77
109
// First element of eventArgs must be the eventHandlers object
78
110
const [ eventHandlers , ...args ] = eventArgs ;
79
111
// handlers contains the functions needed to handle particular types of event,
@@ -84,30 +116,23 @@ async function queueManager(eventObject, eventArgs) {
84
116
}
85
117
// pull up the priority for the event being handled (removers have identical priority)
86
118
const priority = eventHandlers . priority [ eventObject . event ] ;
87
- // if the event was removed then we have a chain reorg and need to reset our
88
- // layer 2 state accordingly.
89
- if ( eventObject . removed ) {
90
- if ( ! eventHandlers . removers [ eventObject . event ] ) {
91
- logger . debug ( `Unknown event removal ${ eventObject . event } ignored` ) ;
92
- return ;
93
- }
94
- logger . info ( `Queueing event removal ${ eventObject . event } ` ) ;
95
- queues [ priority ] . push ( async ( ) => {
96
- await nextHigherPriorityQueueHasEmptied ( priority ) ; // prevent eventHandlers running until the higher priority queue has emptied
97
- return eventHandlers . removers [ eventObject . event ] ( eventObject , args ) ;
98
- } ) ;
99
- // otherwise queue the event for processing.
100
- } else {
101
- logger . info ( `Queueing event ${ eventObject . event } ` ) ;
102
- queues [ priority ] . push ( async ( ) => {
103
- // await nextHigherPriorityQueueHasEmptied(priority); // prevent eventHandlers running until the higher priority queue has emptied
119
+ logger . info (
120
+ `Queueing event ${ eventObject . event } , with transaction hash ${ eventObject . transactionHash } and priority ${ priority } ` ,
121
+ ) ;
122
+ queues [ priority ] . push ( async ( ) => {
123
+ // we won't even think about processing an event until it's been confirmed many times
124
+ try {
125
+ await waitForConfirmation ( eventObject ) ;
104
126
return eventHandlers [ eventObject . event ] ( eventObject , args ) ;
105
- } ) ;
106
- }
127
+ } catch ( err ) {
128
+ return logger . warn ( err . message ) ;
129
+ }
130
+ } ) ;
131
+ // }
107
132
// the queue shouldn't get too long if we're keeping up with the blockchain.
108
133
if ( queues [ priority ] . length > MAX_QUEUE )
109
134
logger . warn ( `The event queue has more than ${ MAX_QUEUE } events` ) ;
110
135
}
111
136
112
137
/* ignore unused exports */
113
- export { flushQueue , enqueueEvent , dequeueEvent , queueManager } ;
138
+ export { flushQueue , enqueueEvent , queueManager , dequeueEvent , waitForConfirmation } ;
0 commit comments