Skip to content

Commit 4efbddd

Browse files
author
Ilyas Ridhuan
authored
Merge pull request #852 from EYBlockchain/revert-795-zepedro/websockets-fix
Revert "feat: adding ack to block produced websocket 🔌"
2 parents a1c3db0 + 55c7d48 commit 4efbddd

File tree

11 files changed

+195
-208
lines changed

11 files changed

+195
-208
lines changed

‎cli/lib/nf3.mjs

+1-3
Original file line numberDiff line numberDiff line change
@@ -898,7 +898,7 @@ class Nf3 {
898898
};
899899
connection.onmessage = async message => {
900900
const msg = JSON.parse(message.data);
901-
const { id, type, txDataToSign, block, transactions } = msg;
901+
const { type, txDataToSign, block, transactions } = msg;
902902
logger.debug(`Proposer received websocket message of type ${type}`);
903903
if (type === 'block') {
904904
proposerQueue.push(async () => {
@@ -913,8 +913,6 @@ class Nf3 {
913913
blockProposeEmitter.emit('error', err, block, transactions);
914914
}
915915
});
916-
917-
connection.send(id);
918916
}
919917
return null;
920918
};

‎common-files/utils/websocket.mjs

-102
This file was deleted.

‎nightfall-optimist/src/event-handlers/chain-reorg.mjs

+1-1
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ import {
6666
deleteTreeByBlockNumberL2,
6767
deleteNullifiersForBlock,
6868
} from '../services/database.mjs';
69-
import { waitForContract } from '../utils/index.mjs';
69+
import { waitForContract } from './subscribe.mjs';
7070

7171
const { STATE_CONTRACT_NAME } = constants;
7272

‎nightfall-optimist/src/event-handlers/index.mjs

+15-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,10 @@
1-
import { startEventQueue } from '../utils/index.mjs';
1+
import {
2+
startEventQueue,
3+
subscribeToBlockAssembledWebSocketConnection,
4+
subscribeToChallengeWebSocketConnection,
5+
subscribeToInstantWithDrawalWebSocketConnection,
6+
subscribeToProposedBlockWebSocketConnection,
7+
} from './subscribe.mjs';
28
import blockProposedEventHandler from './block-proposed.mjs';
39
import newCurrentProposerEventHandler from './new-current-proposer.mjs';
410
import transactionSubmittedEventHandler from './transaction-submitted.mjs';
@@ -37,4 +43,11 @@ const eventHandlers = {
3743
},
3844
};
3945

