diff --git a/.eslintrc.js b/.eslintrc.js index fce388e00095..fe98be14e2a7 100644 --- a/.eslintrc.js +++ b/.eslintrc.js @@ -51,7 +51,7 @@ const allPackages = [ 'packages/common/debug', 'packages/common/env', 'packages/common/infra', - 'packages/common/theme', + 'packages/common/doc-storage', 'tools/cli', ]; diff --git a/packages/backend/server/src/core/doc/storage/index.ts b/packages/backend/server/src/core/doc/storage/index.ts index 6ba0e23dd111..3e56264bd1ba 100644 --- a/packages/backend/server/src/core/doc/storage/index.ts +++ b/packages/backend/server/src/core/doc/storage/index.ts @@ -1,4 +1,6 @@ -// TODO(@forehalo): share with frontend +// This is a totally copy of definitions in [@affine/doc-storage] +// because currently importing cross workspace package from [@affine/server] is not yet supported +// should be kept updated with the original definitions in [@affine/doc-storage] import type { BlobStorageAdapter } from './blob'; import { Connection } from './connection'; import type { DocStorageAdapter } from './doc'; diff --git a/packages/common/doc-storage/package.json b/packages/common/doc-storage/package.json new file mode 100644 index 000000000000..a67dcb495c23 --- /dev/null +++ b/packages/common/doc-storage/package.json @@ -0,0 +1,17 @@ +{ + "name": "@affine/doc-storage", + "type": "module", + "version": "0.15.0", + "private": true, + "sideEffects": false, + "exports": { + ".": "./src/index.ts" + }, + "dependencies": { + "lodash-es": "^4.17.21", + "yjs": "patch:yjs@npm%3A13.6.18#~/.yarn/patches/yjs-npm-13.6.18-ad0d5f7c43.patch" + }, + "devDependencies": { + "@types/lodash-es": "^4.17.12" + } +} diff --git a/packages/common/doc-storage/src/blob.ts b/packages/common/doc-storage/src/blob.ts new file mode 100644 index 000000000000..e3c78191b7fd --- /dev/null +++ b/packages/common/doc-storage/src/blob.ts @@ -0,0 +1,16 @@ +import { Connection } from './connection'; + +export interface BlobStorageOptions {} + +export interface Blob { + key: string; + bin: Uint8Array; + mimeType: string; +} + +export abstract class BlobStorageAdapter extends Connection { + abstract getBlob(spaceId: string, key: string): Promise; + abstract setBlob(spaceId: string, blob: Blob): Promise; + abstract deleteBlob(spaceId: string, key: string): Promise; + abstract listBlobs(spaceId: string): Promise; +} diff --git a/packages/common/doc-storage/src/connection.ts b/packages/common/doc-storage/src/connection.ts new file mode 100644 index 000000000000..f82a72fbd393 --- /dev/null +++ b/packages/common/doc-storage/src/connection.ts @@ -0,0 +1,11 @@ +export class Connection { + protected connected: boolean = false; + connect(): Promise { + this.connected = true; + return Promise.resolve(); + } + disconnect(): Promise { + this.connected = false; + return Promise.resolve(); + } +} diff --git a/packages/common/doc-storage/src/doc.ts b/packages/common/doc-storage/src/doc.ts new file mode 100644 index 000000000000..b819dd728e86 --- /dev/null +++ b/packages/common/doc-storage/src/doc.ts @@ -0,0 +1,214 @@ +import { + applyUpdate, + Doc, + encodeStateAsUpdate, + encodeStateVector, + mergeUpdates, + UndoManager, +} from 'yjs'; + +import { Connection } from './connection'; +import { SingletonLocker } from './lock'; + +export interface DocRecord { + spaceId: string; + docId: string; + bin: Uint8Array; + timestamp: number; + editor?: string; +} + +export interface DocUpdate { + bin: Uint8Array; + timestamp: number; + editor?: string; +} + +export interface HistoryFilter { + before?: number; + limit?: number; +} + +export interface Editor { + name: string; + avatarUrl: string | null; +} + +export interface DocStorageOptions { + mergeUpdates?: (updates: Uint8Array[]) => Promise | Uint8Array; +} + +export abstract class DocStorageAdapter extends Connection { + private readonly locker = new SingletonLocker(); + + constructor( + protected readonly options: DocStorageOptions = { + mergeUpdates, + } + ) { + super(); + } + + // open apis + isEmptyBin(bin: Uint8Array): boolean { + return ( + bin.length === 0 || + // 0x0 for state vector + (bin.length === 1 && bin[0] === 0) || + // 0x00 for update + (bin.length === 2 && bin[0] === 0 && bin[1] === 0) + ); + } + + async getDoc(spaceId: string, docId: string): Promise { + await using _lock = await this.lockDocForUpdate(spaceId, docId); + + const snapshot = await this.getDocSnapshot(spaceId, docId); + const updates = await this.getDocUpdates(spaceId, docId); + + if (updates.length) { + const { timestamp, bin, editor } = await this.squash( + snapshot ? [snapshot, ...updates] : updates + ); + + const newSnapshot = { + spaceId: spaceId, + docId, + bin, + timestamp, + editor, + }; + + const success = await this.setDocSnapshot(newSnapshot); + + // if there is old snapshot, create a new history record + if (success && snapshot) { + await this.createDocHistory(snapshot); + } + + // always mark updates as merged unless throws + await this.markUpdatesMerged(spaceId, docId, updates); + + return newSnapshot; + } + + return snapshot; + } + + abstract pushDocUpdates( + spaceId: string, + docId: string, + updates: Uint8Array[], + editorId?: string + ): Promise; + + abstract deleteDoc(spaceId: string, docId: string): Promise; + abstract deleteSpace(spaceId: string): Promise; + async rollbackDoc( + spaceId: string, + docId: string, + timestamp: number, + editorId?: string + ): Promise { + await using _lock = await this.lockDocForUpdate(spaceId, docId); + const toSnapshot = await this.getDocHistory(spaceId, docId, timestamp); + if (!toSnapshot) { + throw new Error('Can not find the version to rollback to.'); + } + + const fromSnapshot = await this.getDocSnapshot(spaceId, docId); + + if (!fromSnapshot) { + throw new Error('Can not find the current version of the doc.'); + } + + const change = this.generateChangeUpdate(fromSnapshot.bin, toSnapshot.bin); + await this.pushDocUpdates(spaceId, docId, [change], editorId); + // force create a new history record after rollback + await this.createDocHistory(fromSnapshot, true); + } + + abstract getSpaceDocTimestamps( + spaceId: string, + after?: number + ): Promise | null>; + abstract listDocHistories( + spaceId: string, + docId: string, + query: { skip?: number; limit?: number } + ): Promise<{ timestamp: number; editor: Editor | null }[]>; + abstract getDocHistory( + spaceId: string, + docId: string, + timestamp: number + ): Promise; + + // api for internal usage + protected abstract getDocSnapshot( + spaceId: string, + docId: string + ): Promise; + protected abstract setDocSnapshot(snapshot: DocRecord): Promise; + protected abstract getDocUpdates( + spaceId: string, + docId: string + ): Promise; + protected abstract markUpdatesMerged( + spaceId: string, + docId: string, + updates: DocUpdate[] + ): Promise; + + protected abstract createDocHistory( + snapshot: DocRecord, + force?: boolean + ): Promise; + + protected async squash(updates: DocUpdate[]): Promise { + const merge = this.options?.mergeUpdates ?? mergeUpdates; + const lastUpdate = updates.at(-1); + if (!lastUpdate) { + throw new Error('No updates to be squashed.'); + } + + // fast return + if (updates.length === 1) { + return lastUpdate; + } + + const finalUpdate = await merge(updates.map(u => u.bin)); + + return { + bin: finalUpdate, + timestamp: lastUpdate.timestamp, + editor: lastUpdate.editor, + }; + } + + protected async lockDocForUpdate( + spaceId: string, + docId: string + ): Promise { + return this.locker.lock(`workspace:${spaceId}:update`, docId); + } + + protected generateChangeUpdate(newerBin: Uint8Array, olderBin: Uint8Array) { + const newerDoc = new Doc(); + applyUpdate(newerDoc, newerBin); + const olderDoc = new Doc(); + applyUpdate(olderDoc, olderBin); + + const newerState = encodeStateVector(newerDoc); + const olderState = encodeStateVector(olderDoc); + + const diff = encodeStateAsUpdate(newerDoc, olderState); + + const undoManager = new UndoManager(Array.from(newerDoc.share.values())); + + applyUpdate(olderDoc, diff); + + undoManager.undo(); + + return encodeStateAsUpdate(olderDoc, newerState); + } +} diff --git a/packages/common/doc-storage/src/index.ts b/packages/common/doc-storage/src/index.ts new file mode 100644 index 000000000000..db27a63e70f4 --- /dev/null +++ b/packages/common/doc-storage/src/index.ts @@ -0,0 +1,33 @@ +// TODO(@forehalo): share with backend +import type { BlobStorageAdapter } from './blob'; +import { Connection } from './connection'; +import type { DocStorageAdapter } from './doc'; + +export class SpaceStorage extends Connection { + constructor( + public readonly doc: DocStorageAdapter, + public readonly blob: BlobStorageAdapter + ) { + super(); + } + + override async connect() { + await this.doc.connect(); + await this.blob.connect(); + } + + override async disconnect() { + await this.doc.disconnect(); + await this.blob.disconnect(); + } +} + +export { BlobStorageAdapter, type BlobStorageOptions } from './blob'; +export { + type DocRecord, + DocStorageAdapter, + type DocStorageOptions, + type DocUpdate, + type Editor, + type HistoryFilter, +} from './doc'; diff --git a/packages/common/doc-storage/src/lock.ts b/packages/common/doc-storage/src/lock.ts new file mode 100644 index 000000000000..c4fcf45f3e56 --- /dev/null +++ b/packages/common/doc-storage/src/lock.ts @@ -0,0 +1,42 @@ +export interface Locker { + lock(domain: string, resource: string): Promise; +} + +export class SingletonLocker implements Locker { + lockedResource = new Map(); + constructor() {} + + async lock(domain: string, resource: string) { + let lock = this.lockedResource.get(`${domain}:${resource}`); + + if (!lock) { + lock = new Lock(); + } + + await lock.acquire(); + + return lock; + } +} + +export class Lock { + private inner: Promise = Promise.resolve(); + private release: () => void = () => {}; + + async acquire() { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + let release: () => void = null!; + const nextLock = new Promise(resolve => { + release = resolve; + }); + + await this.inner; + this.inner = nextLock; + this.release = release; + } + + [Symbol.asyncDispose]() { + this.release(); + return Promise.resolve(); + } +} diff --git a/packages/common/doc-storage/tsconfig.json b/packages/common/doc-storage/tsconfig.json new file mode 100644 index 000000000000..76a00b3af083 --- /dev/null +++ b/packages/common/doc-storage/tsconfig.json @@ -0,0 +1,14 @@ +{ + "extends": "../../../tsconfig.json", + "include": ["./src"], + "compilerOptions": { + "composite": true, + "noEmit": false, + "outDir": "lib" + }, + "references": [ + { + "path": "./tsconfig.node.json" + } + ] +} diff --git a/packages/common/doc-storage/tsconfig.node.json b/packages/common/doc-storage/tsconfig.node.json new file mode 100644 index 000000000000..a51a6ab74414 --- /dev/null +++ b/packages/common/doc-storage/tsconfig.node.json @@ -0,0 +1,11 @@ +{ + "extends": "../../../tsconfig.json", + "compilerOptions": { + "composite": true, + "module": "ESNext", + "moduleResolution": "Node", + "allowSyntheticDefaultImports": true, + "outDir": "lib", + "noEmit": false + } +} diff --git a/tsconfig.json b/tsconfig.json index 9b7a7ee952fb..824d8287cf05 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -72,7 +72,8 @@ "@affine/native/*": ["./packages/frontend/native/*"], "@affine/server-native": ["./packages/backend/native/index.d.ts"], // Development only - "@affine/electron/*": ["./packages/frontend/electron/src/*"] + "@affine/electron/*": ["./packages/frontend/electron/src/*"], + "@affine/doc-storage": ["./packages/common/doc-storage/src"] } }, "include": [], @@ -119,6 +120,9 @@ { "path": "./packages/common/infra" }, + { + "path": "./packages/common/doc-storage" + }, // Tools { "path": "./tools/cli" diff --git a/yarn.lock b/yarn.lock index 5fe9ebef6bc2..72cfc6f9e80a 100644 --- a/yarn.lock +++ b/yarn.lock @@ -523,6 +523,16 @@ __metadata: languageName: unknown linkType: soft +"@affine/doc-storage@workspace:packages/common/doc-storage": + version: 0.0.0-use.local + resolution: "@affine/doc-storage@workspace:packages/common/doc-storage" + dependencies: + "@types/lodash-es": "npm:^4.17.12" + lodash-es: "npm:^4.17.21" + yjs: "patch:yjs@npm%3A13.6.18#~/.yarn/patches/yjs-npm-13.6.18-ad0d5f7c43.patch" + languageName: unknown + linkType: soft + "@affine/docs@workspace:docs/reference": version: 0.0.0-use.local resolution: "@affine/docs@workspace:docs/reference"