Skip to content

Commit

Permalink
fix: use different websocket client store in browser and node
Browse files Browse the repository at this point in the history
  • Loading branch information
kettanaito committed Sep 17, 2024
1 parent a50bed2 commit 434e4cf
Show file tree
Hide file tree
Showing 5 changed files with 200 additions and 119 deletions.
164 changes: 46 additions & 118 deletions src/core/ws/WebSocketClientManager.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
import { DeferredPromise } from '@open-draft/deferred-promise'
import type {
WebSocketData,
WebSocketClientConnection,
WebSocketClientConnectionProtocol,
} from '@mswjs/interceptors/WebSocket'
import { type Path } from '../utils/matching/matchRequestUrl'

const DB_NAME = 'msw-websocket-clients'
const DB_STORE_NAME = 'clients'
import { WebSocketClientStore } from './WebSocketClientStore'
import { WebSocketMemoryClientStore } from './WebSocketMemoryClientStore'
import { WebSocketIndexedDBClientStore } from './WebSocketIndexedDBClientStore'

export type WebSocketBroadcastChannelMessage =
| {
Expand All @@ -26,25 +25,26 @@ export type WebSocketBroadcastChannelMessage =
}
}

type SerializedClient = {
id: string
url: string
}

/**
* A manager responsible for accumulating WebSocket client
* connections across different browser runtimes.
*/
export class WebSocketClientManager {
private db: Promise<IDBDatabase>
private store: WebSocketClientStore
private runtimeClients: Map<string, WebSocketClientConnectionProtocol>
private allClients: Set<WebSocketClientConnectionProtocol>

constructor(
private channel: BroadcastChannel,
private url: Path,
) {
this.db = this.createDatabase()
// Store the clients in the IndexedDB in the browser,
// otherwise, store the clients in memory.
this.store =
typeof indexedDB !== 'undefined'
? new WebSocketIndexedDBClientStore()
: new WebSocketMemoryClientStore()

this.runtimeClients = new Map()
this.allClients = new Set()

Expand All @@ -63,94 +63,36 @@ export class WebSocketClientManager {
}
}

private async createDatabase() {
const promise = new DeferredPromise<IDBDatabase>()
const request = indexedDB.open(DB_NAME, 1)

request.onsuccess = ({ currentTarget }) => {
const db = Reflect.get(currentTarget!, 'result') as IDBDatabase

if (db.objectStoreNames.contains(DB_STORE_NAME)) {
return promise.resolve(db)
}
}
private async flushDatabaseToMemory() {
const storedClients = await this.store.getAll()

request.onupgradeneeded = async ({ currentTarget }) => {
const db = Reflect.get(currentTarget!, 'result') as IDBDatabase
if (db.objectStoreNames.contains(DB_STORE_NAME)) {
return
}
this.allClients = new Set(
storedClients.map((client) => {
const runtimeClient = this.runtimeClients.get(client.id)

const store = db.createObjectStore(DB_STORE_NAME, { keyPath: 'id' })
store.transaction.oncomplete = () => {
promise.resolve(db)
}
store.transaction.onerror = () => {
promise.reject(new Error('Failed to create WebSocket client store'))
}
}
request.onerror = () => {
promise.reject(new Error('Failed to open an IndexedDB database'))
}

return promise
}

private flushDatabaseToMemory() {
this.getStore().then((store) => {
const request = store.getAll() as IDBRequest<Array<SerializedClient>>

request.onsuccess = () => {
this.allClients = new Set(
request.result.map((client) => {
const runtimeClient = this.runtimeClients.get(client.id)

/**
* @note For clients originating in this runtime, use their
* direct references. No need to wrap them in a remote connection.
*/
if (runtimeClient) {
return runtimeClient
}
/**
* @note For clients originating in this runtime, use their
* direct references. No need to wrap them in a remote connection.
*/
if (runtimeClient) {
return runtimeClient
}

return new WebSocketRemoteClientConnection(
client.id,
new URL(client.url),
this.channel,
)
}),
return new WebSocketRemoteClientConnection(
client.id,
new URL(client.url),
this.channel,
)
}
})
}

private async getStore(): Promise<IDBObjectStore> {
const db = await this.db
return db.transaction(DB_STORE_NAME, 'readwrite').objectStore(DB_STORE_NAME)
}),
)
}

private async removeRuntimeClients(): Promise<void> {
const promise = new DeferredPromise<void>()
const store = await this.getStore()

this.runtimeClients.forEach((client) => {
store.delete(client.id)
})

store.transaction.oncomplete = () => {
this.store.deleteMany(Array.from(this.runtimeClients.keys())).then(() => {
this.runtimeClients.clear()
this.flushDatabaseToMemory()
this.notifyOthersAboutDatabaseUpdate()
promise.resolve()
}

store.transaction.onerror = () => {
promise.reject(
new Error('Failed to remove runtime clients from the store'),
)
}

return promise
})
}

/**
Expand All @@ -160,40 +102,21 @@ export class WebSocketClientManager {
return this.allClients
}

/**
* Notify other runtimes about the database update
* using the shared `BroadcastChannel` instance.
*/
private notifyOthersAboutDatabaseUpdate(): void {
// Notify other runtimes to sync their in-memory clients
// with the updated database.
this.channel.postMessage({ type: 'db:update' })
}

private addClient(client: WebSocketClientConnection): void {
this.getStore()
.then((store) => {
const request = store.add({
id: client.id,
url: client.url.href,
} satisfies SerializedClient)

request.onsuccess = () => {
// Sync the in-memory clients in this runtime with the
// updated database. This pulls in all the stored clients.
this.flushDatabaseToMemory()
this.notifyOthersAboutDatabaseUpdate()
}

request.onerror = () => {
// eslint-disable-next-line no-console
console.error(
`Failed to add a WebSocket client ("${client.id}") connection to the store.`,
)
}
})
.catch((error) => {
// eslint-disable-next-line no-console
console.error(
`Failed to add a WebSocket client ("${client.id}") to the store: ${error}`,
)
})
this.store.add(client).then(() => {
// Sync the in-memory clients in this runtime with the
// updated database. This pulls in all the stored clients.
this.flushDatabaseToMemory()
this.notifyOthersAboutDatabaseUpdate()
})
}

/**
Expand All @@ -203,7 +126,12 @@ export class WebSocketClientManager {
* for the opened connections in the same runtime.
*/
public addConnection(client: WebSocketClientConnection): void {
// Store this client in the map of clients created in this runtime.
// This way, the manager can distinguish between this runtime clients
// and extraneous runtime clients when synchronizing clients storage.
this.runtimeClients.set(client.id, client)

// Add the new client to the storage.
this.addClient(client)

// Handle the incoming BroadcastChannel messages from other runtimes
Expand Down
14 changes: 14 additions & 0 deletions src/core/ws/WebSocketClientStore.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import type { WebSocketClientConnectionProtocol } from '@mswjs/interceptors/WebSocket'

export type SerializedWebSocketClient = {
id: string
url: string
}

export abstract class WebSocketClientStore {
public abstract add(client: WebSocketClientConnectionProtocol): Promise<void>

public abstract getAll(): Promise<Array<SerializedWebSocketClient>>

public abstract deleteMany(clientIds: Array<string>): Promise<void>
}
112 changes: 112 additions & 0 deletions src/core/ws/WebSocketIndexedDBClientStore.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
import { DeferredPromise } from '@open-draft/deferred-promise'
import { WebSocketClientConnectionProtocol } from '@mswjs/interceptors/lib/browser/interceptors/WebSocket'
import {
type SerializedWebSocketClient,
WebSocketClientStore,
} from './WebSocketClientStore'

const DB_NAME = 'msw-websocket-clients'
const DB_STORE_NAME = 'clients'

export class WebSocketIndexedDBClientStore implements WebSocketClientStore {
private db: Promise<IDBDatabase>

constructor() {
this.db = this.createDatabase()
}

public async add(client: WebSocketClientConnectionProtocol): Promise<void> {
const promise = new DeferredPromise<void>()
const store = await this.getStore()
const request = store.add({
id: client.id,
url: client.url.href,
} satisfies SerializedWebSocketClient)

request.onsuccess = () => {
promise.resolve()
}
request.onerror = () => {
promise.reject(new Error(`Failed to add WebSocket client ${client.id}`))
}

return promise
}

public async getAll(): Promise<Array<SerializedWebSocketClient>> {
const promise = new DeferredPromise<Array<SerializedWebSocketClient>>()
const store = await this.getStore()
const request = store.getAll() as IDBRequest<
Array<SerializedWebSocketClient>
>

request.onsuccess = () => {
promise.resolve(request.result)
}
request.onerror = () => {
promise.reject(new Error(`Failed to get all WebSocket clients`))
}

return promise
}

public async deleteMany(clientIds: Array<string>): Promise<void> {
const promise = new DeferredPromise<void>()
const store = await this.getStore()

for (const clientId of clientIds) {
store.delete(clientId)
}

store.transaction.oncomplete = () => {
promise.resolve()
}
store.transaction.onerror = () => {
promise.reject(
new Error(
`Failed to delete WebSocket clients [${clientIds.join(', ')}]`,
),
)
}

return promise
}

private async createDatabase(): Promise<IDBDatabase> {
const promise = new DeferredPromise<IDBDatabase>()
const request = indexedDB.open(DB_NAME, 1)

request.onsuccess = ({ currentTarget }) => {
const db = Reflect.get(currentTarget!, 'result') as IDBDatabase

if (db.objectStoreNames.contains(DB_STORE_NAME)) {
return promise.resolve(db)
}
}

request.onupgradeneeded = async ({ currentTarget }) => {
const db = Reflect.get(currentTarget!, 'result') as IDBDatabase
if (db.objectStoreNames.contains(DB_STORE_NAME)) {
return
}

const store = db.createObjectStore(DB_STORE_NAME, { keyPath: 'id' })
store.transaction.oncomplete = () => {
promise.resolve(db)
}
store.transaction.onerror = () => {
promise.reject(new Error('Failed to create WebSocket client store'))
}
}
request.onerror = () => {
promise.reject(new Error('Failed to open an IndexedDB database'))
}

return promise
}

private async getStore(): Promise<IDBObjectStore> {
const db = await this.db
return db.transaction(DB_STORE_NAME, 'readwrite').objectStore(DB_STORE_NAME)
}
}
27 changes: 27 additions & 0 deletions src/core/ws/WebSocketMemoryClientStore.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import { WebSocketClientConnectionProtocol } from '@mswjs/interceptors/lib/browser/interceptors/WebSocket'
import {
SerializedWebSocketClient,
WebSocketClientStore,
} from './WebSocketClientStore'

export class WebSocketMemoryClientStore implements WebSocketClientStore {
private store: Map<string, SerializedWebSocketClient>

constructor() {
this.store = new Map()
}

public async add(client: WebSocketClientConnectionProtocol): Promise<void> {
this.store.set(client.id, { id: client.id, url: client.url.href })
}

public getAll(): Promise<Array<SerializedWebSocketClient>> {
return Promise.resolve(Array.from(this.store.values()))
}

public async deleteMany(clientIds: Array<string>): Promise<void> {
for (const clientId of clientIds) {
this.store.delete(clientId)
}
}
}
2 changes: 1 addition & 1 deletion test/browser/ws-api/ws.clients.browser.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ test('returns the number of active clients across different runtimes', async ({
expect(await pageOne.evaluate(() => window.link.clients.size)).toBe(2)
})

test.only('broadcasts messages across runtimes', async ({
test('broadcasts messages across runtimes', async ({
loadExample,
context,
page,
Expand Down

0 comments on commit 434e4cf

Please sign in to comment.