multiprocessing?


Tim Howard

Sep 3, 2011, 1:40:44 PM
to networkx-discuss
All,
There's one reference to Python's multiprocessing module in the
NetworkX archives
(http://groups.google.com/group/networkx-discuss/browse_thread/thread/b5b0208cd7e791c2),
in which the author indicates that it is difficult to parallelize
network algorithms.

I'm more interested in sending off algorithm tasks to different
processors/cores and wonder if anyone has attempted such a thing and
if it is feasible. For example, if I have a loop that cycles through
nodes in a graph and acquires the least cost path for each node pair:

for i, p1 in enumerate(Nodes):
    for p2 in Nodes[i + 1:]:
        if p1 not in paths[p2]:
            dlength, dpath = nx.bidirectional_dijkstra(G, p2, p1)
            paths[p1][p2] = {'path': dpath, 'wgt': dlength}


Can anyone speak to how difficult it may be to employ different
processors to complete different iterations of the loop? Like sending
all the even numbered i to processor 1 and the odd i to processor 2.
The biggest concern may be that each processor may need a full copy of
G, which may be prohibitive.

Thanks in advance to any thoughts.
Tim
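
A minimal sketch of the even/odd split described above, using only the standard library. The names `pairs_for_worker` and `n_workers` are hypothetical, and the short node list stands in for the real graph's nodes. Each worker takes every `n_workers`-th value of the outer index `i`, so the workers' pair sets are disjoint and together cover every unordered pair once.

```python
from itertools import combinations


def pairs_for_worker(nodes, worker, n_workers):
    """Yield the (p1, p2) pairs whose outer-loop index i belongs to `worker`.

    Outer indices are dealt out round-robin: worker 0 gets i = 0, n, 2n, ...
    """
    for i in range(worker, len(nodes), n_workers):
        p1 = nodes[i]
        for p2 in nodes[i + 1:]:
            yield p1, p2


nodes = ["a", "b", "c", "d", "e"]
even = list(pairs_for_worker(nodes, 0, 2))  # i = 0, 2, 4
odd = list(pairs_for_worker(nodes, 1, 2))   # i = 1, 3

# Together the two workers see every unordered pair exactly once.
assert sorted(even + odd) == sorted(combinations(nodes, 2))
```

Dealing the indices out round-robin, rather than giving each worker a contiguous half, matters here because small values of `i` produce many more pairs than large ones.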

Ben Edwards

Sep 3, 2011, 3:45:35 PM
to networkx...@googlegroups.com
I've successfully used the multiprocessing module, and I believe an example that uses it to compute betweenness centrality was in the works at some point. The module itself is a bit clunky, so I would recommend checking out the IPython parallel module, which has a pretty clean implementation.

Ben


--
You received this message because you are subscribed to the Google Groups "networkx-discuss" group.
To post to this group, send email to networkx...@googlegroups.com.
To unsubscribe from this group, send email to networkx-discu...@googlegroups.com.
For more options, visit this group at http://groups.google.com/group/networkx-discuss?hl=en.


Jordi Torrents

Sep 4, 2011, 10:02:06 AM
to networkx...@googlegroups.com
Hi Tim,

You can find Ben's example for computing betweenness centrality in parallel at:

https://networkx.lanl.gov/trac/ticket/594

On the other hand I've used parallel python
(http://www.parallelpython.com/) to parallelize the computation of
null models. You can also find an example at:

https://networkx.lanl.gov/trac/ticket/593

It is not a parallel version of an algorithm already implemented, but
the parallelization of the same computation applied to slightly
different inputs (random networks). So it is not exactly what you want
but maybe you can adapt the code to suit your needs.

Hope that helps.

Salut!


Thomas Capelle

Sep 4, 2011, 12:35:21 PM
to networkx...@googlegroups.com
If you want easy function parallelization, try Python's Pool (in the multiprocessing module); it's great for applying functions to arrays.
Good luck.
Thomas
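
A minimal sketch of the Pool approach, assuming Python 3.3 or later (where `Pool` can be used as a context manager). The `square` worker and `parallel_squares` helper are hypothetical names chosen for illustration; any function defined at module level can take the worker's place.

```python
from multiprocessing import Pool


def square(x):
    """Worker function; defined at module level so it can be pickled."""
    return x * x


def parallel_squares(values, processes=2):
    # Pool.map splits `values` among the worker processes and
    # returns the results in the original order.
    with Pool(processes=processes) as pool:
        return pool.map(square, values)


if __name__ == "__main__":
    print(parallel_squares([1, 2, 3, 4]))
```

The `if __name__ == "__main__":` guard keeps the worker processes from re-running the script's top-level code on platforms that start workers by importing the main module.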

Tim Howard

Sep 5, 2011, 9:37:24 AM
to networkx-discuss
Thanks, everyone, for the comments and links - the betweenness example looks
most promising. I'm still a little worried that I have to send the
entire graph to each processor (and thus duplicate it in memory -- is
that what both of the examples you provided do, Jordi?). There is an
example of sharing arrays in the Python documentation. Perhaps I'll
try that out.

Thanks again, I'll give it a try.
Best,
Tim


Jordi Torrents

Sep 5, 2011, 1:53:22 PM
to networkx...@googlegroups.com
Hello Tim,

I'm not sure, but I think that you will have to duplicate the network
in memory in order to do parallel computations. Maybe someone on the
list with a better knowledge of python implementations for parallel
computing (multiprocessing module and parallel python) could add more
information.

Salut!

Jordi


Ben Edwards

Sep 5, 2011, 2:14:25 PM
to networkx...@googlegroups.com
Tim,

Python doesn't really have support for a shared-memory parallel model. There is multithreading, but if you are using the default Python, the Global Interpreter Lock prevents the threads from truly executing in parallel. The solution for now is multiprocessing, which spawns multiple processes (each with its own memory) to get things done.

Ben
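
A hedged sketch of what "each process has its own memory" can look like in practice. It does not avoid the duplication discussed above: every worker still holds a full copy of the graph. What the Pool `initializer` changes is that the graph is handed to each worker once, instead of being pickled again for every task. The dict-of-dicts graph and the small Dijkstra routine are stand-ins for a NetworkX graph and `nx.bidirectional_dijkstra`; all function names here are hypothetical, and node labels are assumed to be comparable so that ties in the heap can be broken.

```python
import heapq
from multiprocessing import Pool

GRAPH = None  # per-process copy, set once by init_worker


def init_worker(graph):
    """Runs once in each worker and stores that worker's copy of the graph."""
    global GRAPH
    GRAPH = graph


def dijkstra_lengths(graph, source):
    """Shortest-path lengths from `source` in a dict-of-dicts weighted graph."""
    dist = {source: 0}
    heap = [(0, source)]
    while heap:
        d, u = heapq.heappop(heap)
        if d > dist[u]:
            continue  # stale heap entry
        for v, w in graph[u].items():
            nd = d + w
            if v not in dist or nd < dist[v]:
                dist[v] = nd
                heapq.heappush(heap, (nd, v))
    return dist


def task(source):
    # Only the source node travels with each task; the graph is already there.
    return source, dijkstra_lengths(GRAPH, source)


def all_lengths_parallel(graph, processes=2):
    with Pool(processes, initializer=init_worker, initargs=(graph,)) as pool:
        return dict(pool.map(task, list(graph)))


if __name__ == "__main__":
    g = {"a": {"b": 1, "c": 4}, "b": {"a": 1, "c": 2}, "c": {"a": 4, "b": 2}}
    print(all_lengths_parallel(g)["a"])
```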

László Sándor

Sep 10, 2011, 8:03:31 PM
to networkx...@googlegroups.com
Hi all,

I tried to apply the logic of Ben's example to my problem, below. Never mind that in this case the overhead is almost sure to kill me (other things will be better to parallelize in the code). But I even got a syntax error in Python 3.2 for the simplest function among all, that simply collected arguments into a tuple to channel them through pmap. Why is this definition invalid in py3k? Thanks!

The error:
  File "parser5_nodots_parallel.py", line 53
    def _pmap((G,B,u,nbrs2)):
              ^
SyntaxError: invalid syntax

The full code:
import itertools
from multiprocessing import Pool

import networkx as nx

def chunks(l,n):
    """Divide an iterable of nodes `l` into chunks of at most `n` items"""
    l_c = iter(l)
    while True:
        x = tuple(itertools.islice(l_c,n))
        if not x:
            return
        yield x

def overlaps(G,B,u,nbrs2):
    l = []
    for v in nbrs2:
        for mutual_cell in set(B[u]) & set(B[v]):
            for uspell in B.get_edge_data(u,mutual_cell).values():
                ustart = uspell[1]
                uend = uspell[2]
                for vspell in B.get_edge_data(v,mutual_cell).values():
                    vstart = vspell[1]
                    vend = vspell[2]
                    if uend > vstart and vend > ustart:
                        ostart = max(ustart,vstart)
                        oend = min(uend,vend)
                        olen = (oend-ostart+1)/86400
                        ocell = mutual_cell
                        if (v not in G[u] or ostart not in [ edict[1] for edict in G[u][v].values() ]):
                            l.append([(u,v,{0: olen,1: ostart,2: oend,3: ocell})])
    return l

def _pmap((G,B,u,nbrs2)):
    """Pool for multiprocess only accepts functions with one argument. This function
    uses a tuple as its only argument.
    """
    return overlaps(G,B,u,nbrs2)

def time_overlap_projected_graph_parallel(B, nodes):
    G=nx.MultiGraph()
    G.add_nodes_from((n,B.node[n]) for n in nodes)
    add_edges_from = nx.MultiGraph.add_edges_from
    get_edge_data = nx.MultiGraph.get_edge_data
    for u in nodes:
        unbrs = set(B[u])
        nbrs2 = set((n for nbr in unbrs for n in B[nbr])) - set([u])
        # iterate over subsets of neighbors - parallelize
        p = Pool(processes=4)
        node_divisor = len(p._pool)*4
        node_chunks = list(chunks(nbrs2,len(nbrs2)/node_divisor))
        num_chunks = len(node_chunks)
        pedgelists = p.map(_pmap,
                           zip([G]*num_chunks,
                               [B]*num_chunks,
                               [u]*num_chunks,
                               node_chunks))
        # compile long list
        list = []
        for l in pedgelists:
            list.append(l)
        # add edges from long list in a single step
        G.add_edges_from(list)
    return G

Ben Edwards

Sep 10, 2011, 10:53:57 PM
to networkx...@googlegroups.com
Python 3 no longer supports tuple parameter unpacking in function definitions (it was removed by PEP 3113).


You'd have to make it:

def _pmap(arg_tuple):
    """Pool for multiprocess only accepts functions with one argument. This function
    uses a tuple as its only argument.
    """
    return overlaps(arg_tuple[0],arg_tuple[1],arg_tuple[2],arg_tuple[3])
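
Two equivalent ways to write that wrapper, shown as a sketch rather than as the code from the ticket. `overlaps_stub` is a hypothetical stand-in for `overlaps()` so the example runs without a graph. Star-unpacking (`*arg_tuple`) does the same job as indexing the four elements, and `Pool.starmap`, available from Python 3.3, removes the need for a wrapper altogether.

```python
from multiprocessing import Pool


def overlaps_stub(G, B, u, nbrs2):
    """Hypothetical stand-in for overlaps(); just reports what it received."""
    return (u, len(nbrs2))


def pmap_wrapper(arg_tuple):
    # Star-unpacking replaces the Python 2 "def _pmap((G, B, u, nbrs2))" form.
    return overlaps_stub(*arg_tuple)


def run(tasks, processes=2):
    with Pool(processes) as pool:
        via_map = pool.map(pmap_wrapper, tasks)
        # Python 3.3+ can skip the wrapper: starmap unpacks each tuple itself.
        via_starmap = pool.starmap(overlaps_stub, tasks)
    return via_map, via_starmap


if __name__ == "__main__":
    tasks = [(None, None, "u1", ("a", "b")), (None, None, "u2", ("c",))]
    print(run(tasks))
```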


László Sándor

Sep 11, 2011, 12:58:57 PM
to networkx...@googlegroups.com
Thanks, Ben!

Some things have changed in itertools too, and I think that is causing my new problem (or I bungled the adaptation of your code). If you can see how node_chunks must change much faster than I can, please let us know.

Here is the function I took from you:

def chunks(l,n):
    """Divide an iterable of nodes `l` into chunks of at most `n` items"""
    l_c = iter(l)
    while True:
        x = tuple(itertools.islice(l_c,n))
        if not x:
            return
        yield x

and it is called here:
        p = Pool(processes=4)
        node_divisor = len(p._pool)*4
        node_chunks = list(chunks(nbrs2,len(nbrs2)/node_divisor))

The error message is:
File "parser5_nodots_parallel.py", line 29, in chunks
    x = tuple(itertools.islice(l_c,n))
ValueError: Stop argument for islice() must be None or an integer: 0 <= x <= sys.maxsize

Ben Edwards

Sep 11, 2011, 1:42:43 PM
to networkx...@googlegroups.com
My guess is that node_divisor is not an integer, so len(nbrs2)/node_divisor ends up being a float when it is passed to chunks (I think). Try len(nbrs2)/int(node_divisor).

Ben


László Sándor

Sep 11, 2011, 1:52:36 PM
to networkx...@googlegroups.com
Thanks, Ben,

int(node_divisor) did not solve the problem, but forcing the whole expression to be an int did.

By the way, with four cores I see a neat speedup from parallelizing this way, though I think that with larger networks the cost of passing graphs B and G will erase the gains. In case you ever checked: I could only parallelize within an inner loop, as the code must use information on the entire graph that each iteration (of the inner loop) updates. I will test this, of course, but I am also interested in any opinion about it. I will be able to run on 8 cores at most. (To use the cluster proper, I would need to use MPI, which I have not had the courage to try so far.)

Thanks again,

Laszlo

László Sándor

Sep 11, 2011, 2:13:15 PM
to networkx...@googlegroups.com
Oh, Ben, if I may, a new question: I hoped to chunk up an iterator to spread a for-loop across cores. Would this be too much of a good thing? Should I just keep a list of nodes and chunk that? And then can/should I make an iterator of the list somehow, or can Py3k get no faster that way?

Thanks!

Here is what I tried:

def consolidate_g(B,niter):
    """Return a list of (n, m, attrs) edges summarizing the parallel edges of B."""
    l = []
    for n in niter:
        for m in B.neighbors_iter(n):
            omin = 2000000000.0
            omax = 0.0
            otot = 0.0
            cells = []
            spells = []
            for ed in B.get_edge_data(m,n).values():
                omin = min(omin,ed[1])
                omax = max(omax,ed[2])
                otot += ed[0]
                cells.append(ed[3])
                spells.append((ed[1],ed[2]))
            # one edge tuple per pair of neighbors
            l.append((n,m,{5: otot,6: omin,7: omax,8: cells,9: spells}))
    return l

def _pmap2(arg_tuple):
    """Pool for multiprocess only accepts functions with one argument. This function
    uses a tuple as its only argument.
    """
    return consolidate_g(arg_tuple[0],arg_tuple[1])

def consolidated_g_parallel(B):
    G = nx.Graph()
    G.add_nodes_from(B)
    # iterate over only subsets of nodes, but entire graph --- parallelize
    p = Pool(processes=4)
    node_divisor = len(p._pool)*4
    # len() does not work on an iterator, so count the nodes with len(B);
    # max() keeps the chunk size from dropping to 0 on small graphs
    node_chunks = list(chunks(B.nodes_iter(),max(1,int(len(B)/node_divisor))))
    num_chunks = len(node_chunks)
    pedgelists = p.map(_pmap2,
                       zip([B]*num_chunks,
                           node_chunks))
    # compile long list
    ll = []
    for l in pedgelists:
        ll.extend(l)
    # add edges from long list in a single step
    G.add_edges_from(ll)
    return G

Ben Edwards

Sep 11, 2011, 5:11:57 PM
to networkx...@googlegroups.com
Good. I forgot that in Python 3 the / operator always returns a float, so your solution works. You can also use floor division: len(nbrs2)//node_divisor gives an int.
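
A small sketch of the chunk-size calculation with floor division. `chunk_size` and its `chunks_per_worker` parameter are hypothetical names; `chunks` mirrors the helper quoted earlier in the thread. The `max(1, ...)` guard is an addition not discussed above: with fewer items than the divisor, `//` gives 0, and a chunk size of 0 makes the generator return without yielding anything.

```python
import itertools


def chunks(iterable, size):
    """Yield tuples of at most `size` items from `iterable`."""
    it = iter(iterable)
    while True:
        chunk = tuple(itertools.islice(it, size))
        if not chunk:
            return
        yield chunk


def chunk_size(n_items, n_workers, chunks_per_worker=4):
    # // keeps the result an int; max() avoids a size of 0, which would
    # make chunks() yield nothing at all for small inputs.
    return max(1, n_items // (n_workers * chunks_per_worker))


nodes = list(range(10))
size = chunk_size(len(nodes), n_workers=2, chunks_per_worker=2)  # 10 // 4 == 2
node_chunks = list(chunks(nodes, size))
assert node_chunks == [(0, 1), (2, 3), (4, 5), (6, 7), (8, 9)]
assert type(len(nodes) / 4) is float  # the kind of value islice() rejected
```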

Ben Edwards

Sep 11, 2011, 5:14:58 PM
to networkx...@googlegroups.com
I'm not quite sure what you mean. Generally, I try to parallelize at 'higher levels', that is, more 'coarse-grained': the outer loop. This of course depends on your problem, and for me it just tends to be the easiest (I'm no parallel computing expert).

László Sándor

Sep 14, 2011, 6:38:12 PM
to networkx...@googlegroups.com
Thanks to all of you, I have made some progress in parallelizing my code.

However, as I was mostly making lists to add to my [Multi][Di]Graphs later on with one big add_edges_from command, I wanted to ask you whether it would be more efficient to build a smaller graph in each thread and somehow merge those in the final step that brings the threads back together. Thanks!
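
Which of the two approaches is faster is not settled in this thread. For the edge-list approach, a minimal sketch of the merge step follows, using made-up results in place of real worker output. It assumes each worker returns a flat list of (u, v, attribute_dict) tuples, which is a form add_edges_from accepts.

```python
from itertools import chain

# Hypothetical results from three workers: each returns a plain list of
# (u, v, attribute_dict) tuples.
per_worker = [
    [("a", "b", {"w": 1}), ("a", "c", {"w": 2})],
    [("b", "c", {"w": 3})],
    [],
]

# Flatten lazily instead of appending whole lists, which would nest them.
edges = chain.from_iterable(per_worker)

# With NetworkX this would be a single call: G.add_edges_from(edges)
edge_list = list(edges)
assert len(edge_list) == 3
```

Appending each worker's list to another list produces a list of lists, which add_edges_from would try to read as edges; flattening first avoids that.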

On an unrelated note, I have an instance where an exact analogue of everything else that worked suddenly broke down with an error. If it rings a bell for any of you, I would obviously be very grateful for any comment or suggestion:
Exception in thread Thread-2:
Traceback (most recent call last):
  File "/n/sw/python-3.2/lib/python3.2/threading.py", line 736, in _bootstrap_inner
    self.run()
  File "/n/sw/python-3.2/lib/python3.2/threading.py", line 689, in run
    self._target(*self._args, **self._kwargs)
  File "/n/sw/python-3.2/lib/python3.2/multiprocessing/pool.py", line 338, in _handle_tasks
    put(task)
_pickle.PicklingError: Can't pickle <class 'function'>: attribute lookup builtins.function failed
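
The traceback does not show which object failed, so the following is only one common cause, not a diagnosis. Pool pickles every task before sending it to a worker, and functions are pickled by reference to an importable name. A lambda, or any task argument that contains one, therefore cannot be sent. The helper name `is_picklable` is hypothetical.

```python
import pickle


def is_picklable(obj):
    """Return True if `obj` survives pickle.dumps, as Pool requires for tasks."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


# Functions are pickled by reference to their importable name, so a builtin
# (or any module-level function) works while an anonymous lambda does not.
assert is_picklable(len)
assert not is_picklable(lambda x: x)

# A picklable container is no help if it carries an unpicklable function.
assert not is_picklable(("node", lambda x: x))
```

Running a check like this on each element of the argument tuples passed to p.map can narrow down which one is responsible.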

László Sándor

Sep 14, 2011, 7:46:21 PM
to networkx...@googlegroups.com
Ben, I think this might be of interest to more people (whoever looks this thread up):

Could you comment on what your original code does in https://networkx.lanl.gov/trac/attachment/ticket/594/betweenness_centrality.py , at least the code crucial to making it parallel? I think I adapted this part wrongly for my applications. My questions are in the inline comments below. Thanks!

def betweenness_centrality_parallel(G,processes=None):
    """Parallel betweenness centrality  function"""
    p = Pool(processes=processes)
    node_divisor = len(p._pool)*4
    # This is a number, 4 times the size of the pool. Why is that the divisor?
    # Because you had four arguments in your function, zipped together below?
    node_chunks = list(chunks(G.nodes(),G.order()/node_divisor))
    num_chunks = len(node_chunks)
    bt_sc = p.map(_betmap,
                  zip([G]*num_chunks,
    # So does this zip need to be wrapped in list() in py3k? And what do I do
    # if my function uses only one argument? Do I just plug in node_chunks?
    # Do I still need to zip it?
                      [True]*num_chunks,
                      [None]*num_chunks,
                      node_chunks))
    bt_c = reduce(_betreduce,bt_sc)
    # What is an elegant way to do without reduce in py3k?
    return bt_c
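
These questions are not answered in the thread, so what follows is a hedged reading rather than the author's explanation. On reduce: in Python 3 it lives in functools, and a plain loop works as well. On zip: Pool.map accepts any iterable, so a zip object can be passed as-is, and a one-argument function can be given node_chunks directly without zipping. On the factor of 4: it looks like a choice of roughly four chunks per worker for load balancing, which would make the match with the four zipped arguments a coincidence. The sketch below covers only the reduce part; `merge_scores` is a hypothetical reducer, not the `_betreduce` from the ticket.

```python
from functools import reduce  # reduce moved here in Python 3


def merge_scores(total, partial):
    """Hypothetical reducer: add one chunk's per-node scores into the total."""
    for node, score in partial.items():
        total[node] = total.get(node, 0.0) + score
    return total


# Hypothetical partial results, one dict per chunk of source nodes.
partials = [{"a": 1.0, "b": 0.5}, {"a": 0.5, "c": 2.0}, {"b": 1.5}]

# Passing {} as the initial value keeps the first partial dict unmodified.
combined = reduce(merge_scores, partials, {})
assert combined == {"a": 1.5, "b": 2.0, "c": 2.0}

# An explicit loop is an equally valid replacement for reduce.
looped = {}
for partial in partials:
    merge_scores(looped, partial)
assert looped == combined
```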

