From cb3be46358b816bd51d713a4df02c6642cfa99b3 Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Fri, 22 Nov 2024 16:41:11 +0000 Subject: [PATCH 01/14] Make max migration number a variable --- __tests__/migrate.test.ts | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/__tests__/migrate.test.ts b/__tests__/migrate.test.ts index f9625115..eb61cc61 100644 --- a/__tests__/migrate.test.ts +++ b/__tests__/migrate.test.ts @@ -14,6 +14,8 @@ import { const options: WorkerSharedOptions = {}; +const MAX_MIGRATION_NUMBER = 18; + test("migration installs schema; second migration does no harm", async () => { await withPgClient(async (pgClient) => { await pgClient.query( @@ -40,7 +42,7 @@ test("migration installs schema; second migration does no harm", async () => { const { rows: migrationRows } = await pgClient.query( `select * from ${ESCAPED_GRAPHILE_WORKER_SCHEMA}.migrations order by id asc`, ); - expect(migrationRows).toHaveLength(18); + expect(migrationRows).toHaveLength(MAX_MIGRATION_NUMBER); const migration = migrationRows[0]; expect(migration.id).toEqual(1); @@ -90,8 +92,9 @@ test("multiple concurrent installs of the schema is fine", async () => { ); } } finally { - await Promise.allSettled(promises); + const results = await Promise.allSettled(promises); await Promise.allSettled(clients.map((c) => c.release())); + expect(results.every((r) => r.status === "fulfilled")).toBeTruthy(); } }); await withPgClient(async (pgClient) => { @@ -99,7 +102,7 @@ test("multiple concurrent installs of the schema is fine", async () => { const { rows: migrationRows } = await pgClient.query( `select * from ${ESCAPED_GRAPHILE_WORKER_SCHEMA}.migrations order by id asc`, ); - expect(migrationRows).toHaveLength(18); + expect(migrationRows).toHaveLength(MAX_MIGRATION_NUMBER); const migration = migrationRows[0]; expect(migration.id).toEqual(1); @@ -147,7 +150,7 @@ insert into ${ESCAPED_GRAPHILE_WORKER_SCHEMA}.migrations (id) values (1); const { rows: migrationRows } = await pgClient.query( `select * from ${ESCAPED_GRAPHILE_WORKER_SCHEMA}.migrations order by id asc`, ); - expect(migrationRows.length).toBeGreaterThanOrEqual(18); + expect(migrationRows.length).toBeGreaterThanOrEqual(MAX_MIGRATION_NUMBER); const migration2 = migrationRows[1]; expect(migration2.id).toEqual(2); expect(migration2.breaking).toEqual(false); @@ -208,7 +211,7 @@ test("aborts if database is more up to date than current worker", async () => { await expect( migrate(compiledSharedOptions, pgClient), ).rejects.toThrowErrorMatchingInlineSnapshot( - `"Database is using Graphile Worker schema revision 999999 which includes breaking migration 999999, but the currently running worker only supports up to revision 18. It would be unsafe to continue; please ensure all versions of Graphile Worker are compatible."`, + `"Database is using Graphile Worker schema revision 999999 which includes breaking migration 999999, but the currently running worker only supports up to revision ${MAX_MIGRATION_NUMBER}. It would be unsafe to continue; please ensure all versions of Graphile Worker are compatible."`, ); }); }); From 95c138924d30106890c34e2888408749096f9fff Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Fri, 22 Nov 2024 16:41:25 +0000 Subject: [PATCH 02/14] Ignore graphile-pro-worker folder --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 6bc380cd..0288b50b 100644 --- a/.gitignore +++ b/.gitignore @@ -8,3 +8,4 @@ node_modules/ /tasks/ /rewired/ _LOCAL/ +/graphile-pro-worker From c7d0530d5df8ef7959549fc5fdc43973fa9f035b Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Fri, 22 Nov 2024 16:41:32 +0000 Subject: [PATCH 03/14] Remove more warnings --- __tests__/runner.helpers.getTaskName.test.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/__tests__/runner.helpers.getTaskName.test.ts b/__tests__/runner.helpers.getTaskName.test.ts index 2af436b2..08acbd18 100644 --- a/__tests__/runner.helpers.getTaskName.test.ts +++ b/__tests__/runner.helpers.getTaskName.test.ts @@ -18,6 +18,8 @@ beforeAll(() => { connectionString: databaseDetails!.TEST_CONNECTION_STRING, max: JOB_COUNT * 2 + 5, }); + pgPool.on("error", () => {}); + pgPool.on("connect", () => {}); }); afterAll(() => { pgPool.end(); From 3ec4fe6767966bdda4fe41db6c75e321c4393309 Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Fri, 22 Nov 2024 16:55:56 +0000 Subject: [PATCH 04/14] Install latest graphile-config and use trict tsconfig --- package.json | 3 ++- tsconfig.json | 7 ++----- yarn.lock | 13 +++++++++---- 3 files changed, 13 insertions(+), 10 deletions(-) diff --git a/package.json b/package.json index eb432422..b53dd921 100644 --- a/package.json +++ b/package.json @@ -49,10 +49,11 @@ "homepage": "https://github.com/graphile/worker#readme", "dependencies": { "@graphile/logger": "^0.2.0", + "@tsconfig/node18": "^18.2.4", "@types/debug": "^4.1.10", "@types/pg": "^8.10.5", "cosmiconfig": "^8.3.6", - "graphile-config": "^0.0.1-beta.4", + "graphile-config": "^0.0.1-beta.11", "json5": "^2.2.3", "pg": "^8.11.3", "tslib": "^2.6.2", diff --git a/tsconfig.json b/tsconfig.json index 87492f3e..8baff2c0 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -2,15 +2,13 @@ "ts-node": { "experimentalResolver": true }, + "extends": "@tsconfig/node18/tsconfig.json", "compilerOptions": { "rootDir": "src", "declarationDir": "./dist", "outDir": "./dist", "declaration": true, "allowJs": false, - "target": "es2020", - "module": "node16", - "moduleResolution": "node16", "sourceMap": true, "pretty": true, "importHelpers": true, @@ -20,8 +18,7 @@ "noFallthroughCasesInSwitch": true, "noUnusedParameters": false, "noUnusedLocals": false, - "preserveWatchOutput": true, - "lib": ["es2020", "dom"] + "preserveWatchOutput": true }, "include": ["src/**/*"], "exclude": ["node_modules", "dist"] diff --git a/yarn.lock b/yarn.lock index 8ea65ee4..d81fd84d 100644 --- a/yarn.lock +++ b/yarn.lock @@ -2326,6 +2326,11 @@ resolved "https://registry.yarnpkg.com/@tsconfig/node16/-/node16-1.0.4.tgz#0b92dcc0cc1c81f6f306a381f28e31b1a56536e9" integrity sha512-vxhUy4J8lyeyinH7Azl1pdd43GJhZH/tP2weN8TntQblOY+A0XbT8DJk1/oCPuOOyg/Ja757rG0CgHcWC8OfMA== +"@tsconfig/node18@^18.2.4": + version "18.2.4" + resolved "https://registry.yarnpkg.com/@tsconfig/node18/-/node18-18.2.4.tgz#094efbdd70f697d37c09f34067bf41bc4a828ae3" + integrity sha512-5xxU8vVs9/FNcvm3gE07fPbn9tl6tqGGWA9tSlwsUEkBxtRnTsNmwrV8gasZ9F/EobaSv9+nu8AxUKccw77JpQ== + "@types/babel__core@^7.0.0", "@types/babel__core@^7.1.7": version "7.20.2" resolved "https://registry.yarnpkg.com/@types/babel__core/-/babel__core-7.20.2.tgz#215db4f4a35d710256579784a548907237728756" @@ -6350,10 +6355,10 @@ graphemer@^1.4.0: resolved "https://registry.yarnpkg.com/graphemer/-/graphemer-1.4.0.tgz#fb2f1d55e0e3a1849aeffc90c4fa0dd53a0e66c6" integrity sha512-EtKwoO6kxCL9WO5xipiHTZlSzBm7WLT627TqC/uVRd0HKmq8NXyebnNYxDoBi7wt8eTWrUrKXCOVaFq9x1kgag== -graphile-config@^0.0.1-beta.4: - version "0.0.1-beta.4" - resolved "https://registry.yarnpkg.com/graphile-config/-/graphile-config-0.0.1-beta.4.tgz#9c5d2b018a628c7ed931134e4258adb8aa3e94f0" - integrity sha512-9JWXMcrHNHMGMHdlzoOR75OwmvTxYdquYYkx0zoHol0Tkh8KcDxWdF9+tQ+l2hBjazMAI6R14jztRK4EM5bY3A== +graphile-config@^0.0.1-beta.4, graphile-config@^0.0.1-beta.11: + version "0.0.1-beta.11" + resolved "https://registry.yarnpkg.com/graphile-config/-/graphile-config-0.0.1-beta.11.tgz#4bd2ffd1fee6834f2e5dedc64016e7a3a9eda151" + integrity sha512-+2QLPpihQQvSYd6sSXcDrwHMMSygUrK41qWhak7u3vsXj2AGwVwl+kVvlBwuoovaoUPDsGF8zy5IevTAMgzg5Q== dependencies: "@types/interpret" "^1.1.1" "@types/node" "^20.5.7" From f230a96c316e48205f12ff82662658556fd1002d Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Fri, 22 Nov 2024 16:56:12 +0000 Subject: [PATCH 05/14] Fix cron references --- src/cron.ts | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/cron.ts b/src/cron.ts index 471d0fce..0017cfcd 100644 --- a/src/cron.ts +++ b/src/cron.ts @@ -333,19 +333,19 @@ export const runCron = ( } const start = new Date(); - events.emit("cron:starting", { cron: this, start }); + events.emit("cron:starting", { cron, start }); // We must backfill BEFORE scheduling any new jobs otherwise backfill won't // work due to known_crontabs.last_execution having been updated. await registerAndBackfillItems( - { pgPool, events, cron: this }, + { pgPool, events, cron }, escapedWorkerSchema, parsedCronItems, new Date(+start), useNodeTime, ); - events.emit("cron:started", { cron: this, start }); + events.emit("cron:started", { cron, start }); if (!cron._active) { return stop(); @@ -406,7 +406,7 @@ export const runCron = ( }, ); events.emit("cron:prematureTimer", { - cron: this, + cron, currentTimestamp, expectedTimestamp, }); @@ -422,7 +422,7 @@ export const runCron = ( )}s behind)`, ); events.emit("cron:overdueTimer", { - cron: this, + cron, currentTimestamp, expectedTimestamp, }); @@ -444,7 +444,7 @@ export const runCron = ( // Finally actually run the jobs. if (jobsAndIdentifiers.length) { events.emit("cron:schedule", { - cron: this, + cron, timestamp: expectedTimestamp, jobsAndIdentifiers, }); @@ -456,7 +456,7 @@ export const runCron = ( useNodeTime, ); events.emit("cron:scheduled", { - cron: this, + cron, timestamp: expectedTimestamp, jobsAndIdentifiers, }); From 737e590af5f9dae91c9e1fd56c3600d6d62726a7 Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Fri, 22 Nov 2024 16:58:25 +0000 Subject: [PATCH 06/14] Coerce errors --- __tests__/runner.runOnce.test.ts | 4 +++- src/cron.ts | 9 +++++++-- src/crontab.ts | 5 ++++- src/lib.ts | 17 +++++++++++++++-- src/main.ts | 25 ++++++++++++++++--------- src/migrate.ts | 16 ++++++++++++---- src/plugins/LoadTaskFromJsPlugin.ts | 5 ++++- src/runner.ts | 5 ++++- src/worker.ts | 12 +++++++----- 9 files changed, 72 insertions(+), 26 deletions(-) diff --git a/__tests__/runner.runOnce.test.ts b/__tests__/runner.runOnce.test.ts index 089c0587..48e08cd1 100644 --- a/__tests__/runner.runOnce.test.ts +++ b/__tests__/runner.runOnce.test.ts @@ -2,6 +2,7 @@ import { Pool } from "pg"; import { makeWorkerPresetWorkerOptions } from "../src/config"; import { Job, RunnerOptions, WorkerUtils } from "../src/interfaces"; +import { coerceError } from "../src/lib"; import { _allWorkerPools } from "../src/main"; import { WorkerPreset } from "../src/preset"; import { runOnce } from "../src/runner"; @@ -55,7 +56,8 @@ async function runOnceErrorAssertion( expect.assertions(1); try { await runOnce(options); - } catch (e) { + } catch (rawE) { + const e = coerceError(rawE); expect(e.message).toMatch(message); } } diff --git a/src/cron.ts b/src/cron.ts index 0017cfcd..4b6cfe4c 100644 --- a/src/cron.ts +++ b/src/cron.ts @@ -16,7 +16,12 @@ import { TimestampDigest, WorkerEvents, } from "./interfaces"; -import { CompiledOptions, CompiledSharedOptions, Releasers } from "./lib"; +import { + coerceError, + CompiledOptions, + CompiledSharedOptions, + Releasers, +} from "./lib"; interface CronRequirements { pgPool: Pool; @@ -475,7 +480,7 @@ export const runCron = ( } catch (e) { // If something goes wrong; abort. The calling code should re-schedule // which will re-trigger the backfilling code. - return stop(e); + return stop(coerceError(e)); } } diff --git a/src/crontab.ts b/src/crontab.ts index 8da02727..3cdb2bc8 100644 --- a/src/crontab.ts +++ b/src/crontab.ts @@ -21,6 +21,7 @@ import { CronItemOptions, ParsedCronItem, } from "./interfaces"; +import { coerceError } from "./lib"; /** * Returns a period of time in milliseconds representing the time phrase given. @@ -179,7 +180,9 @@ const parseCrontabPayload = ( return JSON5.parse(payloadString); } catch (e) { throw new Error( - `Failed to parse JSON5 payload on line ${lineNumber} of crontab: ${e.message}`, + `Failed to parse JSON5 payload on line ${lineNumber} of crontab: ${ + coerceError(e).message + }`, ); } }; diff --git a/src/lib.ts b/src/lib.ts index e4e94ef6..1938d5b9 100644 --- a/src/lib.ts +++ b/src/lib.ts @@ -400,7 +400,7 @@ export async function withReleasers( try { await releasers[i](); } catch (e) { - firstError = firstError || e; + firstError ??= coerceError(e); } } if (firstError) { @@ -530,7 +530,8 @@ export function makeEnhancedWithPgClient( for (let attempts = 0; attempts < MAX_RETRIES; attempts++) { try { return await withPgClient(...args); - } catch (e) { + } catch (rawE) { + const e = coerceError(rawE); const retryable = RETRYABLE_ERROR_CODES.find( ({ code }) => code === e.code, ); @@ -551,3 +552,15 @@ export function makeEnhancedWithPgClient( export const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)); + +export function coerceError(err: unknown): Error & { code?: unknown } { + if (err instanceof Error) { + return err; + } else { + const message = + typeof err === "object" && err !== null && "message" in err + ? String(err.message) + : "An error occurred"; + return new Error(message, { cause: err }); + } +} diff --git a/src/main.ts b/src/main.ts index 5b29a0b8..7daad6c8 100644 --- a/src/main.ts +++ b/src/main.ts @@ -20,6 +20,7 @@ import { WorkerPoolOptions, } from "./interfaces"; import { + coerceError, CompiledSharedOptions, makeEnhancedWithPgClient, processSharedOptions, @@ -285,7 +286,9 @@ export function runTaskListInternal( await changeListener.release(); } catch (e) { logger.error( - `Error occurred whilst releasing listening client: ${e.message}`, + `Error occurred whilst releasing listening client: ${ + coerceError(e).message + }`, { error: e }, ); } @@ -413,9 +416,10 @@ export function runTaskListInternal( try { release(); } catch (e) { - logger.error(`Error occurred releasing client: ${e.stack}`, { - error: e, - }); + logger.error( + `Error occurred releasing client: ${coerceError(e).stack}`, + { error: e }, + ); } reconnectWithExponentialBackoff(e); @@ -714,10 +718,11 @@ export function _runTaskList( workerPool, error: e, }); - logger.error(`Error occurred during graceful shutdown: ${e.message}`, { + const message = coerceError(e).message; + logger.error(`Error occurred during graceful shutdown: ${message}`, { error: e, }); - return this.forcefulShutdown(e.message); + return this.forcefulShutdown(message); } terminate(); }, @@ -790,9 +795,11 @@ export function _runTaskList( workerPool, error: e, }); - logger.error(`Error occurred during forceful shutdown: ${e.message}`, { - error: e, - }); + const error = coerceError(e); + logger.error( + `Error occurred during forceful shutdown: ${error.message}`, + { error: e }, + ); } terminate(); }, diff --git a/src/migrate.ts b/src/migrate.ts index 8eb87fa3..0c3bf94a 100644 --- a/src/migrate.ts +++ b/src/migrate.ts @@ -2,7 +2,12 @@ import { PoolClient } from "pg"; import { migrations } from "./generated/sql"; import { WorkerSharedOptions, Writeable } from "./interfaces"; -import { BREAKING_MIGRATIONS, CompiledSharedOptions, sleep } from "./lib"; +import { + BREAKING_MIGRATIONS, + coerceError, + CompiledSharedOptions, + sleep, +} from "./lib"; function checkPostgresVersion(versionString: string) { const postgresVersion = parseInt(versionString, 10); @@ -86,7 +91,8 @@ export async function runMigration( JSON.stringify({ migrationNumber, breaking }), ]); await event.client.query("commit"); - } catch (error) { + } catch (rawError) { + const error = coerceError(rawError); await event.client.query("rollback"); await hooks.process("migrationError", { ...event, error }); if (!migrationInsertComplete && error.code === "23505") { @@ -128,12 +134,14 @@ export async function migrate( latestBreakingMigration = row.biggest_breaking_id; event.postgresVersion = checkPostgresVersion(row.server_version_num); break; - } catch (e) { + } catch (rawE) { + const e = coerceError(rawE); if (attempts === 0 && (e.code === "42P01" || e.code === "42703")) { try { await installSchema(compiledSharedOptions, event); break; - } catch (e2) { + } catch (rawE2) { + const e2 = coerceError(rawE2); if (e2.code === "23505") { // Another instance installed this concurrently? Go around again. } else { diff --git a/src/plugins/LoadTaskFromJsPlugin.ts b/src/plugins/LoadTaskFromJsPlugin.ts index 75b8155e..64b1e124 100644 --- a/src/plugins/LoadTaskFromJsPlugin.ts +++ b/src/plugins/LoadTaskFromJsPlugin.ts @@ -2,6 +2,7 @@ import { GraphileConfig } from "graphile-config"; import { pathToFileURL } from "url"; import { FileDetails, isValidTask } from "../index.js"; +import { coerceError } from "../lib.js"; import { version } from "../version.js"; const DEFAULT_EXTENSIONS = [".js", ".mjs", ".cjs"]; @@ -74,7 +75,9 @@ export const LoadTaskFromJsPlugin: GraphileConfig.Plugin = { ); } } catch (error) { - const message = `Error processing '${jsFile.fullPath}': ${error.message}`; + const message = `Error processing '${jsFile.fullPath}': ${ + coerceError(error).message + }`; throw new Error(message); } }, diff --git a/src/runner.ts b/src/runner.ts index d9c582cc..9274a878 100644 --- a/src/runner.ts +++ b/src/runner.ts @@ -8,6 +8,7 @@ import { TaskList, } from "./interfaces"; import { + coerceError, CompiledOptions, getUtilsAndReleasersFromOptions, Releasers, @@ -177,7 +178,9 @@ function buildRunner(input: { await Promise.all(promises).then(release); } catch (error) { logger.error( - `Error occurred whilst attempting to release runner options: ${error.message}`, + `Error occurred whilst attempting to release runner options: ${ + coerceError(error).message + }`, { error }, ); } diff --git a/src/worker.ts b/src/worker.ts index ecb5043e..c8dc3cd4 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -13,7 +13,7 @@ import { WorkerPool, WorkerSharedOptions, } from "./interfaces"; -import { CompiledSharedOptions } from "./lib"; +import { coerceError, CompiledSharedOptions } from "./lib"; import { completeJob } from "./sql/completeJob"; import { failJob } from "./sql/failJob"; @@ -182,7 +182,8 @@ export function makeNewWorker( } else { events.emit("worker:getJob:empty", { worker }); } - } catch (err) { + } catch (rawErr) { + const err = coerceError(rawErr); events.emit("worker:getJob:error", { worker, error: err }); if (continuous) { contiguousErrors++; @@ -250,7 +251,7 @@ export function makeNewWorker( }); result = await task(job.payload, helpers); } catch (error) { - err = error; + err = coerceError(error); } const durationRaw = process.hrtime(startTimestamp); const duration = durationRaw[0] * 1e3 + durationRaw[1] * 1e-6; @@ -393,11 +394,12 @@ export function makeNewWorker( } const when = err ? `after failure '${err.message}'` : "after success"; + const coerced = coerceError(fatalError); logger.error( - `Failed to release job '${job.id}' ${when}; committing seppuku\n${fatalError.message}`, + `Failed to release job '${job.id}' ${when}; committing seppuku\n${coerced.message}`, { fatalError, job }, ); - workerDeferred.reject(fatalError); + workerDeferred.reject(coerced); release(); return; } finally { From 9b1f4c54092c487b7e11f21e678ced2e289a36b2 Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Fri, 22 Nov 2024 16:58:39 +0000 Subject: [PATCH 07/14] Fix defer() implementation --- src/deferred.ts | 6 +++--- src/main.ts | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/deferred.ts b/src/deferred.ts index 2f8240a0..557f8278 100644 --- a/src/deferred.ts +++ b/src/deferred.ts @@ -4,11 +4,11 @@ export interface Deferred extends Promise { } export default function defer(): Deferred { - let resolve: (result?: T | PromiseLike) => void; - let reject: (error: Error) => void; + let resolve: Deferred["resolve"]; + let reject: Deferred["reject"]; return Object.assign( new Promise((_resolve, _reject) => { - resolve = _resolve; + resolve = _resolve as Deferred["resolve"]; reject = _reject; }), // @ts-ignore error TS2454: Variable 'resolve' is used before being assigned. diff --git a/src/main.ts b/src/main.ts index 7daad6c8..1e51bf86 100644 --- a/src/main.ts +++ b/src/main.ts @@ -3,7 +3,7 @@ import { EventEmitter } from "events"; import { Notification, Pool, PoolClient } from "pg"; import { inspect } from "util"; -import deferred from "./deferred"; +import defer from "./deferred"; import { makeWithPgClientFromClient, makeWithPgClientFromPool, @@ -561,7 +561,7 @@ export function _runTaskList( unregisterSignalHandlers = registerSignalHandlers(logger, events); } - const promise = deferred(); + const promise = defer(); function deactivate() { if (workerPool._active) { From c277ed5184849b72dacb04c1a9bdfb72c0f0694f Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Fri, 22 Nov 2024 17:00:25 +0000 Subject: [PATCH 08/14] Tweak logging --- src/main.ts | 19 +++++++++++-------- src/worker.ts | 4 +++- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/src/main.ts b/src/main.ts index 1e51bf86..c3613ffd 100644 --- a/src/main.ts +++ b/src/main.ts @@ -35,6 +35,7 @@ import { makeNewWorker } from "./worker"; const ENABLE_DANGEROUS_LOGS = process.env.GRAPHILE_ENABLE_DANGEROUS_LOGS === "1"; +const NO_LOG_SUCCESS = !!process.env.NO_LOG_SUCCESS; // Wait at most 60 seconds between connection attempts for LISTEN. const MAX_DELAY = 60 * 1000; @@ -153,7 +154,7 @@ function _reallyRegisterSignalHandlers(logger: Logger) { _shuttingDownGracefully = true; } - logger.error( + logger.info( `Received '${signal}'; attempting global graceful shutdown... (all termination signals will be ignored for the next 5 seconds)`, ); const switchTimeout = setTimeout(switchToForcefulHandler, 5000); @@ -167,7 +168,7 @@ function _reallyRegisterSignalHandlers(logger: Logger) { clearTimeout(switchTimeout); process.removeListener(signal, gracefulHandler); if (!_shuttingDownForcefully) { - logger.error( + logger.info( `Global graceful shutdown complete; killing self via ${signal}`, ); process.kill(process.pid, signal); @@ -217,7 +218,7 @@ function _reallyRegisterSignalHandlers(logger: Logger) { process.stderr.on("error", stdioErrorHandler); _releaseSignalHandlers = () => { if (_shuttingDownGracefully || _shuttingDownForcefully) { - logger.warn(`Not unregistering signal handlers as we're shutting down`); + logger.debug(`Not unregistering signal handlers as we're shutting down`); return; } @@ -502,11 +503,13 @@ export function runTaskListInternal( const supportedTaskNames = Object.keys(tasks); - logger.info( - `Worker connected and looking for jobs... (task names: '${supportedTaskNames.join( - "', '", - )}')`, - ); + if (!NO_LOG_SUCCESS) { + logger.info( + `Worker connected and looking for jobs... (task names: '${supportedTaskNames.join( + "', '", + )}')`, + ); + } }; // Create a client dedicated to listening for new jobs. diff --git a/src/worker.ts b/src/worker.ts index c8dc3cd4..a3eeea8e 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -17,6 +17,8 @@ import { coerceError, CompiledSharedOptions } from "./lib"; import { completeJob } from "./sql/completeJob"; import { failJob } from "./sql/failJob"; +const NO_LOG_SUCCESS = !!process.env.NO_LOG_SUCCESS; + export function makeNewWorker( compiledSharedOptions: CompiledSharedOptions, params: { @@ -359,7 +361,7 @@ export function makeNewWorker( "Error occurred in event emitter for 'job:success'; this is an issue in your application code and you should fix it", ); } - if (!process.env.NO_LOG_SUCCESS) { + if (!NO_LOG_SUCCESS) { logger.info( `Completed task ${job.id} (${ job.task_identifier From 05db46215beae19272efd6b4d631b79ce5f03de0 Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Fri, 22 Nov 2024 17:00:39 +0000 Subject: [PATCH 09/14] Cannot guarantee that e exists --- src/worker.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/worker.ts b/src/worker.ts index a3eeea8e..cda0837f 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -284,7 +284,7 @@ export function makeNewWorker( // Create a "partial" error for the batch err = new Error( `Batch failures:\n${batchJobErrors - .map((e) => (e as Error).message ?? String(e)) + .map((e) => (e as Error)?.message ?? String(e)) .join("\n")}`, ); } From 8af9dfbdfdaed809ce356f43f12a3c0b14fa41d3 Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Fri, 22 Nov 2024 17:01:10 +0000 Subject: [PATCH 10/14] Client is not guaranteed to exist --- src/interfaces.ts | 1 - src/main.ts | 14 ++++++++++---- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/src/interfaces.ts b/src/interfaces.ts index 4b67592f..47775398 100644 --- a/src/interfaces.ts +++ b/src/interfaces.ts @@ -908,7 +908,6 @@ export type WorkerEventMap = { "pool:listen:error": { workerPool: WorkerPool; error: unknown; - client: PoolClient; }; /** diff --git a/src/main.ts b/src/main.ts index c3613ffd..523a6608 100644 --- a/src/main.ts +++ b/src/main.ts @@ -363,7 +363,7 @@ export function runTaskListInternal( const listenForChanges = ( err: Error | undefined, - client: PoolClient, + maybeClient: PoolClient | undefined, releaseClient: () => void, ) => { if (!workerPool._active) { @@ -373,7 +373,7 @@ export function runTaskListInternal( } const reconnectWithExponentialBackoff = (err: Error) => { - events.emit("pool:listen:error", { workerPool, client, error: err }); + events.emit("pool:listen:error", { workerPool, error: err }); attempts++; @@ -400,11 +400,17 @@ export function runTaskListInternal( }, delay); }; - if (err) { + if (err || !maybeClient) { // Try again - reconnectWithExponentialBackoff(err); + reconnectWithExponentialBackoff( + err ?? + new Error( + `This should never happen, this error only exists to satisfy TypeScript`, + ), + ); return; } + const client = maybeClient; //---------------------------------------- From 31c36f5ac468b3dee5fe8844e456c1f12b08d107 Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Fri, 22 Nov 2024 17:01:29 +0000 Subject: [PATCH 11/14] Refactor to async function --- src/main.ts | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/src/main.ts b/src/main.ts index 523a6608..ccc653b2 100644 --- a/src/main.ts +++ b/src/main.ts @@ -474,22 +474,24 @@ export function runTaskListInternal( } } - function release() { + async function release() { // No need to call changeListener.release() because the client errored changeListener = null; client.removeListener("notification", handleNotification); // TODO: ideally we'd only stop handling errors once all pending queries are complete; but either way we shouldn't try again! client.removeListener("error", onErrorReleaseClientAndTryAgain); events.emit("pool:listen:release", { workerPool, client }); - return client - .query('UNLISTEN "jobs:insert"; UNLISTEN "worker:migrate";') - .catch((error) => { - /* ignore errors */ - logger.error(`Error occurred attempting to UNLISTEN: ${error}`, { - error, - }); - }) - .then(() => releaseClient()); + try { + await client.query( + 'UNLISTEN "jobs:insert"; UNLISTEN "worker:migrate";', + ); + } catch (error) { + /* ignore errors */ + logger.error(`Error occurred attempting to UNLISTEN: ${error}`, { + error, + }); + } + return releaseClient(); } // On error, release this client and try again From 578d45f7cc2b230c3c7df6528ec0c18a8259d443 Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Fri, 22 Nov 2024 17:01:37 +0000 Subject: [PATCH 12/14] Remove indentation --- src/migrate.ts | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/migrate.ts b/src/migrate.ts index 0c3bf94a..dfa544fc 100644 --- a/src/migrate.ts +++ b/src/migrate.ts @@ -125,9 +125,11 @@ export async function migrate( const { rows: [row], } = await event.client.query( - `select current_setting('server_version_num') as server_version_num, - (select id from ${escapedWorkerSchema}.migrations order by id desc limit 1) as id, - (select id from ${escapedWorkerSchema}.migrations where breaking is true order by id desc limit 1) as biggest_breaking_id;`, + `\ +select current_setting('server_version_num') as server_version_num, +(select id from ${escapedWorkerSchema}.migrations order by id desc limit 1) as id, +(select id from ${escapedWorkerSchema}.migrations where breaking is true order by id desc limit 1) as biggest_breaking_id; +`, ); latestMigration = row.id; From bcf9a875c93cf2a5757027d4700e7c13d8856ebe Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Fri, 22 Nov 2024 17:01:45 +0000 Subject: [PATCH 13/14] Add isPromiseLike helper --- src/lib.ts | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/lib.ts b/src/lib.ts index 1938d5b9..cf503498 100644 --- a/src/lib.ts +++ b/src/lib.ts @@ -564,3 +564,8 @@ export function coerceError(err: unknown): Error & { code?: unknown } { return new Error(message, { cause: err }); } } + +export function isPromiseLike(v: PromiseLike | T): v is PromiseLike { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + return v != null && typeof (v as any).then === "function"; +} From cef1de14464c8f6a8418bd37575cbfd474f387e7 Mon Sep 17 00:00:00 2001 From: Benjie Gillam Date: Fri, 22 Nov 2024 17:04:53 +0000 Subject: [PATCH 14/14] Use more helpful pool-lacking-error-handlers errors --- src/lib.ts | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/src/lib.ts b/src/lib.ts index cf503498..e79bc8b1 100644 --- a/src/lib.ts +++ b/src/lib.ts @@ -300,7 +300,17 @@ export async function assertPool( pgPool = _rawOptions.pgPool; if (pgPool.listeners("error").length === 0) { console.warn( - `Your pool doesn't have error handlers! See: https://err.red/wpeh`, + `Your pool doesn't have error handlers! See: https://err.red/wpeh?v=${encodeURIComponent( + version, + )}`, + ); + installErrorHandlers(compiledSharedOptions, releasers, pgPool); + } + if (pgPool.listeners("connect").length === 0) { + console.warn( + `Your pool doesn't have all of the error handlers! See: https://err.red/wpeh?v=${encodeURIComponent( + version, + )}&method=connect`, ); installErrorHandlers(compiledSharedOptions, releasers, pgPool); }