Issue with "IF NOT EXISTS" INSERT in batch not inserting all records into Cassandra


Antonio Ye

Mar 9, 2017, 4:42:28 PM
to DataStax Java Driver for Apache Cassandra User Mailing List
I have an issue where I am inserting data in batches using "IF NOT EXISTS" in my insert statements. I have 9675 unique records, but for some reason only 8642 are inserted. I enabled tracing but never see the requests for the missing records in either system_traces.events or system_traces.sessions. I suspect that for some reason the driver is not sending those records to Cassandra, but there is no error message anywhere to tell me why. I brought this up on the Spark Connector for Cassandra forum but had no luck getting to the bottom of the problem. Here is the link to that post:

https://groups.google.com/a/lists.datastax.com/d/topic/spark-connector-user/8yLSqqjtwJ0/discussion

Any ideas of where to look to debug this issue?
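For reference, per-statement tracing with the Java driver looks roughly like this; a sketch assuming the 3.x driver API, with the keyspace and table names taken from later in this thread:

import com.datastax.driver.core.{Cluster, SimpleStatement}

// Sketch: enable tracing on a single statement and read back the trace id,
// which can then be looked up in system_traces.sessions / system_traces.events.
val cluster = Cluster.builder().addContactPoint("127.0.0.1").build()
val session = cluster.connect()
val stmt = new SimpleStatement(
  "INSERT INTO dev_data_lake.test_connector (id, sequence, value) " +
    "VALUES ('key1', 1, 1) IF NOT EXISTS")
stmt.enableTracing()
val rs = session.execute(stmt)
println(s"trace id: ${rs.getExecutionInfo.getQueryTrace.getTraceId}")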

Antonio Ye

Mar 9, 2017, 11:48:42 PM
to DataStax Java Driver for Apache Cassandra User Mailing List
Here is a simple project that shows the problem. When I run this locally, I end up with 975 records in Cassandra instead of the expected 1000.

Antonio Ye

Mar 10, 2017, 2:29:56 PM
to DataStax Java Driver for Apache Cassandra User Mailing List
Can someone please help? I must be doing something wrong here, because I just rewrote what I had done in Spark using the Cassandra Java driver and see a similar problem: I run a batch inserting 10 unique records, followed by a batch insert of 100 records, and only the first 10 records get inserted.

import com.datastax.driver.core.{BatchStatement, BoundStatement, Cluster}

object TestCassandraDriver {
  def main(args: Array[String]): Unit = {
    val cluster = Cluster.builder()
      .addContactPoint("127.0.0.1")
      .build()
    val session = cluster.connect()
    val batchStatement = new BatchStatement()
    val preparedStatement =
      session.prepare("INSERT INTO dev_data_lake.test_connector (id, sequence, value) VALUES (?, ?, ?) IF NOT EXISTS")

    // First batch: sequences 1-10 for partition "key1".
    for (sequence <- 1 to 10) {
      val seq: java.lang.Integer = sequence
      val boundStatement = new BoundStatement(preparedStatement)
      boundStatement.bind("key1", seq, seq)
      batchStatement.add(boundStatement)
    }
    session.execute(batchStatement)

    // Second batch: sequences 1-100, overlapping the first ten rows.
    batchStatement.clear()
    for (sequence <- 1 to 100) {
      val seq: java.lang.Integer = sequence
      val boundStatement = new BoundStatement(preparedStatement)
      boundStatement.bind("key1", seq, seq)
      batchStatement.add(boundStatement)
    }
    session.execute(batchStatement)

    session.close()
    cluster.close()
  }
}
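Worth noting for the program above: with IF NOT EXISTS, a failed condition does not raise an error; the driver reports it on the returned ResultSet. A minimal check, assuming the 2.x/3.x Java driver API:

// Sketch: a conditional (LWT) batch signals rejection through wasApplied(),
// not through an exception, so a failed condition is silent unless checked.
val result = session.execute(batchStatement)
if (!result.wasApplied()) {
  // On rejection the result rows carry the existing values that clashed.
  println(s"batch not applied; first conflicting row: ${result.one()}")
}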

Lahiru Gamathige

Mar 10, 2017, 2:57:14 PM
to java-dri...@lists.datastax.com
I think the issue is that your last batch was never inserted because it didn't reach the max batch size (but I'm not 100% sure). I'm also not sure why you want IF NOT EXISTS; can you design the table so that inserts simply overwrite existing rows?
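In other words, since a plain Cassandra INSERT is an upsert, dropping the condition means a re-insert just overwrites the row. A sketch of that variant, reusing the session from the program above:

// Sketch: without IF NOT EXISTS an INSERT is an upsert; re-inserting the same
// primary key overwrites the row, so no condition can reject the batch.
val upsert = session.prepare(
  "INSERT INTO dev_data_lake.test_connector (id, sequence, value) VALUES (?, ?, ?)")
session.execute(upsert.bind("key1", Int.box(1), Int.box(1)))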


Lahiru Gamathige

Mar 10, 2017, 3:06:48 PM
to java-dri...@lists.datastax.com
On Fri, Mar 10, 2017 at 11:29 AM, Antonio Ye <anto...@gmail.com> wrote:
[quoted message and code snipped; see above]
    session.execute(batchStatement)

This is a bit suspicious too, because I believe session.execute returns a future, and you are closing the session before all the writes are done, or without checking the future for success.
    session.close()
    cluster.close()
  }
}
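For reference, checking a write's future for success applies to the driver's async API; a sketch assuming the 3.x driver and its bundled Guava, reusing session and batchStatement from above (session.execute itself is synchronous, as the next message points out):

import com.datastax.driver.core.ResultSet
import com.google.common.util.concurrent.{FutureCallback, Futures}

// Sketch: executeAsync returns a ResultSetFuture; wait for (or react to) its
// completion before closing the session.
val future = session.executeAsync(batchStatement)
Futures.addCallback(future, new FutureCallback[ResultSet] {
  override def onSuccess(rs: ResultSet): Unit =
    if (!rs.wasApplied()) println("conditional batch was not applied")
  override def onFailure(t: Throwable): Unit = t.printStackTrace()
})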

[earlier messages quoted in full snipped; see the start of the thread]

Antonio Ye

Mar 10, 2017, 4:33:21 PM
to java-dri...@lists.datastax.com
Wait, no, session.execute returns a ResultSet, not a Future. It looks like maybe the whole batch is failing because it contains duplicates: sequences 1-10 already exist. If I break the second batch into two, 1-10 and 11-100, I get all 100 records in the table. Can anyone confirm that this is in fact what is happening? Is there any way to tell Cassandra not to fail the whole batch if there are duplicates?

Here is the table definition:
CREATE TABLE dev_data_lake.test_connector (
    id text,
    sequence int,
    value int,
    PRIMARY KEY (id, sequence)
) WITH CLUSTERING ORDER BY (sequence ASC);
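This matches Cassandra's behavior for conditional batches: all IF conditions in a batch are evaluated together as a single lightweight transaction, and if any condition fails, none of the statements are applied. A sketch of the per-row workaround, reusing session and preparedStatement from the program above:

// Sketch: execute each conditional insert on its own, so a row that already
// exists only skips itself instead of causing the whole batch to be rejected.
for (sequence <- 1 to 100) {
  val seq: java.lang.Integer = sequence
  val rs = session.execute(preparedStatement.bind("key1", seq, seq))
  if (!rs.wasApplied()) println(s"row $seq already existed; skipped")
}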