How do I make a stream that can _write() multiple messages at the same time?

Joseph Gentle

Feb 24, 2014, 5:38:22 AM
to nod...@googlegroups.com
I'm (finally!) making node-browserchannel extend streams, but I can't figure out how to make streams do what I want. (browserchannel is a socketio-like library if you haven't seen it - https://github.com/josephg/node-browserchannel ).

I'm using a stream in objectMode (because it's sending chunked messages). I want to be able to have many messages in flight at a time (I can send many messages in reply to a long-polling HTTP request from the client). My problem is that _write won't let me process multiple messages at a time: according to the streams spec, I should only call the _write() callback when the message has been 'flushed', and _write doesn't get called again until a process.nextTick() after I call the callback.

- If I'm honest and call the callback only after the HTTP response has been sent, I can only send one message per HTTP response (_write doesn't get called again until the next tick of the event loop). Having only one message in flight at a time is a _huge_ performance regression.
- If I lie and call the callback immediately, the stream behaves badly. For example, if you call end() with a message on the stream, the stream emits 'finish' long before the messages have actually been delivered to the client. I also can't synchronously send a bunch of response messages in reply to a single request message: unless I do a setTimeout(..., 10) or something equally awful, I have no way to bunch the responses into a single HTTP response.
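
To make the second bullet concrete, here is a minimal sketch of the "lie" variant. It is not browserchannel code: `pending`, `events`, and `flush` are hypothetical stand-ins, and the 20 ms timer simulates a long-poll response that arrives later. It uses the constructor-options form of Writable from current Node versions.

```javascript
'use strict';
const { Writable } = require('stream');

// Hypothetical stand-in for a session: messages are "delivered"
// some time after _write() has accepted them.
const events = [];
const pending = [];

const session = new Writable({
  objectMode: true,
  write(msg, encoding, callback) {
    pending.push(msg);
    // The "lie": report success before anything has reached the client.
    callback();
  }
});

session.on('finish', () => events.push('finish'));

// Simulated long-poll response that carries away everything queued so far.
function flush() {
  events.push('delivered ' + pending.splice(0).join(','));
}

session.write('one');
session.write('two');
session.end('three');
setTimeout(flush, 20);
// events ends up as ['finish', 'delivered one,two,three']
```

All three messages go out in one batch, but 'finish' is recorded before the delivery, which is exactly the ordering problem described above.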

Is there a good answer here? I can't think of a decent way to obey the streams protocol while still being performant. I could make my own duck typed implementation of the Duplex streams API, though I'll probably make mistakes and I won't have pipe() support. ... Are there any other good options?

-J


greelgorke

Feb 24, 2014, 8:14:58 AM
to nod...@googlegroups.com
Maybe some examples of your code would help.

Fedor Indutny

Feb 24, 2014, 8:37:50 AM
to nod...@googlegroups.com
Joseph,

I think you could try calling `writecb` immediately until your
internal queue fills up, and then start calling it only once you
can do the next write.
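
One possible reading of this suggestion, as a hedged sketch rather than the contents of any gist in this thread: `MAX_QUEUED`, `queue`, `stalled`, and `flush` are hypothetical names, and `flush` stands for whatever hands the queued messages to an HTTP response.

```javascript
'use strict';
const { Writable } = require('stream');

const MAX_QUEUED = 2;   // hypothetical limit on in-flight messages
const queue = [];
let stalled = null;     // write callback held back while the queue is full

const session = new Writable({
  objectMode: true,
  write(msg, encoding, callback) {
    queue.push(msg);
    if (queue.length < MAX_QUEUED) {
      callback();          // room left: accept the next message right away
    } else {
      stalled = callback;  // full: hold the callback until the next flush
    }
  }
});

// Hypothetical hook: call when an HTTP response carries the queue away.
function flush() {
  const batch = queue.splice(0);
  if (stalled) {
    const cb = stalled;
    stalled = null;
    cb();                  // lets the stream hand over its buffered writes
  }
  return batch;
}

session.write('a');
session.write('b');
session.write('c');        // held in the stream's own buffer until flush()
```

With this shape, several messages can be in flight at once, but the stream still treats every message whose callback has run as written.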
> --
> --
> Job Board: http://jobs.nodejs.org/
> Posting guidelines:
> https://github.com/joyent/node/wiki/Mailing-List-Posting-Guidelines
> You received this message because you are subscribed to the Google
> Groups "nodejs" group.
> To post to this group, send email to nod...@googlegroups.com
> To unsubscribe from this group, send email to
> nodejs+un...@googlegroups.com
> For more options, visit this group at
> http://groups.google.com/group/nodejs?hl=en?hl=en

Joseph Gentle

Feb 24, 2014, 1:33:22 PM
to nod...@googlegroups.com
If I do that, calling code like this:

session.end('a', function() {
  process.exit();
});

... will result in 'a' never being sent.

Here's an example of the two code styles, if it helps:
https://gist.github.com/josephg/9194035

Does anyone know how primus deals with this?
> You received this message because you are subscribed to a topic in the Google Groups "nodejs" group.
> To unsubscribe from this topic, visit https://groups.google.com/d/topic/nodejs/l0Lb6XXPhOc/unsubscribe.
> To unsubscribe from this group and all its topics, send an email to nodejs+un...@googlegroups.com.

Arnout Kazemier

Feb 24, 2014, 4:40:17 PM
to nod...@googlegroups.com
Primus is still using the Streams 1 interface.

Fedor Indutny

Feb 24, 2014, 5:13:17 PM
to nod...@googlegroups.com
Joseph,

Take a look at the following snippet:
https://gist.github.com/indutny/e3fcb35fc25399bb7f7b

Joseph Gentle

Feb 24, 2014, 5:48:41 PM
to nodejs
Thanks guys. I emailed Dominic Tarr and he recommends against using
node streams v2 for this stuff as well.

Fedor - that's cute, but say I call:

s.on('finish', function() { process.exit(); });
s.end();

... My process will exit while data is still in the buffer. The
annoying thing is that I can provide that guarantee right now from my
code - I get message acknowledgements, so I can tell you exactly when
all messages have been received by the client. But I can't control
when finish is emitted using streams 2.0 - it gets sent automatically
after the _write() callback is called on the last message. If the
stream 'writes' to an internal memory buffer, those guarantees are all
lost.
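
For readers on newer Node versions: Node 8 and later added a `_final()` hook (the `final` constructor option), which did not exist when this thread was written and which runs after end() but before 'finish'. A minimal sketch of using it to hold 'finish' until acknowledgements arrive, assuming a hypothetical `ack()` function that the transport calls per message; `unacked`, `onDrained`, and `events` are illustrative names as well.

```javascript
'use strict';
const { Writable } = require('stream');

const events = [];
const unacked = new Set();  // hypothetical: messages sent but not yet acknowledged
let onDrained = null;       // set by final() while acks are outstanding

const session = new Writable({
  objectMode: true,
  write(msg, encoding, callback) {
    unacked.add(msg);
    callback();             // accept immediately so many messages can be in flight
  },
  final(callback) {
    // Runs after end() and before 'finish'; hold 'finish' until every ack is in.
    if (unacked.size === 0) callback();
    else onDrained = callback;
  }
});

// Hypothetical hook: call when the client acknowledges a message.
function ack(msg) {
  unacked.delete(msg);
  if (unacked.size === 0 && onDrained) {
    const cb = onDrained;
    onDrained = null;
    cb();
  }
}

session.on('finish', () => events.push('finish'));
session.write('a');
session.end('b');
setTimeout(() => { events.push('acks'); ack('a'); ack('b'); }, 20);
// events ends up as ['acks', 'finish']
```

This only changes when 'finish' fires; per-write callbacks still run before delivery.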

Also imagine this code:

middleware = function(req, res) {
  // Process messages in the request, then send the responses in the response
  s.push(req.body.message);

  // When there's data available, send it to the client
  s.on('avail', function() {
    process.nextTick(function() {
      res.end(s.buffer);
    });
  });
}

then:

s.on('data', function(msg) {
  s.write('one');
  s.write('two');
});

^--- the HTTP response will only contain 'one', not 'two', because of
how _write works.
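
A hedged aside for later Node versions (0.12 and up), which added cork(), uncork(), and `_writev()`: writes made while the stream is corked are buffered and, on uncork, handed to `_writev()` together. The sketch below batches the two writes from the same tick; `responses` is a hypothetical stand-in for the HTTP responses that would be sent.

```javascript
'use strict';
const { Writable } = require('stream');

const responses = [];  // hypothetical: one entry per simulated HTTP response

const session = new Writable({
  objectMode: true,
  write(msg, encoding, callback) {
    responses.push([msg]);  // a single message on its own
    callback();
  },
  writev(chunks, callback) {
    // chunks is an array of { chunk, encoding } entries buffered while corked
    responses.push(chunks.map((c) => c.chunk));
    callback();
  }
});

session.cork();
session.write('one');
session.write('two');
// Uncork on the next tick so everything written synchronously goes out together.
process.nextTick(() => session.uncork());
// responses ends up as [['one', 'two']]
```

Whether the caller or the library does the corking is a design choice; here the caller does it for brevity.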

Joseph Gentle

Feb 24, 2014, 5:51:58 PM
to nodejs
The fascinating thing about all of this is that node's Readable
streams API is perfect for what I want. If I could consume a readable
stream when people send data to a client (with the user calling .push()
to write), the semantics would match perfectly. But I think implementing
a duplex stream using two readable streams isn't any better than
inventing my own streams API to expose.

... Which is what I'm doing now and it works fine, and what I'll
probably continue doing.

-J