Group-By-Key Example

Punit Naik

Mar 3, 2016, 6:37:12 AM
to Onyx
Can anyone please post a simple group-by-key example for Onyx that performs a simple word count? I tried to find an example, and even to write the group-by-key code myself, but could not manage either.
Please help.

Lucas Bradstreet

Mar 3, 2016, 6:54:57 AM
to Punit Naik, Onyx
Hi Punit,


You will need to use our new windowing / aggregations feature with group-by-key to get the result you're looking for. 

Feel free to join the gitter channel if you hit any problems with this example. 

Lucas

--
You received this message because you are subscribed to the Google Groups "Onyx" group.
To unsubscribe from this group and stop receiving emails from it, send an email to onyx-user+...@googlegroups.com.
To post to this group, send email to onyx...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/onyx-user/00a020ee-abce-409b-b069-7b68a0ed28b5%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Punit Naik

Mar 3, 2016, 7:21:21 AM
to Onyx, naik.p...@gmail.com
I had seen this example, but I cannot understand how the clojure.core/identity function helps in summing up all the counts for a particular key.

Mike Drogalis

Mar 3, 2016, 12:43:18 PM
to Punit Naik, Onyx
Hi Punit,

In Onyx, every element in a workflow, called a task, can transform the data it receives, and the transformation function must be named explicitly. The example operates in two stages: the "map" portion, which splits a sentence into words, and the "reduce" portion, which counts the words. The functional transformation in the map portion is clojure.string/split. The reduce phase needs no functional transformation, because we're harnessing Onyx's stateful windowing primitives to perform the counting. Since no transformation is needed, we simply pass the segments through with the identity function.
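
To make the two stages concrete, here is a minimal sketch in plain Clojure that simulates them locally. It does not use the Onyx API: split-sentence and count-words are hypothetical names, and the :sentence and :word keys are assumptions about the segment shape. In Onyx, the grouping and counting in count-words would be handled by group-by-key plus a window aggregation, not by the task function.

```clojure
(require '[clojure.string :as str])

;; "Map" stage: one sentence segment in, one segment per word out.
(defn split-sentence [segment]
  (map (fn [word] {:word word})
       (str/split (:sentence segment) #"\s+")))

;; "Reduce" stage, simulated locally. The segments pass through
;; `identity` unchanged (the task function does no work), and the
;; counting is done separately, per :word key.
(defn count-words [segments]
  (reduce (fn [counts segment]
            (update counts (:word segment) (fnil inc 0)))
          {}
          (map identity segments)))

(count-words (mapcat split-sentence [{:sentence "nom nom coffee"}
                                     {:sentence "coffee time"}]))
;; => {"nom" 2, "coffee" 2, "time" 1}
```

The point of the sketch is that identity contributes nothing to the sum; the per-key counting is a separate step that happens to the segments after the function has returned them.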

Does that make sense?

Punit Naik

Mar 4, 2016, 3:38:10 AM
to Onyx, naik.p...@gmail.com
But what if I wanted to do the counting without the windowing technique?

Lucas Bradstreet

Mar 4, 2016, 3:44:40 AM
to Punit Naik, Onyx
You could do this by injecting an atom via a lifecycle, and updating it every time your function is called. I do not recommend this approach as you will lose your atom's state if the node counting it crashes/is rescheduled.
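
A minimal sketch of the atom approach in plain Clojure, for illustration only. It assumes the atom reaches the task function as its first argument; the lifecycle wiring that would inject it is not shown, and count-word! and counts are hypothetical names.

```clojure
;; Hypothetical task function: `counts` is an atom assumed to be supplied
;; as the first argument (for example by a lifecycle); the segment follows.
;; It bumps the count for the segment's word and passes the segment on.
(defn count-word! [counts segment]
  (swap! counts update (:word segment) (fnil inc 0))
  segment)

;; Local stand-in for the injected atom.
(def counts (atom {}))

(doseq [w ["nom" "nom" "coffee"]]
  (count-word! counts {:word w}))

@counts
;; => {"nom" 2, "coffee" 1}
```

The counts live only in the memory of the process holding the atom, so they disappear if that process dies, which is the weakness described above.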

If your concern is that windows are scoped to a time period, use a global window, which is not bounded in time. If not, why do you want to avoid using windows?

Thanks,

Lucas

Punit Naik

Mar 4, 2016, 3:57:13 AM
to Onyx, naik.p...@gmail.com
Okay, got it. Thanks :)

William Swaney

May 22, 2016, 5:14:05 PM
to Onyx
Hi Everyone,

I'm quite new to Onyx, and have a question regarding the "aggregation" example - this seemed like the best spot to ask.

Where do I get the results of the word count?  I don't see anything as a result of the dump-window! function, nor does it emerge as a segment(s) from :out... I guarantee I'm missing something simple, but I can't see it yet :)

Thanks in advance and thanks for all the work on Onyx.

Regards,

Bill

Mike Drogalis

May 22, 2016, 11:57:11 PM
to William Swaney, Onyx
Hi William!

Onyx doesn't yet support downstream emissions of segments from aggregators. We're working on it and hope to have this feature done within two months.

I checked out the 0.9.x branch of onyx-examples and ran the file. I did see a series of printlns from the dump-window! trigger sync function. Perhaps something else is awry? 

You're welcome! Thanks for the interest! 

William Swaney

May 23, 2016, 12:03:36 AM
to Onyx, the2...@gmail.com
Michael,

Thanks for the response.

Just to clarify, as I'm not sure I'm asking this correctly then.  Am I able to somehow get hold of the results of the word count?  If not that is of course fine, I assume then that a custom function would be needed to sum the counts.

Actually I was not clear - I see the printlns from dump-window!, just not anything in them that contains the results of the count :)

Mike Drogalis

May 23, 2016, 12:34:30 AM
to William Swaney, Onyx
Looks like you found a bug! We changed the signature of the sync function a while ago, looks like we missed this one. Apologies!

I've pushed the fixes to the 0.9.x and master branches of onyx-examples. You should clearly see each word and its counts being printed out now. Make sure you drop a `lein clean` in since I also upgraded the Onyx version to 0.9.6.

Thanks for the heads up! :)

Punit Naik

May 23, 2016, 12:46:32 AM
to Mike Drogalis, William Swaney, Onyx
Hi Everyone

I made a slight change to the aggregation example's core.clj. I wrapped the code from 'onyx.api/submit-job' through to the end of the file in a function, referenced it from :main in project.clj, and ran lein run. I got the following output:

Window extent {:window/id :word-counter, :window/task :count-words, :window/type :global, :window/aggregation :onyx.windowing.aggregation/count, :aggregate/record #object[onyx.windowing.window_extensions.GlobalWindow 0x7d8c45df "onyx.windowing.window_extensions.GlobalWindow@7d8c45df"], :aggregate/init #object[onyx.windowing.aggregation$count_aggregation_fn_init 0x6ed994e2 "onyx.windowing.aggregation$count_aggregation_fn_init@6ed994e2"], :aggregate/fn #object[onyx.windowing.aggregation$count_aggregation_fn 0x16411313 "onyx.windowing.aggregation$count_aggregation_fn@16411313"], :aggregate/super-agg-fn #object[onyx.windowing.aggregation$count_super_aggregation 0x7a7dda37 "onyx.windowing.aggregation$count_super_aggregation@7a7dda37"], :aggregate/apply-state-update #object[onyx.windowing.aggregation$set_value_aggregation_apply_log 0xd45fea3 "onyx.windowing.aggregation$set_value_aggregation_apply_log@d45fea3"]}, [{:trigger/window-id :word-counter, :trigger/refinement :accumulating, :trigger/on :segment, :trigger/threshold [5 :elements], :trigger/sync :aggregation.core/dump-window!, :trigger/id #uuid "ac733dd1-9241-7ad6-0fd8-fd9f36ac5057", :trigger/sync-fn #'aggregation.core/dump-window!} - {:window-id 1, :lower-bound -Infinity, :upper-bound Infinity, :context :new-segment}] contents: {"name" 1, "Coffee" 1, "Om" 1, "to" 1, "cold" 1}
Window extent {:window/id :word-counter, :window/task :count-words, :window/type :global, :window/aggregation :onyx.windowing.aggregation/count, :aggregate/record #object[onyx.windowing.window_extensions.GlobalWindow 0x7d8c45df "onyx.windowing.window_extensions.GlobalWindow@7d8c45df"], :aggregate/init #object[onyx.windowing.aggregation$count_aggregation_fn_init 0x6ed994e2 "onyx.windowing.aggregation$count_aggregation_fn_init@6ed994e2"], :aggregate/fn #object[onyx.windowing.aggregation$count_aggregation_fn 0x16411313 "onyx.windowing.aggregation$count_aggregation_fn@16411313"], :aggregate/super-agg-fn #object[onyx.windowing.aggregation$count_super_aggregation 0x7a7dda37 "onyx.windowing.aggregation$count_super_aggregation@7a7dda37"], :aggregate/apply-state-update #object[onyx.windowing.aggregation$set_value_aggregation_apply_log 0xd45fea3 "onyx.windowing.aggregation$set_value_aggregation_apply_log@d45fea3"]}, [{:trigger/window-id :word-counter, :trigger/refinement :accumulating, :trigger/on :segment, :trigger/threshold [5 :elements], :trigger/sync :aggregation.core/dump-window!, :trigger/id #uuid "ac733dd1-9241-7ad6-0fd8-fd9f36ac5057", :trigger/sync-fn #'aggregation.core/dump-window!} - {:window-id 1, :lower-bound -Infinity, :upper-bound Infinity, :context :new-segment}] contents: {"Mike" 1, "cold" 1, "coffee's" 1, "new" 1, "cup" 1, "Om" 1, "name" 1, "Coffee" 1, "to" 1, "get" 1}
Window extent {:window/id :word-counter, :window/task :count-words, :window/type :global, :window/aggregation :onyx.windowing.aggregation/count, :aggregate/record #object[onyx.windowing.window_extensions.GlobalWindow 0x7d8c45df "onyx.windowing.window_extensions.GlobalWindow@7d8c45df"], :aggregate/init #object[onyx.windowing.aggregation$count_aggregation_fn_init 0x6ed994e2 "onyx.windowing.aggregation$count_aggregation_fn_init@6ed994e2"], :aggregate/fn #object[onyx.windowing.aggregation$count_aggregation_fn 0x16411313 "onyx.windowing.aggregation$count_aggregation_fn@16411313"], :aggregate/super-agg-fn #object[onyx.windowing.aggregation$count_super_aggregation 0x7a7dda37 "onyx.windowing.aggregation$count_super_aggregation@7a7dda37"], :aggregate/apply-state-update #object[onyx.windowing.aggregation$set_value_aggregation_apply_log 0xd45fea3 "onyx.windowing.aggregation$set_value_aggregation_apply_log@d45fea3"]}, [{:trigger/window-id :word-counter, :trigger/refinement :accumulating, :trigger/on :segment, :trigger/threshold [5 :elements], :trigger/sync :aggregation.core/dump-window!, :trigger/id #uuid "ac733dd1-9241-7ad6-0fd8-fd9f36ac5057", :trigger/sync-fn #'aggregation.core/dump-window!} - {:window-id 1, :lower-bound -Infinity, :upper-bound Infinity, :context :new-segment}] contents: {"nom" 2, "Mike" 1, "cold" 1, "is" 1, "coffee's" 1, "new" 1, "cup" 1, "Om" 1, "name" 1, "a" 1, "Coffee" 1, "to" 1, "get" 1, "Time" 1}
Window extent {:window/id :word-counter, :window/task :count-words, :window/type :global, :window/aggregation :onyx.windowing.aggregation/count, :aggregate/record #object[onyx.windowing.window_extensions.GlobalWindow 0x7d8c45df "onyx.windowing.window_extensions.GlobalWindow@7d8c45df"], :aggregate/init #object[onyx.windowing.aggregation$count_aggregation_fn_init 0x6ed994e2 "onyx.windowing.aggregation$count_aggregation_fn_init@6ed994e2"], :aggregate/fn #object[onyx.windowing.aggregation$count_aggregation_fn 0x16411313 "onyx.windowing.aggregation$count_aggregation_fn@16411313"], :aggregate/super-agg-fn #object[onyx.windowing.aggregation$count_super_aggregation 0x7a7dda37 "onyx.windowing.aggregation$count_super_aggregation@7a7dda37"], :aggregate/apply-state-update #object[onyx.windowing.aggregation$set_value_aggregation_apply_log 0xd45fea3 "onyx.windowing.aggregation$set_value_aggregation_apply_log@d45fea3"]}, [{:trigger/window-id :word-counter, :trigger/refinement :accumulating, :trigger/on :segment, :trigger/threshold [5 :elements], :trigger/sync :aggregation.core/dump-window!, :trigger/id #uuid "ac733dd1-9241-7ad6-0fd8-fd9f36ac5057", :trigger/sync-fn #'aggregation.core/dump-window!} - {:window-id 1, :lower-bound -Infinity, :upper-bound Infinity, :context :new-segment}] contents: {"nom" 3, "Mike" 1, "cold" 1, "is" 1, "coffee's" 1, "new" 1, "cup" 1, "coffee" 2, "Om" 1, "name" 1, "My" 2, "a" 1, "Coffee" 1, "to" 1, "get" 1, "Time" 1}

So I guess the example is sort of working fine.
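
For readability, the window and trigger definitions can be read back out of the maps printed above. The sketch below copies only keys and values that appear in that output; it leaves out the :aggregate/* keys, :trigger/id, and :trigger/sync-fn on the assumption that those are filled in at runtime. The var names windows and triggers are just for illustration.

```clojure
;; Window definition, copied from the keys printed in the output above.
(def windows
  [{:window/id :word-counter
    :window/task :count-words
    :window/type :global
    :window/aggregation :onyx.windowing.aggregation/count}])

;; Trigger definition, also copied from the output. :trigger/sync names
;; the function that prints each dump.
(def triggers
  [{:trigger/window-id :word-counter
    :trigger/refinement :accumulating
    :trigger/on :segment
    :trigger/threshold [5 :elements]
    :trigger/sync :aggregation.core/dump-window!}])
```

The [5 :elements] threshold is consistent with the four dumps above, whose counts total 5, 10, 15, and 20 words, and the running totals match the :accumulating refinement.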

--
Thank You

Regards

Punit Naik

William Swaney

May 23, 2016, 1:00:31 AM
to Onyx, the2...@gmail.com
Mike, 

Thanks for this, it's working just fine now!

Regards,

Bill