Kafka stream application not consuming new messages after ProducerFencedException


Felix D'Souza

Mar 27, 2018, 7:00:36 AM
to Confluent Platform
Hi,

We have a Kafka Streams application (1.0.1) that consumes from a topic with three partitions. We have three brokers (1.0), with the replication factor set to 3 and min.insync.replicas set to 2, and EOS (exactly-once semantics) is enabled.

The following exception trace contains the topology of the stream processor. At one point a ProducerFencedException is thrown, after which no new messages are processed, although the stream processor itself is not down.

We use the Transformer API with a punctuator that is scheduled on wall-clock time. When the ProducerFencedException is thrown, we usually see a punctuation happening just before the exception. Restarting the stream processor solves the issue, but it runs into the ProducerFencedException again after a while.
Appreciate any help!

org.apache.kafka.common.errors.ProducerFencedException: task [0_0] Abort sending since producer got fenced with a previous record (key 223EF710-B061-412D-86A0-03C4593B0311::raabe1522056733906_1241733441::940033365 value [B@3dec19db timestamp 1522056735537) to topic demo-service0.5-events-processed-changelog, error message: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
Wrapped by: org.apache.kafka.streams.errors.TaskMigratedException: StreamsTask taskId: 0_0
ProcessorTopology:
KSTREAM-SOURCE-0000000000:
topics: [rewards]
children: [KSTREAM-MAP-0000000001]
KSTREAM-MAP-0000000001:
children: [KSTREAM-MAP-0000000002]
KSTREAM-MAP-0000000002:
children: [KSTREAM-TRANSFORM-0000000003]
KSTREAM-TRANSFORM-0000000003:
states: [events-processed]
children: [KSTREAM-MAP-0000000004]
KSTREAM-MAP-0000000004:
children: [KSTREAM-FILTER-0000000005, KSTREAM-FILTER-0000000010]
KSTREAM-FILTER-0000000005:
children: [KSTREAM-MAP-0000000006]
KSTREAM-MAP-0000000006:
children: [KSTREAM-MAP-0000000007]
KSTREAM-MAP-0000000007:
children: [KSTREAM-MAP-0000000008]
KSTREAM-MAP-0000000008:
children: [KSTREAM-SINK-0000000009]
KSTREAM-SINK-0000000009:
topic: rewards-failed
KSTREAM-FILTER-0000000010:
children: [KSTREAM-TRANSFORM-0000000011]
KSTREAM-TRANSFORM-0000000011:
states: [rewards-state]
children: [KSTREAM-FLATMAPVALUES-0000000012]
KSTREAM-FLATMAPVALUES-0000000012:
children: [KSTREAM-FILTER-0000000013, KSTREAM-FILTER-0000000017]
KSTREAM-FILTER-0000000013:
children: [KSTREAM-MAP-0000000014]
KSTREAM-MAP-0000000014:
children: [KSTREAM-MAP-0000000015]
KSTREAM-MAP-0000000015:
children: [KSTREAM-SINK-0000000016]
KSTREAM-SINK-0000000016:
topic: rewards-failed
KSTREAM-FILTER-0000000017:
children: [KSTREAM-MAP-0000000018]
KSTREAM-MAP-0000000018:
children: [KSTREAM-MAP-0000...


Some more failure logs

March 26th 2018, 11:32:16.294 demo-service error [Producer clientId=demo-service0.5-c34b672e-3d39-4273-b8d9-9c2a6a7a36e6-StreamThread-1-0_0-producer, transactionalId=demo-service0.5-0_0] Aborting producer batches due to fatal error
March 26th 2018, 11:32:16.294 demo-service error task [0_0] Error sending record (key 223EF710-B061-412D-86A0-03C4593B0311::raabe1522056733906_1241733441::940033365 value [0, 0, 0, 0, 0, 0, 0, 0] timestamp 1522056735537) to topic demo-service0.5-events-processed-changelog due to {}; No more records will be sent and no more offsets will be recorded for this task.
March 26th 2018, 11:32:16.300 demo-service warn stream-thread [demo-service0.5-c34b672e-3d39-4273-b8d9-9c2a6a7a36e6-StreamThread-1] Detected a task that got migrated to another thread. This implies that this thread missed a rebalance and dropped out of the consumer group. Trying to rejoin the consumer group now.
March 26th 2018, 14:02:16.014 demo-service error [Producer clientId=demo-service0.5-2ade2323-e3c8-4ef6-a437-e2123a04f483-StreamThread-1-0_0-producer, transactionalId=demo-service0.5-0_0] Aborting producer batches due to fatal error
March 26th 2018, 14:02:16.014 demo-service error task [0_0] Error sending record (key 3DA28CFF-2699-44BA-B013-D0D7B226CFD0::doyle1522065733818::Darrell::Africa/Maputo::85 value [0, 0, 0, 1] timestamp 1522065735286) to topic demo-service0.5-rewards-state-changelog due to {}; No more records will be sent and no more offsets will be recorded for this task.
March 26th 2018, 14:02:16.023 demo-service warn stream-thread [demo-service0.5-2ade2323-e3c8-4ef6-a437-e2123a04f483-StreamThread-1] Detected a task that got migrated to another thread. This implies that this thread missed a rebalance and dropped out of the consumer group. Trying to rejoin the consumer group now.




We enabled TRACE logging while the stream was in the stuck state. In the following logs you can see that the consumer gets information about the latest offset and tries to consume the record at that offset, but recordsSizeInBytes=0.


March 27th 2018, 08:55:03.322 demo-service trace stream-thread [demo-service0.5-2ade2323-e3c8-4ef6-a437-e2123a04f483-StreamThread-1] Committing all active tasks [] and standby tasks [] since 200ms has elapsed (commit interval is 100ms)
March 27th 2018, 08:55:03.322 demo-service debug stream-thread [demo-service0.5-2ade2323-e3c8-4ef6-a437-e2123a04f483-StreamThread-1] Committed all active tasks [] and standby tasks [] in 0ms
March 27th 2018, 08:55:03.322 demo-service trace [Consumer clientId=demo-service0.5-2ade2323-e3c8-4ef6-a437-e2123a04f483-StreamThread-1-consumer, groupId=demo-service0.5] Skipping fetch for partition rewards-0 because there is an in-flight request to kafka02.nix.com:9092 (id: 1 rack: null)
March 27th 2018, 08:55:03.342 demo-service debug stream-thread [demo-service0.5-fb75ef7d-ed24-4726-828a-012918ca65d8-StreamThread-1] Committed all active tasks [] and standby tasks [] in 0ms
March 27th 2018, 08:55:03.342 demo-service trace stream-thread [demo-service0.5-fb75ef7d-ed24-4726-828a-012918ca65d8-StreamThread-1] Committing all active tasks [] and standby tasks [] since 200ms has elapsed (commit interval is 100ms)
March 27th 2018, 08:55:03.416 demo-service trace [Consumer clientId=demo-service0.5-2ade2323-e3c8-4ef6-a437-e2123a04f483-StreamThread-1-consumer, groupId=demo-service0.5] Found least loaded node kafka03.nix.com:9092 (id: 2 rack: null)
March 27th 2018, 08:55:03.416 demo-service debug [Consumer clientId=demo-service0.5-2ade2323-e3c8-4ef6-a437-e2123a04f483-StreamThread-1-consumer, groupId=demo-service0.5] Sending metadata request (type=MetadataRequest, topics=<ALL>) to node kafka03.nix.com:9092 (id: 2 rack: null)
March 27th 2018, 08:55:03.416 demo-service trace [Consumer clientId=demo-service0.5-2ade2323-e3c8-4ef6-a437-e2123a04f483-StreamThread-1-consumer, groupId=demo-service0.5] Sending METADATA {topics=null,allow_auto_topic_creation=true} with correlation id 240926 to node 2
March 27th 2018, 08:55:03.416 demo-service trace [Consumer clientId=demo-service0.5-2ade2323-e3c8-4ef6-a437-e2123a04f483-StreamThread-1-consumer, groupId=demo-service0.5] Skipping fetch for partition rewards-0 because there is an in-flight request to kafka02.nix.com:9092 (id: 1 rack: null)
March 27th 2018, 08:55:03.416 demo-service trace [Consumer clientId=demo-service0.5-2ade2323-e3c8-4ef6-a437-e2123a04f483-StreamThread-1-consumer, groupId=demo-service0.5] Skipping fetch for partition rewards-0 because there is an in-flight request to kafka02.nix.com:9092 (id: 1 rack: null)
March 27th 2018, 08:55:03.416 demo-service trace [Consumer clientId=demo-service0.5-2ade2323-e3c8-4ef6-a437-e2123a04f483-StreamThread-1-consumer, groupId=demo-service0.5] Removing node kafka01.nix.com:9092 (id: 0 rack: null) from least loaded node selection: is-blacked-out: false, in-flight-requests: 0
March 27th 2018, 08:55:03.419 demo-service trace [Consumer clientId=demo-service0.5-2ade2323-e3c8-4ef6-a437-e2123a04f483-StreamThread-1-consumer, groupId=demo-service0.5] Skipping fetch for partition rewards-0 because there is an in-flight request to kafka02.nix.com:9092 (id: 1 rack: null)
March 27th 2018, 08:55:03.419 demo-service trace [Consumer clientId=demo-service0.5-2ade2323-e3c8-4ef6-a437-e2123a04f483-StreamThread-1-consumer, groupId=demo-service0.5] Skipping fetch for partition rewards-0 because there is an in-flight request to kafka02.nix.com:9092 (id: 1 rack: null)

March 27th 2018, 08:55:03.422 demo-service trace [Consumer clientId=demo-service0.5-2ade2323-e3c8-4ef6-a437-e2123a04f483-StreamThread-1-consumer, groupId=demo-service0.5] Skipping fetch for partition rewards-0 because there is an in-flight request to kafka02.nix.com:9092 (id: 1 rack: null)
March 27th 2018, 08:55:03.497 demo-service trace stream-thread [demo-service0.5-6b4aeeb0-374b-4c83-af65-3f77fdb82076-StreamThread-1] Committing all active tasks [] and standby tasks [] since 201ms has elapsed (commit interval is 100ms)
March 27th 2018, 08:55:03.497 demo-service debug stream-thread [demo-service0.5-6b4aeeb0-374b-4c83-af65-3f77fdb82076-StreamThread-1] Committed all active tasks [] and standby tasks [] in 0ms
March 27th 2018, 08:55:03.516 demo-service trace [Consumer clientId=demo-service0.5-2ade2323-e3c8-4ef6-a437-e2123a04f483-StreamThread-1-consumer, groupId=demo-service0.5] Completed receive from node 1 for FETCH with correlation id 240925, received {throttle_time_ms=0,responses=[{topic=rewards,partition_responses=[{partition_header={partition=0,error_code=0,high_watermark=135789,last_stable_offset=135789,log_start_offset=111782,aborted_transactions=[]},record_set=[]}]}]}

March 27th 2018, 08:55:04.448 demo-service trace stream-thread [demo-service0.5-fb75ef7d-ed24-4726-828a-012918ca65d8-StreamThread-1] Committing all active tasks [] and standby tasks [] since 200ms has elapsed (commit interval is 100ms)
March 27th 2018, 08:55:04.449 demo-service debug stream-thread [demo-service0.5-fb75ef7d-ed24-4726-828a-012918ca65d8-StreamThread-1] Committed all active tasks [] and standby tasks [] in 0ms
March 27th 2018, 08:55:04.519 demo-service trace [Consumer clientId=demo-service0.5-2ade2323-e3c8-4ef6-a437-e2123a04f483-StreamThread-1-consumer, groupId=demo-service0.5] Completed receive from node 1 for FETCH with correlation id 240928, received {throttle_time_ms=0,responses=[{topic=rewards,partition_responses=[{partition_header={partition=0,error_code=0,high_watermark=135789,last_stable_offset=135789,log_start_offset=111782,aborted_transactions=[]},record_set=[]}]}]}
March 27th 2018, 08:55:04.519 demo-service trace [Consumer clientId=demo-service0.5-2ade2323-e3c8-4ef6-a437-e2123a04f483-StreamThread-1-consumer, groupId=demo-service0.5] Updating high watermark for partition rewards-0 to 135789
March 27th 2018, 08:55:04.519 demo-service trace [Consumer clientId=demo-service0.5-2ade2323-e3c8-4ef6-a437-e2123a04f483-StreamThread-1-consumer, groupId=demo-service0.5] Updating last stable offset for partition rewards-0 to 135789
March 27th 2018, 08:55:04.519 demo-service trace [Consumer clientId=demo-service0.5-2ade2323-e3c8-4ef6-a437-e2123a04f483-StreamThread-1-consumer, groupId=demo-service0.5] Returning fetched records at offset 135789 for assigned partition rewards-0 and update position to 135789

March 27th 2018, 08:55:04.519 demo-service debug [Consumer clientId=demo-service0.5-2ade2323-e3c8-4ef6-a437-e2123a04f483-StreamThread-1-consumer, groupId=demo-service0.5] Added READ_COMMITTED fetch request for partition rewards-0 at offset 135789 to node kafka02.nix.com:9092 (id: 1 rack: null)
March 27th 2018, 08:55:04.519 demo-service trace [Consumer clientId=demo-service0.5-2ade2323-e3c8-4ef6-a437-e2123a04f483-StreamThread-1-consumer, groupId=demo-service0.5] Skipping fetch for partition rewards-0 because there is an in-flight request to kafka02.nix.com:9092 (id: 1 rack: null)
March 27th 2018, 08:55:04.519 demo-service trace [Consumer clientId=demo-service0.5-2ade2323-e3c8-4ef6-a437-e2123a04f483-StreamThread-1-consumer, groupId=demo-service0.5] Sending FETCH {replica_id=-1,max_wait_time=500,min_bytes=1,max_bytes=52428800,isolation_level=1,topics=[{topic=rewards,partitions=[{partition=0,fetch_offset=135789,log_start_offset=-1,max_bytes=1048576}]}]} with correlation id 240931 to node 1
March 27th 2018, 08:55:04.519 demo-service debug [Consumer clientId=demo-service0.5-2ade2323-e3c8-4ef6-a437-e2123a04f483-StreamThread-1-consumer, groupId=demo-service0.5] Sending READ_COMMITTED fetch for partitions [rewards-0] to broker kafka02.nix.com:9092 (id: 1 rack: null)
March 27th 2018, 08:55:04.519 demo-service debug [Consumer clientId=demo-service0.5-2ade2323-e3c8-4ef6-a437-e2123a04f483-StreamThread-1-consumer, groupId=demo-service0.5] Fetch READ_COMMITTED at offset 135789 for partition rewards-0 returned fetch data (error=NONE, highWaterMark=135789, lastStableOffset = 135789, logStartOffset = 111782, abortedTransactions = [], recordsSizeInBytes=0)
March 27th 2018, 08:55:04.519 demo-service trace [Consumer clientId=demo-service0.5-2ade2323-e3c8-4ef6-a437-e2123a04f483-StreamThread-1-consumer, groupId=demo-service0.5] Preparing to read 0 bytes of data for partition rewards-0 with offset 135789
March 27th 2018, 08:55:04.520 demo-service trace [Consumer clientId=demo-service0.5-2ade2323-e3c8-4ef6-a437-e2123a04f483-StreamThread-1-consumer, groupId=demo-service0.5] Skipping fetch for partition rewards-0 because there is an in-flight request to kafka02.nix.com:9092 (id: 1 rack: null)
March 27th 2018, 08:55:04.526 demo-service trace [Consumer clientId=demo-service0.5-2ade2323-e3c8-4ef6-a437-e2123a04f483-StreamThread-1-consumer, groupId=demo-service0.5] Skipping fetch for partition rewards-0 because there is an in-flight request to kafka02.nix.com:9092 (id: 1 rack: null)
March 27th 2018, 08:55:04.526 demo-service debug stream-thread [demo-service0.5-2ade2323-e3c8-4ef6-a437-e2123a04f483-StreamThread-1] Committed all active tasks [] and standby tasks [] in 0ms
March 27th 2018, 08:55:04.526 demo-service trace stream-thread [demo-service0.5-2ade2323-e3c8-4ef6-a437-e2123a04f483-StreamThread-1] Committing all active tasks [] and standby tasks [] since 200ms has elapsed (commit interval is 100ms)
March 27th 2018, 08:55:04.549 demo-service debug stream-thread [demo-service0.5-fb75ef7d-ed24-4726-828a-012918ca65d8-StreamThread-1] Committed all active tasks [] and standby tasks [] in 0ms
March 27th 2018, 08:55:04.549 demo-service trace stream-thread [demo-service0.5-fb75ef7d-ed24-4726-828a-012918ca65d8-StreamThread-1] Committing all active tasks [] and standby tasks [] since 101ms has elapsed (commit interval is 100ms)
March 27th 2018, 08:55:04.600 demo-service trace stream-thread [demo-service0.5-6b4aeeb0-374b-4c83-af65-3f77fdb82076-StreamThread-1] Committing all active tasks [] and standby tasks [] since 200ms has elapsed (commit interval is 100ms)
March 27th 2018, 08:55:04.600 demo-service debug stream-thread [demo-service0.5-6b4aeeb0-374b-4c83-af65-3f77fdb82076-StreamThread-1] Committed all active tasks [] and standby tasks [] in 0ms

And this is the consumer group status:

TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                                                
rewards                        0          131298          135789          4491       demo-service0.5-2ade2323-e3c8-4ef6-a437-e2123a04f483-StreamThread-1-consumer-ff32b9ad-a52e-45fb-b13d-906f9aac0e85
-                              -          -               -               -          demo-service0.5-6b4aeeb0-374b-4c83-af65-3f77fdb82076-StreamThread-1-consumer-6aecac59-b240-4ecb-a07f-f93ab01ae3d2
-                              -          -               -               -          demo-service0.5-fb75ef7d-ed24-4726-828a-012918ca65d8-StreamThread-1-consumer-5c949c54-8f1c-43d0-a037-288721a54c5f



Guozhang Wang

Mar 27, 2018, 6:01:34 PM
to Confluent Platform
Hello Felix,

The issue you encountered seems related to two known bugs: https://issues.apache.org/jira/browse/KAFKA-6534 and https://issues.apache.org/jira/browse/KAFKA-6634

It is triggered when processing occasionally takes too long for some records, causing the heartbeat thread of the consumer to decide that the caller thread (i.e. the Kafka Streams thread) has died, and hence to stop. I'd suggest two actions for you:

1) If you can build from the source code, try applying the patches for KAFKA-6534 and KAFKA-6634 to your Kafka Streams library and see if that helps.

2) Try reducing "max.poll.records" and increasing "max.poll.interval.ms" in your config to reduce the likelihood that the heartbeat thread stops working.
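
As a minimal sketch of suggestion 2, the two consumer settings can be passed along with the rest of the Streams configuration as plain properties. The concrete values (100 records, 10 minutes), the choice of broker address, and the class name `PollTuning` are illustrative assumptions, not values recommended in this thread; the application id is taken from the group id in the logs above.

```java
import java.util.Properties;

public class PollTuning {
    // Builds the configuration with plain string keys, so this sketch
    // compiles without the Kafka libraries on the classpath.
    static Properties build() {
        Properties props = new Properties();
        props.setProperty("application.id", "demo-service0.5");
        props.setProperty("bootstrap.servers", "kafka01.nix.com:9092");
        props.setProperty("processing.guarantee", "exactly_once");
        // Fewer records per poll() means less work between two polls (assumed value).
        props.setProperty("max.poll.records", "100");
        // More time allowed between two polls before the consumer is
        // considered failed (assumed value: 10 minutes).
        props.setProperty("max.poll.interval.ms", "600000");
        return props;
    }

    public static void main(String[] args) {
        build().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The resulting `Properties` object would be handed to the `KafkaStreams` constructor in place of the existing configuration; suitable values depend on how long the slowest records take to process.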



Guozhang

Felix D'Souza

Apr 20, 2018, 6:30:55 AM
to Confluent Platform
Hi Guozhang,

We upgraded to Kafka Streams 1.1.0 and also adjusted the max poll settings as you advised. We still get the ProducerFencedException.

After some investigation we found that it happens in a specific pattern:

1. Punctuation is triggered and we delete some records from the state store (logging enabled). A transaction is registered on the broker: txnState=Ongoing
2. There are no new events to be consumed (a period of inactivity) and no new punctuation happens.
3. The broker aborts the transaction after about 60000 ms (the default transaction timeout for the stream): txnState=Ongoing -> txnState=PrepareAbort -> txnState=CompleteAbort
4. A new event comes in and hits the ProducerFencedException.

It looks like transaction handling for punctuation does not work properly if no new events arrive before the transaction started by the punctuation times out.
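
The timing in steps 1 to 4 can be sketched as simple arithmetic. This is a simplified model and not Kafka code: the class and method names are hypothetical, and it assumes the transaction opened by the punctuation stays open until the next record is sent, as described in the pattern above.

```java
public class TxnTimeoutTimeline {
    // Transaction timeout observed in step 3 (about 60000 ms).
    static final long TXN_TIMEOUT_MS = 60_000L;

    // Returns true if, in this model, the broker has already aborted the
    // open transaction by the time the next record is sent, so that the
    // send fails with a ProducerFencedException.
    static boolean fencedOnNextSend(long punctuationAtMs, long nextSendAtMs) {
        return nextSendAtMs - punctuationAtMs > TXN_TIMEOUT_MS;
    }

    public static void main(String[] args) {
        // Next event 30 s after the punctuation: transaction still open.
        System.out.println(fencedOnNextSend(0L, 30_000L)); // prints false
        // Next event 90 s after the punctuation: transaction already aborted.
        System.out.println(fencedOnNextSend(0L, 90_000L)); // prints true
    }
}
```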

Thanks for the help!