zeromq and python multiprocessing

66 views
Skip to first unread message

Davoud Taghawi-Nejad

unread,
Jul 4, 2015, 8:12:58 AM (7/4/15)
to zer...@googlegroups.com
Hello,

I have an agent-based model in which several agents are started by a central process and exchange messages through another central process. Every agent talks to this communication process via zmq. However, when I start more than 100 agents, the following is printed to the console:

Invalid argument (src/stream_engine.cpp:143)
Too many open files (src/ipc_listener.cpp:292) 

and Mac OS X shows a problem report:
Python quit unexpectedly while using the libzmq.5.dylib plug-in.

It appears to me that the problem is that too many contexts are opened. But how can I avoid this with multiprocessing?

I attach part of the code below:

class Agent(Database, Logger, Trade, Messaging, multiprocessing.Process):
    def __init__(self, idn, group, _addresses, trade_logging):
        multiprocessing.Process.__init__(self)
        ....

    def run(self):
        self.context = zmq.Context()
        self.commands = self.context.socket(zmq.SUB)
        self.commands.connect(self._addresses['command_addresse'])
        self.commands.setsockopt(zmq.SUBSCRIBE, "all")
        self.commands.setsockopt(zmq.SUBSCRIBE, self.name)
        self.commands.setsockopt(zmq.SUBSCRIBE, group_address(self.group))

        self.out = self.context.socket(zmq.PUSH)
        self.out.connect(self._addresses['frontend'])
        time.sleep(0.1)
        self.database_connection = self.context.socket(zmq.PUSH)
        self.database_connection.connect(self._addresses['database'])
        time.sleep(0.1)
        self.logger_connection = self.context.socket(zmq.PUSH)
        self.logger_connection.connect(self._addresses['logger'])

        self.messages_in = self.context.socket(zmq.DEALER)
        self.messages_in.setsockopt(zmq.IDENTITY, self.name)
        self.messages_in.connect(self._addresses['backend'])

        self.shout = self.context.socket(zmq.SUB)
        self.shout.connect(self._addresses['group_backend'])
        self.shout.setsockopt(zmq.SUBSCRIBE, "all")
        self.shout.setsockopt(zmq.SUBSCRIBE, self.name)
        self.shout.setsockopt(zmq.SUBSCRIBE, group_address(self.group))

        self.out.send_multipart(['!', '!', 'register_agent', self.name])

        while True:
            try:
                self.commands.recv()  # catches the group adress.
            except KeyboardInterrupt:
                print('KeyboardInterrupt: %s,self.commands.recv() to catch own adress ~1888' % (self.name))
                break
            command = self.commands.recv()
            if command == "!":
                subcommand = self.commands.recv()
                if subcommand == 'die':
                    self.__signal_finished()
                    break
            try:
                self._methods[command]()
            except KeyError:
                if command not in self._methods:
                    raise SystemExit('The method - ' + command + ' - called in the agent_list is not declared (' + self.name)
                else:
                    raise
            except KeyboardInterrupt:
                print('KeyboardInterrupt: %s, Current command: %s ~1984' % (self.name, command))
                break

            if command[0] != '_':
                self.__reject_polled_but_not_accepted_offers()
                self.__signal_finished()
        #self.context.destroy()

Thanks so much for your work!


Davoud
Reply all
Reply to author
Forward
0 new messages