The problem is that the data events fire asynchronously, so I need to keep a
sliding window across those events to extract the actual JSON chunks between
newlines, process each chunk asynchronously, and then detect when it's all
done. The data can flow in faster or slower than the processing (or both over
the course of the exchange), so it's really a matter of loosely coupling two
independent async streams.

I've tried it a few ways, all of which had problems with the parsing,
ordering, buffering (we want to process as fast as we can, not buffer it all
first), or end detection. Currently it's using async queues, but it feels like
there should be a more ideal solution here, possibly using node's streams
somehow? (There's a rough, untested sketch of what I mean below, after the
current code.)
function gatherFromUrl(svcId, callback) {
  var url = path.join("Me", svcId, "?all=true&stream=true");
  url = lconfig.lockerBase + "/" + url;
  logger.info("updating from " + url);
  var req = request.get({uri: url}, function(err) {
    if (err) logger.error(err);
  });
  var total = 0;
  var q = async.queue(function(chunk, cb) {
    if (chunk == "") return cb();
    total++;
    try {
      exports.add(svcId, JSON.parse(chunk), cb);
    } catch (E) {
      logger.error("got " + E + " processing " + chunk);
      return cb();
    }
  }, 1);
  var buff = "";
  req.on("data", function(data) {
    buff += data.toString();
    var chunks = buff.split('\n');
    buff = chunks.pop(); // if it ended with \n this is '', if mid-stream it'll be a not-yet-complete chunk of json
    chunks.forEach(q.push);
  });
  var ended = false;
  q.drain = function() {
    if (!ended) return; // drain can be called many times, we only care once data is done coming in
    logger.info("indexed " + total + " items from " + svcId);
    callback();
  };
  req.on("end", function() {
    ended = true;
    q.push(""); // this triggers the drain if there was no data, GOTCHA
  });
}
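
For what it's worth, this is roughly what I was vaguely imagining for the
streams angle. Completely untested, I'm not sure how well request proxies
pause()/resume() through to the response or whether the async version we have
supports q.saturated, and the function/argument names are made up just for
illustration:

var request = require('request');
var async = require('async');

// sketch: let the queue throttle the HTTP stream instead of letting data pile up
function gatherStreamed(url, handleChunk, done) {
  var buff = "", ended = false;
  var q = async.queue(function(chunk, cb) {
    if (chunk === "") return cb();
    try {
      handleChunk(JSON.parse(chunk), cb);
    } catch (e) {
      cb(); // skip malformed chunks
    }
  }, 1);
  var req = request.get({uri: url});
  q.saturated = function() { req.pause(); }; // backpressure: stop reading while we're busy
  q.drain = function() {
    if (ended) return done();
    req.resume(); // caught up, keep reading
  };
  req.on("data", function(data) {
    buff += data.toString();
    var lines = buff.split("\n");
    buff = lines.pop(); // keep any partial trailing chunk
    lines.forEach(function(line) { q.push(line); });
  });
  req.on("end", function() {
    ended = true;
    q.push(""); // nudge drain even if nothing was pending
  });
}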
Thanks!
Jer
You could replace the line-oriented bit with the carrier module, which I
found in the npm registry and pulled into our package.json already.
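If I'm remembering its README right, usage is roughly this (untested, and q/req
here refer to the queue and request from your snippet):

var carrier = require('carrier');

// carrier buffers and splits the stream into lines for you,
// so the manual buff/split/pop bookkeeping goes away
carrier.carry(req, function(line) {
  q.push(line);
});

It presumably only handles the line framing, so the queue and end detection
would stay the same.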
I don't quite understand what your code is trying to do, though.
--
- mdz