Auto-reload the Kafka Consumer

Vítor Brandão

Jan 25, 2021, 11:40:36 AM
to Riemann Users
Hello all.

Note: I originally posted this question as a GitHub issue (https://github.com/riemann/riemann/issues/991); however, this may be a better medium for it. I'll link the GitHub issue to this thread.

I'm running Riemann 0.3.6, Dockerized, in an AWS ECS task, producing and consuming events from Kafka. Occasionally, for a reason yet to be determined, I see these messages in the logs:

```
ERROR [2021-01-21 20:58:34,560] clojure-agent-send-off-pool-3 - riemann.kafka - Interrupted consumption
java.net.SocketException: Broken pipe (Write failed)
at java.base/java.net.SocketOutputStream.socketWrite0(Native Method)
at java.base/java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
at java.base/java.net.SocketOutputStream.write(SocketOutputStream.java:155)
at java.base/sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:233)
at java.base/sun.nio.cs.StreamEncoder.implFlushBuffer(StreamEncoder.java:312)
at java.base/sun.nio.cs.StreamEncoder.implFlush(StreamEncoder.java:316)
at java.base/sun.nio.cs.StreamEncoder.flush(StreamEncoder.java:153)
at java.base/java.io.OutputStreamWriter.flush(OutputStreamWriter.java:254)
at riemann.graphite.GraphiteTCPClient.send_lines(graphite.clj:41)
at riemann.graphite$graphite$fn__10762.invoke(graphite.clj:174)
at riemann.core$stream_BANG_$fn__10000.invoke(core.clj:20)
at riemann.core$stream_BANG_.invokeStatic(core.clj:19)
at riemann.core$stream_BANG_.invoke(core.clj:15)
at riemann.kafka$start_kafka_thread$fn__11398.invoke(kafka.clj:86)
at clojure.core$binding_conveyor_fn$fn__5476.invoke(core.clj:2022)
at clojure.lang.AFn.call(AFn.java:18)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:844)

DEBUG [2021-01-21 20:58:34,570] kafka-coordinator-heartbeat-thread | metrics - org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=riemann-consumer-2-252, groupId=metrics] Heartbeat thread has closed

DEBUG [2021-01-21 20:58:34,575] clojure-agent-send-off-pool-3 - org.apache.kafka.common.metrics.Metrics - Removed sensor with name connections-closed:
... (more similar "Removed sensor with name [sensor]" messages)

DEBUG [2021-01-21 20:58:34,585] clojure-agent-send-off-pool-3 - org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=riemann-consumer-2-252, groupId=metrics] Kafka consumer has been closed

WARN [2021-01-22 22:15:06,631] Thread-4 - riemann.core - instrumentation service caught
java.net.SocketException: Connection reset
at java.base/java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:115)
at java.base/java.net.SocketOutputStream.write(SocketOutputStream.java:155)
at java.base/sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:233)
at java.base/sun.nio.cs.StreamEncoder.implFlushBuffer(StreamEncoder.java:312)
at java.base/sun.nio.cs.StreamEncoder.implFlush(StreamEncoder.java:316)
at java.base/sun.nio.cs.StreamEncoder.flush(StreamEncoder.java:153)
at java.base/java.io.OutputStreamWriter.flush(OutputStreamWriter.java:254)
at riemann.graphite.GraphiteTCPClient.send_lines(graphite.clj:41)
at riemann.graphite$graphite$fn__10762.invoke(graphite.clj:174)
at riemann.core$stream_BANG_$fn__10000.invoke(core.clj:20)
at riemann.core$stream_BANG_.invokeStatic(core.clj:19)
at riemann.core$stream_BANG_.invoke(core.clj:15)
at riemann.core$instrumentation_service$measure__10009.invoke(core.clj:59)
at riemann.service.ThreadService$thread_service_runner__6715$fn__6716.invoke(service.clj:101)
at riemann.service.ThreadService$thread_service_runner__6715.invoke(service.clj:100)
at clojure.lang.AFn.run(AFn.java:22)
at java.base/java.lang.Thread.run(Thread.java:844)
```

At this point, the Kafka consumer has exited and no metrics are being pulled from Kafka. However, other components, such as the Graphite output and the Riemann task itself, happily continue executing. Because the ECS task still reports as healthy, I receive no alerts and AWS does not replace the task automatically.

Is there a mechanism to force Riemann to stop if any of the Kafka threads exits? Or, alternatively, can the Kafka consumer thread be re-spawned?

Many thanks,
Vitor

Sanel Zukan

Feb 23, 2021, 5:50:03 PM
to Vítor Brandão, Riemann Users
Hi,

Would it make sense to wrap the graphite stream inside async-queue! [1]?
Judging from the kafka.clj code, the consumer closes its connection
after an exception is thrown.

Also, check exception-stream [2] and the *exception-stream* dynamic
variable, which is explained in its docstring. There you can send an
alert and maybe call riemann.core/stop!.

[1] http://riemann.io/api/riemann.config.html#var-async-queue.21
[2] http://riemann.io/api/riemann.streams.html#var-exception-stream
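
A minimal sketch of how these two suggestions might fit together in riemann.config. The hosts, topic name, queue name and pool sizes are placeholders and not values from this thread (only the group id "metrics" appears in the logs above). The stack trace suggests the graphite stream is called directly from the Kafka thread, so the idea is to keep its exceptions from reaching that thread:

```clojure
; riemann.config sketch. Hosts, topic and pool sizes are hypothetical.

(kafka-consumer {:consumer.config {:bootstrap.servers "kafka.example.internal:9092"
                                   :group.id          "metrics"}
                 :topics          ["riemann-events"]})

(def graph (graphite {:host "graphite.example.internal"}))

(streams
  ; Exceptions thrown by the child streams are converted to events and
  ; passed to the handler (first argument) instead of propagating upward.
  (exception-stream
    ; Handler: here it only logs the exception event. Assumption: to make
    ; ECS replace the task, it could instead exit the JVM, for example
    ; with (System/exit 1).
    (fn [ex-event]
      (info "graphite stream failed:" ex-event))
    ; Hand events to a bounded thread pool so that writes to Graphite do
    ; not happen on the Kafka consumer thread.
    (async-queue! :graphite {:queue-size     1000
                             :core-pool-size 4
                             :max-pool-size  32}
      graph)))
```

Whether exceptions raised on the queue's worker threads reach the handler depends on the *exception-stream* binding being preserved across threads, which the exception-stream docstring discusses, so this is worth testing before relying on it.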

Best,
Sanel

Vítor Brandão

Feb 23, 2021, 5:53:27 PM
to Riemann Users
Hi Sanel,

Thanks so much for your suggestions. I'll explore these, although my Clojure knowledge is very limited. I'll probably come back with more questions :)

Kind regards,
Vitor