Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: prepare custom runtimes #98

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 29 additions & 6 deletions src/common.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { MessagePort, TransferListItem } from 'node:worker_threads'
import { type MessagePort } from 'node:worker_threads'

/** Channel for communicating between main thread and workers */
export interface TinypoolChannel {
Expand All @@ -9,6 +9,9 @@ export interface TinypoolChannel {
postMessage(message: any): void
}

// TODO: Narrow down with generic
type Listener = (...args: any[]) => void

export interface TinypoolWorker {
runtime: string
initialize(options: {
Expand All @@ -19,12 +22,31 @@ export interface TinypoolWorker {
workerData: TinypoolData
trackUnmanagedFds?: boolean
}): void

/** Terminates the worker */
terminate(): Promise<any>
postMessage(message: any, transferListItem?: TransferListItem[]): void

/** Initialize the worker */
initializeWorker(message: StartupMessage): void

/** Run given task on worker */
runTask(message: RequestMessage): void

/** Listen on task finish messages */
onTaskFinished(message: Listener): void

/** Listen on ready messages */
onReady(listener: Listener): void

/** Listen on errors */
onError(listener: Listener): void

/** Listen on exit. Called only **once**. */
onExit(listener: Listener): void

/** Set's channel for 'main <-> worker' communication */
setChannel?: (channel: TinypoolChannel) => void
on(event: string, listener: (...args: any[]) => void): void
once(event: string, listener: (...args: any[]) => void): void
emit(event: string, ...data: any[]): void

ref?: () => void
unref?: () => void
threadId: number
Expand All @@ -45,7 +67,7 @@ export interface TinypoolWorkerMessage<
export interface StartupMessage {
filename: string | null
name: string
port: MessagePort
port?: MessagePort
sharedBuffer: Int32Array
useAtomics: boolean
}
Expand All @@ -55,6 +77,7 @@ export interface RequestMessage {
task: any
filename: string
name: string
transferList?: any
}

export interface ReadyMessage {
Expand Down
4 changes: 4 additions & 0 deletions src/entry/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ parentPort!.on('message', (message: StartupMessage) => {

const { port, sharedBuffer, filename, name } = message

if (!port) {
throw new Error(`Missing port ${JSON.stringify(message)}`)
}

;(async function () {
if (filename !== null) {
await getHandler(filename, name)
Expand Down
92 changes: 27 additions & 65 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
import {
MessageChannel,
type MessagePort,
receiveMessageOnPort,
} from 'node:worker_threads'
import { once, EventEmitterAsyncResource } from 'node:events'
import { type MessagePort } from 'node:worker_threads'
import { EventEmitterAsyncResource } from 'node:events'
import { AsyncResource } from 'node:async_hooks'
import { fileURLToPath, URL } from 'node:url'
import { join } from 'node:path'
Expand All @@ -13,7 +9,6 @@ import { performance } from 'node:perf_hooks'
import { readFileSync } from 'node:fs'
import { amount as physicalCpuCount } from './physicalCpuCount'
import {
type ReadyMessage,
type RequestMessage,
type ResponseMessage,
type StartupMessage,
Expand Down Expand Up @@ -448,7 +443,6 @@ class WorkerInfo extends AsynchronouslyCreatedResource {
freeWorkerId: () => void
taskInfos: Map<number, TaskInfo>
idleTimeout: NodeJS.Timeout | null = null
port: MessagePort
sharedBuffer: Int32Array
lastSeenResponseCount: number = 0
usedMemory?: number
Expand All @@ -457,7 +451,6 @@ class WorkerInfo extends AsynchronouslyCreatedResource {

constructor(
worker: TinypoolWorker,
port: MessagePort,
workerId: number,
freeWorkerId: () => void,
onMessage: ResponseCallback
Expand All @@ -466,8 +459,7 @@ class WorkerInfo extends AsynchronouslyCreatedResource {
this.worker = worker
this.workerId = workerId
this.freeWorkerId = freeWorkerId
this.port = port
this.port.on('message', (message: ResponseMessage) =>
this.worker.onTaskFinished((message: ResponseMessage) =>
this._handleResponse(message)
)
this.onMessage = onMessage
Expand Down Expand Up @@ -498,7 +490,6 @@ class WorkerInfo extends AsynchronouslyCreatedResource {
clearTimeout(timer)
}

this.port.close()
this.clearIdleTimeout()
for (const taskInfo of this.taskInfos.values()) {
taskInfo.done(Errors.ThreadTermination())
Expand All @@ -518,26 +509,14 @@ class WorkerInfo extends AsynchronouslyCreatedResource {
}
}

ref(): WorkerInfo {
this.port.ref()
return this
}

unref(): WorkerInfo {
// Note: Do not call ref()/unref() on the Worker itself since that may cause
// a hard crash, see https://github.com/nodejs/node/pull/33394.
this.port.unref()
return this
}

_handleResponse(message: ResponseMessage): void {
this.usedMemory = message.usedMemory
this.onMessage(message)

if (this.taskInfos.size === 0) {
// No more tasks running on this Worker means it should not keep the
// process running.
this.unref()
//this.unref()
}
}

Expand All @@ -548,13 +527,14 @@ class WorkerInfo extends AsynchronouslyCreatedResource {
taskId: taskInfo.taskId,
filename: taskInfo.filename,
name: taskInfo.name,
transferList: taskInfo.transferList,
}

try {
if (taskInfo.channel) {
this.worker.setChannel?.(taskInfo.channel)
}
this.port.postMessage(message, taskInfo.transferList)
this.worker.runTask(message)
} catch (err) {
// This would mostly happen if e.g. message contains unserializable data
// or transferList is invalid.
Expand All @@ -564,7 +544,6 @@ class WorkerInfo extends AsynchronouslyCreatedResource {

taskInfo.workerInfo = this
this.taskInfos.set(taskInfo.taskId, taskInfo)
this.ref()
this.clearIdleTimeout()

// Inform the worker that there are new messages posted, and wake it up
Expand All @@ -586,10 +565,10 @@ class WorkerInfo extends AsynchronouslyCreatedResource {
if (actualResponseCount !== this.lastSeenResponseCount) {
this.lastSeenResponseCount = actualResponseCount

let entry
while ((entry = receiveMessageOnPort(this.port)) !== undefined) {
this._handleResponse(entry.message)
}
// TODO let entry
// while ((entry = receiveMessageOnPort(this.port)) !== undefined) {
// this._handleResponse(entry.message)
// }
}
}

Expand Down Expand Up @@ -694,7 +673,7 @@ class ThreadPool {
})
const tinypoolPrivateData = { workerId: workerId! }

const worker =
const worker: TinypoolWorker =
this.options.runtime === 'child_process'
? new ProcessWorker()
: new ThreadWorker()
Expand Down Expand Up @@ -737,10 +716,8 @@ class ThreadPool {
this._processPendingMessages()
}

const { port1, port2 } = new MessageChannel()
const workerInfo = new WorkerInfo(
worker,
port1,
workerId!,
() => workerIds.set(workerId, true),
onMessage
Expand All @@ -754,32 +731,23 @@ class ThreadPool {
const message: StartupMessage = {
filename: this.options.filename,
name: this.options.name,
port: port2,
sharedBuffer: workerInfo.sharedBuffer,
useAtomics: this.options.useAtomics,
}

worker.postMessage(message, [port2])

worker.on('message', (message: ReadyMessage) => {
if (message.ready === true) {
if (workerInfo.currentUsage() === 0) {
workerInfo.unref()
}
worker.initializeWorker(message)

if (!workerInfo.isReady()) {
workerInfo.markAsReady()
}
return
worker.onReady(() => {
if (workerInfo.currentUsage() === 0) {
worker.unref?.()
}

worker.emit(
'error',
new Error(`Unexpected message on Worker: ${inspect(message)}`)
)
if (!workerInfo.isReady()) {
workerInfo.markAsReady()
}
})

worker.on('error', (err: Error) => {
worker.onError((err: Error) => {
// Work around the bug in https://github.com/nodejs/node/pull/33394
worker.ref = () => {}

Expand Down Expand Up @@ -809,13 +777,7 @@ class ThreadPool {
}
})

worker.unref()
port1.on('close', () => {
// The port is only closed if the Worker stops for some reason, but we
// always .unref() the Worker itself. We want to receive e.g. 'error'
// events on it, so we ref it once we know it's going to exit anyway.
worker.ref()
})
worker.unref?.()

this.workers.add(workerInfo)
}
Expand Down Expand Up @@ -1056,13 +1018,14 @@ class ThreadPool {
taskInfo.done(new Error('Terminating worker thread'))
}

const exitEvents: Promise<any[]>[] = []
const exitEvents: Promise<void>[] = []
while (this.workers.size > 0) {
const [workerInfo] = this.workers
// @ts-expect-error -- TODO Fix
exitEvents.push(once(workerInfo.worker, 'exit'))
// @ts-expect-error -- TODO Fix
void this._removeWorker(workerInfo)

if (workerInfo) {
exitEvents.push(new Promise((r) => workerInfo.worker.onExit(r)))
void this._removeWorker(workerInfo)
}
}

await Promise.all(exitEvents)
Expand All @@ -1087,8 +1050,7 @@ class ThreadPool {
Array.from(this.workers).filter((workerInfo) => {
// Remove idle workers
if (workerInfo.currentUsage() === 0) {
// @ts-expect-error -- TODO Fix
exitEvents.push(once(workerInfo.worker, 'exit'))
exitEvents.push(new Promise((r) => workerInfo.worker.onExit(r)))
void this._removeWorker(workerInfo)
}
// Mark on-going workers for recycling.
Expand Down
Loading
Loading