pipeline<A, B>(source: A, destination: B, callback?: PipelineCallback<B>): B extends NodeJS.WritableStream ? B : NodeJS.WritableStream
A module method to pipe between streams and generators, forwarding errors, properly cleaning up, and providing a callback when the pipeline is complete.
The `pipeline` API provides a promise version, which can also receive an options argument as the last parameter with a `signal` `AbortSignal` property. When the signal is aborted, `destroy` will be called on the underlying pipeline, with an `AbortError`. To use an `AbortSignal`, pass it inside an options object, as the last argument.
The `pipeline` API also supports async generators:

```js
const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');

async function run() {
  await pipeline(
    fs.createReadStream('lowercase.txt'),
    async function* (source, { signal }) {
      source.setEncoding('utf8'); // Work with strings rather than `Buffer`s.
      for await (const chunk of source) {
        yield await processChunk(chunk, { signal });
      }
    },
    fs.createWriteStream('uppercase.txt')
  );
  console.log('Pipeline succeeded.');
}

run().catch(console.error);
```
Remember to handle the `signal` argument passed into the async generator, especially in the case where the async generator is the source for the pipeline (i.e. the first argument); otherwise the pipeline will never complete.
```js
const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');

async function run() {
  await pipeline(
    async function* ({ signal }) {
      await someLongRunningfn({ signal });
      yield 'asd';
    },
    fs.createWriteStream('uppercase.txt')
  );
  console.log('Pipeline succeeded.');
}

run().catch(console.error);
```
`stream.pipeline()` will call `stream.destroy(err)` on all streams except:

- `Readable` streams which have emitted `'end'` or `'close'`.
- `Writable` streams which have emitted `'finish'` or `'close'`.
`stream.pipeline()` leaves dangling event listeners on the streams after the `callback` has been invoked. In the case of reuse of streams after failure, this can cause event listener leaks and swallowed errors. If the last stream is readable, dangling event listeners will be removed so that the last stream can be consumed later.
`stream.pipeline()` closes all the streams when an error is raised. Using `pipeline` with an `IncomingRequest` can lead to unexpected behavior, because it destroys the socket without sending the expected response. See the example below:
```js
const http = require('node:http');
const fs = require('node:fs');
const { pipeline } = require('node:stream');

const server = http.createServer((req, res) => {
  const fileStream = fs.createReadStream('./fileNotExist.txt');
  pipeline(fileStream, res, (err) => {
    if (err) {
      console.log(err); // No such file
      // this message can't be sent once `pipeline` already destroyed the socket
      return res.end('error!!!');
    }
  });
});
```
Since v10.0.0