Re: Trouble writing Hadoop MapReduce in Scala


Arun Ramakrishnan

unread,
May 23, 2012, 3:03:40 AM5/23/12
to scala...@googlegroups.com
Let me know if anyone wants a full project setup of the same or even javap/scalap outputs.

On Wed, May 23, 2012 at 12:02 AM, Arun Ramakrishnan <sinchron...@gmail.com> wrote:
Not sure how many users here use Hadoop. But I am writing to this group because the test case I have is the HelloWorld of MapReduce, word count, and I believe it's more of a Scala/Java interop issue.

I narrowed it down to my map task implementation. I say this because when I reference and use a (seemingly) equivalent Java implementation, everything works fine.

Java version

public class TokenCounterMapperJava extends Mapper<LongWritable, Text, Text, LongWritable> {
    private final LongWritable one = new LongWritable(1);
    private Text word = new Text();
    @Override
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
        String[] splits = value.toString().split("\\s+");
        for (String split : splits ) {
            word.set( split );
            context.write(word, one);
        }
    }
}

Scala version

class TokenCounterMapper extends Mapper[LongWritable, Text, Text, LongWritable] {
    val one = new LongWritable(1);
    val word = new Text();
    @Override
    def map(key: LongWritable, value: Text, context: Context){
        val splits = value.toString().split("\\s+");
        for (split <- splits ) {
            word.set( split );
            context.write(word, one);
        }
    }
}


Here is a link to the full page of code in Scala. 


I get the following error. FYI, I get this error when I run it locally, not even on a cluster.

############
java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.Text, recieved org.apache.hadoop.io.LongWritable
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:871)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:574)
at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
at org.apache.hadoop.mapreduce.Mapper.map(Mapper.java:124)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:647)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:323)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:210)

###########


I am using Hadoop 0.20.2, specifically cdh3u4.


thanks

Arun


Edmondo Porcu

unread,
May 23, 2012, 4:49:44 AM5/23/12
to Arun Ramakrishnan, scala...@googlegroups.com
Dear Arun,
it looks to me like the problem might be somewhere else, but I would
need more detail to be sure:

- Both versions receive a different type of Context. How is the Context
created by Hadoop and passed to the Mapper?
- Is there some kind of reflection or annotation mechanism involved?

Best
Edmondo


Takuya UESHIN

unread,
May 23, 2012, 5:37:48 AM5/23/12
to Arun Ramakrishnan, scala...@googlegroups.com
Hi, Arun.

Your map function in the Scala version didn't override the map function
of the Mapper class, so the default map function (which does nothing!)
was called.

You have to use the 'override' modifier on your map function instead of
the @Override annotation. And the Context type is not right in this
case: because your map takes a different Context type, it is a new
overload rather than an override.

Your code should be as follows:


class TokenCounterMapper extends Mapper[LongWritable, Text, Text, LongWritable] {
  val one = new LongWritable(1)
  val word = new Text()

  override def map(key: LongWritable, value: Text,
                   context: Mapper[LongWritable, Text, Text, LongWritable]#Context) {
    val splits = value.toString().split("\\s+")
    for (split <- splits) {
      word.set(split)
      context.write(word, one)
    }
  }
}


"Mapper[LongWritable, Text, Text, LongWritable]#Context" is messy, but
I don't know a better way.
Could someone please let me know...

Thanks.


--
Takuya UESHIN
Tokyo, Japan

http://twitter.com/ueshin

Edmondo Porcu

unread,
May 23, 2012, 5:58:47 AM5/23/12
to Takuya UESHIN, Arun Ramakrishnan, scala...@googlegroups.com
UHM !!!!

Good suggestion for developing in Scala: forget the @Override annotation :)

You can define a type alias like this:

class TokenCounterMapper extends Mapper[LongWritable, Text, Text, LongWritable] {
  val one = new LongWritable(1)
  val word = new Text()

  type TokenCounterContext = Mapper[LongWritable, Text, Text, LongWritable]#Context

  override def map(key: LongWritable, value: Text, context: TokenCounterContext) {
    val splits = value.toString().split("\\s+")
    for (split <- splits) {
      word.set(split)
      context.write(word, one)
    }
  }
}

Edmondo

Takuya UESHIN

unread,
May 23, 2012, 6:03:31 AM5/23/12
to Edmondo Porcu, Arun Ramakrishnan, scala...@googlegroups.com
Oh, type alias.
That's a very good idea!

Thanks!



Arun Ramakrishnan

unread,
May 23, 2012, 2:08:12 PM5/23/12
to Takuya UESHIN, Edmondo Porcu, scala...@googlegroups.com
Thanks guys, that worked. But I'm not sure what's going on here. I should read something on Java/Scala interop. Especially, I'm not even sure what the "#" syntax is.

But I am surprised by the @Override behavior. Shouldn't the Scala compiler complain about the use of @Override? I have a feeling that more often than not people are likely to make this mistake, especially if they keep switching between @Override and override. Is this a work in progress in the compiler, or is there some other reason why it just conveniently ignores the annotation without validating it?

thanks
Arun
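The "#" asked about above is Scala's type projection syntax: Hadoop's Context is an inner class of Mapper, so each Mapper instance has its own path-dependent Context type, and `Mapper[K1, V1, K2, V2]#Context` names "the Context of any such Mapper" from outside an instance. (As for @Override: scalac treats Java annotations like @Override as plain metadata and does not validate them; only Scala's own `override` modifier is checked at compile time.) A self-contained sketch, using toy stand-ins for the Hadoop classes rather than the real types:

```scala
// Toy stand-ins for the Hadoop types: Context is an inner class of Mapper,
// so its full name depends on Mapper's type parameters.
class Mapper[KIN, VIN, KOUT, VOUT] {
  class Context {
    def write(key: KOUT, value: VOUT): Unit = ()
  }
}

object TypeProjectionDemo {
  // Type projection: "the Context type of any Mapper[Long, String, String, Long]".
  // A path-dependent type like `m.Context` would be tied to one instance `m`.
  type WordCountContext = Mapper[Long, String, String, Long]#Context

  def main(args: Array[String]): Unit = {
    val m = new Mapper[Long, String, String, Long]
    val ctx: WordCountContext = new m.Context // any instance's Context conforms
    ctx.write("hello", 1L)
  }
}
```

This is why the corrected signature above uses `Mapper[...]#Context`: the map method must accept the Context of whatever Mapper instance Hadoop constructs, not one tied to a particular instance.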

Arun Ramakrishnan

unread,
May 23, 2012, 5:47:12 PM5/23/12
to Takuya UESHIN, Edmondo Porcu, scala...@googlegroups.com
Should I try to open an issue for this lack of an @Override compiler warning/error?

Luke Vilnis

unread,
May 23, 2012, 6:19:06 PM5/23/12
to Arun Ramakrishnan, Takuya UESHIN, Edmondo Porcu, scala...@googlegroups.com
+1

Lanny Ripple

unread,
May 25, 2012, 4:57:51 PM5/25/12
to scala-user
Been doing Hadoop for about two years now with Scala. Our usual
boilerplate is

object MyMapperHelper extends CassFedMapReduce[LongWritable, Text, Text, LongWritable] {
  val MapperClass = classOf[MyMapper]
  val ReducerClass = classOf[MyReducer]
}
import MyMapperHelper.{OurMapperType, OurReducerType} // type aliases defined in HadoopMapReduce

class MyMapper extends OurMapperType {

  private val outkey = new Text()
  private val one = new LongWritable(1)

  def writeContext(k: String)(implicit context: OurMapperType#Context) {
    outkey.set(k)
    // maybe update a counter or other `context`-specific item here
    context.write(outkey, one)
  }

  override def map(key: ByteBuffer, columns: java.util.SortedMap[ByteBuffer, IColumn],
                   context: OurMapperType#Context) {
    implicit val mContext = context
    ...
    splits.foreach { writeContext _ }
  }
}

CassFedMap[K,V] and CassFedMapReduce[K,V,RK,RV] (which extends
CassFedMap[K,V]) are classes that do a lot of the setup for the job
when they are constructed, given the mapper (or mapper and reducer)
classes. (We're throwing Cassandra into the mix, so we always know the
K,V for maps will be [ByteBuffer, java.util.SortedMap[ByteBuffer,IColumn]],
but you could take the idea and play with it for other map inputs.)

Not sure why, but the Hadoop guys didn't derive Mapper and Reducer from
some common ancestor, so it's tough to write code that works against a
generic context. You can often get at some aspect of the context,
though. We type contexts for grabbing and updating counters with

type CounterContext = org.apache.hadoop.mapreduce.TaskInputOutputContext[_,_,_,_]

and then, with some pimping, we can say

context.counter(IMPORTANT_INFO) += 3
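A self-contained sketch of how that enrichment might look. The Counter and context classes below are stubs standing in for the real org.apache.hadoop.mapreduce types, and names like RichContext/CounterRef are guesses at the shape of such a wrapper, not our actual code:

```scala
// Stub counter and context, standing in for Hadoop's Counter and
// TaskInputOutputContext (which exposes getCounter(group, name)).
class Counter {
  private var n = 0L
  def increment(by: Long) { n += by }
  def value = n
}
class CounterContextStub {
  private val counters = scala.collection.mutable.Map[(String, String), Counter]()
  def getCounter(group: String, name: String): Counter =
    counters.getOrElseUpdate((group, name), new Counter)
}

object CounterSugar {
  // The "pimp": context.counter(name) += n routes through
  // getCounter(...).increment(...) on the underlying context.
  class CounterRef(ctx: CounterContextStub, group: String, name: String) {
    def +=(n: Long) { ctx.getCounter(group, name).increment(n) }
  }
  class RichContext(ctx: CounterContextStub) {
    def counter(name: String) = new CounterRef(ctx, "app", name)
  }
  implicit def enrichContext(ctx: CounterContextStub): RichContext = new RichContext(ctx)
}
```

With `import CounterSugar._` in scope, `context.counter("IMPORTANT_INFO") += 3` compiles against the plain context via the implicit conversion.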

Anyway, Scala's great for working with Hadoop, and just a tiny bit of
the sugar Scala is so good at will make it (almost) enjoyable.

Best of luck,
-ljr

