How to aggregate streams? (multiplexer/demultiplexer)


lbdremy

Sep 3, 2012, 5:44:24 AM
to nod...@googlegroups.com
Hello,

Does anyone know of a module that can aggregate multiple streams into one?
For example, you give it 3 readable streams and you get 1 stream that emits all the `data` events from those 3 streams and emits the `end` event once all 3 streams have ended.
The same for writable streams: when you write, you actually write to 3 streams.
And a combination of both for readable/writable streams.

Thanks,

Oleg Efimov (Sannis)

Sep 3, 2012, 6:23:11 AM
to nod...@googlegroups.com
Aggregate is available in https://github.com/Obvious/pipette.

On Monday, September 3, 2012 at 13:44:24 UTC+4, lbdremy wrote:

Jake Verbaten

Sep 3, 2012, 1:19:18 PM
to nod...@googlegroups.com
https://github.com/dominictarr/mux-demux

MuxDemux is fantastic.

However, what you're asking for is not a multiplexer.

3 readable streams into one stream:

  var through = require('through') // assuming the npm "through" module
  var newStream = through()
  stream1.pipe(newStream)
  stream2.pipe(newStream)
  stream3.pipe(newStream)

Write to one stream and have it go to 3:

  var writeStream = through()
  writeStream.pipe(stream1)
  writeStream.pipe(stream2)
  writeStream.pipe(stream3)



lbdremy

Sep 9, 2012, 11:36:53 AM
to nod...@googlegroups.com
Thanks for your answers :)

The aggregate module does the job of aggregating 3 readable streams into 1 with the Cat "class", thanks.

The code that you propose, Raynos, doesn't work for this case: when the first readable stream emits "end", the pipe method calls the end method of the writable stream.
That means the writable stream is no longer writable, since "this.writable" is set to false when the end method is called. So the next time one of the other readable streams emits "data", one of two things can happen: either the pipe method calls the write method of the writable stream, which should/will throw an error because the end method has already been called and "this.writable" is false, or the emitted data is ignored because the writable stream has "this.writable" set to false. Am I right, or am I missing something important in the code?
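One way around the end-propagation problem described above is to pipe with `{ end: false }` and end the merged stream only after every source has ended. The following is a minimal sketch using Node's built-in `stream` module rather than any module named in this thread; `mergeStreams` is a hypothetical helper name, and error forwarding is deliberately left out.

```javascript
const { PassThrough } = require('stream');

// Hypothetical helper (not from pipette, through, or event-stream):
// merge several readable streams into a single readable stream.
function mergeStreams(sources) {
  const merged = new PassThrough();
  let remaining = sources.length;

  // With nothing to merge, the result is an already-ended stream.
  if (remaining === 0) {
    merged.end();
    return merged;
  }

  for (const source of sources) {
    // { end: false } stops pipe() from calling merged.end()
    // when this particular source emits "end".
    source.pipe(merged, { end: false });

    // End the merged stream only once the last source has ended.
    source.once('end', () => {
      remaining -= 1;
      if (remaining === 0) merged.end();
    });
  }

  return merged;
}
```

Calling `mergeStreams([stream1, stream2, stream3])` would return a stream that interleaves chunks in arrival order. Errors from the sources are not forwarded in this sketch, so a real implementation would need to decide how to handle them.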

For the second piece of code you are completely right. I hadn't thought about it, but it seems there is no need for any kind of control over the different events, since the trouble arises when data is emitted after the end method has already been called, and that can't happen in this case. But maybe it would be nice to have a stream that knows exactly how many streams will be piped, and so returns false when write is called until all streams are piped, or caches the emitted data until all streams are piped.

Interesting. Why can't this be considered a demultiplexer and multiplexer?
Tell me more, thank you.

Dominic Tarr

Sep 12, 2012, 5:35:02 PM
to nod...@googlegroups.com
In electronics, a multiplexer allows multiple channels to communicate
over a single channel.

http://en.wikipedia.org/wiki/Multiplexer

"An electronic multiplexer makes it possible for several signals to
share one device or resource, for example one A/D converter or one
communication line, instead of having one device per input signal.

On the other hand, a demultiplexer (or demux) is a device taking a
single input signal and selecting one of many data-output-lines, which
is connected to the single input. A multiplexer is often used with a
complementary demultiplexer on the receiving end"
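To make the quoted distinction concrete for streams: a multiplexer tags each chunk with the channel it came from, so that a demultiplexer at the other end can split the single stream back into separate ones. The sketch below is only an illustration of that idea and is not the mux-demux API; `createMux`, `createDemux`, and the `{ channel, data }` frame shape are hypothetical, chunks are treated as text, and backpressure is ignored.

```javascript
const { PassThrough } = require('stream');

// Hypothetical multiplexer: wrap every chunk in a frame that records
// which named channel it came from, and send all frames down one stream.
function createMux(channels) {
  const wire = new PassThrough({ objectMode: true });
  const names = Object.keys(channels);
  let remaining = names.length;
  if (remaining === 0) wire.end();

  for (const name of names) {
    const source = channels[name];
    source.on('data', (chunk) => {
      wire.write({ channel: name, data: chunk.toString() });
    });
    // The shared stream ends once every channel has ended.
    source.once('end', () => {
      remaining -= 1;
      if (remaining === 0) wire.end();
    });
  }
  return wire;
}

// Hypothetical demultiplexer: route each frame back to the output
// stream that matches its channel name.
function createDemux(wire, names) {
  const outputs = {};
  for (const name of names) outputs[name] = new PassThrough();

  wire.on('data', (frame) => {
    const out = outputs[frame.channel];
    if (out) out.write(frame.data); // frames for unknown channels are dropped
  });
  wire.once('end', () => {
    for (const name of names) outputs[name].end();
  });
  return outputs;
}
```

A plain merge, by contrast, forwards the chunks without any tag, so the receiving side can no longer tell which source a chunk came from.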

You just want to merge the streams. I have some code here
https://github.com/dominictarr/event-stream/blob/master/index.js#L25-57

Although it seems to have escaped the readme. Oops. Also, the comment is wrong:
it doesn't concat.

Use this if you want to concatenate:
https://npmjs.org/package/kat
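For comparison with merging, concatenation means draining the sources strictly one after another. The following minimal sketch shows that idea with Node's built-in `stream` module; it does not reproduce the kat API, and `concatStreams` is a hypothetical helper name.

```javascript
const { PassThrough } = require('stream');

// Hypothetical helper (not the kat API): emit everything from the first
// source, then everything from the second, and so on.
function concatStreams(sources) {
  const output = new PassThrough();
  let index = 0;

  function next() {
    // All sources have been drained, so the output can end.
    if (index === sources.length) {
      output.end();
      return;
    }
    const source = sources[index];
    index += 1;
    // { end: false } keeps the output open for the following sources.
    source.pipe(output, { end: false });
    // Start on the next source only after this one has ended.
    source.once('end', next);
  }

  next();
  return output;
}
```

Sources that are waiting their turn are simply not read yet, so in this sketch they buffer internally until they are piped.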

Tim Dickinson

Sep 15, 2012, 9:46:00 PM
to nod...@googlegroups.com
LOL I should have searched the group. I asked this question like two days later.