How to write stream chunk by chunk with callbacks

Alexey Petrushin

Oct 13, 2012, 5:17:13 AM
to nod...@googlegroups.com
I don't quite understand how stream pause/resume works, or more exactly, how to use it in a simple manner. It's needed in situations where the read stream produces data faster than the write stream can consume it.

I need to write a custom stream implementation, and writing it with proper handling of `pause/resume` doesn't seem like an easy task.

Plain callbacks seem simpler to me. Can streams somehow be wrapped in code like this (code with highlighting: https://gist.github.com/3883920 )?

    var copy = function(inputStream, outputStream, callback){
      var copyNextChunk = function(){
        inputStream.read(function(err, chunk){
          if(err) return callback(err)
          // When chunk == null there's no data, copying is finished.
          if(!chunk) return callback()
          outputStream.write(chunk, function(err){
            // Callback called only when the chunk of data has been
            // delivered to the recipient and we can send another one.
            if(err) return callback(err)
            copyNextChunk()
          })
        })
      }
      copyNextChunk()
    }

Bruno Jouhier

Oct 13, 2012, 1:23:00 PM
to nod...@googlegroups.com
I wrote a post about plain callback APIs for streams: http://bjouhier.wordpress.com/2012/07/04/node-js-stream-api-events-or-callbacks/

Mikeal Rogers

Oct 13, 2012, 1:31:09 PM
to nod...@googlegroups.com
all of these are wrong.

inputStream.pipe(outputStream)
outputStream.on('close', callback)
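
For concreteness, a minimal sketch of that pattern with fs streams (the file names here are made up; note that pipe() does not forward 'error' events, so they're handled separately):

var fs = require('fs')

var input = fs.createReadStream('in.txt')
var output = fs.createWriteStream('out.txt')

input.pipe(output)
output.on('close', function () { console.log('done') })

input.on('error', function (err) { console.error(err) })
output.on('error', function (err) { console.error(err) })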

Bruno Jouhier

Oct 13, 2012, 2:04:25 PM
to nod...@googlegroups.com
What's wrong?

You'll find links to gists at the end of my post. The code works!
And Alexey's pumping function is equivalent to the pumping loop I gave in my post.

Mikeal Rogers

Oct 13, 2012, 4:33:00 PM
to nod...@googlegroups.com
i'm not engaging with your strawman Bruno.

i showed how we *actually* move data in node. this is not a debate, that's how it works. if anyone wants to use node, or write a module that has a stream that moves data, that's how they do it.

this was a question, not an open invitation for bikeshedding. please let the list answer questions.

Isaac Schlueter

Oct 13, 2012, 6:19:07 PM
to nod...@googlegroups.com
This is a good time to mention streams2, I think.

Caveat: There've been some discussions about this API. If you missed
those discussions, I'm sorry, but we're not going to have them again.
The base implementation is finished, and the integration with
node-core is close to being released. This is a "telling" message,
not an "asking" message :)

In 0.10, the Readable Stream interface is getting a massive overhaul.
Other streams (Duplexes, Writable streams, etc) are also getting
revamped considerably, so that they use the same base classes and
provide much more consistent events.

All the changes are being done such that the previous API continues to
work. However, if you attach a 'data' event handler, or call pause()
or resume(), then the stream switches into "old-mode", and will behave
like old streams. (However, it'll behave appropriately, with pause()
buffering like you almost always want it to, and so on.)

There is no pause/resume. There is no 'data' event. There is a read()
method, and a 'readable' event. You call read() until it returns
null, and then wait for a readable event to tell you it's time to
read() more. This is very symmetric to calling write() until it
returns false, and then waiting for the 'drain' event to tell you to
write() more.

In order to control how much you read, you can call read(n) with a
number of bytes (or characters if you've done a setEncoding() call in
the past) to return. If that many bytes aren't available, then it'll
return null, and emit 'readable' when they are.
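
As a sketch of that read(n) pattern (the socket and the 4-byte length prefix here are just hypothetical examples):

socket.on('readable', function onHeader() {
  var header = socket.read(4)          // ask for exactly 4 bytes
  if (header === null) return          // not enough buffered yet; 'readable' fires again later
  socket.removeListener('readable', onHeader)
  var bodyLength = header.readUInt32BE(0)
  // ...continue with socket.read(bodyLength) in the same way
})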

So, how do you tell the underlying system to pause? Simple. Just
don't call read(). If you're not consuming the bytes, then the buffer
will fill up to a certain level, and stop pulling bytes in from the
underlying systems, and TCP will do its job, or it'll stop reading the
file, or whatever it is that this stream of data refers to. It's a
pull-style stream, so it doesn't spray chunks at you.

Every readable and writable class that uses the base classes will
automatically have high and low water mark control over their
buffering, and be completely consistent in how they emit events. If
you want to extend the classes, you can simply implement the
asynchronous MyStream.prototype._read(n, callback) method, or the
asynchronous MyStream.prototype._write(chunk, callback) method.
(There are also base classes for Duplex, which does both, and for
Transform, which turns the input into the output via a
_transform(chunk, outputFunction, callback) method.)
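
For instance, a rough sketch of a writable subclass against the interface exactly as described above (this assumes the readable-stream preview module exposes a Writable base class this way; the database handle and its insert(data, cb) method are made up, and the exact signatures could still shift before release):

var util = require('util')
var Writable = require('readable-stream').Writable   // assumed preview export

function DbWriteStream(db) {
  Writable.call(this)
  this.db = db                          // hypothetical async database handle
}
util.inherits(DbWriteStream, Writable)

// Receives one chunk at a time; calling the callback signals that the chunk
// has been stored and the stream may hand over the next one (backpressure).
DbWriteStream.prototype._write = function (chunk, callback) {
  this.db.insert(chunk, callback)       // hypothetical insert(data, cb) method
}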


So, regarding the OP here:

1. Your code is wrong. Mikeal's input.pipe(output) is the way to do
it. If you want to listen for an event when everything is done, then
it's tricky in node 0.8 and before. In 0.10 and later, you'd do
`output.on('finish', callback)`, and all Writable objects will emit a
'finish' event when you've called end() and all the write buffer is
cleared. If you really wanted to not use pipe, here's how you'd do it
with new streams:

function flow(input, output, callback) {
  input.on('end', output.end.bind(output))
  output.on('finish', callback)
  f()
  function f() {
    var chunk
    while (null !== (chunk = input.read())) {
      if (false === output.write(chunk)) {
        // Output buffer is full; wait for it to drain before reading more.
        output.once('drain', f)
        return
      }
    }
    // No data available right now; wait until the input is readable again.
    input.once('readable', f)
  }
}
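
For illustration, a hypothetical call (file names made up; in practice you'd just use pipe):

var fs = require('fs')
flow(fs.createReadStream('in.txt'), fs.createWriteStream('out.txt'), function () {
  console.log('copy finished')
})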

With old streams, it's much trickier and harder to get right. Look at
`util.pump` or Stream.prototype.pipe.

There's also error handling and a few interesting edge cases, and the
backwards compatibility requirement makes it a bit trickier.
Basically, just use the input.pipe(output) method, and relax :)

2. If you decide to use some other streaming data abstraction, then
that's fine, but you are off the reservation as far as Node.js is
concerned. If you ask node users for help, don't be surprised if they
say "Just use streams and pipe()" and have no idea what your code is
doing. I actually think wheels *should* be reinvented from time to
time, but you should probably not reinvent the wheel until you've at
least tried the one everyone else is using, so you can make an
informed decision about it.

Alexey Petrushin

Oct 13, 2012, 8:08:54 PM
to nod...@googlegroups.com, i...@izs.me
Thanks for the help, especially to you, Isaac, for such a detailed answer.

As far as I understand, it's possible to wrap the existing evented stream API in a callback interface (with in-memory buffers to handle the mismatch between explicit and implicit control flow).
But it probably isn't worth it; it will be easier to just use streams the way they're supposed to be used (with pipes) and wait for those changes in 0.10.
The new API seems to be very similar to what I asked for.
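
For what it's worth, a rough sketch of such a wrapper over an old-style 'data'-event stream (all names here are made up, and it buffers in memory exactly as described):

    function callbackReader(stream) {
      var chunks = [], waiting = null, ended = false, error = null

      stream.on('data',  function (chunk) { chunks.push(chunk); flush() })
      stream.on('end',   function ()      { ended = true;       flush() })
      stream.on('error', function (err)   { error = err;        flush() })

      function flush() {
        if (!waiting) return
        var callback = waiting
        if (error)         { waiting = null; return callback(error) }
        if (chunks.length) { waiting = null; return callback(null, chunks.shift()) }
        if (ended)         { waiting = null; return callback(null, null) }
      }

      // read(callback) -> callback(err, chunk); chunk === null means the stream ended
      return { read: function (callback) { waiting = callback; flush() } }
    }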

P.S. 

As for the question and why I need it: I'm working on an application that uses custom streams and thought that maybe I could cheat and simplify my work a little by not implementing the complex evented interface :).

I once used such an abstraction for working with streams in Ruby:

    to.write do |writer|
      from.read{|buff| writer.write buff}
    end

Files are opened and closed properly, and the buffer has a sensible default size, so the code is very simple to use (more details: http://alexeypetrushin.github.com/vfs ).
Basically, by implementing just those two methods you get the ability to stream from any stream into any stream (fs, s3, sftp, ...).

I tried to do something similar with asynchronous streams.

Mark Hahn

Oct 13, 2012, 8:16:44 PM
to nod...@googlegroups.com
There is no 'data' event.  There is a read() method, and a 'readable' event.  You call read() until it returns null, and then wait for a readable event to tell you it's time to read() more.

So, if we want to pump it at max rate, we would run a tight loop to read and write in the beginning and then on every 'readable' event? It seems like more work and a lot messier compared to the old 'data' event scheme.

Nathan Rajlich

Oct 13, 2012, 8:19:57 PM
to nod...@googlegroups.com
Mark, to pump at max rate you'd use .pipe().

Mark Hahn

Oct 13, 2012, 8:25:50 PM
to nod...@googlegroups.com
But pipe only works if the writes are to another stream.  If they are to a db driver or something without pipe support then I have to do my own reads.  Or am I missing something here?

Isaac Schlueter

Oct 13, 2012, 8:31:51 PM
to nod...@googlegroups.com
Mark,

Well... yes. If you want to siphon out the data as fast as possible,
and it's not going to a writable stream interface of some sort, then
you have to read() in a tight loop on every readable event. That's
actually not much different than the 'data' event scheme.
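
Roughly (the sink function here just stands in for whatever non-stream consumer you have):

stream.on('readable', function () {
  var chunk
  while (null !== (chunk = stream.read()))
    sink(chunk)                          // hypothetical non-stream consumer
})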

Note that if you attach a 'data' event handler, then it'll do this for
you. The backwards-compatible API is exactly the one you're used to.
The major difference is that, in 0.10, if you're using 'data' events,
then pause and resume actually work in a non-surprising way (ie, you
won't get 'data' events happening while it's in a paused state), and
all streams in core will have the same set of events and methods
(instead of each of them implementing 90% of the API in subtly
different ways).

Mark Hahn

Oct 13, 2012, 8:52:39 PM
to nod...@googlegroups.com
So using it in the backwards-compatible way doesn't cause any performance loss?  If so I can choose which to use in every situation.

Isaac Schlueter

Oct 13, 2012, 10:24:03 PM
to nod...@googlegroups.com
Mark,

The overall performance impact hasn't been fully established yet. I
suspect that it'll be slight, but it's unclear whether it'll be an
improvement (owing most likely to greater hidden class optimization
from having all streams share more of the same code), or a regression
(owing to the fact that there's just more stuff being done).

The syscall/IO footprint is identical, though. Past experience in
this area has shown that the total time spent in JS is usually a
pretty small part of the overall latency unless the code is very hot
or doing something very stupid. We'll see soon.

Dominic Tarr

Oct 14, 2012, 6:58:32 PM
to nod...@googlegroups.com
mark, just implement a stream-shaped wrapper for your db client thing (and make sure you publish it to npm!)

Jake Verbaten

Oct 15, 2012, 1:26:47 AM
to nod...@googlegroups.com
https://github.com/Raynos/for-each-stream

```
forEach(stream, function (chunk) {
    /* insert chunk into database or wait for them all */
})
```

https://github.com/Raynos/write-stream#example-array

```
stream
    .pipe(toArray(function (chunks) {
        /* manipulate chunks then do database thing */
    }))
```

Both of those functions use pipe internally.

Alexey Petrushin

Oct 15, 2012, 1:46:45 AM
to nod...@googlegroups.com
Not sure about this code:

    forEach(stream, function (chunk) {
        /* insert chunk into database or wait for them all */
    })
    
The classical `each` concept doesn't work in the async world; this loop, for example, won't wait until the operation of inserting the chunk into the database has finished (successfully or not).

It will consume memory if, for example, the stream is big and fast and the database is written to slowly.

It should be something like:

    each(stream, function (chunk, next) {
        /* insert chunk into database or wait for them all */
        /* and call next() or next(err) when You finish and want next chunk */
    })
    
Same with toArray - it will load all data into memory.
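
A rough sketch of such an `each` helper over an old-style 'data'-event stream, leaning on pause()/resume() (with the caveats about pre-0.10 pause discussed above):

    function each(stream, iterator, callback) {
      stream.on('data', function (chunk) {
        stream.pause()                   // hold the flow while the iterator works
        iterator(chunk, function (err) {
          if (err) return callback(err)
          stream.resume()                // ready for the next chunk
        })
      })
      stream.on('error', callback)
      stream.on('end', function () { callback() })
    }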

Jake Verbaten

Oct 15, 2012, 1:52:56 AM
to nod...@googlegroups.com
The classical `each` concept doesn't work in the async world; this loop, for example, won't wait until the operation of inserting the chunk into the database has finished (successfully or not).

It will consume memory if, for example, the stream is big and fast and the database is written to slowly.

If you want to apply backpressure simply return false from the forEach iterator function.

If you want to catch errors simply `this.emit("error")` as `this` in the callback is the writable stream.

forEach is the same as `stream.pipe(WriteStream(iterator))`
 
Same with toArray - it will load all data into memory.

When you call toArray you want to load the entire thing into memory. That's a choice.

khs4473

Oct 16, 2012, 1:47:12 PM
to nod...@googlegroups.com

i'm not engaging with your strawman Bruno.

i showed how we *actually* move data in node. this is not a debate, that's how it works. if anyone wants to use node, or write a module that has a stream that moves data, that's how they do it.

Wow - that's a really scary response. Bruno's callback approach to streaming (and unix's, BTW) actually provides a much more solid foundation than your over-engineered streams. Boom! ; P

Kevin

Jeff Barczewski

Oct 17, 2012, 12:09:45 PM
to nod...@googlegroups.com, i...@izs.me
@isaacs I assume that your https://github.com/isaacs/readable-stream will be the module to watch as this evolves.

Will this module be the way to use new interface with older versions of node? 

Thanks!

Jeff

Marco Rogers

Oct 18, 2012, 12:25:14 AM
to nod...@googlegroups.com, i...@izs.me
My understanding is that the readable-stream module is a good way to try out these experimental api changes. It's a reference implementation so to speak. But the plan is to integrate this new api into core and support it fully. And it will be fully backwards compatible with the old api. So eventually this module will be obsoleted for the newest version of node. But yes you could continue to use it on older versions.

:Marco

Isaac Schlueter

Oct 18, 2012, 7:34:07 AM
to Marco Rogers, nod...@googlegroups.com
> On Wednesday, October 17, 2012 9:09:46 AM UTC-7, Jeff Barczewski wrote:
>>
>> @isaacs I assume that your https://github.com/isaacs/readable-stream will
>> be the module to watch as this evolves.
>>

Marco is correct.

You can also watch the progress on the "streams2" branch in git.

>> Will this module be the way to use new interface with older versions of
>> node?

Yes, the idea would be that you can use the readable-stream module if
you want to use the new interface with older versions of node.

There's also a readable.wrap(oldStyleStream) method to use a
'data'-event style stream as the data source for a read()-style
stream.
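
For example, a rough sketch of using wrap() (oldStream and handle() are placeholders, and the require path assumes the readable-stream preview module exposes Readable this way):

var Readable = require('readable-stream').Readable   // assumed preview export

var readable = new Readable()
readable.wrap(oldStream)                 // oldStream: any old-style 'data'-event stream

readable.on('readable', function () {
  var chunk
  while (null !== (chunk = readable.read()))
    handle(chunk)                        // hypothetical consumer
})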