Mapping entire files

acr

Apr 22, 2015, 11:46:53 PM
to disc...@googlegroups.com
Hi all,

I'm trying to use Disco's DDFS in conjunction with Disco's map reduce implementation to map a whole file rather than individual lines of a file. 

My aim is to push a large collection of time series data files to DDFS, each file with a distinct tag, and then to map each file as a whole, one at a time, so that it can be processed during the reduce step. My attempts so far, however, have resulted in each file being parsed and mapped one line at a time during the map stage.

Here's some sample code to illustrate what I mean:

from disco.ddfs import DDFS
ddfs = DDFS()
all_disco_tags = ddfs.list() # I've already pushed all of the relevant files to DDFS


from disco.core import Job, result_iterator
job = Job().run(input=all_disco_tags,
                map=map,
                partitions=partitions,
                reduce=featurize_reduce,
                params=params)

result = result_iterator(job.wait(show=True))
return result


Map function:

def map(data, params):
    print "map:  " + str(data) # for testing purposes, to see what is being mapped
    yield data, "junk string"


This just results in individual lines of each file being mapped. Any tips on how I can map a whole file that has been pushed to DDFS, rather than its individual lines, would be greatly appreciated.

When I was only dealing with local files (not pushed to DDFS) I was creating a file with each line containing the path to a local time series data file, and would pass that "master file" as the Job().run() `input` parameter, which was working great.

Any tips or suggestions would be much appreciated! 

Shayan Pooya

Apr 23, 2015, 2:16:40 AM
to disc...@googlegroups.com
Hello,

How did you push data into DDFS?
There are two different approaches available:
1. ddfs chunk: which will assume each line is a record. The lines will
be read one by one, a blob of data is created out of these lines and
then pushed into ddfs. When reading the file, the map task will
receive the file line by line (record by record).
2. ddfs push: which basically pushes the data into ddfs.

The latter is the one you probably want to look into.

Note that ddfs chunk also accepts a reader, which lets it split the input
into records by arbitrary rules (e.g. reading XML in
https://github.com/discoproject/disco/blob/develop/examples/util/xml_reader.py).

acr

Apr 23, 2015, 2:04:54 PM
to disc...@googlegroups.com
Hi, 

Thanks for your reply. I did in fact use `ddfs.push()`, with each file pushed under a unique tag. I then pass the output of `ddfs.list()` as the `input` parameter of the `Job().run()` method. This, however, results in individual lines from each of the pushed files being mapped, rather than the whole file or its tag!

acr

May 11, 2015, 7:35:02 PM
to disc...@googlegroups.com
Hi all,

Just wondering if anyone has any insights or suggestions for delving further into this.

Thanks,
Ari

Erik Dubbelboer

May 14, 2015, 10:35:14 PM
to disc...@googlegroups.com
What you want is to override the map_reader function. See the top example on this page: http://disco.readthedocs.org/en/latest/lib/worker/classic.html

As you can see, the map_reader receives a file descriptor (fd), which you can use to read the file in any way you like. It should then yield whatever it wants to pass on to the map function.
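
For illustration, here is a minimal sketch of a reader that hands the whole file to the map function as a single record. The names `whole_file_reader` and `whole_file_map` are made up for this example, and the `(fd, size, url, params)` signature is an assumption based on the classic worker docs linked above, so please check it against your Disco version. The only thing it relies on is that `fd` behaves like a file object with a `read()` method.

```python
import io


def whole_file_reader(fd, size, url, params=None):
    """Hypothetical reader: yield one record per input instead of one per line.

    Assumes fd is a file-like object; size and url are accepted only to match
    the assumed reader signature. Note that fd.read() loads the whole file
    into memory.
    """
    yield url, fd.read()


def whole_file_map(record, params):
    """Hypothetical map function: receives the (url, contents) pair from the reader."""
    url, contents = record
    # Emit the input's url as the key and the size of its contents as the value.
    yield url, len(contents)


if __name__ == "__main__":
    # Stand-in for the descriptor Disco would pass in; no cluster needed.
    fake_fd = io.StringIO(u"t0,1.0\nt1,2.0\n")
    for record in whole_file_reader(fake_fd, None, "tag://example"):
        print(list(whole_file_map(record, None)))
```

If the signature assumption holds, the reader would presumably be passed as `map_reader=whole_file_reader` in the `Job().run()` call, next to `map=whole_file_map`. Because the reader yields exactly once per input, the map function is called once per file rather than once per line.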