Producer retries configuration relationship with exception received in 0.8.2+ producer API


Ahmad Alkilani

Dec 15, 2015, 6:57:29 PM
to Confluent Platform
I am trying to understand the relationship between configuring the producer to retry failures and the exceptions received in the callback of the new producer API.
Specifically, I'm referring to the config defined by ProducerConfig.RETRIES_CONFIG.
We're running Kafka 0.8.2.1.

The code mimics the API documentation: a simple async producer with a callback.

producer.send(record, new Callback {
  override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = {
    if (e != null) {
      // the send failed; recordMetadata is not usable here
      e.printStackTrace()
    } else {
      println(s"SENT: $datum Part: ${recordMetadata.partition()} Offset: ${recordMetadata.offset()}")
    }
  }
})

The question is: what role, if any, do producer send retries play when an exception is received in the callback? Here's a sample exception we're having problems with, running 0.8.2.1 inside Docker:

org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition

This is what e.printStackTrace() prints, and the producer simply moves on, even though retries and the retry backoff are set high enough to allow for a resend.
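For reference, the retry-related producer settings are along these lines (the broker address and exact numbers here are illustrative, not our exact config):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}

val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092")    // illustrative address
props.put(ProducerConfig.ACKS_CONFIG, "all")
props.put(ProducerConfig.RETRIES_CONFIG, "10")                        // allow several resend attempts
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "1000")             // wait 1s between attempts
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[String, String](props)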

What are the scenarios where the producer would indeed retry sending the message?
If this is a scenario the producer's automatic retry logic does not cover, are there any recommendations for implementing retries as part of the callback?

Thanks!


FYI: the problems we're having with Kafka 0.8.2.1 in Docker go away with Kafka 0.9. We think the problem is related to this JIRA: https://issues.apache.org/jira/browse/KAFKA-1461
We have not had any luck getting Kafka 0.8.2.1 to behave well on broker restarts when run inside Docker. We have tried manually reassigning partitions to other brokers before shutdown, but that still results in loss of messages produced by the producer, and the retries setting doesn't seem to be doing much. We always get exceptions in the callback, and that particular message ends up "lost".


Ewen Cheslack-Postava

Dec 15, 2015, 9:22:14 PM
to Confluent Platform
Ahmad,

Retries are handled internally in the producer before the callback is invoked. The callback should only ever be invoked once the producer is completely done with the record, either because it was acknowledged or because it gave up (timed out and no further retries). So by the time you are printing the stack trace, it has already retried as many times as you requested and the last exception that ultimately caused it to give up was an UnknownTopicOrPartitionException.

The producer will retry sending as long as it has retries left and the exception is a RetriableException. UnknownTopicOrPartitionException is a RetriableException, so it should be retried as many times as the `retries` setting you use for the producer.
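If it helps to see which case you're ending up in, a rough sketch of a callback that distinguishes the two kinds of failures could look like this (reusing the producer and record from your snippet; purely illustrative):

import org.apache.kafka.clients.producer.{Callback, RecordMetadata}
import org.apache.kafka.common.errors.RetriableException

producer.send(record, new Callback {
  override def onCompletion(metadata: RecordMetadata, e: Exception): Unit = e match {
    case null =>
      // Acknowledged by the broker, possibly after internal retries
      println(s"SENT: partition=${metadata.partition()} offset=${metadata.offset()}")
    case _: RetriableException =>
      // The producer already exhausted its configured retries before calling back;
      // anything further (e.g. re-sending the record) is up to the application
      println(s"Gave up after retries: ${e.getMessage}")
    case _ =>
      // Non-retriable error; the producer never retries these
      println(s"Non-retriable failure: ${e.getMessage}")
  }
})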

I'm not sure why running in Docker would affect any of this behavior. Perhaps turn up logging enough to check whether metadata is being refreshed, which should happen if you restart brokers and leadership moves to other brokers?

-Ewen





Emil Koutanov

Oct 1, 2018, 9:24:20 AM
to Confluent Platform
Sorry to reply on a stale post, but I don't believe this is entirely correct. While the documentation suggests that this ought to be the case, the implementation differs. In particular, see KIP-91 for a stab at fixing this (which isn't due until 2.1 at the time of writing). In summary, request.timeout.ms has an overriding effect on the message retry logic, such that records may be purged from the accumulator when the timeout elapses, despite still having one or more retry attempts remaining.

I can also reproduce Ahmad's problem (broker version 2.0.0). With retries=MAX_INT and request.timeout.ms=10000, the callback in send() is invoked after 10 seconds even though, in theory, there should still be plenty of retries available (retry.backoff.ms=100). Clearly, there is a problem with the retry logic. Looking at ProducerBatch.maybeExpire(), the code looks suspicious; records can be expired just after request.timeout.ms in spite of the retry state returned by inRetry(). (Exactly as alluded to by the author of KIP-91.)
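Roughly the producer settings I used to reproduce it (broker address illustrative, everything else default):

import java.util.Properties
import org.apache.kafka.clients.producer.ProducerConfig

val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")    // illustrative
props.put(ProducerConfig.RETRIES_CONFIG, Int.MaxValue.toString)         // effectively unlimited retries
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000")            // yet the batch is expired after ~10s
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100")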
