Problem converting JSON into Riemann events

120 views
Skip to first unread message

JS

unread,
Aug 25, 2016, 4:25:12 AM8/25/16
to Riemann Users
Hi all,

I am exploring the possibilites of Riemann and I would like to have the ability to consume Telegraf (https://influxdata.com/time-series-platform/telegraf/) metrics from Kafka.
I have been able to build the Riemann Kafka (https://github.com/pyr/riemann-kafka) plugin and load into Riemann.
Riemann is capable of reading the JSON events from Kafka.

The JSON events look like:

{"fields":"time_guest":0,"time_guest_nice":0,"time_idle":25096.69,"time_iowait":75.16,"time_irq":0,"time_nice":2.02,"time_softirq":3.17,"time_steal":0.72,"time_system":77.78,"time_user":598.4,"usage_guest":0,"usage_guest_nice":0,"usage_idle":81.69014084507094,"usage_iowait":0.10060362173030406,"usage_irq":0,"usage_nice":0,"usage_softirq":0.1006036217303934,"usage_steal":0,"usage_system":0.8048289738431472,"usage_user":17.303822937628308},"name":"cpu","tags":{"cpu":"cpu-total","host":"host-10-66-51-143"},"timestamp":1472052550}


My Riemann configuration looks like:

(load-plugins)


(defn my-json-decoder
 
[input]
 
(let [decoded-msg (cheshire.core/parse-string (String. input) true)]
   
(map riemann.common/event decoded-msg)))


(kafka/kafka-consumer {:topic "telegraf"
                       
:zookeeper.connect "host123:2181"
                       
:group.id "riemann.consumer"
                       
:auto.commit.enable "false"
                       
:auto.offset.reset "smallest"
                       
:decoder my-json-decoder})



Starting Riemann in the foreground:

...snip...

INFO [2016-08-25 08:13:50,719] riemann.consumer_host-10-66-51-158.net-10-66-0-0.tt3.com-1472112829940-59339e22-leader-finder-thread - kafka.consumer.ConsumerFetcherManager - [ConsumerFetcherManager-1472112830082] Added fetcher for partitions ArrayBuffer([[telegraf,0], initOffset 4782 to broker id:0,host:host-10-66-51-143.net-10-66-0-0.tt3.com,port:9092] )
INFO [2016-08-25 08:13:50,720] ConsumerFetcherThread-riemann.consumer_host-10-66-51-158.net-10-66-0-0.tt3.com-1472112829940-59339e22-0-0 - kafka.consumer.ConsumerFetcherThread - [ConsumerFetcherThread-riemann.consumer_host-10-66-51-158.net-10-66-0-0.tt3.com-1472112829940-59339e22-0-0], Starting 
ERROR [2016-08-25 08:13:50,882] clojure-agent-send-off-pool-2 - riemann.plugin.kafka - interrupted consumption
java.lang.IllegalArgumentException: Key must be integer
at clojure.lang.APersistentVector.invoke(APersistentVector.java:284)
at riemann.common$event.invoke(common.clj:137)
at clojure.core$map$fn__4245.invoke(core.clj:2559)
at clojure.lang.LazySeq.sval(LazySeq.java:40)
at clojure.lang.LazySeq.seq(LazySeq.java:49)
at clojure.lang.RT.seq(RT.java:484)
at clojure.core$seq.invoke(core.clj:133)
at riemann.plugin.kafka$start_kafka_thread$fn__458.invoke(kafka.clj:53)
at clojure.core$binding_conveyor_fn$fn__4145.invoke(core.clj:1910)
at clojure.lang.AFn.call(AFn.java:18)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
INFO [2016-08-25 08:13:50,884] clojure-agent-send-off-pool-2 - kafka.consumer.ZookeeperConsumerConnector - [riemann.consumer_host-10-66-51-158.net-10-66-0-0.tt3.com-1472112829940-59339e22], ZKConsumerConnector shutting down
INFO [2016-08-25 08:13:50,889] clojure-agent-send-off-pool-2 - kafka.consumer.ConsumerFetcherManager - [ConsumerFetcherManager-1472112830082] Stopping leader finder thread
INFO [2016-08-25 08:13:50,890] clojure-agent-send-off-pool-2 - kafka.consumer.ConsumerFetcherManager$LeaderFinderThread - [riemann.consumer_host-10-66-51-158.net-10-66-0-0.tt3.com-1472112829940-59339e22-leader-finder-thread], Shutting down
INFO [2016-08-25 08:13:50,891] riemann.consumer_host-10-66-51-158.net-10-66-0-0.tt3.com-1472112829940-59339e22-leader-finder-thread - kafka.consumer.ConsumerFetcherManager$LeaderFinderThread - [riemann.consumer_host-10-66-51-158.net-10-66-0-0.tt3.com-1472112829940-59339e22-leader-finder-thread], Stopped 
 
...snip...

When starting Riemann without the "decoder" then it continuous working but returns following message for each consumed message:

ERROR [2016-08-25 08:21:41,950] clojure-agent-send-off-pool-2 - riemann.plugin.kafka - could not decode msg
com.google.protobuf.InvalidProtocolBufferException: Protocol message tag had invalid wire type.
at com.google.protobuf.InvalidProtocolBufferException.invalidWireType(InvalidProtocolBufferException.java:99)
at com.google.protobuf.UnknownFieldSet$Builder.mergeFieldFrom(UnknownFieldSet.java:498)
at com.google.protobuf.UnknownFieldSet$Builder.mergeFrom(UnknownFieldSet.java:461)
at com.google.protobuf.UnknownFieldSet$Builder.mergeFrom(UnknownFieldSet.java:579)
at com.google.protobuf.UnknownFieldSet$Builder.mergeFrom(UnknownFieldSet.java:280)
at com.google.protobuf.CodedInputStream.readGroup(CodedInputStream.java:240)
at com.google.protobuf.UnknownFieldSet$Builder.mergeFieldFrom(UnknownFieldSet.java:488)
at com.google.protobuf.GeneratedMessage.parseUnknownField(GeneratedMessage.java:193)
at com.aphyr.riemann.Proto$Msg.<init>(Proto.java:3889)
at com.aphyr.riemann.Proto$Msg.<init>(Proto.java:3847)
at com.aphyr.riemann.Proto$Msg$1.parsePartialFrom(Proto.java:3970)
at com.aphyr.riemann.Proto$Msg$1.parsePartialFrom(Proto.java:3965)
at com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:141)
at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:176)
at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:188)
at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:193)
at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:49)
at com.aphyr.riemann.Proto$Msg.parseFrom(Proto.java:4227)
at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93)
at clojure.lang.Reflector.invokeStaticMethod(Reflector.java:207)
at riemann.plugin.kafka$protobuf_decoder.invoke(kafka.clj:18)
at riemann.plugin.kafka$safe_decode.invoke(kafka.clj:25)
at riemann.plugin.kafka$start_kafka_thread$fn__458.invoke(kafka.clj:53)
at clojure.core$binding_conveyor_fn$fn__4145.invoke(core.clj:1910)
at clojure.lang.AFn.call(AFn.java:18)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

