Scalding classpath issues with Kryo serialization of functions between 0.8.11 and 0.9.0rc4


Ian Hummel

Jan 22, 2014, 4:21:29 PM1/22/14
to cascadi...@googlegroups.com
Hey everyone,

I've been struggling with Scalding and some classpath issues that seem related to Kryo.  I've found a bunch of threads scattered here and there, but nothing conclusive.  I do have some test cases on GitHub to help reproduce, so maybe we can get to the bottom of this...

For some background, we don't build fat jars for our specific jobs.  Instead, we deploy Scalding and all its dependencies in a fat jar to EMR clusters via bootstrap actions, so job developers can just "sbt package" and submit their thin jars using the elastic-mapreduce command-line tool.
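For reference, the thin-jar split described above can be sketched in build.sbt like this (the module coordinates and scope here are an assumption, not taken from the repo):

```scala
// build.sbt sketch: mark scalding "provided" so `sbt package` emits a
// thin jar containing only the job classes; the cluster supplies
// Scalding and its dependencies via the bootstrap-action fat jar.
libraryDependencies += "com.twitter" %% "scalding-core" % "0.9.0rc4" % "provided"
```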

Recently I've been porting some jobs to 0.9.0rc4, but I keep getting exceptions (full stack trace at the bottom).  After some head scratching, it occurred to me that I only get exceptions when my job has anonymous functions.  Very strange!  For example, this works fine:

class CopyJob(args : Args) extends Job(args) {
  TextLine(args("input"))
    .write(Tsv(args("output")))
}

But this fails:

class WordCountJob(args : Args) extends Job(args) {
  TextLine(args("input"))
    .flatMap('line -> 'word) { line : String ⇒ tokenize(line) }
    .groupBy('word) { _.size }
    .groupAll { _.sortBy('size).reverse }
    .write(Tsv(args("output")))

  // Split a piece of text into individual words.
  def tokenize(text : String) : Array[String] = {
    // Lowercase each word and remove punctuation.
    text.toLowerCase.replaceAll("[^a-zA-Z0-9\\s]", "").split("\\s+").filter(_ != "")
  }
}

I put an example project up on GitHub at https://github.com/themodernlife/scalding-kryo-bug.  I am using Hadoop 1.2.1 installed via Homebrew to run my jars, setting HADOOP_CLASSPATH to include the jar of Scalding and all its dependencies (but not my job classes).

The repo has a branch, scalding-0.8.11, which lets you run the exact same jobs using Scalding 0.8.11.  With that version everything works fine, so I'm assuming this is related to Kryo changes in how FunctionX closures are serialized?

Any help/guidance would be much appreciated!


Here's the full stack trace:

java.lang.Exception: java.lang.RuntimeException: Error in configuring object
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:354)
Caused by: java.lang.RuntimeException: Error in configuring object
    at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:93)
    at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:64)
    at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:117)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:426)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:366)
    at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:223)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:88)
    ... 10 more
Caused by: cascading.flow.FlowException: internal error during mapper configuration
    at cascading.flow.hadoop.FlowMapper.configure(FlowMapper.java:99)
    ... 15 more
Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: example.WordCountJob$$anonfun$3
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:610)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:721)
    at com.twitter.chill.SomeSerializer.read(SomeSerializer.scala:25)
    at com.twitter.chill.SomeSerializer.read(SomeSerializer.scala:19)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
    at com.twitter.chill.SerDeState.readClassAndObject(SerDeState.java:61)
    at com.twitter.chill.KryoPool.fromBytes(KryoPool.java:94)
    at com.twitter.chill.Externalizer.fromBytes(Externalizer.scala:149)
    at com.twitter.chill.Externalizer.maybeReadJavaKryo(Externalizer.scala:162)
    at com.twitter.chill.Externalizer.readExternal(Externalizer.scala:152)
    at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at java.util.HashMap.readObject(HashMap.java:1184)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at cascading.flow.hadoop.util.JavaObjectSerializer.deserialize(JavaObjectSerializer.java:101)
    at cascading.flow.hadoop.util.HadoopUtil.deserializeBase64(HadoopUtil.java:295)
    at cascading.flow.hadoop.util.HadoopUtil.deserializeBase64(HadoopUtil.java:276)
    at cascading.flow.hadoop.FlowMapper.configure(FlowMapper.java:80)
    ... 15 more
Caused by: java.lang.ClassNotFoundException: example.WordCountJob$$anonfun$3
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:270)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
    ... 64 more

Ken Krugler

Jan 22, 2014, 4:41:05 PM1/22/14
to cascadi...@googlegroups.com
I've got no experience with sbt, but it sure looks like Kryo is trying to serialize your flow.

Which shouldn't be happening - the flow itself (the graph of objects & any non-transient members) should be getting serialized via regular Java serialization.

-- Ken


--
You received this message because you are subscribed to the Google Groups "cascading-user" group.
To unsubscribe from this group and stop receiving emails from it, send an email to cascading-use...@googlegroups.com.
To post to this group, send email to cascadi...@googlegroups.com.
Visit this group at http://groups.google.com/group/cascading-user.
To view this discussion on the web visit https://groups.google.com/d/msgid/cascading-user/ae9aa7b8-f80c-48d6-86b4-3b28fdbdf706%40googlegroups.com.
For more options, visit https://groups.google.com/groups/opt_out.

--------------------------
Ken Krugler
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr

Oscar Boykin

Jan 22, 2014, 5:14:38 PM1/22/14
to cascadi...@googlegroups.com
What is happening here is that Kryo was used to serialize the closure (Kryo is now used as a fallback when Java serialization fails).

The class for the anonymous function ( { line : String ⇒ tokenize(line) } ) is not being found.

In the past, I've seen people have issues with having to declare the appjar: 

For some reason, we never need that at Twitter, but others do seem to need it (sorry, I have never bothered to understand it because we never got a pull request on it and it never caused us any problems).





--
Oscar Boykin :: @posco :: http://twitter.com/posco

Ian Hummel

Jan 22, 2014, 5:39:16 PM1/22/14
to cascadi...@googlegroups.com
Hey gang,

Thanks for the replies.  Oscar, I did stumble on some threads that mentioned appjar, but the code below still fails with the same exception.  FYI, I noticed the signature of config changed in 0.9.x (it no longer takes an implicit Mode)... I don't suppose that has anything to do with it, though?

Could you clarify where Kryo would even be involved with the code below?  I guess I am missing what happens after I call ToolRunner.run with my Job.  Does the whole thing get serialized?  To where, and how does Cascading know where to load it from?

Let me know if I can provide any more info or try anything else out.  Thanks!!

package example

import com.twitter.scalding._
import org.apache.hadoop.util.ToolRunner
import org.apache.hadoop.conf.Configuration

class WordCountJob(args : Args) extends Job(args) {
  TextLine(args("input"))
    .flatMap('line -> 'word) { line : String ⇒ tokenize(line) }
    .groupBy('word) { _.size }
    .groupAll { _.sortBy('size).reverse }
    .write(Tsv(args("output")))

  // Split a piece of text into individual words.
  def tokenize(text : String) : Array[String] = {
    // Lowercase each word and remove punctuation.
    text.toLowerCase.replaceAll("[^a-zA-Z0-9\\s]", "").split("\\s+").filter(_ != "")
  }

  override def config: Map[AnyRef, AnyRef] = {
    super.config ++ Map("cascading.app.appjar.class" -> classOf[WordCountJob])
  }
}

object WordCountJobRunner {
  def main(args : Array[String]) {
    val tool = new Tool {
      override def getJob(args: Args) = new WordCountJob(args)
    }
    ToolRunner.run(new Configuration, tool, args)
  }
}

Ian Hummel

Jan 22, 2014, 6:14:50 PM1/22/14
to cascadi...@googlegroups.com
Ok another piece to the puzzle.

As a reminder, the Job is in one jar and Scalding plus all its dependencies are in another.  If I add BOTH jars to HADOOP_CLASSPATH, it runs fine:

HADOOP_CLASSPATH=$PWD/deps/scalding-and-deps.jar:target/scala-2.10/scalding-example_2.10-0.1-SNAPSHOT.jar hadoop jar target/scala-2.10/scalding-example_2.10-0.1-SNAPSHOT.jar example.WordCountJobRunner --hdfs --input src/test/resources/alice.txt --output target/word-count

Huh?  Why wouldn't classes from scalding-example_2.10-0.1-SNAPSHOT.jar (where my Job is defined - and which is the jar file passed to "hadoop jar ...") be found unless they are on HADOOP_CLASSPATH?

I also tried using -libjars, but that didn't work either:

HADOOP_CLASSPATH=$PWD/deps/scalding-and-deps.jar hadoop jar target/scala-2.10/scalding-example_2.10-0.1-SNAPSHOT.jar example.WordCountJobRunner -libjars target/scala-2.10/scalding-example_2.10-0.1-SNAPSHOT.jar --hdfs --input src/test/resources/alice.txt --output target/word-count

The above gives the same exception (java.lang.ClassNotFoundException: example.WordCountJob$$anonfun$3)

Out of pure speculation: does Hadoop local mode only spawn one JVM for both job submission and the mapper?  Maybe Kryo is somehow initialized before Hadoop adds the jar argument to the classpath?


Cheers,

Ian Hummel

Jan 23, 2014, 5:53:41 PM1/23/14
to cascadi...@googlegroups.com
So I think I have a handle on what's going on.  

I don't really understand the specifics of how the "default" Java classloader interacts with a thread's "context" classloader (Thread.currentThread().getContextClassLoader), but it all starts in Hadoop's RunJar class...

Basically, when you run "hadoop jar <your.jar>", you are running RunJar's main method, which explodes your jar into a temp directory and installs a new CONTEXT classloader.

org.apache.hadoop.util.RunJar#main

unJar(file, workDir);

ArrayList<URL> classPath = new ArrayList<URL>();
classPath.add(new File(workDir + "/").toURL());
classPath.add(file.toURL());
classPath.add(new File(workDir, "classes/").toURL());
File[] libs = new File(workDir, "lib").listFiles();
if (libs != null) {
  for (int i = 0; i < libs.length; i++) {
    classPath.add(libs[i].toURL());
  }
}

ClassLoader loader =
  new URLClassLoader(classPath.toArray(new URL[0]));

Thread.currentThread().setContextClassLoader(loader);
Class<?> mainClass = Class.forName(mainClassName, true, loader);


All the Scalding classes live in the default classloader, but all the Job's classes live in this thread's new "context" classloader.  This means that if you try to use com.twitter.scalding.Tool to instantiate your job via reflection, you will get ClassNotFoundExceptions (more on this later).  However, if you create your own Tool, your code will happily initialize Scalding and Cascading.  I think this is why the Snowplow Analytics example project (https://github.com/snowplow/scalding-example-project) basically copies and pastes com.twitter.scalding.Tool to get everything running on EMR.
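The forName-vs-context-loader behavior is easy to demonstrate with a small self-contained sketch (the object name here is made up, not from the repo): installing a context classloader does not change what Class.forName(name) resolves, because forName defaults to the caller's defining loader.

```scala
import java.net.{URL, URLClassLoader}

object ContextLoaderSketch {
  def main(args: Array[String]): Unit = {
    // A loader that can only see bootstrap classes: parent = null means
    // "delegate to the bootstrap loader", and it has no URLs of its own.
    val isolated = new URLClassLoader(Array.empty[URL], null)
    Thread.currentThread().setContextClassLoader(isolated)

    // Class.forName(name) resolves via the caller's *defining* loader,
    // so it still finds application classes despite the context loader:
    val viaForName = Class.forName("scala.Option") != null

    // Asking the new context loader directly fails, since scala.Option
    // is not visible from the bootstrap loader:
    val viaContext =
      try { isolated.loadClass("scala.Option"); true }
      catch { case _: ClassNotFoundException => false }

    println(s"forName: $viaForName, context loader: $viaContext")
    // prints: forName: true, context loader: false
  }
}
```

In RunJar the situation is the mirror image: the job classes live only in the context loader, so any library that resolves classes via the defining loader cannot see them.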

The next thing that happens, though, is that Cascading serializes the graph of steps into a JobConf.  The exceptions kick off inside Cascading in cascading.flow.hadoop.FlowMapper#configure.  It tries to deserialize the HadoopFlowStep, but to do this it needs to instantiate a serializer, which it finds via cascading.flow.hadoop.util.HadoopUtil#instantiateSerializer.  The relevant code is below.


cascading.flow.hadoop.FlowMapper#configure

@Override
  public void configure( JobConf jobConf )
    {
    try
      {
      HadoopUtil.initLog4j( jobConf );

      LOG.info( "cascading version: {}", jobConf.get( "cascading.version", "" ) );
      LOG.info( "child jvm opts: {}", jobConf.get( "mapred.child.java.opts", "" ) );

      currentProcess = new HadoopFlowProcess( new FlowSession(), jobConf, true );

      String stepState = jobConf.getRaw( "cascading.flow.step" );

      if( stepState == null )
        stepState = readStateFromDistCache( jobConf, jobConf.get( FlowStep.CASCADING_FLOW_STEP_ID ) );

      HadoopFlowStep step = deserializeBase64( stepState, jobConf, HadoopFlowStep.class );
      Tap source = step.getTapForID( step.getSources(), jobConf.get( "cascading.step.source" ) );

      streamGraph = new HadoopMapStreamGraph( currentProcess, step, source );

      for( Duct head : streamGraph.getHeads() )
        LOG.info( "sourcing from: " + ( (ElementDuct) head ).getFlowElement() );

      for( Duct tail : streamGraph.getTails() )
        LOG.info( "sinking to: " + ( (ElementDuct) tail ).getFlowElement() );

      for( Tap trap : step.getMapperTraps().values() )
        LOG.info( "trapping to: " + trap );
      }
    catch( Throwable throwable )
      {
      if( throwable instanceof CascadingException )
        throw (CascadingException) throwable;

      throw new FlowException( "internal error during mapper configuration", throwable );
      }
    }

Eventually we end up in cascading.flow.hadoop.util.JavaObjectSerializer#deserialize, where we create an object stream and attempt to read in the object graph.  This part really confused me because I couldn't see the connection to Kryo at first, but eventually the stream throws an exception while deserializing, due to a ClassNotFoundException.

The last few classes it reads are com.twitter.scalding.serialization.Externalizer and com.twitter.chill.Externalizer.

Digging into the Chill code, you can see that Kryo is instantiated like this:

com.twitter.chill.KryoInstantiator

public class KryoInstantiator implements Serializable {
  public Kryo newKryo() { return new Kryo(); }
... 

And if you dig into the Kryo code, you can see that a default-constructed Kryo will use the default classloader (which doesn't contain the Job's classes).  There is an option to set a classloader on a Kryo instance, so maybe the solution is to give it Thread.currentThread().getContextClassLoader?

Cascading itself seems to use Thread.currentThread().getContextClassLoader, and I guess that's the "right way to do it", although I can't quite explain why yet...
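If that hunch is right, a fix on the chill side might be as small as the sketch below. This is only a guess at the shape of the change, assuming Kryo's setClassLoader method and chill's KryoInstantiator.newKryo hook; the class name is made up:

```scala
import com.esotericsoftware.kryo.Kryo
import com.twitter.chill.KryoInstantiator

// Hypothetical fix: make Kryo resolve classes through the thread's
// context classloader (which RunJar populated with the job jar),
// instead of the defining classloader it uses by default.
class ContextClassLoaderKryoInstantiator extends KryoInstantiator {
  override def newKryo(): Kryo = {
    val k = super.newKryo()
    k.setClassLoader(Thread.currentThread().getContextClassLoader)
    k
  }
}
```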

BTW, I think fixing this, plus a small tweak to com.twitter.scalding.Job, will also fix the "-libjars" issues that were mentioned recently on the list.

Any pointers/feedback on the above?

Thanks!

Oscar Boykin

Jan 23, 2014, 6:07:53 PM1/23/14
to cascadi...@googlegroups.com
Super awesome digging on this. Can you open an issue with both scalding and chill for the relevant issues?

I am a bit ignorant of hadoop's class loading, and may be culpable for many of the mistakes.

It would be fantastic to fix these issues.




Ian Hummel

Jan 24, 2014, 2:43:35 PM1/24/14
to cascadi...@googlegroups.com
Pull requests sent.  Cheers!

Oscar Boykin

Jan 24, 2014, 4:07:09 PM1/24/14
to cascadi...@googlegroups.com