Questions around semantics

David Greenberg

Sep 29, 2014, 9:46:47 PM
to onyx...@googlegroups.com
I have a few questions about the semantics of Onyx that I'm trying to wrap my head around.

Is there any notion of a shuffle-like operation like in MapReduce or Storm?

Is it possible to do joins?

Thanks,
David

Mike Drogalis

Sep 30, 2014, 4:37:48 PM
to David Greenberg, onyx...@googlegroups.com
Hey David, thanks for asking!

In short... 

Shuffle is the default algorithm for Onyx when it disperses work across peers allocated to the same task. The only other option is to use an :onyx/type of :grouper to pin all segments that share the same grouping value to a particular virtual peer. I'll expand below on what this actually looks like under the hood if you're interested.
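To make the difference concrete, here's a toy model of the two dispersal strategies. This isn't Onyx code, just plain Clojure illustrating the semantic contrast between shuffling and grouping:

;; A toy model, not Onyx internals: three virtual peers allocated to one task.
(def peers [:vpeer-1 :vpeer-2 :vpeer-3])

;; "Shuffle": any peer on the task may receive any given segment.
(defn shuffle-route [segment]
  (rand-nth peers))

;; "Grouper": segments that share the same value for the grouping key
;; always land on the same virtual peer.
(defn group-route [group-key segment]
  (nth peers (mod (hash (get segment group-key)) (count peers))))

(group-route :user-id {:user-id 42 :amount 10}) ; always routes to the same peer...
(group-route :user-id {:user-id 42 :amount 99}) ; ...as this one, since :user-id matches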

As far as joining, I understand two meanings here. The first is a batch-operation type of join - like trying to join two tables by a common key. Onyx currently doesn't give the developer a joining operation without using durable storage, and I can't think of an easy way for the framework to do that. Onyx is a hybrid that builds batch processing on top of streaming. Spark is a hybrid that builds streaming on top of batching. Both of these techniques have trade-offs, and batch joining is one of the things I gave up. Onyx can't do sorting without user-level durable storage, either.

The other sense of "join" is the one from the streaming and Storm world, where data can be forked to two different tasks and optionally "joined" by sending the data back to a single task. Onyx's workflows are trees. You can write them in graph style, the way Storm supports, and then "unravel" the graph into a tree. See https://groups.google.com/d/msg/clojure/OmHzAEfYe9U/jpV2eBW3PeAJ for a discussion of how graph unraveling can potentially work.
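As a rough sketch of what unraveling might look like (the adjacency-map "graph" notation below is just for illustration, not something Onyx accepts directly):

;; Graph-style description: :a fans out to :b and :c, and both feed :d,
;; the "join" in the streaming sense.
{:a [:b :c]
 :b [:d]
 :c [:d]}

;; Unraveled into Onyx's tree-shaped workflow, :d simply appears under both
;; branches. Since every keyword in a workflow names a task, both branches
;; end up feeding the same :d ingress queue.
{:a {:b :d
     :c :d}}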

Okay, in long...

Here's how everything works under the hood. I haven't written documentation about this yet, but I think this will be a good start, and it might help clarify what you're after.

Here are a few facts to get started:
- Each virtual peer executes exactly zero or one tasks at any given time.
- Every keyword in a workflow is a task.
- Each task has exactly one ingress HornetQ queue associated with it.
- Each task has n egress HornetQ queues, depending on how many children the task has in the workflow.

So for the workflow:

{:in {:trans-1 {:trans-2 :out-1
                :trans-3 :out-2}}}

Onyx builds chains of HornetQ queues behind the scenes. :trans-1 has exactly one ingress queue and two egress queues. The ingress queue is :in's egress queue. The egress queues are the ingress queues for :trans-2 and :trans-3. The capacity of these queues can be scaled by using HornetQ clustering, which is described in the User Guide.
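If it helps to see those relationships spelled out, here's a small, hypothetical helper (not part of Onyx) that walks a tree-shaped workflow and lists each task's children, i.e. the tasks whose ingress queues serve as that task's egress queues:

;; Hypothetical helper for illustration only, not part of the Onyx API.
(defn egress-targets
  "Returns a map of task -> set of child tasks for a tree-shaped workflow."
  [workflow]
  (letfn [(walk [m acc]
            (reduce-kv
             (fn [acc task children]
               (if (map? children)
                 (walk children (update-in acc [task] (fnil into #{}) (keys children)))
                 (update-in acc [task] (fnil conj #{}) children)))
             acc m))]
    (walk workflow {})))

(egress-targets {:in {:trans-1 {:trans-2 :out-1
                                :trans-3 :out-2}}})
;; => {:in #{:trans-1}, :trans-1 #{:trans-2 :trans-3},
;;     :trans-2 #{:out-1}, :trans-3 #{:out-2}}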

When segments move from task to task, they're compressed with Fressian by default and enqueued on HornetQ. HornetQ uses a load balancer internally to figure out which physical queue to put the segment on, hence the "shuffle grouping". Admittedly, this isn't a lightning-fast architecture, since segments hit disk between tasks. But a 10 gigabit switch between the nodes in your data center might make this good enough for your data set. I'd like to rectify this in the future, but it's the way it is for now.
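As a rough sketch of that serialization step, here's a round-trip using the clojure.data.fressian library. Onyx's internal code path may differ; this only illustrates what Fressian encoding of a segment looks like:

(require '[clojure.data.fressian :as fressian])

(def segment {:name "Mike" :balance 2000})

;; write returns a java.nio.ByteBuffer holding the encoded segment, roughly
;; what gets placed on the HornetQ queue between tasks.
(def encoded (fressian/write segment))

;; read decodes it back into Clojure data on the consuming task's side.
(fressian/read encoded)
;; => {:name "Mike", :balance 2000}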

I think that's probably enough for the moment; time for some feedback. Does that make sense and answer your questions? I'm happy to clarify.
