KafkaProducer Extensive integration issue


jordane....@gmail.com

Feb 10, 2018, 1:51:11 AM
to Extensive Testing Users
Hi,

I'm trying to use the kafka-python library (https://github.com/dpkp/kafka-python) within a custom adapter.

As a first step, I'm focusing on the message-producing use case.

I'm hitting an issue with the most advanced implementation, KafkaProducer, which sends messages to the broker asynchronously using a callback and a background sender thread. The call to the send method only ever reaches its timeout, even though the producer has correctly retrieved the broker and topic metadata.

The Future object that should receive the produce response asynchronously is instantiated but never gets any data. At the network level, no produce request is sent at all.

The same producer code has been tested successfully outside of Extensive, on the same server.
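To be clear about the pattern I mean: send() returns a future that a background I/O thread resolves once the broker acknowledges the batch. This stdlib-only sketch is not kafka-python code, just an illustration of that callback/background-thread design:

```python
import queue
import threading
from concurrent.futures import Future

class MiniAsyncProducer:
    """Mimics KafkaProducer's send()-returns-a-Future design (illustration only)."""

    _CLOSE = object()  # shutdown sentinel

    def __init__(self):
        self._queue = queue.Queue()
        self._io_thread = threading.Thread(target=self._run, daemon=True)
        self._io_thread.start()

    def _run(self):
        # Background I/O thread: dequeue records and resolve their futures,
        # as the real sender thread does after a broker acknowledgement.
        while True:
            record, future = self._queue.get()
            if record is self._CLOSE:
                future.set_result(None)
                break
            future.set_result(("offset-for", record))  # stand-in for the broker ack

    def send(self, record):
        future = Future()
        self._queue.put((record, future))
        return future

    def close(self, timeout=5):
        self._queue.put((self._CLOSE, Future()))
        self._io_thread.join(timeout)

producer = MiniAsyncProducer()
f = producer.send(b"barddddddddddddddddddd")
print(f.result(timeout=5))  # in my adapter, the real kafka-python future never resolves
producer.close()
```

With kafka-python, it is exactly this future (the one returned by send) that stays pending in the adapter context.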

The Kafka logger provides the following trace:

Starting the Kafka producer
Added sensor with name connections-closed
Added sensor with name connections-created
Added sensor with name select-time
Added sensor with name io-time
Bootstrapping cluster metadata from [('127.0.0.1', 9092, 2)]
Attempting to bootstrap via node at 127.0.0.1:9092
Added sensor with name bytes-sent-received
Added sensor with name bytes-sent
Added sensor with name bytes-received
Added sensor with name request-latency
Added sensor with name node-bootstrap.bytes-sent
Added sensor with name node-bootstrap.bytes-received
Added sensor with name node-bootstrap.latency
<BrokerConnection node_id=bootstrap host=127.0.0.1/127.0.0.1 port=9092>: creating new socket
<BrokerConnection node_id=bootstrap host=127.0.0.1/127.0.0.1 port=9092>: setting socket option (6, 1, 1)
<BrokerConnection node_id=bootstrap host=127.0.0.1/127.0.0.1 port=9092>: connecting to 127.0.0.1:9092
<BrokerConnection node_id=bootstrap host=127.0.0.1/127.0.0.1 port=9092>: established TCP connection
<BrokerConnection node_id=bootstrap host=127.0.0.1/127.0.0.1 port=9092>: Connection complete.
Node bootstrap connected
<BrokerConnection node_id=bootstrap host=127.0.0.1/127.0.0.1 port=9092> Request 1: MetadataRequest_v0(topics=[])
<BrokerConnection node_id=bootstrap host=127.0.0.1/127.0.0.1 port=9092> Response 1: MetadataResponse_v0(brokers=[(node_id=0, host=u'127.0.0.1', port=9092)], topics=[(error_code=0, topic=u'OSS.LRR.v1', partitions=[(error_code=0, partition=0, leader=0, replicas=[0], isr=[0])]), (error_code=0, topic=u'test', partitions=[(error_code=0, partition=0, leader=0, replicas=[0], isr=[0])]), (error_code=0, topic=u'test1', partitions=[(error_code=0, partition=0, leader=0, replicas=[0], isr=[0])]), (error_code=0, topic=u'test3', partitions=[(error_code=0, partition=0, leader=0, replicas=[0], isr=[0])]), (error_code=0, topic=u'__consumer_offsets', partitions=[(error_code=0, partition=23, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=41, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=32, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=8, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=17, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=44, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=35, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=26, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=11, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=29, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=38, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=47, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=20, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=2, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=5, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=14, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=46, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=49, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=40, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=4, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=13, leader=0, replicas=[0], isr=[0]), (error_code=0, 
partition=22, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=31, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=16, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=7, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=43, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=25, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=34, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=10, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=37, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=1, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=19, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=28, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=45, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=36, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=27, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=9, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=18, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=21, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=48, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=12, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=3, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=30, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=39, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=15, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=42, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=24, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=33, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=6, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=0, leader=0, replicas=[0], isr=[0])]), (error_code=0, topic=u'test4', partitions=[(error_code=0, partition=0, leader=0, replicas=[0], isr=[0])]), (error_code=0, topic=u'test2', partitions=[(error_code=0, partition=0, leader=0, 
replicas=[0], isr=[0])])])
Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 7, groups: 0)
Bootstrap succeeded: found 1 brokers and 7 topics.
<BrokerConnection node_id=bootstrap host=127.0.0.1/127.0.0.1 port=9092>: Closing connection.
<BrokerConnection node_id=bootstrap host=127.0.0.1/127.0.0.1 port=9092>: reconnect backoff 0.0450981064183 after 1 failures
Initiating connection to node 0 at 127.0.0.1:9092
Added sensor with name node-0.bytes-sent
Added sensor with name node-0.bytes-received
Added sensor with name node-0.latency
<BrokerConnection node_id=0 host=127.0.0.1/127.0.0.1 port=9092>: creating new socket
<BrokerConnection node_id=0 host=127.0.0.1/127.0.0.1 port=9092>: setting socket option (6, 1, 1)
<BrokerConnection node_id=0 host=127.0.0.1/127.0.0.1 port=9092>: connecting to 127.0.0.1:9092
Node 0 connected
Broker version identifed as 0.10.2
Set configuration api_version=(0, 10, 2) to skip auto check_version requests on startup
Added sensor with name bufferpool-wait-time
Added sensor with name batch-size
Added sensor with name compression-rate
Added sensor with name queue-time
Added sensor with name produce-throttle-time
Added sensor with name records-per-request
Added sensor with name bytes
Added sensor with name record-retries
Added sensor with name errors
Added sensor with name record-size-max
Starting Kafka producer I/O thread.
Kafka producer started
Sending (key='test' value='barddddddddddddddddddd') to TopicPartition(topic='OSS.LRR.v1', partition=0)
Allocating a new 16384 byte message buffer for TopicPartition(topic='OSS.LRR.v1', partition=0)
Waking up the sender since TopicPartition(topic='OSS.LRR.v1', partition=0) is either full or getting a new batch
Closing the Kafka producer with 0 secs timeout.
Proceeding to force close the producer since pending requests could not be completed within timeout 0.
Beginning shutdown of Kafka producer I/O thread, sending remaining records.
Produced messages to topic-partition TopicPartition(topic='OSS.LRR.v1', partition=0) with base offset None and error IllegalStateError: Producer is closed forcefully..
<BrokerConnection node_id=0 host=127.0.0.1/127.0.0.1 port=9092>: Closing connection.
<BrokerConnection node_id=0 host=127.0.0.1/127.0.0.1 port=9092>: reconnect backoff 0.0405948041074 after 1 failures
Shutdown of Kafka producer I/O thread has completed.
The Kafka producer has closed.




I think the background sender thread has some trouble working in the Extensive Testing adapter context.
Indeed, I am able to use the deprecated SimpleClient (http://kafka-python.readthedocs.io/en/master/simple.html#simpleclient-deprecated), which doesn't use any advanced message processing.
I've also tried to integrate it via a Library module, without more success.

I'd just like to know whether this type of issue has already been encountered with other protocols and client implementations, and whether there are any integration tips for adapters?

Thanks 

Jordane

Denis MACHARD

Feb 11, 2018, 4:24:21 PM
to jordane....@gmail.com, Extensive Testing Users
Hi Jordane :)

Be aware that an adapter (or library) is itself a Python thread; perhaps that is the cause.
Can you try to integrate Kafka directly in an Extensive testcase, without an adapter, just to see whether it is related to the adapter thread or not?


Denis

--
If you like this project and you want to sponsor me then I am accepting donations :) via PayPal. The money received will be used to cover web site costs, domain name reservation and more;).
---
You received this message because you are subscribed to the Google Groups "Extensive Testing Users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to extensive-testing-users+unsub...@googlegroups.com.
To post to this group, send email to extensive-testing-users@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/extensive-testing-users/84eeff5a-5007-4cdc-b84e-f84516459f9d%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

jordane fillatre

Feb 12, 2018, 4:39:48 PM
to Extensive Testing Users
Hello!

I get the same behavior by calling the kafka-python lib directly in a testcase. Curiously, I currently have no further idea about the cause of the queuing/dequeuing issue, which is the main advanced feature compared to the simple consumer/producer.

I will try embedding Kafka as an agent instead; maybe with more success...

Jordane

Denis MACHARD

Feb 13, 2018, 4:49:03 AM
to Extensive Testing Users
Hi,

Can you send me an example of your integration?
I will try to reproduce on my side.

Denis