a node data processing pattern question


Jeremie Miller

Dec 16, 2011, 10:19:19 AM
to singl...@googlegroups.com
Ok, so this is the first time I think I've encountered this pattern in
node, and here's my current solution, but I'd love input on whether
there are any better ideas here!

The problem is that the data events fire asynchronously, and the data
then needs a sliding window across those events to extract the actual
JSON chunks between newlines; each of those chunks needs to be
processed asynchronously, and I need to detect when it's all done. The
data could flow in faster or slower than the processing (or both at
different points during the exchange), so it's a matter of loosely
coupling two independent async streams.

I've tried it a few ways, all of which had problems in either the
parsing, the ordering, the buffering (we want to process as fast as we
can, not buffer it all first), or the end detection. Currently it's
using async queues, but it feels like there should be a more ideal
solution here, possibly using node's streams somehow?


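// context: path, request, and async are require()'d elsewhere in this module;
// lconfig, logger, and exports.add come from the surrounding codebase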
function gatherFromUrl(svcId, callback) {
  var url = path.join("Me", svcId, "?all=true&stream=true");
  url = lconfig.lockerBase + "/" + url;
  logger.info("updating from " + url);
  var req = request.get({uri: url}, function (err) {
    if (err) logger.error(err);
  });
  var total = 0;
  var q = async.queue(function (chunk, cb) {
    if (chunk == "") return cb();
    total++;
    try {
      exports.add(svcId, JSON.parse(chunk), cb);
    } catch (E) {
      logger.error("got " + E + " processing " + chunk);
      return cb();
    }
  }, 1);
  var buff = "";
  req.on("data", function (data) {
    buff += data.toString();
    var chunks = buff.split('\n');
    buff = chunks.pop(); // if it ended on a \n this is '', mid-stream it's a not-yet-complete chunk of json
    chunks.forEach(q.push);
  });
  var ended = false;
  q.drain = function () {
    if (!ended) return; // drain can be called many times, we only care once data is done coming in
    logger.info("indexed " + total + " items from " + svcId);
    callback();
  };
  req.on("end", function () {
    ended = true;
    q.push(""); // this triggers the drain if there was no data, GOTCHA
  });
}
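To make the streams question concrete, here's the rough shape I have in mind: keep the same queue, but pause the response when the queue backs up and resume it as the queue drains. It's an untested sketch using plain http (I'm not sure request proxies pause()/resume() through), and gatherStream/processChunk are just placeholder names:

var http = require('http');
var urllib = require('url');
var async = require('async');

function gatherStream(fullUrl, processChunk, callback) {
  var target = urllib.parse(fullUrl);
  http.get({host: target.hostname, port: target.port, path: target.path}, function (res) {
    var buff = "";
    var ended = false;
    var q = async.queue(function (chunk, cb) {
      if (chunk == "") return cb(); // sentinel pushed on end, see below
      processChunk(chunk, cb);
    }, 1);
    q.drain = function () {
      if (ended) return callback(); // data's done and the queue just emptied
      res.resume(); // queue caught up, let data flow again
    };
    res.on("data", function (data) {
      buff += data.toString();
      var chunks = buff.split('\n');
      buff = chunks.pop(); // keep any trailing partial line
      chunks.forEach(function (chunk) {
        if (chunk != "") q.push(chunk);
      });
      if (q.length() > 50) res.pause(); // crude backpressure while processing is behind
    });
    res.on("end", function () {
      ended = true;
      q.push(""); // same GOTCHA as above, guarantees drain fires even with no data
    });
  });
}

The pause/resume is only advisory, so a few extra data events can still sneak in after pause(), but the queue absorbs those fine.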


Thanks!

Jer

Matt Zimmerman

Jan 28, 2012, 6:10:18 PM
to singl...@googlegroups.com
On Fri, Dec 16, 2011 at 09:19:19AM -0600, Jeremie Miller wrote:
> Ok, so this is the first time I think I've encountered this pattern in
> node, and here's my current solution, but I'd love input on whether
> there are any better ideas here!
>
> The problem is that the data events fire asynchronously, and the data
> then needs a sliding window across those events to extract the actual
> JSON chunks between newlines; each of those chunks needs to be
> processed asynchronously, and I need to detect when it's all done. The
> data could flow in faster or slower than the processing (or both at
> different points during the exchange), so it's a matter of loosely
> coupling two independent async streams.
>
> I've tried it a few ways, all of which had problems in either the
> parsing, the ordering, the buffering (we want to process as fast as we
> can, not buffer it all first), or the end detection. Currently it's
> using async queues, but it feels like there should be a more ideal
> solution here, possibly using node's streams somehow?

You could replace the line-oriented bit with the carrier module, which I
found in the npm registry and pulled into our package.json already.

I don't understand quite what your code is trying to do though.
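For the line-splitting part specifically, going from carrier's README (so treat the exact API as approximate), I'd expect it to collapse your buff/split/pop bookkeeping down to roughly:

var carrier = require('carrier');

// carrier splits the stream on newlines and hands you complete lines,
// so the queue only ever sees whole json chunks
carrier.carry(req, function (line) {
  if (line != "") q.push(line);
});

with req and q being the same objects you already have.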

--
- mdz

Jeremie Miller

Feb 1, 2012, 1:13:49 PM
to singl...@googlegroups.com
Carrier looks interesting and definitely overlaps some of what I did already in lutil's streamFromUrl(). I'll have to play with it to see how well it gets along with pipes from http instead of a raw socket, but it should be totally fine, thx!
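Something like this is probably the first thing I'll try, just to see if carrier is happy treating an http response as its stream; a quick scratch test that only counts lines, no processing (lconfig, logger, and svcId being the usual suspects from the indexing module):

var http = require('http');
var urllib = require('url');
var carrier = require('carrier');

var target = urllib.parse(lconfig.lockerBase + "/Me/" + svcId + "/?all=true&stream=true");
http.get({host: target.hostname, port: target.port, path: target.path}, function (res) {
  var lines = 0;
  carrier.carry(res, function (line) {
    if (line != "") lines++;
  });
  res.on("end", function () {
    logger.info("carrier saw " + lines + " lines from " + svcId);
  });
});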