group by and apply the same reduce method on all (non-group by) fields


mstr...@gmail.com

Aug 21, 2017, 5:25:59 PM
to Scalding Development
Hi all,

In the Fields-based API, is there any way to group by a field and apply the same reduce method to all other fields?

I'm thinking about something like:

pipe.groupBy(new Fields("fieldName"))(_.reduce(Fields.ALL -> Fields.ARGS){ (accum:TupleEntry, next:TupleEntry) =>
      someMethod(accum, next)
    })

The above code compiles, but I believe it does not produce the output I expect: the output schema in the execution trace shows only the group-by field name and nothing else.

As a concrete example, assume the following is your pipe:

field1, field2, field3
1, 2, 3
1, 1, 1
2, 10, 1

I want to group the pipe by field1 and sum up the values in the other fields so that the output is:

field1, field2, field3
1, 3, 4
2, 10, 1


Of course, the logic I want to implement in practice is more complex than simple summation, and I don't know in advance how many fields the pipe has, so using the typed API is not an option.

Thanks!

Oscar Boykin

Aug 21, 2017, 5:42:08 PM
to mstr...@gmail.com, Scalding Development
In the typed API, this would be something like:

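import com.twitter.algebird.Semigroup
import com.twitter.scalding.typed.TypedPipe

val input: TypedPipe[(K, (T, T, T, T, T))] = ???
val sg: Semigroup[T] = Semigroup.from[T] { (t1, t2) => fn(t1, t2) }
input.group.sum(Semigroup.semigroup5(sg, sg, sg, sg, sg))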

There is no convenient way to do that in the Fields API that I can see at the moment.
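To make the typed-API idea concrete without depending on Algebird, here is a minimal plain-Scala sketch of what a product semigroup such as `semigroup5` does: it applies the same binary function to each position of two 5-tuples. The names `TupleCombineSketch`, `combine5`, and `sumByKey` are hypothetical, and an in-memory `Seq` stands in for the `TypedPipe`.

```scala
object TupleCombineSketch {
  type T5[T] = (T, T, T, T, T)

  // Apply the same binary function to each position of two 5-tuples.
  // This mirrors a product semigroup built from one per-field semigroup.
  def combine5[T](fn: (T, T) => T)(a: T5[T], b: T5[T]): T5[T] =
    (fn(a._1, b._1), fn(a._2, b._2), fn(a._3, b._3), fn(a._4, b._4), fn(a._5, b._5))

  // Group an in-memory collection by key and reduce each group's values with
  // combine5; a stand-in for input.group.sum(...) on a real TypedPipe.
  def sumByKey[K, T](rows: Seq[(K, T5[T])])(fn: (T, T) => T): Map[K, T5[T]] =
    rows.groupBy(_._1).map { case (k, group) =>
      k -> group.map(_._2).reduce((a, b) => combine5(fn)(a, b))
    }
}
```

The fixed tuple arity is what makes this approach awkward when the number of columns is not known at compile time.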

--
You received this message because you are subscribed to the Google Groups "Scalding Development" group.
To unsubscribe from this group and stop receiving emails from it, send an email to scalding-dev+unsubscribe@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

mstr...@gmail.com

Aug 22, 2017, 2:11:04 PM
to Scalding Development, mstr...@gmail.com
Thanks very much, Oscar. It seems that even in the typed API we need to know the number of columns (or the type structure) in advance, e.g. TypedPipe[(K, (T, T, T, T, T))]. What if you are building up a pipe dynamically and don't know at compile time how many columns it will have? Say all I know is that the first column is the one I want to group by, and I have no idea how many other columns are in the pipe. Do you know if the typed API can handle such a scenario?

Cyrille Chépélov

Aug 22, 2017, 2:19:06 PM
to mstr...@gmail.com, Scalding Development

Hi,

In the variable-column scenario, you can build a TupleGetter that transforms your Cascading tuples into something that looks like:

   case class KeyedRecord[K, T](key: K, values: Iterable[T])

Then your pipe is a TypedPipe[KeyedRecord[K, T]].

(Fancier things are quite possible if you can derive a stronger subtype that consumes an exact number of columns based on the leftmost columns, e.g. when parsing EDI records.)
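A minimal plain-Scala sketch of this idea, assuming every row is a sequence of `Int` whose first element is the key. The hypothetical `toKeyedRecord` plays the role the `TupleGetter` would play in Scalding, and the hypothetical `reduceByKey` applies the same binary function position by position to the remaining columns, however many there are. It runs on an in-memory `Seq`, not a `TypedPipe`.

```scala
object KeyedRecordSketch {
  case class KeyedRecord[K, T](key: K, values: Iterable[T])

  // Split a variable-width row into its key (first column) and the rest.
  // In Scalding this conversion would live in a TupleGetter over Cascading tuples.
  def toKeyedRecord(row: Seq[Int]): KeyedRecord[Int, Int] =
    KeyedRecord(row.head, row.tail)

  // Combine two value lists position by position with the same function.
  // Assumes all rows sharing a key have the same number of columns.
  def combineValues[T](fn: (T, T) => T)(a: Iterable[T], b: Iterable[T]): Iterable[T] = {
    require(a.size == b.size, "rows with the same key must have the same width")
    a.zip(b).map { case (x, y) => fn(x, y) }
  }

  // Group records by key and fold each group's values with combineValues.
  def reduceByKey[K, T](records: Seq[KeyedRecord[K, T]])(fn: (T, T) => T): Map[K, Iterable[T]] =
    records.groupBy(_.key).map { case (k, group) =>
      k -> group.map(_.values).reduce((a, b) => combineValues(fn)(a, b))
    }
}
```

With the example rows from the first message and `_ + _` as the function, key 1 reduces to (3, 4) and key 2 to (10, 1).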

-- Cyrille

mstr...@gmail.com

Aug 22, 2017, 2:28:59 PM
to Scalding Development, mstr...@gmail.com
Hmm, I think this is what I've been looking for. I'll give it a shot. Thanks for your help, Cyrille!

mstr...@gmail.com

Aug 23, 2017, 2:48:00 PM
to Scalding Development, mstr...@gmail.com
I was able to implement the logic I was looking for using the typed API; however, I believe the Fields-based API can also handle this scenario. In case anyone reading this thread is interested, you should be able to do it as follows:

pipe.groupBy('some_field) { _.reduce(Fields.VALUES -> Fields.ARGS) { (cumulativeTuple: Tuple, next: Tuple) => /* do whatever you want here and return a Tuple */ ??? } }

Fields.VALUES makes sure you iterate over the non-grouping fields, and Fields.ARGS makes sure the output reuses those same fields, so you get as many output fields as there are non-grouping fields.
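As a rough in-memory model of that schema behavior (not Scalding or Cascading code): the grouping field is kept, every other field is treated as a value field, one reduce function runs over the whole vector of value fields, and the output keeps the value fields' names unchanged, which is the role the post attributes to `Fields.ARGS`. `FieldsModel`, `Table`, and `groupAndReduce` are hypothetical names used only for this sketch.

```scala
object FieldsModel {
  // A schema is an ordered list of field names; each row holds values in that order.
  final case class Table(fields: Vector[String], rows: Vector[Vector[Int]])

  def groupAndReduce(table: Table, groupField: String)(
      fn: (Vector[Int], Vector[Int]) => Vector[Int]): Table = {
    val keyIdx = table.fields.indexOf(groupField)
    require(keyIdx >= 0, s"unknown field: $groupField")
    // Analogue of Fields.VALUES: every field except the grouping one, in order.
    val valueIdxs = table.fields.indices.filter(_ != keyIdx).toVector
    val valueFields = valueIdxs.map(i => table.fields(i))
    val grouped = table.rows.groupBy(row => row(keyIdx)).toVector.sortBy(_._1)
    val outRows = grouped.map { case (key, groupRows) =>
      val reduced = groupRows.map(row => valueIdxs.map(i => row(i))).reduce(fn)
      // Analogue of Fields.ARGS: the output reuses the value fields, so width is preserved.
      require(reduced.size == valueFields.size, "reduce must return one value per value field")
      key +: reduced
    }
    Table(groupField +: valueFields, outRows)
  }
}
```

The reduce function here receives and returns whole value vectors, just like the `(cumulativeTuple: Tuple, next: Tuple)` function in the snippet above, so it never needs to know the column count up front.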

Hope it helps.