Skip to content

Commit

Permalink
feat(PrismaQueue): better job chaining, added unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
mgcrea committed May 30, 2024
1 parent 114e249 commit fcef7b7
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 29 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
"prettify": "prettier --write src/ test/",
"typecheck": "tsc --noEmit",
"spec": "DEBUG=prisma-queue,prisma-query vitest --run --pool=forks",
"dev": "DEBUG=prisma-queue,prisma-query vitest --watch --pool=forks --reporter=dot",
"dev": "DEBUG=prisma-queue vitest --watch --pool=forks --reporter=dot",
"test": "npm run lint && npm run prettycheck && npm run typecheck && npm run spec",
"prepare": "prisma generate",
"reset": "prisma db push --force-reset && prisma generate",
Expand Down
66 changes: 66 additions & 0 deletions src/PrismaQueue.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { PrismaJob } from "src/PrismaJob";
import { debug, serializeError, waitFor } from "src/utils";
import {
createEmailQueue,
DEFAULT_POLL_INTERVAL,
prisma,
waitForNextEvent,
waitForNextJob,
Expand Down Expand Up @@ -148,6 +149,40 @@ describe("PrismaQueue", () => {
expect(record?.finishedAt).toBeNull();
expect(record?.error).toEqual(serializeError(error));
});
it("should properly dequeue multiple jobs in a row", async () => {
const JOB_WAIT = 50;
queue.worker = vi.fn(async (_job) => {
await waitFor(JOB_WAIT);
return { code: "200" };
});
await Promise.all([
queue.enqueue({ email: "[email protected]" }),
queue.enqueue({ email: "[email protected]" }),
]);
await waitFor(DEFAULT_POLL_INTERVAL + JOB_WAIT * 2 + 100);
expect(queue.worker).toHaveBeenCalledTimes(2);
expect(queue.worker).toHaveBeenNthCalledWith(2, expect.any(PrismaJob), expect.any(PrismaClient));
});
it("should properly handle multiple restarts", async () => {
const JOB_WAIT = 50;
await queue.stop();
queue.worker = vi.fn(async (_job) => {
await waitFor(JOB_WAIT);
return { code: "200" };
});
await Promise.all([
queue.enqueue({ email: "[email protected]" }),
queue.enqueue({ email: "[email protected]" }),
]);
queue.start();
expect(queue.worker).toHaveBeenCalledTimes(0);
await queue.stop();
queue.start();
await waitFor(10);
expect(queue.worker).toHaveBeenCalledTimes(1);
await waitFor(JOB_WAIT + 10);
expect(queue.worker).toHaveBeenCalledTimes(1);
});
afterAll(() => {
queue.stop();
});
Expand Down Expand Up @@ -246,6 +281,37 @@ describe("PrismaQueue", () => {
});
});

describe("maxConcurrency", () => {
let queue: PrismaQueue<JobPayload, JobResult>;
beforeAll(async () => {
queue = createEmailQueue({ maxConcurrency: 2 });
});
beforeEach(async () => {
await prisma.queueJob.deleteMany();
queue.start();
});
afterEach(async () => {
queue.stop();
});
it("should properly dequeue multiple jobs in a row according to maxConcurrency", async () => {
const JOB_WAIT = 100;
queue.worker = vi.fn(async (_job) => {
await waitFor(JOB_WAIT);
return { code: "200" };
});
await Promise.all([
queue.enqueue({ email: "[email protected]" }),
queue.enqueue({ email: "[email protected]" }),
]);
await waitFor(DEFAULT_POLL_INTERVAL + 100);
expect(queue.worker).toHaveBeenCalledTimes(2);
expect(queue.worker).toHaveBeenNthCalledWith(2, expect.any(PrismaJob), expect.any(PrismaClient));
});
afterAll(() => {
queue.stop();
});
});

