local test (no env) + kafka plugin


art...@appsflyer.com

Dec 2, 2015, 11:28:45 PM
to Onyx
Hello.

I'm trying to move from a simple sliding-window test that uses the onyx.core plugin for input to one that uses the onyx.kafka plugin for input instead.

Here's the example:

I compile it into an uberjar and run it with 'java -jar ...'

I'm using a local ZooKeeper for peer coordination and a remote ZooKeeper for reading from Kafka (I assume there are messages in its topic, since I can see them with the Kafka console tool).

I've removed the environment creation, as I'm planning to run the code on more than one machine later.

The test worked with the onyx.core plugin (as in the sliding-window test in onyx.examples), but since I changed the :in task to read from Kafka, nothing happens: the output is empty.
Am I missing something? I've re-read the onyx.kafka documentation, and it seems I'm doing everything as written...


Thanks.

Lucas Bradstreet

Dec 3, 2015, 1:00:22 AM
to art...@appsflyer.com, Onyx
Hi,

The main problem I see is a combination of your deserializer-fn
https://gist.github.com/anonymous/20a168573b11d83c60c3#file-gistfile1-txt-L26,
and the use of take-segments! to read from the output medium.

take-segments! is a testing convenience function that tries to take
from the channel until it reads a :done, which is only useful if
you're trying out batch workloads, where you have a :done on your
input source.
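To make the :done behaviour concrete, here is a minimal sketch in plain Clojure. It is not Onyx's implementation of take-segments!: a java.util.concurrent.LinkedBlockingQueue stands in for the output channel, and the name take-until-done is hypothetical. Each .take blocks, so if the source never produces :done the function never returns, which is the same reason a :done-less Kafka topic makes the test hang.

```clojure
(ns example.take-until-done
  (:import [java.util.concurrent BlockingQueue LinkedBlockingQueue]))

(defn take-until-done
  "Collects segments from q until the :done sentinel arrives and returns them
  (without the :done). Each .take blocks, so if :done is never put on the
  queue this function never returns."
  [^BlockingQueue q]
  (loop [acc []]
    (let [segment (.take q)]
      (if (= segment :done)
        acc
        (recur (conj acc segment))))))

(comment
  ;; With a sentinel at the end, this returns [{:n 1} {:n 2}].
  (let [q (LinkedBlockingQueue.)]
    (doseq [s [{:n 1} {:n 2} :done]]
      (.put q s))
    (take-until-done q)))
```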

Your deserializer fn needs to convert the bytes read from your Kafka
topic into Clojure data. You may have changed this for testing
purposes; however, the problem is that it will then never read a
:done, and thus it will never exit.
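A minimal sketch of such a deserializer, assuming the producer writes UTF-8 encoded EDN (the thread doesn't say what format is on the topic; for JSON you would swap in a JSON parser). The function receives the raw bytes of one Kafka message and returns Clojure data; the name deserialize-message is hypothetical, and it would be referenced from the Kafka input task's catalog entry by its fully qualified keyword.

```clojure
(ns example.deserializer
  (:require [clojure.edn :as edn]))

(defn deserialize-message
  "Converts the raw bytes of one Kafka message into Clojure data.
  Assumes the producer wrote UTF-8 encoded EDN."
  [^bytes message-bytes]
  (edn/read-string (String. message-bytes "UTF-8")))

(comment
  ;; Returns the map {:user-id 42, :event "click"}
  (deserialize-message (.getBytes "{:user-id 42 :event \"click\"}" "UTF-8")))
```

clojure.edn/read-string is used rather than clojure.core/read-string because it only reads data and never evaluates code embedded in the message.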

If you are using a topic from elsewhere that will never have a
:done on it, or this is a long-running streaming topic, then you
will need to decide how to tell when you are done reading from the
data source (e.g. no new data within some time period), and then
kill the job.
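One way to implement "no new data within some time period" is an idle timer. The plain-Clojure sketch below rests on assumptions that are not in the thread: something in your job resets the last-seen atom to the current time whenever a segment goes by, and kill-fn is a placeholder for whatever stops the job (in a real setup it would presumably call onyx.api/kill-job). All names here are hypothetical.

```clojure
(ns example.idle-timeout)

(defn idle?
  "True when no segment has been seen for at least timeout-ms."
  [last-seen-ms now-ms timeout-ms]
  (>= (- now-ms last-seen-ms) timeout-ms))

(defn watch-for-idle!
  "Polls every poll-ms. Once the timestamp held in the last-seen atom is older
  than timeout-ms, calls kill-fn once and returns :killed. Blocks the calling
  thread, so run it in a future or a dedicated thread."
  [last-seen timeout-ms poll-ms kill-fn]
  (loop []
    (if (idle? @last-seen (System/currentTimeMillis) timeout-ms)
      (do (kill-fn)
          :killed)
      (do (Thread/sleep (long poll-ms))
          (recur)))))

(comment
  ;; Hypothetical wiring: record activity wherever segments are observed...
  (def last-seen (atom (System/currentTimeMillis)))
  (reset! last-seen (System/currentTimeMillis))
  ;; ...and stop the job after 60 s without new data. kill-fn is a placeholder.
  (future (watch-for-idle! last-seen 60000 1000 #(println "would kill the job here"))))
```

An atom only works when the code that observes segments and the watcher share a JVM; with peers spread across machines, the last-seen time would have to live somewhere shared.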

Lucas

Artyom Shein

Dec 3, 2015, 2:08:10 AM
to Lucas Bradstreet, Onyx

OK, I get it. What confused me, though, is that even when I put a println inside the deserialize-fn, I never see it printed to stdout, which means the function isn't being called. More importantly, no Onyx consumer id is registered in ZooKeeper during execution...

Lucas Bradstreet

Dec 3, 2015, 2:13:38 AM
to Artyom Shein, Onyx
That is admittedly odd. Is any exception written to the file "onyx.log"?

(resent to group)

Artyom Shein

Dec 3, 2015, 10:52:09 AM
to Lucas Bradstreet, Onyx
Nothing in onyx.log.
The last line there is 'Starting ZooKeeper client connection. If Onyx hangs here it may indicate a difficulty connecting to ZooKeeper'.

However, using the Kafka console consumer with the same ZooKeeper host and port as configured in the onyx.kafka plugin, I can fetch messages from Kafka.
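Since the log stops at the ZooKeeper connection step, one quick check is whether the address in the peer config is reachable from the machine running the jar. A minimal sketch using only java.net and ZooKeeper's "ruok" four-letter command, which a healthy server answers with "imok" before closing the connection. The host and port in the comment are placeholders, and newer ZooKeeper releases only answer four-letter commands listed in 4lw.commands.whitelist, so false can also mean the command is disabled.

```clojure
(ns example.zk-check
  (:import [java.io IOException]
           [java.net InetSocketAddress Socket]))

(defn zk-ok?
  "Sends ZooKeeper's \"ruok\" four-letter command to host:port and returns
  true if the server answers \"imok\". Any connection or read failure
  (refused, unreachable, timed out) yields false."
  [^String host port timeout-ms]
  (try
    (with-open [sock (Socket.)]
      (.connect sock (InetSocketAddress. host (int port)) (int timeout-ms))
      (.setSoTimeout sock (int timeout-ms))
      (let [out (.getOutputStream sock)]
        (.write out (.getBytes "ruok" "US-ASCII"))
        (.flush out))
      ;; The server closes the connection after replying, so slurp reaches EOF.
      (= "imok" (slurp (.getInputStream sock))))
    (catch IOException _
      false)))

(comment
  ;; Placeholder host/port: use the value of :zookeeper/address from your peer-config.
  (zk-ok? "127.0.0.1" 2181 2000))
```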

(resent to group)
--
--
Artyom Shein

Lucas Bradstreet

Dec 3, 2015, 10:55:36 AM
to Artyom Shein, Onyx
This line is the problem then:
https://gist.github.com/anonymous/20a168573b11d83c60c3#file-gistfile1-txt-L11

You're pointing the Onyx ZooKeeper peer coordination at a local
address, but you haven't used start-env with an env-config that
starts a local ZooKeeper. If you use the same address as the
Kafka task, then you should be up and running.
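The suggestion amounts to pointing both the peer config and the Kafka input task at one ZooKeeper. A minimal sketch, with assumptions: the address is a placeholder, the peer config is trimmed to the one key that matters here, and the catalog entry shows only a subset of keys. The key names follow the onyx-kafka plugin of that period as far as I know; check the README of the version you use.

```clojure
(ns example.shared-zk-config)

;; Placeholder address: replace with the ZooKeeper that Kafka actually uses.
(def zk-address "zk.example.internal:2181")

;; Peer config trimmed to the relevant key; a real one needs more
;; (job scheduler, messaging settings, :onyx/id, ...).
(def peer-config
  {:zookeeper/address zk-address})

;; Kafka input catalog entry, showing only some of its keys.
(def kafka-in-entry
  {:onyx/name :in
   :onyx/plugin :onyx.plugin.kafka/read-messages
   :onyx/type :input
   :onyx/medium :kafka
   :kafka/topic "my-topic"                            ; placeholder topic
   :kafka/zookeeper zk-address                        ; same ensemble as the peers
   :kafka/deserializer-fn :my.app/deserialize-message ; hypothetical fn
   :onyx/batch-size 100})
```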

Artyom Shein

Dec 3, 2015, 11:54:02 AM
to Lucas Bradstreet, Onyx
Is this a problem even if I have a local ZooKeeper running via its './zkServer -start' command?
Should I always create an Onyx environment when I use a localhost ZooKeeper for coordination between peers?


Lucas Bradstreet

Dec 3, 2015, 12:16:39 PM
to Artyom Shein, Onyx
A local ZooKeeper running via zkServer should be fine; there's no need for start-env in this case. Do you still have this issue with a local server running?

Artyom Shein

Dec 3, 2015, 12:36:20 PM
to Lucas Bradstreet, Onyx
Yes, with a local ZooKeeper and without creating an environment.
When I tried (as you suggested) to use the ZooKeeper running remotely on AWS, I could see in the log that the virtual peers were now being created, which wasn't happening before.
But then I received this exception in onyx.log and this one in stdout.
The running code is here (I've removed the take-segments! call and block on job completion instead, knowing that the job won't finish without the sentinel; I just want to see my printouts and a registered consumer id in Kafka...)

Artyom Shein

Dec 3, 2015, 12:54:55 PM
to Lucas Bradstreet, Onyx
I see that the exception 'clojure.lang.PersistentArrayMap cannot be cast to java.lang.String' is being thrown in onyx/static/validation.clj:

(defn coerce-uuid [uuid]
  (if (instance? java.util.UUID uuid)
    uuid
    (java.util.UUID/fromString uuid)))
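To see why a map triggers this error, here is the quoted coerce-uuid in isolation (copied from the message, not loaded from Onyx) plus a hypothetical helper, cast-error?. Anything that isn't already a java.util.UUID is handed to UUID/fromString, which only accepts a String, so passing a whole map (for example, the return value of submit-job) fails with exactly this ClassCastException.

```clojure
(ns example.coerce-uuid)

(defn coerce-uuid
  "Copy of the function quoted from onyx/static/validation.clj."
  [uuid]
  (if (instance? java.util.UUID uuid)
    uuid
    (java.util.UUID/fromString uuid)))

(defn cast-error?
  "Hypothetical helper: true if coerce-uuid throws ClassCastException for x."
  [x]
  (try
    (coerce-uuid x)
    false
    (catch ClassCastException _
      true)))
```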

The id I'm using in peer-config passes this function. What other id could cause an issue there?



Lucas Bradstreet

Dec 3, 2015, 1:02:18 PM
to Artyom Shein, Onyx
Do you have the line number for that exception?

Artyom Shein

Dec 3, 2015, 1:04:50 PM
to Lucas Bradstreet, Onyx
This is what I get when I wrap it in a try/catch:

WARNING: /dev/shm/aeron-artyom already exists.
INFO: Aeron directory /dev/shm/aeron-artyom exists
INFO: Aeron CnC file /dev/shm/aeron-artyom/cnc exists
INFO: Aeron toDriver consumer heartbeat is 108168 ms old
main #error {
 :cause clojure.lang.PersistentArrayMap cannot be cast to java.lang.String
 :via
 [{:type java.lang.ClassCastException
   :message clojure.lang.PersistentArrayMap cannot be cast to java.lang.String
   :at [onyx.static.validation$coerce_uuid invoke validation.clj 337]}]
 :trace
 [[onyx.static.validation$coerce_uuid invoke validation.clj 337]
  [onyx.api$await_job_completion invoke api.clj 235]
  [onyx.api$await_job_completion invoke api.clj 233]
  [af_sliding_window.core$_main doInvoke core.clj 115]
  [clojure.lang.RestFn invoke RestFn.java 397]
  [clojure.lang.AFn applyToHelper AFn.java 152]
  [clojure.lang.RestFn applyTo RestFn.java 132]
  [af_sliding_window.core main nil -1]]}


Lucas Bradstreet

Dec 3, 2015, 1:07:28 PM
to Artyom Shein, Onyx
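Ah, submit-job returns a map, but await-job-completion expects a job-id.
Try destructuring the id out of the submission result:
(let [{:keys [job-id] :as job} (onyx.api/submit-job peer-config job)]
  (onyx.api/await-job-completion peer-config job-id))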
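A self-contained sketch of the same fix with Onyx stubbed out. fake-submit-job and fake-await-job-completion are hypothetical stand-ins, not Onyx functions; the only things taken from the thread are that the real submit-job returns a map containing :job-id and that await-job-completion wants that id rather than the whole map.

```clojure
(ns example.job-id)

(defn fake-submit-job
  "Hypothetical stand-in for onyx.api/submit-job: returns a map with :job-id."
  [_peer-config _job]
  {:job-id (java.util.UUID/randomUUID)})

(defn fake-await-job-completion
  "Hypothetical stand-in for onyx.api/await-job-completion: insists on a UUID."
  [_peer-config job-id]
  (if (instance? java.util.UUID job-id)
    :completed
    (throw (ClassCastException. (str (class job-id) " is not a job id")))))

(defn run-job
  [peer-config job]
  ;; Destructure :job-id out of the submission result and pass only the id on.
  (let [{:keys [job-id]} (fake-submit-job peer-config job)]
    (fake-await-job-completion peer-config job-id)))
```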

Artyom Shein

Dec 3, 2015, 1:30:06 PM
to Lucas Bradstreet, Onyx
That eliminated the casting exception, but the tasks still aren't being executed: https://gist.github.com/anonymous/0f80119da79272e5953f

Artyom Shein

Dec 3, 2015, 1:49:52 PM
to Lucas Bradstreet, Onyx
I can't believe I didn't require [onyx.plugin.kafka] at all... so dumb :/
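Catalog entries name functions by namespaced keyword, so the namespace behind each keyword has to be loaded in the peer's JVM before it can be resolved. The sketch below uses a hypothetical resolver, kw->var (not Onyx's own code), to show that a keyword only resolves once its namespace has been required. The actual fix from the thread is shown as a comment, since it needs onyx-kafka on the classpath.

```clojure
(ns example.resolve-plugin)

;; The fix from the thread, in the job's ns form (requires onyx-kafka on the classpath):
;; (ns af-sliding-window.core
;;   (:require [onyx.api]
;;             [onyx.plugin.kafka]))

(defn kw->var
  "Hypothetical resolver: returns the var named by a namespaced keyword,
  or nil when its namespace has not been loaded (required) yet."
  [kw]
  (when-let [loaded-ns (find-ns (symbol (namespace kw)))]
    (ns-resolve loaded-ns (symbol (name kw)))))
```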