Kafka: msgpack.exceptions.ExtraData: unpack(b) received extra data.


Michael Weber

Apr 5, 2016, 6:32:12 AM
to frontera
Hi all,

from one problem to another... :(

Now I get the exception: msgpack.exceptions.ExtraData: unpack(b) received extra data.

The spider now writes messages to frontier-done.

Using 
Kafka version : 0.9.0.1
msgpack-python: 0.4.7
kafka-python: 1.0.2
python-snappy: 0.5


[db-worker] Pushed new batch of 0 items
[db-worker] Consumed 0 items.
Unhandled error in Deferred:
[db-worker] [Failure instance: Traceback: <class 'msgpack.exceptions.ExtraData'>: unpack(b) received extra data.
/usr/local/lib/python2.7/dist-packages/twisted/internet/base.py:1203:mainLoop
/usr/local/lib/python2.7/dist-packages/twisted/internet/base.py:825:runUntilCurrent
/usr/local/lib/python2.7/dist-packages/twisted/internet/defer.py:393:callback
/usr/local/lib/python2.7/dist-packages/twisted/internet/defer.py:501:_startRunCallbacks
--- <exception caught here> ---
/usr/local/lib/python2.7/dist-packages/twisted/internet/defer.py:588:_runCallbacks
/usr/lib/python2.7/dist-packages/frontera/utils/async.py:38:__call__
/usr/lib/python2.7/dist-packages/frontera/worker/db.py:112:consume_incoming
/usr/lib/python2.7/dist-packages/frontera/contrib/backends/remote/codecs/msgpack.py:79:decode
msgpack/_unpacker.pyx:143:msgpack._unpacker.unpackb (msgpack/_unpacker.cpp:143)
]


Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/twisted/internet/base.py", line 1203, in mainLoop
    self.runUntilCurrent()
  File "/usr/local/lib/python2.7/dist-packages/twisted/internet/base.py", line 825, in runUntilCurrent
    call.func(*call.args, **call.kw)
  File "/usr/local/lib/python2.7/dist-packages/twisted/internet/defer.py", line 393, in callback
    self._startRunCallbacks(result)
  File "/usr/local/lib/python2.7/dist-packages/twisted/internet/defer.py", line 501, in _startRunCallbacks
    self._runCallbacks()
--- <exception caught here> ---
  File "/usr/local/lib/python2.7/dist-packages/twisted/internet/defer.py", line 588, in _runCallbacks
    current.result = callback(current.result, *args, **kw)
  File "/usr/lib/python2.7/dist-packages/frontera/utils/async.py", line 38, in __call__
    return self._func(*self._a, **self._kw)
  File "/usr/lib/python2.7/dist-packages/frontera/worker/db.py", line 112, in consume_incoming
    msg = self._decoder.decode(m)
  File "/usr/lib/python2.7/dist-packages/frontera/contrib/backends/remote/codecs/msgpack.py", line 79, in decode
    obj = unpackb(buffer)
  File "msgpack/_unpacker.pyx", line 143, in msgpack._unpacker.unpackb (msgpack/_unpacker.cpp:143)
    
msgpack.exceptions.ExtraData: unpack(b) received extra data.

Any hints?

Thanks!

Alexander Sibiryakov

Apr 5, 2016, 7:12:39 AM
to Michael Weber, frontera

--
You received this message because you are subscribed to the Google Groups "frontera" group.
To unsubscribe from this group and stop receiving emails from it, send an email to frontera+u...@scrapinghub.com.
To post to this group, send email to fron...@scrapinghub.com.
To view this discussion on the web, visit https://groups.google.com/a/scrapinghub.com/d/msgid/frontera/c218be7a-7411-4b14-a3c2-0aded80c43ef%40scrapinghub.com.

Michael Weber

Apr 5, 2016, 7:33:44 AM
to frontera, seomi...@googlemail.com
yes (after starting the zeromq broker)

Yuming Ye

Apr 7, 2016, 11:49:27 PM
to frontera

Hi,
I ran into the same problem you described.
I am using the Kafka message bus with 1 DB worker.
I have tried worker settings with both BACKEND = 'frontera.contrib.backends.sqlalchemy.SQLAlchemyBackend' and BACKEND = 'frontera.contrib.backends.sqlalchemy.Distributed'.
With both settings I got the same error, and the DB worker could not receive anything.

Is there any progress?
Thanks!

Alexander Sibiryakov

Apr 8, 2016, 6:33:08 AM
to Yuming Ye, Michael Weber, frontera
+Michael,
+frontera

Michael,
you sent two different stack traces. The first one breaks during spider log processing, and the second one during new batch generation, meaning that corrupted data possibly propagates to the database. So I would try to replicate the errors during spider log processing. You can also add a try/except around the decode call, dump the corrupted message using binascii.hexlify, and post it here.
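
A minimal sketch of that try/except idea, assuming the decoder object exposes a decode() method as in the stack trace (self._decoder.decode(m)). The helper name decode_or_dump and the log parameter are hypothetical and not part of frontera:

```python
import binascii
import sys


def decode_or_dump(decoder, raw, log=None):
    """Decode a raw message; if decoding fails, hex-dump it and re-raise."""
    if log is None:
        # Default to stderr so the dump shows up next to the worker log.
        log = lambda line: sys.stderr.write(line + "\n")
    try:
        return decoder.decode(raw)
    except Exception:
        # hexlify keeps the dump printable no matter what bytes it contains.
        log("undecodable message: " + binascii.hexlify(raw).decode("ascii"))
        raise
```

In the worker this would stand in for the plain decode call, e.g. msg = decode_or_dump(self._decoder, m), so the original exception still surfaces after the dump.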

A.

On Fri, Apr 8, 2016 at 12:29 PM, Alexander Sibiryakov <sibir...@scrapinghub.com> wrote:
Hi,
Could you please post your stack trace and the message content from the DB worker code, encoded using binascii.hexlify?

A.


Yuming Ye

Apr 11, 2016, 11:40:19 PM
to frontera
Hi,
The exception: msgpack.exceptions.ExtraData: unpack(b) received extra data

Using:
msgpack-python: 0.4.7
kafka-python: 1.0.2

With 1 DB worker, 1 strategy worker, and 2 spiders on the same server.

Stack trace for the DB worker: http://pastebin.com/nPurbMqE
Stack trace for the strategy worker: http://pastebin.com/HvSUzhda
Any hints? Is there any other info you need?

Thanks!

Yuming Ye

Apr 12, 2016, 4:02:42 AM
to frontera, yym....@gmail.com, mich...@geminio.de

Hi,
This message may help. It is from the Kafka topic frontier-done, hex-encoded with binascii.hexlify:

>>> message = binascii.hexlify('¢as·http://www.terralis.es/¯scrapy_callback&domain¦netloc¯www.terralis.es¤name¯www.terralis.es«fingerprintبcad5e275056206045ac6bdcaaa023ddb09b9c9£sld ¦scheme¤http©subdomaintldscrapy_meta²origin_is_frontierëfingerprintb784b0f02aefe3f3dbc68571ca6ea32f51118b14®scrapy_errback')

>>> message 
'c2a26173c2b7687474703a2f2f7777772e74657272616c69732e65732fc2af7363726170795f63616c6c6261636b26646f6d61696ec2a66e65746c6f63c2af7777772e74657272616c69732e6573c2a46e616d65c2af7777772e74657272616c69732e6573c2ab66696e6765727072696e74d8a863616435653237353035363230363034356163366264636161613032336464623039623935626339c2a3736c6420c2a6736368656d65c2a468747470c2a9737562646f6d61696e746c647363726170795f6d657461c2b26f726967696e5f69735f66726f6e74696572c3ab66696e6765727072696e7462373834623066303261656665336633646263363835373163613665613332663531313138623134c2ae7363726170795f6572726261636b'

Thanks!

Ye





Animesh Pratap Singh Sikarwar

Apr 12, 2016, 9:48:03 AM
to frontera
I was also facing the same issue with Kafka as the message queue.

https://github.com/scrapinghub/frontera/blob/master/frontera/contrib/messagebus/kafkabus.py#L103
https://github.com/scrapinghub/frontera/blob/master/frontera/contrib/backends/remote/kafka.py#L70

I could work around this by changing CODEC_SNAPPY to CODEC_NONE.


I am still figuring out how to run this.

Alexander Sibiryakov

Apr 12, 2016, 11:04:59 AM
to Yuming Ye, frontera
Hi,

I need to replicate it, so please dump the message at the point where the decoder throws the exception:
  File "/home/hadoop/.local/lib/python2.7/site-packages/frontera/worker/strategy.py", line 51, in work
    msg = self._decoder.decode(m)

Please add something like this before that line:
print binascii.hexlify(m)

Then post the dumped hex string here.

A.



Animesh Pratap Singh Sikarwar

Apr 12, 2016, 5:12:46 PM
to frontera

[backend] to fetch 0 from 0
[strategy-worker] Consumed 0 items.
82534e4150505900000000010000000100000121f00200000901200500000164f7bfed3b050ff07228363739653737323838316135303939353231656266363938363161656663356163643662653063340000012e92a261739194b9687474703a2f2f7777772e7375726c617461626c652e636f6d808086af7363726170795f63616c6c6261636bc0a6646f6d61696e87a66e65746c6f63b277773e360010a46e616d654a1800f058ab66696e6765727072696e74da002839346334346366373935646663333834616635373639646434373063363563663864656461366635a3736c64a0a6736368656d65a468747470a9737562646f6d61696ea0a3746c64a0ab0da3606d65746180b26f726967696e5f69735f66726f6e74696572c33a79009e22013cae7363726170795f6572726261636bc0
Unhandled error in Deferred:


Traceback (most recent call last):
  File "/Users/animesh/.virtualenvs/crawler/lib/python2.7/site-packages/twisted/internet/base.py", line 1194, in run
    self.mainLoop()
  File "/Users/animesh/.virtualenvs/crawler/lib/python2.7/site-packages/twisted/internet/base.py", line 1203, in mainLoop
    self.runUntilCurrent()
  File "/Users/animesh/.virtualenvs/crawler/lib/python2.7/site-packages/twisted/internet/base.py", line 825, in runUntilCurrent
    call.func(*call.args, **call.kw)
  File "/Users/animesh/.virtualenvs/crawler/lib/python2.7/site-packages/twisted/internet/task.py", line 239, in __call__
    d = defer.maybeDeferred(self.f, *self.a, **self.kw)
--- <exception caught here> ---
  File "/Users/animesh/.virtualenvs/crawler/lib/python2.7/site-packages/twisted/internet/defer.py", line 150, in maybeDeferred
    result = f(*args, **kw)
  File "/Users/animesh/.virtualenvs/crawler/lib/python2.7/site-packages/frontera/worker/strategy.py", line 52, in work
    msg = self._decoder.decode(m)
  File "/Users/animesh/.virtualenvs/crawler/lib/python2.7/site-packages/frontera/contrib/backends/remote/codecs/msgpack.py", line 79, in decode
    obj = unpackb(buffer)
  File "msgpack/_unpacker.pyx", line 143, in msgpack._unpacker.unpackb (msgpack/_unpacker.cpp:143)
    
msgpack.exceptions.ExtraData: unpack(b) received extra data.



Spider/Scrapy logs:

internal queue empty after fetch - returning None
got None from _get_messages
got 0 messages: []
<BrokerConnection host=192.168.0.6 port=9092> Request 5: ProduceRequest(required_acks=1, timeout=1000, topics=[(topic='frontier-done1', partitions=[(partition=0, messages=[(offset=0, message_size=0, message=(crc=2049265919, magic=0, attributes=2, key='679e772881a5099521ebf69861aefc5acd6be0c4', value='\x82SNAPPY\x00\x00\x00\x00\x01\x00\x00\x00\x01\x00\x00\x01\x1e\xf0\x02\x00\x00\x15\x01\x14\x01d\xf7\xbf\xed;\x05\x0f\xf0r(679e772881a5099521ebf69861aefc5acd6be0c4\x00\x00\x01.\x92\xa2as\x91\x94\xb9http://www.surlatable.com\x80\x80\x86\xafscrapy_callback\xc0\xa6domain\x87\xa6netloc\xb2ww>6\x00\x10\xa4nameJ\x18\x00\xf0X\xabfingerprint\xda\x00(94c44cf795dfc384af5769dd470c65cf8deda6f5\xa3sld\xa0\xa6scheme\xa4http\xa9subdomain\xa0\xa3tld\xa0\xab\r\xa3`meta\x80\xb2origin_is_frontier\xc3:y\x00\x9e"\x01<\xaescrapy_errback\xc0'))])])])
<BrokerConnection host=192.168.0.6 port=9092> Response 5: ProduceResponse(topics=[(topic=u'frontier-done1', partitions=[(partition=0, error_code=0, offset=5)])])
getting 256 messages
calling _get_message block=True timeout=4.9997048378
internal queue empty, fetching more messages



Thanks
Animesh

Alexander Sibiryakov

Apr 13, 2016, 6:53:32 AM
to Animesh Pratap Singh Sikarwar, 叶宇铭, Michael Weber, frontera
The problem seems to be connected with the kafka-python upgrade to the 1.0.x series. They completely reworked the Kafka client internals.
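
For what it is worth, the dumps earlier in the thread start with the bytes 82 53 4e 41 50 50 59 00, i.e. '\x82SNAPPY\x00', which suggests the payload reaching the msgpack decoder is still Snappy-framed. That would also fit the exact error: msgpack reads 0x82 as the header of a two-entry map, takes the next four bytes as small integers, and reports everything after them as extra data. A minimal sketch for checking a dumped hex string follows; the helper name is hypothetical, and the magic prefix is taken from the dumps in this thread rather than from any frontera API:

```python
import binascii

# Prefix seen at the start of the dumped messages in this thread.
SNAPPY_MAGIC = b"\x82SNAPPY\x00"


def looks_snappy_framed(hex_dump):
    """Return True if a hexlify()-style dump starts with the Snappy magic."""
    raw = binascii.unhexlify(hex_dump)
    return raw.startswith(SNAPPY_MAGIC)
```

If this returns True for a message taken right before unpackb is called, the decompression step was probably skipped somewhere between the Kafka consumer and the decoder.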

Another issue connected with this is that SW or DB workers hang from time to time, in particular on SIGINT.

So, as a quick fix I recommend downgrading kafka-python to 0.9.5 until frontera is fixed and tested with 1.0.x. I'm currently working on this fix and plan to release it in the next few days.

To Michael: Please make sure you don't send unicode to Kafka producers from backends. This is not supported by kafka-python.
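
A small sketch of what encoding before producing could look like. to_bytes is a hypothetical helper, not part of frontera or kafka-python, and UTF-8 is an assumed encoding:

```python
def to_bytes(value, encoding="utf-8"):
    """Return value as a byte string, encoding text if necessary."""
    if isinstance(value, bytes):
        # Already a byte string (plain str on Python 2): pass it through.
        return value
    # Text (unicode on Python 2, str on Python 3): encode explicitly.
    return value.encode(encoding)
```

Calling it on every key and value just before they are handed to the producer keeps unicode objects from reaching kafka-python.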

A.


Alexander Sibiryakov

Apr 26, 2016, 6:59:56 AM
to frontera, horror...@gmail.com, yym....@gmail.com, mich...@geminio.de
So guys,
Does it work with 0.9.5 or lower versions? Can you live with that for some time?

A.