How to signal to a readable stream to stop reading (and close)?


Michael Hart

Jun 28, 2013, 3:01:46 AM
to nod...@googlegroups.com
Hi all,

I'm struggling with a fairly basic problem to do with limiting (ie, truncating) streams and signalling to the source readable to stop reading and close.

Here's a simple example that hopefully illustrates what I mean:

fs.createReadStream('myHugeFile')
  .pipe(someTransform)
  .pipe(someTruncatingStream)
  .pipe(process.stdout)

Where someTruncatingStream wants to only take the first n bytes of the transformed stream, and then signal "hey, I'm done (ie, no need to keep reading the massive file)".

Is there a way to do this from someTruncatingStream without it having direct access to the source read stream? ie, some way to signal up the pipe to stop reading and close everything down (ie, not just pause)?

In my case it's actually not a file stream that I want to do this for - I just figured that was the easiest illustration. Mine is an object stream I want to create in front of a DB which is being queried a page at a time - I want to expose a stream for this that can continue paging behind the scenes and spitting out data, but that knows to stop paging when the consumers have consumed all the data they want. ie:

db.createReadStream({some: query}).pipe(someOtherFilter).pipe(stringify).pipe(take100).pipe(process.stdout)
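
For concreteness, here's a rough sketch of the kind of read stream I mean (db.fetchPage is purely hypothetical - a stand-in for whatever paged query API the driver exposes):

var stream = require('stream'),
    util = require('util')

util.inherits(DbReadStream, stream.Readable)

function DbReadStream(db, query) {
  stream.Readable.call(this, {objectMode: true})
  this._db = db
  this._query = query
  this._page = 0
}

DbReadStream.prototype._read = function() {
  var self = this
  // fetchPage is a hypothetical callback API that returns one page of rows
  this._db.fetchPage(this._query, this._page++, function(err, rows) {
    if (err) return self.emit('error', err)
    if (!rows.length) return self.push(null) // ran out of data - end normally
    rows.forEach(function(row) { self.push(row) })
  })
}

Note that nothing here knows how to stop early - which is exactly the problem.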

I could use another paradigm for this, a lazy collection for example - but it seems to me that streams should support this and are more of a lingua franca to expose from a module API (and indeed can be transformed into lazy collections themselves, a la lazy.js/node-lazy)

Cheers,

Michael

Michael Hart

Jul 1, 2013, 5:54:05 AM
to nod...@googlegroups.com
As an update to this (I realise nodeconf has been on... pity I missed it), I've tried a few things and I still can't figure out the idiomatic way to deal with this situation. Here's an example:

var stream = require('stream'),
    fs = require('fs')

var readStream = fs.createReadStream(__filename, {highWaterMark: 5}),
    limitStream = new stream.Transform(),
    limit = 5

// Just an example of a transform stream that only wants a certain amount of data
limitStream._transform = function(chunk, encoding, cb) {
  if (--limit >= 0)
    return cb(null, chunk + '\n')

  // All we want is 5 chunks, then we want to close
  this.end()
  // Wait a while and check if the file stream has closed
  setTimeout(checkFd, 1000)
  cb()
}

// fd is set to null when the file is closed
function checkFd() {
  console.log(readStream.fd != null ? 'Bugger. File still open!' : 'All good')
}

readStream.on('unpipe', function() { console.log('unpipe emitted from readStream') })
readStream.on('end', function() { console.log('end emitted from readStream') })
readStream.on('close', function() { console.log('close emitted from readStream') })

limitStream.on('unpipe', function() { console.log('unpipe emitted from limitStream') })
limitStream.on('end', function() { console.log('end emitted from limitStream') })
limitStream.on('close', function() { console.log('close emitted from limitStream') })

readStream.pipe(limitStream).pipe(process.stdout)

When run (in node v0.10.12), this results in:

$ node test.js
var s
tream
 = re
quire
('str
unpipe emitted from limitStream
end emitted from limitStream
Bugger. File still open!

And the process exits. Although if the timeout were longer (for example), the file would have stayed open the whole time, with the stream continuing to read even though the data is going nowhere (imagine it was /dev/random or similar).

So the file descriptor is left open, even though the pipe is closed - and no events seem to be signalled on readStream at all, only on the writable limitStream.

Now in my case I'm actually developing the read stream, I'm not using a file stream - so I certainly have more control than this example - but I still have no idea what the read stream needs to watch or listen for in order to know when to stop reading from the source (ie, to know that all destinations have ended).

There are a few possibilities:

  1. Check the _readableState property when _read is called - if pipes is null (or pipesCount is 0) then don't continue to fetch data. Ugly, and using hidden variables (see the sketch after this list).
  2. Override the unpipe() function and call close() if no pipes left - again, still need some way to know how many pipes are left - could keep our own count?
  3. Check to see if there are any listeners left for 'readable' (and/or 'data'?) - if none, then close(). Not sure where to do this. During _read()? Or override unpipe()?
  4. Override the pipe() function and add listeners to each destination to see when they end - and hope that they do the same to all streams they pipe to.
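
To illustrate #1, something like this in my paging loop (all the names here are hypothetical, and _readableState is undocumented internals - hence "ugly"; also, _read may simply never be called again once the destinations detach, which is part of the problem):

// before fetching the next page, bail out if nothing is piped to us any more
DbReadStream.prototype._read = function() {
  if (this._readableState.pipesCount === 0) {
    this.push(null)         // no consumers left - end the stream
    return this._closeDb()  // hypothetical teardown of the DB connection
  }
  // ... otherwise fetch the next page and push() its rows as usual ...
}
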
Is there a more idiomatic way to achieve this? Is one of these options the "nicest"?

Or are readable streams really just supposed to keep reading and reading, never closing (until they eventually reach the end of their source, assuming there is an end), regardless of whether they've been piped to writable streams that subsequently finish?

This would be a pity as it would impede the ability to compose streams together in a `dbReadStream.pipe(take(20))` kind of fashion.

M

Isaac Schlueter

Jul 3, 2013, 1:13:08 AM
to nodejs
Here's one way to do it: https://github.com/isaacs/truncating-stream

npm install truncating-stream

var Trunc = require('truncating-stream')
var t = new Trunc({ limit: 100 })

Now, no matter how much you t.write(), only 100 bytes will come out.

Of course, the thing you're piping your reader INTO won't be able to close the reader. That'd be way too intimate. It's up to you to decide when to do that.

But you can listen for the finish of the truncating stream and forcibly destroy the reader if you want to.

reader.pipe(t)
t.on('finish', function() {
  reader.destroy()
})

Michael Hart

Jul 3, 2013, 2:04:24 AM
to nod...@googlegroups.com, i...@izs.me
Ah, so I might not have been clear enough with the way I phrased it - I didn't actually want to know how to write the *truncating* stream (although thank you!) - I wanted to know how to write the *read* stream.

If I'm writing a module that exposes a read stream that is reading from a database - and that is piped into a truncating stream of some kind - how can my readable stream know to stop reading from the database? I don't want it to keep reading the entire database (and I might not know this at the time I created the read stream).

Imagine a grep'ing transform of some sort - that precludes you from knowing up front how much you'll need to read - and then imagine you only want the first 5 matches.

So:

myReadStream.pipe(grep('hello')).pipe(take(5)).pipe(process.stdout)

If myReadStream is coming from a huge file, or from paging calls to a DB (in my case), then it will just continue to read and read and read...

Unless in my app code I specifically listen for the `take` stream to end and then call `close` on myReadStream, or something like that. But that's a lot of work for the module consumer to write. It stops the pipe().pipe().pipe() construct from being so useful.
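
To be concrete, the manual workaround looks something like this (grep and take are hypothetical transforms, and close() is whatever teardown my read stream happens to expose):

var limited = myReadStream.pipe(grep('hello')).pipe(take(5))
limited.pipe(process.stdout)

// every consumer has to remember to wire this up by hand
limited.on('end', function() { myReadStream.close() })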

So is there some way *for the read stream itself* to know if it can stop reading? Or at least some idiomatic way to achieve it (keeping track of how many pipes have been attached/detached, etc)?

Michael Hart

Jul 3, 2013, 2:35:12 AM
to nod...@googlegroups.com, i...@izs.me
In a convo with @isaacs on Twitter, the example of EPIPE in Unix pipes was brought up - if I understand correctly, this is exactly the mechanism I'm looking for in node.js streams.

So when you do `cat myfile | grep 'hello' | head -5` and there are more than 5 matches, `head` exits after its 5 lines, the next write into the closed pipe fails with EPIPE (or the writer gets SIGPIPE), and `cat` shuts its file descriptor and closes down gracefully (correct me if I'm wrong here - I'm not 100% sure on this).

So `cat` is the readable stream in our case - and it needs to know when to stop reading and close gracefully - whether it be as a result of a downstream write error, or whatever.

If `myReadableStream` is the equivalent of `cat`, and the truncating `take` stream is the equivalent of `head` (ignore process.stdout here as it'll probably confuse things given it *can* actually throw EPIPE) - then what can `myReadableStream` listen for to know to close gracefully? (keep in mind that `myReadableStream` won't have a direct reference to `take`, just as `cat` doesn't to `head` either)

Does that make things a bit clearer?

Gil Pedersen

Jul 3, 2013, 4:38:47 AM
to nod...@googlegroups.com
The difference, in this case, between stream pipes and unix pipes is that unix pipes are fixed once they are set up. Stream pipes are dynamic: when all destinations are unpiped, it simply means that there are currently no receivers to pipe data to, effectively "pausing" the stream. It is quite possible to later attach another pipe, or to consume it using the normal API.

What you want would require the readable to detect that no one is listening, assume that no one will use it in the future, and close the resource. This would probably make a nice base feature for Readable, perhaps exposing it behind an autoClose option?

Isaac Schlueter

Jul 3, 2013, 11:24:10 AM
to nodejs
This is a terrible idea for a default behavior. It will generally result in lost data. However, an easy readable class for your closing use case might look like this:


var util = require('util');
var stream = require('stream');

util.inherits(AutoClose, stream.Readable);

function AutoClose(options) {
  stream.Readable.call(this, options);
  this._pipeCount = 0;
}

// Note: 'pipe' and 'unpipe' events are emitted on the *destination*,
// not on the readable itself, so count by overriding pipe()/unpipe()
// rather than listening for events that will never fire here.
AutoClose.prototype.pipe = function(dest, opts) {
  this._pipeCount++;
  return stream.Readable.prototype.pipe.call(this, dest, opts);
};

AutoClose.prototype.unpipe = function(dest) {
  var ret = stream.Readable.prototype.unpipe.call(this, dest);
  // unpipe() with no argument detaches every destination
  this._pipeCount = dest ? this._pipeCount - 1 : 0;
  if (this._pipeCount === 0)
    this.destroy();
  return ret;
};

// don't implement _read. This is still an abstract class.
// extend it in actual implementations (destroy() too - core
// Readable doesn't provide one).
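
For instance, a concrete subclass might look something like this (Counter is just a made-up endless source, and destroy() has to be supplied by the subclass, since core Readable doesn't provide one):

util.inherits(Counter, AutoClose);

function Counter() {
  AutoClose.call(this);
  this._n = 0;
  this._destroyed = false;
}

Counter.prototype._read = function() {
  if (!this._destroyed)
    this.push(this._n++ + '\n');
};

Counter.prototype.destroy = function() {
  // release whatever underlying resource you hold, then end the stream
  this._destroyed = true;
  this.push(null);
};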

Michael Hart

Jul 3, 2013, 10:29:27 PM
to nod...@googlegroups.com, i...@izs.me
Yep, cool, so that's essentially #2 in my suggestions in my second message. I'll run with that.

Indeed - I was never suggesting the default behaviour should be to destroy - I was merely looking for a mechanism to be notified.

So the only thing I'll need to do is ensure that every transform in the pipe chain also exhibits this behaviour; otherwise the pipes will stay open, just waiting in case someone wants to attach to them later on...
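
A rough sketch of what that might look like for a pass-through transform, using the same counting trick (in v0.10, a destination emitting 'finish' causes its source to unpipe it, so ending our writable side should propagate the teardown upstream):

var stream = require('stream'),
    util = require('util')

util.inherits(AutoEndTransform, stream.Transform)

function AutoEndTransform(options) {
  stream.Transform.call(this, options)
  this._pipeCount = 0
}

AutoEndTransform.prototype._transform = function(chunk, encoding, cb) {
  cb(null, chunk) // plain pass-through, just for the example
}

AutoEndTransform.prototype.pipe = function(dest, opts) {
  this._pipeCount++
  return stream.Transform.prototype.pipe.call(this, dest, opts)
}

AutoEndTransform.prototype.unpipe = function(dest) {
  var ret = stream.Transform.prototype.unpipe.call(this, dest)
  // unpipe() with no argument detaches every destination
  this._pipeCount = dest ? this._pipeCount - 1 : 0
  // once no one is reading from us, finish our writable side, which
  // makes whatever is piping into us unpipe us in turn
  if (this._pipeCount === 0) this.end()
  return ret
}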

Alexey Petrushin

Aug 30, 2013, 3:25:26 PM
to nod...@googlegroups.com
I spent a lot of time trying to solve a similar issue (terminating a reading stream that's no longer needed when the app hits an error), and it seems there's no `destroy` method in the Stream documentation - it would be nice to have it there.
Or is it deprecated?