var util = require('util'),
stream = require("stream");
// A simple object-mode readable stream.
//
// Once a consumer asks for data (a "data" listener, pipe(), read() ...),
// it pushes {index : #} every `interval` milliseconds and ends the stream
// `duration` milliseconds after production first started.
// With the defaults a consumer receives about 5 messages, then "end".
//
// options (optional object):
//   interval - milliseconds between two messages (default 1000)
//   duration - milliseconds of production before the stream ends (default 5000)
var R = function(options){
  var opts = options || {};
  stream.Readable.call(this, {objectMode : true});
  this.index = 1;
  this.interval = typeof opts.interval === "number" ? opts.interval : 1000;
  this.duration = typeof opts.duration === "number" ? opts.duration : 5000;
  // Handle of the running setInterval, null while production is idle
  this.timer = null;
  // Handle of the setTimeout that ends the stream, null until first _read
  this.endTimer = null;
  // Set once the stream has been ended or destroyed
  this.stopped = false;
};
// R inherits from Readable
util.inherits(R, stream.Readable);

// @resume
// Logs the call, then delegates to Readable#resume.
// The base implementation is what switches the stream into flowing mode and
// makes "data" listeners fire, so it must always be called.
// Returns the stream itself, like Readable#resume.
R.prototype.resume = function(){
  console.log("R.resume");
  return stream.Readable.prototype.resume.call(this);
};

// @pusher
// Invoked by the interval timer: pushes the next {index : #} message.
R.prototype.pusher = function(){
  console.log("R.push");
  var msg = {index : this.index++};
  // push() returns false when the internal buffer is full: stop producing
  // until the stream asks for more through _read()
  if (!this.push(msg)) {
    clearInterval(this.timer);
    this.timer = null;
  }
};

// @stop
// Stops the interval timer and signals the end of the stream.
R.prototype.stop = function(){
  clearInterval(this.timer);
  this.timer = null;
  this.stopped = true;
  // Pushing null is how a Readable implementation signals end-of-stream
  this.push(null);
};

// @_read
// Called by the stream machinery whenever the consumer wants more data.
// Starts the interval timer if it is not already running and, the first
// time only, schedules the end of the stream.
R.prototype._read = function(){
  console.log("R._read");
  // _read is called again after pushes, so it must not stack timers
  if (this.stopped || this.timer !== null) {
    return;
  }
  this.timer = setInterval(this.pusher.bind(this), this.interval);
  if (this.endTimer === null) {
    this.endTimer = setTimeout(this.stop.bind(this), this.duration);
  }
};

// @pause
// Logs the call, then delegates to Readable#pause.
// Production is not stopped here: messages are buffered and the interval
// timer is stopped by pusher() once push() reports a full buffer.
// Returns the stream itself, like Readable#pause.
R.prototype.pause = function(){
  console.log("R.pause");
  return stream.Readable.prototype.pause.call(this);
};

// @_destroy
// Releases both timers when the stream is destroyed so that they do not
// keep the process alive or push into a destroyed stream.
R.prototype._destroy = function(err, callback){
  clearInterval(this.timer);
  clearTimeout(this.endTimer);
  this.timer = null;
  this.endTimer = null;
  this.stopped = true;
  callback(err);
};
// Demo: runs only when this file is executed directly (node <file>),
// not when it is loaded by another module through require().
// require.main === module is the documented check; module.parent is deprecated.
if (require.main === module){
  var r = new R();
  // Attaching a "data" listener asks the stream to start flowing
  // (it triggers r.resume()); every pushed message is expected here.
  r.on("data", function(data){
    console.log("received data", data);
  });
  // Alternative: pull the messages manually when the stream is readable.
  //r.on('readable', function() {
  //  var msg;
  //  while (null !== (msg = r.read())) {
  //    console.log('got message', msg);
  //  }
  //});
}