More Scalding plus a few docs questions

238 views
Skip to first unread message

ANithian

unread,
Jul 1, 2012, 2:52:41 PM7/1/12
to cascadi...@googlegroups.com
Hello all,

I am playing with Scalding and reading through the documentation and examples. Not being well versed in Scala but knowing Cascading, I have a few (seemingly simple) questions:
1)  In the documentation (https://github.com/twitter/scalding/wiki/Fields-based-API-Reference#wiki-join-functions) I see certain functions say "existingFields" and some say "originalFields" I assume they should both read the same?
2) Looking at the source, flatMap and flatMapTo will simply emit multiple tuples to the outputcollector. Does that mean that the anonymous function passed to the flatMap/flatMapTo must always return something iterable? (Even if it's an array of length 1)? Would clearly stating this help others?
3) I haven't played with a complex example but when you invoke map/flatmap (plus the To equivalents), do I simply pass in the fields being operated on? From what I have done in the past with Java Cascading, I would pass in the input fields, the fields being operated on, the actual operation, and the list of resulting output fields. In Scalding, how does this change if at all?

Also I have gotten it such that I can run a scalding job in Eclipse (as opposed to running using scald.rb). I didn't see any documentation on this so if this is of interest, I will probably document my setup in a blog post and submit to this list for review.

Thanks again!
Amit

Oscar Boykin

unread,
Jul 1, 2012, 4:29:23 PM7/1/12
to cascadi...@googlegroups.com
On Sun, Jul 1, 2012 at 11:52 AM, ANithian <anit...@gmail.com> wrote:
Hello all,

I am playing with Scalding and reading through the documentation and examples. Not being well versed in Scala but knowing Cascading, I have a few (seemingly simple) questions:
1)  In the documentation (https://github.com/twitter/scalding/wiki/Fields-based-API-Reference#wiki-join-functions) I see certain functions say "existingFields" and some say "originalFields" I assume they should both read the same?

Yes, feel free to make that improvement to the wiki!
 
2) Looking at the source, flatMap and flatMapTo will simply emit multiple tuples to the outputcollector. Does that mean that the anonymous function passed to the flatMap/flatMapTo must always return something iterable? (Even if it's an array of length 1)? Would clearly stating this help others?

The type signature of flatMap/To takes a function U => Iterable[T], so you are correct.  But we could also add this note to the wiki.  Again, I will be in your debt if you do so.
 
3) I haven't played with a complex example but when you invoke map/flatmap (plus the To equivalents), do I simply pass in the fields being operated on? From what I have done in the past with Java Cascading, I would pass in the input fields, the fields being operated on, the actual operation, and the list of resulting output fields. In Scalding, how does this change if at all?

So, scalding has one rule for fields resolution, where-as cascading allows you to specify how that is done.  In a map/flatMap you pass two sets of fields represented by a scala tuple of symbols (strings, and integers also work, but we usually use symbols, which are written with a single quote to start):

('x, 'y) -> 'z

