> We have recently started to use Disco for our web servers log analysis and I
> simply love it. I actually started to write a similar thing while finding
> out about Disco.
> Needless to say Disco is far more advanced than whatever I had :-)
Great!
> Anyhow I have two issues with an initial patch that I wanted to run by the
> mailing list:
>
> Locality - I usually use local file paths since it's easier for me to do that
> than to map a web folder for the files. On remote machines it might be
> easier and better when using local files to copy the files to the local
> machine before working with them. I assume this is how it's done when using
> files located on http://, but I think it would be great to have them in
> local files as well.
If you distribute files to nodes e.g. using distrfiles.py, they'll be
accessed directly on the filesystem without HTTP. For instance, if
address of an input file is "disco://nx30/data/mydata_34", Disco will
schedule a map task on the node nx30 and access the file mydata_34
locally. The file is accessed over HTTP only if nx30 is not available
or if the task fails on that node.
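To make the address scheme concrete, here is a minimal sketch of how
such an address splits into a preferred node and a path. The helper
name split_disco_address is hypothetical and only illustrates the idea;
it is not Disco's actual resolver, which may map the path onto a
node-specific data directory.

```python
from urllib.parse import urlsplit

def split_disco_address(address):
    """Split a disco:// address into (node, path).

    Hypothetical helper for illustration only.
    """
    parts = urlsplit(address)
    if parts.scheme != "disco":
        raise ValueError("not a disco:// address: %r" % address)
    return parts.hostname, parts.path

# The scheduler would prefer `node` for the map task and read `path`
# from the local filesystem there, falling back to HTTP otherwise.
node, path = split_disco_address("disco://nx30/data/mydata_34")
```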
> I was thinking about adding a new parameter to a job that will be a function
> that can decide whether to enable locality on the file or not. This way,
> when we reach the node we can decide if we want to copy the file to a local
> place or read it remotely. This can be good for files on a shared storage or
> other such configuration.
The assumption is that data is already distributed by some mechanism
when the job starts. It is up to you to decide whether you want to use
a centralized scheme with a single web server, a distributed scheme
with a web server on each node and local file access if possible, or a
full-fledged distributed file system.
Could you elaborate on what exactly you want to achieve with the new
parameter?
> The second issue is supporting Gzipped files. Some of the logs we parse are
> gzipped to save up space. There is a sweet spot for gzipped files between
> the amount of CPU invested to read them and the amount of I/O used (if the
> file is compressed enough it will take significantly less space so it will
> take less I/O but more CPU to read).
>
> The quick solution would be to change the open_local method in the node to
> try and open the file as GzipFile. If it fails, try to open it as it should.
> The generic solution would be to supply a set of handlers that can determine
> how to read a file. This would make it possible to add support for normal
> zip files, bzip2 files or files that require some parsing before reading them.
Good idea! We have also had some situations where gzip'ed input files
would have made sense.
Do you think that it would be too brutal to just look at the suffix
(.gz, .bz2, etc.) and choose an appropriate handler based on that?
Actually I think that you could implement this using the current
map_reader interface. It gives you a bare file descriptor (fd) to the
input stream, file size, and the file name. This information should be
enough for decoding pretty much any compressed format. We could
provide a set of map_reader decorators for the most common formats.
These decorators could be wrapped around a user-specified map_reader
that would then receive a file descriptor to the decoded stream.
Knowing the uncompressed file size before the file is fully decoded is
problematic, unless this information is explicitly included in the
archive's header. However, it is ok to provide None as the file size
to map_reader, so this issue shouldn't be a show-stopper.
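A minimal sketch of what such a decorator might look like for gzip. It
assumes that map_reader is called as reader(fd, size, fname) and that
fd behaves like a binary file object; the names gzip_reader and
line_reader are made up for this example and are not part of Disco.

```python
import gzip
import io

def gzip_reader(inner_reader):
    """Wrap a map_reader so that it receives the decompressed stream.

    Assumes the reader(fd, size, fname) calling convention and a
    file-like fd; both are assumptions of this sketch.
    """
    def reader(fd, size, fname):
        decoded = gzip.GzipFile(fileobj=fd, mode="rb")
        # The uncompressed size is not known up front, so pass None.
        return inner_reader(decoded, None, fname)
    return reader

def line_reader(fd, size, fname):
    """Example user-specified reader: yield one line at a time."""
    for line in fd:
        yield line.rstrip(b"\n")

# Usage: compress two log lines in memory and read them back.
raw = io.BytesIO(gzip.compress(b"GET /a\nGET /b\n"))
lines = list(gzip_reader(line_reader)(raw, None, "access.log.gz"))
```

The user's reader stays unaware of the compression, so the same
line_reader works for plain and gzip'ed inputs.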
Ville
It has been a deliberate decision to keep the storage layer separate
from Disco. Your application / filesystem should be better informed
than Disco to move data around as efficiently as possible.
That being said, I can see that in some situations it might be
convenient if Disco could copy and distribute data from a centralized
storage by itself. One (easy) way to implement this would be the
following: a node that needs a chunk of data requests it from the
master. The master returns a URL from which the data can be downloaded
over HTTP. The node downloads the data and saves it to its work
directory.
This way the master can control the number of concurrent downloaders,
to limit IO load on the storage system. As long as you rely on
point-to-point TCP connections, HTTP is the easiest way to go (its
overhead over bare TCP is negligible). Another possibility would be to
use some multicast protocol or even BitTorrent (see
http://www.umiacs.umd.edu/~nedwards/research/bittorrent_for_clusters.html).
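The throttling part of this idea could be sketched roughly as follows.
The class DownloadGate and its method names are hypothetical, and the
sketch only shows the slot accounting on the master, not the HTTP
exchange with the nodes.

```python
import threading

class DownloadGate:
    """Hand out at most max_concurrent download slots at a time."""

    def __init__(self, max_concurrent):
        self._slots = threading.BoundedSemaphore(max_concurrent)

    def try_acquire(self):
        # Non-blocking: a node that gets False should retry later.
        return self._slots.acquire(blocking=False)

    def release(self):
        # Called when a node reports that its download has finished.
        self._slots.release()

gate = DownloadGate(max_concurrent=2)
granted = [gate.try_acquire() for _ in range(3)]  # third request is refused
gate.release()                                    # one download finishes
granted_after_release = gate.try_acquire()        # a slot is free again
```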
How does this sound?
>> Do you think that it would be too brutal to just look at the suffix
>> (.gz, .bz2, etc.) and choose an appropriate handler based on that?
>>
>> Actually I think that you could implement this using the current
>> map_reader interface. It gives you a bare file descriptor (fd) to the
>> input stream, file size, and the file name. This information should be
>> enough for decoding pretty much any compressed format. We could
>> provide a set of map_reader decorators for the most common formats.
>> These decorators could be wrapped around a user-specified map_reader
>> that would then receive a file descriptor to the decoded stream.
>> Knowing the uncompressed file size before the file is fully decoded is
>> problematic, unless this information is explicitly included in the
>> archive's header. However, it is ok to provide None as the file size
>> to map_reader, so this issue shouldn't be a show-stopper.
>
>
> I also toyed with the brute force approach of detecting the extensions but
> decorators might be a better idea.
> I know the GzipFile class in Python can work as a decorator on top of an
> existing file descriptor. I don't know about .zip and .bzip2.
ok
> Regarding file size, I found a code sample on how to get the real file size
> from .gz files (I assume it can also be used in bzip2 in some way or
> another). It requires seeking a certain structure at the end of the file,
> reading the size and setting the location back to the start. In .zip it's
> easier since it's inside the header.
ok. The main reason why file size is provided to map_reader is that it
can see if the stream dies before all data has been read. When using a
decompressor decorator, the decorator can detect that the stream is
corrupted if it dies prematurely and it can throw an exception. So
it's not so critical if the user's map_reader can't detect corrupted
streams in this case.
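As a rough illustration of both checks, assuming the
reader(fd, size, fname) calling convention and a file-like fd: the
first function is a plain reader that uses the size to notice a short
stream, the second shows that Python's gzip module raises EOFError when
a compressed stream ends prematurely. Both function names are made up
for this sketch.

```python
import gzip
import io

def checked_reader(fd, size, fname):
    """Yield lines and fail if the stream ends before `size` bytes.

    If size is None, no check is possible and none is made.
    """
    seen = 0
    for line in fd:
        seen += len(line)
        yield line
    if size is not None and seen < size:
        raise IOError("%s: got %d of %d bytes" % (fname, seen, size))

def is_truncated_gzip(blob):
    """Return True if decompressing blob hits a premature end of stream."""
    try:
        gzip.GzipFile(fileobj=io.BytesIO(blob)).read()
    except EOFError:
        return True
    return False

payload = gzip.compress(b"line\n" * 1000)
half = payload[:len(payload) // 2]  # simulate a stream that died midway
```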
> The only "issue" (it's not that big) would be that the reader is a pure
> function, so adding additional decorators other than the ones that will be
> pre-installed would mean using the "import" within the pure function (like
> one would do in the map/reduce function) or using the required_modules
> parameter when starting the job.
Right. It's unfortunate that Python doesn't have better
metaprogramming capabilities. It would be great to generate new
functions on the fly in cases like this. One ugly approach to this
problem can be seen in disco.func.make_range_partition().
If the decorators are part of the standard Disco package, they are
readily available on the nodes and can be imported to disco_worker's
namespace automatically, so there's a reasonably clean solution to
this issue. However this doesn't help a user who wants to make new
custom decorators, so the general question remains unsolved.
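For a custom format, the "import inside the function" workaround could
look like the sketch below. It assumes the reader(fd, size, fname)
calling convention and a file-like fd; bz2_line_reader is a made-up
name, not something shipped with Disco.

```python
def bz2_line_reader(fd, size, fname):
    """Yield decompressed lines from a bzip2 stream.

    The import lives inside the body so that the function stays
    self-contained when it is shipped to a node on its own.
    """
    import bz2
    decomp = bz2.BZ2Decompressor()
    pending = b""
    while True:
        chunk = fd.read(64 * 1024)
        if not chunk:
            break
        pending += decomp.decompress(chunk)
        # Everything before the last newline is a complete line.
        *complete, pending = pending.split(b"\n")
        for line in complete:
            yield line
    if pending:
        yield pending

# Local demo only; on a node just the function above would be needed.
import bz2
import io
blob = bz2.compress(b"a\nb\nc")
lines = list(bz2_line_reader(io.BytesIO(blob), None, "x.bz2"))
```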
Ville
Thanks! It seems that the original proposal
(http://issues.apache.org/jira/browse/HADOOP-288) to which the article
refers, concerns out-of-band data or "parameters" in Disco parlance.
Distributing large parameter sets is a problematic issue. Currently
Disco caches them locally in a manner similar to the one proposed in
HADOOP-288 but it is still an open question what would be the best way
to distribute them to nodes in the first place.
Currently they are retrieved from a centralized location on the master
node over HTTP. Evidently this approach doesn't scale too well. We
have already had problems with tens of nodes requesting a 1 GB
parameter set from the master at the same time. We can prevent the
master from being DDoS'ed by limiting the number of concurrent
downloaders as I proposed earlier. However, a truly scalable solution
would distribute the IO load across several nodes, e.g. à la BitTorrent.
> I heard a couple of lectures from Googlers on Map-Reduce and from what I
> could understand from them, the master's responsibilities are:
>
> Distribute the work to mappers/reducers
> Monitor nodes (both for errors, status and performance)
>
> Several people in these lectures asked who has the responsibility of
> requesting the data and the Googlers said it's not the master's
> responsibility. Each node needs to take the data and handle it.
Right. That's why "it has been a deliberate decision to keep the
storage layer separate from Disco."
> So, from an architectural point of view, perhaps there is a place for some
> other service (which can run, of course, on the master if needed). You can
> also consider this a similar abstraction to Hadoop's File System where there
> is some service that allows you to get access to the files.
Maybe something like Ringo (:
http://github.com/tuulos/ringo/
> The interesting
> thing about using HTTP is that you can pull remote files from outside your
> network, if needed, which makes this an interesting built-in feature for a
> hosted map-reduce solution :-)
Yep
> One interesting anecdote on the master (just a side note, actually), is that
> when it monitors the performance, a good optimization would be to kill
> slowly progressing jobs on rather slow nodes and reschedule them on other
> nodes that are available or finished. They proved that it can dramatically
> improve the run time of a map-reduce job.
Good idea. We have actually done some preliminary work on running
map/reduce on extremely heterogeneous environments (mobile phones!),
see
http://github.com/douji/misco/
where you need much more intelligent scheduling than with relatively
homogeneous HPC clusters. We hope that this work will eventually
contribute improvements to Disco's standard scheduler.
...
>> If the decorators are part of the standard Disco package, they are
>> readily available on the nodes and can be imported to disco_worker's
>> namespace automatically, so there's a reasonably clean solution to
>> this issue. However this doesn't help a user who wants to make new
>> custom decorators, so the general question remains unsolved.
>
> I guess this is something that can be solved using documentation, stating
> that if you want to include your own custom decorator you'll either need it
> to be pre-installed on the node (so that the "import" statement can find it)
> or send it via the "required_modules" parameter of the job.
Right.
Thanks a lot for your insightful comments! I hope that you'll find
time to contribute patches too (:
Ville