Hello all.
I'm running Riemann 0.3.6, Dockerized, in an AWS ECS task, producing and consuming events from Kafka. Occasionally, for a reason yet to be determined, I see these messages in the logs:
```
ERROR [2021-01-21 20:58:34,560] clojure-agent-send-off-pool-3 - riemann.kafka - Interrupted consumption
java.net.SocketException: Broken pipe (Write failed)
at java.base/java.net.SocketOutputStream.socketWrite0(Native Method)
at java.base/java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
at java.base/java.net.SocketOutputStream.write(SocketOutputStream.java:155)
at java.base/sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:233)
at java.base/sun.nio.cs.StreamEncoder.implFlushBuffer(StreamEncoder.java:312)
at java.base/sun.nio.cs.StreamEncoder.implFlush(StreamEncoder.java:316)
at java.base/sun.nio.cs.StreamEncoder.flush(StreamEncoder.java:153)
at java.base/java.io.OutputStreamWriter.flush(OutputStreamWriter.java:254)
at riemann.graphite.GraphiteTCPClient.send_lines(graphite.clj:41)
at riemann.graphite$graphite$fn__10762.invoke(graphite.clj:174)
at riemann.core$stream_BANG_$fn__10000.invoke(core.clj:20)
at riemann.core$stream_BANG_.invokeStatic(core.clj:19)
at riemann.core$stream_BANG_.invoke(core.clj:15)
at riemann.kafka$start_kafka_thread$fn__11398.invoke(kafka.clj:86)
at clojure.core$binding_conveyor_fn$fn__5476.invoke(core.clj:2022)
at clojure.lang.AFn.call(AFn.java:18)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:844)
DEBUG [2021-01-21 20:58:34,570] kafka-coordinator-heartbeat-thread | metrics - org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=riemann-consumer-2-252, groupId=metrics] Heartbeat thread has closed
DEBUG [2021-01-21 20:58:34,575] clojure-agent-send-off-pool-3 - org.apache.kafka.common.metrics.Metrics - Removed sensor with name connections-closed:
... (more similar "Removed sensor with name [sensor]" messages)
DEBUG [2021-01-21 20:58:34,585] clojure-agent-send-off-pool-3 - org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=riemann-consumer-2-252, groupId=metrics] Kafka consumer has been closed
WARN [2021-01-22 22:15:06,631] Thread-4 - riemann.core - instrumentation service caught
java.net.SocketException: Connection reset
at java.base/java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:115)
at java.base/java.net.SocketOutputStream.write(SocketOutputStream.java:155)
at java.base/sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:233)
at java.base/sun.nio.cs.StreamEncoder.implFlushBuffer(StreamEncoder.java:312)
at java.base/sun.nio.cs.StreamEncoder.implFlush(StreamEncoder.java:316)
at java.base/sun.nio.cs.StreamEncoder.flush(StreamEncoder.java:153)
at java.base/java.io.OutputStreamWriter.flush(OutputStreamWriter.java:254)
at riemann.graphite.GraphiteTCPClient.send_lines(graphite.clj:41)
at riemann.graphite$graphite$fn__10762.invoke(graphite.clj:174)
at riemann.core$stream_BANG_$fn__10000.invoke(core.clj:20)
at riemann.core$stream_BANG_.invokeStatic(core.clj:19)
at riemann.core$stream_BANG_.invoke(core.clj:15)
at riemann.core$instrumentation_service$measure__10009.invoke(core.clj:59)
at riemann.service.ThreadService$thread_service_runner__6715$fn__6716.invoke(service.clj:101)
at riemann.service.ThreadService$thread_service_runner__6715.invoke(service.clj:100)
at clojure.lang.AFn.run(AFn.java:22)
at java.base/java.lang.Thread.run(Thread.java:844)
```
At this point, the Kafka consumer has exited and no metrics are being pulled from Kafka. However, the rest of Riemann, including the Graphite output and the Riemann process itself, happily continues running. Because the ECS task still reports as healthy, I receive no alerts and AWS does not automatically replace the task.
Is there a mechanism to force Riemann to stop if one or all of the Kafka consumer threads exit? Or, alternatively, can the Kafka consumer thread be re-spawned?
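
To make the question concrete, here is a rough sketch of the behaviour I'm after. It is plain Python rather than Riemann configuration, and `supervise`, `consume`, and `on_give_up` are hypothetical names I made up for illustration, not anything Riemann provides. The idea is only: restart the consumer loop when a downstream exception kills it, and after too many failures run a hook that could stop the whole process so ECS notices.

```python
def supervise(consume, max_restarts, on_give_up):
    """Run consume() and restart it whenever it raises.

    Returns "finished" if consume() returns normally, or "gave-up"
    after calling on_give_up() once max_restarts restarts are used up.
    All names here are illustrative assumptions, not a Riemann API.
    """
    restarts = 0
    while True:
        try:
            consume()
            return "finished"
        except Exception as exc:
            # For example, a broken pipe raised by a downstream output.
            print(f"consumer died: {exc!r}")
        if restarts >= max_restarts:
            # In a container this hook could terminate the process so that
            # the orchestrator replaces the task.
            on_give_up()
            return "gave-up"
        restarts += 1
```

Either outcome would work for me: a consumer that comes back on its own, or a process that exits non-zero so the ECS task is marked as stopped and replaced.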