I keep getting 'Broken pipe' error message

Ziv Kaspersky

Nov 28, 2020, 12:51:33 PM
to Pika
I'm using `BlockingConnection` with no other special setup. Here is my code, stripped of sensitive company data:
```
from pika import BlockingConnection, ConnectionParameters, PlainCredentials

cred = PlainCredentials(username=username, password=password, erase_on_connect=True)
self._connection = BlockingConnection(ConnectionParameters(host=server,
                                                           port=port,
                                                           credentials=cred))
self._channel = self._connection.channel()
self._channel.queue_declare(
    queue=self.queue_name,
    durable=True)
for method, properties, body in self._channel.consume(
        self.queue_name, inactivity_timeout=60):
    if not method:
        # consume() yields (None, None, None) after 60 s without a message
        break
    self._channel.basic_ack(method.delivery_tag)
    # *** handle logic that can take up to 3 hours ***
```

Ziv Kaspersky

Nov 28, 2020, 12:52:04 PM
to Pika
error message:
```
[19:35] pika.adapters.utils.io_services_utils  [ERROR]: _AsyncBaseTransport._produce() failed, aborting connection: error=BrokenPipeError(32, 'Broken pipe'); sock=<socket.socket fd=16, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('10.212.134.201', 55753)>; Caller's stack:
Traceback (most recent call last):
  File "/Users/zivkaspersky/work/cymptom-collector/venv/lib/python3.7/site-packages/pika/adapters/utils/io_services_utils.py", line 1097, in _on_socket_writable
    self._produce()
  File "/Users/zivkaspersky/work/cymptom-collector/venv/lib/python3.7/site-packages/pika/adapters/utils/io_services_utils.py", line 820, in _produce
    self._tx_buffers[0])
  File "/Users/zivkaspersky/work/cymptom-collector/venv/lib/python3.7/site-packages/pika/adapters/utils/io_services_utils.py", line 79, in retry_sigint_wrap
    return func(*args, **kwargs)
  File "/Users/zivkaspersky/work/cymptom-collector/venv/lib/python3.7/site-packages/pika/adapters/utils/io_services_utils.py", line 861, in _sigint_safe_send
    return sock.send(data)
BrokenPipeError: [Errno 32] Broken pipe
Traceback (most recent call last):
  File "/Users/zivkaspersky/work/cymptom-collector/venv/lib/python3.7/site-packages/pika/adapters/utils/io_services_utils.py", line 1097, in _on_socket_writable
    self._produce()
  File "/Users/zivkaspersky/work/cymptom-collector/venv/lib/python3.7/site-packages/pika/adapters/utils/io_services_utils.py", line 820, in _produce
    self._tx_buffers[0])
  File "/Users/zivkaspersky/work/cymptom-collector/venv/lib/python3.7/site-packages/pika/adapters/utils/io_services_utils.py", line 79, in retry_sigint_wrap
    return func(*args, **kwargs)
  File "/Users/zivkaspersky/work/cymptom-collector/venv/lib/python3.7/site-packages/pika/adapters/utils/io_services_utils.py", line 861, in _sigint_safe_send
    return sock.send(data)
BrokenPipeError: [Errno 32] Broken pipe
[19:35] pika.adapters.base_connection   [ERROR]: connection_lost: StreamLostError: ("Stream connection lost: BrokenPipeError(32, 'Broken pipe')",)
[19:35] pika.adapters.blocking_connection  [ERROR]: Unexpected connection close detected: StreamLostError: ("Stream connection lost: BrokenPipeError(32, 'Broken pipe')",)
[19:35] dal.main                        [ERROR]: DalService Stopped unexpectedly: Stream connection lost: BrokenPipeError(32, 'Broken pipe')
[19:35] dal.main                        [DEBUG]: Stack Trace:
Traceback (most recent call last):
  File "/Users/zivkaspersky/work/cymptom-collector/dal/main.py", line 41, in run_service
    self.handle_db_tasks()
  File "/Users/zivkaspersky/work/cymptom-collector/dal/main.py", line 81, in handle_db_tasks
    for task in self.task_queue.pull_tasks():
  File "/Users/zivkaspersky/work/cymptom-collector/cymptomlib/msg_queue/rabbit_mq.py", line 194, in pull_tasks
    self.queue_name, inactivity_timeout=int(timeout_ms/1000) if isinstance(timeout_ms, int) else timeout_ms
  File "/Users/zivkaspersky/work/cymptom-collector/venv/lib/python3.7/site-packages/pika/adapters/blocking_connection.py", line 1992, in consume
    self._process_data_events(time_limit=delta)
  File "/Users/zivkaspersky/work/cymptom-collector/venv/lib/python3.7/site-packages/pika/adapters/blocking_connection.py", line 2027, in _process_data_events
    self.connection.process_data_events(time_limit=time_limit)
  File "/Users/zivkaspersky/work/cymptom-collector/venv/lib/python3.7/site-packages/pika/adapters/blocking_connection.py", line 828, in process_data_events
    self._flush_output(timer.is_ready, common_terminator)
  File "/Users/zivkaspersky/work/cymptom-collector/venv/lib/python3.7/site-packages/pika/adapters/blocking_connection.py", line 522, in _flush_output
    raise self._closed_result.value.error
pika.exceptions.StreamLostError: Stream connection lost: BrokenPipeError(32, 'Broken pipe')
```

Luke Bakken

Nov 29, 2020, 9:47:58 AM
to Pika
Hello,

Whenever your application hits an unexpected error, check the RabbitMQ log for messages from the same time.

self._channel.basic_ack(method.delivery_tag)
*** handle logic that can take up to 3 hours ***

During those three hours, your code blocks Pika's I/O loop. Do your work on another thread, and ack after the work is done.
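
A minimal sketch of that approach, not a definitive implementation. While the I/O loop is blocked, Pika cannot exchange heartbeats with the broker, which is a likely reason the connection gets closed and the next write fails with 'Broken pipe'. The sketch assumes `connection` is an already open `BlockingConnection`; `do_work`, `ack_message`, `work_then_ack`, `on_message`, and `run` are hypothetical names chosen for illustration. Only `add_callback_threadsafe`, `basic_qos`, `basic_consume`, `start_consuming`, `basic_ack`, and `is_open` are Pika calls.

```python
import functools
import threading


def do_work(body):
    """Hypothetical placeholder for the handler that can take hours."""
    return len(body)


def ack_message(channel, delivery_tag):
    # Runs on the connection's own thread; skip if the channel closed meanwhile.
    if channel.is_open:
        channel.basic_ack(delivery_tag)


def work_then_ack(connection, channel, delivery_tag, body):
    # Runs on a worker thread, so Pika's I/O loop stays free for heartbeats.
    do_work(body)
    # Channels are not thread-safe: hand the ack back to the connection thread.
    connection.add_callback_threadsafe(
        functools.partial(ack_message, channel, delivery_tag))


def on_message(channel, method, properties, body, connection, workers):
    # Called by Pika for each delivery; returns quickly after starting a worker.
    worker = threading.Thread(
        target=work_then_ack,
        args=(connection, channel, method.delivery_tag, body))
    worker.start()
    workers.append(worker)


def run(connection, queue_name):
    """Consume from queue_name; connection is assumed to be an open BlockingConnection."""
    workers = []
    channel = connection.channel()
    channel.queue_declare(queue=queue_name, durable=True)
    # Deliver one unacked message at a time so long jobs do not pile up.
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(
        queue=queue_name,
        on_message_callback=functools.partial(
            on_message, connection=connection, workers=workers))
    try:
        channel.start_consuming()
    finally:
        # Wait for in-flight work before returning to the caller.
        for worker in workers:
            worker.join()
```

The ack is scheduled with `add_callback_threadsafe` instead of being called directly from the worker, because the channel should only be used from the thread that owns the connection.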


Luke

Ziv Kaspersky

Nov 30, 2020, 5:28:01 AM
to pika-...@googlegroups.com
Thanks, I'll keep that in mind (checking the RabbitMQ log) and try to move to an asynchronous design.