> On Apr 6, 2020, at 12:27 PM, David Raymond <David....@tomtom.com> wrote:
>
> Looks like this will get what you need.
>
>
> def some_complex_function(x):
>     global q
>     #stuff using q
>
> def pool_init(q2):
>     global q
>     q = q2
>
> def main():
>     #initialize the Queue
>     mp_comm_queue = mp.Queue()
>
>     #Set up a pool to process a bunch of stuff in parallel
>     pool = mp.Pool(initializer = pool_init, initargs = (mp_comm_queue,))
>     ...
>
>
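For reference, a complete runnable sketch of the initializer pattern suggested above might look like the following (names are illustrative, not from the original code):

import multiprocessing as mp

# Module-level name that pool_init() rebinds in every worker process.
q = None

def pool_init(q2):
    # Runs once per worker; stashes the inherited queue in the module global.
    global q
    q = q2

def some_complex_function(x):
    # Workers use the shared queue through the module-level global.
    q.put(x)
    return x * 2

def main():
    mp_comm_queue = mp.Queue()
    # The queue is handed to each worker via the pool's initializer, so it is
    # inherited at worker start-up rather than pickled with every task.
    with mp.Pool(initializer=pool_init, initargs=(mp_comm_queue,)) as pool:
        results = list(pool.imap(some_complex_function, range(20)))
    print(results)
    # Exactly 20 items were put on the queue, one per task.
    for _ in range(20):
        print("from queue:", mp_comm_queue.get())

if __name__ == "__main__":
    main()

This works under the spawn start method as well, since the queue reaches the workers at process creation time rather than through the task arguments.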
Gotcha, thanks. I’ll look more into that initializer argument and see how I can leverage it to do multiprocessing using spawn rather than fork in the future. Looks straightforward enough. Thanks again!
---
Israel Brewster
Software Engineer
Alaska Volcano Observatory
Geophysical Institute - UAF
2156 Koyukuk Drive
Fairbanks AK 99775-7320
Work: 907-474-5172
Cell: 907-328-9145
>
> -----Original Message-----
> From: David Raymond
> Sent: Monday, April 6, 2020 4:19 PM
> To: pytho...@python.org
> Subject: RE: Multiprocessing queue sharing and python3.8
>
> Attempting reply as much for my own understanding.
>
> Are you on Mac? I think this is the pertinent bit for you:
> Changed in version 3.8: On macOS, the spawn start method is now the default. The fork start method should be considered unsafe as it can lead to crashes of the subprocess. See bpo-33725.
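For what it's worth, the start method can also be selected explicitly so the behaviour does not depend on the platform default; a minimal sketch:

import multiprocessing as mp

if __name__ == "__main__":
    # Force the spawn start method (the macOS default since 3.8).
    # Call this once, before creating any Pool, Process, or Queue objects.
    mp.set_start_method("spawn")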
>
> When you start a new process (with the spawn method) it runs the module just as if it were being imported. So your global "mp_comm_queue2 = mp.Queue()" creates a new Queue in each process. Your initialization of mp_comm_queue is also done inside the main() function, which doesn't get run in each process. So each process in the Pool is going to have mp_comm_queue as None, and have its own version of mp_comm_queue2. The ID being the same or different is the result of one or more processes in the Pool being reused for the multiple steps in imap, probably because the function that the Pool is executing finishes so quickly.
>
> Add a little extra info to the print calls (and/or set up logging to stdout with the process name/id included) and you can see some of this. Here are the hacked-together changes I made for that.
>
> import multiprocessing as mp
> import os
>
> mp_comm_queue = None    #Will be initialized in the main function
> mp_comm_queue2 = mp.Queue()    #Test pre-initialized as well
>
> def some_complex_function(x):
>     print("proc id", os.getpid())
>     print("mp_comm_queue", mp_comm_queue)
>     print("queue2 id", id(mp_comm_queue2))
>     mp_comm_queue2.put(x)
>     print("queue size", mp_comm_queue2.qsize())
>     print("x", x)
>     return x * 2
>
> def main():
>     global mp_comm_queue
>     #initialize the Queue
>     mp_comm_queue = mp.Queue()
>
>     #Set up a pool to process a bunch of stuff in parallel
>     pool = mp.Pool()
>     values = range(20)
>     data = pool.imap(some_complex_function, values)
>
>     for val in data:
>         print(f"**{val}**")
>     print("final queue2 size", mp_comm_queue2.qsize())
>
> if __name__ == "__main__":
>     main()
>
>
>
> When making your own Process object and starting it, the Queue should be passed into the function as an argument, yes. The error text seems to be part of the Pool implementation, which I'm not familiar enough with to know the best way to handle it. (Probably something using the "initializer" and "initargs" arguments for Pool)(maybe)
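A rough sketch of that Process-based approach (illustrative names): the queue is part of the Process's args at creation time, which is roughly what the error message below means by "inheritance".

import multiprocessing as mp

def worker(x, q):
    # The queue arrives as an ordinary argument because it was supplied in the
    # Process's args at creation time (inherited, not pickled per task).
    q.put(x * 2)

def main():
    q = mp.Queue()
    procs = [mp.Process(target=worker, args=(i, q)) for i in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    for _ in range(4):
        print(q.get())

if __name__ == "__main__":
    main()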
> Traceback (most recent call last):
> File "test_multi.py", line 32, in <module>
> main()
> File "test_multi.py", line 28, in main
> for val in data:
> File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 748, in next
> raise value
> File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 431, in _handle_tasks
> put(task)
> File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/connection.py", line 206, in send
> self._send_bytes(_ForkingPickler.dumps(obj))
> File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/reduction.py", line 51, in dumps
> cls(buf, protocol).dump(obj)
> File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/queues.py", line 58, in __getstate__
> context.assert_spawning(self)
> File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/context.py", line 356, in assert_spawning
> ' through inheritance' % type(obj).__name__
> RuntimeError: Queue objects should only be shared between processes through inheritance
>
> after I add the following to the code to try passing the queue rather than having it global:
>
> #Try by passing queue
> values = [(x, mp_comm_queue) for x in range(20)]
> data = pool.imap(some_complex_function, values)
> for val in data:
>     print(f"**{val}**")
>
> So if I can’t pass it as an argument, and having it global is incorrect (at least starting with 3.8), what is the proper method of getting multiprocessing queues to child processes?
>
> ---
> Israel Brewster
> Software Engineer
> Alaska Volcano Observatory
> Geophysical Institute - UAF
> 2156 Koyukuk Drive
> Fairbanks AK 99775-7320
> Work: 907-474-5172
> Cell: 907-328-9145
>
> --
> https://mail.python.org/mailman/listinfo/python-list