Joining two streams


Max Ott

Sep 17, 2013, 4:12:18 AM
to sk...@googlegroups.com
A newbie question, but I can't find anything in the documentation.

I have two event streams. To simplify things, let's assume one reports events of the form <entity_id, color, time> and the other <entity_id, size, time>, where the two streams are completely uncorrelated in time and order (the entity_ids are shared between them).

Now, how do I, let's say, extend any incoming 'color' event with the most recent 'size' event for the same entity_id?

Cheers,
-max

Ben Johnson

Sep 17, 2013, 1:09:05 PM
to sk...@googlegroups.com
hey Matt-

The docs are going to get a makeover with v0.4.0. They're pretty rough right now.

To overlay values from two streams you'll need to use a non-transient property for "size". Let's say you have the following events:

{"timestamp":"2000-01-01T00:00:00Z", "data":{"size":10}}
{"timestamp":"2000-01-01T00:01:00Z", "data":{"color":"blue"}}
{"timestamp":"2000-01-01T00:02:00Z", "data":{"color":"red"}}
{"timestamp":"2000-01-01T00:04:00Z", "data":{"size":20}}
{"timestamp":"2000-01-01T00:05:00Z", "data":{"color":"red"}}

Assuming "size" is non-transient then you can run a query like this:

$ curl localhost:8585/tables/my_table/query -d '
WHEN color != "" THEN
  SELECT count() GROUP BY size, color
END
'

And you'll get:

{
  "size":{
    "10":{
      "color":{
        "blue":{"count":1},
        "red":{"count":1}
      }
    },
    "20":{
      "color":{
        "red":{"count":1}
      }
    }
  }
}
    
Since the "size" property is non-transient then it'll persist until changed and the value can be used in other events that occur after it is set -- even if those events come from an unrelated source. This also has an effect that if you delete the "size" change at "2000-01-01T00:04:00Z" then the second "red" will then get grouped under "size=10".

Does that make sense?


Ben

Max Ott

Sep 18, 2013, 12:01:06 AM
to sk...@googlegroups.com
Thanks, that makes sense. Now, is there an efficient way to import large datasets? My events are either in AVRO files or LevelDB.

Cheers,
-max

Ben Johnson

Sep 18, 2013, 5:06:07 PM
to sk...@googlegroups.com
Max (sorry I called you Matt last time)-

There's a sky-importer project here: https://github.com/skydb/sky-importer

To use that you'd need to export into JSON and then import. The API is pretty simple though. The importer is just one Go file with the actual insert logic in importData(). You'll need to have the Sky Go client set up to point to the "unstable" branch. That's the only caveat.

I'm looking through it now and it doesn't look like it has bulk importing set up -- it just does one event at a time. I'm on my way to Strange Loop right now, but I can try to update the sky-importer later. Also, you can use the "-nosync" option on skyd so it doesn't flush to disk on every write.
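For the export-to-JSON step, here's a minimal sketch of producing newline-delimited JSON that a JSON-based importer can consume. The record shape and read_records() are hypothetical -- swap in your actual Avro or LevelDB reader, and check sky-importer's expected input format before relying on these field names.

```python
# Hypothetical export step: dump records as newline-delimited JSON.
# read_records() stands in for iterating an Avro file or LevelDB store.
import io
import json

def read_records():
    # Replace with your real Avro/LevelDB iteration; field names are
    # illustrative, not sky-importer's confirmed schema.
    yield {"entity_id": "e1", "timestamp": "2000-01-01T00:00:00Z", "data": {"size": 10}}
    yield {"entity_id": "e1", "timestamp": "2000-01-01T00:01:00Z", "data": {"color": "blue"}}

def export_json_lines(records, out):
    # One JSON object per line, so the importer can stream the file.
    for rec in records:
        out.write(json.dumps(rec) + "\n")

buf = io.StringIO()
export_json_lines(read_records(), buf)
print(buf.getvalue())  # two JSON lines, one per record
```

Writing one object per line keeps memory flat for large exports, since neither side ever has to hold the whole dataset at once.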

Let me know if you have any issues with it.

