Broadcasting variables


Madeleine Udell

Nov 20, 2014, 11:20:20 PM
to julia...@googlegroups.com
I'm trying to use parallelism in Julia for a task with a structure that I think is quite pervasive. It looks like this:

# broadcast lists of functions f and g to all processes so they're available everywhere
# create shared arrays X,Y on all processes so they're available everywhere
for iteration = 1:1000
    @parallel for i = 1:length(X)
        X[i] = f[i](Y)
    end
    @parallel for j = 1:length(Y)
        Y[j] = g[j](X)
    end
end

I'm having trouble making this work, and I'm not sure where to dig around to find a solution. Here are the difficulties I've encountered:

* @parallel doesn't allow me to create persistent variables on each process; i.e., the following results in an error.

s = Base.shmem_rand(12,3)
@parallel for i=1:nprocs() m,n = size(s) end
@parallel for i=1:nprocs() println(m) end

* @everywhere does allow me to create persistent variables on each process, but doesn't send any data at all, including the variables I need in order to define new variables. E.g., the following is an error: s is a shared array, but the variable s (i.e., the reference to it) is apparently not shared.
s = Base.shmem_rand(12,3)
@everywhere m,n = size(s)

Here are the kinds of questions I'd like to see prototype code for:
* How can I broadcast a variable so that it is available and persistent on every process?
* How can I create a reference to the same shared array "s" that is accessible from every process? 
* How can I send a command to be performed in parallel, specifying which variables should be sent to the relevant processes and which should be looked up in the local namespace?

Note that nothing I ask above is specific to shared arrays; the same constructs would also be extremely useful in the distributed case.

----------------------

An interesting partial solution is the following:
funcs! = Function[x -> x[:] = x + k for k = 1:3]
d = drand(3, 12)
let funcs! = funcs!
    @sync @parallel for k in 1:3
        funcs![myid()-1](localpart(d))
    end
end

Here, I'm not sure why the let statement is necessary to send funcs!, since d is sent automatically.

---------------------

Thanks!
Madeleine

Tim Holy

Nov 21, 2014, 2:14:47 PM
to julia...@googlegroups.com
My experiments with parallelism tend to occur in focused blocks, and I haven't
done it in quite a while. So I doubt I can help much. But in general I suspect
you're encountering these problems because much of the IPC goes through
thunks, and so a lot of stuff gets reclaimed when execution is done.

If I were experimenting, I'd start by trying to create RemoteRef()s and put!()ing my variables into them. Then perhaps you might be able to fetch them from other processes. Not sure that will work, but it seems worth a try.

HTH,
--Tim
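
A rough sketch of that idea, in the 0.3-era API used elsewhere in this thread (the worker id and the variable name `s` are just for illustration; whether this actually persists the binding is exactly what the thread goes on to explore):

```julia
rr = RemoteRef()
put!(rr, rand(12, 3))        # put! any serializable value into the RemoteRef

# On worker 2, fetch the value and bind it to a global `s` there
remotecall_fetch(2, r -> (global s; s = fetch(r); size(s)), rr)

# If the binding persists, later calls on worker 2 can refer to the global `s`
remotecall_fetch(2, () -> size(s))
```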

Madeleine Udell

Nov 21, 2014, 8:13:13 PM
to julia...@googlegroups.com
My experiments with parallelism also occur in focused blocks; I think that's a sign that it's not yet as user-friendly as it could be.

Here's a solution to the problem I posed that's simple to use: @parallel + global can be used to broadcast a variable, while @everywhere can be used to do a computation on local data (i.e., without resending the data). I'm not sure how to do the variable renaming programmatically, though.

# initialize variables
m,n = 10,20
localX = Base.shmem_rand(m)
localY = Base.shmem_rand(n)
localf = [x->i+sum(x) for i=1:m]
localg = [x->i+sum(x) for i=1:n]

# broadcast variables to all worker processes
@parallel for i=workers()
    global X = localX
    global Y = localY
    global f = localf
    global g = localg
end
# give variables same name on master
X,Y,f,g = localX,localY,localf,localg

# compute
for iteration=1:10
    @everywhere for i=localindexes(X)
        X[i] = f[i](Y)
    end
    @everywhere for j=localindexes(Y)
        Y[j] = g[j](X)
    end
end
--
Madeleine Udell
PhD Candidate in Computational and Mathematical Engineering
Stanford University

Madeleine Udell

Nov 22, 2014, 9:54:27 PM
to julia...@googlegroups.com
The code block I posted before works, but throws an error when embedded in a function: "ERROR: X not defined" (in first line of @parallel). Why am I getting this error when I'm *assigning to* X?

function doparallelstuff(m = 10, n = 20)
    # initialize variables
    localX = Base.shmem_rand(m)
    localY = Base.shmem_rand(n)
    localf = [x->i+sum(x) for i=1:m]
    localg = [x->i+sum(x) for i=1:n]

    # broadcast variables to all worker processes
    @parallel for i=workers()
        global X = localX
        global Y = localY
        global f = localf
        global g = localg
    end
    # give variables same name on master
    X,Y,f,g = localX,localY,localf,localg

    # compute
    for iteration=1:1
        @everywhere for i=localindexes(X)
            X[i] = f[i](Y)
        end
        @everywhere for j=localindexes(Y)
            Y[j] = g[j](X)
        end
    end
end

doparallelstuff()

Amit Murthy

Nov 23, 2014, 12:11:18 AM
to julia...@googlegroups.com
This works:

function doparallelstuff(m = 10, n = 20)
    # initialize variables
    localX = Base.shmem_rand(m; pids=procs())
    localY = Base.shmem_rand(n; pids=procs())
    localf = [x->i+sum(x) for i=1:m]
    localg = [x->i+sum(x) for i=1:n]

    # broadcast variables to all worker processes
    @sync begin
        for i in procs(localX)
            remotecall(i, x->(global X; X=x; nothing), localX)
            remotecall(i, x->(global Y; Y=x; nothing), localY)
            remotecall(i, x->(global f; f=x; nothing), localf)
            remotecall(i, x->(global g; g=x; nothing), localg)
        end
    end

    # compute
    for iteration=1:1
        @everywhere for i=localindexes(X)
            X[i] = f[i](Y)
        end
        @everywhere for j=localindexes(Y)
            Y[j] = g[j](X)
        end
    end
end

doparallelstuff()

Though I would have expected broadcast of variables to be possible with just 
@everywhere X=localX
and so on ....


Looks like @everywhere does not call localize_vars.  I don't know if this is by design or just an oversight. I would have expected it to do so. Will file an issue on github.
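
The failure mode can be sketched with a hypothetical minimal example (it fails for the reason described above: @everywhere does not apply localize_vars to the expression):

```julia
function demo()
    localX = rand(3)
    # Errors on the workers with "localX not defined": @everywhere evaluates
    # the expression in each process's Main, where the function-local
    # variable `localX` does not exist and is never serialized over.
    @everywhere X = localX
end
```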


Madeleine Udell

Nov 23, 2014, 1:22:30 AM
to julia...@googlegroups.com
Thanks! This is extremely helpful. 

Can you tell me more about what localize_vars does?

Amit Murthy

Nov 23, 2014, 1:30:49 AM
to julia...@googlegroups.com
From the description of Base.localize_vars - 'wrap an expression in "let a=a,b=b,..." for each var it references'

Though that does not seem to be the only(?) issue here....
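
The effect of that wrapping can be sketched as follows (a sketch of the idea, not the actual expression localize_vars produces):

```julia
x = 10
# localize_vars effectively turns `@spawnat 2 x + 1` into:
r = let x = x        # new local binding; its *value* is captured by the
                     # closure and serialized to the worker, instead of
                     # being looked up as a global there at run time
    @spawnat 2 x + 1
end
fetch(r)
```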

Madeleine Udell

Nov 23, 2014, 1:43:57 AM
to julia...@googlegroups.com
Yes, I read the code, but I'm not sure I understand what the let statement is doing. Is it trying to redefine the scope of the variable, or to create a new variable with the same value but a different scope? How does the let statement interact with the namespaces of the various processes?

Amit Murthy

Nov 23, 2014, 3:03:36 AM
to julia...@googlegroups.com
I mentioned localize_vars() since it is one of the differences between the implementations of @everywhere and @spawnat. But there is something else going on that I don't understand.

Amit Murthy

Nov 23, 2014, 10:26:28 PM
to julia...@googlegroups.com
"global X; X=x" should probably be "global const X=x" and so on....

Blake Johnson

Nov 24, 2014, 1:24:15 PM
to julia...@googlegroups.com
I use this macro to send variables to remote processes:

macro sendvar(proc, x)
    quote
        rr = RemoteRef()
        put!(rr, $(esc(x)))              # ship the caller's value, not a macro-module global
        remotecall($(esc(proc)), (rr)->begin
            global $(esc(x))             # bind it as a global on the remote process
            $(esc(x)) = fetch(rr)
        end, rr)
    end
end

Though the solution above looks a little simpler.

--Blake
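
A hypothetical usage of the macro above (the worker id and variable name are just for illustration):

```julia
s = Base.shmem_rand(12, 3)
@sendvar(2, s)                       # bind a global `s` on worker 2
remotecall_fetch(2, () -> size(s))   # the global is now visible there
```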

Madeleine Udell

Dec 1, 2014, 6:28:30 PM
to julia...@googlegroups.com
Thanks to Blake and Amit for some excellent suggestions! Both strategies work fine when embedded in functions, but not when those functions are embedded in modules. For example, the following throws an error: 

@everywhere include("ParallelStuff.jl")
@everywhere using ParallelStuff
doparallelstuff()

when ParallelStuff.jl contains the following code:

module ParallelStuff
export doparallelstuff

function doparallelstuff(m = 10, n = 20)
    # initialize variables
    localX = Base.shmem_rand(m; pids=procs())
    localY = Base.shmem_rand(n; pids=procs())
    localf = [x->i+sum(x) for i=1:m]
    localg = [x->i+sum(x) for i=1:n]

    # broadcast variables to all worker processes (thanks to Amit Murthy for suggesting this syntax)
    @sync begin
        for i in procs(localX)
            remotecall(i, x->(global const X=x; nothing), localX)
            remotecall(i, x->(global const Y=x; nothing), localY)
            remotecall(i, x->(global const f=x; nothing), localf)
            remotecall(i, x->(global const g=x; nothing), localg)
        end
    end

    # compute
    for iteration=1:1
        @everywhere for i=localindexes(X)
            X[i] = f[i](Y)
        end
        @everywhere for j=localindexes(Y)
            Y[j] = g[j](X)
        end
    end
end

end #module

On 3 processes (julia -p 3), the error is as follows:

exception on 1: exception on 2: exception on 3: ERROR: X not defined
 in anonymous at no file
 in eval at /Users/vagrant/tmp/julia-packaging/osx10.7+/julia-master/base/sysimg.jl:7
 in anonymous at multi.jl:1310
 in run_work_thunk at multi.jl:621
 in run_work_thunk at multi.jl:630
 in anonymous at task.jl:6
[the same "X not defined" and "Y not defined" backtraces are repeated, interleaved, for each of the three processes]

For comparison, the non-modularized version works:

function doparallelstuff(m = 10, n = 20)
    # initialize variables
    localX = Base.shmem_rand(m; pids=procs())
    localY = Base.shmem_rand(n; pids=procs())
    localf = [x->i+sum(x) for i=1:m]
    localg = [x->i+sum(x) for i=1:n]

    # broadcast variables to all worker processes (thanks to Amit Murthy for suggesting this syntax)
    @sync begin
        for i in procs(localX)
            remotecall(i, x->(global const X=x; nothing), localX)
            remotecall(i, x->(global const Y=x; nothing), localY)
            remotecall(i, x->(global const f=x; nothing), localf)
            remotecall(i, x->(global const g=x; nothing), localg)
        end
    end

    # compute
    for iteration=1:1
        @everywhere for i=localindexes(X)
            X[i] = f[i](Y)
        end
        @everywhere for j=localindexes(Y)
            Y[j] = g[j](X)
        end
    end
end

doparallelstuff()

Amit Murthy

Dec 1, 2014, 11:34:53 PM
to julia...@googlegroups.com
From the documentation - "Modules in Julia are separate global variable workspaces."

So what is happening is that the anonymous function in "remotecall(i, x->(global const X=x; nothing), localX)" creates X as a global of module ParallelStuff on each worker, not of Main.

The following works:

module ParallelStuff
export doparallelstuff

function doparallelstuff(m = 10, n = 20)
    # initialize variables
    localX = Base.shmem_rand(m; pids=procs())
    localY = Base.shmem_rand(n; pids=procs())
    localf = [x->i+sum(x) for i=1:m]
    localg = [x->i+sum(x) for i=1:n]

    # broadcast variables to all worker processes (thanks to Amit Murthy for suggesting this syntax)
    @sync begin
        for i in procs(localX)
            remotecall(i, x->(global X=x; nothing), localX)
            remotecall(i, x->(global Y=x; nothing), localY)
            remotecall(i, x->(global f=x; nothing), localf)
            remotecall(i, x->(global g=x; nothing), localg)
        end
    end

    # compute
    for iteration=1:1
        @everywhere begin
            X=ParallelStuff.X
            Y=ParallelStuff.Y
            f=ParallelStuff.f
            g=ParallelStuff.g
            for i=localindexes(X)
                X[i] = f[i](Y)
            end
            for j=localindexes(Y)
                Y[j] = g[j](X)
            end
        end
    end
end

end #module


While remotecall, @everywhere, etc. run under Main, the fact that the closure's variables refer to module ParallelStuff is pretty confusing...
I think we need a better way to handle this.

Madeleine Udell

Dec 11, 2014, 4:38:38 PM
to julia...@googlegroups.com
Amit and Blake, thanks for all your advice. I've managed to cobble together a shared memory version of LowRankModels.jl, using the workarounds we devised above. In case you're interested, it's at


and you can run it using, e.g.,

julia -p 3 LowRankModels/examples/sharetest.jl

There's a significant overhead, but it's faster than the serial version for large problem sizes. Any advice for reducing the overhead would be much appreciated.

However, in that package I'm seeing some unexpected behavior: occasionally it seems that some of the processes have not finished their jobs at the end of an @everywhere block, although looking at the code for @everywhere I see it's already wrapped in a @sync. Is there something else I can use to synchronize (i.e., wait for completion of all) the processes?