Pipe a single read stream to multiple write streams

Jan Van Ryswyck

Nov 1, 2013, 7:38:52 PM
to nod...@googlegroups.com
Hi all,

I am still playing around with streams V2 and I was wondering whether it is possible to get the following example working somehow:

var stream = require('stream'),
    util = require('util');

//
// Reading stuff
//

var ReadingStuff = function() {
  this._data = [1, 2, 3, 4, 5];
  this._readStreamIndex = 0;

  stream.Readable.call(this);
};

util.inherits(ReadingStuff, stream.Readable);

ReadingStuff.prototype._read = function() {
  if(this._readStreamIndex === this._data.length) {
    this._readStreamIndex = 0;
    this.push(null);
    return;
  }

  this.push(this._data[this._readStreamIndex].toString());
  this._readStreamIndex += 1;
};


//
// Writing stuff
//

var WritingStuff = function() {
  stream.Writable.call(this);

  this.on('finish', function() {
    console.log('Finished writing stuff!!');
  });
};

util.inherits(WritingStuff, stream.Writable);

WritingStuff.prototype._write = function(chunk, encoding, next) {
  console.log(chunk.toString(encoding));
  next();
};

//
// Application
//

var readingStuff = new ReadingStuff();

var writingStuff = new WritingStuff();
var writingStuff2 = new WritingStuff();

readingStuff.pipe(writingStuff);

process.nextTick(function() {
  // Just simulating another event loop
  readingStuff.pipe(writingStuff2);
});

I have a single read stream that I want to pipe to multiple write streams on separate iterations of the event loop (using process.nextTick to simulate the read stream being used again later on). What I see is that the output is written to the first writable stream (and its finish event is called). Afterwards, only the finish event is executed for the second writable stream. Is it OK in this case to skip push(null) in the read stream? That would mean there's no way to know inside the Writable stream that there's no more data to be written.

Is there a way to "reset" a readable stream?

greelgorke

Nov 4, 2013, 3:32:55 AM
to nod...@googlegroups.com
as far as i know, there is no support for multiple parallel pipes in core. there is also no support for resetting a stream; in fact, streams are (not only in node) one-shots. there are userland streams that support multi-piping, though. to reset a stream you would have to buffer all of its content, which is not what streams should do. another possible solution might be to create a new readable when a second pipe appears and pipe the new readable to it.
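
A minimal sketch of that last suggestion (a new readable per consumer), assuming a current Node.js release with the simplified stream constructors rather than the v0.10 API used in the question. The names createReadingStuff and readAllFresh are hypothetical helpers made up for illustration; they are not part of node core or of the original example.

```javascript
const { Readable, Writable } = require('stream');

// Hypothetical factory: every call returns a brand-new readable over the
// same data, so no stream instance is ever reused or "reset".
function createReadingStuff() {
  const data = [1, 2, 3, 4, 5];
  let index = 0;
  return new Readable({
    read() {
      if (index === data.length) {
        this.push(null); // end of this one-shot stream
        return;
      }
      this.push(String(data[index]));
      index += 1;
    }
  });
}

// Hypothetical helper: pipe a fresh readable into a collecting writable and
// resolve with everything that writable received once it has finished.
function readAllFresh() {
  return new Promise(function (resolve, reject) {
    const received = [];
    const sink = new Writable({
      write(chunk, encoding, callback) {
        received.push(chunk.toString());
        callback();
      }
    });
    sink.on('finish', function () { resolve(received.join('')); });
    sink.on('error', reject);
    createReadingStuff().pipe(sink);
  });
}

readAllFresh().then(function (first) {
  console.log('first consumer got', first);
  // A later turn of the event loop: build a new readable instead of
  // piping the already-ended one a second time.
  setImmediate(function () {
    readAllFresh().then(function (second) {
      console.log('second consumer got', second);
    });
  });
});
```

Each consumer sees its own end-of-stream signal (push(null) is kept), so the writable side can still tell when there is no more data.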

mscdex

Nov 4, 2013, 7:32:21 AM
to nod...@googlegroups.com
On Monday, November 4, 2013 3:32:55 AM UTC-5, greelgorke wrote:
as far as i know, there is no support for multiple parallel pipes in core. there is also no support for resetting a stream; in fact, streams are (not only in node) one-shots. there are userland streams that support multi-piping, though. to reset a stream you would have to buffer all of its content, which is not what streams should do. another possible solution might be to create a new readable when a second pipe appears and pipe the new readable to it.


Huh? You can definitely pipe a Readable stream to multiple Writable streams at least in node v0.10.x or newer.

Here's where they get added:
https://github.com/joyent/node/blob/v0.10/lib/_stream_readable.js#L453

Here's where they get written to:
https://github.com/joyent/node/blob/v0.10/lib/_stream_readable.js#L589-L594
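
For illustration, a small sketch of piping one Readable to two Writables, with both pipes attached in the same tick. It assumes a current Node.js release (simplified stream constructors and Promise) rather than v0.10, and makeSource, makeSink and fanOut are hypothetical names invented for this example.

```javascript
const { Readable, Writable } = require('stream');

// Hypothetical source: emits '1'..'5' and then ends, like ReadingStuff.
function makeSource() {
  const data = ['1', '2', '3', '4', '5'];
  let index = 0;
  return new Readable({
    read() {
      // push(null) signals end-of-stream once the data is exhausted.
      this.push(index < data.length ? data[index++] : null);
    }
  });
}

// Hypothetical sink: records every chunk it receives.
function makeSink() {
  const sink = new Writable({
    write(chunk, encoding, callback) {
      sink.received.push(chunk.toString());
      callback();
    }
  });
  sink.received = [];
  return sink;
}

// Pipe one source to every sink within the same tick and resolve
// once all sinks have emitted 'finish'.
function fanOut(source, sinks) {
  const done = sinks.map(function (sink) {
    return new Promise(function (resolve, reject) {
      sink.on('finish', resolve);
      sink.on('error', reject);
    });
  });
  sinks.forEach(function (sink) { source.pipe(sink); });
  return Promise.all(done).then(function () { return sinks; });
}

fanOut(makeSource(), [makeSink(), makeSink()]).then(function (sinks) {
  sinks.forEach(function (sink, i) {
    console.log('sink ' + i + ' received:', sink.received.join(''));
  });
});
```

Because the data only starts flowing after the current tick, every sink attached here receives the complete sequence.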

Jan Van Ryswyck

Nov 4, 2013, 7:39:21 AM
to nod...@googlegroups.com
But doesn't this imply that I have to pipe things together in the same iteration of the event loop? It doesn't seem to make it possible to somehow re-read a readable stream in later iterations (like "resetting" a readable stream).

Floby

Nov 4, 2013, 9:07:44 AM
to nod...@googlegroups.com
is that what you are looking for ?

Jan Van Ryswyck

Nov 4, 2013, 9:12:11 AM
to nod...@googlegroups.com
Yes, that might be it! I'll give that a try.
Many thanks!

mscdex

Nov 4, 2013, 9:33:37 AM
to nod...@googlegroups.com
On Monday, November 4, 2013 9:07:44 AM UTC-5, Floby wrote:
is that what you are looking for ?

According to that module's readme, it's only useful for node v0.8.

greelgorke

Nov 5, 2013, 3:09:16 AM
to nod...@googlegroups.com


On Monday, November 4, 2013 13:32:21 UTC+1, mscdex wrote:
On Monday, November 4, 2013 3:32:55 AM UTC-5, greelgorke wrote:
as far as i know, there is no support for multiple parallel pipes in core. there is also no support for resetting a stream; in fact, streams are (not only in node) one-shots. there are userland streams that support multi-piping, though. to reset a stream you would have to buffer all of its content, which is not what streams should do. another possible solution might be to create a new readable when a second pipe appears and pipe the new readable to it.


Huh? You can definitely pipe a Readable stream to multiple Writable streams at least in node v0.10.x or newer.
 
yes, sure, since pipe does nothing special beyond wiring up the events. but there is no guarantee that all streams will get the same data if the piping does not happen in the same execution block. i guess you can pause the readable first.
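
A rough sketch of that caveat, assuming a current Node.js release and using a PassThrough as a stand-in for a source that keeps producing data over time. makeSink and lateSubscriberDemo are hypothetical names for this illustration only.

```javascript
const { PassThrough, Writable } = require('stream');

// Hypothetical sink: records every chunk it receives.
function makeSink() {
  const sink = new Writable({
    write(chunk, encoding, callback) {
      sink.received.push(chunk.toString());
      callback();
    }
  });
  sink.received = [];
  return sink;
}

// Attach one sink immediately and a second one on a later turn of the
// event loop, then report what each of them ended up receiving.
function lateSubscriberDemo() {
  return new Promise(function (resolve, reject) {
    const source = new PassThrough();
    const early = makeSink();
    const late = makeSink();
    let pending = 2;

    function onFinish() {
      pending -= 1;
      if (pending === 0) {
        resolve({
          early: early.received.join(''),
          late: late.received.join('')
        });
      }
    }
    early.on('finish', onFinish);
    late.on('finish', onFinish);
    early.on('error', reject);
    late.on('error', reject);

    source.pipe(early);
    source.write('1');
    source.write('2');

    setImmediate(function () {
      // '1' and '2' have already flowed to `early` by now;
      // `late` only sees what is written from here on.
      source.pipe(late);
      source.write('3');
      source.end();
    });
  });
}

lateSubscriberDemo().then(function (result) {
  console.log('early sink received:', result.early);
  console.log('late sink received:', result.late);
});
```

The late sink still gets its finish event, but it misses everything that flowed before it was attached, which is why attaching all pipes in the same execution block (or holding the data back until every consumer is attached) matters.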