Algebird + Spark serialization issues


Luis Ángel Vicente Sánchez

Sep 11, 2014, 9:16:02 AM
to alge...@googlegroups.com
I'm running into a very weird error while using Algebird with Spark.


This piece of code,

    //
    // Step 0 - From JSON to Domain classes
    //
    val events: DStream[ProcessedEvent] =
      stream.flatMap(
        Parse.decodeOption[ProcessedEvent](_)
      )

    //
    // Step 1 - Map events
    //
    val mappedToTuple: DStream[((String, Long), HLL)] = events.flatMap {
      case ProcessedEvent(game, _, isDebug, time, uid, iid) if isDebug == params.isDebug =>
        for {
          _ <- games.get(game)
          u <- iid.map(_.repr).orElse(uid.map(_.repr))
        } yield ((game, (time %% params.bucket).getMillis), u)

      case _ =>
        none[((String, Long), String)]
    }.groupByKey().map {
      case (k, v) => (k, v.map(str => monoid.create(str.getBytes)).reduce(monoid.plus))
    }


works fine, but if I try to use the batchCreate method like this:

    //
    // Step 0 - From JSON to Domain classes
    //
    val events: DStream[ProcessedEvent] =
      stream.flatMap(
        Parse.decodeOption[ProcessedEvent](_)
      )

    //
    // Step 1 - Map events
    //
    val mappedToTuple: DStream[((String, Long), HLL)] = events.flatMap {
      case ProcessedEvent(game, _, isDebug, time, uid, iid) if isDebug == params.isDebug =>
        for {
          _ <- games.get(game)
          u <- iid.map(_.repr).orElse(uid.map(_.repr))
        } yield ((game, (time %% params.bucket).getMillis), u)

      case _ =>
        none[((String, Long), String)]
    }.groupByKey().map {
      case (k, v) => (k, monoid.batchCreate(v)(_.getBytes))
    }


I get tons of:

ERROR 11:49:44,393 Task 238.0:0 had a not serializable result: java.io.NotSerializableException: scala.collection.immutable.MapLike$$anon$2; not retrying
ERROR 11:49:44,393 Error running job streaming job 80000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 238.0:0 had a not serializable result: java.io.NotSerializableException: scala.collection.immutable.MapLike$$anon$2
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:635)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1234)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

My educated guess is that batchCreate is creating a DenseHLL because the number of events in each batch is high and that, somehow, is causing a serialization problem.
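A guess like this can be checked outside the cluster by pushing a value through plain Java serialization. The helper below is only a minimal sketch: the object and method names are hypothetical, and it assumes the job uses Spark's default Java serializer rather than Kryo.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical helper, not part of Spark or Algebird.
object SerializationCheck {
  // Returns true if Java serialization can write `value`, and false if any
  // object reachable from it does not implement java.io.Serializable.
  def isSerializable(value: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(value)
      true
    } catch {
      case _: NotSerializableException => false
    } finally {
      out.close()
    }
  }
}
```

Calling `SerializationCheck.isSerializable(hll)` on the HLL returned by batchCreate in a unit test or the REPL should show whether the result itself is the problem, without waiting for a streaming batch to fail.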

I guess the main problem is how Spark serializes Scala closures and the result of mapValues in batchCreate:

  def batchCreate[T <% Array[Byte]](instances: Iterable[T]): HLL = {
    val allMaxRhow = instances
      .map { x => jRhoW(hash(x), bits) }
      .groupBy { case (j, rhow) => j }
      .mapValues { _.maxBy { case (j, rhow) => rhow } }
      .mapValues { case (j, rhow) => Max(rhow) }
    if (allMaxRhow.size * 16 <= size) {
      SparseHLL(bits, allMaxRhow)
    } else {
      SparseHLL(bits, allMaxRhow).toDenseHLL
    }
  }

mapValues produces an object that is not serializable; I have fixed the problem by replacing mapValues with a map:

  def batchCreate[T <% Array[Byte]](instances: Iterable[T]): HLL = {
    val allMaxRhow = instances
      .map { x => jRhoW(hash(x), bits) }
      .groupBy { case (j, rhow) => j }
      .map { case (j, iter) => (j, iter.maxBy { case (_, rhow) => rhow }) }
      .map { case (j, (_, rhow)) => (j, Max(rhow)) }
    if (allMaxRhow.size * 16 <= size) {
      SparseHLL(bits, allMaxRhow)
    } else {
      SparseHLL(bits, allMaxRhow).toDenseHLL
    }
  }

Would that be a PR you would accept?

Oscar Boykin

Sep 11, 2014, 12:40:09 PM
to Luis Ángel Vicente Sánchez, alge...@googlegroups.com
Sure, that change looks fine.

Could you replace it with only one .map after the groupBy?

It would be better not to create the Map twice.
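As an illustration of the single-map shape, here is a sketch that uses plain (Int, Byte) pairs as stand-ins for the (j, rhoW) values. The object and method names are hypothetical, and the real code would wrap each maximum in Algebird's Max before building the SparseHLL.

```scala
// Hypothetical stand-in for the grouping step in batchCreate.
object SingleMapSketch {
  // Groups the pairs by register index j and keeps the largest rhoW per
  // register, building the resulting Map only once after the groupBy.
  def maxRhoPerRegister(pairs: Iterable[(Int, Byte)]): Map[Int, Byte] =
    pairs
      .groupBy { case (j, _) => j }
      .map { case (j, group) => (j, group.iterator.map(_._2).max) }
}
```

Because map is strict, the result is an ordinary immutable Map rather than a lazy wrapper around the grouped data.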

mapValues is lazy and often causes gotchas like this.
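The laziness is easy to observe by counting how often the mapping function runs. This is a small sketch with hypothetical names. Note that newer Scala versions (2.13 and later) deprecate Map.mapValues and return a MapView from it, so expect a deprecation warning there.

```scala
// Hypothetical demo object, independent of Algebird and Spark.
object MapValuesLaziness {
  // mapValues wraps the original Map; the function runs on every lookup.
  def callsWithMapValues(): Int = {
    var calls = 0
    val mapped = Map("a" -> 1).mapValues { v => calls += 1; v * 10 }
    mapped("a")
    mapped("a")
    calls
  }

  // map builds a new Map immediately; the function runs once per entry.
  def callsWithMap(): Int = {
    var calls = 0
    val mapped = Map("a" -> 1).map { case (k, v) => calls += 1; (k, v * 10) }
    mapped("a")
    mapped("a")
    calls
  }
}
```

With one entry read twice, the mapValues version invokes the function twice while the map version invokes it once. The wrapper also holds on to the function and the original Map, which is why it can surprise you when the result has to be serialized.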

--
Oscar Boykin :: @posco :: http://twitter.com/posco

Luis Ángel Vicente Sánchez

Sep 11, 2014, 12:59:36 PM
to Oscar Boykin, alge...@googlegroups.com
Sure! I will make the PR when I get home.