Strange (or inconsistent) behaviour for GroupBy -> SortBy


ravi kiran holur vijay

Sep 10, 2016, 9:13:34 PM
to cascading-user
Hello,

I am seeing strange behaviour in a Scalding job that uses groupBy followed by sortBy. Without the .sortBy call, each reducer receives all of the values for a given group key, as expected. With .sortBy, each reducer receives only part of the values for a given group key. Has anyone run into a similar issue, or does anyone have a hypothesis about what is happening?
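To make the expectation concrete, here is a minimal plain-Scala sketch of the contract I am assuming for groupBy followed by sortBy. It does not use Scalding or Hadoop. The names SecondarySortContract, reducerFor, shuffle and eachGroupOnOneReducer are hypothetical, and routing by a hash of the group key alone is my assumption about the intended behaviour, not a description of what Scalding or Cascading actually does internally.

```scala
// Plain-Scala model of the expected contract; no Scalding or Hadoop involved.
object SecondarySortContract {
  // (groupKey, sortKey, payload), mirroring the (id1, id2, modelParam) shape.
  type Record = (Int, Float, String)

  // Assumption: a record is routed by its group key alone, never by the sort key.
  def reducerFor(groupKey: Int, numReducers: Int): Int =
    math.abs(groupKey.hashCode % numReducers)

  // Returns reducer index -> (group key -> that group's records ordered by sortKey).
  def shuffle(records: Seq[Record], numReducers: Int): Map[Int, Map[Int, Seq[Record]]] =
    records
      .groupBy(r => reducerFor(r._1, numReducers))
      .map { case (reducer, rs) =>
        reducer -> rs.groupBy(_._1).map { case (key, group) =>
          key -> group.sortWith(_._2 < _._2)
        }
      }

  // True when every group key shows up on exactly one reducer.
  def eachGroupOnOneReducer(shuffled: Map[Int, Map[Int, Seq[Record]]]): Boolean = {
    val keysSeen = shuffled.values.toSeq.flatMap(_.keys)
    keysSeen.distinct.size == keysSeen.size
  }
}
```

Under this model a group key is never split across reducers, which is what Case 1 below shows and what Case 2 violates.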

Case 1: Observed behaviour = Expected behaviour, without using sortBy

Reducer 1 output:

Processing data for group ... 1
Initializing FM Model with existing parameters ...
Processing model param ... o
Processing model param ... w
Processing model param ... r
Processing model param ... s
Processing model param ... t
Processing model param ... l
Processing model param ... f
Processing model param ... v
Processing model param ... v
Processing model param ... v
Processing model param ... v
Processing model param ... v
Initialized FM Model with: w0=-0.181250, w=34531087, v=5, reg0=0.000000, regw=0.000000, regv=0.000000, lr=0.010000, statsFreq=1000000, merged models=1

Reducer 2 output:
Processing data for group ... 0
Initializing FM Model with existing parameters ...
Processing model param ... o
Processing model param ... w
Processing model param ... r
Processing model param ... s
Processing model param ... t
Processing model param ... l
Processing model param ... f
Processing model param ... v
Processing model param ... v
Processing model param ... v
Processing model param ... v
Processing model param ... v
Initialized FM Model with: w0=-0.181250, w=34531087, v=5, reg0=0.000000, regw=0.000000, regv=0.000000, lr=0.010000, statsFreq=1000000, merged models=1

Case 2: Observed behaviour != Expected behaviour, after using sortBy
Reducer 1 output:
Processing data for group ... 0
Initializing FM Model with existing parameters ...
Processing model param ... v
Processing model param ... v
Processing model param ... v
Processing model param ... v
Processing model param ... v
Initialized FM Model with: w0=0.000000, w=0, v=5, reg0=0.000000, regw=0.000000, regv=0.000000, lr=0.000000, statsFreq=0, merged models=0
Processing data for group ... 1
Initializing FM Model with existing parameters ...
Processing model param ... f
Processing model param ... l
Processing model param ... o
Processing model param ... r
Processing model param ... s
Processing model param ... t
Processing model param ... w
Initialized FM Model with: w0=-0.181250, w=34531087, v=0, reg0=0.000000, regw=0.000000, regv=0.000000, lr=0.010000, statsFreq=1000000, merged models=1

Reducer 2 output:
Processing data for group ... 0
Initializing FM Model with existing parameters ...
Processing model param ... f
Processing model param ... l
Processing model param ... o
Processing model param ... v
Processing model param ... r
Processing model param ... s
Processing model param ... t
Processing model param ... w
Initialized FM Model with: w0=-0.181250, w=34531087, v=1, reg0=0.000000, regw=0.000000, regv=0.000000, lr=0.010000, statsFreq=1000000, merged models=1
Processing data for group ... 1
Initializing FM Model with existing parameters ...
Processing model param ... v
Processing model param ... v
Processing model param ... v
Processing model param ... v
Initialized FM Model with: w0=0.000000, w=0, v=4, reg0=0.000000, regw=0.000000, regv=0.000000, lr=0.000000, statsFreq=0, merged models=0

Code:

val data: TypedPipe[(Int, Float, Either[FeatureVector, FMModelParameter])] = modelData
val fmModels: SortedGrouped[Int, FMModel] = data
  .groupBy { case (id1, id2, modelParam) => id1 }
  // A secondary sort is needed to ensure model parameters appear before the actual training data.
  // TODO: this sortBy is causing problems and has a bug.
  .sortBy { case (id1, id2, modelParam) => id2 }
  .forceToReducers
  .mapGroup {
    case (groupId, records) =>
      println("Processing data for group ... " + groupId)
      val trainedModel = aggregateAndUpdateModel(records)
      Iterator(trainedModel)
  }
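
One way to check whether .sortBy itself is responsible would be to drop it and order each group's records in memory inside mapGroup instead. The helper below is only a sketch of that idea in plain Scala. InMemoryOrdering and orderedById2 are hypothetical names, and it assumes a whole group fits in reducer memory, which may not hold for the real training data.

```scala
// Diagnostic sketch: order one group's records in memory instead of via .sortBy.
object InMemoryOrdering {
  // Buffers the group's records and orders them by id2 (the second tuple field),
  // the same field the original .sortBy uses.
  // Assumption: the entire group fits in reducer memory.
  def orderedById2[A](records: Iterator[(Int, Float, A)]): Iterator[(Int, Float, A)] =
    records.toVector.sortWith(_._2 < _._2).iterator
}
```

Inside mapGroup this would be used as aggregateAndUpdateModel(InMemoryOrdering.orderedById2(records)), with the .sortBy line removed. If every group then arrives whole on a single reducer, that points at the secondary sort rather than at the grouping.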