lost in streams v0.11

manimal45

Aug 4, 2014, 8:46:42 AM
to nod...@googlegroups.com
Hi, I'm unable to understand exactly how the Readable stream 'data' event works (I'm using node v0.11.13).

I have this simple example :

var util = require('util'),
    stream = require("stream");

// A simple readable stream

var R = function(){
    stream.Readable.call(this,{objectMode : true});
    this.index = 1;
}
// R inherits from Readable
util.inherits(R, stream.Readable);

// @resume
// On Resume, starts a timer pushing {index : #} every second

// Fire readable after 1.5 second
// Clear interval after 5 seconds
// So basically, a consumer should receive about 5 messages
R.prototype.resume= function(){
    console.log("R.resume");
    var self= this;
    this.timer = setInterval(this.pusher.bind(this), 1000);
    setTimeout(this.emit.bind(this, "readable"),1500);
    setTimeout(this.emit.bind(this, null), 5000);
    setTimeout(function(){clearInterval(self.timer)}, 5000);
}


// The function invoked by resume to push messages
R.prototype.pusher = function(){
    console.log("R.push");
    var msg = {index : this.index++};
    this.push(msg);
};

R.prototype._read= function(){
    var self= this;
    console.log("R._read");
}

R.prototype.pause= function(){
    console.log("R.pause");
    clearInterval(this.timer);
}
if (!module.parent){
    var r = new R();
   
// This is never called ?!
    r.on("data", function(data){
        console.log("received data", data);
    });

 
    // This works, but good luck managing the edge cases!
    //r.on('readable', function() {
    //    var msg;
    //    while (null !== (msg = r.read())) {
    //        console.log('got message %j', msg);
    //    }
    //});
}


The 'data' listener never gets called.
The while loop works once the 'readable' event has fired, but that does not look like what the docs describe.

Any help is very welcome!







greelgorke

Aug 5, 2014, 4:04:51 AM
to nod...@googlegroups.com
You have overridden the resume() method. Streams in 0.11 start in paused mode, which is why the 'data' event is never fired. resume() would put the stream into flowing mode, where 'data' events happen, but your resume() doesn't call the original one.
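
To illustrate the point with a minimal sketch (not the original poster's code): adding a 'data' listener calls the stream's resume(), so an override has to delegate to stream.Readable.prototype.resume for data to flow. The LoggedReadable name and its single-message _read() are made up for illustration.

```javascript
var util = require('util');
var stream = require('stream');

// Hypothetical example class: a Readable that produces one object, then ends.
function LoggedReadable() {
  stream.Readable.call(this, { objectMode: true });
  this.sent = false;
}
util.inherits(LoggedReadable, stream.Readable);

// Produce a single message, then signal the end of the stream.
LoggedReadable.prototype._read = function () {
  if (this.sent) {
    this.push(null);
  } else {
    this.sent = true;
    this.push({ index: 1 });
  }
};

// Overriding resume() is only safe if the original is still invoked;
// without this delegation the stream never switches to flowing mode.
LoggedReadable.prototype.resume = function () {
  console.log('resume called');
  return stream.Readable.prototype.resume.call(this);
};

var r = new LoggedReadable();
r.on('data', function (data) {
  console.log('received data', data);
});
```

Dropping the delegating call reproduces the symptom from the question: resume() is logged, but the 'data' listener stays silent.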

tbh, your implementation looks pretty adventurous to me. What exactly do you want to do? A readable stream should do its work in the _read() method, nowhere else. Your implementation makes the stream logic obsolete, and it would probably be easier for you to go with a simple EventEmitter instance.
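
For what it's worth, here is a minimal sketch of the original idea (a timed series of {index: n} messages) done entirely in _read(). The Counter name and its limit and delayMs parameters are assumptions for illustration; the stream ends itself with push(null) once the limit is reached.

```javascript
var util = require('util');
var stream = require('stream');

// Hypothetical example class: emits {index: 1} .. {index: limit},
// waiting delayMs before each message.
function Counter(limit, delayMs) {
  stream.Readable.call(this, { objectMode: true });
  this.index = 1;
  this.limit = limit;
  this.delayMs = delayMs;
}
util.inherits(Counter, stream.Readable);

// The stream machinery calls _read() when it wants more data.
// It is not called again until push() has been invoked, so only
// one timer is pending at any time.
Counter.prototype._read = function () {
  var self = this;
  setTimeout(function () {
    if (self.index > self.limit) {
      self.push(null); // no more data: triggers 'end'
    } else {
      self.push({ index: self.index++ });
    }
  }, self.delayMs);
};

// No custom resume(), pause() or emit() calls are needed.
var counter = new Counter(5, 100);
counter.on('data', function (data) {
  console.log('received data', data);
});
counter.on('end', function () {
  console.log('done');
});
```

Because pause() and resume() are left to the base class, backpressure and the paused/flowing switch keep working as documented.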

Floby

Aug 5, 2014, 7:50:24 AM
to nod...@googlegroups.com
Hello,

You shouldn't be the one implementing .resume() and .pause(), or calling emit('readable') or emit('data') yourself.
This is why streams were rewritten.
The API for implementing a readable stream is to inherit from stream.Readable and implement _write() and optionally _flush()

greelgorke

Aug 6, 2014, 3:46:48 AM
to nod...@googlegroups.com
you mean _read() of course, right? ;)

Floby

Aug 6, 2014, 3:59:29 AM
to nod...@googlegroups.com
Thinking a bit harder about this, you'd probably be better off with something like this :

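var stream = require('stream');

module.exports = function TimeSerieStream() {
  var result = new stream.PassThrough({objectMode: true});

  // Write one object every 500 ms; the stream buffers them until consumed.
  // Note: the interval is never cleared, so it keeps the process alive.
  setInterval(writeStuff, 500);

  function writeStuff() {
    result.write({my: 'data'});
  }

  // Hand the stream back to the caller.
  return result;
};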
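
A hedged variation on the idea above, assuming you want the series to stop after a fixed number of messages: the hypothetical timedStream helper below clears its interval and calls end(), so a consumer sees 'data' events followed by 'end'.

```javascript
var stream = require('stream');

// Hypothetical helper: writes `count` objects, one every `intervalMs`
// milliseconds, then ends the stream.
function timedStream(count, intervalMs) {
  var result = new stream.PassThrough({ objectMode: true });
  var sent = 0;

  var timer = setInterval(function () {
    result.write({ index: ++sent });
    if (sent === count) {
      clearInterval(timer); // stop producing
      result.end();         // lets consumers receive 'end'
    }
  }, intervalMs);

  return result;
}

// The consumer only needs the standard events.
timedStream(5, 100)
  .on('data', function (msg) {
    console.log('received data', msg);
  })
  .on('end', function () {
    console.log('done');
  });
```

The PassThrough does the buffering, so the producer side is just write() and end().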


Floby

Aug 7, 2014, 4:43:50 AM
to nod...@googlegroups.com
Oh yeah, definitely.

