# NOTE(review): this is a side-by-side diff of Base's pmap (Julia 0.3 era)
# turned into a pmap_reduce; each `*`-prefixed pair shows the original pmap
# line followed by its pmap_reduce replacement. It is illustrative only and
# not runnable as-is.
* function pmap(f, lst)
* function pmap_reduce(f, lst, reduce_fn) # extra argument is reduce function
np = nprocs() # determine the number of processes available
n = length(lst)
* results = cell(n)
* results = cell(np) # hold results for currently executing procs only
* final_result = cell(1) # holds the final, reduced result
i = 1
# function to produce the next work item from the queue.
# in this case it's just an index.
nextidx() = (idx=i; i+=1; idx)
@sync begin
for p=1:np
if p != myid() || np == 1
@async begin
while true
idx = nextidx()
if idx > n
break
end
* results[idx] = remotecall_fetch(p, f, lst[idx])
* results[p] = remotecall_fetch(p, f, lst[idx]) # return results into array indexed by proc
# NOTE(review): assumes reduce_fn mutates final_result in place. The @async
# feeder tasks are cooperative (they yield only inside remotecall_fetch), so
# the reduction call itself is not interleaved — but confirm reduce_fn's
# mutation contract before relying on this.
* reduce_fn(final_result, results[p]) # combine results[p] into final_result using reduction function
end
end
end
end
end
* results
* final_result # return reduced result
end
# --- Usage examples (de-garbled: the original paste had lost its line breaks,
# fusing `end`/expression pairs onto single lines) ---

# Dynamic load balancing via pmap: one task per trial, results collected into
# an array, then reduced on the caller.
trialCounts = pmap(MySimulation, [1:numTrials], fill(numIter, numTrials))
totalCounts = sum(trialCounts)

# Static scheduling via @parallel with an on-the-fly (+) reduction.
totalCounts = @parallel (+) for trial = 1:numTrials
    MySimulation(trial, numIter)
end

# Same, but with per-trial side processing.
# NOTE(review): uses numPlays where the pmap example used numIter — confirm.
# Also note the loop body's last expression is print's return value, so the
# (+) reduction here operates on that, not on trialCount — likely an error
# in the original post, preserved as written.
totalCounts = @parallel (+) for trial = 1:numTrials
    trialCount = MySimulation(trial, numPlays)
    print(trialCount) # or some other processing with trialCount
end

# preduce: split 1:N into one contiguous chunk per worker, @spawn f over each
# chunk's endpoints, then fold the fetched partial results with `reducer`.
#
# Arguments:
#   reducer — binary reduction function (e.g. +)
#   f       — f(lo, hi) computing a partial result over the index range lo:hi
#   N       — total number of iterations to split across workers
function preduce(reducer, f, N::Int)
    chunks = splitrange(N, nworkers())   # one contiguous range per worker
    results = cell(length(chunks))       # cell() — Julia 0.3-era Vector{Any}
    for i in 1:length(chunks)
        results[i] = @spawn f(first(chunks[i]), last(chunks[i]))
    end
    # fetch each remote partial result and fold with the reducer
    mapreduce(fetch, reducer, results)
end
...
...
Firstly, I hope it's OK to revive this thread. `@parallel` doesn't do dynamic load balancing, so I'm still interested in a `pmap_reduce()` (or equivalent) that assigns work to workers dynamically AND reduces on the fly. Just to be clear: the "chunks" of work given to each worker might vary in execution time; the workers might vary in their execution speed; and it is therefore difficult to split the job optimally into chunks up front.
Although pmap does assign chunks of work dynamically, it is still not optimal when the number of chunks is of the same order as the number of workers: it is quite possible that a slow worker will be left running a long chunk, or that only some workers are needed to finish the remaining chunks, leaving the others idle. More granularity would help — i.e. splitting the job into many chunks so that #chunks >> #workers — but then pmap becomes unviable, because it returns an array with one result per chunk, which requires too much memory.
...
# Derived from Base.pmap (Julia 0.3; original signature was
# `pmap(f, lsts...; err_retry=true, err_stop=false)`): same dynamic
# scheduling, but each worker folds its own results with `op` as they
# arrive, so memory stays O(nworkers) instead of O(njobs).
#
# Arguments:
#   f         — function executed remotely on each tasklet
#   op        — binary associative reduction, op(accumulator, result)
#   lsts...   — one or more equal-length iterables zipped into tasklets
# Keywords:
#   err_retry — re-queue a tasklet whose remote call threw (default true)
#   err_stop  — hand out no new tasklets after the first error (default false)
#
# Returns a Vector of per-worker partial reductions (one entry per worker
# that completed at least one tasklet), in ascending worker-id order; the
# caller is expected to fold them once more with `op`. If an exception was
# recorded, it appears first (stored under key 0).
function pmap_reduce(f, op, lsts...; err_retry=true, err_stop=false)
    len = length(lsts)
    results = Dict{Int,Any}()   # worker pid => running partial reduction
    retryqueue = {}             # (idx, fvals, ex) tuples awaiting retry
    task_in_err = false
    is_task_in_error() = task_in_err
    set_task_in_error() = (task_in_err = true)
    nextidx = 0
    getnextidx() = (nextidx += 1)
    # one iteration state per input list (Julia 0.3 start/done/next protocol)
    states = [start(lsts[idx]) for idx in 1:len]
    # Produce the next (index, values) work item: primary inputs first, then
    # the retry queue once the inputs are exhausted. Returns nothing when no
    # work remains (or err_stop is in effect after an error).
    function getnext_tasklet()
        if is_task_in_error() && err_stop
            return nothing
        elseif !any(idx->done(lsts[idx],states[idx]), 1:len)
            nxts = [next(lsts[idx],states[idx]) for idx in 1:len]
            for idx in 1:len; states[idx] = nxts[idx][2]; end
            nxtvals = [x[1] for x in nxts]
            return (getnextidx(), nxtvals)
        elseif !isempty(retryqueue)
            return shift!(retryqueue)
        else
            return nothing
        end
    end
    @sync begin
        # one feeder task per worker; tasks share getnext_tasklet, giving
        # dynamic load balancing
        for wpid in workers()
            @async begin
                tasklet = getnext_tasklet()
                while (tasklet != nothing)
                    (idx, fvals) = tasklet
                    try
                        result = remotecall_fetch(wpid, f, fvals...)
                        if isa(result, Exception)
                            ((wpid == myid()) ? rethrow(result) : throw(result))
                        else
                            # fold into this worker's running partial reduction
                            if haskey(results, wpid)
                                results[wpid] = op(results[wpid], result)
                            else
                                results[wpid] = result
                            end
                        end
                    catch ex
                        if err_retry
                            push!(retryqueue, (idx, fvals, ex))
                        else
                            # there is no per-index slot to report into once
                            # results are reduced; park the exception under
                            # key 0 so it sorts first in the output
                            results[0] = ex
                        end
                        set_task_in_error()
                        break # remove this worker from accepting any more tasks
                    end
                    # BUGFIX: this statement was fused onto the preceding `end`
                    # line (`end tasklet = ...`), a syntax error; it must stand
                    # alone so the loop advances to the next tasklet.
                    tasklet = getnext_tasklet()
                end
            end
        end
    end
    # tasklets still queued for retry after all workers stopped: surface their
    # recorded exceptions under key 0, matching the err_retry=false path
    # (later failures overwrite earlier ones — only one exception survives)
    for failure in retryqueue
        results[0] = failure[3]
    end
    # per-worker partials in ascending key order (key 0 = exception, if any)
    [ results[x] for x in sort(collect(keys(results))) ]
end