Producer with multiple Consumers as worker threads?

Duane Johnson

Oct 15, 2014, 12:36:17 PM
to julia...@googlegroups.com
I'm attempting the following producer/consumer pattern, and I'd like to hand the work off to separate processes to make full use of the CPU:

function file_producer(library_root)
  files(library_root) do f
    produce(f)
  end
  produce(END_FILE_LIST)
end

@parallel for f in Task(file_producer)
  println(myid(), f)
end


When I get to the @parallel part, it fails:

ERROR: `length` has no method matching length(::Task)
 in include at /Applications/Julia-0.3.1.app/Contents/Resources/julia/lib/julia/sys.dylib
 in include_from_node1 at loading.jl:128
 in process_options at /Applications/Julia-0.3.1.app/Contents/Resources/julia/lib/julia/sys.dylib
 in _start at /Applications/Julia-0.3.1.app/Contents/Resources/julia/lib/julia/sys.dylib (repeats 2 times)
while loading /Users/duane/Dropbox/Projects/wordtree/TextGrams/baseline.jl, in expression starting on line 1488


Is there an easy way to let multiple processes consume work like this?

Duane Johnson

Oct 15, 2014, 1:50:18 PM
to julia...@googlegroups.com
I'm new to Julia, so I'm not 100% confident in this solution, but here's what I came up with (each @async block pulls from the same producer task, so whichever worker is free takes the next file):

@everywhere function printfile(file)
  println(myid(), " ", file)
end

np = nprocs()
@sync begin
  producer = @task file_producer(library_root)
  for p = 1:np
    if p != myid() || np == 1
      @async begin
        for file in producer
          result = remotecall_fetch(p, printfile, file)
        end
      end
    end
  end
end

Steven G. Johnson

Oct 15, 2014, 3:21:38 PM
to julia...@googlegroups.com
There is an example in the manual of how to create a parallel work queue:

http://docs.julialang.org/en/latest/manual/parallel-computing/#scheduling

(It would be nice to have cleaner built-in support for work queues and work stealing [the advantage of work stealing is that it scales better because there isn't a single queue as a bottleneck].)
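
The pattern there looks roughly like the following (reproduced from memory, so the exact code in the docs may differ; renamed dynamic_pmap here so it doesn't clash with Base's pmap). The whole "queue" is just a shared counter on the master process, and whichever worker finishes first asks for the next index:

# Roughly the dynamic-scheduling example from that manual section (from memory;
# details may differ). nextidx() hands out work items one at a time.
function dynamic_pmap(f, lst)
  np = nprocs()
  n = length(lst)
  results = cell(n)
  i = 1
  nextidx() = (idx = i; i += 1; idx)   # the shared counter acting as a work queue
  @sync begin
    for p = 1:np
      if p != myid() || np == 1
        @async begin
          while true
            idx = nextidx()
            if idx > n
              break
            end
            results[idx] = remotecall_fetch(p, f, lst[idx])
          end
        end
      end
    end
  end
  results
end

Because all the @async feeder tasks run cooperatively on the master process, the bare `i += 1` counter needs no locking.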


Duane Johnson

Oct 15, 2014, 4:56:32 PM
to julia...@googlegroups.com
The problem with the example in the manual is that you can't use a "producer" for the parallel map (pmap assumes the input has a fixed length).

In my case, I would like to produce a list of files that may be very large (300k+) and I'd prefer not to wait until all filenames have been recursively discovered from disk before starting the work.

Duane Johnson

Oct 15, 2014, 5:50:49 PM
to julia...@googlegroups.com
I'm trying to generalize my solution as a new `pmap` function:

function pmap(fn::Function, producer::Task)
  results = []

  np = nprocs()
  @sync begin
    # one feeder task per worker; each pulls the next item from the shared producer
    for p = 1:np
      if p != myid() || np == 1
        @async begin
          for x in producer
            r = remotecall_fetch(p, fn, x)
            push!(results, r)
          end
        end
      end
    end
  end

  return results
end


However, I get the following error:

ERROR: [] cannot grow. Instead, initialize the array with "T[]", where T is the desired element type.

I'm not sure how to pull the element type of the producer out of the `Task`, since a `Task` doesn't carry a return type. Am I bumping into this issue (github 1090) [1]?

Is there a workaround?
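
One possible stopgap (just a sketch; it trades away a concrete element type by collecting into an untyped array):

results = Any[]            # 0-element Array{Any,1}; this one can grow
push!(results, "a result") # works, unlike push! on []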



Amit Murthy

Oct 15, 2014, 9:47:35 PM
to julia...@googlegroups.com
Steven,
This https://github.com/JuliaLang/julia/pull/8507 should help once merged. A RemoteChannel can act as a work queue across workers.

Duane,
Until support for channels is added to Base, you can check out and use https://github.com/amitmurthy/MessageUtils.jl
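
The shape of it would be roughly this (a sketch using the names as they eventually shipped in the Distributed stdlib, which is newer than this thread; consume_files and the empty-string sentinel are just illustrative):

using Distributed
addprocs(4)

const jobs = RemoteChannel(() -> Channel{String}(128))  # shared queue of filenames

@everywhere function consume_files(jobs)
  while true
    file = take!(jobs)
    file == "" && break          # empty string used as an end-of-work sentinel
    println(myid(), " ", file)
  end
end

# one consumer per worker; the producer feeds filenames as it discovers them
consumers = [remotecall(consume_files, p, jobs) for p in workers()]
for f in ("a.txt", "b.txt", "c.txt")   # stand-in for the recursive directory walk
  put!(jobs, f)
end
for _ in workers()
  put!(jobs, "")                 # one sentinel per consumer
end
foreach(wait, consumers)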

 