Multiple maps and reduces

Matthew Gardner

unread,
Oct 15, 2012, 4:43:47 PM10/15/12
to mrs-ma...@googlegroups.com
Hi Andrew (and others that may be on this list),

I've got a relatively simple (but large scale) data processing job, and I figured mrs would be a good way to do it.  But I'm forgetting how exactly the reduce task output works, and I have a few other questions.  I want to do a map, reduce, map, reduce, and I'm not sure if I need to encode the key into the value I yield from the first reduce task.  To be clear, this is the pipeline I'm envisioning:

first map: data file -> (integer, (integer, integer))
first reduce: (integer, list(integer, integer)) -> (integer, list(integer, integer)) [this is just a collecting reduce - is there a builtin reduce function for simple things like this?]
second map: (integer, list(integer, integer)) -> ((integer, integer), string)
second reduce ((integer, integer), list(string)) -> ((integer, integer), list(string)) [another collecting reduce]

So, specifically, if my two reduces just look like this, will it work?  I probably need to convert the list of values into a string, with json or something?

def reduce(self, key, values):
    yield values

Second, if I have a pair of integers, encoded in a string, using mod_partition probably won't work anymore, right?  If not, what partition function should I use?

And lastly, I have a big machine with 24 cores that I want to just run this on.  Do I need to write a script to start a master and a bunch of slaves on the same machine, or is there built in functionality that will do that for me?

Oh, and another question - does mrs work with pypy?

Thanks,
Matt

Andrew McNabb

unread,
Oct 15, 2012, 6:16:53 PM10/15/12
to mrs-ma...@googlegroups.com
On Mon, Oct 15, 2012 at 01:43:47PM -0700, Matthew Gardner wrote:
> Hi Andrew (and others that may be on this list),
>
> I've got a relatively simple (but large scale) data processing job, and I
> figured mrs would be a good way to do it. But I'm forgetting how exactly
> the reduce task output works, and I have a few other questions. I want to
> do a map, reduce, map, reduce, and I'm not sure if I need to encode the key
> into the value I yield from the first reduce task. To be clear, this is
> the pipeline I'm envisioning:
>
> first map: data file -> (integer, (integer, integer))
> first reduce: (integer, list(integer, integer)) -> (integer, list(integer,
> integer)) [this is just a collecting reduce - is there a builtin reduce
> function for simple things like this?]
> second map: (integer, list(integer, integer)) -> ((integer, integer),
> string)
> second reduce ((integer, integer), list(string)) -> ((integer, integer),
> list(string)) [another collecting reduce]
>
> So, specifically, if my two reduces just look like this, will it work? I
> probably need to convert the list of values into a string, with json or
> something?
>
> def reduce(self, key, values):
>     yield values

A couple of things--I'm not sure what will be most helpful since I'm
having trouble identifying what the exact questions are.

The latest version of Mrs takes advantage of Mrs, so you usually don't
need to serialize manually anymore.

I'm not quite sure what you mean by "I'm not sure if I need to encode
the key into the value I yield from the first reduce task."

Depending on how many keys you have, you might be perfectly happy with
the default partition function (hash_partition).

Since you have a couple of map-reduce phases, you'll write a run method
that calls job.local_data (for the input), job.map_data,
job.reduce_data, job.map_data, and job.reduce_data, each with the
options that you need.

If you want your program to process all of the output, then at the end
of your run function, you'll do a job.wait on the last reduce dataset,
and then you can iterate over its `data()` method to get all output
key-value pairs.
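
For concreteness, here is roughly what such a run method can look like.  This is only a sketch pieced together from the description above: the method names (job.local_data, job.map_data, job.reduce_data, job.wait, data()) come from this thread, but the exact signatures and options are assumptions, so check the Mrs examples for the real API.

def run(self, job):
    # input_pairs() is a hypothetical helper that yields the initial
    # (key, value) pairs read from the data files
    source = job.local_data(self.input_pairs())
    first = job.map_data(source, self.first_map)
    collected = job.reduce_data(first, self.first_reduce)
    second = job.map_data(collected, self.second_map)
    output = job.reduce_data(second, self.second_reduce)

    job.wait(output)
    for key, value in output.data():
        print key, value
    return 0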

>
> Second, if I have a pair of integers, encoded in a string, using
> mod_partition probably won't work anymore, right? If not, what partition
> function should I use?


> And lastly, I have a big machine with 24 cores that I want to just run this
> on. Do I need to write a script to start a master and a bunch of slaves on
> the same machine, or is there built in functionality that will do that for
> me?

For right now, just start a master and bunch of slaves on the same
machine.

> Oh, and another question - does mrs work with pypy?

Yes.

I hope this is helpful, at least for getting you started.

--
Andrew McNabb
http://www.mcnabbs.org/andrew/
PGP Fingerprint: 8A17 B57C 6879 1863 DE55 8012 AB4D 6098 8826 6868

Andrew McNabb

unread,
Oct 15, 2012, 6:23:00 PM10/15/12
to mrs-ma...@googlegroups.com
On Mon, Oct 15, 2012 at 01:43:47PM -0700, Matthew Gardner wrote:
>
> So, specifically, if my two reduces just look like this, will it work? I
> probably need to convert the list of values into a string, with json or
> something?
>
> def reduce(self, key, values):
>     yield values

By the way, for this, you probably want to do:

def reduce(self, key, values):
    yield list(values)

Assuming that I understand correctly.

Jeffrey Lund

unread,
Oct 15, 2012, 6:39:14 PM10/15/12
to mrs-ma...@googlegroups.com
> The latest version of Mrs takes advantage of Mrs, so you usually don't
> need to serialize manually anymore.

I think you meant that the latest version of Mrs takes advantage of pickle, so yes, you don't need to manually serialize anymore unless you want to, in which case there is a different mechanism for doing it.

Matthew Gardner

unread,
Oct 15, 2012, 6:44:50 PM10/15/12
to mrs-ma...@googlegroups.com
On Mon, Oct 15, 2012 at 6:16 PM, Andrew McNabb <amc...@mcnabbs.org> wrote:
> > So, specifically, if my two reduces just look like this, will it work?  I
> > probably need to convert the list of values into a string, with json or
> > something?
> >
> > def reduce(self, key, values):
> >     yield values

> A couple of things--I'm not sure what will be most helpful since I'm
> having trouble identifying what the exact questions are.
>
> The latest version of Mrs takes advantage of Mrs, so you usually don't
> need to serialize manually anymore.
I think you meant something besides "takes advantage of Mrs," but if I understand what you meant, it answers one of my questions.  I remember having to encode everything as strings - you're saying I can pass lists of strings around?  What is the set of things that can be yielded in map and reduce tasks?  (And is there documentation somewhere that I should have read first?  I looked in a few places and didn't find much - the documentation in packages.python.org, linked to by code.google.com, still has yield (word, str(1)), which makes it seem like you still need to encode things as strings.)

The second question was simply, is the key still attached to the output of a reduce?  A reduce function takes as input a (key, values) pair, and just outputs a single value.  I don't need to do any funny business to be sure that the key is still attached to the value in future tasks?  In hindsight, it seems fairly obvious that the key will still be mapped to the output value for the purpose of future tasks, I just remembered having to do something funny when I was writing the SEPSO stuff with mrs.
 
> Depending on how many keys you have, you might be perfectly happy with
> the default partition function (hash_partition).
I would expect something on the order of millions of keys.  Would hash_partition be fine with that? 

> Since you have a couple of map-reduce phases, you'll write a run method
> that calls job.local_data (for the input), job.map_data,
> job.reduce_data, job.map_data, and job.reduce_data, each with the
> options that you need.
>
> If you want your program to process all of the output, then at the end
> of your run function, you'll do a job.wait on the last reduce dataset,
> and then you can iterate over its `data()` method to get all output
> key-value pairs.
Right, that's what I have.  I didn't know about the data() method, though - I was passing some arguments to the second reduce_data task.  Because I'm expecting there to be so many keys, it might be better to stick with outputting them in the reduce.  Or does it make a difference?

> I hope this is helpful, at least for getting you started.
Thanks, it is indeed helpful. 

Matthew Gardner

unread,
Oct 16, 2012, 11:56:15 AM10/16/12
to mrs-ma...@googlegroups.com
And, just an update, I got it working just fine.  It looks great; I'm pretty happy with it.  Thanks for your help.  There's one issue that I'll put in as a feature request on the google code page, but that's all.

Andrew McNabb

unread,
Oct 16, 2012, 12:14:40 PM10/16/12
to mrs-ma...@googlegroups.com
On Tue, Oct 16, 2012 at 11:56:15AM -0400, Matthew Gardner wrote:
> And, just an update, I got it working just fine. It looks great; I'm
> pretty happy with it. Thanks for your help. There's one issue that I'll
> put in as a feature request on the google code page, but that's all.

I'm glad to hear things are working. Posting a feature request sounds
great, and also feel free to submit corrections for any deficiencies in
the tutorial. Thanks.

Andrew McNabb

unread,
Oct 16, 2012, 12:17:59 PM10/16/12
to mrs-ma...@googlegroups.com
On Mon, Oct 15, 2012 at 06:44:50PM -0400, Matthew Gardner wrote:
> What is the set of things that can be yielded in map and
> reduce tasks? (And is there documentation somewhere that I should have
> read first? I looked in a few places and didn't find much - the
> documentation in packages.python.org, linked to by code.google.com, still
> has yield (word, str(1)), which makes it seem like you still need to encode
> things as strings.)

Hopefully Jeff's comment made this clear (by default it uses pickle).
I'll try to update the docs today.
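
(As an illustration of what "it uses pickle" buys you: map and reduce can emit ordinary picklable Python objects -- ints, tuples, lists, dicts -- instead of hand-encoded strings.  The function bodies below are made up for the example and are not taken from Mrs itself.)

def map(self, key, line):
    # the emitted key is a plain int and the value is a tuple of ints -- no str() needed
    walk_id, node, hop = line.split()
    yield int(walk_id), (int(node), int(hop))

def reduce(self, key, values):
    # a collecting reduce: the value is just a Python list
    yield list(values)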

> The second question was simply, is the key still attached to the output of
> a reduce? A reduce function takes as input a (key, values) pair, and just
> outputs a single value. I don't need to do any funny business to be sure
> that the key is still attached to the value in future tasks? In hindsight,
> it seems fairly obvious that the key will still be mapped to the output
> value for the purpose of future tasks, I just remembered having to do
> something funny when I was writing the SEPSO stuff with mrs.

Yes, the key is still attached to the output of the reduce. With SEPSO
we had multiple items related to the same particle, so things could get
a bit more complicated.

> I would expect something on the order of millions of keys. Would
> hash_partition be fine with that?

This sounds like the perfect place for hash_partition.
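
(For reference, a custom partition function is just a small callable, in case hash_partition ever turns out not to fit.  The sketch below assumes the (key, number_of_buckets) -> bucket_index convention that mod_partition and hash_partition follow; double-check the Mrs source before relying on it.)

def pair_partition(key, n):
    """Send every (int, int) key with the same first element to the same reducer."""
    first, second = key
    return first % n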

> Right, that's what I have. I didn't know about the data() method, though -
> I was passing some arguments to the second reduce_data task. Because I'm expecting
> there to be so many keys, it might be better to stick with outputting them
> in the reduce. Or does it make a difference?

It all depends on what you're trying to do, but at least you know what
your options are now.

Matthew Gardner

unread,
Oct 16, 2012, 12:35:02 PM10/16/12
to mrs-ma...@googlegroups.com
On Tue, Oct 16, 2012 at 12:17 PM, Andrew McNabb <amc...@mcnabbs.org> wrote:
> With SEPSO
> we had multiple items related to the same particle, so things could get
> a bit more complicated.

Yeah, I remembered this later - the issue was that I wanted a reduce-map function, so I could yield to a _different_ key within a reduce.  Instead I encoded the key into the value, and had a map that just split the "value" into a key-value pair and emitted it.  Hence the confusion =).
 
> > Right, that's what I have.  I didn't know about the data() method, though -
> > I was passing some arguments to the second reduce_data task.  Because I'm expecting
> > there to be so many keys, it might be better to stick with outputting them
> > in the reduce.  Or does it make a difference?

> It all depends on what you're trying to do, but at least you know what
> your options are now.

The issue is really whether or not using the data() method brings everything into memory.  I don't expect that everything will fit in memory.  If the data() method is smart about that, it's probably preferable, so I can put clean output files wherever I want, instead of in the source_0_1wPrn3 files that they end up in with the reduce_data options.

And as far as documentation, I think the most helpful thing would just be more examples (particularly, well-documented examples).  I'm happy to offer the small program I just wrote as another example, if you think that's a good idea.  Also, the mapreduce_parse example is out of date - it uses job.end(), and job.status().  Updating the last few lines to match the current API would probably be a good idea.

Andrew McNabb

unread,
Oct 16, 2012, 12:48:27 PM10/16/12
to mrs-ma...@googlegroups.com
On Tue, Oct 16, 2012 at 12:35:02PM -0400, Matthew Gardner wrote:
>
> The issue is really whether or not using the data() method brings
> everything into memory. I don't expect that everything will fit in memory.
> If the data() method is smart about that, it's probably preferable, so I
> can put clean output files wherever I want, instead of in the
> source_0_1wPrn3 files that they end up in with the reduce_data options.

Right now it's not quite smart enough about this, but I think this would
be good to add.

> And as far as documentation, I think the most helpful thing would just be
> more examples (particularly, well-documented examples). I'm happy to offer
> the small program I just wrote as another example, if you think that's a
> good idea. Also, the mapreduce_parse example is out of date - it uses
> job.end(), and job.status(). Updating the last few lines to match the
> current API would probably be a good idea.

Thanks for the suggestions, and yes, I think it would be handy to see
your example, especially if you make sure it's a "well-documented
example". :)

Matthew Gardner

unread,
Oct 16, 2012, 4:55:02 PM10/16/12
to mrs-ma...@googlegroups.com
Alright, I'll work on getting my program to a point where I can submit it as an example.  In the meantime, when scaling up to a reasonably large data set, I encountered this problem.  Any idea what may have caused it?

----------------------------------------
Exception happened during processing of request from ('128.2.204.39', 56312)
Traceback (most recent call last):
  File "/home/mg1/clone/pypy-1.7/lib-python/2.7/SocketServer.py", line 284, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/home/mg1/clone/pypy-1.7/lib-python/2.7/SocketServer.py", line 310, in process_request
    self.finish_request(request, client_address)
  File "/home/mg1/clone/pypy-1.7/lib-python/2.7/SocketServer.py", line 323, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/home/mg1/clone/pypy-1.7/lib-python/2.7/SocketServer.py", line 639, in __init__
    self.handle()
  File "/home/mg1/clone/pypy-1.7/lib-python/2.7/BaseHTTPServer.py", line 339, in handle
    self.handle_one_request()
  File "/home/mg1/clone/pypy-1.7/lib-python/2.7/BaseHTTPServer.py", line 313, in handle_one_request
    self.raw_requestline = self.rfile.readline()
  File "/home/mg1/clone/pypy-1.7/lib-python/modified-2.7/socket.py", line 492, in readline
    data = self._sock.recv(self._rbufsize)
  File "/home/mg1/clone/pypy-1.7/lib-python/modified-2.7/socket.py", line 188, in recv
    return self._sock.recv(buffersize, flags=flags)
error: [Errno 104] Connection reset by peer
----------------------------------------

At this point I had finished the first map and reduce, and was about 99% done with the second map.  It then didn't make any more progress.  I got several slaves lost due to timeouts, and I saw the previous error twice.

Related to this, is there any mechanism in mrs for declaring that you're somewhat fault tolerant?  Like, if some number (or percentage) of map tasks repeatedly fail, that's ok, just ignore them and keep going?

Andrew McNabb

unread,
Oct 16, 2012, 7:33:27 PM10/16/12
to mrs-ma...@googlegroups.com
I ran into and fixed a bug that looked a little like this. If it's the
same bug I'm thinking of, it should be fixed in the repo but isn't in a
new release yet. Are you running from the latest tarball or from the
git repo?

If it's not the same, I might need some more information. Specifically,
I'm confused why there isn't a longer traceback.

> At this point I had finished the first map and reduce, and was about 99%
> done with the second map. It then didn't make any more progress. I got
> several slaves lost due to timeouts, and I saw the previous error twice.
>
> Related to this, is there any mechanism in mrs for declaring that you're
> somewhat fault tolerant? Like, if some number (or percentage) of map tasks
> repeatedly fail, that's ok, just ignore them and keep going?

There would need to be a name for this other than "fault tolerant", but
it's an interesting idea. It would not be all that difficult to
implement this.

Andrew McNabb

unread,
Oct 17, 2012, 9:32:50 AM10/17/12
to mrs-ma...@googlegroups.com
By the way, would you mind opening up an issue on the Issue Tracker for
this? That's probably a better place to continue looking into it.
Thanks!

Matthew Gardner

unread,
Oct 17, 2012, 10:00:18 AM10/17/12
to mrs-ma...@googlegroups.com
I'm running from the git repo.  I'm trying to get more information to debug this, but it's a bit tricky, because it only happens after it's been running for an hour or more on a large dataset, and there is no traceback that I've seen.  When I get some more specific information, I'll open a ticket on google code.

Andrew McNabb

unread,
Oct 17, 2012, 10:10:17 AM10/17/12
to mrs-ma...@googlegroups.com
On Wed, Oct 17, 2012 at 10:00:18AM -0400, Matthew Gardner wrote:
> I'm running from the git repo. I'm trying to get more information to debug
> this, but it's a bit tricky, because it only happens after it's been
> running for an hour or more on a large dataset, and there is no traceback
> that I've seen. When I get some more specific information, I'll open a
> ticket on google code.

Sounds good. By the way, one other bit of information that might be
helpful is whether you're seeing it in both cpython and pypy or if it's
just happening in pypy. Usually they're the same, but if they aren't,
it's very good to know.

Andrew McNabb

unread,
Oct 19, 2012, 12:51:48 PM10/19/12
to mrs-ma...@googlegroups.com
On Wed, Oct 17, 2012 at 10:00:18AM -0400, Matthew Gardner wrote:
> I'm running from the git repo. I'm trying to get more information to debug
> this, but it's a bit tricky, because it only happens after it's been
> running for an hour or more on a large dataset, and there is no traceback
> that I've seen. When I get some more specific information, I'll open a
> ticket on google code.

Have you had any more luck tracking this down? I hate feeling like
there might be any major unresolved bugs.

Matthew Gardner

unread,
Oct 19, 2012, 1:09:12 PM10/19/12
to mrs-ma...@googlegroups.com
Sadly, I didn't have any luck.  My guess was that it was some kind of file IO issue - the code worked fine when I gave it small input, but when I gave it a medium-sized input it couldn't handle it anymore.  I was hoping I could get a simple map reduce to run on a single machine, but after seeing exactly how much data I wanted to use (the "medium-sized" input was still two or three orders of magnitude smaller than my end goal), I realized I really should just use our hadoop cluster.  And now I'm beating my head against hadoop =).  Mrs was certainly a whole lot easier to use; it just looks like my problem really needs hadoop.

If you're really interested in tracking this down, I can try to help, but I'm not sure that I'll see anything more than I've already told you - there wasn't ever any error I saw, even when using --mrs-debug (part of the problem was that after the initial error I saw, my group's machines started getting very heavily used, and I couldn't really run many more tests; I did get one or two running, but I never saw a more detailed error).  I suspect you'd have to do the kind of monitoring that you've used when tracking down nfs issues.

I'm still planning on making my program into a "well-documented example," and I can provide you with my code and data, if you want to try running it to see what you find.  And, if you can tell me specifically what to run, I can run it on my end.  Just let me know.

Matt

Andrew McNabb

unread,
Oct 19, 2012, 4:12:27 PM10/19/12
to mrs-ma...@googlegroups.com
On Fri, Oct 19, 2012 at 01:09:12PM -0400, Matthew Gardner wrote:
>
> Mrs was certainly a whole lot easier to use, it just
> looks like my problem really needs hadoop.

It sounds like you've hit a bug that we need to fix, but I don't have
any idea what "my problem really needs hadoop" means. :)

> If you're really interested in
> tracking this down, I can try to help, but I'm not sure that I'll see
> anything more than I've already told you - there wasn't ever any error I
> saw, even when using --mrs-debug (part of the problem was that after the
> initial error I saw, my group's machines started getting very heavily used,
> and I couldn't really run many more tests; I did get one or two running,
> but I never saw a more detailed error).

At one point I thought you mentioned getting a backtrace somewhere? Is
this the same thing, or are you hitting some other symptom?

Did you check for errors both on the master and on the slaves?

Is there an easy way for me to try to reproduce this locally?

> I'm
> still planning on making my program into a "well-documented example," and I
> can provide you with my code and data, if you want to try running it to see
> what you find. And, if you can tell me specifically what to run, I can run
> it on my end. Just let me know.

That sounds great. It would be helpful to see the example program and
any other information you have.

Thanks.

Matthew Gardner

unread,
Oct 19, 2012, 4:31:29 PM10/19/12
to mrs-ma...@googlegroups.com
On Fri, Oct 19, 2012 at 4:12 PM, Andrew McNabb <amc...@mcnabbs.org> wrote:
> It sounds like you've hit a bug that we need to fix, but I don't have
> any idea what "my problem really needs hadoop" means. :)
 
=).  Partly it means that the scale of the data I have needs a cluster, and the only cluster I have available is a hadoop cluster.  There are 3 large workstations I have access to, but together they're not enough to do what I need, I think, particularly if I can't distribute the file io.  And partly it means that hadoop seems better able to handle really large scale data sets, owing to the fact that it's such a large project.  Perhaps if we figure out this bug you'll prove me wrong on that part =).

> At one point I thought you mentioned getting a backtrace somewhere?  Is
> this the same thing, or are you hitting some other symptom?

The only backtrace I've seen was the one I posted earlier, consisting entirely of pypy lib stuff. 

> Did you check for errors both on the master and on the slaves?

Yep.
 
> Is there an easy way for me to try to reproduce this locally?

I'm currently copying the data and code over to the aml machines.  It's about 3GB, and it's taking a little while.  But in 20 or 30 minutes, you can go to /aml/home/mjg82/mrs_testing, and there'll be a script called run.sh, which I've been using to test stuff.  You'll probably want to modify it a bit for however you want to test it on your machines.  I'm also re-running it here, now that my group's machines are available again, to see if I can find anything.  If I get the same ambiguous error I'll try it with python instead of pypy.

Andrew McNabb

unread,
Oct 19, 2012, 6:21:12 PM10/19/12
to mrs-ma...@googlegroups.com
On Fri, Oct 19, 2012 at 04:31:29PM -0400, Matthew Gardner wrote:
> On Fri, Oct 19, 2012 at 4:12 PM, Andrew McNabb <amc...@mcnabbs.org> wrote:
>
> > It sounds like you've hit a bug that we need to fix, but I don't have
> > any idea what "my problem really needs hadoop" means. :)
> >
>
> =). Partly it means that the scale of the data I have needs a cluster, and
> the only cluster I have available is a hadoop cluster. There are some 3
> large workstations I have access to, but together it's not enough to do
> what I need, I think, particularly if I can't distribute the file io.

Yeah, there's a big difference between a cluster and a workstation. So
you don't have ssh access to the cluster?

Part of me wonders whether you might have had problems from running too
many slaves on one workstation and running out of RAM. Is this
possibility easy to rule out?

> And
> partly it means that hadoop seems better able to handle really large scale
> data sets, owing to the fact that it's such a large project. Perhaps if we
> figure out this bug you'll prove me wrong on that part =).

Proving you wrong would be great. :) We did wordcount with some 50 GB
of Gutenberg files, and Hadoop kept crashing. When we shrank the
dataset, Hadoop finished but was an order of magnitude slower. So we're
not entirely happy with the scalability of Hadoop.

> The only backtrace I've seen was the one I posted earlier, consisting
> entirely of pypy lib stuff.

Does this backtrace happen every time you've seen problems? Did it
happen using both python and pypy?

I don't have a definite explanation for the backtrace, but as I think
about it, it occurs to me that it's not inconsistent with running out of
RAM.

> Did you check for errors both on the master and on the slaves?
>
> Yep.

And there are no backtraces of any sort? Or are you seeing the same
backtrace you saw earlier? Does this backtrace happen on the master or
on the slaves?

> I'm currently copying the data and code over to the aml machines. It's
> about 3GB, and it's taking a little while. But in 20 or 30 minutes, you
> can go to /aml/home/mjg82/mrs_testing, and there'll be a script called
> run.sh, which I've been using to test stuff. You'll probably want to
> modify it a bit for however you want to test it on your machines. I'm also
> re-running it here, now that my group's machines are available again, to
> see if I can find anything. If I get the same ambiguous error I'll try it
> with python instead of pypy.

Thanks for copying stuff--I think this will make it easier to track
down.

Matthew Gardner

unread,
Oct 19, 2012, 7:59:11 PM10/19/12
to mrs-ma...@googlegroups.com
On Fri, Oct 19, 2012 at 6:21 PM, Andrew McNabb <amc...@mcnabbs.org> wrote:
> Yeah, there's a big difference between a cluster and a workstation.  So
> you don't have ssh access to the cluster?

I can ssh into the compute nodes, but I don't think the managers of the cluster would look highly upon me bypassing their scheduling system and using compute nodes for non-hadoop tasks, if it would even let me.  I suppose I could write a hadoop job that is just a single "map," where the map task opens a python subprocess that talks to a master somewhere...  But then you'd have to worry about getting to HDFS and such.  Anyway...  =)
 
> Part of me wonders whether you might have had problems from running too
> many slaves on one workstation and running out of RAM.  Is this
> possibility easy to rule out?

The machine I used had 200GB of RAM, so I think that possibility is unlikely, though it is possible.

> Proving you wrong would be great. :)  We did wordcount with some 50 GB
> of Gutenberg files, and Hadoop kept crashing.  When we shrank the
> dataset, Hadoop finished but was an order of magnitude slower.  So we're
> not entirely happy with the scalability of Hadoop.

Yeah, I'm beginning to think the scalability issues I am seeing are more to do with using a single workstation instead of a cluster.  My group's machine guru (our resident Andrew, you might say =)) noticed today that I was running a bunch of pypy jobs that were disk bound, and let me know that the RAID 6 my home directory is stored on is a particularly bad place to be doing the kind of disk io that mrs is doing.

And for the other stuff, I haven't seen the original stack trace again, but we did find an error.  I just started it running again to see if it works.  I'll let you know how it goes.

Matthew Gardner

unread,
Oct 19, 2012, 8:41:01 PM10/19/12
to mrs-ma...@googlegroups.com
By the way, if you want to compare scalability on this problem, our hadoop cluster can do the first map and reduce in about 3 minutes, using 50 reduce tasks.  If you want to run on the potatoes and see how long it takes, it would be a rough comparison.  I had one map task per input file (and I removed the two largest files, so there were 28 map tasks).  I'm not sure how many threads each map and reduce task could use, though.

Andrew McNabb

unread,
Oct 19, 2012, 8:56:09 PM10/19/12
to mrs-ma...@googlegroups.com
On Fri, Oct 19, 2012 at 07:59:11PM -0400, Matthew Gardner wrote:
>
> I can ssh into the compute nodes, but I don't think the managers of the
> cluster would look highly upon me bypassing their scheduling system and
> using compute nodes for non-hadoop tasks, if it would even let me. I
> suppose I could write a hadoop job that is just a single "map," where the
> map task opens a python subprocess that talks to a master somewhere... But
> then you'd have to worry about getting to HDFS and such. Anyway... =)

That sounds sad. :(

> The machine I used had 200GB of RAM, so I think that possibility is
> unlikely, though it is possible.

That's a good point.

> Yeah, I'm beginning to think the scalability issues I am seeing are more to
> do with using a single workstation instead of a cluster. My group's
> machine guru (our resident Andrew, you might say =)) noticed today that I
> was running a bunch of pypy jobs that were disk bound, and let me know that
> the RAID 6 my home directory is stored on is a particularly bad place to be
> doing the kind of disk io that mrs is doing.

Hmm. Most of the IO in Mrs should be in /tmp, which really shouldn't be
on RAID. But even then, if your program is processing a lot of data, it
might have trouble on a single machine, regardless of the number of
processors.

> And for the other stuff, I haven't seen the original stack trace again, but
> we did find an error. I just started it running again to see if it works.
> I'll let you know how it goes.

Sounds great. Keep us updated, and thanks again for your earlier bug
report.

Andrew McNabb

unread,
Oct 19, 2012, 9:18:56 PM10/19/12
to mrs-ma...@googlegroups.com
On Fri, Oct 19, 2012 at 07:59:11PM -0400, Matthew Gardner wrote:
>
> Yeah, I'm beginning to think the scalability issues I am seeing are more to
> do with using a single workstation instead of a cluster. My group's
> machine guru (our resident Andrew, you might say =)) noticed today that I
> was running a bunch of pypy jobs that were disk bound, and let me know that
> the RAID 6 my home directory is stored on is a particularly bad place to be
> doing the kind of disk io that mrs is doing.

Hey. I think this is your problem:

pypy create_matrix.py -I Slave -M `hostname`:$PORT --mrs-tmpdir=tmp

Get rid of the "--mrs-tmpdir=tmp", which is putting all of your
temporary files in your home directory, and your performance should
increase dramatically.

Matthew Gardner

unread,
Oct 19, 2012, 10:22:26 PM10/19/12
to mrs-ma...@googlegroups.com

I did that because there wasn't enough space in /var, which is where it defaults to. I can see if there's a local disk that has enough space on it, but I think it's unlikely.

Andrew McNabb

unread,
Oct 19, 2012, 10:31:28 PM10/19/12
to mrs-ma...@googlegroups.com
On Fri, Oct 19, 2012 at 10:22:26PM -0400, Matthew Gardner wrote:
> I did that because there wasn't enough space in /var, which is where it
> defaults to. I can see if there's a local disk that has enough space on it,
> but I think it's unlikely.

Yeah, trying to do that amount of IO to a remote filesystem can't
possibly work well. How small is the local disk that it's too small?
It seems crazy that there wouldn't be a reasonably sized disk.

I've made a few changes to the program in /home/amcnabb/c/mrs/matt which
you might want to see. Most of the changes are fairly minor, but you
might find them interesting.

I have it running right now. It looks like the biggest problem, so far
anyway, is the lack of granularity in the input files. A whole bunch of
processors are idle from the beginning, and after a couple of minutes,
almost all of them are idle waiting for the last two tasks or so to
finish. If there's any way to split the input files or make tasks that
can read portions of files, it will be much more efficient to
parallelize.

Anyway, I think we've figured out why things weren't working for you,
and I'm relieved that it's not Mrs' fault, though I'm also disappointed
on your behalf that you only get one machine to play with. :)

Matthew Gardner

unread,
Oct 20, 2012, 11:03:36 AM10/20/12
to mrs-ma...@googlegroups.com
On Fri, Oct 19, 2012 at 10:31 PM, Andrew McNabb <amc...@mcnabbs.org> wrote:
> Yeah, trying to do that amount of IO to a remote filesystem can't
> possibly work well.  How small is the local disk that it's too small?
> It seems crazy that there wouldn't be a reasonably sized disk.

/var/run has about 95G available, and it was too small.  I looked, and there's just not that much local space available on our workstations - most of our space is on NFS mounted disks. 

> I've made a few changes to the program in /home/amcnabb/c/mrs/matt which
> you might want to see.  Most of the changes are fairly minor, but you
> might find them interesting.

I did find them interesting.  I'm glad to see that you now have a reducemap =).  And I wasn't really familiar with struct.unpack, and the first thing I tried didn't work, so I guess I just reimplemented what was already there...  Oh well =).
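
(For anyone else following along, here is roughly what the struct-based parsing looks like.  The '<iih' layout -- two 4-byte ints plus a 2-byte short, little-endian -- is a guess at the 10-byte walk record discussed later in the thread, and the field names are invented; the real file format may differ.)

import struct

WALK_RECORD = struct.Struct('<iih')   # 4 + 4 + 2 = 10 bytes per walk

def read_walks(f):
    """Yield (walk_id, (node, hop)) tuples from an open binary walk file."""
    while True:
        chunk = f.read(WALK_RECORD.size)
        if len(chunk) < WALK_RECORD.size:
            break
        walk_id, node, hop = WALK_RECORD.unpack(chunk)
        yield walk_id, (node, hop)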
 
> I have it running right now.

How long did it take?  I see the time info in those job directories, but I'm not sure what the difference is between the times.
 
> It looks like the biggest problem, so far
> anyway, is the lack of granularity in the input files.  A whole bunch of
> processors are idle from the beginning, and after a couple of minutes,
> almost all of them are idle waiting for the last two tasks or so to
> finish.  If there's any way to split the input files or make tasks that
> can read portions of files, it will be much more efficient to
> parallelize.

Yeah, I agree.  That would make it a lot easier in hadoop, too.  The problem is that I get those data files from someone else's code.  I think in the long term I'm going to modify that code to print out a text file instead of a binary file.  Right now it's 10 bytes per walk; if I wrote it in text, it would be about 8-10 bytes per int, 2-3 bytes for the short, and one byte for the newline, so around 22-25 bytes per walk.  That's not a huge difference, and the savings on the map reduce side are definitely worth it, I think.

And, incidentally, I ran into the same error last night that I saw the first time.  I won't put it here again - it's exactly the same as the one earlier in this thread.  And I'm pretty sure it's an out of memory error.  When I checked the code this morning that error had appeared somewhere in the middle of the night, and there were still pypy processes going that were consuming pretty much the entire 200GB of RAM.  I'm pretty sure each individual reduce task (the count_reduce, where this died) doesn't use that much memory.  Maybe I'm wrong, and there's one that's just really huge, and that's why it was using so much memory.  The error only happened on the last task on the last reduce.  And I just looked at the space being used in the tmp dir, and it was about a terabyte.  I guess that would explain why it was using so much memory (and why it wouldn't fit into any local disk that I have).  Is there anything here you want to see if you can fix, or is this memory and file usage just hopeless?

Andrew McNabb

unread,
Oct 22, 2012, 12:08:53 PM10/22/12
to mrs-ma...@googlegroups.com
On Sat, Oct 20, 2012 at 11:03:36AM -0400, Matthew Gardner wrote:
>
> /var/run has about 95G available, and it was too small. I looked, and
> there's just not that much local space available on our workstations - most
> of our space is on NFS mounted disks.

That's sad. That means that you have almost as much RAM as disk. :(

By the way (for future reference), if you really do need to use an NFS
server, "--mrs-shared" would do what you want. With "--mrs-tmpdir",
slave A writes to NFS, and when slave B needs the data, it contacts
slave A, which reads it from NFS and sends it over the network to slave
B. So, it's really bad. On the other hand, with "--mrs-shared" slave B
can read the data directly from NFS without contacting slave A. That
can work okay for CPU-bound programs like Particle Swarm Optimization,
but it's doomed to fail for programs with hundreds of gigabytes of I/O.

> I've made a few changes to the program in /home/amcnabb/c/mrs/matt which
> > you might want to see. Most of the changes are fairly minor, but you
> > might find them interesting.
>
> I did find them interesting. I'm glad to see that you now have a reducemap
> =). And I wasn't really familiar with struct.unpack, and the first thing I
> tried didn't work, so I guess I just reimplemented what was already
> there... Oh well =).

It was fun to go through and look for things that would be particularly
confusing for a user. I'm really fairly pleased with how your code
turned out. There were a few spots that weren't quite ideal, but you
weren't led toward doing things horribly wrong in any major way.

> > I have it running right now.
>
> How long did it take? I see the time info in those job directories, but
> I'm not sure what the difference is between the times.

So, at least in the Python code, it looks like a whole bunch of paths
are output in the second map operation. For example, if there were a
path A->B->C->D, it would spit out A, A->B, A->B->C, and A->B->C->D,
making the total amount of data O(n^2). Depending on how long these
paths can get (and it looks like they can get pretty long), this is a
little frightening.


> > It looks like the biggest problem, so far
> > anyway, is the lack of granularity in the input files. A whole bunch of
> > processors are idle from the beginning, and after a couple of minutes,
> > almost all of them are idle waiting for the last two tasks or so to
> > finish. If there's any way to split the input files or make tasks that
> > can read portions of files, it will be much more efficient to
> > parallelize.
> >
>
> Yeah, I agree. That would make it a lot easier in hadoop, too. The
> problem is that I get those data files from someone else's code. I think
> in the long term I'm going to modify that code to print out a text file
> instead of a binary file. Right now it's 10 bytes per walk; if I wrote it
> in text, it would be about 8-10 bytes per int, 2-3 bytes for the short, and
> one byte for the newline, so around 22-25 bytes per walk. That's not a
> huge difference, and the savings on the map reduce side are definitely
> worth it, I think.

The other possibility is to split the files. Since it's exactly 10
bytes per walk, you can jump to any spot in the middle of the file and
know exactly where a new record starts.

> And, incidentally, I ran into the same error last night that I saw the
> first time. I won't put it here again - it's exactly the same as the one
> earlier in this thread. And I'm pretty sure it's an out of memory error.
> When I checked the code this morning that error had appeared somewhere in
> the middle of the night, and there were still pypy processes going that
> were consuming pretty much the entire 200GB of RAM. I'm pretty sure each
> individual reduce task (the count_reduce, where this died) doesn't use that
> much memory. Maybe I'm wrong, and there's one that's just really huge, and
> that's why it was using so much memory. The error only happened on the
> last task on the last reduce. And I just looked at the space being used in
> the tmp dir, and it was about a terabyte. I guess that would explain why
> it was using so much memory (and why it wouldn't fit into any local disk
> that I have). Is there anything here you want to see if you can fix, or is
> this memory and file usage just hopeless?

I'm suspecting that it has something to do with the O(n^2) thing, but
I'm going to look into it a bit more and see if there's something else
going on.

Matthew Gardner

unread,
Oct 22, 2012, 1:08:29 PM10/22/12
to mrs-ma...@googlegroups.com
On Mon, Oct 22, 2012 at 12:08 PM, Andrew McNabb <amc...@mcnabbs.org> wrote:
> By the way (for future reference), if you really do need to use an NFS
> server, "--mrs-shared" would do what you want.  With "--mrs-tmpdir",
> slave A writes to NFS, and when slave B needs the data, it contacts
> slave A, which reads it from NFS and sends it over the network to slave
> B.  So, it's really bad.  On the other hand, with "--mrs-shared" slave B
> can read the data directly from NFS without contacting slave A.  That
> can work okay for CPU-bound programs like Particle Swarm Optimization,
> but it's doomed to fail for programs with hundreds of gigabytes of I/O.

That's good to know.
 
> It was fun to go through and look for things that would be particularly
> confusing for a user.  I'm really fairly pleased with how your code
> turned out.  There were a few spots that weren't quite ideal, but you
> weren't led toward doing things horribly wrong in any major way.

I just started from my own example in the code base (dependency_parse.py) and made the changes I needed to.  It was pretty easy.  And I'm glad I didn't do anything horribly wrong in any major way =).

> So, at least in the Python code, it looks like a whole bunch of paths
> are output in the second map operation.  For example, if there were a
> path A->B->C->D, it would spit out A, A->B, A->B->C, and A->B->C->D,
> making the total amount of data O(n^2).  Depending on how long these
> paths can get (and it looks like they can get pretty long), this is a
> little frightening.

Right, I discovered that problem when I was working with my hadoop code.  By construction of the data files, there are supposed to be a maximum of 11 steps per walk_id, so O(n^2) isn't that big a deal.  But there must be some error in the code I'm using to run the random walks, because there's one walk_id (I'm pretty sure it's walk_id 0) that ends up with many, many steps (I'm not sure exactly how many, because hadoop doesn't give me a good way to check, but it may be thousands or even millions).  If you put in a line that checks for way too many steps in a given walk, it becomes a lot more manageable.  Even after I made that fix in my mrs code, though, I had trouble running it on a single machine.

> The other possibility is to split the files.  Since it's exactly 10
> bytes per walk, you can jump to any spot in the middle of the file and
> know exactly where a new record starts.

That's a really good idea.  I hadn't thought of that.  Thanks.

> I'm suspecting that it has something to do with the O(n^2) thing, but
> I'm going to look into it a bit more and see if there's something else
> going on.

Right, as I mentioned above.  That fix should make it feasible to run on the potatoes, and maybe the --mrs-shared option would make it able to run on my machine.

Andrew McNabb

unread,
Oct 22, 2012, 1:13:28 PM10/22/12
to mrs-ma...@googlegroups.com
On Mon, Oct 22, 2012 at 01:08:29PM -0400, Matthew Gardner wrote:
> So, at least in the Python code, it looks like a whole bunch of paths
> > are output in the second map operation. For example, if there were a
> > path A->B->C->D, it would spit out A, A->B, A->B->C, and A->B->C->D,
> > making the total amount of data O(n^2). Depending on how long these
> > paths can get (and it looks like they can get pretty long), this is a
> > little frightening.
>
> Right, I discovered that problem when I was working with my hadoop code.
> By construction of the data files, there are supposed to be a maximum of
> 11 steps per walk_id, so O(n^2) isn't that big a deal. But there must be
> some error in the code I'm using to run the random walks, because there's
> one walk_id (I'm pretty sure it's walk_id 0) that ends up with many, many
> steps (I'm not sure exactly how many, because hadoop doesn't give me a good
> way to check, but it may be thousands or even millions). If you put in a
> line that checks for way too many steps in a given walk, it becomes a lot
> more manageable. Even after I made that fix in my mrs code, though, I had
> trouble running it on a single machine.

Would you please send me that fix, and then I can see if this completely
solves the problem?

> The other possibility is to split the files. Since it's exactly 10
> > bytes per walk, you can jump to any spot in the middle of the file and
> > know exactly where a new record starts.
>
> That's a really good idea. I hadn't thought of that. Thanks.

I'm glad I can help.

> Right, as I mentioned above. That fix should make it feasible to run on
> the potatoes, and maybe the --mrs-shared option would make it able to run
> on my machine.

Although it still won't be any faster than running it in serial, since
you'll be completely I/O-bound over a slow NFS connection. If you can
get at some reasonably sized local storage, then things will change.

Matthew Gardner

unread,
Oct 22, 2012, 1:19:13 PM10/22/12
to mrs-ma...@googlegroups.com
Just adding a line in collapse_reduce to check for pathologically long lists:

def collapse_reduce(self, key, values):
    v = list(values)
    if len(v) < 100:
        v.sort()
        yield v

You could also add a print statement in there and tell me exactly how pathologically long it is =).  I don't know how to get output from individual slaves in hadoop.

Andrew McNabb

unread,
Oct 22, 2012, 2:01:54 PM10/22/12
to mrs-ma...@googlegroups.com
On Mon, Oct 22, 2012 at 01:19:13PM -0400, Matthew Gardner wrote:
> Just adding a line in collapse_reduce to check for pathologically long
> lists:
>
> def collapse_reduce(self, key, values):
>     v = list(values)
>     if len(v) < 100:
>         v.sort()
>         yield v
>
> You could also add a print statement in there and tell me exactly how
> pathologically long it is =). I don't know how to get output from
> individual slaves in hadoop.

That little if-statement made a huge difference. Right now I'm tracking
down one last bug in Mrs that's being triggered by the overwhelming
amount of NFS activity in the final output phase. Your program is a
nice example for this. :)

One other issue: the final output looks wrong. An example line is:

100013-98720 defaultdict(<type 'int'>, {'100013-edge-3641-edge-98455-edge-3641-edge-97691-edge-97690-edge-97691-edge-3641-edge-98720': 1, '100013-edge-3641-edge-98948-edge-3641-edge-98720': 1, '100013-edge-3641-edge-98720': 1})

The formatting of the key makes sense--the formatting of the value (the
defaultdict) does not seem right. Do you have any insights about how
this should be formatted?

Matthew Gardner

unread,
Oct 22, 2012, 2:20:21 PM10/22/12
to mrs-ma...@googlegroups.com
On Mon, Oct 22, 2012 at 2:01 PM, Andrew McNabb <amc...@mcnabbs.org> wrote:
> The formatting of the key makes sense--the formatting of the value (the
> defaultdict) does not seem right.  Do you have any insights about how
> this should be formatted?

The end goal is to get P(target node | source node, path).  This will be stored in a big (sparse) matrix of the form (node pair) x (path).  So, really, I need another map reduce at the end to normalize the probabilities, which I was initially hoping to avoid (i.e., you could normalize on the fly, whenever you needed to compute probabilities, but on second thought that seems intractable).  To just output the raw counts, unnormalized, you could emit one key-value pair for each entry in the dictionary, where the value is "%s: %d" % (path, count).  In addition, because these are supposed to be probabilities, it might be a good idea to try to smooth things out and ignore noise, so I put in a minimum count cut off for every path.  Even setting the minimum count to 2 leads to something like a 75% reduction in the number of output keys.

(Just in case you're wondering, if the paths are actually output as they currently are by the python code, I realize that P(target node | source node, path) will always be 1, because the target node is included in the path.  My first goal was just to get the map reduce working, then to engineer the path features a little bit - you can probably see that by the TODO that's in the code.)
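
(A rough sketch of that extra normalizing pass: re-key each ((source, target), {path: count}) entry by (source, path), then divide each count by the per-(source, path) total.  The function and variable names are illustrative, not taken from the actual create_matrix.py.)

def normalize_map(self, key, counts):
    source, target = key
    for path, count in counts.iteritems():
        yield (source, path), (target, count)

def normalize_reduce(self, key, values):
    values = list(values)
    total = float(sum(count for _, count in values))
    # P(target | source, path) for every target seen with this (source, path)
    yield [(target, count / total) for target, count in values]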

Andrew McNabb

unread,
Oct 22, 2012, 2:32:21 PM10/22/12
to mrs-ma...@googlegroups.com
It's not that I need to be involved in the details for the specific
problem. Rather, if there's a 75% reduction of the output keys as you
suggest, then this makes a huge difference in the overall performance.

And speaking of performance, it can be hard to separate any performance
hit from using Python from any performance hit that's specific to Mrs.
I'm particularly interested in anything crazy that Mrs is doing wrong.
Since your problem is I/O bound and none of mine are, you've already
helped me uncover a couple of minor issues.

By the way, I would love to have a new version of your code where the
values in the initial local_data are (filename, byterange) pairs. To
make this happen, the run method could find the size of each file,
presumably using stat and maybe comparing with the file header just to
catch errors, and then split files into smaller blocks.
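
Here is one possible way to build those (filename, byterange) pairs, sketched from the paragraph above: stat each file and cut it into blocks that are multiples of the 10-byte record size, so every block starts on a record boundary.  The block size and names are arbitrary choices for the sketch; each pair would presumably feed the initial local_data, with the map task seeking to the start offset and reading (end - start) bytes.

import os

RECORD_SIZE = 10
BLOCK_RECORDS = 1000000   # about 10 MB per task; tune as needed

def split_input(filenames):
    """Yield (filename, (start, end)) byte ranges aligned to record boundaries."""
    block = RECORD_SIZE * BLOCK_RECORDS
    for name in filenames:
        size = os.stat(name).st_size
        for start in xrange(0, size, block):
            yield name, (start, min(start + block, size))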

Fixing these two issues, the inefficiencies in the output format and the
poor granularity of the initial map task, would let me make a fairly
accurate estimate of how long Mrs takes for this particular problem.

On Mon, Oct 22, 2012 at 02:20:21PM -0400, Matthew Gardner wrote:
>
> The end goal is to get P(target node | source node, path). This will be
> stored in a big (sparse) matrix of the form (node pair) x (path). So,
> really, I need another map reduce at the end to normalize the
> probabilities, which I was initially hoping to avoid (i.e., you could
> normalize on the fly, whenever you needed to compute probabilities, but on
> second thought that seems intractable). To just output the raw counts,
> unnormalized, you could emit one key-value pair for each entry in the
> dictionary, where the value is "%s: %d" % (path, count). In addition,
> because these are supposed to be probabilities, it might be a good idea to
> try to smooth things out and ignore noise, so I put in a minimum count cut
> off for every path. Even setting the minimum count to 2 leads to something
> like a 75% reduction in the number of output keys.
>
> (Just in case you're wondering, if the paths are actually output as they
> currently are by the python code, I realize that P(target node | source
> node, path) will always be 1, because the target node is included in the
> path. My first goal was just to get the map reduce working, then to
> engineer the path features a little bit - you can probably see that by the
> TODO that's in the code.)

Andrew McNabb

unread,
Oct 22, 2012, 5:31:57 PM10/22/12
to mrs-ma...@googlegroups.com
I have some interesting data from a run using 69 slaves on 23 machines
in a cluster.

The total amount of output data was 33 GB.

The total run time was 41.5 minutes. This breaks down per dataset as
follows:

- map_walk_files: 12.8 minutes
- collapse_reduce_map_walk: 8.12 minutes
- count_reduce: 20.5 minutes

The performance of map_walk_files was limited by the lack of granularity
in the input files, and the performance of count_reduce was deeply
affected by the large amount of final output data being written to NFS
which was a significant bottleneck. If these two issues were addressed,
I could easily see the total run time being closer to 25 minutes.
Beyond that, there's probably some performance penalty to having
everything in Python, but hopefully it's not a large penalty.

After a few of these fixes, it might be interesting to try to do a
side-by-side comparison with Hadoop, though of course it's impossible to
set all of the factors to be equal.

Matthew Gardner

unread,
Oct 23, 2012, 12:21:42 PM10/23/12
to mrs-ma...@googlegroups.com
That's quite close to what I'm seeing with hadoop, actually, though the time distributions are a little different.

It's about 3 minutes for the first map and reduce, using 28 mappers (for the 28 input files) and 50 reducers (this suggests that the initial file io is faster in hadoop, or that java is faster at processing the data file than python).

It's then something like 15-20 minutes each for the next map and reduce, using 50 machines each, mostly because there's one or two shards that take way longer than the others.  Perhaps your partition function on the first reduce and the second map is a little better than hadoop's partition function.

Too bad I don't have a cluster I can run mrs on =).

Andrew McNabb

unread,
Oct 23, 2012, 12:42:25 PM10/23/12
to mrs-ma...@googlegroups.com
On Tue, Oct 23, 2012 at 12:21:42PM -0400, Matthew Gardner wrote:
> That's quite close to what I'm seeing with hadoop, actually, though the
> time distributions are a little different.
>
> It's about 3 minutes for the first map and reduce, using 28 mappers (for
> the 28 input files) and 50 reducers (this suggests that the initial file io
> is faster in hadoop, or that java is faster at processing the data file
> than python).

We also have half as many machines, and the files aren't perfectly
distributed across them. I'm sure that both Hadoop and Mrs will do
better once the granularity issue is addressed, but I have a feeling
that Mrs might benefit just a little bit more. Of course, this is
assuming that all else is equal, which it isn't (it looks like you have
twice as many machines as we do).

> It's then something like 15-20 minutes each for the next map and reduce,
> using 50 machines each, mostly because there's one or two shards that take
> way longer than the others. Perhaps your partition function on the first
> reduce and the second map is a little better than hadoop's partition
> function.
>
> Too bad I don't have a cluster I can run mrs on =).

The one or two shards that take longer in the next map and reduce
shouldn't make a huge difference. It's noticeable, but not horrible.
However, this effect is more significant for Hadoop, which has no
reduce-map operation, than for Mrs. I'm guessing the partition
functions are roughly equivalent and that the main difference is that
Hadoop is spending a lot of time writing to HDFS.

So anyway, it looks like Mrs is a bit faster overall, despite being run
on a smaller cluster and running "slow" python code. :) And I would
still like to do a rematch with improved input granularity and sane
output because I think these changes would make the comparison even more
favorable.

Andrew McNabb

unread,
Oct 23, 2012, 2:39:29 PM10/23/12
to mrs-ma...@googlegroups.com
On Tue, Oct 23, 2012 at 10:42:25AM -0600, Andrew McNabb wrote:
>
> And I would
> still like to do a rematch with improved input granularity and sane
> output because I think these changes would make the comparison even more
> favorable.

I've fixed the input granularity problem in
examples/contrib/create_matrix.py in the Mrs repo. Feel free to take a
look at it and even to make the same changes in Java. The first map
phase now takes 2 minutes 21 seconds. And I think there's one other
change we can make that will reduce the time even more.

Matt, would you be able to tackle the output formatting problem? I
think that the final output should be much, much less than 33 GB.

Andrew McNabb

unread,
Oct 23, 2012, 2:53:40 PM10/23/12
to mrs-ma...@googlegroups.com
On Mon, Oct 22, 2012 at 03:31:57PM -0600, Andrew McNabb wrote:
> The total run time was 41.5 minutes. This breaks down per dataset as
> follows:
>
> - map_walk_files: 12.8 minutes
> - collapse_reduce_map_walk: 8.12 minutes
> - count_reduce: 20.5 minutes

Here's a full breakdown for the new run:

- total time: 19.9 minutes
- map_walk_files: 2.4 minutes
- collapse_reduce_map_walk: 8.4 minutes
- count_reduce: 9.1 minutes

I was surprised by the huge improvement in count_reduce. The main
difference I can think of is that I had the output data dump to NFS on a
workstation instead of on the central server. So there was less NFS
congestion and no RAID to worry about. Anyway, I'm pretty happy with
the sub-20 minute time, and I'm sure that we can get it even better.

Matthew Gardner

unread,
Oct 23, 2012, 3:08:44 PM10/23/12
to mrs-ma...@googlegroups.com
On Tue, Oct 23, 2012 at 2:39 PM, Andrew McNabb <amc...@mcnabbs.org> wrote:
> Matt, would you be able to tackle the output formatting problem?  I
> think that the final output should be much, much less than 33 GB.

It's probably not worth it to try formatting the output; filtering it is what you really care about, I think.  Here's a new count_reduce that does the filtering (I got about 3GB of output from hadoop when I ran it with this filtering):

    def count_reduce(self, key, values):
        # Requires: from collections import defaultdict
        # Tally how many times each path appears for this key.
        counts = defaultdict(int)
        for v in values:
            counts[v] += 1
        # Drop paths that occur fewer than MIN_COUNT times; iterate
        # over a copy so deleting while looping is safe.
        MIN_COUNT = 2
        for path, count in list(counts.items()):
            if count < MIN_COUNT:
                del counts[path]
        if counts:
            yield counts

And your sub-20 minute runtime is quite impressive.  Good work.

Kevin Seppi

unread,
Oct 23, 2012, 3:24:05 PM10/23/12
to mrs-ma...@googlegroups.com
I have a couple of questions:

1) Since we have posted this as an example, could we get a description of what it is doing, either in graph-theory terms or application terms (or both)?
2) Have you mentioned this to anyone else there? Is there any chance of getting others to use it? It really seems that mrs is (much) easier to program to and also faster, even on a smaller cluster. Someone ought to care.

--Kevin 

Matthew Gardner

unread,
Oct 23, 2012, 3:28:57 PM10/23/12
to mrs-ma...@googlegroups.com
On Tue, Oct 23, 2012 at 3:24 PM, Kevin Seppi <kse...@byu.edu> wrote:
1) Since we have posted this as an example, could we get a description of what it is doing, either in graph-theory terms or application terms (or both)?

Did you want that description here, or in the code?  If you want it in the code, how should I best update the file?  I don't know much about forking on github.
 
2) Have you mentioned this to anyone else there? Is there any chance of getting others to use it? It really seems that mrs is (much) easier to program to and also faster, even on a smaller cluster. Someone ought to care.

That's a good point.  I'll see what I can do.

Kevin Seppi

unread,
Oct 23, 2012, 7:30:26 PM10/23/12
to mrs-ma...@googlegroups.com
If you want to write a brief description in e-mail, we can get it in.

Andrew McNabb

unread,
Oct 24, 2012, 10:11:48 AM10/24/12
to mrs-ma...@googlegroups.com
On Tue, Oct 23, 2012 at 03:28:57PM -0400, Matthew Gardner wrote:
>
> Did you want that description here, or in the code? If you want it in the
> code, how should I best update the file? I don't know much about forking
> on github.

In the code would be ideal, but as Kevin mentioned, we can get it in if
you send it by email.

If you clone our git repository, then you can push it anywhere you want,
and I can pull from it. The easiest thing is probably to create a bare
repository on the aml system with:

aml$ git init --bare mrs.git

and then push to it from your system with

$ git push aml.cs.byu.edu:mrs.git master

It's really not too bad.

Andrew McNabb

unread,
Oct 24, 2012, 2:01:36 PM10/24/12
to mrs-ma...@googlegroups.com
On Tue, Oct 23, 2012 at 03:08:44PM -0400, Matthew Gardner wrote:
>
> It's probably not worth it to try formatting the output; filtering it is
> what you really care about, I think. Here's a new count_reduce that does
> the filtering (I got about 3GB of out from hadoop when I ran it with this
> filtering):
>
>     def count_reduce(self, key, values):
>         counts = defaultdict(int)
>         for v in values:
>             counts[v] += 1
>         MIN_COUNT = 2
>         for path, count in list(counts.items()):
>             if count < MIN_COUNT:
>                 del counts[path]
>         if counts:
>             yield counts

With this change, the total time is now 11.6 minutes. Just out of
curiosity, what's your final time in Hadoop?

> And your sub-20 minute runtime is quite impressive. Good work.

Thanks.

Matthew Gardner

unread,
Oct 24, 2012, 2:15:25 PM10/24/12
to mrs-ma...@googlegroups.com
On Wed, Oct 24, 2012 at 2:01 PM, Andrew McNabb <amc...@mcnabbs.org> wrote:
With this change, the total time is now 11.6 minutes.  Just out of
curiosity, what's your final time in Hadoop?

The time I reported previously already included that last change, though the machines had a few problems (their disks were getting full, and there were other people using them).  I just ran it again in hadoop this morning, with an extra map reduce to normalize the counts that come out.  The machines were completely idle, and these are the times I saw:

First map reduce: 2 minutes, 30 seconds
Second map reduce: 4 minutes, 23 seconds
Third map reduce: 2 minutes, 23 seconds

So, about 7 minutes, when you count just the work that you ran on mrs.  It's a pretty huge discrepancy from the 45 minutes I was seeing over the weekend.  Oh, there was another very important difference - I used 100 splits instead of 50.  So, that means I used 100 machines for each map and reduce after the first map (which only used 28), and I likely got a more even split, so there wasn't a big partition holding the other tasks up.  I'm not sure how many threads the code used on each machine.

Andrew McNabb

unread,
Oct 24, 2012, 2:34:01 PM10/24/12
to mrs-ma...@googlegroups.com
On Wed, Oct 24, 2012 at 02:15:25PM -0400, Matthew Gardner wrote:
>
> First map reduce: 2 minutes, 30 seconds
> Second map reduce: 4 minutes, 23 seconds
> Third map reduce: 2 minutes, 23 seconds
>
> So, about 7 minutes, when you count just the work that you ran on mrs.
> It's a pretty huge discrepancy between the 45 minutes I was seeing over
> the weekend.

That's a huge difference. So yours was a total of 9.3 minutes and would
be faster if you improve the granularity of the initial map task as we
did in Mrs--I'm guessing about a minute faster.

It's not apples to apples, but it looks like it's about 8 minutes on 100
machines for Hadoop (with the granularity fix) vs. about 11 and a half
minutes on 23 machines for Mrs. That's a pretty solid win for Mrs.

It's probably impossible to set all of the factors equal, but if you
send me the Java code (preferably copying the granularity fix from my
version), I can run it on our cluster to compare side by side.

Matthew Gardner

unread,
Oct 24, 2012, 2:51:12 PM10/24/12
to mrs-ma...@googlegroups.com
On Wed, Oct 24, 2012 at 2:34 PM, Andrew McNabb <amc...@mcnabbs.org> wrote:
It's not apples to apples, but it looks like it's about 8 minutes on 100
machines for Hadoop (with the granularity fix) vs. about 11 and a half
minutes on 23 machines for Mrs.  That's a pretty solid win for Mrs.

But you're using 3 threads per machine?  So is it really 69 vs. 100?  Either way, though, you're right that mrs is quite impressive.

It's probably impossible to set all of the factors equal, but if you
send me the Java code (preferably copying the granularity fix from my
version), I can run it on our cluster to compare side by side.

I can send you my hadoop code, but the granularity fix isn't the same - you have to modify an InputFormat class, which I'm planning on doing at some point, but it's just handled a bit differently in hadoop.

And, I pushed some documentation for create_matrix.py, and it's at /aml/home/mjg82/mrs_modified.git. 

Andrew McNabb

unread,
Oct 24, 2012, 3:06:20 PM10/24/12
to mrs-ma...@googlegroups.com
On Wed, Oct 24, 2012 at 02:51:12PM -0400, Matthew Gardner wrote:
> But you're using 3 threads per machine? So is it really 69 vs. 100?
> Either way, though, you're right that mrs is quite impressive.

That's possible. Either way, we're definitely about at the point where
little differences and tweaks matter.

> I can send you my hadoop code, but the granularity fix isn't the same - you
> have to modify an InputFormat class, which I'm planning on doing at some
> point, but it's just handled a bit differently in hadoop.

Even in Hadoop, it might be easier to do it the way we did it in Mrs
(the walks files wouldn't technically be the "input files").

> And, I pushed some documentation for create_matrix.py, and it's at
> /aml/home/mjg82/mrs_modified.git.

Thanks, I've pulled this, and it makes the example much more clear.

Matthew Gardner

unread,
Oct 24, 2012, 3:39:19 PM10/24/12
to mrs-ma...@googlegroups.com
On Wed, Oct 24, 2012 at 3:06 PM, Andrew McNabb <amc...@mcnabbs.org> wrote:
On Wed, Oct 24, 2012 at 02:51:12PM -0400, Matthew Gardner wrote:
> I can send you my hadoop code, but the granularity fix isn't the same - you
> have to modify an InputFormat class, which I'm planning on doing at some
> point, but it's just handled a bit differently in hadoop.

Even in Hadoop, it might be easier to do it the way we did it in Mrs
(the walks files wouldn't technically be the "input files").

I put my hadoop code (without trying to fix the granularity of the input files - I have other things to get to first) in /aml/home/mjg82/mrs_testing/hadoop_code, along with all of the libraries you should need to get it running (at least, everything that I need to run it; I don't have to worry about setting up the hadoop cluster itself).

Chris Monson

unread,
Oct 24, 2012, 4:03:18 PM10/24/12
to mrs-ma...@googlegroups.com
On Wed, Oct 24, 2012 at 8:51 PM, Matthew Gardner <m...@cs.cmu.edu> wrote:
On Wed, Oct 24, 2012 at 2:34 PM, Andrew McNabb <amc...@mcnabbs.org> wrote:
It's not apples to apples, but it looks like it's about 8 minutes on 100
machines for Hadoop (with the granularity fix) vs. about 11 and a half
minutes on 23 machines for Mrs.  That's a pretty solid win for Mrs.

But you're using 3 threads per machine?  So is it really 69 vs. 100?  Either way, though, you're right that mrs is quite impressive.

I get a little tear in my eye when I remember our conversation in Singapore, there by the bay, about how Andrew could probably implement a simple mapreduce in a week. Then he went and did it.

And now it's awesome. I love Andrew. It's a brotherly, geeky sort of love, but it's definitely love.

:-)

Andrew McNabb

unread,
Oct 24, 2012, 4:34:56 PM10/24/12
to mrs-ma...@googlegroups.com
On Wed, Oct 24, 2012 at 10:03:18PM +0200, Chris Monson wrote:
>
> I get a little tear in my eye when I remember our conversation in
> Singapore, there by the bay, about how Andrew could probably implement a
> simple mapreduce in a week. Then he went and did it.
>
> And now it's awesome. I love Andrew. It's a brotherly, geeky sort of love,
> but it's definitely love.

Thanks for the geeky brotherly love. I sincerely appreciate it.

And for the record, Mrs definitely took more than a week to be awesome.
:)

Andrew McNabb

unread,
Oct 25, 2012, 5:31:26 PM10/25/12
to mrs-ma...@googlegroups.com
On Wed, Oct 24, 2012 at 12:34:01PM -0600, Andrew McNabb wrote:
>
> It's not apples to apples, but it looks like it's about 8 minutes on 100
> machines for Hadoop (with the granularity fix) vs. about 11 and a half
> minutes on 23 machines for Mrs. That's a pretty solid win for Mrs.
>
> It's probably impossible to set all of the factors equal, but if you
> send me the Java code (preferably copying the granularity fix from my
> version), I can run it on our cluster to compare side by side.

We were able to get the time in Mrs down to 9.8 minutes (on 24 machines)
by specifying custom serializers. The default serializer is Pickle,
which is extremely convenient, but you can also specify serialize
functions and deserialize functions of your choice. We thought that
this might give a minor improvement, but we were surprised that it
improved the time by more than 10%.
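In case it's useful to anyone, here is a rough sketch of what custom
serializers could look like for values that are pairs of integers, using
the struct module. Only the pack/unpack logic is shown; how the functions
get registered with Mrs (a decorator, an attribute, or a job option)
depends on the Mrs version, so treat that part as an assumption rather
than the actual API.

    import struct

    # Fixed-width encoding for an (int, int) pair: two unsigned
    # 64-bit integers, 16 bytes total.
    PAIR_FORMAT = struct.Struct('<QQ')

    def serialize_int_pair(pair):
        # Pack an (int, int) tuple into 16 bytes.
        return PAIR_FORMAT.pack(*pair)

    def deserialize_int_pair(data):
        # Unpack 16 bytes back into an (int, int) tuple.
        return PAIR_FORMAT.unpack(data)

A fixed-width binary format like this avoids the per-object overhead of
Pickle, which is presumably where most of the improvement comes from.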

Sorry to keep adding to this endless thread, but we keep finding
interesting results. Matt, thanks again for this example--it's been
really helpful for testing.

Jeffrey Lund

unread,
Oct 27, 2012, 3:25:03 PM10/27/12
to mrs-ma...@googlegroups.com
On Wednesday, October 24, 2012 12:15:46 PM UTC-6, Matthew Gardner wrote:

So, that means I used 100 machines for the each map and reduce after the first map (which only used 28), and I likely got a more even split, so there wasn't a big partition holding the other tasks up.

When we are running your hadoop code, we are seeing 30 tasks, as opposed to your 28. Any reason why we are not seeing the same thing?
-Jeff 

Andrew McNabb

unread,
Oct 27, 2012, 3:50:01 PM10/27/12
to mrs-ma...@googlegroups.com
On Sat, Oct 27, 2012 at 12:25:03PM -0700, Jeffrey Lund wrote:
>
> When we are running your hadoop code, we are seeing 30 tasks, as opposed to
> your 28. Any reason why we are not seeing the same thing?

And one other followup question: I noticed that the Hadoop program has
three maps and three reduces (three MapReduce jobs), but the Mrs program
only has two maps and two reduces. Are the two programs doing different
work?

Matthew Gardner

unread,
Oct 27, 2012, 8:05:52 PM10/27/12
to mrs-ma...@googlegroups.com
On Sat, Oct 27, 2012 at 3:25 PM, Jeffrey Lund <jeff...@gmail.com> wrote:
When we are running your hadoop code, we are seeing 30 tasks, as opposed to your 28. Any reason why we are not seeing the same thing?

That's because I deleted the two largest files, as I mentioned way back near the beginning of the thread, like 40 messages ago =). 

Matthew Gardner

unread,
Oct 27, 2012, 8:13:14 PM10/27/12
to mrs-ma...@googlegroups.com
On Sat, Oct 27, 2012 at 3:50 PM, Andrew McNabb <amc...@mcnabbs.org> wrote:
And one other followup question: I noticed that the Hadoop program has
three maps and three reduces (three MapReduce jobs), but the Mrs program
only has two maps and two reduces.  Are the two programs doing different
work?

I mentioned this when I gave my last timing results, but it looks like I wasn't clear enough.  I actually implemented the third normalization step in my hadoop code, and it was only the first two map reduces that are comparable to the work being done in python.  So it was just 7 of the 10 minutes that are directly comparable to what you were doing with the python code.

It shouldn't be hard to put the last map reduce in, so that the code is complete and outputs a well-formed matrix.  I could do that next week if you want me to.  Our "research programmer," who manages the machines and whatnot, informs me that we're getting 10 decent-sized machines from Yahoo soon, and that might be a nice place to try running mrs here.  So, it makes sense from my end too to finish the python code.
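For reference, here is a rough sketch of what that last normalization step
might look like, assuming "normalize" just means dividing each path's count
by the total count for its key; the real Hadoop job may do something
slightly different:

    def normalize_reduce(self, key, values):
        # values is an iterable of dicts mapping path -> count for this
        # key; merge them and scale so the counts for the key sum to 1.
        totals = {}
        for counts in values:
            for path, count in counts.items():
                totals[path] = totals.get(path, 0) + count
        grand_total = float(sum(totals.values()))
        if grand_total > 0:
            yield dict((path, count / grand_total)
                       for path, count in totals.items())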

Andrew McNabb

unread,
Oct 29, 2012, 12:16:53 PM10/29/12
to mrs-ma...@googlegroups.com
On Sat, Oct 27, 2012 at 08:05:52PM -0400, Matthew Gardner wrote:
>
> That's because I deleted the two largest files, as I mentioned way back
> near the beginning of the thread, like 40 messages ago =).

I remember you saying that, but I thought it was just a temporary thing.
Anyway, we've been running it with all of the data in both Mrs and
Hadoop. Is there anything wrong with the data from the two biggest
files, or is it just inconvenient to have large files?

Matthew Gardner

unread,
Oct 29, 2012, 1:11:22 PM10/29/12
to mrs-ma...@googlegroups.com
On Mon, Oct 29, 2012 at 12:16 PM, Andrew McNabb <amc...@mcnabbs.org> wrote:
Is there anything wrong with the data from the two biggest
files, or is it just inconvenient to have large files?

There shouldn't be anything wrong with it; I was just running out of disk space on our hadoop cluster because the input files were too large and the cluster had some old files it needed to clean up. 