Skewed group count across reducers


PaulON

Aug 11, 2016, 5:43:30 AM
to cascading-user
Hey,

We have the following scenario that we are trying to solve.
I have X tuples (where X is typically ~10M).
I need to send these tuples to an external resource, but I need to control the number of threads doing the sending (so as not to overload the resource).
There may be repeating keys; if so, they need to be sent together.

What we have attempted is:

GroupBy(KEY)
Every(Fields.ALL, new UniformGroupingBuffer())

where UniformGroupingBuffer adds a GROUP field whose value is the current sliceNum.
We are setting the number of reducers on this task to match the number of threads we want to send over.

Then

GroupBy(GROUP)
Every(Fields.ALL, new SendingBuffer())
Again we set the number of reducers to the number of threads we want to send over.
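
In code, the assembly looks roughly like this (a sketch rather than our exact code; UniformGroupingBuffer and SendingBuffer are our own classes, and the reducer count shown is illustrative):

import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.tuple.Fields;

Pipe pipe = new Pipe("send");

// stage 1: group by the real key, tag every tuple in a group with GROUP = current sliceNum
pipe = new GroupBy(pipe, new Fields("KEY"));
pipe = new Every(pipe, Fields.ALL, new UniformGroupingBuffer());
pipe.getStepConfigDef().setProperty("mapred.reduce.tasks", "10"); // == number of sender "threads"

// stage 2: regroup by GROUP so each reducer handles one "thread" worth of keys
pipe = new GroupBy(pipe, new Fields("GROUP"));
pipe = new Every(pipe, Fields.ALL, new SendingBuffer());
pipe.getStepConfigDef().setProperty("mapred.reduce.tasks", "10");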

The problem is that in the first buffer we see a nice distribution of tuples; however, in the second buffer there is a massive skew (one reducer taking many times more groups than the others).

Is this expected?
Any advice/experience on how to achieve this?
The data is still evenly distributed across the groups; it's just that the groups are not evenly distributed across the reducers.


Cheers!
Paul

Andre Kelpe

Aug 11, 2016, 6:29:41 AM
to cascading-user
It may be that the second grouping key has a very uneven hash
distribution. We are using murmur3 internally (since 2.7) to do
the partitioning, but I wonder if the hashCode implementation of
the GROUP field is causing this:

https://github.com/cwensel/cascading/blob/c3895f4fb144cc3bbc0e01e37e2ce583a1919918/cascading-core/src/main/java/cascading/tuple/util/TupleHasher.java#L173
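
If you want to eyeball the effect, something like the snippet below can help. It uses Guava's murmur3_32 as a stand-in, so it is only an approximation of what Cascading actually does, but it shows how a handful of small consecutive GROUP values (like sliceNums) can bunch up after a murmur3-style mix, whereas the legacy hashCode-based partitioning spreads 0..N-1 perfectly across N reducers:

import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;

public class PartitionSpreadCheck {
  public static void main(String[] args) {
    int numReducers = 10;
    int[] murmurCounts = new int[numReducers];
    int[] legacyCounts = new int[numReducers];
    HashFunction murmur3 = Hashing.murmur3_32();

    for (int group = 0; group < numReducers; group++) {
      // murmur3-style partitioning
      int murmurHash = murmur3.hashInt(group).asInt();
      murmurCounts[(murmurHash & Integer.MAX_VALUE) % numReducers]++;
      // legacy behaviour: Integer.hashCode() is the value itself, so 0..9 spread perfectly
      legacyCounts[(Integer.valueOf(group).hashCode() & Integer.MAX_VALUE) % numReducers]++;
    }

    for (int p = 0; p < numReducers; p++)
      System.out.println("partition " + p + ": murmur3=" + murmurCounts[p]
          + ", legacy=" + legacyCounts[p]);
  }
}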

- André



--
André Kelpe
an...@concurrentinc.com
http://concurrentinc.com

Patrick Duin

Aug 12, 2016, 9:58:15 AM
to cascading-user
Hi,

We've seen a similar problem; we fixed it by switching back to the old hasher:

 pipe.getStepConfigDef().setProperty(HasherPartitioner.HASHER_PARTITIONER_USE_LEGACY_HASH, Boolean.TRUE.toString());

right after the group by.
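
In our flow it looks roughly like this (a sketch; if I remember correctly, HasherPartitioner comes from the cascading-hadoop module, and "previous" stands in for whatever pipe feeds the grouping):

import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.tuple.Fields;
import cascading.tuple.hadoop.util.HasherPartitioner;

Pipe grouped = new GroupBy(previous, new Fields("GROUP"));
grouped.getStepConfigDef().setProperty(
    HasherPartitioner.HASHER_PARTITIONER_USE_LEGACY_HASH,
    Boolean.TRUE.toString());
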
Not sure if there is a better solution, but that fixed it for us.

Kind regards,
  Patrick

Andre Kelpe

Aug 12, 2016, 11:41:14 AM
to cascading-user
That is good to know. Do you happen to have a minimal example where
you see that behavior?

- André

Ken Krugler

Aug 12, 2016, 11:56:52 AM
to cascadi...@googlegroups.com
When we have this situation in Bixo, we do something similar but with some key differences…

After grouping by the key, we add a field with a random int (where the same value is used for each Tuple in one group, of course).

The random int is in the range 0 to (number of threads - 1).

Then we group by this new field, sort by the key, and we’re all good.
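
Very roughly, it looks like this (a sketch from memory, not our actual Bixo code; the class, field names, and wiring are illustrative):

import java.util.Iterator;
import java.util.Random;

import cascading.flow.FlowProcess;
import cascading.operation.BaseOperation;
import cascading.operation.Buffer;
import cascading.operation.BufferCall;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;

// Runs once per KEY group; emits the same randomly chosen bucket value for every tuple in the group.
public class AssignRandomBucket extends BaseOperation implements Buffer {
  private final int numThreads;

  public AssignRandomBucket(int numThreads) {
    super(new Fields("bucket"));
    this.numThreads = numThreads;
  }

  @Override
  public void operate(FlowProcess flowProcess, BufferCall bufferCall) {
    int bucket = new Random().nextInt(numThreads); // one draw per group
    Iterator it = bufferCall.getArgumentsIterator();
    while (it.hasNext()) {
      it.next(); // advance so Fields.ALL pairs the bucket with this tuple
      bufferCall.getOutputCollector().add(new Tuple(bucket));
    }
  }
}

// Wiring: group by KEY, tag each group with a bucket, then regroup by bucket
// and secondary-sort on KEY so one key's tuples stay together on one reducer.
Pipe pipe = new Pipe("send");
pipe = new GroupBy(pipe, new Fields("KEY"));
pipe = new Every(pipe, Fields.ALL, new AssignRandomBucket(numThreads), Fields.ALL);
pipe = new GroupBy(pipe, new Fields("bucket"), new Fields("KEY"));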

— Ken



--------------------------
Ken Krugler
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr



PaulON

Aug 15, 2016, 6:15:02 AM
to cascading-user
Thanks all,

We'll try the old hasher; it does seem like that could cause the issue we see. It might even improve if we multiply all sliceNums by a large random int and see if the hash performs better.

Ken, we originally were using a random int, but that didn't seem to make much difference. Are you using a large number of "final" groups?
Ours is pretty small (typically fewer than 10).

Ken Krugler

Aug 15, 2016, 11:30:14 AM
to cascadi...@googlegroups.com
On Aug 15, 2016, at 3:15am, PaulON <pone...@gmail.com> wrote:

Thanks all,

We'll try the old hasher; it does seem like that could cause the issue we see. It might even improve if we multiply all sliceNums by a large random int and see if the hash performs better.

Ken, we originally were using a random int, but that didn't seem to make much difference. Are you using a large number of "final" groups?
Ours is pretty small (typically fewer than 10).

Usually we have a lot of groups (in the thousands), but it should work regardless of the number of groups, as the number of actual groups is tied to the number of concurrent threads, and the grouping key is just used for sorting.

— Ken



Andre Kelpe

Aug 15, 2016, 11:39:16 AM
to cascading-user
On Mon, Aug 15, 2016 at 5:30 PM, Ken Krugler
<kkrugle...@transpac.com> wrote:
>
> Usually we have a lot of groups (in the thousands), but it should work
> regardless of the number of groups, as the number of actual groups is tied
> to the number of concurrent threads, and the grouping key is just used for
> sorting.
>
> — Ken


Ken, what do you mean by "threads" here? Cascading is not multi-threaded.

- André



Ken Krugler

Aug 15, 2016, 1:19:56 PM
to cascadi...@googlegroups.com
Hi André,

PaulON was talking about using the reduce phase to control the number of parallel requests being made to an API.

That’s similar to what we do in Bixo during a web crawl, to control the level of parallelism when making multi-threaded requests for web pages.

So it’s nothing to do with Cascading per-se, it’s how to control the load on a server in the middle of a Flow.

— Ken




Andre Kelpe

Aug 16, 2016, 5:14:26 AM
to cascading-user
Ah, that makes sense. Thanks for the clarification.

- André
