reducer key imbalance


Esé

Nov 26, 2009, 3:51:30 PM
to cascading-user
Hey folks,

I have an interesting question regarding data partitioning via keys.
To simplify the problem down to its essentials, let's say I am trying
to find the # of unique viewers per channel from a stream of tuples

(c1, v1), (c2, v2), (c1, v3), (c2, v2), (c1, v1)

i.e. pairs of channel ids and viewer ids. The final goal is to have an
output of something like:

c1, 2
c2, 1

indicating the unique viewers per channel.

The straightforward thing to do would be to groupby channel, sort by
viewerid, and count the distinct viewerids that way. But this approach
runs into trouble when some channels have lots and lots of viewers and
other channels have far fewer. In other words, many reducers sit
around idle waiting for a few to finish.
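
To make the skew concrete, here is a minimal sketch in plain Python (not Cascading code; the function names are hypothetical and a dict of sets stands in for the reducers). It computes the unique-viewer counts the naive way and also reports how many rows each channel key receives, which is a rough proxy for per-reducer load.

```python
from collections import defaultdict

# Sample stream of (channel, viewer) pairs from the example above.
stream = [("c1", "v1"), ("c2", "v2"), ("c1", "v3"), ("c2", "v2"), ("c1", "v1")]


def unique_viewers_naive(pairs):
    """Group by channel only, so each channel's whole viewer set sits in one group."""
    viewers_by_channel = defaultdict(set)
    for channel, viewer in pairs:
        viewers_by_channel[channel].add(viewer)
    return {channel: len(viewers) for channel, viewers in viewers_by_channel.items()}


def rows_per_key(pairs):
    """Count the rows routed to each channel key (a stand-in for reducer load)."""
    load = defaultdict(int)
    for channel, _ in pairs:
        load[channel] += 1
    return dict(load)


naive_counts = unique_viewers_naive(stream)
naive_load = rows_per_key(stream)
print(naive_counts)  # {'c1': 2, 'c2': 1}
print(naive_load)    # {'c1': 3, 'c2': 2}
```

With real data the second dictionary is where the problem shows up: one popular channel can account for most of the rows, and all of them go to a single group.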

One possible workaround for this might be to groupby both channel
*and* viewerId as a first step. The output of this step would give us
one tuple per distinct (channel, viewer) pair, each carrying a count of 1:

(c1, 1), (c2, 1), (c1, 1)

that we could then simply add up by channel id, since we know each
tuple stands for one unique viewer. The trouble is that this step, while
having a high degree of parallelization, doesn't reduce the original
tuple space *that much*. We'll still have to groupby channelId
eventually and deal with that partition imbalance.
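
As a rough illustration of this two-step idea, here is a plain Python sketch (again not Cascading code; the function name is made up for this example). Step 1 collapses duplicates by grouping on the full (channel, viewer) pair, and step 2 sums one unit per surviving pair by channel.

```python
from collections import Counter

# Sample stream of (channel, viewer) pairs from the example above.
stream = [("c1", "v1"), ("c2", "v2"), ("c1", "v3"), ("c2", "v2"), ("c1", "v1")]


def unique_viewers_two_step(pairs):
    # Step 1: group by (channel, viewer); every distinct pair survives exactly once.
    distinct_pairs = set(pairs)
    # Step 2: group by channel and add 1 for each distinct pair.
    return dict(Counter(channel for channel, _ in distinct_pairs))


two_step_counts = unique_viewers_two_step(stream)
print(sorted(two_step_counts.items()))  # [('c1', 2), ('c2', 1)]
```

Note that step 2 still receives one row per unique viewer of a channel, so a channel with millions of distinct viewers remains a hot key there.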

However, I am wondering whether it's possible to get around this issue
by somehow spreading the tuple aggregation to multiple reducers as an
intermediate step - i.e. for example via grouping by (channelId + some
other key), summing up by channel id and *then* as a final step,
grouping by channelid for the final sum.

Just wondering if this makes sense or is there some easy solution or
primitive in cascading I can use.

Thanks!

E.

Mikhail Yakshin

Nov 27, 2009, 3:14:34 AM
to cascadi...@googlegroups.com
On Thu, Nov 26, 2009 at 11:51 PM, Esé <opusdp...@gmail.com> wrote:
> Hey folks,
>
> I have an interesting question regarding data partitioning via keys.
> To simplify the problem down to its essentials, let's say I am trying
> to find the # of unique viewers per channel from a stream of tuples

[...]

> However, I am wondering whether it's possible to get around this issue
> by somehow spreading the tuple aggregation to multiple reducers as an
> intermediate step - i.e. for example via grouping by (channelId + some
> other key), summing up by channel id and *then* as a final step,
> grouping by channelid for the final sum.
>
> Just wondering if this makes sense or is there some easy solution or
> primitive in cascading I can use.

The simplest thing you can do here is to add some sort of hash value
to every row, derived from the viewer id: (c, v, h), where for example
h = v % SOME_MAGICAL_NUMBER_N.

Then you group by (c, h) and count the distinct v in each group, then
group by (c) and sum those counts. Because h depends only on v, a given
viewer always lands in the same (c, h) group, so the partial counts
never overlap. Experiment with SOME_MAGICAL_NUMBER_N for the best
result. Note that you could just write a specific partitioner for task
#1 if you were using regular Hadoop without Cascading, so I wondered
some time ago whether there are any grand plans to introduce custom
partitioning schemes to Cascading:

http://groups.google.com/group/cascading-user/browse_thread/thread/bcc0bebc72959a3/9997a916a8d09b4b
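
The two grouping stages described above can be sketched in plain Python as follows. This is only an illustration, not Cascading code: the function names and the value N = 4 are hypothetical, and `zlib.crc32` is an assumption used in place of `v % N` because the sample viewer ids are strings rather than numbers.

```python
import zlib
from collections import defaultdict

# Sample stream of (channel, viewer) pairs from the original question.
stream = [("c1", "v1"), ("c2", "v2"), ("c1", "v3"), ("c2", "v2"), ("c1", "v1")]

N = 4  # hypothetical stand-in for SOME_MAGICAL_NUMBER_N


def bucket(viewer, n=N):
    """Derive h from the viewer id alone, using a stable hash (an assumption)."""
    return zlib.crc32(viewer.encode("utf-8")) % n


def unique_viewers_salted(pairs, n=N):
    # Stage 1: group by (c, h) and count the distinct viewers in each group.
    groups = defaultdict(set)
    for channel, viewer in pairs:
        groups[(channel, bucket(viewer, n))].add(viewer)
    partial_counts = {key: len(viewers) for key, viewers in groups.items()}

    # Stage 2: group by c and sum the partial counts.
    totals = defaultdict(int)
    for (channel, _), count in partial_counts.items():
        totals[channel] += count
    return dict(totals)


salted_counts = unique_viewers_salted(stream)
print(salted_counts)  # {'c1': 2, 'c2': 1}
```

The result does not depend on N; N only changes how many stage 1 groups a busy channel is split into.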

--
WBR, Mikhail Yakshin

Chris K Wensel

Nov 27, 2009, 4:39:37 PM
to cascadi...@googlegroups.com
Not sure we will support custom partitioners, since you can just promote a value into the tuple stream and have the stream partitioned on it. That's a lot less magic, and it's more obvious to people what's going on.

ckw
> --
>
> You received this message because you are subscribed to the Google Groups "cascading-user" group.
> To post to this group, send email to cascadi...@googlegroups.com.
> To unsubscribe from this group, send email to cascading-use...@googlegroups.com.
> For more options, visit this group at http://groups.google.com/group/cascading-user?hl=en.
>
>

--
Chris K Wensel
ch...@concurrentinc.com
http://www.concurrentinc.com

Esé

Nov 28, 2009, 12:34:21 PM
to cascading-user
Mikhail, Chris - thanks for your help and suggestions! Creating a
smaller secondary key and grouping on that + channelId definitely
seems to be the way to go.

Assuming that viewerId keyspace is pretty well distributed, I wonder
if the SOME_MAGICAL_NUMBER_N can simply be the number of reducers? For
example, in Amazon's Elastic Map Reduce, assuming all the instances
were c1.medium, that would be (2 * # of instances) being used in the
job.

Thanks!

Mikhail Yakshin

Nov 28, 2009, 3:49:41 PM
to cascadi...@googlegroups.com
Hi,

> Assuming that viewerId keyspace is pretty well distributed, I wonder
> if the SOME_MAGICAL_NUMBER_N can simply be the number of reducers? For
> example, in Amazon's Elastic Map Reduce, assuming all the instances
> were c1.medium, that would be (2 * # of instances) being used in the
> job.

Yep, "k * number of reducers" where k = 1..3 is a good starting value.
Different data-specific things may prove that other values of k could
be more efficient in some cases.
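
One rough way to experiment with k is sketched below in plain Python. Everything in it is an assumption made for illustration: the reducer count of 4, the synthetic data (one hot channel with 9000 viewers and three channels with 100 each), and the helper names. For each candidate N it reports the largest (c, h) group, which is the biggest chunk any single reducer has to process, and the number of groups, which is the number of rows handed to the final sum.

```python
from collections import Counter

REDUCERS = 4  # hypothetical cluster size


def make_skewed_rows():
    """Synthetic data: one hot channel plus three small ones, numeric viewer ids."""
    rows = [("hot", viewer) for viewer in range(9000)]
    for channel in ("a", "b", "c"):
        rows += [(channel, viewer) for viewer in range(100)]
    return rows


def group_stats(rows, n):
    """Return (largest (c, h) group size, number of (c, h) groups) for h = v % n."""
    sizes = Counter((channel, viewer % n) for channel, viewer in rows)
    return max(sizes.values()), len(sizes)


rows = make_skewed_rows()
# N = 1 means no extra key; the others are k * REDUCERS for k = 1, 2, 3.
stats = {n: group_stats(rows, n) for n in (1, REDUCERS, 2 * REDUCERS, 3 * REDUCERS)}
for n, (largest, groups) in stats.items():
    print(n, largest, groups)
# 1 9000 4
# 4 2250 16
# 8 1125 32
# 12 750 48
```

On this toy data a larger N shrinks the hot group at the cost of more intermediate rows for the final group by (c); real data would need the same kind of measurement.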

--
WBR, Mikhail Yakshin