40-
export { startEventQueue, eventHandlers };
46+
export {
47+
startEventQueue,
48+
subscribeToBlockAssembledWebSocketConnection,
49+
subscribeToChallengeWebSocketConnection,
50+
subscribeToInstantWithDrawalWebSocketConnection,
51+
subscribeToProposedBlockWebSocketConnection,
52+
eventHandlers,
53+
};
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
/* eslint-disable no-await-in-loop */
2+
3+
/**
4+
* Module to subscribe to blockchain events
5+
*/
6+
import WebSocket from 'ws';
7+
import config from 'config';
8+
import logger from 'common-files/utils/logger.mjs';
9+
import { getContractInstance, getContractAddress } from 'common-files/utils/contract.mjs';
10+
import constants from 'common-files/constants/index.mjs';
11+
12+
const {
13+
PROPOSERS_CONTRACT_NAME,
14+
SHIELD_CONTRACT_NAME,
15+
CHALLENGES_CONTRACT_NAME,
16+
STATE_CONTRACT_NAME,
17+
} = constants;
18+
const { RETRIES, WEBSOCKET_PORT, WEBSOCKET_PING_TIME } = config;
19+
const wss = new WebSocket.Server({ port: WEBSOCKET_PORT });
20+
21+
/**
22+
Function that does some standardised setting up of a websocket's events.
23+
It logs open, close and error events, sets up a ping and logs the pong. It will
24+
close the socket on pong failure. The user is expected to handle the reconnect.
25+
It does not set up the onmessage event because this tends to be case-specific.
26+
*/
27+
function setupWebsocketEvents(ws, socketName) {
28+
let timeoutID;
29+
// setup a pinger to ping the websocket correspondent
30+
const intervalID = setInterval(() => {
31+
ws.ping();
32+
// set up a timeout - will close the websocket, which will trigger a reconnect
33+
timeoutID = setTimeout(() => {
34+
logger.warn(`Timed out waiting for ping response from ${socketName}`);
35+
ws.terminate();
36+
}, 2 * WEBSOCKET_PING_TIME);
37+
}, WEBSOCKET_PING_TIME);
38+
// check we received a pong in time (clears the timer set by the pinger)
39+
ws.on('pong', () => {
40+
// logger.debug(`Got pong from ${socketName} websocket`);
41+
clearTimeout(timeoutID);
42+
});
43+
ws.on('error', () => {
44+
logger.debug(`ERROR ${socketName}`);
45+
});
46+
ws.on('open', () => {
47+
logger.debug(`OPEN ${socketName}`);
48+
});
49+
ws.on('close', err => {
50+
logger.debug(`CLOSE ${socketName} ${err}`);
51+
clearInterval(intervalID);
52+
});
53+
}
54+
55+
/**
56+
* Function that tries to get a (named) contract instance and, if it fails, will
57+
* retry after 3 seconds. After RETRIES attempts, it will give up and throw.
58+
* This is useful in case nightfall-optimist comes up before the contract
59+
* is fully deployed.
60+
*/
61+
export async function waitForContract(contractName) {
62+
let errorCount = 0;
63+
let error;
64+
let instance;
65+
while (errorCount < RETRIES) {
66+
try {
67+
error = undefined;
68+
const address = await getContractAddress(contractName);
69+
if (address === undefined) throw new Error(`${contractName} contract address was undefined`);
70+
instance = getContractInstance(contractName, address);
71+
return instance;
72+
} catch (err) {
73+
error = err;
74+
errorCount++;
75+
logger.warn(`Unable to get a ${contractName} contract instance will try again in 3 seconds`);
76+
await new Promise(resolve => setTimeout(() => resolve(), 3000));
77+
}
78+
}
79+
if (error) throw error;
80+
return instance;
81+
}
82+
83+
/**
84+
*
85+
* @param callback - The function that distributes events to the event-handler function
86+
* @param arg - List of arguments to be passed to callback, the first element must be the event-handler functions
87+
* @returns = List of emitters from each contract.
88+
*/
89+
export async function startEventQueue(callback, ...arg) {
90+
const contractNames = [
91+
STATE_CONTRACT_NAME,
92+
SHIELD_CONTRACT_NAME,
93+
CHALLENGES_CONTRACT_NAME,
94+
PROPOSERS_CONTRACT_NAME,
95+
];
96+
const contracts = await Promise.all(contractNames.map(c => waitForContract(c)));
97+
const emitters = contracts.map(e => {
98+
const emitterC = e.events.allEvents();
99+
emitterC.on('changed', event => callback(event, arg));
100+
emitterC.on('data', event => callback(event, arg));
101+
return emitterC;
102+
});
103+
logger.debug('Subscribed to layer 2 state events');
104+
return emitters;
105+
}
106+
107+
export async function subscribeToChallengeWebSocketConnection(callback, ...args) {
108+
wss.on('connection', ws => {
109+
ws.on('message', message => {
110+
if (message === 'challenge') {
111+
setupWebsocketEvents(ws, 'challenge');
112+
callback(ws, args);
113+
}
114+
});
115+
});
116+
logger.debug('Subscribed to Challenge WebSocket connection');
117+
}
118+
119+
export async function subscribeToBlockAssembledWebSocketConnection(callback, ...args) {
120+
wss.on('connection', ws => {
121+
ws.on('message', message => {
122+
if (message === 'blocks') {
123+
setupWebsocketEvents(ws, 'proposer');
124+
callback(ws, args);
125+
}
126+
});
127+
});
128+
logger.debug('Subscribed to BlockAssembled WebSocket connection');
129+
}
130+
131+
export async function subscribeToInstantWithDrawalWebSocketConnection(callback, ...args) {
132+
wss.on('connection', ws => {
133+
ws.on('message', message => {
134+
if (message === 'instant') {
135+
setupWebsocketEvents(ws, 'liquidity provider');
136+
callback(ws, args);
137+
}
138+
});
139+
});
140+
logger.debug('Subscribed to InstantWithDrawal WebSocket connection');
141+
}
142+
143+
export async function subscribeToProposedBlockWebSocketConnection(callback, ...args) {
144+
wss.on('connection', ws => {
145+
ws.on('message', message => {
146+
try {
147+
if (JSON.parse(message).type === 'sync') {
148+
logger.info(`SUBSCRIBING TO PROPOSEDBLOCK`);
149+
setupWebsocketEvents(ws, 'publisher');
150+
callback(ws, args);
151+
}
152+
} catch (error) {
153+
logger.debug('Not JSON Message');
154+
}
155+
});
156+
});
157+
logger.debug('Subscribed to ProposedBlock WebSocket connection');
158+
}

