Akka persistence with Cassandra Journal throwing "Batch too large" exception


Richard Ney

Oct 27, 2016, 1:21:40 PM
to Akka User List
I am using Akka persistence to remember a sequence number from an event stream. The code in my receiveCommand that writes to the Cassandra journal is below:

persistAsync(MessageProcessed(message.eventSequenceNumber, mdMessage.trackingId, message.eventTimeStamp)) {
  mesgProcessed =>
    {
      log.debug(s"Processing message for $organization, last message seqNum: ${state.messageSequence} as ${state.time}")
      if ((mesgProcessed.messageSequence > state.messageSequence) && !config.getBoolean(ComputeConfigKeys.devMode)) {
        if (mesgProcessed.messageSequence != state.messageSequence + 1) {
          log.warning("Event Sequence is not in order, last event id ='" + state.messageSequence + "', this events sequence id = '" + mesgProcessed.messageSequence + "'")
          context.parent ! OrganizationReset(organization)
        }
        context.parent ! MDMessage(mdContext, mdMessage)
        state = mesgProcessed
      } else {
        context.parent ! MDMessage(mdContext, mdMessage)
        state = mesgProcessed
      }
    }
}

At low message volume this code works exactly as expected. This week we started ramping up the load, and eventually the system failed, throwing this exception:

[report.compute.md.CtipsMessageDistributorClient$MessageProcessed] with sequence number [2767] for persistenceId [/messaging/hawks/cti].
java.util.concurrent.ExecutionException: com.datastax.driver.core.exceptions.InvalidQueryException: Batch too large
    at com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:299)
    at com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:286)
    at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
    at akka.persistence.cassandra.package$$anon$1$$anonfun$run$1.apply(package.scala:17)
    at scala.util.Try$.apply(Try.scala:192)
    at akka.persistence.cassandra.package$$anon$1.run(package.scala:17)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:409)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: com.datastax.driver.core.exceptions.InvalidQueryException: Batch too large
    at com.datastax.driver.core.Responses$Error.asException(Responses.java:136)
    at com.datastax.driver.core.DefaultResultSetFuture.onSet(DefaultResultSetFuture.java:179)
    at com.datastax.driver.core.RequestHandler.setFinalResult(RequestHandler.java:184)
    at com.datastax.driver.core.RequestHandler.access$2500(RequestHandler.java:43)
    at com.datastax.driver.core.RequestHandler$SpeculativeExecution.setFinalResult(RequestHandler.java:798)
    at com.datastax.driver.core.RequestHandler$SpeculativeExecution.onSet(RequestHandler.java:617)
    at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1005)
    at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:928)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
    at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:277)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:264)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:962)
    at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:879)
    at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:360)
    at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:276)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
    at java.lang.Thread.run(Thread.java:745)

Has anyone experienced this? Is this a Cassandra tuning issue, or should I drop 'persistAsync' in favor of the more traditional persist call?
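
For context on the "Cassandra tuning" option: Cassandra rejects a batch with "Batch too large" when its size exceeds the server-side batch_size_fail_threshold_in_kb setting in cassandra.yaml (50 KB by default, as far as I know). With persistAsync, several events can be grouped into a single journal write, so one possible mitigation is to cap that grouping on the Akka side. A minimal sketch, assuming the akka-persistence-cassandra version in use exposes max-message-batch-size under cassandra-journal; the value 20 is an arbitrary illustration, not a recommendation:

```
# application.conf (HOCON)
cassandra-journal {
  # Assumption: this key exists in the plugin version in use.
  # It caps how many messages are grouped into one journal write
  # when persistAsync is used; 20 is an illustrative value only.
  max-message-batch-size = 20
}
```

Whether a smaller batch stays under the server threshold depends on the serialized size of each event, so the value would need to be checked against real payloads.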

Patrik Nordwall

Oct 29, 2016, 6:34:36 AM
to akka...@googlegroups.com

--

Patrik Nordwall
Akka Tech Lead
Lightbend -  Reactive apps on the JVM
Twitter: @patriknw
