I am using Akka persistence to remember a sequence number from an event stream. The code in my receiveCommand that writes to the Cassandra journal is below:
persistAsync(MessageProcessed(message.eventSequenceNumber, mdMessage.trackingId, message.eventTimeStamp)) {
  mesgProcessed =>
    {
      log.debug(s"Processing message for $organization, last message seqNum: ${state.messageSequence} as ${state.time}")
      if ((mesgProcessed.messageSequence > state.messageSequence) && !config.getBoolean(ComputeConfigKeys.devMode)) {
        if (mesgProcessed.messageSequence != state.messageSequence + 1) {
          log.warning("Event Sequence is not in order, last event id ='" + state.messageSequence + "', this events sequence id = '" + mesgProcessed.messageSequence + "'")
          context.parent ! OrganizationReset(organization)
        }
        context.parent ! MDMessage(mdContext, mdMessage)
        state = mesgProcessed
      } else {
        context.parent ! MDMessage(mdContext, mdMessage)
        state = mesgProcessed
      }
    }
}
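To separate the ordering check from the Akka plumbing, the decision made inside the callback boils down to roughly the following. This is only a minimal sketch: `SeqCheck`, `SequenceCheck.classify`, and the `Long` type for the sequence numbers are illustrative assumptions, not names or types from my actual code.

```scala
// Minimal model of the ordering check done in the persistAsync callback.
// All names here are hypothetical and exist only for illustration.
sealed trait SeqCheck
case object InOrder extends SeqCheck   // incoming == last + 1
case object Gap extends SeqCheck       // incoming skipped ahead; the real code sends OrganizationReset
case object Unchecked extends SeqCheck // dev mode, or incoming is not newer than the last one

object SequenceCheck {
  // Assumes sequence numbers are Longs; the real field type may differ.
  def classify(last: Long, incoming: Long, devMode: Boolean): SeqCheck =
    if (devMode || incoming <= last) Unchecked
    else if (incoming == last + 1) InOrder
    else Gap
}
```

In every case the real handler still forwards the MDMessage to the parent and overwrites state; only the Gap case additionally triggers the reset.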
Under low message volume this code works exactly as expected. This week we started ramping up the load, and eventually the system failed with this exception:
[report.compute.md.CtipsMessageDistributorClient$MessageProcessed] with sequence number [2767] for persistenceId [/messaging/hawks/cti].
java.util.concurrent.ExecutionException: com.datastax.driver.core.exceptions.InvalidQueryException: Batch too large
at com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:299)
at com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:286)
at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
at akka.persistence.cassandra.package$$anon$1$$anonfun$run$1.apply(package.scala:17)
at scala.util.Try$.apply(Try.scala:192)
at akka.persistence.cassandra.package$$anon$1.run(package.scala:17)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:409)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: com.datastax.driver.core.exceptions.InvalidQueryException: Batch too large
at com.datastax.driver.core.Responses$Error.asException(Responses.java:136)
at com.datastax.driver.core.DefaultResultSetFuture.onSet(DefaultResultSetFuture.java:179)
at com.datastax.driver.core.RequestHandler.setFinalResult(RequestHandler.java:184)
at com.datastax.driver.core.RequestHandler.access$2500(RequestHandler.java:43)
at com.datastax.driver.core.RequestHandler$SpeculativeExecution.setFinalResult(RequestHandler.java:798)
at com.datastax.driver.core.RequestHandler$SpeculativeExecution.onSet(RequestHandler.java:617)
at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1005)
at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:928)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:277)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:264)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:962)
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:879)
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:360)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:276)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
at java.lang.Thread.run(Thread.java:745)
Has anyone experienced this? Is this a Cassandra tuning issue, or should I drop persistAsync in favor of the more traditional persist call?