Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pipe() allow destroying source! #55010

Closed
MarcWeber opened this issue Sep 19, 2024 · 2 comments
Closed

pipe() allow destroying source! #55010

MarcWeber opened this issue Sep 19, 2024 · 2 comments
Labels
feature request Issues that request new features to be added to Node.js. stream Issues and PRs related to the stream subsystem.

Comments

@MarcWeber
Copy link

MarcWeber commented Sep 19, 2024

What is the problem this feature will solve?

node 20.15.1. The idea is to pass around a stream without having to track the sources and being able to destroy it. So the transformer kinda starts owning control of end of life of it's source. Example use case: Implemet tail -f and abort after 5 lines. Then reading bytes is the source, and turning into lines is a transformer. The consumer (who wants to find a mysqld ready for connections in the log) should not care about the tail -f implementation.

stream = createSource().pipe(transformer)
stream.destroy()

should destroy the source, too.
This doesn't happen.

complete test case

// this file tests that using pipeDestroySource actually destroys
// the reader while standard .pipe() doesn't ?
import {Readable, Writable, Transform} from "node:stream";

const sleep = (ms: number) => new Promise((r) => setTimeout(r, ms))

export const pipeDestroySource = (source:  Readable,  destination: Writable) => {
    // unpipe or close might work
    destination.on('unpipe', (e) => { if (e == source) source.destroy()})
    return source.pipe(destination)
}

export type OnDestroy = () => void

class MyReadStream extends Readable {

  constructor(private ondestroy: OnDestroy) {
    super({autoDestroy: true});
  }

  _read() {
        this.push(Buffer.from("A\n"))
  }
  _destroy(err: any, callback: any) {
        console.log("MyReadStream _destroy");
    this.ondestroy()
    callback(err);
  }
} 


export class MyTransform extends Transform {

  constructor(private ondestroy: OnDestroy) {
    super({objectMode: true, autoDestroy: true});
  }

  _transform(chunk: Buffer, encoding: string, callback: any) {
        this.push(chunk)
        callback();
  }

  _destroy(err: any, callback: any){
        console.log('MyTransform _destroy');
        this.ondestroy()
        super._destroy(err, callback)
  }

}
const main = async () => {
    const test = async (title: string, pipe: (source: Readable, destination: Writable) => Writable, expectations: boolean[] ) => {
        console.log(`=== test ${title} started`);
        let r_destroy_called = false
        let t_destroy_called = false
        const r = new MyReadStream(() => { r_destroy_called = true})
        const t = new MyTransform(()  => { t_destroy_called = true })
        const s = pipe(r, t)
        s.destroy()

        // logging won't work because the events actions will be run after this method returned!
        // console.log({r_destroy_called, t_destroy_called});
        console.log(`test ${title} done`);
    }

    await test(   "pipe",
            (s, d) => s.pipe(d),
            [false, true]
        )

    await sleep(1000)

    await test(
           "pipeDestroySource",
           pipeDestroySource,
           [true, true]
       )
}

main().catch((e) => {
    console.log("== ERROR uncaught exception  ==");
    console.log(e);
    process.exit(1)
})

What is the feature you are proposing to solve the problem?

stream = createSource().pipe(transformer, { destroySource: true })
stream.destroy()

What alternatives have you considered?

using this code instead:

export const pipeDestroySource = (source:  Readable,  destination: Writable) => {
    // unpipe or close might work
    destination.on('unpipe', (e) => { if (e == source) source.destroy()})
    return source.pipe(destination)
}
@MarcWeber MarcWeber added the feature request Issues that request new features to be added to Node.js. label Sep 19, 2024
@RedYetiDev RedYetiDev added the stream Issues and PRs related to the stream subsystem. label Sep 19, 2024
@RedYetiDev
Copy link
Member

@nodejs/streams


IIUC this is controlled by the spec: https://streams.spec.whatwg.org

@mcollina
Copy link
Member

IIUC this is controlled by the spec: https://streams.spec.whatwg.org

This is not correct. Node.js streams predates that.


We know .pipe() does not track destroys. Unfortunately, all the plumbing in npm depends on this, so we can't easily change it. Therefore, we added pipeline https://nodejs.org/api/stream.html#streampipelinesource-transforms-destination-options.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
feature request Issues that request new features to be added to Node.js. stream Issues and PRs related to the stream subsystem.
Projects
None yet
Development

No branches or pull requests

3 participants