(ns aggregation.core (:require [clojure.core.async :refer [chan >!! <!! close!]] [onyx.extensions :as extensions] [onyx.plugin.core-async :refer [take-segments!]] [onyx.api]))
(defn distinct-aggregation-fn-init
  "Returns the starting state for the distinct aggregation: an empty set.
  `window` is accepted but not consulted."
  [window]
  (hash-set))
(defn distinct-aggregation-apply-log
  "Folds one state-update entry `v` into the accumulated `state` using conj.
  With a set as `state`, an entry that is already present leaves it unchanged.
  `window` is accepted but not consulted."
  [window state v]
  ;; Log command is not needed for single transition type.
  ;; The comment must sit on its own line: written inline on a collapsed line
  ;; it swallowed the function body and the closing paren.
  (conj state v))
(defn distinct-aggregation-fn
  "Produces the state-update entry for an incoming `segment`.
  The entry is the segment itself; `window` and `state` are accepted but not
  consulted."
  [window state segment]
  ;; Log command is not needed for single transition type.
  ;; The comment must sit on its own line: written inline on a collapsed line
  ;; it swallowed the returned value and the closing paren.
  segment)
(defn distinct-super-aggregation
  "Merges two aggregation states by adding every element of `state-2` to
  `state-1`. `window` is accepted but not consulted."
  [window state-1 state-2]
  (reduce conj state-1 state-2))
;; Aggregation definition referenced from the window as ::distinct.
;; NOTE(review): this var has the same name as clojure.core/distinct, so inside
;; this namespace the bare symbol `distinct` refers to this map.
(def ^:export distinct
  {:aggregation/init                 distinct-aggregation-fn-init
   :aggregation/create-state-update  distinct-aggregation-fn
   :aggregation/apply-state-update   distinct-aggregation-apply-log
   :aggregation/super-aggregation-fn distinct-super-aggregation})

;; Job graph as directed edges: :in -> :distinct-segments -> :out.
(def workflow
  [[:in :distinct-segments]
   [:distinct-segments :out]])
;; Buffer size shared by the input and output core.async channels.
(def capacity 1000)

;; Channel the :in task reads from (handed over by inject-in-ch).
(def input-chan (chan capacity))

;; Atom handed to the :in task under :core.async/buffer (see inject-in-ch).
(def input-buffer (atom {}))

;; Channel the :out task writes to (handed over by inject-out-ch).
(def output-chan (chan capacity))

;; Batch size used by the :in and :out catalog entries.
(def batch-size 10)
;; Task catalog for the job: one entry per task named in the workflow.
;; Each explanatory comment sits on its own line; written inline on a collapsed
;; line, the first one commented out the entire catalog vector.
(def catalog
  [;; Taking in the input
   {:onyx/name       :in
    :onyx/plugin     :onyx.plugin.core-async/input
    :onyx/type       :input
    :onyx/medium     :core.async
    :onyx/batch-size batch-size
    :onyx/max-peers  1
    :onyx/doc        "Reads segments from a core.async channel"}

   ;; Applying overall distinct i.e. on the entire dataset
   {:onyx/name       :distinct-segments
    :onyx/fn         :clojure.core/identity
    :onyx/type       :function
    :onyx/batch-size 1000}

   ;; Output the data
   {:onyx/name       :out
    :onyx/plugin     :onyx.plugin.core-async/output
    :onyx/type       :output
    :onyx/medium     :core.async
    :onyx/max-peers  1
    :onyx/batch-size batch-size
    :onyx/doc        "Writes segments to a core.async channel"}])
;; A single global window on the :distinct-segments task, aggregated with the
;; namespace-local ::distinct aggregation.
(def windows
  [{:window/id          :distinct-segments-window
    :window/task        :distinct-segments
    :window/type        :global
    :window/aggregation ::distinct}])
;; One trigger on the distinct-segments window: segment-based with a threshold
;; of [1 :element], accumulating refinement, synced through ::dump-window!.
(def triggers
  [{:trigger/window-id  :distinct-segments-window
    :trigger/id         :distinct-segments-trigger
    :trigger/refinement :onyx.refinements/accumulating
    :trigger/on         :onyx.triggers/segment
    :trigger/threshold  [1 :element]
    :trigger/sync       ::dump-window!}])
(defn dump-window!
  "Trigger sync function: prints the window id and the current window state to
  *out*, then returns `state` unchanged. `event`, `trigger` and the
  destructured window bounds are accepted but not used."
  [event window trigger {:keys [lower-bound upper-bound] :as window-data} state]
  (println "window-id -> " (:window/id window))
  (println "current state -> " state)
  state)
;; Sample input: the same value twice, so the distinct aggregation should end
;; up holding a single element.
;; Each segment is a map rather than a bare string, because Onyx segments must
;; be Clojure maps.
(def input-segments
  [{:name "punit naik"}
   {:name "punit naik"}])
