Distinct (Aggregation) Example

Punit Naik

May 16, 2017, 4:42:06 AM
to Onyx
Hi Guys

I have written the following code to aggregate all distinct segments from the input:

(ns aggregation.core
  (:require [clojure.core.async :refer [chan >!! <!! close!]]
            [onyx.extensions :as extensions]
            [onyx.plugin.core-async :refer [take-segments!]]
            [onyx.api]))

(defn distinct-aggregation-fn-init [window]
  #{})

(defn distinct-aggregation-apply-log [window state v]
  ;; Log command is not needed for single transition type
  (clojure.core/conj state v))

(defn distinct-aggregation-fn [window state segment]
  ;; Log command is not needed for single transition type
  segment)

(defn distinct-super-aggregation [window state-1 state-2]
  (into state-1 state-2))

(def ^:export distinct
  {:aggregation/init distinct-aggregation-fn-init
   :aggregation/create-state-update distinct-aggregation-fn
   :aggregation/apply-state-update distinct-aggregation-apply-log
   :aggregation/super-aggregation-fn distinct-super-aggregation})
      
(def workflow
  [[:in :distinct-segments]
   [:distinct-segments :out]])

(def capacity 1000)

(def input-chan (chan capacity))
(def input-buffer (atom {}))

(def output-chan (chan capacity))

(def batch-size 10)

(def catalog
   ; Taking in the input
  [{:onyx/name :in
    :onyx/plugin :onyx.plugin.core-async/input
    :onyx/type :input
    :onyx/medium :core.async
    :onyx/batch-size batch-size
    :onyx/max-peers 1
    :onyx/doc "Reads segments from a core.async channel"}
   
   ; Applying overall distinct i.e. on the entire dataset
   {:onyx/name :distinct-segments
    :onyx/fn :clojure.core/identity
    :onyx/type :function
    :onyx/batch-size 1000}
   
   ; Output the data
   {:onyx/name :out
    :onyx/plugin :onyx.plugin.core-async/output
    :onyx/type :output
    :onyx/medium :core.async
    :onyx/max-peers 1
    :onyx/batch-size batch-size
    :onyx/doc "Writes segments to a core.async channel"}])

(def windows
  [{:window/id :distinct-segments-window
    :window/task :distinct-segments
    :window/type :global
    :window/aggregation ::distinct}])

(def triggers
  [{:trigger/window-id :distinct-segments-window
    :trigger/id :distinct-segments-trigger
    :trigger/refinement :onyx.refinements/accumulating
    :trigger/on :onyx.triggers/segment
    :trigger/threshold [1 :element]
    :trigger/sync ::dump-window!}])

(defn dump-window!
  [event window trigger {:keys [lower-bound upper-bound] :as window-data} state]
  (do
    (println "window-id -> " (:window/id window))
    (println "current state -> " state)
    state))

(def input-segments
  ["punit naik" "punit naik"])

(doseq [segment input-segments]
  (>!! input-chan segment))

(close! input-chan)

(def id (java.util.UUID/randomUUID))
    
(def always-true (constantly true))

(def env-config
  {:zookeeper/address "127.0.0.1:2188"
   :zookeeper/server? true
   :zookeeper.server/port 2188
   :onyx/tenancy-id id})

(def peer-config
  {:zookeeper/address "127.0.0.1:2188"
   :onyx/tenancy-id id
   :onyx.peer/job-scheduler :onyx.job-scheduler/balanced
   :onyx.messaging/impl :aeron
   :onyx.messaging/peer-port 40200
   :onyx.messaging/bind-addr "localhost"})

(def env (onyx.api/start-env env-config))

(def peer-group (onyx.api/start-peer-group peer-config))

(def n-peers (count (set (mapcat identity workflow))))

(def v-peers (onyx.api/start-peers n-peers peer-group))

(defn inject-in-ch [event lifecycle]
  {:core.async/buffer input-buffer
   :core.async/chan input-chan})

(defn inject-out-ch [event lifecycle]
  {:core.async/chan output-chan})

(def in-calls
  {:lifecycle/before-task-start inject-in-ch})

(def out-calls
  {:lifecycle/before-task-start inject-out-ch})

(def lifecycles
  [{:lifecycle/task :in
    :lifecycle/calls :aggregation.core/in-calls}
   {:lifecycle/task :in
    :lifecycle/calls :onyx.plugin.core-async/reader-calls}
   {:lifecycle/task :out
    :lifecycle/calls :aggregation.core/out-calls}
   {:lifecycle/task :out
    :lifecycle/calls :onyx.plugin.core-async/writer-calls}])

(defn -main
  [& args]
  (do
    (as-> (onyx.api/submit-job peer-config
                        {:workflow workflow
                         :catalog catalog
                         :lifecycles lifecycles
                         :windows windows
                         :triggers triggers
                         ;:flow-conditions flow-conditions
                         :task-scheduler :onyx.task-scheduler/balanced}) $
          (onyx.api/await-job-completion peer-config (:job-id $)))
    ; Print the output
    (println "out -> " (onyx.plugin.core-async/take-segments! output-chan 50))
    (doseq [v-peer v-peers]
      (onyx.api/shutdown-peer v-peer))
    (onyx.api/shutdown-peer-group peer-group)
    (onyx.api/shutdown-env env)))

Ideally I should get only ["punit naik"] in the output, but I am getting all the original segments (["punit naik" "punit naik"]) instead of the output of the :distinct-segments task.

I have based this "distinct" aggregation code on the "conj" aggregation code.
Is there anything wrong with the code? Why am I not getting the desired result?
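The aggregation functions can be checked in isolation, outside Onyx, by folding segments through them by hand. In this sketch, `simulate-window` is a hypothetical helper that only mimics the init, create-state-update, apply-state-update order for a single window; it is not Onyx's window runtime. The segments are written as maps, since Onyx segments are Clojure maps, and the aggregation map is renamed `distinct-aggregation` so it does not shadow `clojure.core/distinct`.

```clojure
;; The aggregation functions from the question, unchanged except for the
;; name of the aggregation map.
(defn distinct-aggregation-fn-init [window]
  #{})

(defn distinct-aggregation-fn [window state segment]
  ;; The state update is simply the incoming segment.
  segment)

(defn distinct-aggregation-apply-log [window state v]
  ;; Adding to a set silently drops duplicates.
  (conj state v))

(defn distinct-super-aggregation [window state-1 state-2]
  (into state-1 state-2))

(def distinct-aggregation
  {:aggregation/init distinct-aggregation-fn-init
   :aggregation/create-state-update distinct-aggregation-fn
   :aggregation/apply-state-update distinct-aggregation-apply-log
   :aggregation/super-aggregation-fn distinct-super-aggregation})

;; Hypothetical helper, not Onyx code: folds segments through an
;; aggregation map in the order init -> create-state-update ->
;; apply-state-update, approximating one window's state.
(defn simulate-window [aggregation window segments]
  (let [{init         :aggregation/init
         create       :aggregation/create-state-update
         apply-update :aggregation/apply-state-update} aggregation]
    (reduce (fn [state segment]
              (apply-update window state (create window state segment)))
            (init window)
            segments)))

(simulate-window distinct-aggregation {} [{:name "punit naik"} {:name "punit naik"}])
;; => #{{:name "punit naik"}}
```

If this yields a single-element set, the aggregation logic itself is fine, and the duplicated output has to come from how the task and trigger are wired into the job.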

Mike Drogalis

May 16, 2017, 10:13:53 PM
to Punit Naik, Onyx
Your middle task (distinct-segments) is using an :onyx/fn of clojure.core/identity, and identity passes your original segments straight through to the output. Your aggregation is using :trigger/sync, which calls the sync function whenever the trigger fires. I presume from your previous thread that you want :trigger/emit here instead.
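A minimal sketch of both suggestions, assuming Onyx's documented `:trigger/emit` contract: a five-argument function taking the event, window, trigger, state-event and window state, and returning a segment or a vector of segments to send downstream. `drop-all` and `emit-distinct` are hypothetical names. `drop-all` returns an empty vector so the task function itself sends nothing on, and `emit-distinct` wraps the distinct set in a single map segment. The `::` keywords resolve to whichever namespace holds these functions (`aggregation.core` in the question).

```clojure
(defn drop-all
  "Hypothetical task function for :distinct-segments. Returning an empty
  vector means this function sends no segments downstream."
  [segment]
  [])

(defn emit-distinct
  "Hypothetical :trigger/emit function (assumed 5-arity contract).
  Wraps the accumulated distinct set in one map segment."
  [event window trigger state-event state]
  {:distinct (vec state)})

;; Catalog entry: replaces clojure.core/identity with drop-all.
(def distinct-task
  {:onyx/name :distinct-segments
   :onyx/fn ::drop-all
   :onyx/type :function
   :onyx/batch-size 1000})

;; Trigger: :trigger/emit instead of :trigger/sync.
(def distinct-trigger
  {:trigger/window-id :distinct-segments-window
   :trigger/id :distinct-segments-trigger
   :trigger/refinement :onyx.refinements/accumulating
   :trigger/on :onyx.triggers/segment
   :trigger/threshold [1 :element]
   :trigger/emit ::emit-distinct})
```

Because this trigger still fires on every segment with an accumulating refinement, :out would likely see one such map per firing, each holding the set accumulated so far; emitting only once would need a different trigger configuration.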

--
You received this message because you are subscribed to the Google Groups "Onyx" group.
To unsubscribe from this group and stop receiving emails from it, send an email to onyx-user+unsubscribe@googlegroups.com.
To post to this group, send email to onyx...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/onyx-user/c8716441-9c83-4b9e-bcf7-23dba463f327%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Punit Naik

May 16, 2017, 10:29:50 PM
to Mike Drogalis, Onyx

Hi Mike

I will fix these two issues and try again.

Punit Naik

May 17, 2017, 2:07:53 AM
to Onyx, naik.p...@gmail.com
Hi Mike

So instead of the identity function in the distinct-segments task, I used another function that returns an empty vector, and I changed :trigger/sync to :trigger/emit.
But I am still getting an empty vector in my output channel.

Mike Drogalis

May 17, 2017, 11:11:47 AM
to Punit Naik, Onyx
Onyx never transmits a vector between two tasks, so I'm not sure how you ended up getting one on your output channel.

Punit Naik

May 17, 2017, 11:27:28 AM
to Onyx, naik.p...@gmail.com
I used an empty vector because you told me to do so here -> https://groups.google.com/d/msg/onyx-user/FWUS7r6C4ww/7bA7cXnVAwAJ
But if that is not the way it is done, what is the other way?

Mike Drogalis

May 17, 2017, 11:29:26 AM
to Punit Naik, Onyx
Right, what I'm saying is that if you return a vector from a function, Onyx unrolls the maps inside it and transmits them downstream one by one. Vectors never turn up as the input to another function or as output.
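A rough model of the rule described here, written as a plain function. `downstream-segments` is hypothetical and not part of Onyx; it only illustrates that a returned map is sent as one segment and a returned vector is unrolled into its elements, so an empty vector sends nothing and a vector never arrives downstream as a segment itself. The error branch for other return types is an assumption of this model.

```clojure
(defn downstream-segments
  "Hypothetical model: the segments that would be sent downstream for one
  return value of an :onyx/fn."
  [fn-result]
  (cond
    (map? fn-result)    [fn-result]  ; a single map is one segment
    (vector? fn-result) fn-result    ; a vector is unrolled, element by element
    :else (throw (ex-info "Expected a map or a vector of maps"
                          {:result fn-result}))))

(downstream-segments {:name "punit naik"})       ;; => [{:name "punit naik"}]
(downstream-segments [{:name "a"} {:name "b"}])  ;; => [{:name "a"} {:name "b"}]
(downstream-segments [])                         ;; => [] (nothing is sent)
```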
