Batch too large error


Edward Tseng

Feb 17, 2015, 7:55:49 PM
to spark-conn...@lists.datastax.com
Hi,

I have been writing a small Spark application that processes a batch of tweets stored on S3. When the Spark application finishes its map/reduce work, it tries to save the result by invoking saveToCassandra(); however, I am getting the error below in the console. All the data is saved under the same partition key, and I noticed that if the number of columns written exceeds ~100, I get the error below. Anything below 100 executes and saves correctly into Cassandra.

My question is: I will potentially be processing 100k+ lines, and so 100k+ columns per saveToCassandra() call. What is the best way to move data from Spark to Cassandra? Can I still use the Spark Connector for the job?
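Roughly, the write path looks like the sketch below. The S3 path, column names, and aggregation logic are placeholders rather than my actual code; only the keyspace and table match the error message.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import com.datastax.spark.connector._

object HourlyMRSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(true)
      .setAppName("HourlyMR")
      .set("spark.cassandra.connection.host", "54.x.x.224")
    val sc = new SparkContext(conf)

    // Placeholder aggregation: count tweets per (conference, hour).
    val counts = sc.textFile("s3n://some-bucket/tweets/*")
      .map(line => (("some-conference", "2015-02-17T23"), 1))
      .reduceByKey(_ + _)
      .map { case ((conference, hour), n) => (conference, hour, n) }

    // Each tuple becomes one CQL row; the connector groups rows into batch statements.
    counts.saveToCassandra("insight", "conference_batch_details",
      SomeColumns("conference", "hour", "tweet_count"))

    sc.stop()
  }
}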

Regards,
Ed

15/02/18 00:37:26 ERROR QueryExecutor: Failed to execute: com.datastax.driver.core.BatchStatement@c5430c9
com.datastax.driver.core.exceptions.InvalidQueryException: Batch too large
at com.datastax.driver.core.Responses$Error.asException(Responses.java:103)
at com.datastax.driver.core.DefaultResultSetFuture.onSet(DefaultResultSetFuture.java:110)
at com.datastax.driver.core.RequestHandler.setFinalResult(RequestHandler.java:249)
at com.datastax.driver.core.RequestHandler.onSet(RequestHandler.java:421)
at com.datastax.driver.core.Connection$Dispatcher.messageReceived(Connection.java:668)
at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
at org.jboss.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream(OneToOneDecoder.java:70)
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462)
at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443)
at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303)
at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
15/02/18 00:37:26 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 2)
java.io.IOException: Failed to write 1 statements to insight.conference_batch_details.

Russell Spitzer

Feb 17, 2015, 8:00:46 PM
to spark-conn...@lists.datastax.com
What version of the connector are you using, and have you tried lowering spark.cassandra.output.batch.size.rows or spark.cassandra.output.batch.size.bytes?

Edward Tseng

Feb 17, 2015, 8:26:52 PM
to spark-conn...@lists.datastax.com
Thank you for the response. This is my sbt setup:

name := "HourlyMR"

version := "1.0"

scalaVersion := "2.10.4"

val sparkVersion = "1.2.0"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
  "com.datastax.spark" %% "spark-cassandra-connector" % "1.2.0-alpha1",
  "com.twitter" %% "util-logging" % "6.12.1"
)

By lowering, do you mean raising spark.cassandra.output.batch.size.rows and spark.cassandra.output.batch.size.bytes?

I will try that. Meanwhile, how big a batch size can the connector support?


Edward Tseng

Feb 17, 2015, 8:30:54 PM
to spark-conn...@lists.datastax.com
I have these settings in my Scala file. As you can see, I was trying a lot of different things. Let me know if this is correct.

    val conf = new SparkConf(true).setAppName(AppName)
                                  .set("spark.cassandra.connection.host", "54.x.x.224")
                                  .set("spark.cassandra.connection.timeout_ms", "5000")
                                  .set("spark.cassandra.output.batch.size.rows", "100000")
                                  .set("spark.cassandra.output.batch.size.bytes", "1000000")
                                  .set("spark.cassandra.output.concurrent.writes", "10")

Russell Spitzer

Feb 18, 2015, 11:23:26 AM
to spark-conn...@lists.datastax.com
You only want to set one of those parameters (rows or bytes), and I would suggest keeping it much smaller. Say 4096 bytes, or even limit the batch size to 1 row.
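Against the conf posted above, that would look something like this (4096 and 1 are just the starting points mentioned here, not tuned values):

import org.apache.spark.SparkConf

val conf = new SparkConf(true).setAppName("HourlyMR")
  .set("spark.cassandra.connection.host", "54.x.x.224")
  // Cap each generated batch at roughly 4 KB...
  .set("spark.cassandra.output.batch.size.bytes", "4096")
  // ...or, instead, send one row per statement (effectively no batching);
  // if you use this, drop the bytes setting above so only one limit is set:
  // .set("spark.cassandra.output.batch.size.rows", "1")
  .set("spark.cassandra.output.concurrent.writes", "10")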

Edward Tseng

Feb 18, 2015, 6:05:49 PM
to spark-conn...@lists.datastax.com
Hi Russell,

Thank you for your help!

Yes, setting the batch size to 1 solved the problem. Now Spark can write to Cassandra without complaining that the batch is too large. To help me understand what is happening: I am guessing "spark.cassandra.output.batch.size.rows" limits the number of rows that Spark writes to Cassandra per batch? And since Cassandra is a quorum-based system, does setting a small batch size limit the amount of load that Spark puts on a single node? Is that correct?

Regards,
Ed


Russell Spitzer

Feb 18, 2015, 6:10:48 PM
to spark-conn...@lists.datastax.com
In connector 1.2 it controls how many Cassandra rows within the same Cassandra partition are batched together before submitting a write request. Setting it to 1 basically means you won't do any batching at all. I would try scaling that number up from 1, as you will most likely see much greater performance when Cassandra can do more simultaneous operations on the same partition. The new batching system hasn't been fully exercised, so we haven't really seen the limits yet or changed the defaults to match.
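As a sketch of what scaling it back up might look like per write call, assuming the WriteConf/RowsInBatch API in the connector's writer package (check your version's docs), with counts standing in for whatever RDD you are saving and 64 being an arbitrary starting point to benchmark from:

import com.datastax.spark.connector._
import com.datastax.spark.connector.writer.{RowsInBatch, WriteConf}

// Batch up to 64 rows belonging to the same Cassandra partition per write request.
counts.saveToCassandra(
  "insight", "conference_batch_details",
  SomeColumns("conference", "hour", "tweet_count"),
  writeConf = WriteConf(batchSize = RowsInBatch(64)))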

Hemalatha A

Sep 30, 2016, 5:24:42 AM
to DataStax Spark Connector for Apache Cassandra
Hello Russell,

I am using spark-cassandra-connector 1.6.0 and setting spark.cassandra.output.batch.size.bytes to 8 KB, but somehow this limit is not being respected by Spark, and it still writes batches larger than 8 KB. Once a batch reaches the maximum set in cassandra.yaml, which is 50 KB, the Spark app crashes.

Any idea why this limit is not being respected by Spark?

Thanks
Hema

Jacek Lewandowski

Sep 30, 2016, 7:04:23 AM
to DataStax Spark Connector for Apache Cassandra
Hello Hema,

please check a few things:
- Do you also have spark.cassandra.output.batch.size.rows set in your configuration? You can check with sc.getConf.get("spark.cassandra.output.batch.size.rows") (see the sketch just below). If it is set, remove it.
- This problem may also occur if a single row is bigger than 8 KB. What are you saving? Is it possible that a single row exceeds 8 KB?
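A quick way to check both from the driver, as a sketch (getOption avoids the exception you would get from get when the property is not set):

val sparkConf = sc.getConf
// Expect None here if batch.size.rows is not set anywhere:
println(sparkConf.getOption("spark.cassandra.output.batch.size.rows"))
// Expect Some("8192"), or whatever value you actually passed, for the bytes limit:
println(sparkConf.getOption("spark.cassandra.output.batch.size.bytes"))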

Hemalatha A

Oct 5, 2016, 2:49:24 AM
to DataStax Spark Connector for Apache Cassandra


Hi Jacek,

I am not setting spark.cassandra.output.batch.size.rows; I am only setting spark.cassandra.output.batch.size.bytes to 8 KB. The property shows up correctly in the Environment tab of the Spark UI, but Spark does not seem to enforce the limit, which is why I am seeing the Batch too large error and my app fails.

Also, a single row of mine is only 300-400 bytes, not more than that.

Thanks
Hema

Jacek Lewandowski

Oct 5, 2016, 10:19:38 AM
to DataStax Spark Connector for Apache Cassandra
Could you provide code to replicate this behaviour? We have integration tests that verify batch size limiting (https://github.com/datastax/spark-cassandra-connector/blob/master/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/writer/GroupingBatchBuilderSpec.scala) and they pass. It would be great to see where it fails and fix it.
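For reference, something along these lines would be ideal as a reproduction; the keyspace, table, and payload below are made up, but the ~300-400 byte rows and the 8 KB bytes limit match what you described:

import org.apache.spark.{SparkConf, SparkContext}
import com.datastax.spark.connector._

object BatchSizeRepro {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(true)
      .setAppName("BatchSizeRepro")
      .set("spark.cassandra.connection.host", "127.0.0.1")
      .set("spark.cassandra.output.batch.size.bytes", "8192")

    val sc = new SparkContext(conf)
    // Rows of roughly 350 bytes each, matching the row size described above.
    val rows = sc.parallelize(1 to 300000).map(i => (i, "x" * 350))
    rows.saveToCassandra("test_ks", "batch_repro", SomeColumns("id", "payload"))
    sc.stop()
  }
}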

Thanks


chandra sekhar Reddy

Mar 6, 2018, 10:53:55 AM
to DataStax Spark Connector for Apache Cassandra
Hi Russell,

I have a similar problem. I am able to load data to Cassandra successfully. However, I'm using DSE 5.0.5, which ships the Spark Cassandra Connector 1.6.3.

The Spark job is writing 285,218 rows to keyspace.table in 741.160 s.

I have tried setting the batch size lower in the Spark conf, but Spark does not seem to pick it up.
Can you suggest how to improve the performance?