secondary sort


eckamm

May 10, 2012, 10:43:03 AM
to mr...@googlegroups.com
I'd like to use mrjob to do what I think is called a "secondary sort" in my job.

That is, the key used in the partitioning (say (field1)) is a part of the key used in the sorting (say (field1, field2)).

This allows the reducer to work from a sorted iterator on (field1, field2) rather than bringing all the rows grouped with (field1) into RAM for sorting in my reducer which is the bad practice I employ right now because my mapper's key is (field1).

Has anyone done this with mrjob?

Dave Marin

May 11, 2012, 2:54:37 PM
to mr...@googlegroups.com
You can do something like this by specifying a custom partitioner
class (--partitioner), but we haven't yet been able to make it do what
we want.

For more information on our attempts, see:

https://github.com/Yelp/mrjob/issues/240

-Dave
--

Yelp is looking to hire great engineers! See http://www.yelp.com/careers.

Shivkumar Shivaji

May 11, 2012, 4:48:09 PM
to mr...@googlegroups.com
Interestingly, this sort of problem is why I integrated Pig with mrjob at my workplace. I still need to address some review issues on my pull request.

This feels like the sort of query that is better solved by a high-level language.

Shiv

Brandon Haynes

May 12, 2012, 10:15:41 AM
to mr...@googlegroups.com
For some reason I didn't realize this was an open issue in mrjob.  I've been running jobs using a specified key comparator and partitioner for some time now as part of my schimmy map/reduce pattern implementation.
 
I'm travelling at the moment but will pull the details together when I return next week.
 
Brandon



eckamm

May 21, 2012, 10:28:29 AM
to mr...@googlegroups.com
I figured out a way to do this.  In my case, I wanted to partition by user_id and sort by (user_id, date).
  • I used "INTERNAL_PROTOCOL = RawValueProtocol".
  • I embedded a composite key, tab-delimited, at the front of the yielded value (i.e. "user_id\tdate\t").  I yield None as the key, since it is ignored given the protocol choice.
  • I added the following options to the mrjob command line:
    • --partitioner=org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
    • --jobconf=mapred.text.key.partitioner.options=-k1,1    (tells the partitioner to use only the first field as the partition key)
    • --jobconf=stream.num.map.output.key.fields=2    (tells the sort/shuffle to treat the first two fields as the key)
  • The reducer key will be None, as per how the protocol works.  You can retrieve the key from your record "manually"; in my case, the key was everything to the left of the second tab in the value, just as I embedded it.
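To make the mechanics concrete, here is a small plain-Python simulation of the scheme above (helper names are mine for illustration, not mrjob or Hadoop API); it mimics what partitioning on the first field and sorting on the first two tab-delimited fields does to the mapper's output lines:

```python
def emit_value(user_id, date, payload):
    # What the mapper yields as its value under RawValueProtocol:
    # the composite key rides tab-delimited at the front of the line.
    return "%s\t%s\t%s" % (user_id, date, payload)

def partition_field(line):
    # -k1,1: partition on the first tab-delimited field only
    return line.split("\t")[0]

def sort_fields(line):
    # two key fields: the shuffle sorts on (user_id, date)
    return tuple(line.split("\t")[:2])

lines = [
    emit_value("u1", "2012-05-03", "c"),
    emit_value("u2", "2012-05-01", "x"),
    emit_value("u1", "2012-05-01", "a"),
]

# All "u1" lines land in one partition and arrive at the reducer
# already ordered by (user_id, date):
u1_lines = sorted(
    (l for l in lines if partition_field(l) == "u1"), key=sort_fields
)
```

The reducer then re-splits each line on tabs to recover the composite key, exactly as described above.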

Would like to hear about other approaches to doing this with mrjob.

--Eric


Steve Johnson

May 21, 2012, 11:28:54 AM
to mr...@googlegroups.com
Oh, that's nice. How useful would it be for us to duplicate this behavior in the local runner for testing purposes?

Steve Johnson

May 21, 2012, 11:32:27 AM
to mr...@googlegroups.com
Also meant to say: we may consider supporting composite keys in the mapper/reducer function signatures in the future, depending on demand, ease of implementation, presence of pull requests, etc.

I'm glad you were able to get KeyFieldBasedPartitioner to work. We've tried it at Yelp but had some strange issues with Hadoop adding tabs where they didn't belong.

Eric Kamm

May 21, 2012, 12:23:29 PM
to mr...@googlegroups.com
It took a few tries before I saw a path forward using RawValue as the internal protocol. 

It seems like you might need custom streaming and/or partitioner jars to pull off the combination of mrjob-style protocols and composite keys supporting secondary sorts, unless you stick with plain Hadoop-friendly delimited records (at least for the key portion of the record) rather than JSON, repr, etc. serialization.

-Eric

Brandon Haynes

May 22, 2012, 9:21:51 AM
to mr...@googlegroups.com
Sorry for the delay in responding -- I'm only back in town this morning.

It looks like you independently came up with a solution that is largely similar to mine.  I perform my configuration in code, as:

# Partitioner configuration
self.options.partitioner = 'org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner'
self.options.jobconf['mapred.text.key.partitioner.options'] = '-k1,1n'
self.options.jobconf['map.output.key.field.separator'] = ','

# Comparator configuration
self.options.jobconf['mapred.output.key.comparator.class'] = 'org.apache.hadoop.mapred.lib.KeyFieldBasedComparator'
self.options.jobconf['mapred.text.key.comparator.options'] = '-k2.2,2n'

The difference here is that I am using ReprProtocol with intermediate keys that are pairs of the form (partition, key).  Because of this, I identify the secondary key starting at the second character of the second field (2.2) and split the pair at the comma.  I am also dealing exclusively with integer keys ("n"), so I don't need to be concerned with additional commas in my intermediate pairs (this does limit the generalizability of my approach).  With these constraints, however, I do not have to do any additional parsing within the reducer (in fact, its signature is "reducer(self, (partition, key), values)").
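As a sanity check on why those comparator options line up with repr'd pairs, here is the field splitting simulated in plain Python (a sketch of the string mechanics only, not of Hadoop itself):

```python
# ReprProtocol serializes an intermediate key (partition, key) via repr():
key = repr((3, 42))        # "(3, 42)"

# map.output.key.field.separator=',' splits this into two fields:
fields = key.split(",")    # ["(3", " 42)"]

# -k2.2,2n: sort numerically starting at character 2 of field 2,
# which skips the space that repr's ", " leaves behind:
secondary = fields[1][1:]  # "42)" -- a numeric parse stops at the ")"
```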

I also recall that this configuration (frustratingly) changed when moving from Hadoop 0.18 to 0.20, so this might explain why there was historical difficulty in getting this to work.

Brandon

eckamm

May 23, 2012, 11:12:16 AM
to mr...@googlegroups.com
Thanks!  This is a great example of using KeyFieldBasedPartitioner and KeyFieldBasedComparator.

Now I see how you can write a Protocol with Hadoop-friendly keys for use with KeyFieldBased* and any kind of values you like (e.g. JSON, pickle, etc.).

The tricky part for me then becomes ensuring the necessary Hadoop options are used for each step in a multi-step mrjob program (i.e. each invocation of Hadoop may need different options).  I only have experience with setting these "globally" for an mrjob program.

--Eric

ra...@imagnaanalytics.com

Jan 16, 2013, 4:37:38 AM
to mr...@googlegroups.com
Interestingly, I found out that if your field2 is a string value then mrjob automatically sorts the values based on field2. I have posted this on SO:
http://stackoverflow.com/questions/14353842/how-does-mapreduce-sort-and-shuffle-work

Brandon Haynes

Jan 16, 2013, 8:44:05 AM
to mr...@googlegroups.com
As far as I am aware, Hadoop does not guarantee ordering of the value sequence presented to a reducer (I believe this _is_ a feature of Google's implementation).

You are likely either experiencing an input-specific phenomenon (e.g. where your splits occur such that you later shuffle in total order) or running under the local/inline runner (where the intermediate results are piped through sort for convenience).

You should not depend on any specific value ordering in your reducers.

Brandon

ra...@imagnaanalytics.com

Jan 18, 2013, 9:19:33 AM
to mr...@googlegroups.com
Yeah, I agree with you now. What I observed was only happening on my local machine. When I ported to an actual Hadoop cluster, it did not sort anything.

Bhaskar Upadhyay

Apr 23, 2013, 8:44:32 PM
to mr...@googlegroups.com
Hey Brandon,

Here's what I am trying to do.
I get a set of <id(int), count(int)> pairs from the reducer.
And I need to sort them by count.
I tried flipping the pair to <count, id>, since mrjob inherently sorts by key.
That did not work.

Then I tried adding your code under mapper_init(), and it still does not work.
Is there anything I am missing?
I am a newbie using EMR and mrjob. Any help will be appreciated.

PS will your code work on EMR as well as on a local machine?

-Bhaskar

Brandon Haynes

Apr 24, 2013, 8:28:43 AM
to mr...@googlegroups.com
Hi Bhaskar,

The approach I outline will not work under the local or inline runners; you will need to be running on "real" Hadoop (e.g., on EMR) to perform a secondary sort.

If you can establish an upper bound on the maximum count you will encounter, you might be able to get away with left-padding your count value with zeros and letting Hadoop do an ordinary lexicographic sort on the resulting value (e.g., for key 5 and count 10, with a maximum possible count of 1000, you could form the compound key ('0010', 5)).  If you encode the count (e.g. base64), you can achieve a lot with this method, but it is critical that each to-be-sorted attribute sort lexicographically in the same order as it does numerically.
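A minimal sketch of the zero-padding trick in plain Python (the helper name is hypothetical):

```python
def padded_count(count, max_count=1000):
    # Left-pad with zeros so that lexicographic order of the strings
    # matches numeric order of the counts (valid while count <= max_count).
    return str(count).zfill(len(str(max_count)))

counts = [5, 100, 10, 1000]
keys = [padded_count(c) for c in counts]
# sorted(keys) is ["0005", "0010", "0100", "1000"] -- numeric order.
```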

If you can't get away with this, you will indeed need to set up a secondary sort as discussed above.  Debugging this can be a painful process, and all I can really offer is general guidance.  I suggest first examining your job in the AWS console to make sure that your secondary-sort metadata is being forwarded properly.  Make sure that you are on a suitable AMI version (you'll need to be running at least Hadoop 0.20).  If this all looks good, the next step will be to SSH into a node and examine the logs and/or stepwise results.

Hope this helps!

Brandon

Patrick Boocock

Aug 21, 2015, 7:08:47 PM
to mrjob
Hi All,

What I'm finding (as of version 0.4.5) is that I can get the records to arrive at the reducer in order, but they're split into individual records such that, for a given key, there's only one value in the generator, and multiple records with that key arrive at the reducer.

I've searched around and seem to have found the solution to be the group comparator, e.g. http://stackoverflow.com/questions/14728480/what-is-the-use-of-grouping-comparator-in-hadoop-map-reduce. I haven't found a way to implement this natively in Python and am trying to find a way to ensure that:

1. my records arrive at the reducer properly partitioned by the natural key, and sorted on a secondary field
2. that the values are all grouped together for each natural key

I've considered writing an additional reduce step that can take the natural key/composite key pairs to group them and yield to a second reducer that does the aggregation, but I'm curious if you all have any other thoughts on how to do the grouping properly.
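One way to sketch that extra grouping step is with itertools.groupby in plain Python; the record shapes below are assumptions for illustration, not mrjob API:

```python
from itertools import groupby

# Output of the first reducer: one record per composite key, already
# ordered by (natural_key, secondary_field) thanks to the secondary sort.
records = [
    (("u1", "2012-05-01"), "a"),
    (("u1", "2012-05-02"), "b"),
    (("u2", "2012-05-01"), "c"),
]

# Second reduce step: collapse the composite key down to the natural key,
# regrouping all values for each natural key while preserving sort order.
grouped = {
    natural: [value for (_, value) in group]
    for natural, group in groupby(records, key=lambda kv: kv[0][0])
}
# grouped == {"u1": ["a", "b"], "u2": ["c"]}
```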

Thanks,
Patrick