Skip to content

Commit

Permalink
feat(doc-storage): init doc-storage package
Browse files Browse the repository at this point in the history
  • Loading branch information
forehalo committed Sep 5, 2024
1 parent 5c67f98 commit dacc81e
Show file tree
Hide file tree
Showing 12 changed files with 377 additions and 3 deletions.
2 changes: 1 addition & 1 deletion .eslintrc.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ const allPackages = [
'packages/common/debug',
'packages/common/env',
'packages/common/infra',
'packages/common/theme',
'packages/common/doc-storage',
'tools/cli',
];

Expand Down
4 changes: 3 additions & 1 deletion packages/backend/server/src/core/doc/storage/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
// TODO(@forehalo): share with frontend
// This is a totally copy of definitions in [@affine/doc-storage]
// because currently importing cross workspace package from [@affine/server] is not yet supported
// should be kept updated with the original definitions in [@affine/doc-storage]
import type { BlobStorageAdapter } from './blob';
import { Connection } from './connection';
import type { DocStorageAdapter } from './doc';
Expand Down
17 changes: 17 additions & 0 deletions packages/common/doc-storage/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
"name": "@affine/doc-storage",
"type": "module",
"version": "0.15.0",
"private": true,
"sideEffects": false,
"exports": {
".": "./src/index.ts"
},
"dependencies": {
"lodash-es": "^4.17.21",
"yjs": "patch:yjs@npm%3A13.6.18#~/.yarn/patches/yjs-npm-13.6.18-ad0d5f7c43.patch"
},
"devDependencies": {
"@types/lodash-es": "^4.17.12"
}
}
16 changes: 16 additions & 0 deletions packages/common/doc-storage/src/blob.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import { Connection } from './connection';

export interface BlobStorageOptions {}

export interface Blob {
key: string;
bin: Uint8Array;
mimeType: string;
}

export abstract class BlobStorageAdapter extends Connection {
abstract getBlob(spaceId: string, key: string): Promise<Blob | null>;
abstract setBlob(spaceId: string, blob: Blob): Promise<string>;
abstract deleteBlob(spaceId: string, key: string): Promise<boolean>;
abstract listBlobs(spaceId: string): Promise<Blob>;
}
11 changes: 11 additions & 0 deletions packages/common/doc-storage/src/connection.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
export class Connection {
protected connected: boolean = false;
connect(): Promise<void> {
this.connected = true;
return Promise.resolve();
}
disconnect(): Promise<void> {
this.connected = false;
return Promise.resolve();
}
}
214 changes: 214 additions & 0 deletions packages/common/doc-storage/src/doc.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
import {
applyUpdate,
Doc,
encodeStateAsUpdate,
encodeStateVector,
mergeUpdates,
UndoManager,
} from 'yjs';

import { Connection } from './connection';
import { SingletonLocker } from './lock';

export interface DocRecord {
spaceId: string;
docId: string;
bin: Uint8Array;
timestamp: number;
editor?: string;
}

export interface DocUpdate {
bin: Uint8Array;
timestamp: number;
editor?: string;
}

export interface HistoryFilter {
before?: number;
limit?: number;
}

export interface Editor {
name: string;
avatarUrl: string | null;
}

export interface DocStorageOptions {
mergeUpdates?: (updates: Uint8Array[]) => Promise<Uint8Array> | Uint8Array;
}

