getting/manipulating all fields in a pipe in scalding

mstr...@gmail.com

Jun 30, 2017, 5:13:35 PM
to Scalding Development
Here is the question:

Assume I have a pipe and I want to rename all the fields in the pipe programmatically, meaning that I don't want to hard code the field names in my code. Any idea how I can do this?

As a concrete example, assume I have a pipe with two fields: "name" and "age" and I want to rename these fields to "employee_name" and "employee_age". Obviously the natural solution is to write a piece of code as below:

pipe.rename(('name, 'age) -> ('employee_name, 'employee_age))

or

pipe.rename(new Fields("name", "age") ->  new Fields("employee_name", "employee_age"))

However, what I need is to be able to iterate through all fields in the pipe without knowing their names.
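
If the field names can be obtained from somewhere, for example from the schema the source was declared with (an assumption; nothing in this thread shows how to read them off an arbitrary pipe), the target names can be computed instead of hard-coded. A minimal sketch in plain Java, where `FieldNamePrefixer` and `withPrefix` are hypothetical names and not part of Scalding or Cascading:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Scalding or Cascading.
final class FieldNamePrefixer {
    private FieldNamePrefixer() {}

    // Returns a new list with the prefix prepended to every name, in the original order.
    static List<String> withPrefix(String prefix, List<String> names) {
        List<String> renamed = new ArrayList<>(names.size());
        for (String name : names) {
            renamed.add(prefix + name);
        }
        return renamed;
    }
}
```

Calling `withPrefix("employee_", ...)` on the names "name" and "age" yields "employee_name" and "employee_age". The old and new name lists would presumably then be wrapped in Fields objects and passed to rename in place of the hard-coded pair.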

There are a couple of methods (resolveIncomingOperationArgumentFields and resolveIncomingOperationPassThroughFields) callable on a pipe which look promising, but the issue is that they both take an input argument of type cascading.flow.planner.Scope, and I don't know where I can get one from in a Scalding job.

Another solution that comes to mind is using the "each" method on the pipe: implementing a Cascading function and passing it to the each statement. But I was not able to find any sample code for that either.

Thanks!

Alex Levenson

Jun 30, 2017, 6:51:09 PM
to mstr...@gmail.com, Scalding Development
Probably not what you want to hear, but the Scalding dev team is really only developing + supporting the Typed API at this point -- which would make something like this even more difficult.
But the question I'd probably ask is what are you trying to do, and can you use strong types, the Typed API, and maybe an extractor method or similar instead?

--
You received this message because you are subscribed to the Google Groups "Scalding Development" group.
To unsubscribe from this group and stop receiving emails from it, send an email to scalding-dev+unsubscribe@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.



--
Alex Levenson
@THISWILLWORK

mstr...@gmail.com

Jun 30, 2017, 7:07:16 PM
to Scalding Development, mstr...@gmail.com
Thanks for the reply Alex!

I'm trying to implement a couple of scenarios. The first scenario is pretty much what I explained in the post (i.e. appending a fixed prefix/suffix to every field name in a pipe). The second scenario is that I want to iterate through all fields in a pipe and call a function on them based on their names. For example, let's say I have a bunch of different fields in a pipe, and if a field name contains the string "_list_" I want to convert its List[Any] value to a sparse representation of the list in String format. I guess if I write a Cascading Function in Java and invoke an "each" method on my pipe that should do the trick, but I was wondering if there is a cleaner/easier way of doing this in Scalding:

import java.util.List;
import cascading.operation.*;
import cascading.tuple.*;
import cascading.flow.*;

public class Sparser extends BaseOperation<Tuple> implements Function<Tuple>
{
public Sparser()
  {
  // by default, declare the same field names that come in as arguments
  super( Fields.ARGS );
  }

public Sparser( Fields fieldDeclaration )
  {
  super( fieldDeclaration );
  }

@Override
public void operate( FlowProcess flowProcess, FunctionCall<Tuple> functionCall )
  {
  // get the argument field names and the arguments TupleEntry
  Fields fieldNames = functionCall.getArgumentFields();
  TupleEntry arguments = functionCall.getArguments();

  // create a Tuple to hold our result values
  Tuple result = new Tuple();

  for( int i = 0; i < arguments.size(); i++ )
    {
    Object obj = arguments.getObject( i );
    if( obj != null && fieldNames.get( i ).toString().contains( "_list_" ) )
      {
      List<?> tmp = (List<?>) obj;
      String sparseRepresentation = tmp.toString(); // TO BE IMPLEMENTED
      result.add( sparseRepresentation );
      }
    else
      {
      // pass every other value through unchanged; it need not be a String
      result.add( obj );
      }
    }

  // return the result Tuple
  functionCall.getOutputCollector().add( result );
  }
}
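
The "TO BE IMPLEMENTED" step above is left open in the post. One possible shape for it, sketched in plain Java under the assumption that "sparse representation" means comma-separated index:value pairs for the non-zero entries; the format and the names `SparseListFormat` and `toSparseString` are hypothetical, not something the thread specifies:

```java
import java.util.List;

// Hypothetical stand-in for the "TO BE IMPLEMENTED" step; the output format is an assumption.
final class SparseListFormat {
    private SparseListFormat() {}

    // Encodes the non-zero entries of a list as comma-separated "index:value" pairs.
    static String toSparseString(List<? extends Number> values) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < values.size(); i++) {
            Number v = values.get(i);
            // skip missing and zero entries; only the remaining ones are written out
            if (v == null || v.doubleValue() == 0.0) {
                continue;
            }
            if (sb.length() > 0) {
                sb.append(',');
            }
            sb.append(i).append(':').append(v.doubleValue());
        }
        return sb.toString();
    }
}
```

A list holding 0.0, 1.5, 0.0, 2.0 would be encoded as "1:1.5,3:2.0", and an all-zero or empty list as the empty string.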

btw, I'm not sure I understand what you mean by "an extractor method". Can you please send me a pointer to an example?

Any input is greatly appreciated!

Alex Levenson

Jun 30, 2017, 7:13:53 PM
to mstr...@gmail.com, Scalding Development
Yeah, I guess what I was sort of getting at is that if you are using the Typed API, you try to use types instead of names for these sorts of things, and a lot of your code about casting one type to another goes away. But it can be painful to rewrite your entire world in this way. I'm not familiar enough with the un-typed API to tell you how to do this unfortunately, but at least here at Twitter we would try to push our users towards using strong types, and maybe the implicit typeclass pattern for extractors / converters / etc. For example, you can see how TypedPipe.sumByKey takes an implicit strongly typed Semigroup which explains how to "sum" two values. Similarly, you can create an implicit Sparser type class that is picked based on the types of the data at compile time.
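
A rough illustration of the type class idea, written in Java to match the Cascading function earlier in the thread. Java has no implicit parameters, so the converter instance is passed explicitly here, whereas in Scala it would be declared implicit and resolved by the compiler. `SparseEncoder`, `SparseEncoders`, and the placeholder encoding are hypothetical and not part of Scalding:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical names throughout; this is not a Scalding or Cascading API.
interface SparseEncoder<T> {
    String encode(T value);
}

final class SparseEncoders {
    private SparseEncoders() {}

    // Placeholder encoding for a list of doubles: keeps only the non-zero values.
    static final SparseEncoder<List<Double>> DOUBLE_LIST = values -> {
        List<Double> nonZero = new ArrayList<>();
        for (Double v : values) {
            if (v != null && v != 0.0) {
                nonZero.add(v);
            }
        }
        return nonZero.toString();
    };

    // Strings are already in their final form.
    static final SparseEncoder<String> STRING = value -> value;

    // The encoder's type parameter must match the element type,
    // so pairing values with the wrong encoder fails at compile time.
    static <T> List<String> encodeAll(List<T> values, SparseEncoder<T> encoder) {
        List<String> out = new ArrayList<>(values.size());
        for (T value : values) {
            out.add(encoder.encode(value));
        }
        return out;
    }
}
```

The point of the pattern is that the choice of converter follows from the static type of the data, so no field-name checks or casts are needed at runtime.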

mstr...@gmail.com

Jun 30, 2017, 7:18:10 PM
to Scalding Development, mstr...@gmail.com
Your suggestion makes sense. I will give the Typed API a try. Thanks very much for your input!

Alex Levenson

Jun 30, 2017, 7:25:17 PM
to mstr...@gmail.com, Scalding Development
Sure! Sorry I don't know how to do that at the cascading layer.

Koert Kuipers

Jun 30, 2017, 8:01:30 PM
to mstr...@gmail.com, Scalding Development
you can do an operation on all fields along the lines of:

import cascading.tuple.{ Fields, Tuple => CTuple }
pipe.map(Fields.ALL -> Fields.ARGS){ (ctuple: CTuple) => ... }

however this does not give you access to the field names

mstr...@gmail.com

Jun 30, 2017, 8:31:50 PM
to Scalding Development, mstr...@gmail.com
Thanks very much for the suggestion, Koert. Yes, this does the trick for me. However, as you mentioned, I won't have access to the field names, which I guess is fine. For the scenario that I explained, I can just check the tuple elements and see whether they are of type List: if they are, I will invoke the method I want; otherwise I won't touch their values.
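
The type check described here can be sketched in plain Java over an ordinary list of values standing in for the tuple's elements. `ListValueConverter` and `convertLists` are hypothetical names, and `Function` below is java.util.function.Function, not Cascading's Function interface:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical helper operating on plain Java values rather than a Cascading Tuple.
final class ListValueConverter {
    private ListValueConverter() {}

    // Applies the converter to every element that is a List and copies all other elements unchanged.
    static List<Object> convertLists(List<?> row, Function<List<?>, ?> converter) {
        List<Object> out = new ArrayList<>(row.size());
        for (Object value : row) {
            if (value instanceof List) {
                out.add(converter.apply((List<?>) value));
            } else {
                out.add(value);
            }
        }
        return out;
    }
}
```

Because the decision is made on the runtime type alone, a field that happens to hold a List for some other reason would be converted as well, which is the trade-off of not having the field names.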

Thanks again!

Oscar Boykin

Jul 1, 2017, 3:23:46 PM
to Scalding Development, mstr...@gmail.com
You can do Koert's trick except with a function accepting TupleEntry and you will then have access to the field names.

mstr...@gmail.com

Jul 1, 2017, 6:36:14 PM
to Scalding Development, mstr...@gmail.com
Thanks Oscar! By function, do you mean a Cascading function, or can I still do this in pure Scalding?

Koert Kuipers

Jul 1, 2017, 7:29:19 PM
to mstr...@gmail.com, Scalding Development
import cascading.tuple.{ Fields, Tuple => CTuple, TupleEntry }

pipe.map(Fields.ALL -> Fields.ARGS){ (te: TupleEntry) => ... }

You have access to the field names this way but cannot change them. Your function is TupleEntry => CTuple

mstr...@gmail.com

Jul 1, 2017, 7:44:53 PM
to Scalding Development, mstr...@gmail.com
That's exactly what I was looking for. Many thanks Koert, Oscar, and Alex.