Modified job hangs when running locally


Luke Snape

Jan 20, 2016, 7:02:51 AM
to Onyx
Hello,

I am trying to use Onyx to process messages from a Kafka topic and compute some aggregates for analytics purposes.

As a first step I have created a project from the (latest) onyx template. From here I'm able to start Onyx locally and run the sample job. So far so easy.

Now I've attempted to swap the input for a Kafka topic using the onyx-kafka plugin. I've removed the intermediate tasks, added my own flow condition, and modified `submit-job` so that it no longer stubs and initialises the input.

When I submit the job, however, it hangs after the following two lines are logged:

Lukes-MacBook-Pro.local INFO [onyx.log.zookeeper] - Starting ZooKeeper client connection. If Onyx hangs here it may indicate a difficulty connecting to ZooKeeper.
Lukes-MacBook-Pro.local INFO [onyx.log.zookeeper] - Stopping ZooKeeper client connection

At first I thought this might be a ZooKeeper connection issue, but I've confirmed that this is not the case. The sample job doesn't hang here and proceeds with:

Lukes-MacBook-Pro.local INFO [onyx.messaging.messenger-buffer] - Starting Messenger Buffer

and completes successfully, but my modified job never gets this far.

Here is my job definition:

  {:workflow [[:read-messages :write-output]],
   :catalog
   [{:onyx/plugin :onyx.plugin.kafka/read-messages,
     :onyx/medium :kafka,
     :kafka/offset-reset :smallest,
     :kafka/force-reset? true,
     :onyx/type :input,
     :onyx/name :read-messages,
     :kafka/topic "music-events",
     :kafka/group-id "onyx-kafka-consumer",
     :onyx/max-peers 8,
     :onyx/min-peers 8,
     :onyx/doc "Reads messages from the Kafka music-events topic",
     :kafka/empty-read-back-off 500,
     :kafka/fetch-size 307200,
     :onyx/batch-size 100,
     :kafka/deserializer-fn :scout.jobs.emerging-artists/parse-kafka-event,
     :kafka/commit-interval 500,
     :kafka/chan-capacity 1000}
    {:onyx/plugin :onyx.plugin.core-async/output,
     :onyx/medium :core.async,
     :onyx/batch-timeout 50,
     :onyx/type :output,
     :onyx/name :write-output,
     :onyx/max-peers 1,
     :onyx/doc "Writes segments to a core.async channel",
     :onyx/batch-size 20}],
   :lifecycles [{:lifecycle/task :read-messages, :lifecycle/calls :onyx.plugin.kafka/read-messages-calls}],
   :flow-conditions [{:flow/from :read-messages, :flow/to [:write-output], :flow/predicate :scout.jobs.emerging-artists/valid?, :flow/short-circuit? true}],
   :task-scheduler :onyx.task-scheduler/balanced}
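For context, :kafka/deserializer-fn resolves to a function that takes the raw Kafka byte array and returns a segment, and :flow/predicate resolves to a function of [event old-segment new-segment all-new-segments] that returns a boolean. Simplified sketches of the shape (not my exact code; the JSON parsing and the :artist-id check are just placeholders):

  (ns scout.jobs.emerging-artists
    (:require [cheshire.core :as json]))

  ;; :kafka/deserializer-fn - byte array in, segment (a map) out
  (defn parse-kafka-event [bytes]
    (json/parse-string (String. bytes "UTF-8") true))

  ;; :flow/predicate - [event old-segment new-segment all-new-segments] in, boolean out
  (defn valid? [event old segment all-new]
    (boolean (:artist-id segment)))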

And the modified submit-job fn:

  (defn submit-job
    [dev-env]
    (let [dev-cfg (-> "dev-peer-config.edn" resource slurp read-string)
          peer-config (assoc dev-cfg :onyx/id (:onyx-id dev-env))
          ;; Stub :write-output with a core.async output channel
          stubs [:write-output]
          ;; Stubs the catalog entries for core.async I/O
          dev-catalog (u/in-memory-catalog emerging-artists-catalog stubs)
          ;; Stubs the lifecycles for core.async I/O
          dev-lifecycles (u/in-memory-lifecycles emerging-artists-lifecycles dev-catalog stubs)]
      ;; Automatically pipes the data structure into the channel, attaching :done at the end
      ;; (u/bind-inputs! dev-lifecycles {:read-lines dev-inputs/lines})
      (let [job {:workflow emerging-artists-workflow
                 :catalog dev-catalog
                 :lifecycles dev-lifecycles
                 :flow-conditions emerging-artists-flow-conditions
                 :task-scheduler :onyx.task-scheduler/balanced}]
        (onyx.api/submit-job peer-config job)
        ;; Automatically grab output from the stubbed core.async channels,
        ;; returning a vector of the results with data structures representing
        ;; the output.
        (u/collect-outputs! dev-lifecycles [:write-output]))))

I've tried to change the submit-job implementation only where necessary, but I'm still failing to see what's missing here.

As you can probably tell I'm very new to Onyx. Any ideas on what the problem might be would be greatly appreciated :)

Thanks in advance,

Luke

Lucas Bradstreet

Jan 20, 2016, 7:07:12 AM
to Luke Snape, Onyx
Hi Luke,

Thanks for all of the detail you've provided. Would you be willing to share your onyx.log with me?

I'm in the gitter channel and on the clojurians slack channel if you want to organise this there.

Thanks,

Lucas

lucas.br...@onyxplatform.org

Jan 20, 2016, 7:50:46 AM
to Onyx, ldjs...@gmail.com
For any observers, the issue was that not enough peers had been started to meet the minimum required to start the job. The number of peers required to start any job is the sum of :onyx/min-peers over the workflow's tasks (defaulting to 1 where :onyx/min-peers is not set).
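
For this particular job that works out to 9 virtual peers (8 for :read-messages via :onyx/min-peers, plus 1 for :write-output), so at least 9 peers need to be running before the job will begin. A quick sanity check looks something like this (a sketch; emerging-artists-catalog is the catalog posted above, and the start-peers call assumes a peer group is already running):

  (defn required-peers
    "Sum of :onyx/min-peers over the catalog entries, defaulting to 1."
    [catalog]
    (reduce + (map #(get % :onyx/min-peers 1) catalog)))

  (required-peers emerging-artists-catalog)
  ;; => 9

  ;; then start at least that many virtual peers, e.g.
  ;; (onyx.api/start-peers (required-peers emerging-artists-catalog) peer-group)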