It uses threading.local and creates a *new* connection upon failure.
I have included a small performance test to check reconnection.
```
import logging
import threading
import sys
import time
import pika
import simplejson as json
# ---- Broker settings (consumed by RMQ.__init__) ----
HOST = "rabbitmq"
PORT = 5672
VHOST = "vhost"
USER = "guest"
PASS = "guest"
EXCHANGE = "exchange"

# ---- Settings for the __main__ reconnection test ----
TEST_DURATION = 60  # handed to time.sleep(), i.e. seconds
LOG_FILE = "rmq.log"

# Per-thread storage; RMQ.object() keeps one RMQ instance per thread here.
thread = threading.local()

# Module logger; records are appended to LOG_FILE.
logger = logging.getLogger(__name__)
logger.addHandler(logging.FileHandler(LOG_FILE))
def retry(func, retries=0, delay=0, exceptions=None, *args, **kwargs):
    """Call ``func(*args, **kwargs)``, retrying when it raises.

    The call is attempted up to ``retries`` times with failures caught and
    logged; after each caught failure the function sleeps ``delay`` seconds.
    If all of those attempts fail, one final attempt is made whose
    exception (if any) propagates to the caller. In total ``func`` is
    therefore invoked at most ``retries + 1`` times.

    Args:
        func: Callable to invoke.
        retries: Number of failures to absorb before the final attempt.
        delay: Seconds passed to ``time.sleep`` after each absorbed failure.
        exceptions: Exception class (or tuple of classes) to absorb.
            Any falsy value means ``Exception``.
        *args: Positional arguments forwarded to ``func``.
        **kwargs: Keyword arguments forwarded to ``func``.

    Returns:
        Whatever ``func`` returns on its first successful call.

    Raises:
        Any exception not matching ``exceptions`` immediately, or the
        exception raised by the final attempt.
    """
    # Resolve the exception spec once instead of on every failure.
    catchable = exceptions or Exception
    # Named ``attempt`` so the counter does not shadow this function.
    attempt = 0
    while attempt < retries:
        try:
            return func(*args, **kwargs)
        except catchable:
            attempt += 1
            # Lazy %-formatting; the rendered message is unchanged.
            logger.exception("Retry attempt: %s", attempt)
            time.sleep(delay)
    # Final attempt: deliberately unguarded so the caller sees the error.
    return func(*args, **kwargs)
class RMQ:
    """Blocking RabbitMQ publisher bound to a single direct exchange.

    Connection and channel are created lazily by ``ensure_channel`` and
    re-created when they are found closed. Use ``RMQ.object()`` to obtain
    the instance cached for the current thread.
    """

    def __init__(self):
        """Build connection parameters from the module-level settings."""
        self.parameters = {
            "host": HOST,
            "port": PORT,
            "virtual_host": VHOST,
            "socket_timeout": 5,
            "credentials": pika.PlainCredentials(USER, PASS),
        }
        self.exchange = EXCHANGE
        # When true, every new channel has confirm_delivery() enabled.
        self.ack = True
        # Both are populated on demand by ensure_channel().
        self.connection = None
        self.channel = None

    def new_connection(self):
        """Open a fresh ``pika.BlockingConnection`` and store it."""
        parameters = pika.ConnectionParameters(**self.parameters)
        self.connection = pika.BlockingConnection(parameters)

    def new_channel(self):
        """Open a channel on the current connection and declare the exchange."""
        self.channel = self.connection.channel()
        if self.ack:
            self.channel.confirm_delivery()
        self.channel.exchange_declare(
            exchange=self.exchange,
            exchange_type="direct",
            durable=True,
            auto_delete=False,
        )

    def ensure_channel(self):
        """Make sure ``self.connection`` and ``self.channel`` are usable.

        A new connection is opened only when there is none or it is closed;
        a new channel is opened only when there is none or it is closed.
        Reusing the open channel avoids leaking one channel per publish.
        """
        if self.connection is None or self.connection.is_closed:
            self.new_connection()
            # A channel belonging to the previous connection is useless.
            self.channel = None
        if self.channel is None or self.channel.is_closed:
            self.new_channel()

    def publish(self, queue, message, options=None):
        """Publish ``message`` to the exchange with ``queue`` as routing key.

        Args:
            queue: Routing key for the message.
            message: Message body, passed to ``basic_publish`` unchanged.
            options: Optional dict of extra ``pika.BasicProperties`` fields;
                these override the defaults. Non-dict values are ignored.
        """
        properties = {"content_type": "application/json", "delivery_mode": 2}
        if isinstance(options, dict):
            properties.update(options)
        self.ensure_channel()
        self.channel.basic_publish(
            exchange=self.exchange,
            routing_key=queue,
            body=message,
            properties=pika.BasicProperties(**properties),
        )

    def publish_retry(
        self,
        queue,
        message,
        retries=3,
        delay=2,
        options=None,
    ):
        """Publish via the module-level ``retry`` helper.

        Args:
            queue: Routing key for the message.
            message: Message body.
            retries: Number of failed attempts absorbed before the final one.
            delay: Seconds to wait after each absorbed failure.
            options: Forwarded to ``publish``.
        """
        retry(
            self.publish,
            retries=retries,
            delay=delay,
            queue=queue,
            message=message,
            options=options,
        )

    def close(self):
        """Close the channel and the connection, if they are open.

        Safe to call when nothing was opened or when either is already
        closed. The connection is closed even if closing the channel raises.
        """
        try:
            if self.channel is not None and self.channel.is_open:
                self.channel.close()
        finally:
            if self.connection is not None and self.connection.is_open:
                self.connection.close()

    @staticmethod
    def object(config=None):
        """Return the ``RMQ`` instance cached for the calling thread.

        The instance is created on first use and stored on the module-level
        ``threading.local`` object. ``config`` is accepted but not used.
        """
        if getattr(thread, "rmq", None) is None:
            thread.rmq = RMQ()
        return thread.rmq
if __name__ == "__main__":
    # Reconnection test: several threads publish to and read from the
    # "test" queue for TEST_DURATION seconds while successes and failures
    # are counted. Prints "." per success and "x" per failure.
    success = 0
    failure = 0
    # The counters are updated from several threads; "+=" on a shared
    # global is not atomic, so guard it.
    counter_lock = threading.Lock()

    rmq = RMQ.object()
    rmq.ensure_channel()
    try:
        rmq.channel.queue_delete("test")
    except Exception:
        # Best effort: start from a clean queue if possible.
        logger.exception("Could not delete queue before the test:")
        # The failed call may have left the channel unusable
        # (TODO confirm against broker behaviour), so re-establish it.
        rmq.ensure_channel()
    rmq.channel.queue_declare("test", durable=True)
    rmq.channel.queue_bind("test", EXCHANGE)

    def send_msg(name):
        """Publish and read back one message every 0.25 s, forever.

        ``name`` is the worker index supplied by the launcher; it is not
        used by the loop itself.
        """
        global success
        global failure
        while True:
            try:
                # Thread-local instance: each worker has its own connection.
                rmq = RMQ.object()
                rmq.publish_retry(
                    queue="test",
                    message=json.dumps("test"),
                )
                body = rmq.channel.basic_get(queue="test")[2]
                # Explicit check instead of assert, which -O would strip.
                if json.loads(body) != "test":
                    raise ValueError("unexpected message body: %r" % (body,))
                sys.stdout.write(".")
                sys.stdout.flush()
                with counter_lock:
                    success += 1
            except Exception:
                logger.exception("Exception encountered:")
                sys.stdout.write("x")
                sys.stdout.flush()
                with counter_lock:
                    failure += 1
            time.sleep(0.25)

    try:
        threads = 4
        for each in range(threads):
            t = threading.Thread(target=send_msg, args=[each])
            # Daemon threads so the process can exit while they still loop.
            t.daemon = True
            t.start()
        time.sleep(TEST_DURATION)
        print()
    except KeyboardInterrupt:
        print(" recieved, stopping.")
    finally:
        # Take one consistent snapshot of both counters.
        with counter_lock:
            published, failed = success, failure
        print("Published: {}".format(published))
        print("Failed: {}".format(failed))
        print("Total: {}".format(published + failed))
        try:
            rmq.channel.queue_delete("test")
        except Exception:
            # Best-effort cleanup; report it but still exit.
            logger.exception("Could not delete queue after the test:")
        sys.exit()
```