I'm running into a very weird error while using Algebird with Spark Streaming.
//
// Step 0 - From JSON to Domain classes
//
val events: DStream[ProcessedEvent] =
  stream.flatMap(Parse.decodeOption[ProcessedEvent](_))
//
// Step 1 - Map events
//
val mappedToTuple: DStream[((String, Long), HLL)] = events.flatMap {
  case ProcessedEvent(game, _, isDebug, time, uid, iid) if isDebug == params.isDebug =>
    for {
      _ <- games.get(game)
      u <- iid.map(_.repr).orElse(uid.map(_.repr))
    } yield ((game, (time %% params.bucket).getMillis), u)
  case _ =>
    none[((String, Long), String)]
}.groupByKey().map {
  case (k, v) => (k, v.map(str => monoid.create(str.getBytes)).reduce(monoid.plus))
}
This works fine... but if I try to use the batchCreate method instead, like this:
//
// Step 0 - From JSON to Domain classes
//
val events: DStream[ProcessedEvent] =
  stream.flatMap(Parse.decodeOption[ProcessedEvent](_))
//
// Step 1 - Map events
//
val mappedToTuple: DStream[((String, Long), HLL)] = events.flatMap {
  case ProcessedEvent(game, _, isDebug, time, uid, iid) if isDebug == params.isDebug =>
    for {
      _ <- games.get(game)
      u <- iid.map(_.repr).orElse(uid.map(_.repr))
    } yield ((game, (time %% params.bucket).getMillis), u)
  case _ =>
    none[((String, Long), String)]
}.groupByKey().map {
  case (k, v) => (k, monoid.batchCreate(v)(_.getBytes))
}
I get tons of:
ERROR 11:49:44,393 Task 238.0:0 had a not serializable result: java.io.NotSerializableException: scala.collection.immutable.MapLike$$anon$2; not retrying
ERROR 11:49:44,393 Error running job streaming job 80000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Task 238.0:0 had a not serializable result: java.io.NotSerializableException: scala.collection.immutable.MapLike$$anon$2
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:635)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1234)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
My first guess was that batchCreate creates a DenseHLL because the number of events in each batch is high and that, somehow, causes a serialization problem. But the actual culprit seems to be how Spark serializes the result of mapValues inside batchCreate:
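The stack trace names scala.collection.immutable.MapLike$$anon$2, which is the lazy view that mapValues returns on the Scala versions involved here. A minimal, Spark-free sketch of the difference (the serializes helper is just for illustration):

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Returns true if the object survives plain Java serialization.
def serializes(obj: AnyRef): Boolean =
  try {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    out.writeObject(obj)
    out.close()
    true
  } catch {
    case _: NotSerializableException => false
  }

val m = Map("a" -> 1, "b" -> 2)

// mapValues does not build a new Map: it wraps `m` in a lazy view
// (scala.collection.immutable.MapLike$$anon$2 in the stack trace above).
val lazyView = m.mapValues(_ + 1)

// map builds a strict immutable.Map, which is Serializable.
val strictMap = m.map { case (k, v) => (k, v + 1) }

println(serializes(lazyView))  // false on the affected Scala versions
println(serializes(strictMap)) // true
```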
def batchCreate[T <% Array[Byte]](instances: Iterable[T]): HLL = {
  val allMaxRhow = instances
    .map { x => jRhoW(hash(x), bits) }
    .groupBy { case (j, rhow) => j }
    .mapValues { _.maxBy { case (j, rhow) => rhow } }
    .mapValues { case (j, rhow) => Max(rhow) }
  if (allMaxRhow.size * 16 <= size) {
    SparseHLL(bits, allMaxRhow)
  } else {
    SparseHLL(bits, allMaxRhow).toDenseHLL
  }
}
mapValues produces a lazy view over the original map rather than a strict Map, and that view (the MapLike$$anon$2 in the stack trace) is not serializable. I have fixed the problem by replacing each mapValues with a strict map:
def batchCreate[T <% Array[Byte]](instances: Iterable[T]): HLL = {
  val allMaxRhow = instances
    .map { x => jRhoW(hash(x), bits) }
    .groupBy { case (j, rhow) => j }
    .map { case (j, iter) => (j, iter.maxBy { case (_, rhow) => rhow }) }
    .map { case (j, (_, rhow)) => (j, Max(rhow)) }
  if (allMaxRhow.size * 16 <= size) {
    SparseHLL(bits, allMaxRhow)
  } else {
    SparseHLL(bits, allMaxRhow).toDenseHLL
  }
}
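As an aside, when patching the library is not an option, the same problem can be worked around at the call site by chaining .map(identity) after mapValues, which forces the lazy view into a strict (and serializable) immutable.Map. A small sketch:

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

val m = Map("a" -> 1, "b" -> 2)

// mapValues alone yields the lazy view that blew up in the Spark job;
// chaining .map(identity) forces it into a strict immutable.Map.
val forced = m.mapValues(_ + 1).map(identity)

// A strict map round-trips through Java serialization without complaint.
new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(forced)

println(forced) // Map(a -> 2, b -> 3)
```

Note that .toMap is not a reliable alternative here, since on an immutable.Map it can simply return the same (view) instance.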