Job Output Load Times

8 views
Skip to first unread message

Justin Porter

unread,
Jun 12, 2018, 3:44:15 PM6/12/18
to jug-users
Hello Again,

I have the following situation: I have done a long computation and returned the result (which is an sklearn-like object, PyEmma's TICA object). The pickle is like 700MB on disk, which (I think?) ought to be un-pickleable in reasonable time.

However, it takes longer than 20 minutes (I killed it after a while) to load, and won't even get to a first print statement.

When interrupted, the following stack trace is emitted:

  File "/home/jrporter/miniconda2/envs/std3/bin/jug", line 11, in <module>
    load_entry_point('Jug==1.6.7+git', 'console_scripts', 'jug')()
  File "/home/jrporter/miniconda2/envs/std3/lib/python3.6/site-packages/Jug-1.6.7+git-py3.6.egg/jug/jug.py", line 278, in main
    cmdapi.run(options.subcommand, options=options, store=store, jugspace=jugspace)
  File "/home/jrporter/miniconda2/envs/std3/lib/python3.6/site-packages/Jug-1.6.7+git-py3.6.egg/jug/subcommands/__init__.py", line 271, in run
    return cmd(*args, **kwargs)
  File "/home/jrporter/miniconda2/envs/std3/lib/python3.6/site-packages/Jug-1.6.7+git-py3.6.egg/jug/subcommands/__init__.py", line 161, in __call__
    return self.run(*args, **kwargs)
  File "/home/jrporter/miniconda2/envs/std3/lib/python3.6/site-packages/Jug-1.6.7+git-py3.6.egg/jug/subcommands/execute.py", line 93, in run
    failures = execution_loop(tasks, options) or failures
  File "/home/jrporter/miniconda2/envs/std3/lib/python3.6/site-packages/Jug-1.6.7+git-py3.6.egg/jug/jug.py", line 195, in execution_loop
    t.run(debug_mode=options.debug)
  File "/home/jrporter/miniconda2/envs/std3/lib/python3.6/site-packages/Jug-1.6.7+git-py3.6.egg/jug/task.py", line 111, in run
    self._result = self._execute()
  File "/home/jrporter/miniconda2/envs/std3/lib/python3.6/site-packages/Jug-1.6.7+git-py3.6.egg/jug/task.py", line 119, in _execute
    args = [value(dep) for dep in self.args]
  File "/home/jrporter/miniconda2/envs/std3/lib/python3.6/site-packages/Jug-1.6.7+git-py3.6.egg/jug/task.py", line 119, in <listcomp>
    args = [value(dep) for dep in self.args]
  File "/home/jrporter/miniconda2/envs/std3/lib/python3.6/site-packages/Jug-1.6.7+git-py3.6.egg/jug/task.py", line 476, in value
    return elem.value()
  File "/home/jrporter/miniconda2/envs/std3/lib/python3.6/site-packages/Jug-1.6.7+git-py3.6.egg/jug/task.py", line 130, in value
    return self.result
  File "/home/jrporter/miniconda2/envs/std3/lib/python3.6/site-packages/Jug-1.6.7+git-py3.6.egg/jug/task.py", line 125, in _get_result
    if not hasattr(self, '_result'): self.load()
  File "/home/jrporter/miniconda2/envs/std3/lib/python3.6/site-packages/Jug-1.6.7+git-py3.6.egg/jug/task.py", line 167, in load
    self._result = self.store.load(self.hash())
  File "/home/jrporter/miniconda2/envs/std3/lib/python3.6/site-packages/Jug-1.6.7+git-py3.6.egg/jug/backends/file_store.py", line 228, in load
    return decode_from(input)
  File "/home/jrporter/miniconda2/envs/std3/lib/python3.6/site-packages/Jug-1.6.7+git-py3.6.egg/jug/backends/encode.py", line 195, in decode_from
    return pickle.load(stream)
  File "/home/jrporter/miniconda2/envs/std3/lib/python3.6/site-packages/Jug-1.6.7+git-py3.6.egg/jug/backends/encode.py", line 121, in read
    res += self.D.decompress(buf, nbytes - len(res))

What can I do about this? Obviously I could try to fit a smaller model (TICA is similar to PCA and I could just use fewer components to reudce the size of the covariance matrix), but I'd rather just make it load faster...

Thanks for a great library!

Cheers,
JRP 

Luis Pedro Coelho

unread,
Jun 13, 2018, 5:15:20 AM6/13/18
to jug-users
Hi Justin,

Yes, 700MiB should not be that big of a deal (although note that jug compresses the pickle, so it may be a much larger thing).

Right now, there is no hook for controlling how things are saved by jug. My workaround for very large datasets is to explicitly save it to a file using a dataset specific format (e.g., feather for pandas) and return the file path. You'll have to manage the paths yourself, unfortunately.

Cheers,
Luis

--
Luis Pedro Coelho | EMBL | http://luispedro.org

PI of Big Data Biology Lab at Fudan University (start mid-2018)
http://big-data-biology.org


On Tue, 12 Jun 2018, at 9:44 PM, Justin Porter wrote:
> Hello Again,
>
> I have the following situation: I have done a long computation and returned the result (which is an sklearn-like object, PyEmma's TICA object[http://www.emma-project.org/v2.4/api/generated/pyemma.coordinates.transform.TICA.html]). The pickle is like 700MB on disk, which (I think?) ought to be un-pickleable in reasonable time.
> --
> You received this message because you are subscribed to the Google Groups "jug-users" group.
> To unsubscribe from this group and stop receiving emails from it, send an email to jug-users+...@googlegroups.com.
> For more options, visit https://groups.google.com/d/optout.

Justin Porter

unread,
Dec 6, 2018, 3:46:32 PM12/6/18
to jug-users
I have a related question. At present, when I have a list of results (typically the result of a map-like operation) that need to be the processed (in this case, a minibatch k-means clustering), I need to load those results one by one, do something, then dump them. Until how, I have been doing what you suggested previously, and that has been working really well.

Now, I want to load only a subsampled version of the original array (say, every 10th timepoint). At present the workflow is:

1. use jug.value(task.t) to load the data
2. slice into the data, and force a copy operation
3. use task.t.unload() to clear the large data

I do this many times to build up a mini-batch, which I then fit, dump, and load a new one. The problem with this is that it forces me to load into memory 10x more data than is really necessary, and creates a giant slowdown. Since file I/O is really the slowest part of this, it would be nice to be more efficient about it.

Generally, using pure numpy, I would just use np.load(filename, mmap_mode='r') to memory-map the file before loading it, so that only the slice is loaded. This would both prevent loading the whole dataset and prevent the copy. However, it seems like there's not really a way to control the way the file store does this. Is there some way I'm missing that I could do something like this? (For example, if I got the file path, I could do the np.load call myself.) Or would there be a straightforward way to add this functionality?

Thanks again for a great library!

Cheers,
Justin

Luis Pedro Coelho

unread,
Dec 7, 2018, 3:38:42 AM12/7/18
to jug-users
It seems that mmap() was used in the very first attempts, but it was brittle: I do know of some filesystems that handle mmap() very poorly.


IIRC, the issue is that not all dtypes were mmap()able (at least at the time, this commit is from 2011; it's been a while).

*

I would happily accept a patch that makes this configurable, like compressing numpy arrays is a configuration option, right now.

HTH,
Luis


Luis Pedro Coelho | Fudan University | http://luispedro.org

Justin Porter

unread,
Dec 7, 2018, 11:11:25 AM12/7/18
to jug-...@googlegroups.com

Aha. I would be certainly open to implementing this.

 

With the proviso that I don’t totally understand how all the objects relate together, I can see a couple of ways this might work.

 

  1. A way to tell jug.value() “load this as a memmap” (or even, “don’t zip this”)?

 

I’m imagining something like jug.value(task.t, mmap_mode=’r’), where the *args and **kwargs are just propagated forward to the load call in the store. That way there would be a general system for providing additional keyword arguments for load functions in the future.

 

2. Add some kind of setting to the file_store, so that you can modify its behavior globally (or maybe just for the calling task). I’m imagining something like

 

backend.save_numpy_as_compressed (False)

backend.load_as_memmap(True)

 

(You could also imagine things like backend.save_memmap())

 

3. Decorators/parameters to @TaskGenerator to indicate saving behavior, e.g.:

 

@TaskGenerator(load_mmap_mode=’r’)

def do_something_strided(list_of_tasks, stride=1):

            …

 

What do you think?

 

Cheers!

 

JRP

Luis Pedro Coelho

unread,
Dec 8, 2018, 12:53:45 AM12/8/18
to Justin Porter, jug-...@googlegroups.com
 

2. Add some kind of setting to the file_store, so that you can modify its behavior globally (or maybe just for the calling task). I’m imagining something like

 

backend.save_numpy_as_compressed (False)

backend.load_as_memmap(True)

 

(You could also imagine things like backend.save_memmap())



This is what already exists for numpy compression: generally it's a bad idea if you have arrays of floats: you spend CPU time and get very little compression, so it's off by default. By sometimes (e.g., large, sparse matrices), it makes a lot of sense, so you can turn it on.


I think something like this would be the least intrusive.

HTH
Luis 
Reply all
Reply to author
Forward
0 new messages