Issues with flatMap


Mohsen Jamali

Jan 25, 2013, 8:50:46 PM
to spark...@googlegroups.com
I am trying to apply flatMap to a JavaPairRDD<Long,MyClass> to get another pair RDD of the form JavaPairRDD<Double,double[]>.

To do this, I implemented a class (named MyFunc) that extends PairFlatMapFunction<Tuple2<Long,MyClass>, Double, double[]> and pass an instance of it as the argument to flatMap.

However, I get a compilation error as follows:


scala: .....java:54: cannot find symbol
scala: symbol  : method flatMap(MyFunc)
scala: location: class spark.api.java.JavaPairRDD<java.lang.Long,MyClass>

...

I am wondering if the compilation error is due to the difference between the input and output types of my function (from MyClass to double[]). Note that I do not get an error if I use map rather than flatMap, but I need to output more than one tuple per input.
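For illustration, here is the shape of the problem in plain Java (no Spark involved; the expansion logic is just a made-up stand-in for what MyClass would produce): map emits exactly one output per input, while what I need is one-to-many, which is what flatMap provides.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of the one-to-many shape: each input record
// expands into several (double, double[]-like) pairs, so a map
// (one output per input) is not enough and flatMap is needed.
public class FlatMapShape {
    // Hypothetical expansion: one (id, text) record yields one
    // pair per word, keyed by the word length.
    static List<double[]> expand(long id, String text) {
        List<double[]> out = new ArrayList<double[]>();
        for (String word : text.split(" ")) {
            out.add(new double[] { word.length(), id });
        }
        return out;
    }

    public static void main(String[] args) {
        List<double[]> pairs = expand(42L, "two words");
        System.out.println(pairs.size()); // 2 outputs from 1 input
    }
}
```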

I would appreciate it if someone could help.

Thanks!
Mohsen

Mohsen Jamali

Jan 25, 2013, 9:04:31 PM
to spark...@googlegroups.com
A simple example:

    JavaPairRDD<Long, String> rdd1 = null;
    JavaPairRDD<Double, Date> rdd2 = rdd1.flatMap(new PairFlatMapFunction<Tuple2<Long, String>, Double, Date>() {
        /** Composes two instances of Function1 in a new Function1, with this function applied last.
         *
         *  @tparam   A   the type to which function `g` can be applied
         *  @param    g   a function A => T1
         *  @return       a new function `f` such that `f(x) == apply(g(x))`
         */
        @Override
        public <A> Function1<A, Iterable<Tuple2<Double, Date>>> compose(Function1<A, Tuple2<Long, String>> g) {
            return null;
        }

        /** Composes two instances of Function1 in a new Function1, with this function applied first.
         *
         *  @tparam   A   the result type of function `g`
         *  @param    g   a function R => A
         *  @return       a new function `f` such that `f(x) == g(apply(x))`
         */
        @Override
        public <A> Function1<Tuple2<Long, String>, A> andThen(Function1<Iterable<Tuple2<Double, Date>>, A> g) {
            return null;
        }

        @Override
        public Iterable<Tuple2<Double, Date>> call(Tuple2<Long, String> longStringTuple2) throws Exception {
            return null;
        }
    });

This gives a compilation error.

Mohsen

Josh Rosen

Jan 26, 2013, 1:18:09 AM
to spark...@googlegroups.com
This looks like a bug.  I created a complete standalone example that demonstrates the problem: https://gist.github.com/4640356.

I'll look into this and let you know if I find a solution or a short-term workaround.

One note: you shouldn't need to implement compose() and andThen() when implementing a PairFlatMapFunction. Default implementations of these are inherited, but some Java IDEs don't seem to realize this (perhaps we can fix this in a future release by explicitly implementing those methods in spark.Function).
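To illustrate the pattern behind this (plain Java with hypothetical names, not Spark's actual classes): an abstract base class can ship concrete defaults, so an anonymous subclass only has to supply the one abstract method, and IDE-generated stubs for the rest are unnecessary.

```java
// Minimal sketch of why only call() needs an override: the base
// class supplies a concrete default (standing in for compose() and
// andThen()), and only the abstract method must be implemented.
// All names here are hypothetical, not Spark's real API.
public class DefaultsDemo {
    static abstract class BaseFunction {
        // Concrete default, inherited for free by every subclass.
        public String describe() {
            return "base default";
        }

        // The only method a subclass must supply.
        public abstract String call(String input);
    }

    public static void main(String[] args) {
        // Anonymous subclass overriding only call(), just as one
        // would override only call() on a PairFlatMapFunction.
        BaseFunction f = new BaseFunction() {
            @Override
            public String call(String input) {
                return input.toUpperCase();
            }
        };
        System.out.println(f.call("ok"));  // OK
        System.out.println(f.describe());  // base default
    }
}
```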

- Josh

Mohsen Jamali

Jan 26, 2013, 2:23:47 PM
to spark...@googlegroups.com
Thanks!

I actually do not implement compose() and andThen() myself; those are just the stubs auto-generated by IDEA, and I left them in. I normally only implement call().

Mohsen

Josh Rosen

Jan 26, 2013, 6:21:25 PM
to spark...@googlegroups.com
I opened an issue to track this bug:


As a short-term workaround, you can write a FlatMapFunction that returns an Iterable of tuples, then convert the result into a JavaPairRDD (casting won't work because JavaPairRDD is not a subclass of JavaRDD):

    import scala.reflect.ClassManifest;
    import scala.reflect.ClassManifest$;

    JavaPairRDD<Long, String> rdd1 = null;
    JavaRDD<Tuple2<Double, Date>> rdd2tmp = rdd1.flatMap(new FlatMapFunction<Tuple2<Long, String>, Tuple2<Double, Date>>() {
        @Override
        public Iterable<Tuple2<Double, Date>> call(Tuple2<Long, String> x) throws Exception {
            return null;
        }
    });
    ClassManifest cm = ClassManifest$.MODULE$.fromClass(Object.class);
    JavaPairRDD<Double, Date> rdd2 = new JavaPairRDD(rdd2tmp.rdd(), cm, cm);

- Josh

Josh Rosen

Jan 26, 2013, 7:36:23 PM
to spark...@googlegroups.com
I found a workaround and fixed this bug in https://github.com/mesos/spark/pull/417

The fix will be included in the 0.6.2 / 0.7.0 releases.

- Josh