levelup interface

21 views
Skip to first unread message

Mark Lester

unread,
May 30, 2016, 2:42:00 PM5/30/16
to Sequelize
I need an implementation of the levelup interface. it looks easy enough, i just wondered if anyone had done anything on this,

Mark Lester

unread,
May 31, 2016, 12:51:35 PM5/31/16
to Sequelize
I stumbled across the serialise/stream interface discussion https://github.com/sequelize/sequelize/issues/2454 but i am not concerned about performance over a large set, but i realise that gets messy.

Mick Hansen

unread,
May 31, 2016, 12:58:01 PM5/31/16
to Mark Lester, Sequelize
Haven't heard of anyone working with levelup.
Streaming is something we'd like to do but it requires a large refactor.

On Tue, May 31, 2016 at 6:51 PM, Mark Lester <mark.chr...@gmail.com> wrote:
I stumbled across the serialise/stream interface discussion https://github.com/sequelize/sequelize/issues/2454 but i am not concerned about performance over a large set, but i realise that gets messy.



--
Mick Hansen
@mhansendev
mhansen.io

Mark Lester

unread,
May 31, 2016, 2:29:14 PM5/31/16
to Sequelize, mark.chr...@gmail.com
I've got this merkle tree code from ethereum https://github.com/ethereumjs/merkle-patricia-tree, its expecting just a lightweight hash. i guess streaming is al about how much you cache to make it efficient but I dont care about that right now, I just would rather leave this crypto stuff as is.

Mick Hansen

unread,
Jun 1, 2016, 2:56:58 AM6/1/16
to Mark Lester, Sequelize
Streaming is mostly for working with larger than memory data sets

Mark Lester

unread,
Jun 1, 2016, 6:54:17 AM6/1/16
to Sequelize, mark.chr...@gmail.com
This streams thing is about  events right,and knowing that someone is listening to your 'data', so I guess I have to override the on() method so I know about registrations for 'data', and in that case then call the real on() with a wrapper around the users callback so we can know data has been consumed, so we'll know when we've run out of data and need to fetch the next block and thus send a block of more data messages to be consumed. Handling more than one reader makes that slightly harder.
"Paused" mode is obviously trivial.

Does this 'data' event clash with anything ?. I am supposed to emit 'data','error','close',and 'end'.
If I start going
model.trigger('data',record)
or any of those, does that interfere with anything ?.

I can easily take a where clause to the  options and pass that along with the offsetand the limit values required to findAll().
So you can go
Model.createReadStream({where:{...},buffer_size:100}).on('data',function(record){...



Mark Lester

unread,
Jun 1, 2016, 7:12:27 AM6/1/16
to Sequelize, mark.chr...@gmail.com
scrub the last bit, its a different object so no matter, I am not triggering or listening to the model but this stream.
My confusion really is that this doesnt seem to be that hard, and it obviously is otherwise you would have done it.
I think I have what I need for this anyway.

Mark Lester

unread,
Jun 1, 2016, 9:07:12 AM6/1/16
to Sequelize, mark.chr...@gmail.com
I came up with this. I need to extend it a bit so it will just stream the value field you specified to look for,not the JSON of the entire record, as thats what a simple key-value system wants of course.
So thats the hard bit, i just need a get and put for my levelup with readable stream interface.

module.exports=function (options){
    options
.model=this;
   
return Stream(options);
}

var Stream=function(options){
   
var s=new (require('stream').Readable)(options);
    s
._read=_read;
    s
.options=options;
    s
.model=options.model;
    s
.offset=0;
   
return s;
}

function _read(size){
   
var self=this;
   
this.options.offset=this.offset;
   
this.options.limit=size;
   
return this.model.findAll(this.options)
   
.then(function(records){
       
if (!records.length)
           
self.emit('end');

       
self.offset+=records.length;
        _
.each(records,function(r){
           
self.push(JSON.stringify(r));
       
});
   
});
}



Mick Hansen

unread,
Jun 2, 2016, 4:21:17 AM6/2/16
to Mark Lester, Sequelize
I'm not sure what you're looking to do here Mark.
Having a pseudo stream is effectively pointless, a stream is only valueable if it's built from the ground up (preferrably with database level streaming aswell).

Mark Lester

unread,
Jun 2, 2016, 8:58:38 AM6/2/16
to Sequelize, mark.chr...@gmail.com
yes, i was kind of getting there in terms of it needing to be working directly with the DB platform for it to be any notable use.
what I have ended up with atm is this, which i am sticking in classMethods.
So what's the point ?, I have some stuff which is expecting a levelup interface of put and get and createReadableStream. its crypto stuff, and so while I want this merkle trie in the DB platform so we can keep it safe and also extend it (we need lots of tries), I dont want to touch ethereum's code

module.exports={
    createReadableStream
:function (options){
        options
=options||{};
        options
.model=this;
       
this.valueField=options.valuefield=options.valueField || (this.options.Options && this.options.Options.valueField);
       
this.keyField=options.keyField=options.keyField || (this.options.Options && this.options.Options.keyField);
        options
.where=options.where || this.where;
//console.log("MAKING A STREAM "+JSON.stringify(options));
       
return Stream(options);
   
},

   
get:function(key,options,callback){
       
var self=this;
       
var where={};
       
where[this.keyField||'key']=key;
       
return this.findOne({
           
where:where
       
})
       
.then(function(record){
           
if (!record)
               
return null;
           
if (self.options.Options && self.options.Options.valueField)
               
return record.get(self.options.Options.valueField)
           
else
               
return JSON.stringify(record);
       
})
       
.then(function(result){
           
if (callback)
                callback
(result);
       
});
   
},

    put
:function (key,value,options,callback){
       
var record={};
        record
[this.options.Options.keyField||'key']=key;
        record
[this.options.Options.valueField||'value']=value;
       
return this.upsert(record)
       
.then(function(result){
           
if (callback)
                callback
(result);
           
return;
       
})

   
}

}


var Stream=function(options){
   
var s=new (require('stream').Readable)(options);
    s
._read=_read;
    s
.options=options;
    s
.model=options.model;
    s
.offset=0;
   
return s;
}


function _read(size){
   
var self=this;

   
var opts={};
    opts
.limit=size;
    opts
.offset=this.offset;
   
var O=this.options.model.options.Options;
    opts
.where=O.where;
   
return this.model.findAll(opts)
   
.then(function(records){
       
if (!records.length)
           
self.push(null);

       
self.offset+=records.length;
        _
.each(records,function(r){
           
if (O && O.valueField)
               
self.push(r.get(O.valueField));
           
else
               
self.push(JSON.stringify(r));
       
});
   
});
}




Mark Lester

unread,
Jun 2, 2016, 2:20:48 PM6/2/16
to Sequelize, mark.chr...@gmail.com
btw, I know that i am confused about what "size" means, which is part of the reason it needs to be wired more directly to the DB stuff, and the database language doesnt have the concept of event streams, but I dont think it breaks anything other than loading way more stuff than 16kb's worth.
Anyway, I hope the requirement make sense.

Reply all
Reply to author
Forward
0 new messages