This means in the input fields are Fields("x", "y") and the output fields are Fields("z").  In scalding, if the set of fields are disjoint, we keep ALL the fields, so the resulting pipe will be ('x,'y,'z).  If one set is a subset of the other: e.g. ('x, 'y) -> 'x or 'x -> ('x,'y) we SWAP the input fields with the output fields.  Finally, if there is an overlap but not strict subset, this is an error (not allowed by scalding).
 
This logic is here:

The exception is mapTo/flatMapTo/unpackTo/packTo, which always only keeps the output fields.


Also I have gotten it such that I can run a scalding job in Eclipse (as opposed to running using scald.rb). I didn't see any documentation on this so if this is of interest, I will probably document my setup in a blog post and submit to this list for review.

That sounds great. Please also make a link from the wiki to your post.

We are very enthusiastic about accepting code and documentation contributions from the community.
 
Thanks again!

Thank you!
 
Amit

--
You received this message because you are subscribed to the Google Groups "cascading-user" group.
To view this discussion on the web visit https://groups.google.com/d/msg/cascading-user/-/vTLg9yYHtPEJ.
To post to this group, send email to cascadi...@googlegroups.com.
To unsubscribe from this group, send email to cascading-use...@googlegroups.com.
For more options, visit this group at http://groups.google.com/group/cascading-user?hl=en.



--
Oscar Boykin :: @posco :: https://twitter.com/intent/user?screen_name=posco

ANithian

unread,
Jul 4, 2012, 3:07:14 AM7/4/12
to cascadi...@googlegroups.com
Oscar,

Thanks for the help! I'll make the edits in the wiki. Regarding #3, I may be misunderstanding but what happens to fields in the pipe that weren't operated on hence not specified in the field tuples for input/output.

Example:
If I have a pipe with fields A B C D and I wish to produce a new field E that is a sum of A B, the fields tuple would simply be ('A,'B) -> E with some anonymous function to return 'E' as the sum of A,B ? What happens to fields C D in this case?

I may be confusing things and apologies if so!

Thanks
Amit

Oscar Boykin

unread,
Jul 16, 2012, 2:32:26 PM7/16/12
to cascadi...@googlegroups.com
Sorry for the delay in answering:

In the map phase (map/flatMap/pack/unpack) the rule is: if the target fields are new (disjoint from the input fields), they are appended.  If source or target fields are a subset of the other, only the results are kept.  Otherwise, you get an exception at flow planning stage (there is some overlap but not subset relationship).

If you use mapTo/flatMapTo/packTo/unpackTo, only the results are kept.

In the groupBy, the keys are always kept and only the target fields.  So, groupBy('x) { _.sum('y -> 'ys).sum('z -> 'zs) } will return a pipe with three columns:
('x, 'ys, 'zs).

Joins keep all the fields

I put this here:

Does that help?

Can you help clarify that page if you have any other thoughts?

--
You received this message because you are subscribed to the Google Groups "cascading-user" group.
To view this discussion on the web visit https://groups.google.com/d/msg/cascading-user/-/t0supLxZUJkJ.

To post to this group, send email to cascadi...@googlegroups.com.
To unsubscribe from this group, send email to cascading-use...@googlegroups.com.
For more options, visit this group at http://groups.google.com/group/cascading-user?hl=en.

Dirk Guijt

unread,
Apr 17, 2015, 6:49:49 AM4/17/15
to cascadi...@googlegroups.com
Hi Oscar, 

I just stumbled across issue #3 myself and it took a while for me to figure out what was going on and why it was behaving like that. I now know that it behaves like this due to the flow planning issue, after reading this thread and the Field-rules Github page. Now I'm by no means a scala/scalding and/or cascading expert, but can't this be solved easily?

For example:

Suppose we have a pipe with fields 'A, 'B, 'C and 'D. Now if I want to construct a field 'E with a value that is constructed from 'B, 'C, and 'D, I can do the following:
pipe.map(('B, 'C, 'D) -> 'E) { operation('B, 'C, 'D) }
Which results in a pipe with values 'A, 'B, 'C, 'D and 'E. Perfect!

Now if I want to replace 'B with a value that is constructed from 'B, 'C, and 'D, I can do the following:
pipe.map(('B, 'C, 'D) -> 'B) { operation('B, 'C, 'D) }
The result of this is a pipe with only fields 'A and 'B. This is kind of unexpected in my opinion, especially since the map function behaves differently for various inputs with respect to dropping or propagating fields. I would expect the pipe now to still have all the fields mentioned before, like in the 'E case. 

So if I need 'C later on in the process, I now use the following to solve the problem:
pipe.map(('B, 'C, 'D) -> ('B, 'C)) { (operation('A, 'B, 'C), 'C) }

Why can't the map function be build to always forward the values in the subset case? Are there technical and/or performance reasons for this?

If there are good reasons, than I advice to change the documentation on https://github.com/twitter/scalding/wiki/Fields-based-API-Reference to highlight this behaviour. You can't understand this behaviour based on that page at all while I think you should be able to.

Kind regards,

Dirk
Reply all
Reply to author
Forward
0 new messages