But I would expect that because the consumed data aren't Riemann events.


Why is the JSON decoding not working in the above example?
Where is the

java.lang.IllegalArgumentException: Key must be integer

error coming from?

Thanks

JS

JS

unread,
Aug 25, 2016, 10:15:45 AM8/25/16
to Riemann Users
(load-plugins)


(defn my-json-decoder
 
[input]
 
(let [decoded-msg (cheshire.core/parse-string (String. input) true)]
   
(map riemann.common/event decoded-msg)))


(kafka/kafka-consumer {:topic "telegraf"
                       
:zookeeper.connect "host123:2181"
                       
:group.id "riemann.consumer"
                       
:auto.commit.enable "false"
                       
:auto.offset.reset "smallest"
                       
:decoder my-json-decoder})




It seems changing the decoder like this solves the issue:

 (defn my-json-decoder
 
[input]
 
(let [decoded-msg (cheshire.core/parse-string (String. input) true)]

   
(map riemann.common/event [decoded-msg])))

The above stacktrace and crash shouldn't happen of course ...

Aphyr

unread,
Aug 25, 2016, 11:45:40 AM8/25/16
to rieman...@googlegroups.com
On 08/25/2016 03:25 AM, JS wrote:
> (defn my-json-decoder
> [input]
> (let [decoded-msg (cheshire.core/parse-string(String.input)true)]
> (map riemann.common/eventdecoded-msg)))
>
> java.lang.IllegalArgumentException: Key must be integer
> at clojure.lang.APersistentVector.invoke(APersistentVector.java:284)
> at riemann.common$event.invoke(common.clj:137)
> at clojure.core$map$fn__4245.invoke(core.clj:2559)
> at clojure.lang.LazySeq.sval(LazySeq.java:40)
> at clojure.lang.LazySeq.seq(LazySeq.java:49)
> at clojure.lang.RT.seq(RT.java:484)
> at clojure.core$seq.invoke(core.clj:133)
> at riemann.plugin.kafka$start_kafka_thread$fn__458.invoke(kafka.clj:53)

> Why is the JSON decoding not working in the above example?
> Where is the java.lang.IllegalArgumentException:Keymust be integer
> error coming from?

The stacktrace says it's coming from the call to riemann.common/event, on
common.clj line 137. In Riemann 2.11, line 137 is inside `ensure-event-time`,
not `event`, so I'm guessing you're running some older version. I'd imagine it's
likely

https://github.com/riemann/riemann/blob/master/src/riemann/common.clj#L142

which tries to fetch :time from the opts map you passed into `event`, but
instead of being a map, it got a vector (hence, APersistentVector.invoke)
instead. Vectors don't have keyword keys like :time--they use integer keys like
2, so that's why it's exploding.

tl;dr elements of `decoded-msg` appear to be vectors, not maps like you thought.

--Kyle

JS

unread,
Aug 26, 2016, 3:41:10 AM8/26/16
to Riemann Users
.... so I'm guessing you're running some older version. 

Yeah true, I'm obliged to use Riemann-0.2.9 because I can't get Riemann-kafka working, I have opened an issue for that: https://github.com/pyr/riemann-kafka/issues/13 
Reply all
Reply to author
Forward
0 new messages