Sorry about last post's title make confuse, so i deleted that.
In my system user can send their message whenever they want.
.
But when heartbeat time out i got this error in rabbitmq server
missed heartbeats from client, timeout: 60s
And this is error i get in client :
[2020-04-24 09:48:10,337] ERROR - Socket EOF; <socket.socket fd=916, family=AddressFamily.AF_INET6, type=SocketKind.SOCK_STREAM, proto=6, laddr=('::1', 61960, 0, 0), raddr=('::1', 5671, 0, 0)> at C:\Users\mdtai\AppData\Local\Continuum\anaconda3\envs\stream\lib\site-packages\pika\adapters\utils\io_services_utils.py:796
[2020-04-24 09:48:10,338] ERROR - Transport indicated EOF. at C:\Users\mdtai\AppData\Local\Continuum\anaconda3\envs\stream\lib\site-packages\pika\adapters\base_connection.py:445
[2020-04-24 09:48:10,339] INFO - protocol.eof_received() elected to close: <socket.socket fd=916, family=AddressFamily.AF_INET6, type=SocketKind.SOCK_STREAM, proto=6, laddr=('::1', 61960, 0, 0), raddr=('::1', 5671, 0, 0)> at C:\Users\mdtai\AppData\Local\Continuum\anaconda3\envs\stream\lib\site-packages\pika\adapters\utils\io_services_utils.py:1058
[2020-04-24 09:48:10,340] INFO - _AsyncTransportBase._initate_abort(): Initiating abrupt asynchronous transport shutdown: state=1; error=None; <socket.socket fd=916, family=AddressFamily.AF_INET6, type=SocketKind.SOCK_STREAM, proto=6, laddr=('::1', 61960, 0, 0), raddr=('::1', 5671, 0, 0)> at C:\Users\mdtai\AppData\Local\Continuum\anaconda3\envs\stream\lib\site-packages\pika\adapters\utils\io_services_utils.py:907
[2020-04-24 09:48:10,341] INFO - Deactivating transport: state=1; <socket.socket fd=916, family=AddressFamily.AF_INET6, type=SocketKind.SOCK_STREAM, proto=6, laddr=('::1', 61960, 0, 0), raddr=('::1', 5671, 0, 0)> at C:\Users\mdtai\AppData\Local\Continuum\anaconda3\envs\stream\lib\site-packages\pika\adapters\utils\io_services_utils.py:870
[2020-04-24 09:48:10,342] ERROR - connection_lost: StreamLostError: ('Transport indicated EOF',) at C:\Users\mdtai\AppData\Local\Continuum\anaconda3\envs\stream\lib\site-packages\pika\adapters\base_connection.py:429
[2020-04-24 09:48:10,343] INFO - AMQP stack terminated, failed to connect, or aborted: opened=True, error-arg=StreamLostError: ('Transport indicated EOF',); pending-error=None at C:\Users\mdtai\AppData\Local\Continuum\anaconda3\envs\stream\lib\site-packages\pika\connection.py:1999
[2020-04-24 09:48:10,344] INFO - Stack terminated due to StreamLostError: ('Transport indicated EOF',) at C:\Users\mdtai\AppData\Local\Continuum\anaconda3\envs\stream\lib\site-packages\pika\connection.py:2065
[2020-04-24 09:48:10,345] INFO - Closing transport socket and unlinking: state=3; <socket.socket fd=916, family=AddressFamily.AF_INET6, type=SocketKind.SOCK_STREAM, proto=6, laddr=('::1', 61960, 0, 0), raddr=('::1', 5671, 0, 0)> at C:\Users\mdtai\AppData\Local\Continuum\anaconda3\envs\stream\lib\site-packages\pika\adapters\utils\io_services_utils.py:883
[2020-04-24 09:48:10,346] ERROR - Unexpected connection close detected: StreamLostError: ('Transport indicated EOF',) at C:\Users\mdtai\AppData\Local\Continuum\anaconda3\envs\stream\lib\site-packages\pika\adapters\blocking_connection.py:521
[2020-04-24 09:48:10,347] ERROR - Transport indicated EOF at D:\ai-apis\api\controllers\object_detect.py:106
It will reconnect when i post new request
How can i fix this problem without disable heartbeat
this is may code to connect to rabbitmq
import pika
import os
import json
import time
from utils import logging
from pika.exceptions import AMQPConnectionError
__instance__ = None
def connect(id, host, port, username, password):
global __instance__
if __instance__ is not None:
return __instance__
try:
__instance__ = MQ(id, pika.ConnectionParameters(
host=host, port=port, credentials=pika.PlainCredentials(
username=username, password=password), heartbeat=5,
retry_delay=5, connection_attempts=10))
return __instance__
except Exception as err:
logging.error(err)
return None
class MQ(object):
def __init__(self, id, config):
self._id = id
self._config = config
self._connection = pika.BlockingConnection(self._config)
self._stage = os.getenv('FLASK_ENV')
self._channel = self._connection.channel()
def publish(self, request_type, message):
# <stage>.request_type.<cv,card.....>
routing_key = '{}-{}-{}'.format(
self._stage, 'task', request_type)
# Reconnect
self.connect()
# Publish
try:
self._channel.basic_publish(
exchange='', routing_key=routing_key, properties=pika.BasicProperties(
app_id = self._id,
delivery_mode = 2, # make message persistent
timestamp = int(time.time()),
), body=json.dumps(message))
except pika.exceptions.AMQPConnectionError as err:
raise err
def connect(self):
# Reconnect
if self._connection.is_closed:
self._connection = pika.BlockingConnection(self._config)
# Reopen channel
if self._channel.is_closed:
self._channel = self._connection.channel()
def close(self):
if self._channel is not None:
self._channel.close()
self._channel = None
if self._connection is not None:
self._connection.close()
self._connection = None
Enviroment:
python 3.7+
pika rabbitmq 1.1.0
thanks for reading
mdtai