Strange (or inconsistent) behaviour for GroupBy -> SortBy


ravi kiran holur vijay

Sep 10, 2016, 9:12:04 PM
to Scalding Development
Hello,

I am noticing strange behaviour in my Scalding job, which uses groupBy and sortBy. Without a .sortBy call, each reducer gets all of the values for a given group key. With .sortBy, however, each reducer gets only part of the values for the same group key. Has anyone run into a similar issue before, or does anyone have a hypothesis about what's happening?

Case 1: Observed behaviour = Expected behaviour, without using sortBy

Reducer 1 output:

Processing data for group ... 1
Initializing FM Model with existing parameters ...
Processing model param ... o
Processing model param ... w
Processing model param ... r
Processing model param ... s
Processing model param ... t
Processing model param ... l
Processing model param ... f
Processing model param ... v
Processing model param ... v
Processing model param ... v
Processing model param ... v
Processing model param ... v
Initialized FM Model with: w0=-0.181250, w=34531087, v=5, reg0=0.000000, regw=0.000000, regv=0.000000, lr=0.010000, statsFreq=1000000, merged models=1

Reducer 2 output:
Processing data for group ... 0
Initializing FM Model with existing parameters ...
Processing model param ... o
Processing model param ... w
Processing model param ... r
Processing model param ... s
Processing model param ... t
Processing model param ... l
Processing model param ... f
Processing model param ... v
Processing model param ... v
Processing model param ... v
Processing model param ... v
Processing model param ... v
Initialized FM Model with: w0=-0.181250, w=34531087, v=5, reg0=0.000000, regw=0.000000, regv=0.000000, lr=0.010000, statsFreq=1000000, merged models=1

Case 2: Observed behaviour != Expected behaviour, after using sortBy
Reducer 1 output
Processing data for group ... 0
Initializing FM Model with existing parameters ...
Processing model param ... v
Processing model param ... v
Processing model param ... v
Processing model param ... v
Processing model param ... v
Initialized FM Model with: w0=0.000000, w=0, v=5, reg0=0.000000, regw=0.000000, regv=0.000000, lr=0.000000, statsFreq=0, merged models=0
Processing data for group ... 1
Initializing FM Model with existing parameters ...
Processing model param ... f
Processing model param ... l
Processing model param ... o
Processing model param ... r
Processing model param ... s
Processing model param ... t
Processing model param ... w
Initialized FM Model with: w0=-0.181250, w=34531087, v=0, reg0=0.000000, regw=0.000000, regv=0.000000, lr=0.010000, statsFreq=1000000, merged models=1

Reducer 2 output
Processing data for group ... 0
Initializing FM Model with existing parameters ...
Processing model param ... f
Processing model param ... l
Processing model param ... o
Processing model param ... v
Processing model param ... r
Processing model param ... s
Processing model param ... t
Processing model param ... w
Initialized FM Model with: w0=-0.181250, w=34531087, v=1, reg0=0.000000, regw=0.000000, regv=0.000000, lr=0.010000, statsFreq=1000000, merged models=1
Processing data for group ... 1
Initializing FM Model with existing parameters ...
Processing model param ... v
Processing model param ... v
Processing model param ... v
Processing model param ... v
Initialized FM Model with: w0=0.000000, w=0, v=4, reg0=0.000000, regw=0.000000, regv=0.000000, lr=0.000000, statsFreq=0, merged models=0

Code
val data: TypedPipe[(Int, Float, Either[FeatureVector, FMModelParameter])] = modelData
val fmModels: SortedGrouped[Int, FMModel] = data
  .groupBy { case (id1, id2, modelParam) => id1 }
  .sortBy { case (id1, id2, modelParam) => id2 }
  .forceToReducers
  // Secondary sort is needed to ensure model parameters appear before actual training data
  // TODO: This sortBy is causing problems and has a bug
  .mapGroup {
    case (groupId, records) =>
      println("Processing data for group ... " + groupId)
      val trainedModel = aggregateAndUpdateModel(records)
      Iterator(trainedModel)
  }

Oscar Boykin

Sep 10, 2016, 11:06:29 PM
to ravi kiran holur vijay, Scalding Development
Sorry, I don't follow. What did you expect? I don't see a bug.

The data looks sorted within groups, which is all sortBy does.

Note, you don't need forceToReducers here. Sorting can only be done on the reducers.
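
To make "sorted within groups" concrete, here is a minimal sketch using plain Scala collections rather than Scalding. The object name, the record layout (group id, sort key, payload) and the sample values are all made up for illustration, and the sort key is an Int here for simplicity instead of the Float in the original job. It only models the intended semantics: all records with the same key end up in one group, and each group is ordered by the sort field.

```scala
// Plain Scala collections model of groupBy followed by sortBy.
// This is NOT Scalding code; the record layout and sample values are hypothetical.
object GroupThenSortSketch {
  // (groupId, sortKey, payload), loosely mirroring (id1, id2, modelParam)
  type Record = (Int, Int, String)

  // Group by the first field, then sort each group's records by the second field.
  def groupThenSort(records: Seq[Record]): Map[Int, Seq[Record]] =
    records
      .groupBy { case (id1, _, _) => id1 }
      .map { case (id1, group) => id1 -> group.sortBy { case (_, id2, _) => id2 } }

  def main(args: Array[String]): Unit = {
    val records = Seq(
      (0, 3, "train-a"), (1, 1, "param-w"),
      (0, 1, "param-w"), (1, 2, "train-b"),
      (0, 2, "param-v")
    )
    // Print each group once, with its payloads in sort-key order.
    groupThenSort(records).toSeq.sortBy(_._1).foreach { case (id1, group) =>
      println(s"group $id1: " + group.map(_._3).mkString(", "))
    }
  }
}
```

In this model a group id can never show up twice in the result, which is the property the observed Case 2 output appears to violate.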
--
You received this message because you are subscribed to the Google Groups "Scalding Development" group.
To unsubscribe from this group and stop receiving emails from it, send an email to scalding-dev...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

ravi kiran holur vijay

Sep 10, 2016, 11:33:07 PM
to Oscar Boykin, Scalding Development
Hey Oscar,

Sorry, sounds like I might have misunderstood the semantics of groupBy followed by sortBy.
Is there a way to make sure ALL records having the same key end up at the same reducer (what groupBy does) and within each reducer, have it sorted by value (what sortBy does)?

-Ravi

Oscar Boykin

Sep 11, 2016, 12:14:09 AM
to ravi kiran holur vijay, Scalding Development
Wait, sorry. Looking more carefully. Is the bug that originally all data with id 0 was in one reducer but with sorting it winds up on both? That would be a bug. What version of scalding is this?

Can you replicate this bug in a minimal case? Sorting should not change how the keys are partitioned to reducers (which is done by hashCode of the key, which is the same, I suppose).

Basically, the test you want to write is this: after groupBy with sortBy, if you take only the keys in the output, each key appears exactly once.
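
As a rough illustration of that check, here is a toy model in plain Scala (not Scalding or Cascading internals). It assumes the reducer is chosen from the group key's hashCode alone, in the style of Hadoop's default HashPartitioner, so the sort field cannot influence where a key lands. The names `reducerFor`, `run` and `eachKeyOnce` and the record layout are hypothetical.

```scala
// Toy model of reducer assignment plus the "each key exactly once" check.
// Not Scalding or Cascading code; all names here are hypothetical.
object KeyPartitionSketch {
  type Record = (Int, Int, String) // (groupKey, sortKey, payload)

  // Assumption: the reducer index depends only on the group key's hashCode.
  def reducerFor(key: Int, numReducers: Int): Int =
    (key.hashCode & Int.MaxValue) % numReducers

  // Each simulated reducer emits one (key, payloadsInSortOrder) pair per key it receives.
  def run(records: Seq[Record], numReducers: Int): Seq[(Int, Seq[String])] =
    records
      .groupBy { case (key, _, _) => reducerFor(key, numReducers) }
      .values
      .flatMap { onReducer =>
        onReducer.groupBy(_._1).map { case (key, group) =>
          key -> group.sortBy(_._2).map(_._3)
        }
      }
      .toSeq

  // The invariant to test: no key appears more than once in the output.
  def eachKeyOnce(output: Seq[(Int, Seq[String])]): Boolean = {
    val keys = output.map(_._1)
    keys.distinct.size == keys.size
  }

  def main(args: Array[String]): Unit = {
    val records = Seq((0, 2, "b"), (1, 1, "x"), (0, 1, "a"), (1, 2, "y"))
    println(eachKeyOnce(run(records, numReducers = 2)))
  }
}
```

A real reproduction would run the same check against the keys of the actual Scalding job output on the cluster, since a single-process model like this one cannot exhibit a partitioning bug.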

I have a hard time believing there could have been a bug like this that we didn't notice for 5 years but I guess it is possible.

Oscar Boykin

Sep 11, 2016, 12:15:42 AM
to ravi kiran holur vijay, Scalding Development
Can you file an issue, if nothing else, to track the discussion?

ravi kiran holur vijay

Sep 11, 2016, 5:23:48 PM
to Oscar Boykin, Scalding Development
Hey Oscar,

Yes, that's correct. I am seeing data with id 0 being distributed to multiple reducers, which is contrary to what a groupBy followed by a sortBy should do. However, if I comment out the line with sortBy, I see data with id 0 ending up at a single reducer. I filed a new issue to track this and will work on a minimal test case to replicate it.

I am using Scalding 0.15.0 with Cascading 2.6.3 running on hadoop-0.20.1-dev-qubole distribution.

-Ravi

Oscar Boykin

Sep 11, 2016, 6:33:15 PM
to ravi kiran holur vijay, Scalding Development
Just to be sure can you try with scalding 0.16.0?