Hello,
I am trying to use Onyx to process messages from a Kafka topic and compute some aggregates for analytics purposes.
As a first step I have created a project from the (latest) onyx template. From there I'm able to start Onyx locally and run the sample job. So far, so good.
Now I've attempted to swap the input out for a Kafka topic using the onyx-kafka plug-in. I've removed the intermediate tasks, added my own flow condition, and modified `submit-job` so that it no longer stubs and initialises the input.
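For reference, the only dependency I added for this is the plug-in itself in project.clj (version picked to match the Onyx version the template pulled in, exact number elided here):

:dependencies [;; ...the template's existing deps...
               [org.onyxplatform/onyx-kafka "x.y.z"]]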
When I submit the job, however, it hangs after the following 2 lines are logged:
Lukes-MacBook-Pro.local INFO [onyx.log.zookeeper] - Starting ZooKeeper client connection. If Onyx hangs here it may indicate a difficulty connecting to ZooKeeper.
Lukes-MacBook-Pro.local INFO [onyx.log.zookeeper] - Stopping ZooKeeper client connection
At first I thought this might be a ZooKeeper connection issue, but I've confirmed that this is not the case. The sample job doesn't hang here and proceeds with:
Lukes-MacBook-Pro.local INFO [onyx.messaging.messenger-buffer] - Starting Messenger Buffer
and completes successfully, but my modified job never gets this far.
Here is my full job definition:
{:workflow [[:read-messages :write-output]]
 :catalog [{:onyx/name :read-messages
            :onyx/plugin :onyx.plugin.kafka/read-messages
            :onyx/type :input
            :onyx/medium :kafka
            :kafka/topic "music-events"
            :kafka/group-id "onyx-kafka-consumer"
            :kafka/offset-reset :smallest
            :kafka/force-reset? true
            :kafka/deserializer-fn :scout.jobs.emerging-artists/parse-kafka-event
            :kafka/fetch-size 307200
            :kafka/chan-capacity 1000
            :kafka/empty-read-back-off 500
            :kafka/commit-interval 500
            :onyx/min-peers 8
            :onyx/max-peers 8
            :onyx/batch-size 100
            :onyx/doc "Reads messages from the Kafka music-events topic"}
           {:onyx/name :write-output
            :onyx/plugin :onyx.plugin.core-async/output
            :onyx/type :output
            :onyx/medium :core.async
            :onyx/max-peers 1
            :onyx/batch-size 20
            :onyx/batch-timeout 50
            :onyx/doc "Writes segments to a core.async channel"}]
 :lifecycles [{:lifecycle/task :read-messages
               :lifecycle/calls :onyx.plugin.kafka/read-messages-calls}]
 :flow-conditions [{:flow/from :read-messages
                    :flow/to [:write-output]
                    :flow/predicate :scout.jobs.emerging-artists/valid?
                    :flow/short-circuit? true}]
 :task-scheduler :onyx.task-scheduler/balanced}
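In case it matters, the two functions referenced from the catalog and the flow condition live in scout.jobs.emerging-artists and are nothing fancy. Simplified (the real deserializer parses the actual event payload; EDN is just a stand-in here), they look roughly like this:

(ns scout.jobs.emerging-artists)

;; Handed the raw Kafka message bytes via :kafka/deserializer-fn
(defn parse-kafka-event [^bytes bs]
  (read-string (String. bs "UTF-8")))

;; Flow-condition predicate: called with event, old segment, new segment, all new segments
(defn valid? [event old-segment segment all-new]
  (map? segment))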
And the modified `submit-job` fn:
(defn submit-job
  [dev-env]
  (let [dev-cfg (-> "dev-peer-config.edn" resource slurp read-string)
        peer-config (assoc dev-cfg :onyx/id (:onyx-id dev-env))
        ;; Only :write-output is stubbed with a core.async channel now;
        ;; the input comes from Kafka, so :read-messages is left alone
        stubs [:write-output]
        ;; Stubs the catalog entries for core.async I/O
        dev-catalog (u/in-memory-catalog emerging-artists-catalog stubs)
        ;; Stubs the lifecycles for core.async I/O
        dev-lifecycles (u/in-memory-lifecycles emerging-artists-lifecycles dev-catalog stubs)]
    ;; The template's input binding is no longer needed, since the input task
    ;; reads from Kafka rather than from a core.async channel
    ;; (u/bind-inputs! dev-lifecycles {:read-lines dev-inputs/lines})
    (let [job {:workflow emerging-artists-workflow
               :catalog dev-catalog
               :lifecycles dev-lifecycles
               :flow-conditions emerging-artists-flow-conditions
               :task-scheduler :onyx.task-scheduler/balanced}]
      (onyx.api/submit-job peer-config job)
      ;; Automatically grab output from the stubbed core.async channels,
      ;; returning a vector of the results with data structures representing
      ;; the output.
      (u/collect-outputs! dev-lifecycles [:write-output]))))
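For completeness, I'm calling this from the REPL against the dev environment that the template starts up, i.e. roughly:

;; dev-env is the development environment started via the template's dev helpers
(submit-job dev-env)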
I've tried to change the `submit-job` implementation only where necessary, but I'm still failing to see what's missing here.
As you can probably tell, I'm very new to Onyx. Any ideas on what the problem might be would be greatly appreciated :)
Thanks in advance,
Luke