Joins in Onyx


Mor Yoavi

Dec 13, 2015, 10:46:58 AM
to Onyx
I haven't worked with Onyx for many releases. I was wondering whether there is a prescribed methodology for database-style joins (inner, outer) on data in Onyx, or anything on the horizon that could be used to perform joins (e.g. Datalog). I believe I read in a post a while ago that joins were not possible in Onyx; given some of the recent updates, I am wondering whether that has changed.

For instance, I was thinking the new windowing, triggers, and aggregation functionality could be used together to achieve at least some kind of results. Another crude solution is to do queries/joins in the tasks but it is my view that doing any kind of queries in a processing step whether in Onyx, Storm, or others is generally a bad idea.

For now I am performing joins outside Onyx, but I want to keep things contained in Onyx as much as possible. The main reason is that the minute I have to use another framework, not only do I have more moving parts, but I generally lose a lot of the dynamic data-driven approach that is the reason I am using Onyx in the first place. Moreover, this forces me to break up workflows at points where I need to join data which introduces all kinds of extra overhead.

Anyway, other streaming frameworks do have full or limited support for joins, as I am sure you are aware. For example, in Spark you can do this a few ways (pair RDDs, data frames, etc.).


Please let me know if you have any recommendations about joins in the meantime or if there is anything in planning.

Thanks once again for building Onyx and I just want to say I am very pleased with the last release, you really delivered exactly what I needed. Keep up the good work.

Mike Drogalis

Dec 13, 2015, 6:31:36 PM
to Mor Yoavi, Onyx
Hi Mor,


On Sun, Dec 13, 2015 at 7:46 AM, Mor Yoavi <ytra...@gmail.com> wrote:
> I haven't worked with Onyx for many releases. I was wondering whether there is a prescribed methodology for database-style joins (inner, outer) on data in Onyx, or anything on the horizon that could be used to perform joins (e.g. Datalog). I believe I read in a post a while ago that joins were not possible in Onyx; given some of the recent updates, I am wondering whether that has changed.

In the next few months we'll be beefing up support for more advanced aggregations via the Windowing feature. 

> For instance, I was thinking the new windowing, triggers, and aggregation functionality could be used together to achieve at least some kind of results. Another crude solution is to do queries/joins in the tasks but it is my view that doing any kind of queries in a processing step whether in Onyx, Storm, or others is generally a bad idea.

Doing joins in a task is fine and completely necessary, as long as it's not a potentially permanently blocking operation. Performing a join is no different from doing a database read, write, or any other operation that needs to talk to the network.

There are two ways to go about doing joins. The strategy you pick depends on the size of the data set.

If your join space is small, you can use group-by-key and a global window to put all data with the same key in the same bucket, then write a custom aggregation operation to put like-with-like. This is pretty straightforward as it doesn't involve any external processes, but has the downside that all your data needs to fit in memory.
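For illustration, here is a minimal sketch of the in-memory approach in plain Python. It is not Onyx's API: the function `join_aggregation`, the `source` tag on each segment, and the `user_id` key are all hypothetical names chosen for the example. It assumes segments from both inputs arrive on one stream, are bucketed by key, and are paired by a custom aggregation as matches appear.

```python
from collections import defaultdict


def new_state():
    # One bucket per join key, holding everything seen so far from each side.
    # This is the "global window": nothing is ever evicted, so it must fit in memory.
    return defaultdict(lambda: {"left": [], "right": []})


def join_aggregation(state, segment, key="user_id"):
    """Fold one segment into its bucket and return any newly joined rows."""
    bucket = state[segment[key]]
    side = segment["source"]  # hypothetical tag: "left" or "right"
    other = "right" if side == "left" else "left"
    bucket[side].append(segment)

    # Inner join: pair the new segment with every segment already seen
    # on the opposite side for the same key.
    joined = []
    for match in bucket[other]:
        left, right = (segment, match) if side == "left" else (match, segment)
        joined.append({**left, **right, "source": "joined"})
    return joined


state = new_state()
stream = [
    {"source": "left", "user_id": 1, "name": "ann"},
    {"source": "right", "user_id": 2, "total": 7},
    {"source": "right", "user_id": 1, "total": 30},
    {"source": "left", "user_id": 3, "name": "cy"},
]

results = []
for seg in stream:
    results.extend(join_aggregation(state, seg))

print(results)
```

Only `user_id` 1 has segments on both sides, so it is the only key that produces a joined row. Keys 2 and 3 stay buffered indefinitely, which is exactly the memory cost described above.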

If your join space is large, you need to use a database. Query the database and look for a match. If it doesn't exist, write to the database. If it matches, do the join and write the results back to the database. Be sure that your transactional semantics are correct here.
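As a rough sketch of that query-then-write loop, the Python below uses the standard library's `sqlite3` as a stand-in for whatever database is actually in use. The table names `pending` and `joined`, and the function `join_segment`, are hypothetical, and the sketch assumes at most one segment per side per key.

```python
import json
import sqlite3


def open_store():
    # In-memory SQLite stands in for the real, durable database.
    db = sqlite3.connect(":memory:")
    db.execute(
        "CREATE TABLE pending (k TEXT, side TEXT, body TEXT, PRIMARY KEY (k, side))"
    )
    db.execute("CREATE TABLE joined (k TEXT PRIMARY KEY, body TEXT)")
    return db


def join_segment(db, key, side, segment):
    """Look for a match on the other side; store the segment if there is none."""
    other = "right" if side == "left" else "left"
    with db:  # commits on success, rolls back if an exception is raised
        row = db.execute(
            "SELECT body FROM pending WHERE k = ? AND side = ?", (key, other)
        ).fetchone()
        if row is None:
            # No match yet: remember this segment for a later arrival.
            db.execute(
                "INSERT OR REPLACE INTO pending VALUES (?, ?, ?)",
                (key, side, json.dumps(segment)),
            )
            return None
        # Match found: do the join and write the result back.
        merged = {**json.loads(row[0]), **segment}
        db.execute(
            "INSERT OR REPLACE INTO joined VALUES (?, ?)", (key, json.dumps(merged))
        )
        return merged


db = open_store()
first = join_segment(db, "u1", "left", {"name": "ann"})
second = join_segment(db, "u1", "right", {"total": 30})
print(first, second)
```

The `with db:` block keeps the read and the write in one transaction on a single connection. With several peers hitting a shared database, the isolation level and retry behaviour would need more care than this sketch shows.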

There was a good blog post about streaming joins and all the different ways you can handle them, but ultimately if you want to deal with data across all of time, you need to use stable storage.
 
> For now I am performing joins outside Onyx, but I want to keep things contained in Onyx as much as possible. The main reason is that the minute I have to use another framework, not only do I have more moving parts, but I generally lose a lot of the dynamic data-driven approach that is the reason I am using Onyx in the first place. Moreover, this forces me to break up workflows at points where I need to join data which introduces all kinds of extra overhead.

> Anyway, other streaming frameworks do have full or limited support for joins, as I am sure you are aware. For example, in Spark you can do this a few ways (pair RDDs, data frames, etc.).


Spark can do this because it's designed as a batch processing platform, and hence has its own stable storage via the distributed file system it connects to. This is basically the equivalent of using a database. 

> Please let me know if you have any recommendations about joins in the meantime or if there is anything in planning.

> Thanks once again for building Onyx and I just want to say I am very pleased with the last release, you really delivered exactly what I needed. Keep up the good work.

Thanks for the kind words! 


Mor Yoavi

Dec 13, 2015, 9:24:06 PM
to Onyx, ytra...@gmail.com
Mike,

Sounds good, thank you. Just wanted a sanity check. I figured as much regarding what you said about the joins. 

The main things I was concerned about were bottlenecking and memory consumption, particularly because I need lots of infinite streams processed relatively fast (real or near real-time, within reason). Typically the input streams are from either Kafka or Datomic, and each workflow often has one or more input streams. Generally I'm trying to get people to move away from the need to have multiple input streams and do more in Kafka, but there are some Kafka-specific issues I won't go into here that need to be resolved. Mostly we just need more machines :)

Anyway, bottlenecking is mostly mitigated, at least for me, by carefully controlling how many peers are involved and how hard I am hitting any DB I am joining against. I think there are enough knobs and controls over my input streams to deal with this as well. I still hate doing a query per task to join, but these results are a bit large for memory. Luckily I'm also using Datomic most of the time, so it will at least be hitting the peer caches pretty heavily. My other concern, regarding memory, is pretty much confirmed by what you said about aggregation. I think common sense is enough here, though in my dynamic situations I need to be careful. I was doing a lot of things manually in Datalog in an older app, so I would imagine I can replicate in-memory Datalog with the aggregation features pretty easily. Looking forward to seeing more on the advanced aggregations. IMO, the more aggregation functionality, the better.

Thanks again.