pika | 'NoneType' object has no attribute 'basic_publish'


Nikos Skalis

Apr 29, 2016, 8:27:11 AM
to rabbitmq-users
Hi All,

I was wondering if you can help me with the following:
I am using the original pika library as a publisher, following this example: https://pika.readthedocs.io/en/0.10.0/examples/asynchronous_publisher_example.html

I have added a small part that checks the filesystem and, when new files appear, parses them and sends them to the RabbitMQ server.

def publish_message(self):
    if self._stopping:
        return
    properties = pika.BasicProperties(delivery_mode=1, app_id="_ip_acc", content_type="application/json", headers=None)

    while True:
        for fn in fnmatch.filter(os.listdir(os.environ['IPACC_PLG']), 'ip-acc.*.*.out'):
            try:
                pmacct_log.info("> Processing "+fn)
                time_stamp = fn.split(".")[-3] ; uniq = fn.split(".")[-2] ;
                with open(os.environ['IPACC_PLG']+"/"+fn, "r") as f:
                    next(f)

                    cnt = 0 ; dq = [] ; msg = {} ; msg[time_stamp] = {} ;
                    for _ in f:
                        dq.append(_.rstrip(os.linesep))
                        cnt += 1
                        if cnt==BIN_NO:
                            msg[time_stamp][uniq] = dq
                            self._channel.basic_publish(self.EXCHANGE, self.ROUTING_KEY, simplejson.dumps(msg), properties, mandatory=False, immediate=False)
                            self._message_number += 1
                            self._deliveries.append(self._message_number)
                            rabbitmq_log.debug("IPacc"+" > Published message # "+str(self._message_number)+" : msg="+time_stamp+"__"+uniq)
                            cnt = 0 ; dq = [] ; msg = {} ; msg[time_stamp] = {} ;
                    msg[time_stamp][uniq] = dq
                    self._channel.basic_publish(self.EXCHANGE, self.ROUTING_KEY, simplejson.dumps(msg), properties, mandatory=False, immediate=False)
                    self._message_number += 1
                    self._deliveries.append(self._message_number)
                    rabbitmq_log.debug("IPacc"+" > Published message # "+str(self._message_number)+" : msg="+time_stamp+"__"+uniq)
                    cnt = 0 ; dq = [] ; msg = {} ;

                if os.path.isfile(os.environ['IPACC_PLG']+"/"+fn):
                    os.remove(os.environ['IPACC_PLG']+"/"+fn)
                    pmacct_log.info("> Deleting "+fn)
            except Exception, e:
                pmacct_log.error(str(e))
                pycounters.report_value("_ip_acc_exception", 1)

            pycounters.output_report()
        time.sleep(1)

The connection settings look like this:
async = RMQpublisher("amqp://"+os.environ['USR']+":"+os.environ['PASS']+"@"+socket.gethostbyname("rabbitmq-IPacc")+":5672/"+"IPacc"+"?connection_attempts=3&heartbeat_interval=60")

The issue I am having is this: after the first iteration of the loop, in which all files are processed, new files may only become available 5-20 minutes later. By then it seems that I have lost the connection, and I get the following error:
2016-04-29 14:10:20.539 : ERROR : IPacc : publish_message : 'NoneType' object has no attribute 'basic_publish'

Compared with the example, I have only removed the schedule_next_message function, to avoid periodic polling.

Could you please advise why self._channel appears to be None? What am I missing?

with kind regards,
Nikos

Laing, Michael

Apr 29, 2016, 8:56:42 AM
to rabbitm...@googlegroups.com
Probably your time.sleep() is causing heartbeats to be missed and the channel/connection closed.

Try using a pika timeout instead.
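
To illustrate the diagnosis, here is a toy sketch of a single-threaded timer loop with a fake clock. It is not pika's IOLoop; ToyLoop, heartbeat_gaps and all the numbers are made up for the illustration. It only shows the general effect: while one callback is busy (or sleeping), no other callback, including the one that sends heartbeats, can run.

```python
import heapq


class ToyLoop(object):
    """Hypothetical single-threaded timer loop with a fake clock (not pika's IOLoop)."""

    def __init__(self):
        self.now = 0.0
        self._timers = []
        self._seq = 0  # tie-breaker so callbacks themselves are never compared

    def call_later(self, delay, callback):
        heapq.heappush(self._timers, (self.now + delay, self._seq, callback))
        self._seq += 1

    def run_until(self, deadline):
        # Run due callbacks one at a time; nothing else runs while one is executing.
        while self._timers and self._timers[0][0] <= deadline:
            due, _, callback = heapq.heappop(self._timers)
            self.now = max(self.now, due)
            callback()
        self.now = max(self.now, deadline)


def heartbeat_gaps(blocking_seconds, interval=60.0, run_for=600.0):
    """Return the gaps between heartbeats when one callback blocks the loop."""
    loop = ToyLoop()
    sent = []

    def heartbeat():
        sent.append(loop.now)
        loop.call_later(interval, heartbeat)

    def blocking_work():
        # Stands in for a callback that loops and sleeps instead of returning:
        # the clock advances, but the loop cannot dispatch anything meanwhile.
        loop.now += blocking_seconds

    loop.call_later(interval, heartbeat)
    loop.call_later(90.0, blocking_work)
    loop.run_until(run_for)
    return [later - earlier for earlier, later in zip(sent, sent[1:])]


print(max(heartbeat_gaps(0)))    # callback returns at once: heartbeats stay on schedule
print(max(heartbeat_gaps(300)))  # callback blocks for 300 s: one heartbeat gap is far longer
```

In the code from the question the callback never returns at all, so in this picture the gap would be unbounded.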


Nikos Skalis

Apr 29, 2016, 9:20:25 AM
to rabbitmq-users
Thanks Michael, confirmed. I have disabled heartbeats by setting the interval to 0.

How would I set the timeout in order to stop the process and exit the event loop?

Nikos

Laing, Michael

Apr 29, 2016, 4:31:30 PM
to rabbitm...@googlegroups.com
The simplest way to modify your code (I think) would be to:
  • eliminate the 'while True:' loop, adjusting formatting
  • replace the 'time.sleep(1)' with 'self._connection.add_timeout(1, self.publish_message)'
During the timeout, the event loop will continue to run, handling heartbeats and other events.

After the timeout, your publish_message method will be executed again.

Caveat: I did not actually try this with your code.

ml
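
A minimal sketch of the two changes suggested above, untested against a real broker. DirectoryPublisher and its constructor parameters are hypothetical names, the connection and channel are assumed to be the already-open pika objects from the asynchronous publisher example, and the BIN_NO batching from the question is left out for brevity. add_timeout is the method named in this thread for pika 0.10.0; as far as I know, later pika releases renamed it to call_later, so check the version you have installed.

```python
import fnmatch
import json
import os


class DirectoryPublisher(object):
    """Publishes each matching file once, then re-arms a timer instead of sleeping."""

    def __init__(self, connection, channel, directory, exchange, routing_key,
                 pattern='ip-acc.*.*.out', poll_seconds=1):
        self._connection = connection  # assumed: open pika connection
        self._channel = channel        # assumed: open pika channel
        self._directory = directory
        self._exchange = exchange
        self._routing_key = routing_key
        self._pattern = pattern
        self._poll_seconds = poll_seconds
        self._stopping = False

    def publish_message(self):
        # Skip the work (and do not re-arm the timer) when stopping or
        # when the channel has gone away.
        if self._stopping or self._channel is None:
            return
        names = sorted(os.listdir(self._directory))
        for fn in fnmatch.filter(names, self._pattern):
            path = os.path.join(self._directory, fn)
            time_stamp, uniq = fn.split('.')[-3], fn.split('.')[-2]
            with open(path, 'r') as f:
                next(f, None)  # skip the header line, as in the question
                lines = [line.rstrip('\r\n') for line in f]
            self._channel.basic_publish(exchange=self._exchange,
                                        routing_key=self._routing_key,
                                        body=json.dumps({time_stamp: {uniq: lines}}))
            os.remove(path)
        # No 'while True' and no time.sleep(): return to the event loop and
        # ask to be called again after poll_seconds.
        self._connection.add_timeout(self._poll_seconds, self.publish_message)

    def stop(self):
        # After this, a pending timer fires once more but does nothing.
        self._stopping = True
```

With this shape the method returns after every pass over the directory, so the event loop stays free to handle heartbeats between passes, and setting the stopping flag ends the chain of timers.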