Skip to content

Commit

Permalink
fix(WebSocketClientManager): account for async nature of "addConnection"
Browse files Browse the repository at this point in the history
  • Loading branch information
kettanaito committed Sep 17, 2024
1 parent fde2b4e commit 06c9cf2
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 27 deletions.
4 changes: 2 additions & 2 deletions src/core/ws.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ function createWebSocketLinkHandler(url: Path): WebSocketLink {
// handler matches and emits a connection event.
// When that happens, store that connection in the
// set of all connections for reference.
handler[kEmitter].on('connection', ({ client }) => {
clientManager.addConnection(client)
handler[kEmitter].on('connection', async ({ client }) => {
await clientManager.addConnection(client)
})

// The "handleWebSocketEvent" function will invoke
Expand Down
20 changes: 9 additions & 11 deletions src/core/ws/WebSocketClientManager.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
/**
* @vitest-environment node-websocket
*/
// @vitest-environment node-websocket
import {
WebSocketClientConnection,
WebSocketData,
Expand All @@ -25,26 +23,26 @@ afterEach(() => {
vi.resetAllMocks()
})

it('adds a client from this runtime to the list of clients', () => {
it('adds a client from this runtime to the list of clients', async () => {
const manager = new WebSocketClientManager(channel, '*')
const connection = new WebSocketClientConnection(
socket,
new TestWebSocketTransport(),
)

manager.addConnection(connection)
await manager.addConnection(connection)

// Must add the client to the list of clients.
expect(Array.from(manager.clients.values())).toEqual([connection])
})

it('adds multiple clients from this runtime to the list of clients', () => {
it('adds multiple clients from this runtime to the list of clients', async () => {
const manager = new WebSocketClientManager(channel, '*')
const connectionOne = new WebSocketClientConnection(
socket,
new TestWebSocketTransport(),
)
manager.addConnection(connectionOne)
await manager.addConnection(connectionOne)

// Must add the client to the list of clients.
expect(Array.from(manager.clients.values())).toEqual([connectionOne])
Expand All @@ -53,7 +51,7 @@ it('adds multiple clients from this runtime to the list of clients', () => {
socket,
new TestWebSocketTransport(),
)
manager.addConnection(connectionTwo)
await manager.addConnection(connectionTwo)

// Must add the new cilent to the list as well.
expect(Array.from(manager.clients.values())).toEqual([
Expand All @@ -68,7 +66,7 @@ it('replays a "send" event coming from another runtime', async () => {
socket,
new TestWebSocketTransport(),
)
manager.addConnection(connection)
await manager.addConnection(connection)
vi.spyOn(connection, 'send')

// Emulate another runtime signaling this connection to receive data.
Expand Down Expand Up @@ -97,7 +95,7 @@ it('replays a "close" event coming from another runtime', async () => {
socket,
new TestWebSocketTransport(),
)
manager.addConnection(connection)
await manager.addConnection(connection)
vi.spyOn(connection, 'close')

// Emulate another runtime signaling this connection to close.
Expand Down Expand Up @@ -137,7 +135,7 @@ it('removes the extraneous message listener when the connection closes', async (
})
vi.spyOn(connection, 'send')

manager.addConnection(connection)
await manager.addConnection(connection)
connection.close()

// Signals from other runtimes have no effect on the closed connection.
Expand Down
26 changes: 12 additions & 14 deletions src/core/ws/WebSocketClientManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,10 @@ export class WebSocketClientManager {
}

private async removeRuntimeClients(): Promise<void> {
this.store.deleteMany(Array.from(this.runtimeClients.keys())).then(() => {
this.runtimeClients.clear()
this.flushDatabaseToMemory()
this.notifyOthersAboutDatabaseUpdate()
})
await this.store.deleteMany(Array.from(this.runtimeClients.keys()))
this.runtimeClients.clear()
await this.flushDatabaseToMemory()
this.notifyOthersAboutDatabaseUpdate()
}

/**
Expand All @@ -110,13 +109,12 @@ export class WebSocketClientManager {
this.channel.postMessage({ type: 'db:update' })
}

private addClient(client: WebSocketClientConnection): void {
this.store.add(client).then(() => {
// Sync the in-memory clients in this runtime with the
// updated database. This pulls in all the stored clients.
this.flushDatabaseToMemory()
this.notifyOthersAboutDatabaseUpdate()
})
private async addClient(client: WebSocketClientConnection): Promise<void> {
await this.store.add(client)
// Sync the in-memory clients in this runtime with the
// updated database. This pulls in all the stored clients.
await this.flushDatabaseToMemory()
this.notifyOthersAboutDatabaseUpdate()
}

/**
Expand All @@ -125,14 +123,14 @@ export class WebSocketClientManager {
* connection object because `addConnection()` is called only
* for the opened connections in the same runtime.
*/
public addConnection(client: WebSocketClientConnection): void {
public async addConnection(client: WebSocketClientConnection): Promise<void> {
// Store this client in the map of clients created in this runtime.
// This way, the manager can distinguish between this runtime clients
// and extraneous runtime clients when synchronizing clients storage.
this.runtimeClients.set(client.id, client)

// Add the new client to the storage.
this.addClient(client)
await this.addClient(client)

// Handle the incoming BroadcastChannel messages from other runtimes
// that attempt to control this runtime (via a remote connection wrapper).
Expand Down

0 comments on commit 06c9cf2

Please sign in to comment.