describe("priority", () => {
let queue: PrismaQueue<JobPayload, JobResult>;
beforeAll(async () => {
Expand Down
74 changes: 48 additions & 26 deletions src/PrismaQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ export interface PrismaQueue<T extends JobPayload = JobPayload, U extends JobRes

const DEFAULT_MAX_CONCURRENCY = 1;
const DEFAULT_POLL_INTERVAL = 10 * 1000;
const DEFAULT_JOB_INTERVAL = 25;
const DEFAULT_JOB_INTERVAL = 50;
const DEFAULT_DELETE_ON = "never";

// eslint-disable-next-line @typescript-eslint/no-unsafe-declaration-merging
Expand Down Expand Up @@ -123,15 +123,22 @@ export class PrismaQueue<
return this.#prisma[queueJobKey];
}

public async start(): Promise<void> {
debug(`start`, this.name);
public start(): void {
debug(`starting queue named="${this.name}"...`);
if (!this.stopped) {
debug(`queue named="${this.name}" is already running, skipping...`);
return;
}
this.stopped = false;
return this.poll();
this.poll();
}

public async stop(): Promise<void> {
debug(`stop`, this.name);
const { pollInterval } = this.config;
debug(`stopping queue named="${this.name}"...`);
this.stopped = true;
// Wait for the queue to stop
await waitFor(pollInterval);
}

public async enqueue(
Expand Down Expand Up @@ -185,11 +192,11 @@ export class PrismaQueue<
}

private async poll(): Promise<void> {
debug(`poll`, this.name);
const { maxConcurrency, pollInterval, jobInterval } = this.config;
debug(`polling queue named="${this.name}" with maxConcurrency=${maxConcurrency}...`);

while (!this.stopped) {
// Ensure that poll waits when no immediate jobs need processing.
// Wait for the queue to be ready
if (this.concurrency >= maxConcurrency) {
await waitFor(pollInterval);
continue;
Expand All @@ -201,23 +208,34 @@ export class PrismaQueue<
continue;
}

while (this.concurrency < maxConcurrency && estimatedQueueSize > 0) {
this.concurrency++;
setImmediate(() =>
this.dequeue()
.then((job) => {
if (!job) {
estimatedQueueSize = 0;
} else {
estimatedQueueSize--;
}
})
.catch((error) => this.emit("error", error))
.finally(() => {
this.concurrency--;
}),
);
await waitFor(jobInterval);
// Will loop until the queue is empty or stopped
while (estimatedQueueSize > 0 && !this.stopped) {
// Will loop until the concurrency limit is reached or stopped
while (this.concurrency < maxConcurrency && !this.stopped) {
// debug(`concurrency=${this.concurrency}, maxConcurrency=${maxConcurrency}`);
debug(`processing job from queue named="${this.name}"...`);
this.concurrency++;
setImmediate(() =>
this.dequeue()
.then((job) => {
if (job) {
debug(`dequeued job({id: ${job.id}, payload: ${JSON.stringify(job.payload)}})`);
estimatedQueueSize--;
} else {
// No more jobs to process
estimatedQueueSize = 0;
}
})
.catch((error) => {
this.emit("error", error);
})
.finally(() => {
this.concurrency--;
}),
);
await waitFor(jobInterval);
}
await waitFor(jobInterval * 2);
}
}
}
Expand All @@ -229,7 +247,7 @@ export class PrismaQueue<
if (this.stopped) {
return null;
}
debug(`dequeue`, this.name);
debug(`dequeuing from queue named="${this.name}"...`);
const { name: queueName } = this;
const { tableName: tableNameRaw, deleteOn, alignTimeZone } = this.config;
const tableName = escape(tableNameRaw);
Expand Down Expand Up @@ -262,7 +280,7 @@ export class PrismaQueue<
queueName,
);
if (!rows.length || !rows[0]) {
debug(`no job found to process`);
debug(`no jobs found in queue named="${this.name}"`);
// @NOTE Failed to acquire a lock
return null;
}
Expand All @@ -273,6 +291,7 @@ export class PrismaQueue<
assert(this.worker, "Missing queue worker to process job");
debug(`starting worker for job({id: ${id}, payload: ${JSON.stringify(payload)}})`);
result = await this.worker(job, this.#prisma);
debug(`finished worker for job({id: ${id}, payload: ${JSON.stringify(payload)}})`);
const date = new Date();
await job.update({ finishedAt: date, progress: 100, result, error: Prisma.DbNull });
this.emit("success", result, job);
Expand Down Expand Up @@ -310,6 +329,9 @@ export class PrismaQueue<
const { key, cron, payload, finishedAt } = job;
if (finishedAt && cron && key) {
// Schedule next cron
debug(
`scheduling next cron job({key: ${key}, cron: ${cron}}) with payload=${JSON.stringify(payload)}`,
);
await this.schedule({ key, cron }, payload);
}
}
Expand Down
22 changes: 20 additions & 2 deletions test/utils/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,31 @@ import { prisma } from "./client";
export type JobPayload = { email: string };
export type JobResult = { code: string };

const pollInterval = 500;
export const DEFAULT_POLL_INTERVAL = 500;
let globalQueueIndex = 0;

export const createEmailQueue = (
options: PrismaQueueOptions = {},
worker: JobWorker<JobPayload, JobResult> = async (_job) => {
return { code: "200" };
},
) => createQueue<JobPayload, JobResult>({ prisma, pollInterval, ...options }, worker);
) => {
const {
pollInterval = DEFAULT_POLL_INTERVAL,
name = `default-${globalQueueIndex}`,
...otherOptions
} = options;
globalQueueIndex++;
return createQueue<JobPayload, JobResult>(
{
prisma,
name,
pollInterval,
...otherOptions,
},
worker,
);
};

export const waitForNextJob = (queue: PrismaQueue<JobPayload, JobResult>) =>
waitForNextEvent(queue, "dequeue");
Expand Down

0 comments on commit fcef7b7

Please sign in to comment.