‎nightfall-optimist/src/index.mjs

+12-19
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,14 @@
11
import logger from 'common-files/utils/logger.mjs';
22
import { queueManager, queues, enqueueEvent } from 'common-files/utils/event-queue.mjs';
3-
import { NFWebsocket } from 'common-files/utils/websocket.mjs';
4-
import config from 'config';
53
import app from './app.mjs';
6-
import { startEventQueue, eventHandlers } from './event-handlers/index.mjs';
4+
import {
5+
startEventQueue,
6+
subscribeToBlockAssembledWebSocketConnection,
7+
subscribeToChallengeWebSocketConnection,
8+
subscribeToInstantWithDrawalWebSocketConnection,
9+
subscribeToProposedBlockWebSocketConnection,
10+
eventHandlers,
11+
} from './event-handlers/index.mjs';
712
import Proposer from './classes/proposer.mjs';
813
import {
914
setBlockAssembledWebSocketConnection,
@@ -15,27 +20,15 @@ import { setInstantWithdrawalWebSocketConnection } from './services/instant-with
1520
import { setProposer } from './routes/proposer.mjs';
1621
import { setBlockProposedWebSocketConnection } from './event-handlers/block-proposed.mjs';
1722

18-
const { WEBSOCKET_PORT, WEBSOCKET_PING_TIME } = config;
19-
2023
const main = async () => {
2124
try {
2225
const proposer = new Proposer();
2326
setProposer(proposer); // passes the proposer instance int the proposer routes
2427
// subscribe to WebSocket events first
25-
26-
const ws = new NFWebsocket({ port: WEBSOCKET_PORT, pingTime: WEBSOCKET_PING_TIME });
27-
28-
ws.subscribe({ topic: 'challenge', socketName: 'challenge' }, setChallengeWebSocketConnection);
29-
ws.subscribe({ topic: 'blocks', socketName: 'proposer' }, setBlockAssembledWebSocketConnection);
30-
ws.subscribe(
31-
{ topic: 'instant', socketName: 'liquidity provider' },
32-
setInstantWithdrawalWebSocketConnection,
33-
);
34-
ws.subscribe(
35-
{ topic: 'sync', socketName: 'publisher', filter: 'type' },
36-
setBlockProposedWebSocketConnection,
37-
);
38-
28+
await subscribeToBlockAssembledWebSocketConnection(setBlockAssembledWebSocketConnection);
29+
await subscribeToChallengeWebSocketConnection(setChallengeWebSocketConnection);
30+
await subscribeToInstantWithDrawalWebSocketConnection(setInstantWithdrawalWebSocketConnection);
31+
await subscribeToProposedBlockWebSocketConnection(setBlockProposedWebSocketConnection);
3932
// start the event queue
4033
await startEventQueue(queueManager, eventHandlers, proposer);
4134
// enqueue the block-assembler every time the queue becomes empty

‎nightfall-optimist/src/routes/proposer.mjs

+1-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import {
2020
getLatestTree,
2121
getLatestBlockInfo,
2222
} from '../services/database.mjs';
23-
import { waitForContract } from '../utils/index.mjs';
23+
import { waitForContract } from '../event-handlers/subscribe.mjs';
2424
import transactionSubmittedEventHandler from '../event-handlers/transaction-submitted.mjs';
2525
import getProposers from '../services/proposer.mjs';
2626

0 commit comments

Comments
 (0)