;; Put every sample segment on the input channel, in order, then close the
;; channel so no further input can be added.
(run! (fn [segment] (>!! input-chan segment)) input-segments)
(close! input-chan)
;; Random id generated at load time; used as :onyx/tenancy-id in both the env
;; and peer configs.
(def id (java.util.UUID/randomUUID))

;; Predicate that ignores its arguments and always returns true.
(def always-true (fn [& _] true))
;; Environment configuration passed to onyx.api/start-env.
;; :zookeeper/server? is true and the server port matches the port in
;; :zookeeper/address.
(def env-config
  {:zookeeper/address     "127.0.0.1:2188"
   :zookeeper/server?     true
   :zookeeper.server/port 2188
   :onyx/tenancy-id       id})
;; Peer configuration passed to onyx.api/start-peer-group, submit-job and
;; await-job-completion. Uses the same ZooKeeper address and tenancy id as
;; env-config.
(def peer-config
  {:zookeeper/address        "127.0.0.1:2188"
   :onyx/tenancy-id          id
   :onyx.peer/job-scheduler  :onyx.job-scheduler/balanced
   :onyx.messaging/impl      :aeron
   :onyx.messaging/peer-port 40200
   :onyx.messaging/bind-addr "localhost"})
;; Started at namespace load time: the Onyx environment and the peer group.
(def env (onyx.api/start-env env-config))
(def peer-group (onyx.api/start-peer-group peer-config))

;; One virtual peer per distinct task name appearing in the workflow edges.
(def n-peers
  (->> workflow
       (apply concat)
       set
       count))

(def v-peers (onyx.api/start-peers n-peers peer-group))
(defn inject-in-ch
  "Lifecycle hook for the :in task: returns a map supplying the input buffer
  atom and the input channel. `event` and `lifecycle` are not consulted."
  [event lifecycle]
  {:core.async/buffer input-buffer
   :core.async/chan   input-chan})
(defn inject-out-ch
  "Lifecycle hook for the :out task: returns a map supplying the output
  channel. `event` and `lifecycle` are not consulted."
  [event lifecycle]
  {:core.async/chan output-chan})
;; Lifecycle call maps: each registers its channel-injecting hook under
;; :lifecycle/before-task-start.
(def in-calls
  {:lifecycle/before-task-start inject-in-ch})

(def out-calls
  {:lifecycle/before-task-start inject-out-ch})
;; Lifecycle entries: for each of :in and :out, first this namespace's
;; channel-injecting calls, then the core.async plugin's reader/writer calls.
(def lifecycles
  [{:lifecycle/task  :in
    :lifecycle/calls :aggregation.core/in-calls}
   {:lifecycle/task  :in
    :lifecycle/calls :onyx.plugin.core-async/reader-calls}
   {:lifecycle/task  :out
    :lifecycle/calls :aggregation.core/out-calls}
   {:lifecycle/task  :out
    :lifecycle/calls :onyx.plugin.core-async/writer-calls}])
(defn -main
  "Entry point: submits the job, blocks until it completes, prints whatever
  take-segments! returns for the output channel, and then shuts down the
  virtual peers, the peer group and the environment.

  Shutdown runs in a `finally` so the peers and environment are released even
  if submitting or awaiting the job throws. `args` is ignored."
  [& args]
  (try
    (let [job (onyx.api/submit-job
                peer-config
                {:workflow       workflow
                 :catalog        catalog
                 :lifecycles     lifecycles
                 :windows        windows
                 :triggers       triggers
                 ;; Kept on its own line: written inline on a collapsed line this
                 ;; comment swallowed :task-scheduler and the rest of the function.
                 ;:flow-conditions flow-conditions
                 :task-scheduler :onyx.task-scheduler/balanced})]
      (onyx.api/await-job-completion peer-config (:job-id job)))
    ;; Print the output
    (println "out -> " (onyx.plugin.core-async/take-segments! output-chan 50))
    (finally
      (doseq [v-peer v-peers]
        (onyx.api/shutdown-peer v-peer))
      (onyx.api/shutdown-peer-group peer-group)
      (onyx.api/shutdown-env env))))
You received this message because you are subscribed to the Google Groups "Onyx" group.
To unsubscribe from this group and stop receiving emails from it, send an email to onyx-user+unsubscribe@googlegroups.com.
To post to this group, send email to onyx-user@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/onyx-user/c8716441-9c83-4b9e-bcf7-23dba463f327%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
Hi Mike
I will fix these two issues and try again.
To unsubscribe from this group and stop receiving emails from it, send an email to onyx-user+unsubscribe@googlegroups.com.
To unsubscribe from this group and stop receiving emails from it, send an email to onyx-user+unsubscribe@googlegroups.com.
To post to this group, send email to onyx-user@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/onyx-user/6ea069e6-d967-41c9-9e7f-70876a1ff812%40googlegroups.com.
To unsubscribe from this group and stop receiving emails from it, send an email to onyx-user+unsubscribe@googlegroups.com.
To post to this group, send email to onyx-user@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/onyx-user/e8fb12e4-cd48-428e-86d1-3d21d5ecaa5f%40googlegroups.com.