export abstract class DocStorageAdapter extends Connection {
private readonly locker = new SingletonLocker();

constructor(
protected readonly options: DocStorageOptions = {
mergeUpdates,
}
) {
super();
}

// open apis
isEmptyBin(bin: Uint8Array): boolean {
return (
bin.length === 0 ||
// 0x0 for state vector
(bin.length === 1 && bin[0] === 0) ||
// 0x00 for update
(bin.length === 2 && bin[0] === 0 && bin[1] === 0)
);
}

async getDoc(spaceId: string, docId: string): Promise<DocRecord | null> {
await using _lock = await this.lockDocForUpdate(spaceId, docId);

const snapshot = await this.getDocSnapshot(spaceId, docId);
const updates = await this.getDocUpdates(spaceId, docId);

if (updates.length) {
const { timestamp, bin, editor } = await this.squash(
snapshot ? [snapshot, ...updates] : updates
);

const newSnapshot = {
spaceId: spaceId,
docId,
bin,
timestamp,
editor,
};

const success = await this.setDocSnapshot(newSnapshot);

// if there is old snapshot, create a new history record
if (success && snapshot) {
await this.createDocHistory(snapshot);
}

// always mark updates as merged unless throws
await this.markUpdatesMerged(spaceId, docId, updates);

return newSnapshot;
}

return snapshot;
}

abstract pushDocUpdates(
spaceId: string,
docId: string,
updates: Uint8Array[],
editorId?: string
): Promise<number>;

abstract deleteDoc(spaceId: string, docId: string): Promise<void>;
abstract deleteSpace(spaceId: string): Promise<void>;
async rollbackDoc(
spaceId: string,
docId: string,
timestamp: number,
editorId?: string
): Promise<void> {
await using _lock = await this.lockDocForUpdate(spaceId, docId);
const toSnapshot = await this.getDocHistory(spaceId, docId, timestamp);
if (!toSnapshot) {
throw new Error('Can not find the version to rollback to.');
}

const fromSnapshot = await this.getDocSnapshot(spaceId, docId);

if (!fromSnapshot) {
throw new Error('Can not find the current version of the doc.');
}

const change = this.generateChangeUpdate(fromSnapshot.bin, toSnapshot.bin);
await this.pushDocUpdates(spaceId, docId, [change], editorId);
// force create a new history record after rollback
await this.createDocHistory(fromSnapshot, true);
}

abstract getSpaceDocTimestamps(
spaceId: string,
after?: number
): Promise<Record<string, number> | null>;
abstract listDocHistories(
spaceId: string,
docId: string,
query: { skip?: number; limit?: number }
): Promise<{ timestamp: number; editor: Editor | null }[]>;
abstract getDocHistory(
spaceId: string,
docId: string,
timestamp: number
): Promise<DocRecord | null>;

// api for internal usage
protected abstract getDocSnapshot(
spaceId: string,
docId: string
): Promise<DocRecord | null>;
protected abstract setDocSnapshot(snapshot: DocRecord): Promise<boolean>;
protected abstract getDocUpdates(
spaceId: string,
docId: string
): Promise<DocUpdate[]>;
protected abstract markUpdatesMerged(
spaceId: string,
docId: string,
updates: DocUpdate[]
): Promise<number>;

protected abstract createDocHistory(
snapshot: DocRecord,
force?: boolean
): Promise<boolean>;

protected async squash(updates: DocUpdate[]): Promise<DocUpdate> {
const merge = this.options?.mergeUpdates ?? mergeUpdates;
const lastUpdate = updates.at(-1);
if (!lastUpdate) {
throw new Error('No updates to be squashed.');
}

// fast return
if (updates.length === 1) {
return lastUpdate;
}

const finalUpdate = await merge(updates.map(u => u.bin));

return {
bin: finalUpdate,
timestamp: lastUpdate.timestamp,
editor: lastUpdate.editor,
};
}

protected async lockDocForUpdate(
spaceId: string,
docId: string
): Promise<AsyncDisposable> {
return this.locker.lock(`workspace:${spaceId}:update`, docId);
}

protected generateChangeUpdate(newerBin: Uint8Array, olderBin: Uint8Array) {
const newerDoc = new Doc();
applyUpdate(newerDoc, newerBin);
const olderDoc = new Doc();
applyUpdate(olderDoc, olderBin);

const newerState = encodeStateVector(newerDoc);
const olderState = encodeStateVector(olderDoc);

const diff = encodeStateAsUpdate(newerDoc, olderState);

const undoManager = new UndoManager(Array.from(newerDoc.share.values()));

applyUpdate(olderDoc, diff);

undoManager.undo();

return encodeStateAsUpdate(olderDoc, newerState);
}
}
33 changes: 33 additions & 0 deletions packages/common/doc-storage/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// TODO(@forehalo): share with backend
import type { BlobStorageAdapter } from './blob';
import { Connection } from './connection';
import type { DocStorageAdapter } from './doc';

export class SpaceStorage extends Connection {
constructor(
public readonly doc: DocStorageAdapter,
public readonly blob: BlobStorageAdapter
) {
super();
}

override async connect() {
await this.doc.connect();
await this.blob.connect();
}

override async disconnect() {
await this.doc.disconnect();
await this.blob.disconnect();
}
}

export { BlobStorageAdapter, type BlobStorageOptions } from './blob';
export {
type DocRecord,
DocStorageAdapter,
type DocStorageOptions,
type DocUpdate,
type Editor,
type HistoryFilter,
} from './doc';
42 changes: 42 additions & 0 deletions packages/common/doc-storage/src/lock.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
export interface Locker {
lock(domain: string, resource: string): Promise<Lock>;
}

export class SingletonLocker implements Locker {
lockedResource = new Map<string, Lock>();
constructor() {}

async lock(domain: string, resource: string) {
let lock = this.lockedResource.get(`${domain}:${resource}`);

if (!lock) {
lock = new Lock();
}

await lock.acquire();

return lock;
}
}

export class Lock {
private inner: Promise<void> = Promise.resolve();
private release: () => void = () => {};

async acquire() {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
let release: () => void = null!;
const nextLock = new Promise<void>(resolve => {
release = resolve;
});

await this.inner;
this.inner = nextLock;
this.release = release;
}

[Symbol.asyncDispose]() {
this.release();
return Promise.resolve();
}
}
14 changes: 14 additions & 0 deletions packages/common/doc-storage/tsconfig.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"extends": "../../../tsconfig.json",
"include": ["./src"],
"compilerOptions": {
"composite": true,
"noEmit": false,
"outDir": "lib"
},
"references": [
{
"path": "./tsconfig.node.json"
}
]
}
11 changes: 11 additions & 0 deletions packages/common/doc-storage/tsconfig.node.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"extends": "../../../tsconfig.json",
"compilerOptions": {
"composite": true,
"module": "ESNext",
"moduleResolution": "Node",
"allowSyntheticDefaultImports": true,
"outDir": "lib",
"noEmit": false
}
}
Loading

0 comments on commit dacc81e

Please sign in to comment.