How to handle connection errors using kafka-python


Kunal Gupta

Dec 2, 2015, 2:16:37 AM12/2/15
to kafka-clients
Hello all,

Is there any way to get notified, when I am sending messages to the Kafka server using kafka-python, of the following:

 1) Whether ZooKeeper is down
 2) Whether the Kafka server is down
 3) Whether a Kafka broker is down
 4) A partition is not available
 5) A topic is not available

I want my code to tell me if any of these is down, so that I get a message related to it rather than a traceback returned by Python.

Please help me with this.
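A rough sketch of what that could look like -- not kafka-python API, just a pattern: wrap the send call and translate the client exception into a plain operator message. The exception class names below are assumptions based on kafka-python's kafka.common module of this era; check which classes your installed version actually raises.

```python
# Hedged sketch: map a low-level client exception to a friendly message
# instead of surfacing a raw traceback. The class names in FRIENDLY are
# assumed from kafka-python's kafka.common module; verify against your
# installed version.

FRIENDLY = {
    "KafkaUnavailableError": "no Kafka brokers reachable (server/broker down?)",
    "LeaderNotAvailableError": "partition leader not available (broker down or election in progress)",
    "UnknownTopicOrPartitionError": "topic or partition does not exist",
    "FailedPayloadsError": "some messages could not be delivered",
}

def describe_failure(exc):
    """Map an exception instance to a human-readable message by class name."""
    return FRIENDLY.get(type(exc).__name__, "unexpected error: %r" % (exc,))

# Usage (assuming a producer object named `producer` and a topic name):
# try:
#     producer.send_messages(topic, b"payload")
# except Exception as exc:
#     print(describe_failure(exc))
```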

 

Brad Ruderman

Dec 21, 2015, 2:51:25 PM12/21/15
to kafka-clients
I am also wondering the same thing. More generically: what are the best practices for long-running producers and consumers handling exceptions? My volume is not that high (10-20 records per second), so I am using a SimpleProducer with blocking.

Dana Powers

Dec 21, 2015, 8:17:45 PM12/21/15
to Brad Ruderman, kafka-clients
Hi Brad + Kunal --

I haven't ever found a great set of documentation for all possible server exceptions and which ones specifically are retriable. Sometimes the Java code has inline documentation, but sometimes not.

As for kafka-python, the async producer checks for a set of errors that can be retried, based on a list of known retriable error codes. Similarly, KafkaConsumer retries FetchRequests on a handful of retriable errors.


My general advice is to use one of the high-level classes (KafkaConsumer or SimpleProducer in async mode) rather than attempting to manage low-level exceptions. Most of the error handling should be addressed internally -- if not, it's a bug; file a GitHub issue and a fix is usually in the works fairly quickly.

With respect to general monitoring, kafka-python isn't intended as a monitoring tool. I think best practice there is to collect client logs centrally and alert from detected error messages.
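One lightweight way to follow that advice, sketched with only the standard library: attach a handler to the `kafka` logger namespace (kafka-python modules log via `logging.getLogger(__name__)`, so they should land under `kafka`; verify against your version) and forward warnings and errors to whatever collector you use.

```python
import logging

# Sketch: surface kafka-python's internal error logs so they can be
# shipped to a central collector. StreamHandler is a stand-in; swap in
# SysLogHandler or your log shipper's handler in production.
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter(
    "%(asctime)s %(name)s %(levelname)s %(message)s"))

kafka_log = logging.getLogger("kafka")   # assumed namespace for kafka-python
kafka_log.addHandler(handler)
kafka_log.setLevel(logging.WARNING)      # alert on WARNING and above
```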

-Dana


--
You received this message because you are subscribed to the Google Groups "kafka-clients" group.
To unsubscribe from this group and stop receiving emails from it, send an email to kafka-client...@googlegroups.com.
To post to this group, send email to kafka-...@googlegroups.com.
Visit this group at https://groups.google.com/group/kafka-clients.
To view this discussion on the web visit https://groups.google.com/d/msgid/kafka-clients/6ab44e9e-1875-4daf-ab86-fdca2b5000a9%40googlegroups.com.

For more options, visit https://groups.google.com/d/optout.

Brad Ruderman

Dec 23, 2015, 12:19:16 PM12/23/15
to Dana Powers, kafka-clients
Dana,

Thanks for this writeup, very helpful. I did migrate my implementation to the Async class.

self.client = KafkaClient(kwargs['kafka_uri'])
self.topic = kwargs['kafka_topic'].encode('utf8')
self.producer = SimpleProducer(self.client, async=True)

I noticed from a few other documents that during startup it's best to call client.ensure_topic_exists(topic) to create the topic if it doesn't already exist. However, do you recommend we explicitly call close() on the client or .stop() on the producer to make sure all messages are sent and received by Kafka before shutting down?

In addition I know you mentioned the async producer handles that list of errors that can be retried, but are there any best-practices around handling other types of exceptions that are perhaps recoverable but outside that list?

Thanks,
Brad

Dana Powers

Dec 23, 2015, 2:37:29 PM12/23/15
to Brad Ruderman, kafka-clients
<inline>

On Wed, Dec 23, 2015 at 9:18 AM, Brad Ruderman <br...@upcounsel.com> wrote:
> Dana,
>
> Thanks for this writeup, very helpful. I did migrate my implementation to the Async class.
>
> self.client = KafkaClient(kwargs['kafka_uri'])
> self.topic = kwargs['kafka_topic'].encode('utf8')
> self.producer = SimpleProducer(self.client, async=True)
>
> I noticed from a few other documents that during startup it's best to call client.ensure_topic_exists(topic) to create the topic if it doesn't already exist.

The kafka-python async producer will handle the backoff/retry required during initial topic creation (LeaderNotAvailable), so client.ensure_topic_exists(topic) shouldn't be necessary in that mode.

 
> However, do you recommend we explicitly call close() on the client or .stop() on the producer to make sure all messages are sent and received by Kafka before shutting down?

producer.stop() should block until all messages are delivered.
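So an orderly shutdown for the producer/client pair quoted above could be sketched as below. The ordering is the point: stop the producer first so the async queue drains (per the note above that stop() blocks until delivery), then close the client.

```python
# Hedged sketch of an orderly shutdown for the 2015-era
# SimpleProducer/KafkaClient API quoted above.
def shutdown(producer, client):
    producer.stop()    # flush queued messages; blocks until delivered
    client.close()     # then tear down the broker connections
```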

 

> In addition, I know you mentioned the async producer handles that list of errors that can be retried, but are there any best practices around handling other types of exceptions that are perhaps recoverable but outside that list?

I'm not aware of other errors that are recoverable without manual intervention -- typically they require configuration or application changes. The full list of error codes is in the Kafka protocol documentation.

That said, in production I am aware of some folks writing a custom async wrapper around the sync SimpleProducer client that logs unhandled exceptions and dumps undelivered messages to syslog or another local logging facility.
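That pattern could be sketched like this -- a hypothetical wrapper, not part of kafka-python: a background thread drains a queue, calls the synchronous send function, and logs anything that still fails after the client's own retries instead of losing it silently.

```python
import logging
import queue
import threading

class AsyncWrapper:
    """Hypothetical async wrapper around a synchronous send function
    (e.g. a bound SimpleProducer.send_messages). Undelivered messages
    are logged locally rather than dropped. Sketch only."""

    def __init__(self, send_fn, log=None):
        self._send = send_fn
        self._log = log or logging.getLogger("undelivered")
        self._q = queue.Queue()
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def send(self, msg):
        """Enqueue a message; returns immediately."""
        self._q.put(msg)

    def _run(self):
        while True:
            msg = self._q.get()
            if msg is None:            # shutdown sentinel
                break
            try:
                self._send(msg)
            except Exception:
                # message could not be delivered after the client's own
                # retries; dump it to the local log for later replay
                self._log.error("undelivered message: %r", msg)

    def stop(self):
        """Drain the queue, then stop the worker thread."""
        self._q.put(None)
        self._worker.join()
```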

Hope this helps,

-Dana