Hi All,
I was wondering if you can help me with the following:
I have added a small part that checks the filesystem and in case of new files, it parses them and is sending them over to the rabbitmq server.
def publish_message(self):
if self._stopping:
return
properties = pika.BasicProperties(delivery_mode=1, app_id="_ip_acc", content_type="application/json", headers=None)
while True:
for fn in fnmatch.filter(os.listdir(os.environ['IPACC_PLG']), 'ip-acc.*.*.out'):
try:
pmacct_log.info("> Processing "+fn)
time_stamp = fn.split(".")[-3] ; uniq = fn.split(".")[-2] ;
with open(os.environ['IPACC_PLG']+"/"+fn, "r") as f:
next(f)
cnt = 0 ; dq = [] ; msg = {} ; msg[time_stamp] = {} ;
for _ in f:
dq.append(_.rstrip(os.linesep))
cnt += 1
if cnt==BIN_NO:
msg[time_stamp][uniq] = dq
self._channel.basic_publish(self.EXCHANGE, self.ROUTING_KEY, simplejson.dumps(msg), properties, mandatory=False, immediate=False)
self._message_number += 1
self._deliveries.append(self._message_number)
rabbitmq_log.debug("IPacc"+" > Published message # "+str(self._message_number)+" : msg="+time_stamp+"__"+uniq)
cnt = 0 ; dq = [] ; msg = {} ; msg[time_stamp] = {} ;
msg[time_stamp][uniq] = dq
self._channel.basic_publish(self.EXCHANGE, self.ROUTING_KEY, simplejson.dumps(msg), properties, mandatory=False, immediate=False)
self._message_number += 1
self._deliveries.append(self._message_number)
rabbitmq_log.debug("IPacc"+" > Published message # "+str(self._message_number)+" : msg="+time_stamp+"__"+uniq)
cnt = 0 ; dq = [] ; msg = {} ;
if os.path.isfile(os.environ['IPACC_PLG']+"/"+fn):
os.remove(os.environ['IPACC_PLG']+"/"+fn)
pmacct_log.info("> Deleting "+fn)
except Exception, e:
pmacct_log.error(str(e))
pycounters.report_value("_ip_acc_exception", 1)
pycounters.output_report()
time.sleep(1)
and the connection settings are like:
async = RMQpublisher("amqp://"+os.environ['USR']+":"+os.environ['PASS']+"@"+socket.gethostbyname("rabbitmq-IPacc")+":5672/"+"IPacc"+"?connection_attempts=3&heartbeat_interval=60")
the issue that am having is that:
after the 1st iteration of the loop where all files are getting processed,
and new files maybe become available after 5-20min,
it seems that am losing the connection and getting the following error:
2016-04-29 14:10:20.539 : ERROR : IPacc : publish_message : 'NoneType' object has no attribute 'basic_publish'
Only the function schedule_next_message has been removed to avoid periodic polling.
Could you please advise why the self._channel appears to be None ? What am I missing ?
with kind regards,
Nikos