Cassandra persistence with Kryo-serialization - recovery and persistence performance issue


Muthukumaran Kothandaraman

Aug 4, 2016, 1:25:01 PM
to Akka User List
Hi, 

I am using the following combination for a basic persistent actor:

Version combination: Cassandra 3.7 + akka-persistence-cassandra 0.7 + Akka 2.4.8 + akka-kryo-serialization_2.11 0.4.1

I am using the following config to serialize with kryo-serialization:

kryo {
  type = "graph"
  idstrategy = "default"
  buffer-size = 4096
  max-buffer-size = -1
  use-manifests = false
  post-serialization-transformations = "lz4,aes"
  implicit-registration-logging = false
  kryo-trace = false
  kryo-custom-serializer-init = "CustomKryoSerializerInitFQCN"
  resolve-subclasses = false
  mappings {
    "com.myexperiments.akkaexps.persistence.events.Evt" = 20
  }
  classes = [
    "com.myexperiments.akkaexps.persistence.events.Evt"
  ]
}
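A detail worth keeping in mind when reading the throughput numbers below: besides `idstrategy`, this config sets `post-serialization-transformations = "lz4,aes"`. As we understand that romix option, every serialized event is compressed and then encrypted on write, and decrypted and then decompressed on read, which is per-event work that Akka's plain Java serializer does not do. The JDK-only sketch below models the shape of that pipeline under stated assumptions; it is not romix's code. `java.util.zip.Deflater` stands in for LZ4 (which is not in the JDK), and the AES mode (CBC with a random IV prepended to each payload) is chosen purely for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import java.util.Arrays;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.IvParameterSpec;

public class TransformPipeline {
    private static final SecureRandom RANDOM = new SecureRandom();
    private static final int IV_LEN = 16; // AES block size in bytes

    // Compression step (Deflater stands in for LZ4 in this sketch).
    static byte[] compress(byte[] in) {
        Deflater deflater = new Deflater();
        deflater.setInput(in);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[1024];
        while (!deflater.finished()) {
            out.write(buf, 0, deflater.deflate(buf));
        }
        deflater.end();
        return out.toByteArray();
    }

    static byte[] decompress(byte[] in) throws DataFormatException {
        Inflater inflater = new Inflater();
        inflater.setInput(in);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[1024];
        while (!inflater.finished()) {
            int n = inflater.inflate(buf);
            if (n == 0 && (inflater.needsInput() || inflater.needsDictionary())) {
                throw new DataFormatException("truncated or unsupported compressed data");
            }
            out.write(buf, 0, n);
        }
        inflater.end();
        return out.toByteArray();
    }

    // Encryption step: AES/CBC with a fresh random IV prepended to the ciphertext.
    static byte[] encrypt(byte[] in, SecretKey key) throws GeneralSecurityException {
        byte[] iv = new byte[IV_LEN];
        RANDOM.nextBytes(iv);
        Cipher cipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
        cipher.init(Cipher.ENCRYPT_MODE, key, new IvParameterSpec(iv));
        byte[] body = cipher.doFinal(in);
        byte[] out = Arrays.copyOf(iv, IV_LEN + body.length);
        System.arraycopy(body, 0, out, IV_LEN, body.length);
        return out;
    }

    static byte[] decrypt(byte[] in, SecretKey key) throws GeneralSecurityException {
        Cipher cipher = Cipher.getInstance("AES/CBC/PKCS5Padding");
        cipher.init(Cipher.DECRYPT_MODE, key, new IvParameterSpec(in, 0, IV_LEN));
        return cipher.doFinal(in, IV_LEN, in.length - IV_LEN);
    }

    public static SecretKey newKey() {
        try {
            KeyGenerator gen = KeyGenerator.getInstance("AES");
            gen.init(128);
            return gen.generateKey();
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    // Write path: transformations in listed order (compress, then encrypt).
    public static byte[] write(byte[] serialized, SecretKey key) {
        try {
            return encrypt(compress(serialized), key);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    // Read path: the same transformations undone in reverse order.
    public static byte[] read(byte[] stored, SecretKey key) {
        try {
            return decompress(decrypt(stored, key));
        } catch (GeneralSecurityException | DataFormatException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        SecretKey key = newKey();
        byte[] event = "Evt(payload-42)".getBytes(StandardCharsets.UTF_8);
        System.out.println("roundtrip ok: " + Arrays.equals(event, read(write(event, key), key)));
    }
}
```

Timing `write` on its own, per event, would be one way to see how much of a Kryo-versus-Java gap comes from the transformations rather than from Kryo itself.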


Observations:
==============

1. The events do get persisted without any issues; I verified the message count in the akka.messages table via cqlsh.
   Performance observation: there is an abnormal drop in the persistence rate of events.
   In fact, with Kryo serialization + persistAsync I got around 580 events persisted/sec with the Cassandra plugin, compared to up to 800 events/sec with plain Java serialization for the same test run on the same machine,
   which looks weird. Cassandra runs on a local node with no clustering (I am trying to thrash out all variances before moving to a cluster with a larger configuration, so that I can isolate issues).


2. During the recovery phase, however, I got the following exception and recovery failed. I also tried changing idstrategy from 'default' to 'incremental' and still get the same exception:

[ERROR] [08/02/2016 10:46:16.119] [example-akka.actor.default-dispatcher-8] [akka://example/system/cassandra-journal/$b/flow-1-0-asyncReplayMessages] Encountered unregistered class ID: 1406735620
com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 1406735620
	at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:137)
	at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:670)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:781)
	at com.romix.akka.serialization.kryo.KryoBasedSerializer.fromBinary(KryoSerializer.scala:483)
	at com.romix.akka.serialization.kryo.KryoSerializer.fromBinary(KryoSerializer.scala:339)
	at akka.serialization.Serialization$$anonfun$deserialize$2.apply(Serialization.scala:124)
	at scala.util.Try$.apply(Try.scala:192)
	at akka.serialization.Serialization.deserialize(Serialization.scala:114)
	at akka.persistence.serialization.MessageSerializer.akka$persistence$serialization$MessageSerializer$$payload(MessageSerializer.scala:216)
	at akka.persistence.serialization.MessageSerializer.akka$persistence$serialization$MessageSerializer$$persistent(MessageSerializer.scala:198)
	at akka.persistence.serialization.MessageSerializer.fromBinary(MessageSerializer.scala:69)
	at akka.persistence.serialization.MessageSerializer.fromBinary(MessageSerializer.scala:28)
	at akka.serialization.Serialization$$anonfun$deserialize$3.apply(Serialization.scala:142)
	at scala.util.Try$.apply(Try.scala:192)
	at akka.serialization.Serialization.deserialize(Serialization.scala:142)
	at akka.persistence.cassandra.query.EventsByPersistenceIdPublisher.persistentFromByteBuffer(EventsByPersistenceIdPublisher.scala:90)
	at akka.persistence.cassandra.query.EventsByPersistenceIdPublisher.extractEvent(EventsByPersistenceIdPublisher.scala:84)
	at akka.persistence.cassandra.query.EventsByPersistenceIdPublisher.updateState(EventsByPersistenceIdPublisher.scala:77)
	at akka.persistence.cassandra.query.EventsByPersistenceIdPublisher.updateState(EventsByPersistenceIdPublisher.scala:48)
	at akka.persistence.cassandra.query.QueryActorPublisher.exhaustFetch(QueryActorPublisher.scala:213)
	at akka.persistence.cassandra.query.QueryActorPublisher.akka$persistence$cassandra$query$QueryActorPublisher$$exhaustFetchAndBecome(QueryActorPublisher.scala:117)
	at akka.persistence.cassandra.query.QueryActorPublisher$$anonfun$idle$1.applyOrElse(QueryActorPublisher.scala:101)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:484)
	at akka.persistence.cassandra.query.QueryActorPublisher.akka$stream$actor$ActorPublisher$$super$aroundReceive(QueryActorPublisher.scala:45)
	at akka.stream.actor.ActorPublisher$class.aroundReceive(ActorPublisher.scala:270)
	at akka.persistence.cassandra.query.QueryActorPublisher.aroundReceive(QueryActorPublisher.scala:45)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
	at akka.actor.ActorCell.invoke(ActorCell.scala:495)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
	at akka.dispatch.Mailbox.run(Mailbox.scala:224)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Am I missing something fundamental?


Regards
Muthu




Justin du coeur

Aug 4, 2016, 4:19:23 PM
to akka...@googlegroups.com
Several thoughts:

First and most important -- as far as I can tell, there's little community experience using Kryo for Akka Persistence.  I'm currently building that out for my own project, and when I asked about it, nobody could name a project that was already doing it.  It's been a bit of an adventure getting it right -- indeed, I wound up contributing an enhancement to the romix library a couple of weeks ago, which will be in 0.4.2.  Most of the community appear to be using protobuf for persistence.  So keep in mind that this may be a bit bleeding-edge.  (Eventually, after my code gets to production, I'll probably do a long blog entry on it.  The relevant branch can be found here.)

As for your error, I'm a bit surprised you got that with "default", which isn't supposed to be using class IDs but should instead be using FQCNs across the board.  "default" *is* explicitly slow, which might account for some of the time issues (although I wouldn't expect it to be *that* slow), but the error you're showing is kinda weird.  The implication seems to be that, for some reason, it's synthesizing some class IDs even in "default" mode.
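To make the FQCN-versus-ID difference concrete, here is a rough per-record size comparison using Muthu's `Evt` class name and the ID 20 from his `mappings`. This is a hypothetical JDK-only sketch and does not reproduce Kryo's wire format (Kryo has its own string and varint encodings); it only contrasts tagging each record with a class name versus tagging it with a small integer.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class ClassTagOverhead {
    // Tag a record with its fully-qualified class name.
    public static int bytesForNameTag(String fqcn) {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buf)) {
            out.writeUTF(fqcn); // 2-byte length prefix + modified UTF-8 bytes
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buf.size();
    }

    // Tag a record with a small pre-agreed integer ID, written as an unsigned varint.
    public static int bytesForIdTag(int id) {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        int v = id;
        while ((v & ~0x7F) != 0) {
            buf.write((v & 0x7F) | 0x80); // low 7 bits with the continuation bit set
            v >>>= 7;
        }
        buf.write(v);
        return buf.size();
    }

    public static void main(String[] args) {
        String fqcn = "com.myexperiments.akkaexps.persistence.events.Evt";
        System.out.println("name tag: " + bytesForNameTag(fqcn) + " bytes");
        System.out.println("id tag:   " + bytesForIdTag(20) + " bytes");
    }
}
```

For this class name the name tag costs 51 bytes per record (a 2-byte length plus 49 ASCII characters) against 1 byte for the ID, before any payload is written.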

Mind, that error is *exactly* what I'd expect to see if you use "incremental".  Really, I don't think "incremental" is ever a great idea, but it is absolutely a terrible one for persistence.  It's a recipe for accidentally persisting random class IDs that can't later be deserialized.  "Automatic" mode should, in principle, work, but "incremental" is very likely to cause random accidental failures.
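Why first-come IDs are risky for persistence can be seen with a toy model. The resolver below is hypothetical, not Kryo's or romix's code: it hands out IDs in first-seen order, which is the property being warned about. The IDs one JVM writes depend on the order it first serialized each class; a recovering JVM that meets classes in a different order either fails on an unknown ID or silently maps an ID to the wrong class. The event names `Started` and `Updated` are made up for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class IncrementalIdHazard {
    // Hypothetical resolver: IDs are assigned in the order classes are first seen.
    public static final class IncrementalResolver {
        private final Map<String, Integer> idByClass = new HashMap<>();
        private final List<String> classById = new ArrayList<>();

        public int idFor(String className) {
            return idByClass.computeIfAbsent(className, c -> {
                classById.add(c);
                return classById.size() - 1;
            });
        }

        public String classFor(int id) {
            if (id < 0 || id >= classById.size()) {
                throw new IllegalStateException("Encountered unregistered class ID: " + id);
            }
            return classById.get(id);
        }
    }

    // Decode stored IDs with the reader's table, marking IDs it does not know.
    public static String replay(List<Integer> storedIds, IncrementalResolver reader) {
        StringBuilder sb = new StringBuilder();
        for (int id : storedIds) {
            try {
                sb.append(reader.classFor(id));
            } catch (IllegalStateException e) {
                sb.append("FAIL(").append(id).append(')');
            }
            sb.append(' ');
        }
        return sb.toString().trim();
    }

    public static void main(String[] args) {
        // Writing JVM: the stored IDs reflect this JVM's first-use order.
        IncrementalResolver writer = new IncrementalResolver();
        List<Integer> stored = new ArrayList<>();
        for (String c : new String[] {"Started", "Updated", "Evt"}) {
            stored.add(writer.idFor(c));
        }
        // Recovering JVM: a fresh resolver that has only seen Evt so far.
        IncrementalResolver reader = new IncrementalResolver();
        reader.idFor("Evt");
        System.out.println("written: Started Updated Evt");
        System.out.println("read:    " + replay(stored, reader));
    }
}
```

The reader prints `Evt FAIL(1) FAIL(2)`: the first record, written as `Started`, is silently misread as `Evt`, and the other two fail with the same kind of message as in the stack trace above.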

Personally, I think that "explicit" is the most sensible way to go.  It's a bit of a pain in the ass, and will give you errors if you fail to pre-register any classes (and you will discover that you need to register many, many standard-library classes to get it to work), but when you're using this in the context of persistence I think it's the sanest approach, ensuring that *all* of your persisted classes are using the more-efficient IDs and *all* of them are pre-registered...
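The property that makes an explicit strategy sane for persistence is that IDs are fixed by configuration rather than by usage order, and that an unregistered class is rejected when writing instead of being discovered during recovery. A hypothetical JDK-only registry illustrating that contract (not romix's implementation) might look like the following; JDK classes stand in for real event types, and the writer and reader register in different orders on purpose to show that order no longer matters.

```java
import java.util.HashMap;
import java.util.Map;

public class ExplicitRegistry {
    private final Map<Class<?>, Integer> idByClass = new HashMap<>();
    private final Map<Integer, Class<?>> classById = new HashMap<>();

    // All registrations happen up front, with IDs fixed in code or config.
    public ExplicitRegistry register(Class<?> cls, int id) {
        if (classById.containsKey(id) || idByClass.containsKey(cls)) {
            throw new IllegalArgumentException("duplicate registration: " + cls.getName() + " -> " + id);
        }
        idByClass.put(cls, id);
        classById.put(id, cls);
        return this;
    }

    // Write side fails fast on anything not pre-registered, so no unreadable ID is persisted.
    public int idFor(Class<?> cls) {
        Integer id = idByClass.get(cls);
        if (id == null) {
            throw new IllegalArgumentException("class not registered: " + cls.getName());
        }
        return id;
    }

    public Class<?> classFor(int id) {
        Class<?> cls = classById.get(id);
        if (cls == null) {
            throw new IllegalStateException("unregistered class ID: " + id);
        }
        return cls;
    }

    public static void main(String[] args) {
        ExplicitRegistry writer = new ExplicitRegistry().register(String.class, 20).register(Long.class, 21);
        ExplicitRegistry reader = new ExplicitRegistry().register(Long.class, 21).register(String.class, 20);
        System.out.println(reader.classFor(writer.idFor(Long.class)).getName());
        try {
            writer.idFor(Double.class);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected at write time: " + e.getMessage());
        }
    }
}
```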

--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscribe@googlegroups.com.
To post to this group, send email to akka...@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Muthukumaran Kothandaraman

Aug 6, 2016, 1:42:25 PM
to Akka User List
Thanks for the detailed explanation, Justin. 

The main objective of my exercise is to benchmark the performance of the Kryo + Cassandra persistence combination for my event objects, which have no complex inheritance structure. In my case, the number of message types is quite limited: at most 5. 

Would it be advisable to write my own serializer using JSerializer and ExtendedActorSystem just for basic benchmarking, or would that be much tougher than using the generic kryo-serialization framework? 
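For a handful of flat event types, the core of a hand-written serializer is small. In Akka's Java API a `JSerializer` subclass supplies `identifier()`, `includeManifest()`, `toBinary(Object)` and `fromBinaryJava(byte[], Class<?>)`; the JDK-only sketch below covers only the byte-level encoding such methods could delegate to, using a leading type-tag byte. The `Evt` fields (`id`, `data`) and the tag value are hypothetical, since the thread does not show the real event, and schema evolution is ignored.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class EvtCodec {
    // Hypothetical event shape; the real Evt's fields are not shown in the thread.
    public static final class Evt {
        public final long id;
        public final String data;

        public Evt(long id, String data) {
            this.id = id;
            this.data = data;
        }
    }

    // One tag per event type; a tag must never be reused once events are persisted with it.
    static final byte EVT_TAG = 1;

    public static byte[] toBinary(Evt evt) {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buf)) {
            out.writeByte(EVT_TAG);
            out.writeLong(evt.id);
            out.writeUTF(evt.data);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buf.toByteArray();
    }

    public static Evt fromBinary(byte[] bytes) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            byte tag = in.readByte();
            if (tag != EVT_TAG) {
                throw new IllegalArgumentException("unknown type tag: " + tag);
            }
            return new Evt(in.readLong(), in.readUTF());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] bytes = toBinary(new Evt(42L, "hello"));
        Evt back = fromBinary(bytes);
        System.out.println(bytes.length + " bytes -> Evt(" + back.id + ", " + back.data + ")");
    }
}
```

This prints `16 bytes -> Evt(42, hello)` (1 tag byte, 8 for the long, 2 + 5 for the string). The trade-off versus a generic framework is that every new or changed event type needs hand-written encoding code.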

Regards
Muthu







Justin du coeur

Aug 6, 2016, 3:56:41 PM
to akka...@googlegroups.com
Honestly don't know -- I'm not a deep expert in this stuff.  (I've been using it heavily for the past month or so, but that's about it.)  I suspect the romix library is reasonably well-optimized on the Akka side.  If you are confident that you only have five types, then I would recommend trying it in "explicit" mode, which should be pretty much full-speed (much faster than "default"), and a reasonably good benchmark.

As usual for benchmarking, make sure that you do some warmup, and keep in mind that Kryo's optimizations are a bit nuanced.  For example, I noticed when digging around in the Kryo code that it is optimized for writing a bunch of the same type in a row (caching the most-recently-used type), presumably because it's very common to serialize sequences of things; I suspect it would be a bit slower if you were consistently changing types with each write.
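The most-recently-used caching described above can be illustrated with a toy resolver: a single-entry cache in front of a class-to-ID lookup. This is a hypothetical stand-alone model of the idea, not Kryo's code, and the registration map stands in for an explicit-mode registration table. A run of same-typed objects hits the cache after the first lookup, while strictly alternating types miss every time, which is one reason the mix of event types in a benchmark can move the numbers.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MruClassCache {
    private final Map<Class<?>, Integer> ids = new HashMap<>();
    private Class<?> lastClass; // single-entry cache of the most recently resolved type
    private int lastId;
    private int hits;
    private int misses;

    public MruClassCache(Map<Class<?>, Integer> registrations) {
        ids.putAll(registrations);
    }

    public int idFor(Class<?> cls) {
        if (cls == lastClass) {
            hits++;
            return lastId;
        }
        misses++;
        Integer id = ids.get(cls);
        if (id == null) {
            throw new IllegalArgumentException("not registered: " + cls.getName());
        }
        lastClass = cls;
        lastId = id;
        return id;
    }

    public int hits() { return hits; }
    public int misses() { return misses; }

    // Resolve the class of every object in order and return the cache with its counters.
    public static MruClassCache run(List<?> objects, Map<Class<?>, Integer> registrations) {
        MruClassCache cache = new MruClassCache(registrations);
        for (Object o : objects) {
            cache.idFor(o.getClass());
        }
        return cache;
    }

    public static void main(String[] args) {
        Map<Class<?>, Integer> regs = new HashMap<>();
        regs.put(String.class, 20);
        regs.put(Long.class, 21);
        MruClassCache same = run(Arrays.asList("a", "b", "c", "d"), regs);
        MruClassCache alternating = run(Arrays.asList("a", 1L, "b", 2L), regs);
        System.out.println("same type:   hits=" + same.hits() + " misses=" + same.misses());
        System.out.println("alternating: hits=" + alternating.hits() + " misses=" + alternating.misses());
    }
}
```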
