kryo {
  type = "graph"
  idstrategy = "default"
  buffer-size = 4096
  max-buffer-size = -1
  use-manifests = false
  post-serialization-transformations = "lz4,aes"
  implicit-registration-logging = false
  kryo-trace = false
  kryo-custom-serializer-init = "CustomKryoSerializerInitFQCN"
  resolve-subclasses = false
  mappings {
    "com.myexperiments.akkaexps.persistence.events.Evt" = 20
  }
  classes = [
    "com.myexperiments.akkaexps.persistence.events.Evt"
  ]
}
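The fragment above starts at the kryo block, so the surrounding wiring is not shown. In akka-kryo-serialization that block normally sits under akka.actor, next to the serializer and binding declarations. A minimal sketch of that layout, assuming the library's standard setup; the binding of Evt to kryo is an assumption for illustration and is not taken from the original configuration:

```hocon
akka {
  actor {
    serializers {
      # Serializer class as it appears in the stack trace of this post
      kryo = "com.romix.akka.serialization.kryo.KryoSerializer"
    }
    serialization-bindings {
      # Assumed binding: route the event type through the kryo serializer
      "com.myexperiments.akkaexps.persistence.events.Evt" = kryo
    }
    kryo {
      # The kryo settings listed above go here
    }
  }
}
```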
Observations:
=============
1. Events are persisted without any issues. I verified this via cqlsh against Cassandra by checking the message count in the akka.messages table.
Performance observation: the persistence rate of events drops noticeably.
With Kryo serialization + persistAsync I got around 580 events persisted/sec with the Cassandra plugin, whereas plain Java serialization, for the same test run on the same machine, yielded up to 800 events/sec,
which looks odd. Cassandra runs as a single local node with no clustering (I am trying to eliminate sources of variance so that I can isolate issues before moving to a cluster with a larger configuration).
2. During the recovery phase, however, I got the following exception and recovery failed. I also tried changing idstrategy from 'default' to 'incremental' and still hit the same exception:
[ERROR] [08/02/2016 10:46:16.119] [example-akka.actor.default-dispatcher-8] [akka://example/system/cassandra-journal/$b/flow-1-0-asyncReplayMessages] Encountered unregistered class ID: 1406735620
com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 1406735620
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:137)
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:670)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:781)
    at com.romix.akka.serialization.kryo.KryoBasedSerializer.fromBinary(KryoSerializer.scala:483)
    at com.romix.akka.serialization.kryo.KryoSerializer.fromBinary(KryoSerializer.scala:339)
    at akka.serialization.Serialization$$anonfun$deserialize$2.apply(Serialization.scala:124)
    at scala.util.Try$.apply(Try.scala:192)
    at akka.serialization.Serialization.deserialize(Serialization.scala:114)
    at akka.persistence.serialization.MessageSerializer.akka$persistence$serialization$MessageSerializer$$payload(MessageSerializer.scala:216)
    at akka.persistence.serialization.MessageSerializer.akka$persistence$serialization$MessageSerializer$$persistent(MessageSerializer.scala:198)
    at akka.persistence.serialization.MessageSerializer.fromBinary(MessageSerializer.scala:69)
    at akka.persistence.serialization.MessageSerializer.fromBinary(MessageSerializer.scala:28)
    at akka.serialization.Serialization$$anonfun$deserialize$3.apply(Serialization.scala:142)
    at scala.util.Try$.apply(Try.scala:192)
    at akka.serialization.Serialization.deserialize(Serialization.scala:142)
    at akka.persistence.cassandra.query.EventsByPersistenceIdPublisher.persistentFromByteBuffer(EventsByPersistenceIdPublisher.scala:90)
    at akka.persistence.cassandra.query.EventsByPersistenceIdPublisher.extractEvent(EventsByPersistenceIdPublisher.scala:84)
    at akka.persistence.cassandra.query.EventsByPersistenceIdPublisher.updateState(EventsByPersistenceIdPublisher.scala:77)
    at akka.persistence.cassandra.query.EventsByPersistenceIdPublisher.updateState(EventsByPersistenceIdPublisher.scala:48)
    at akka.persistence.cassandra.query.QueryActorPublisher.exhaustFetch(QueryActorPublisher.scala:213)
    at akka.persistence.cassandra.query.QueryActorPublisher.akka$persistence$cassandra$query$QueryActorPublisher$$exhaustFetchAndBecome(QueryActorPublisher.scala:117)
    at akka.persistence.cassandra.query.QueryActorPublisher$$anonfun$idle$1.applyOrElse(QueryActorPublisher.scala:101)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:484)
    at akka.persistence.cassandra.query.QueryActorPublisher.akka$stream$actor$ActorPublisher$$super$aroundReceive(QueryActorPublisher.scala:45)
    at akka.stream.actor.ActorPublisher$class.aroundReceive(ActorPublisher.scala:270)
    at akka.persistence.cassandra.query.QueryActorPublisher.aroundReceive(QueryActorPublisher.scala:45)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Am I missing something obvious?
Regards
Muthu