Re: Confusion over simple example


Luis Pedro Coelho

Nov 13, 2012, 6:28:45 AM
to jug-...@googlegroups.com
Hi,

Let me try to see if I can help:

> trials = []
>
> @TaskGenerator
> def runACO(arg, ants, iterations):
>     cmd = "/home/hamiltont/AntHybrid --%s -m %d -i %d" % \
>         (arg, ants, iterations)
>     import shlex, subprocess
>     cargs = shlex.split(cmd)
>     output, error = subprocess.Popen(cargs,
>                                      stdout=subprocess.PIPE,
>                                      stderr=subprocess.PIPE).communicate()
>     return output

This looks good

> for file in os.listdir(data_files):
>     # Many, many for loops later to create arg
>     # ...
>     results = [runACO(arg, ants, iterations) for i in range(0, rounds)]

OK so far.



The following needs to go into Tasks too:
> # I need to do this for each result
> score = result.split(",")[0]
> time = result.split(",")[1]
> scores.append(float(score))
> times.append(float(time))
>
> # This is my main issue - I only want to store the median values
> # for each run of 0...rounds
> score_best = sorted(scores)[0]
> score_median = getMedian(scores)
> time_median = getMedian(times)
>
> trials.append({'best':score_best,'median':score_median,'ants':ants,
>                'iter':iterations % Omitted rest % })


@TaskGenerator
def summarize(results, ants):
    scores = []
    times = []
    for result in results:
        scores.append(float(result.split(",")[0]))
        times.append(float(result.split(",")[1]))
    score_best = sorted(scores)[0]
    score_median = getMedian(scores)
    time_median = getMedian(times)
    return {'best':score_best, 'median':score_median, 'ants':ants}


trials.append(summarize(results, ants))


Of course, if you really don't care to have all the results, then this might
be part of runACO:

### NOTE: NOT A TaskGenerator
def summarize(results, ants):
    scores = []
    times = []
    for result in results:
        scores.append(float(result.split(",")[0]))
        times.append(float(result.split(",")[1]))
    score_best = sorted(scores)[0]
    score_median = getMedian(scores)
    time_median = getMedian(times)
    return {'best':score_best, 'median':score_median, 'ants':ants}


@TaskGenerator
def runACO(arg, ants, iterations):
    cmd = "/home/hamiltont/AntHybrid --%s -m %d -i %d" % \
        (arg, ants, iterations)
    import shlex, subprocess
    cargs = shlex.split(cmd)
    output, error = subprocess.Popen(cargs,
                                     stdout=subprocess.PIPE,
                                     stderr=subprocess.PIPE).communicate()
    return summarize([output], ants)

HTH
--
Luis Pedro Coelho | Institute for Molecular Medicine | http://luispedro.org

Hamilton Turner

Nov 13, 2012, 7:09:57 AM
to jug-...@googlegroups.com
Thanks for the quick reply! Unfortunately I'm still feeling a bit confused. 

The below code will execute if I comment out "r.result" and instead say something like score = 5. Basically it's running a subprocess that takes forever, and I cannot trust just one run, so I want to aggregate a number of repeated runs into one. I'm looking at jug.mapreduce now; perhaps it is closer to what I need? (A rough sketch of that idea follows the code below.)

ants = 2
iterations = 10
rounds=100
results=[]

@TaskGenerator
def runACO(arg,core,stg):
    cmd = "echo '5,123'"  # An example output from the subprocess is <num>,<num>
    import shlex, subprocess
    cargs = shlex.split(cmd)
    output,error = subprocess.Popen(cargs,stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()
    return output

def runTrial(rounds, path, core):
    scores = []
    times = []
    results = [runACO(arg,core,path) for i in range(0,rounds)]
    for r in results:
        score = r.result.split(",")[0] # Here is where the problem is. I'm accessing results that are not available, but I don't know how to not do this
        time = r.result.split(",")[1]   #  And here . . . :-(
        scores.append(float(score))
        times.append(float(time))
    return ({'best':sorted(scores)[0],
             's_median':getMedian(scores),
             't_median':getMedian(times)})

for dname in os.listdir(stg_path):
    if dname[0] == ".":
        continue
    for fname in os.listdir(stg_path + dname):
        if fname[0] == ".":
            continue        
        meta = get_file_meta("%s%s/%s" % (stg_path, dname, fname))
        
        for arg in argument_variations:
            for core in cores:
                path = "%s%s/%s" % (stg_path,dname,fname)
                data = runTrial(rounds, path, core)
                # Stores data for this trial
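
If I understand the docs, jug.mapreduce.mapreduce(reducer, mapper, inputs) would let me write something like the following (a rough, untested sketch; the helper names and the raw_outputs variable are mine):

import jug.mapreduce

def parse_run(output):
    # mapper: turn one "score,time" string into a ([score], [time]) pair
    score, time = output.split(",")[:2]
    return ([float(score)], [float(time)])

def merge(a, b):
    # reducer: concatenate the score and time lists of two partial results
    return (a[0] + b[0], a[1] + b[1])

# Hypothetical usage, where raw_outputs would be the plain "score,time"
# strings rather than Task objects:
# summary = jug.mapreduce.mapreduce(merge, parse_run, raw_outputs)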

Best, 
Hamilton

On Tuesday, November 13, 2012 6:10:35 AM UTC-5, Hamilton Turner wrote:
Perhaps my brain is not so parallel-oriented, but I can't seem to get this to work: 

trials = []

@TaskGenerator
def runACO(arg,ants,iterations):
    cmd = "/home/hamiltont/AntHybrid --%s -m %d -i %d" % (arg, ants, iterations)
    import shlex, subprocess
    cargs = shlex.split(cmd)
    output,error = subprocess.Popen(cargs,stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()
    return output

for file in os.listdir(data_files):
    # Many, many for loops later to create arg
    # ...
    results = [runACO(arg,ants,iterations) for i in range(0,rounds)]

    # I need to do this for each result
    score = result.split(",")[0]
    time = result.split(",")[1]
    scores.append(float(score))
    times.append(float(time))

    # This is my main issue - I only want to store the median values for each run of 0...rounds
    score_best = sorted(scores)[0]
    score_median = getMedian(scores)
    time_median = getMedian(times)

    trials.append({'best':score_best,'median':score_median,'ants':ants,'iter':iterations % Omitted rest % })


PS - loving the framework so far, I've set up NFS on our cluster to test it out at scale!


Luis Pedro Coelho

Nov 13, 2012, 8:29:05 AM
to jug-...@googlegroups.com
On Tuesday, November 13, 2012 04:09:57 AM Hamilton Turner wrote:
> Thanks for the quick reply! Unfortunately I'm still feeling a bit confused.
>
> The below code will execute if I comment out "r.result" and instead say
> something like score = 5. Basically it's running the subprocess that takes
> forever, but I cannot trust just one run, so I want to aggregate a number
> of repeated runs into one. I'm looking at jug.mapreduce now, perhaps it is
> closer to what I need?

Maybe it is simpler if we de-sugar this code:


ants = 2
iterations = 10
rounds = 100
results = []

# REMOVED TASKGENERATOR
def runACO(arg, core, stg):
    ...

def runTrial(rounds, path, core):
    scores = []
    times = []
    # Calling Task explicitly:
    results = [Task(runACO, arg, core, path) for i in range(0, rounds)]
    for r in results:
        # r is a Task object!
        score = r.result.split(",")[0]
So, this doesn't work. The only things you can do with a Task object are to pass it to another Task or to call bvalue() on it (this last option is a bit more complex, so let's ignore it for now). You need to use another function:


def summarize(results):
    scores = []
    times = []
    for r in results:
        score = r.split(",")[0]
        time = r.split(",")[1]
        scores.append(float(score))
        times.append(float(time))
    return ({'best':sorted(scores)[0],
             's_median':getMedian(scores),
             't_median':getMedian(times)})

def runTrial(rounds, path, core):
    results = [Task(runACO, arg, core, path) for i in range(0, rounds)]
    return Task(summarize, results)

Or, if you add the appropriate TaskGenerators back:

def runTrial(rounds, path, core):
    return summarize([runACO(arg, core, path) for i in range(0, rounds)])
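
Spelled out, the decorators would look like this (a minimal sketch; the bodies are elided, and arg comes from the enclosing scope, as in your code):

from jug.task import TaskGenerator

@TaskGenerator
def runACO(arg, core, stg):
    ...  # run the subprocess and return its "score,time" output

@TaskGenerator
def summarize(results):
    ...  # parse each "score,time" string and return the medians

With these in place, both calls in runTrial build Tasks, and the summarize Task depends on all the runACO Tasks, so jug runs it only once they have finished.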

HTH,

Hamilton Turner

Nov 13, 2012, 5:36:43 PM
to jug-...@googlegroups.com
Luis Pedro, you are currently a god to me!

I'm actually utilizing 30 different computers, 20 cores per computer, and it's working :-) This email may be a bit long for the list, but I was desperately searching for more examples of working code, so I decided posting mine might end up being useful to a future soul.

Some feedback: 
 - The status --cache parameter is critical, but it's not really documented. Where is the cache? E.g., will this build a separate cache if called on each of 30 machines? Can --cache be used with other subcommands, like check?
 - It's a bit unclear when you might need cleanup - an example where you need to update your jugfile because it's incorrect would be useful. I ended up removing the entire jugdata directory every time I changed my jugfile
 - Feature thought: can each task record its CPU time and add it to a global total, so at the end you can see how much total CPU time was needed for each type of task and/or for the entire program?
 - The check command doesn't seem to be working for me. Maybe it's my setup, which I've detailed below in case it helps anyone else
 - An example where you abort your jug workers in the middle of execution might be useful
 - In general, some more info on how to debug / ensure that your jug workers are operating properly
 - I suppose jug is an ad-hoc solution, but I was a bit sad that it seems to really start stuttering when you get to a huge number of tasks, like 20-30 million. I think I'll just resign myself to the fact that it's for rapid concurrent execution, and I can manually run a few iterations, changing the highest-level parameters

One more question: 
 - Can I access incremental results? I tried running this while jug_benchmark.py was still being executed, but it failed:

import jug
jug.init('jug_benchmark.py', 'jug_benchmark.jugfile')
import jug_benchmark
results = jug.task.value(jug_benchmark.results)
for r in results:
    print r


__Setup__
 - The /home directory on all machines is an NFS share
 - Small run_workers.sh bash script on each slave machine: 
#!/bin/sh
for i in {1..20}
do
jug execute jug_benchmark.py &
done
wait

Because I am using the subprocess python module, which plays with pipes, I ended up using screen to help me ensure that when 
I logged off machines the jug workers would keep running: 

To start all slaves, I logged onto master and executed:
hamiltont$ pssh -h jug_workers.ip screen -d -S jug -m sh run_workers.sh 

This SSHs into each IP in the jug_workers.ip file and starts screen in "detached mode" running the run_workers.sh script, which simply starts all workers and then waits until they have completed.

__Final Code__

from jug.task import TaskGenerator
from jug import Task
import os

ants = 10
iterations = 100
rounds = 100
stg_path = '<omitted>'
ant_path = '<omitted>'
argument_variations = ["rank 3"] #"simple", "elitist 0.7", "rank 50", "maxmin", "acs"]
cores=[2, 4, 8, 16]
results = [] 

def runACO(arg,core,stg):
    cmd = "%s --%s -c %d -f %s -m %d -i %d" % (ant_path, arg, core, stg, ants, iterations)
    import shlex, subprocess
    cargs = shlex.split(cmd)
    output,err = subprocess.Popen(cargs,stdout=subprocess.PIPE).communicate()
    return output

def summarize(results):
    scores = [] 
    times = [] 
    for r in results: 
        score = r.split(",")[0] 
        time = r.split(",")[1] 
        scores.append(float(score)) 
        times.append(float(time)) 
    return ({'s_best':sorted(scores)[0],
             's_median':getMedian(scores),
             't_median':getMedian(times)})

def runTrial(rounds, path, core):
    results = [Task(runACO,arg,core,path) for i in range(0,rounds)]
    return Task(summarize,results)

for dname in os.listdir(stg_path):
    if dname[0] == ".":
        continue
    for fname in os.listdir(stg_path + dname):
        if fname[0] == ".":
            continue        
                
        for arg in argument_variations:
            for core in cores:
                path = "%s%s/%s" % (stg_path,dname,fname)
                data = runTrial(rounds, path, core)
                
                # Stores data for this trial
                meta = get_file_meta("%s%s/%s" % (stg_path, dname, fname))
                opt = <get from meta>

                meta = 0  # Free up meta with this many tasks
                trial = {'arg':arg, 'ants':ants, 'iter':iterations, 'cores': core, 
                         's_opt':opt, 'results' : data}
                results.append(trial)

Hamilton Turner

Nov 13, 2012, 5:46:07 PM
to jug-...@googlegroups.com
One more piece of info in case it's useful to you: 

[hamiltont@ataack ~]$ jug status --cache jug_benchmark.py 
Task name                                    Waiting       Ready    Finished     Running
----------------------------------------------------------------------------------------
jug_benchmark.runACO                               0           0      418300       13700
jug_benchmark.summarize                          137           0        4183           0
........................................................................................
Total:                                           137           0      422483       13700

Luis Pedro Coelho

Nov 13, 2012, 6:50:39 PM
to jug-...@googlegroups.com
Thanks!

I think you are operating at the wrong granularity, though.

There is a bit of overhead for each Task (in memory, a Task object; on
disk, a file; in time, mostly the time of hitting the disk).

If you have 20+ million tasks, this is a lot of overhead. You want your
tasks to take at least a few seconds each so that the task overhead
becomes negligible.

In your example, you probably want to have all the rounds be one task.
I.e., your runTrial calls should be your tasks, not the individual
runACO+summarize calls. This would cut down the number of tasks by a
factor of 17.

You might even aggregate some of these a bit more. For example, write a
function that explores a whole chunk of your problem space and build
tasks on that.
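
As a minimal sketch of that (using the plain, undecorated runACO and
summarize from before; run_chunk is just an illustrative name):

from jug.task import TaskGenerator

@TaskGenerator
def run_chunk(arg, core, path, rounds):
    # All the rounds for one (arg, core, path) combination run inside
    # this single task, so the per-task overhead (one Task object, one
    # file in jugdata) is paid once per chunk instead of once per round.
    return summarize([runACO(arg, core, path) for i in range(rounds)])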

Alternatively, the redis backend is better suited to many small results.
The file backend has one file per task; typically this will imply at
least 4KiB per Task. Redis does not have this problem.
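
(For scale: 20 million tasks at 4KiB each is 20,000,000 × 4KiB, roughly
76GiB of jugdata on the file backend.)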

Some more comments below:

> It's a bit unclear when you might need cleanup - an example where you
> need to update your jugfile because it's incorrect would be useful. I
> ended up removing the entire jugdata directory every time I changed my
> jugfile

jug cleanup is there so that you don't have to do that :)

It will only actually remove the stuff that is no longer needed. And it
removes lock files if some processes crash.

> - Feature thought: can each task record its CPU time and add it to a
> global total, so at the end you can see how much total CPU time was
> needed for each type of task and/or for the entire program?

Thought about that. Never got round to writing the code for it, though.

> - The check command doesn't seem to be working for me. Maybe it's my
> setup, which I've detailed below in case it helps anyone else

check only returns an exit status; there is no visible output. It is
only useful for writing scripts.

HTH
Luis

Hamilton Turner

Nov 13, 2012, 7:04:17 PM
to jug-...@googlegroups.com
Yea, I think I am too :-) Unfortunately this is all really nonoptimal code currently, so a single runACO can take minutes (not seconds, like an optimal ACO would), and I don't want to accidentally queue up a Task with 100 rounds of runACO that ends up taking hours. I'd rather manually deal with the high-level granularity and allow jug to parallelize the calls to the subfunction. However, once I optimize the code I'll definitely change it; I'm feeling more confident with jug now that I've got a working example. Thanks for the cleanup clarification, and now I get the check command too: I was expecting it to print 0 or 1 to stdout ;-)

Thanks very much for this library, and the mailing list help, it's greatly appreciated!