pmap - version to return reduced result only


Greg Plowman

Jul 9, 2015, 9:23:44 PM
to julia...@googlegroups.com
I have been using pmap for simulations and find it very useful and convenient.
However, sometimes I want to run a large number of trials where the results are also large. This requires a lot of memory to hold the returned results.
If I'm only interested in the final, reduced result, and not concerned with the raw individual trial results, then returning the entire array seems unnecessary.
I want to reduce on the fly, avoiding the need to keep all trial results.
I want to run more trials than workers for load balancing. (And possibly because I'm interested in summary results of individual trials, not the entire raw results).

With the help of the simplified version of pmap presented in the docs (http://julia.readthedocs.org/en/latest/manual/parallel-computing/), I have a tenuous understanding of how pmap works. Although the actual implementation scares me.
In any case, I was wondering before I progress further, whether a modified version of pmap could be designed to reduce on-the-fly.
Here are some modifications to the simplified, documentation version.
Would something like this work? I'm worried about the shared updates to final_result. Will these happen in an orderly way? What else should I consider?


# function pmap(f, lst)
function pmap_reduce(f, lst, reduce_fn)  # extra argument is the reduce function
    np = nprocs()  # determine the number of processes available
    n = length(lst)

    # results = cell(n)
    results = cell(np)      # hold results for currently executing procs only
    final_result = cell(1)  # holds the final, reduced result

    i = 1
    # function to produce the next work item from the queue.
    # in this case it's just an index.
    nextidx() = (idx=i; i+=1; idx)

    @sync begin
        for p=1:np
            if p != myid() || np == 1
                @async begin
                    while true
                        idx = nextidx()
                        if idx > n
                            break
                        end
                        # results[idx] = remotecall_fetch(p, f, lst[idx])
                        results[p] = remotecall_fetch(p, f, lst[idx])  # return results into array indexed by proc
                        reduce_fn(final_result, results[p])  # combine results[p] into final_result using the reduction function
                    end
                end
            end
        end
    end

    # results
    final_result  # return reduced result
end



Jameson Nash

Jul 9, 2015, 10:24:16 PM
to julia...@googlegroups.com
this sounds like you may be looking for the `@parallel reduce_fn for itm = lst; f(itm); end` map-reducer construct (described on the same page)?

Greg Plowman

Jul 17, 2015, 12:12:04 AM
to julia...@googlegroups.com
OK thanks.
I didn't consider @parallel (probably because I thought of it as suited to a large number of small work units, whereas pmap seemed more suited to a relatively small number of longer-running work units).
In any case, @parallel works fine.

Old pmap code skeleton:
trialCounts = pmap(MySimulation, [1:numTrials], fill(numIter, numTrials))
totalCounts = sum(trialCounts)

New @parallel code:
totalCounts = @parallel (+) for trial = 1:numTrials
    MySimulation(trial, numIter)
end



However, I have 2 questions:

1. When I try to modify the @parallel code to assign the result to a variable inside the loop, I get an error.
I don't understand the @parallel macro, but I'm guessing I can't assign to a variable inside the loop?

totalCounts = @parallel (+) for trial = 1:numTrials
    trialCount = MySimulation(trial, numPlays)
    print(trialCount) # or some other processing with trialCount
end



2. Again, I don't understand the @parallel macro, but it seems to call preduce (see below), which seems to collect results in an array of size numTrials / nworkers().
If this is so, then memory requirement still has a dependency on the number of trials.
I was trying to limit the results array to the number of workers, independent of number of trials.
Is my understanding here correct?

function preduce(reducer, f, N::Int)
    chunks = splitrange(N, nworkers())
    results = cell(length(chunks))
    for i in 1:length(chunks)
        results[i] = @spawn f(first(chunks[i]), last(chunks[i]))
    end
    mapreduce(fetch, reducer, results)
end



Greg

Jameson Nash

Jul 17, 2015, 12:48:44 AM
to julia...@googlegroups.com
i believe that length(chunks) will be <= nworkers()

the last statement of the for loop should be the "return" value from that iteration. (for example: the variable name `trialCount`).
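Concretely, the failing loop from the previous message should work once the per-iteration value is made the last expression (a sketch in the Julia 0.4-era syntax used in this thread; `MySimulation`, `numTrials` and `numPlays` are the names from above):

```julia
totalCounts = @parallel (+) for trial = 1:numTrials
    trialCount = MySimulation(trial, numPlays)
    print(trialCount)   # or some other processing with trialCount
    trialCount          # last expression: this iteration's contribution to (+)
end
```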

Greg Plowman

Jul 17, 2015, 1:09:03 AM
to julia...@googlegroups.com
Thanks Jameson.

My error was from the line: trialCounts = MySimulation(trial, numIter)
Error message: trialCounts not defined

This had something to do with the variable name (it was used again later, perhaps with if statements guarding scope, so it became local).
In any case, if I change the variable name, it works: counts = MySimulation(trial, numIter)


Thanks again for your help.

Greg

...

Greg Plowman

Aug 28, 2015, 12:40:33 AM
to julia-users
Firstly, I hope it's OK to revive this thread.

@parallel doesn't do dynamic load balancing.
So I'm still interested in a pmap_reduce() or equivalent which does dynamic allocation to workers AND reduces on the fly.

Just to be clear:
The "chunks" of work given to each worker might vary in execution time.
The workers might vary in their execution speed.

It's difficult to optimally split job into chunks to assign to workers upfront.

Although pmap does assign chunks of work dynamically, it's still not optimal if the number of chunks is of the same order as the number of workers.
It's very possible a slow worker will be left running a long "chunk". Or simply that only some workers will be required to finish any remaining chunks, leaving other workers idle.

It seems more granularity may help, i.e. splitting the job into many chunks so that #chunks >> #workers.
But then pmap becomes unviable because it returns an array of results for each chunk, which requires too much memory.
I don't need the intermediate results, so I could reduce on the fly.

Hence my interest in a version of pmap which returns the final reduced result only.
Could anyone comment on my original question about a modified pmap?

Or have I misunderstood pmap &/or @parallel? Or are there other options?

Thanks, Greg
...

Tim Holy

Aug 28, 2015, 4:00:02 AM
to julia...@googlegroups.com
You may find the "more extended and complex" example informative:
http://docs.julialang.org/en/latest/manual/parallel-computing/#shared-arrays

You'd need to set up something more complex for load-balancing, but, e.g.,
`assignment = SharedArray(Int, 2, nworkers())` would be a nice tool (worker i
takes a chunk of indexes from assignment[1,i]:assignment[2,i]).

--Tim
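To make the bookkeeping in Tim's suggestion concrete, here is a toy, single-process version of that assignment table (a plain Array stands in for the SharedArray, and `nslots` and the chunk size are hypothetical, not from his post):

```julia
# Each column holds one worker's current index range: row 1 = start, row 2 = stop.
# With a real SharedArray(Int, 2, nworkers()), the master would refill a column
# whenever that worker signals it has finished its chunk.
nslots = 4                          # stand-in for nworkers()
assignment = zeros(Int, 2, nslots)
for slot in 1:nslots                # hand each slot an initial 25-item chunk
    assignment[1, slot] = 25*(slot-1) + 1   # chunk start
    assignment[2, slot] = 25*slot           # chunk stop
end
# worker in slot i processes lst[assignment[1,i]:assignment[2,i]];
# the table stays O(nworkers()) in size, independent of the number of trials.
```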

David van Leeuwen

Sep 2, 2015, 3:30:34 AM
to julia-users
Hi Greg,


On Friday, August 28, 2015 at 6:40:33 AM UTC+2, Greg Plowman wrote:
Firstly, I hope it's OK to revive this thread.

@parallel doesn't do dynamic load balancing.
So I'm still interested in a pmap_reduce() or equivalent which does dynamic allocation to workers AND reduces on the fly.

Just to be clear:
The "chunks" of work given to each worker might vary in execution time.
The workers might vary in their execution speed.

It's difficult to optimally split job into chunks to assign to workers upfront.

I had the same demands for my implementation in GaussianMixtures, training in parallel on large amounts of data in chunks of some natural heterogeneous size, on a cluster with heterogeneous CPU power per node.

If you have a look at https://github.com/davidavdav/GaussianMixtures.jl/blob/master/src/data.jl#L101-136 there is a function `dmapreduce` that works on chunks of data of type Data.  Data is basically a Vector{Matrix}, but the Matrix can be loaded from disc in my case.  The length of the vector---the number of chunks---can be very large, but in reducing there are never more results stored than the number of workers.
 
Although pmap does assign chunks of work dynamically, it's still not optimal if the number of chunks is of the same order of the number of workers.
It's very possible a slow worker will be left running a long "chunk". Or simply that only some workers will be required to finish any remaining chunks, leaving other workers idle.

It seems more granularity may help. i.e. Splitting job into many chunks so that #chunks >> #workers.
But then pmap becomes unviable because it returns an array of results for each chunk, which requires too much memory.

This is exactly the reason why I reduce in two places:
 - in the @async loop there is a reducer for each worker
 - there is a final reducer at the end.

Before, I was running out of memory on a large machine (128G) while reducing a huge list of chunks, for which the results themselves (a tuple of matrices) were quite sizeable.  Now with `dmapreduce()` the reduction size is limited by the number of workers, and not by the number of chunks (the data).
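The two-level reduction David describes can be illustrated with a toy, serial fold (hypothetical names, not the actual `dmapreduce` code): each result is folded into a slot keyed by the worker that produced it, so at most one partial per worker is ever held, and a final reduce combines the slots.

```julia
# Toy illustration of the two-level reduce: fold each result into a slot
# keyed by its worker, then reduce across the slots.
function two_level_reduce(op, worker_of, results)
    partial = Dict{Int,Any}()            # at most one entry per worker
    for (w, r) in zip(worker_of, results)
        partial[w] = haskey(partial, w) ? op(partial[w], r) : r
    end
    reduce(op, values(partial))          # final reduce across workers
end
```

For example, `two_level_reduce(+, [2,3,2,3], [1,2,3,4])` folds results 1 and 3 into worker 2's slot and 2 and 4 into worker 3's, then sums the two slots to 10.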

Perhaps you can adapt the code a bit for your own purpose, or, if you have your data chunks on disk in a matrix-like format, you can even use the Data type.  

Cheers, 

---david
 
...

Greg Plowman

Sep 2, 2015, 11:14:35 PM
to julia-users
Hi David,

Thanks for your reply.
This seems to be exactly what I had in mind (as per my original post).


My original concern was whether shared updates to a common variable within `@async` would happen in an orderly way.
In my case, it's actually quite convenient to return results for each worker.
Just curious as to why you reduce for each worker and then reduce across the workers at the end. Why not reduce into a final result inside the `@async` loop?
That way, if you scale up the number of workers, you don't need to keep a data chunk for each worker, just one total reduced result.

Coincidentally, I read a reply to a query about "threads and processes" by @stevengj yesterday, which helped me understand parallel concepts a little better, but I'm still unsure about some stuff.
Apparently, `@async` runs in green threads, which are cooperative threads? Does this mean switching between threads only occurs when the current thread blocks for I/O? As opposed to operating-system threads, which are pre-emptive, meaning switching could happen at any time during thread execution?
Since `pmap()` uses `@async`, does this mean switching between threads only occurs at `remotecall_fetch()`? In which case it's safe to reduce into a single variable inside `@async`? Is this correct? Presumably this wouldn't be safe with pre-emptive threads?
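As a sanity check on the cooperative-scheduling question, here is a toy single-process sketch (no Distributed involved; all names are made up): tasks created by `@async` run in one scheduler and only switch at yield points such as `sleep` or `remotecall_fetch`, so a read-modify-write with no yield point in between is not interleaved and no update is lost.

```julia
function demo()
    total = 0
    @sync for k in 1:10
        @async begin
            sleep(rand()/100)   # yield point, standing in for remotecall_fetch
            total += k          # no yield inside this expression, so no lost update
        end
    end
    total
end
demo()  # == 55, i.e. every task's contribution arrives exactly once
```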


The actual pmap() code seems to be more robust than the simplified version in the documentation. It handles errors, etc.
My modified version, pmap_reduce(), returns an array of reduced results, one per worker.
The modifications were surprisingly simple (although I don't really understand the error recovery part, so I'm not sure what to do in case of error).
In any case, here it is:

#function pmap(f, lsts...; err_retry=true, err_stop=false)
function pmap_reduce(f, op, lsts...; err_retry=true, err_stop=false)

    len = length(lsts)

    results = Dict{Int,Any}()

    retryqueue = {}
    task_in_err = false
    is_task_in_error() = task_in_err
    set_task_in_error() = (task_in_err = true)

    nextidx = 0
    getnextidx() = (nextidx += 1)

    states = [start(lsts[idx]) for idx in 1:len]
    function getnext_tasklet()
        if is_task_in_error() && err_stop
            return nothing
        elseif !any(idx->done(lsts[idx],states[idx]), 1:len)
            nxts = [next(lsts[idx],states[idx]) for idx in 1:len]
            for idx in 1:len; states[idx] = nxts[idx][2]; end
            nxtvals = [x[1] for x in nxts]
            return (getnextidx(), nxtvals)
        elseif !isempty(retryqueue)
            return shift!(retryqueue)
        else
            return nothing
        end
    end

    @sync begin
        for wpid in workers()
            @async begin
                tasklet = getnext_tasklet()
                while (tasklet != nothing)
                    (idx, fvals) = tasklet
                    try
                        result = remotecall_fetch(wpid, f, fvals...)
                        if isa(result, Exception)
                            ((wpid == myid()) ? rethrow(result) : throw(result))
                        else
                            #results[idx] = result
                            if haskey(results, wpid)
                                results[wpid] = op(results[wpid], result)
                            else
                                results[wpid] = result
                            end
                        end
                    catch ex
                        if err_retry
                            push!(retryqueue, (idx, fvals, ex))
                        else
                            #results[idx] = ex
                            # Not really sure what to do here
                            results[0] = ex
                        end
                        set_task_in_error()
                        break # remove this worker from accepting any more tasks
                    end
                    tasklet = getnext_tasklet()
                end
            end
        end
    end

    for failure in retryqueue
        #results[failure[1]] = failure[3]
        # Not really sure what to do here
        results[0] = failure[3]
    end

    #[results[x] for x in 1:nextidx]
    [ results[x] for x in sort(collect(keys(results))) ]
end
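A hypothetical call matching the earlier skeleton (names as above, not from the post; note that the array of per-worker partials still needs one final fold):

```julia
# per-worker partial sums come back as an array, one entry per worker
perWorkerCounts = pmap_reduce(MySimulation, +, 1:numTrials, fill(numIter, numTrials))
totalCounts = sum(perWorkerCounts)   # final reduce across workers
```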
