Parallel processing for image loading and augmentation in flow_from_directory / DirectoryIterator


Pat Jayet

Sep 14, 2018, 4:06:08 AM
to Keras-users
Hi all,

Currently, when calling flow_from_directory (which uses a DirectoryIterator), the data loading and augmentation are done one image at a time, on a single thread. Here is the relevant code snippet:

# keras_preprocessing, image.py line 1948
def _get_batches_of_transformed_samples(self, index_array):
    ...
    # build batch of image data
    for i, j in enumerate(index_array):
        fname = self.filenames[j]
        img = load_img(os.path.join(self.directory, fname),
                       color_mode=self.color_mode,
                       target_size=self.target_size,
                       interpolation=self.interpolation)
        x = img_to_array(img, data_format=self.data_format)
        # Pillow images should be closed after `load_img`,
        # but not PIL images.
        if hasattr(img, 'close'):
            img.close()
        params = self.image_data_generator.get_random_transform(x.shape)
        x = self.image_data_generator.apply_transform(x, params)
        x = self.image_data_generator.standardize(x)
        batch_x[i] = x
...

When I use data augmentation (and probably even without it), the bottleneck is not the GPU (which shows low utilization) but the data loading and augmentation on the CPU.
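
For anyone who wants to reproduce that observation, here is a minimal sketch that times the input pipeline alone, with no model involved (the path, target size and augmentation settings are placeholders):

import time
from keras.preprocessing.image import ImageDataGenerator

datagen = ImageDataGenerator(rotation_range=20, horizontal_flip=True)
it = datagen.flow_from_directory('data/train',          # placeholder path
                                 target_size=(224, 224),
                                 batch_size=32)

start = time.perf_counter()
n_batches = 10
for _ in range(n_batches):
    next(it)                                            # load + augment one batch
print('%.3f s per batch (single-threaded)' % ((time.perf_counter() - start) / n_batches))

If that per-batch time is comparable to or larger than the GPU step time, the input pipeline is indeed the bottleneck.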

I tried implementing multi-threaded loading/processing, which looks as follows:

# at the end of the DirectoryIterator constructor, I build a ThreadPool
# (this needs `import multiprocessing.pool` at the top of the module)
self.pool = multiprocessing.pool.ThreadPool(multiprocessing.cpu_count() * 2)

# now here is _get_batches_of_transformed_samples
def _load_image_and_apply_transform(self, j):
    fname = self.filenames[j]
    img = load_img(os.path.join(self.directory, fname),
                   color_mode=self.color_mode,
                   target_size=self.target_size,
                   interpolation=self.interpolation)
    x = img_to_array(img, data_format=self.data_format)
    # Pillow images should be closed after `load_img`,
    # but not PIL images.
    if hasattr(img, 'close'):
        img.close()
    params = self.image_data_generator.get_random_transform(x.shape)
    x = self.image_data_generator.apply_transform(x, params)
    x = self.image_data_generator.standardize(x)
    return x

def _get_batches_of_transformed_samples(self, index_array):
    ...
    # build batch of image data; ThreadPool.map preserves input order,
    # so results[i] corresponds to index_array[i]
    results = self.pool.map(self._load_image_and_apply_transform, index_array)
    for i in range(len(index_array)):
        batch_x[i] = results[i]
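
Nothing changes on the caller side, by the way: the usual flow_from_directory call goes through the pooled path transparently. A quick sketch, again with placeholder paths and settings:

from keras.preprocessing.image import ImageDataGenerator

datagen = ImageDataGenerator(rotation_range=20, horizontal_flip=True)
train_it = datagen.flow_from_directory('data/train',    # placeholder path
                                       target_size=(224, 224),
                                       batch_size=32,
                                       class_mode='categorical')

# each next() now fans the per-image load/augment work out over the ThreadPool
batch_x, batch_y = next(train_it)
print(batch_x.shape, batch_y.shape)

# training code is unchanged too, e.g.
# model.fit_generator(train_it, steps_per_epoch=len(train_it), epochs=10)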

It gives me good results so far. For instance, on a training set of 317k images, on an 8-vCPU machine with a GTX 1080 Ti GPU, it reduced the epoch time from ~3h20 to ~53min. I can also see that the GPU is now well utilized, as are all 8 vCPUs.

My questions:
- what do you think about this modification? (we could also add a parameter to flow_from_directory / DirectoryIterator to specify whether to use multi-threaded data loading/augmentation and with how many threads; see the sketch after these questions)
- does anything speak against it?
- would you be interested in a pull request?
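
To make that parameter idea a bit more concrete, here is a rough sketch; the name n_threads, its semantics and the helper functions are placeholders, not a proposed final API:

import multiprocessing
import multiprocessing.pool

# Hypothetical signature:  flow_from_directory(directory, ..., n_threads=None)
#   n_threads=None -> keep today's single-threaded loop
#   n_threads=0    -> one thread per CPU
#   n_threads=k    -> ThreadPool with k workers

def _make_pool(n_threads):
    if n_threads is None:
        return None
    return multiprocessing.pool.ThreadPool(n_threads or multiprocessing.cpu_count())

# inside _get_batches_of_transformed_samples, the loop would then become:
def _load_batch(iterator, index_array, pool):
    if pool is not None:
        return pool.map(iterator._load_image_and_apply_transform, index_array)
    return [iterator._load_image_and_apply_transform(j) for j in index_array]

The pool would also need to be closed (pool.close() / pool.join()) when the iterator is discarded, so it doesn't leak threads.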

Feedback welcome!

Cheers,
Pat

Pat Jayet

Sep 16, 2018, 4:55:59 PM
to Keras-users
Hi all,

Anyone who wants to test-drive this speedup in flow_from_directory can try out this fork.

You can install it directly from GitHub with this command:

$ pip install --no-dependencies git+https://github.com/pajai/keras-preprocessing.git@master 

Disclaimer: flow_from_dataframe is currently broken in this fork.
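
To check that the patched package is actually the one Python picks up after installing, a quick sanity check (plain Python, nothing fork-specific):

$ python -c "import keras_preprocessing; print(keras_preprocessing.__file__)"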

Feedback welcome.
Cheers,
Pat

silviuc...@gmail.com

Nov 19, 2018, 4:55:33 AM
to Keras-users
I have tested it and it seems to be working fine for me. It reduced my epoch time from ~36 seconds to ~15 seconds, though the speedup is definitely use-case dependent.
You should expose the number of threads as a parameter.

Thank you!