base/multi.jl: handling of multiple workers


Peter Kourzanov
Nov 13, 2014, 9:22:13 AM
to juli...@googlegroups.com
Hi,

I've been trying to run Julia on a local LSF cluster as efficiently as possible, that is, minimizing the number of jobs submitted with LSF's "bsub",
since our scheduler is a bit slow and flaky. What I've discovered in base/multi.jl: read_worker_host_port() is that each addprocs call in Julia expects
at most one worker to be started up.

Isn't this a limitation? It would be more natural to submit just one job (through SSH, LSF, etc.) that spawns several workers, each of which is
added to Julia's pool, IMHO.

I have tried to fit this into the framework provided by https://github.com/JuliaLang/ClusterManagers.jl and failed, because Julia's base/multi.jl
assumes one worker per addprocs call. So I partially cloned the spawning bit of base/multi.jl into a separate module that does handle the desired case.
However, I needed to extend base/exports.jl to export "create_worker", "add_worker" and "PGRP" to make this work.

The resulting module is here: https://gist.github.com/kourzanov/1da4ed55edb9739ea632, although there is not much in it that is specific to LSF.

Does it make sense to change Julia's implementation to automatically spawn the requested number of workers (as given on the command line with -p N --worker)
and then print connection info for each one (optionally preceded by printing N), in addition to generalizing read_cb_response(),
read_worker_host_port() and start_cluster_workers() as indicated in the Gist (https://gist.github.com/kourzanov/1da4ed55edb9739ea632)?
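For concreteness, a sketch of the kind of generalization I have in mind for read_worker_host_port(): read a count first, then that many worker banners from the same stream. The function name and the "ncpus:" count line are my own invention, and the banner format is assumed rather than taken from base/multi.jl:

# Hypothetical replacement for read_worker_host_port(): one stream carries a
# count line "ncpus:N" followed by N "julia_worker:<port>#<host>" banners.
function read_worker_host_ports(io::IO)
    n = parseint(split(chomp(readline(io)), ':')[2])
    conn_info = Any[]
    for _ in 1:n
        banner = chomp(readline(io))                 # e.g. "julia_worker:9009#10.1.2.3"
        port, host = split(split(banner, ':')[2], '#')
        push!(conn_info, (host, parseint(port)))
    end
    conn_info
end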

Note that this would also make sense for the general SSH access method, as well as for the others from ClusterManagers.

Kind regards,
Peter

Amit Murthy
Nov 13, 2014, 10:25:34 PM
to juli...@googlegroups.com
The command-line option -p N internally just results in an addprocs(N), and addprocs(N) does start N workers. read_worker_host_port() is called for every worker started.

Peter Kourzanov
Nov 14, 2014, 4:00:38 AM
to juli...@googlegroups.com
On Friday, November 14, 2014 4:25:34 AM UTC+1, Amit Murthy wrote:
> The command-line option -p N internally just results in an addprocs(N), and addprocs(N) does start N workers. read_worker_host_port() is called for every worker started.

Yes, but those will be local ones. What I want is to start N remote workers that I can use from another machine.

Amit Murthy
Nov 14, 2014, 4:21:06 AM
to juli...@googlegroups.com
addprocs(["host1", "host2","host3"]) will start 3 workers, one on each of host1, host2 and host3
addprocs(["host1", "host1","host1"]) will start 3 workers, all of them on host1


Peter Kourzanov
Nov 14, 2014, 6:55:55 AM
to juli...@googlegroups.com
Sure. But what I want for addprocs(["host1_with_20_cores"]) is to start 20 workers, not one.

Amit Murthy
Nov 14, 2014, 8:14:41 AM
to juli...@googlegroups.com
addprocs(fill("host1", 20)) will do the trick.

For detection of the number of cores on host1, you could use the nproc command on Linux:

io, pobj = open(detach(`ssh $sshflags $(hostuser)@$(hostname) nproc`))
ncores = parseint(readall(io))
addprocs(fill(hostname, ncores))

sshflags, hostuser and hostname are arguments appropriate to build the ssh command.
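Building on that, a rough helper for several hosts could look like the following. This is just a sketch: the function name is made up, user and sshflags are placeholders, and there is no error handling.

# For each host, ask nproc over ssh and start that many workers there.
function addprocs_all_cores(hosts; user=ENV["USER"], sshflags=``)
    for host in hosts
        io, pobj = open(detach(`ssh $sshflags $(user)@$(host) nproc`))
        ncores = parseint(readall(io))
        addprocs(fill("$(user)@$(host)", ncores); sshflags=sshflags)
    end
end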

Amit Murthy
Nov 14, 2014, 8:20:13 AM
to juli...@googlegroups.com
This may actually be a good feature to support in the built-in SSHManager: a new keyword argument to addprocs, say "detect_cores", which, if true, would result in n workers on each specified host, where n is the number of cores on that particular host.
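Usage might then look something like this (purely hypothetical, since the keyword does not exist yet):

# Hypothetical API: one worker per detected core on each host.
addprocs(["host1", "host2"]; detect_cores=true)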

Peter Kourzanov
Nov 14, 2014, 8:41:51 AM
to juli...@googlegroups.com
It's exactly what I am arguing for. For the most efficient implementation you don't want to spawn N ssh (or LSF, or SGE) sessions to start N workers on one host. Some cluster systems severely limit the number of jobs anyway.

It requires extensions to the --worker handling, and to the code that processes worker registrations (see the patch to multi.jl in the Gist at the beginning of this thread).

Shall I create a PR for this?

Amit Murthy
Nov 14, 2014, 8:58:48 AM
to juli...@googlegroups.com
Is each ssh session counted as a job? Or is it each ssh connection?

If it is the latter, then using ssh flags to multiplex sessions over the same socket connection, something like `-o ControlMaster=auto -o ControlPath=[path] -o ControlPersist=5`, should also work.
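For example, something along these lines should work with the existing sshflags keyword (the control path is just an example and I have not tested this):

# Multiplex all worker ssh sessions over one connection per host.
sshflags = `-o ControlMaster=auto -o ControlPath=/tmp/julia-ssh-%r@%h:%p -o ControlPersist=5`
addprocs(fill("host1", 20); sshflags=sshflags)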

I'll take a look at the gist in a while.

Peter Kourzanov
Nov 14, 2014, 10:05:03 AM
to juli...@googlegroups.com
What I am doing right now is the following sequence to start N workers using a single ssh session and a single LSF job:
  1. LSF.addprocs(["queue"], dir="julia")
  2. ssh from the client to an LSF access machine (and run a "bjulia LSF" script)
  3. submit an LSF job to start Julia (which executes a "run" bash script)
  4. "run" determines the number of CPUs on the scheduled host and starts N worker processes
  5. Julia on the client waits for N to be printed (done by "run", read from the ssh stream) and then collects the conn_info (printed by each worker to the same stream)
  6. the connections from the client Julia process go directly to the workers, hopefully bypassing the original ssh stream (tunnel=false)
What would make sense is to integrate (4) into julia's --worker handling (triggering on the -p N --worker combination) and change base/multi.jl to do (5).
Then there would be no need for a "run" script, and all ClusterManagers would get the benefit of more parallelization (at no additional cost in extra ssh sessions, LSF jobs, etc.).
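For illustration, a minimal sketch of what (4) and (5) could look like if done from Julia instead of the "run" bash script. CPU_CORES is Base's core count; error handling, extra worker flags and cleanup are omitted, and the "julia_worker:<port>#<host>" banner format is assumed:

# Hypothetical stand-in for the "run" script: start one worker per core and
# relay each worker's banner to the stream the client is reading.
n = CPU_CORES
println("ncpus:$n")
worker_ios = [open(detach(`julia --worker`)) for _ in 1:n]
for (io, proc) in worker_ios
    println(chomp(readline(io)))     # relay "julia_worker:<port>#<host>"
end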

Erik Schnetter
Nov 14, 2014, 10:11:03 AM
to juli...@googlegroups.com
On Fri, Nov 14, 2014 at 10:05 AM, Peter Kourzanov
<peter.k...@gmail.com> wrote:
> What would make sense is to integrate (4) into julia's --worker handling
> (triggering on the -p N --worker combination)

In many cases, the number of hardware cores on the system is not the
same as the number of Julia workers one wants to start. With
hyperthreading, there's an obvious factor of two one may want to use,
and with queuing systems such as LSF, other jobs may be scheduled on
the same node. In the case of LSF, there's an environment variable set
that specifies how many processes LSF says should be running on the
node.

Given that this is LSF specific, moving this into Julia may be
difficult, because Julia would a priori not know which variable to
look at. Maybe a Julia command line like "julia -p $LSF_NUM_PROCS"
would make sense? This option would automatically be added by the
"bjulia LSF" script.
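In Julia terms, the wrapper's decision would amount to something like the snippet below. "LSF_NUM_PROCS" is just the placeholder name used above; a real LSF installation may export a different variable:

# Sketch: what the "bjulia LSF" wrapper would compute and pass to julia as -p.
np = parseint(get(ENV, "LSF_NUM_PROCS", string(CPU_CORES)))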

-erik

--
Erik Schnetter <schn...@cct.lsu.edu>
http://www.perimeterinstitute.ca/personal/eschnetter/

Peter Kourzanov
Nov 14, 2014, 10:25:26 AM
to juli...@googlegroups.com
Sure, in a "julia" folder there would be a script called "julia" which would run bin/julia with the correct system-specific -p value.

Amit Murthy
Nov 14, 2014, 2:12:05 PM
to juli...@googlegroups.com
It does not seem like a good idea to have all the workers write their host:port information to the same stream.

Another option, without tinkering with core Julia, would be to use the ability to specify port numbers for the workers (merged here: https://github.com/JuliaLang/julia/pull/8513)
to write a simple ClusterManager along the lines of those in ClusterManagers.jl.

The changes in your environment would be:

- The "run" bash script should decide on a range of free ports of length N
- "run" starts each julia worker with the arguments "--bind-to host:port --worker" where host is the node name / ip-address,  and port is a unique port from the range for the worker instance. 
- "launch" function in your ClusterManager will return tuples of (host, port, config), i.e. no io object.
- you may need to put in a small delay before writing out the tuples to ensure that the workers have started listening on the specified ports.
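A rough sketch of the second point, assuming the --bind-to flag from the PR above is available in your build; the function name and arguments are placeholders, and free-port detection and error handling are left out:

# Build the launch commands for N workers bound to consecutive ports.
function worker_cmds(host, base_port, np; exename="julia", exeflags=``)
    cmds = Cmd[]
    for i in 0:np-1
        push!(cmds, `$exename $exeflags --bind-to $host:$(base_port + i) --worker`)
    end
    cmds
end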

You do lose the ability to see writes to STDOUT/STDERR from the workers in the master process, though. Then again, if they were all mixed up on the same output stream, they would be garbled anyway...

Peter Kourzanov
Nov 14, 2014, 3:10:42 PM
to juli...@googlegroups.com
Why not? The proposed method of explicitly assigning ports to workers sounds even more complex, IMHO.

On the other hand, having the "run" script spawn a new worker only after the previous one has written
out its conn_info seems to solve all the problems of concurrent writes to the same stream. Or the new
code in start_worker() could simply start all workers one by one (assuming -p was given), recording the
ports assigned by the system, and at the end print them all out in one sweep.

Other than that, I see no problem in having garbled worker output (aside from the case of low-level debugging).

Amit Murthy
Nov 15, 2014, 12:49:26 AM
to juli...@googlegroups.com
`julia -p N --worker` will not work since `julia -p N` assumes that it is the master, and it starts more workers using the built-in LocalManager. Currently only the master process, pid 1, can add workers; workers cannot add more workers.

Your model of the "run" script spawning workers sequentially will work, but there are a couple of things we need to watch out for. If you are using Julia nightlies, the startup time will be a couple of seconds for each worker. These quickly add up if you are starting tens of workers on a node. Even if you are using a precompiled version, sequential startup of workers will introduce a cumulative delay in addprocs. This is the main reason why the launch of workers has been parallelized.

Your bash script could instead start the workers in parallel and have their output streams redirected to different local files. You could then read the files, collect the host/port info and send it back to the launch function in your cluster manager. The launch function can then parse this output and return just tuples of (host, port, config).
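A sketch of that collection step, assuming each worker's first output line is its "julia_worker:<port>#<host>" banner (the exact banner format may vary between Julia versions), with no retry if a worker has not written its banner yet:

# Read one banner line per log file and turn it into (host, port) pairs.
function collect_conn_info(logfiles)
    infos = Any[]
    for f in logfiles
        banner = chomp(open(readline, f))         # e.g. "julia_worker:9009#10.1.2.3"
        port, host = split(split(banner, ':')[2], '#')
        push!(infos, (host, parseint(port)))
    end
    infos
end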
 

Peter Kourzanov
Nov 15, 2014, 5:31:45 AM
to juli...@googlegroups.com
Hi Amit,

A better idea still is to change the design so that the locally started workers are "orphaned" back to the originator through the "parent" worker.
That is: collect the port#ip info of the local workers started from launch() via a modification to start_cluster_workers(),
and then change start_worker() to also dump the collected conn_info for the locally started "orphan" workers.

This is quite a versatile solution, I guess. In principle one could make a tree of workers this way, across different types of clusters, by generalizing --machinefile to also use other ClusterManagers (and appropriately scaling the check_master_connect value, of course)...

What do you think? Shall I do a PR for this (along with the extension for multiple read_cb_response() calls, which I still think is necessary
to make this idea work in a scalable fashion)?

On Saturday, November 15, 2014 6:49:26 AM UTC+1, Amit Murthy wrote:
> `julia -p N --worker` will not work since `julia -p N` assumes that it is the master, and it starts more workers using the built-in LocalManager. Currently only the master process, pid 1, can add workers; workers cannot add more workers.

In fact, I tried running julia -p N --worker before, and it does indeed start local workers that are unusable remotely. So it might make sense to refactor the code in base/client.jl to recognize this case and start them according to the scheme above: have addprocs() also return a list of port#host pairs for the spawned workers, and then have start_worker() also dump those along with the "parent" conn_info.

> Your model of the "run" script spawning workers sequentially will work, but there are a couple of things we need to watch out for. If you are using Julia nightlies, the startup time will be a couple of seconds for each worker. These quickly add up if you are starting tens of workers on a node. Even if you are using a precompiled version, sequential startup of workers will introduce a cumulative delay in addprocs. This is the main reason why the launch of workers has been parallelized.

OK, good to know. Even without synchronization, my runs over the last couple of days did not seem to trigger this while starting 20 workers in parallel with each addprocs(). But I have to admit I am using the Git master, not the nightlies.
 
> Your bash script could instead start the workers in parallel and have their output streams redirected to different local files. You could then read the files, collect the host/port info and send it back to the launch function in your cluster manager. The launch function can then parse this output and return just tuples of (host, port, config).

In the end I would also like to avoid relying on bash scripts altogether and do everything from Julia ;-)

Amit Murthy
Nov 15, 2014, 6:41:22 AM
to juli...@googlegroups.com
Yes, a PR is fine. The discussion can then continue on github.

I am also working on a PR that makes it possible to use alternate transports. It has quite a few changes to multi.jl.

Though it is still a WIP, I'll put it up over the next couple of days. If you want (and can wait), we can collaborate on that PR and incorporate the additional requirements in core Julia for:

1. Auto detecting cores on a node and launching workers appropriately
2. A "local coordinator" Julia worker that is responsible for launching additional local workers and passing connection info back to the master.

We may still need to write a simpler custom LSFManager.   
  

Erik Schnetter
Nov 15, 2014, 9:23:01 PM
to juli...@googlegroups.com
On Sat, Nov 15, 2014 at 6:41 AM, Amit Murthy <amit....@gmail.com> wrote:
> 1. Auto detecting cores on a node and launching workers appropriately

I am currently wrapping the "hwloc" library for use with Julia. This
is a portable library that lets you find out how many cores are on a
node <https://github.com/eschnett/hwloc.jl>. It's not finished yet.

However -- with LSF, you shouldn't need to find out how many cores
there are; LSF should provide you that information.

-erik

Amit Murthy
Nov 15, 2014, 9:34:20 PM
to juli...@googlegroups.com
addprocs today has 2 signatures, namely:

addprocs(np; manager=SomeClusterManager)   # default is LocalManager
addprocs(hosts::Array)

In both cases we expect the caller to define the number of workers requested - either np or length(hosts).

Do we need a method where the user just specifies the nodes, and the actual number of workers started will depend on the cores available?  





Peter Kourzanov
Nov 16, 2014, 6:53:30 AM
to juli...@googlegroups.com
Yes, that is my assumption. With LSF (and likely with other cluster systems too) one may not know on which host the job will run and how many cores that host might have available. I am also imagining a scenario where one might SSH to a host which is set up to forward the job to another (type of) cluster via a customized machine file.

I am currently collecting the descendant workers' conn_info entries and printing "ncpus:$(1+length(conn_info))" before printing the current
worker's conn_info entries. With :orphan_workers one can inhibit the connection of the master worker to the slave workers when
started with -p N --worker (for now there is no support for -e, -E, -P, -L or -F in conjunction with -p N --worker).

You can peek at the result here: https://github.com/kourzanov/julia

Peter Kourzanov
Nov 16, 2014, 7:12:34 AM
to juli...@googlegroups.com
The only problem remaining (I think) is when the final worker is not accessible from the originator machine. For this case we
need to set up SSH forwarding (using the original SSH invocation) via an "access" host to a worker whose port is not yet known. We could have the originator set up a new forward over the original SSH connection, multiplexed. Or we might do an extra self-SSH-tunnel session, although I think that would also not be very efficient.

Alternatively, do you think forwarding messages through a kind of NAT implemented in Julia itself should not be a problem?

Peter Kourzanov
Nov 16, 2014, 7:22:09 AM
to juli...@googlegroups.com
Yes, I guess this is the only place where we would need to put a shell script to customize for LSF.
The number of cores is not the only problem: some schedulers are non-deterministic - if you submit
a job it might get cancelled, get killed, become a zombie, or stay in the queue for a much longer time.

Amit Murthy
Nov 18, 2014, 3:36:46 AM
to juli...@googlegroups.com
Please have a look at https://github.com/JuliaLang/julia/pull/9046 

It does not address the concerns mentioned here, but it would be great if we could take care of them by adding/modifying the ClusterManager interface as required.

As for your comment on SSH forwarding, if tunnel=true is specified at addprocs, it does access the worker via a new SSH connection using the same ssh args as the original invocation. 

For the cancelled/killed/delayed case of a job in the scheduler queue, we could add a timeout keyword argument to addprocs. The workers anyway exit on their own if the master process does not connect within 60 seconds.

Peter Kourzanov
Nov 18, 2014, 4:53:56 AM
to juli...@googlegroups.com
Great :-) Thanks! In the meantime I forked Julia and adapted client.jl and multi.jl to address the issues above. I have also added a few extra options to addprocs()
for :shell, :shellargs (on Ubuntu the default settings gave problems, since sh=dash!=bash) and :orphan_workers. I will look into your version and integrate it if needed.

I have discovered the :tunnel option, but I think it is not sufficient, since in some of my cases I need to SSH twice to get to a machine from which I can submit jobs
(a bit unfortunate, but still...). So we might need to extend the handling of pubhost, as well as ssh_tunnel() in multi.jl, to take a vector of access hosts through which
to SSH and forward ports.
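As a possible stopgap that avoids changing ssh_tunnel(), the hops could perhaps be chained with OpenSSH's ProxyCommand via the existing sshflags keyword. The host names below are placeholders, and I have not verified how this interacts with tunnel=true:

# Hop through an intermediate "access" host for every connection to the compute host.
sshflags = `-o "ProxyCommand=ssh -W %h:%p access-host"`
addprocs(["lsf-host"]; sshflags=sshflags, tunnel=true)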

For LSF, the script tries to resubmit a job a few times in the hope of getting it scheduled. I am fine with keeping that ugly stuff in a separate script.