Does stream.unshift(chunk) reexecute _read() with "chunk"?

kyogron

Mar 30, 2013, 8:55:10 AM
to nod...@googlegroups.com
Hello,

I am currently studying the stream API.
In the example below, taken from the API docs, unshift is used to put back the part of a chunk that belongs to the body rather than the header.

This actually works, but I do not understand how.
1. Does unshift call _read again with the passed chunk?
2. How should I picture the read queue? So far I only know that push will trigger _read again and emit the given data.

// A parser for a simple data protocol.
// The "header" is a JSON object, followed by 2 \n characters, and
// then a message body.
//
// Note: This can be done more simply as a Transform stream.  See below.

var Readable = require('stream').Readable;

function SimpleProtocol(source, options) {
  if (!(this instanceof SimpleProtocol))
    return new SimpleProtocol(source, options);

  Readable.call(this, options);
  this._inBody = false;
  this._sawFirstCr = false;

  // source is a readable stream, such as a socket or file
  this._source = source;

  var self = this;
  source.on('end', function() {
    self.push(null);
  });

  // give it a kick whenever the source is readable
  // read(0) will not consume any bytes
  source.on('readable', function() {
    self.read(0);
  });

  this._rawHeader = [];
  this.header = null;
}

SimpleProtocol.prototype = Object.create(
  Readable.prototype, { constructor: { value: SimpleProtocol }});

SimpleProtocol.prototype._read = function(n) {
  if (!this._inBody) {
    var chunk = this._source.read();

    // if the source doesn't have data, we don't have data yet.
    if (chunk === null)
      return this.push('');

    // check if the chunk has a \n\n
    var split = -1;
    for (var i = 0; i < chunk.length; i++) {
      if (chunk[i] === 10) { // '\n'
        if (this._sawFirstCr) {
          split = i;
          break;
        } else {
          this._sawFirstCr = true;
        }
      } else {
        this._sawFirstCr = false;
      }
    }

    if (split === -1) {
      // still waiting for the \n\n
      // stash the chunk, and try again.
      this._rawHeader.push(chunk);
      this.push('');
    } else {
      this._inBody = true;
      var h = chunk.slice(0, split);
      this._rawHeader.push(h);
      var header = Buffer.concat(this._rawHeader).toString();
      try {
        this.header = JSON.parse(header);
      } catch (er) {
        this.emit('error', new Error('invalid simple protocol data'));
        return;
      }
      // now, because we got some extra data, unshift the rest
      // back into the read queue so that our consumer will see it.
      var b = chunk.slice(split);
      this.unshift(b);

      // and let them know that we are done parsing the header.
      this.emit('header', this.header);
    }
  } else {
    // from there on, just provide the data to our consumer.
    // careful not to push(null), since that would indicate EOF.
    var chunk = this._source.read();
    if (chunk) this.push(chunk);
  }
};

// Usage:
var parser = new SimpleProtocol(source);
// Now parser is a readable stream that will emit 'header'
// with the parsed header data.

Best Regards,
Bodo

kyogron

Mar 30, 2013, 10:53:13 AM
to nod...@googlegroups.com
Ah now I understand it.

this.unshift really only puts the chunk at the front of the buffer, which then gets emitted.
If more body chunks arrive, they are pushed as normal.

I just do not get the use case, actually.
In the given case it would not matter whether we use unshift or push, because the chunk will be the first item in the buffer array anyway.

Is it correct that this is a bad example, or have I missed something?
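
To make the ordering difference concrete, here is a minimal sketch (not from the docs) that pushes and unshifts by hand on a Readable whose read() does nothing. The helper names makeStream and drain are made up for illustration, and chunks are passed as Buffers so the sketch does not depend on how a given Node version handles strings in unshift.

```javascript
'use strict';
const { Readable } = require('stream');

// Hypothetical helper: a Readable whose _read() is a no-op, so the
// internal buffer only ever contains what we add by hand below.
function makeStream() {
  return new Readable({ read() {} });
}

// Hypothetical helper: read everything currently buffered as a string.
function drain(stream) {
  let out = '';
  let chunk;
  while ((chunk = stream.read()) !== null) out += chunk.toString();
  return out;
}

// push() appends to the END of the read buffer.
const pushed = makeStream();
pushed.push(Buffer.from('A'));
pushed.push(Buffer.from('B'));
const pushedResult = drain(pushed);

// unshift() places the chunk at the FRONT of the read buffer.
const unshifted = makeStream();
unshifted.push(Buffer.from('A'));
unshifted.unshift(Buffer.from('B'));
const unshiftedResult = drain(unshifted);

console.log(pushedResult);    // AB
console.log(unshiftedResult); // BA
```

With an empty buffer, as in the docs example, the two calls leave the buffer in the same state, which is why the example does not really show what unshift is for.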

Andrew Hart

Mar 30, 2013, 11:38:38 AM
to nod...@googlegroups.com
I think you're right; in this case this.unshift is only called when nothing has ever been pushed so it will be the first item in the array.

It may be more instructive to replace:

      // now, because we got some extra data, unshift the rest
      // back into the read queue so that our consumer will see it.
      var b = chunk.slice(split);
      this.unshift(b);
with:

      // now, because we got some extra data, unshift the rest
      // back into the source so that we will see it.
      var b = chunk.slice(split);
      this._source.unshift(b);

      // try again; next time we'll take the this._inBody
      // path and see the b which was just unshifted.
      this.push('');
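
As a rough, standalone illustration of that idea (a sketch, not the docs example): a PassThrough stands in for the socket or file, and the sample header and body are invented. It reads one chunk, parses the header, unshifts the remainder back into the source, and then reads it again. Unlike the docs code, it skips both newline characters of the separator.

```javascript
'use strict';
const { PassThrough } = require('stream');

// Stand-in for the `source` socket or file; the payload is made up.
const source = new PassThrough();
source.write('{"type":"greeting"}\n\nhello body');

// Take everything that is currently buffered in the source.
const chunk = source.read();

// Locate the "\n\n" separator and parse the JSON header before it.
const split = chunk.indexOf('\n\n');
const header = JSON.parse(chunk.subarray(0, split).toString());

// Put the bytes after the separator back at the front of the
// source's read buffer, so the next read() sees them first.
source.unshift(chunk.subarray(split + 2));

const body = source.read().toString();

console.log(header.type); // greeting
console.log(body);        // hello body
```

Here unshift does real work: the data was already consumed from the source, and putting it back lets the body path read it like any other chunk.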

Bodo Kaiser

Mar 30, 2013, 11:45:57 AM
to nod...@googlegroups.com
If you don't mind, I will send a pull request for this documentation issue.
