{"fields":"time_guest":0,"time_guest_nice":0,"time_idle":25096.69,"time_iowait":75.16,"time_irq":0,"time_nice":2.02,"time_softirq":3.17,"time_steal":0.72,"time_system":77.78,"time_user":598.4,"usage_guest":0,"usage_guest_nice":0,"usage_idle":81.69014084507094,"usage_iowait":0.10060362173030406,"usage_irq":0,"usage_nice":0,"usage_softirq":0.1006036217303934,"usage_steal":0,"usage_system":0.8048289738431472,"usage_user":17.303822937628308},"name":"cpu","tags":{"cpu":"cpu-total","host":"host-10-66-51-143"},"timestamp":1472052550}
(load-plugins)
(defn my-json-decoder
[input]
(let [decoded-msg (cheshire.core/parse-string (String. input) true)]
(map riemann.common/event decoded-msg)))
(kafka/kafka-consumer {:topic "telegraf"
:zookeeper.connect "host123:2181"
:group.id "riemann.consumer"
:auto.commit.enable "false"
:auto.offset.reset "smallest"
:decoder my-json-decoder})
INFO [2016-08-25 08:13:50,719] riemann.consumer_host-10-66-51-158.net-10-66-0-0.tt3.com-1472112829940-59339e22-leader-finder-thread - kafka.consumer.ConsumerFetcherManager - [ConsumerFetcherManager-1472112830082] Added fetcher for partitions ArrayBuffer([[telegraf,0], initOffset 4782 to broker id:0,host:host-10-66-51-143.net-10-66-0-0.tt3.com,port:9092] )
INFO [2016-08-25 08:13:50,720] ConsumerFetcherThread-riemann.consumer_host-10-66-51-158.net-10-66-0-0.tt3.com-1472112829940-59339e22-0-0 - kafka.consumer.ConsumerFetcherThread - [ConsumerFetcherThread-riemann.consumer_host-10-66-51-158.net-10-66-0-0.tt3.com-1472112829940-59339e22-0-0], Starting
ERROR [2016-08-25 08:13:50,882] clojure-agent-send-off-pool-2 - riemann.plugin.kafka - interrupted consumption
java.lang.IllegalArgumentException: Key must be integer
at clojure.lang.APersistentVector.invoke(APersistentVector.java:284)
at riemann.common$event.invoke(common.clj:137)
at clojure.core$map$fn__4245.invoke(core.clj:2559)
at clojure.lang.LazySeq.sval(LazySeq.java:40)
at clojure.lang.LazySeq.seq(LazySeq.java:49)
at clojure.lang.RT.seq(RT.java:484)
at clojure.core$seq.invoke(core.clj:133)
at riemann.plugin.kafka$start_kafka_thread$fn__458.invoke(kafka.clj:53)
at clojure.core$binding_conveyor_fn$fn__4145.invoke(core.clj:1910)
at clojure.lang.AFn.call(AFn.java:18)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
INFO [2016-08-25 08:13:50,884] clojure-agent-send-off-pool-2 - kafka.consumer.ZookeeperConsumerConnector - [riemann.consumer_host-10-66-51-158.net-10-66-0-0.tt3.com-1472112829940-59339e22], ZKConsumerConnector shutting down
INFO [2016-08-25 08:13:50,889] clojure-agent-send-off-pool-2 - kafka.consumer.ConsumerFetcherManager - [ConsumerFetcherManager-1472112830082] Stopping leader finder thread
INFO [2016-08-25 08:13:50,890] clojure-agent-send-off-pool-2 - kafka.consumer.ConsumerFetcherManager$LeaderFinderThread - [riemann.consumer_host-10-66-51-158.net-10-66-0-0.tt3.com-1472112829940-59339e22-leader-finder-thread], Shutting down
INFO [2016-08-25 08:13:50,891] riemann.consumer_host-10-66-51-158.net-10-66-0-0.tt3.com-1472112829940-59339e22-leader-finder-thread - kafka.consumer.ConsumerFetcherManager$LeaderFinderThread - [riemann.consumer_host-10-66-51-158.net-10-66-0-0.tt3.com-1472112829940-59339e22-leader-finder-thread], Stopped
ERROR [2016-08-25 08:21:41,950] clojure-agent-send-off-pool-2 - riemann.plugin.kafka - could not decode msg
com.google.protobuf.InvalidProtocolBufferException: Protocol message tag had invalid wire type.
at com.google.protobuf.InvalidProtocolBufferException.invalidWireType(InvalidProtocolBufferException.java:99)
at com.google.protobuf.UnknownFieldSet$Builder.mergeFieldFrom(UnknownFieldSet.java:498)
at com.google.protobuf.UnknownFieldSet$Builder.mergeFrom(UnknownFieldSet.java:461)
at com.google.protobuf.UnknownFieldSet$Builder.mergeFrom(UnknownFieldSet.java:579)
at com.google.protobuf.UnknownFieldSet$Builder.mergeFrom(UnknownFieldSet.java:280)
at com.google.protobuf.CodedInputStream.readGroup(CodedInputStream.java:240)
at com.google.protobuf.UnknownFieldSet$Builder.mergeFieldFrom(UnknownFieldSet.java:488)
at com.google.protobuf.GeneratedMessage.parseUnknownField(GeneratedMessage.java:193)
at com.aphyr.riemann.Proto$Msg.<init>(Proto.java:3889)
at com.aphyr.riemann.Proto$Msg.<init>(Proto.java:3847)
at com.aphyr.riemann.Proto$Msg$1.parsePartialFrom(Proto.java:3970)
at com.aphyr.riemann.Proto$Msg$1.parsePartialFrom(Proto.java:3965)
at com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:141)
at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:176)
at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:188)
at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:193)
at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:49)
at com.aphyr.riemann.Proto$Msg.parseFrom(Proto.java:4227)
at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at clojure.lang.Reflector.invokeMatchingMethod(Reflector.java:93)
at clojure.lang.Reflector.invokeStaticMethod(Reflector.java:207)
at riemann.plugin.kafka$protobuf_decoder.invoke(kafka.clj:18)
at riemann.plugin.kafka$safe_decode.invoke(kafka.clj:25)
at riemann.plugin.kafka$start_kafka_thread$fn__458.invoke(kafka.clj:53)
at clojure.core$binding_conveyor_fn$fn__4145.invoke(core.clj:1910)
at clojure.lang.AFn.call(AFn.java:18)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
java.lang.IllegalArgumentException: Key must be integer
(load-plugins)
(defn my-json-decoder
[input]
(let [decoded-msg (cheshire.core/parse-string (String. input) true)]
(map riemann.common/event decoded-msg)))
(kafka/kafka-consumer {:topic "telegraf"
:zookeeper.connect "host123:2181"
:group.id "riemann.consumer"
:auto.commit.enable "false"
:auto.offset.reset "smallest"
:decoder my-json-decoder})
(defn my-json-decoder
[input]
(let [decoded-msg (cheshire.core/parse-string (String. input) true)]
(map riemann.common/event [decoded-msg])))
.... so I'm guessing you're running some older version.