I am attempting to use the Cassandra Java driver to bulk insert a large number of records into two different tables on a cluster. I'm starting with a test data set of about 600 records (resulting in 600 records in each table), but will eventually be inserting about 2.5 million. I am using a PreparedStatement and binding each record to it with a BoundStatement.
When I perform the inserts synchronously (with session.execute), everything behaves as expected: all records are inserted and show up when I do a SELECT COUNT(*) from cqlsh. However, if I use session.executeAsync, only about 90% of the records show up in each table when I query from cqlsh. After the inserts, I wait on all of the futures using ResultSetFuture.getUninterruptibly, but this has not made a difference, even when I process the inserts in groups (e.g. call session.executeAsync 50 times, then call ResultSetFuture.getUninterruptibly on those 50 before continuing).
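A common alternative to fixed groups of 50 is a sliding window: a Semaphore caps the number of outstanding requests, and each completion callback frees a slot immediately instead of waiting for the slowest request in its group. The sketch below uses only the JDK so it runs on its own; `insertAsync` is a hypothetical stand-in for `session.executeAsync(boundStatement)`, and the window size is an arbitrary assumption, not a tuned value. It also counts successes and failures so the totals can be compared with what cqlsh reports.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedAsyncInserts {

    // Hypothetical stand-in for session.executeAsync(boundStatement):
    // completes asynchronously on the given executor.
    static CompletableFuture<Void> insertAsync(int record, ExecutorService pool) {
        return CompletableFuture.runAsync(() -> {
            // a real insert for `record` would be issued here
        }, pool);
    }

    /** Submits `total` inserts with at most `maxInFlight` outstanding; returns {succeeded, failed}. */
    static int[] run(int total, int maxInFlight) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        Semaphore permits = new Semaphore(maxInFlight);
        AtomicInteger ok = new AtomicInteger();
        AtomicInteger failed = new AtomicInteger();
        try {
            for (int i = 0; i < total; i++) {
                // Blocks once maxInFlight requests are outstanding.
                permits.acquireUninterruptibly();
                insertAsync(i, pool).whenComplete((v, err) -> {
                    if (err == null) {
                        ok.incrementAndGet();
                    } else {
                        failed.incrementAndGet();
                    }
                    // Free a slot as soon as this request finishes.
                    permits.release();
                });
            }
            // All permits come back only after every request has completed.
            permits.acquireUninterruptibly(maxInFlight);
        } finally {
            pool.shutdown();
        }
        return new int[] {ok.get(), failed.get()};
    }

    public static void main(String[] args) {
        int[] r = run(600, 50);
        System.out.printf("succeeded=%d failed=%d%n", r[0], r[1]);
    }
}
```

If the success count printed this way matches the number of records submitted while cqlsh still shows fewer rows, the writes are being acknowledged and the discrepancy is more likely on the read or replication side than in the client loop.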
I do not see any exceptions, and resultSet.wasApplied() always returns true. I am using Cassandra 2.1.4 and have tried this both on a 3-node CCM cluster on localhost and on a deployed 3-node cluster on AWS.
I tried processing the larger dataset of 2.5 million records just to see what would happen, and after about 30,000 records I start seeing the following message in the console:
[cluster1-timeouter-0] DEBUG c.d.driver.core.RequestHandler - onTimeout triggered but the response was completed by another thread, cancelling (retryCount = 0, queryState = QueryState(count=0, inProgress=false, cancelled=false), queryStateRef = QueryState(count=0, inProgress=false, cancelled=false))
Additionally, with the larger dataset, the longer the inserts run, the larger the gap between records processed and records actually inserted. E.g. after about 600,000 records processed, I only see about 440,000 records in cqlsh. I also tried adding a callback to each future to log any failures to the console:
    ResultSetFuture accountFuture = session.executeAsync(insertAccount(insertStmt, record));
    Futures.addCallback(accountFuture, new FutureCallback<ResultSet>() {
        @Override
        public void onSuccess(@Nullable com.datastax.driver.core.ResultSet resultSet) {
            // do nothing
        }

        @Override
        public void onFailure(Throwable throwable) {
            System.out.printf("Failed with: %s\n", throwable);
        }
    });
However, I do not see any failures in the console.
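Since no callback reports a failure, one way to narrow this down is to record which keys were acknowledged in onSuccess and diff them against the keys a later read actually returns (for example, a read at a higher consistency level). The sketch below shows only the diff step with plain JDK collections; the integer keys and the idea of collecting them in onSuccess are assumptions for illustration, not part of the original code.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class MissingKeys {

    /** Keys the client saw acknowledged (onSuccess) but that a later read did not return. */
    static List<Integer> missing(Set<Integer> acknowledged, Set<Integer> foundByRead) {
        return acknowledged.stream()
                .filter(k -> !foundByRead.contains(k))
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Set<Integer> acked = Set.of(1, 2, 3, 4, 5);  // hypothetical acknowledged keys
        Set<Integer> read = Set.of(1, 2, 4);         // hypothetical keys returned by a read
        System.out.println(missing(acked, read));    // prints [3, 5]
    }
}
```

Re-reading a handful of the missing keys individually at different consistency levels would then show whether the rows are absent everywhere or only on some replicas.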
CREATE KEYSPACE account WITH replication = {
'class': 'SimpleStrategy',
'replication_factor': '2'
};
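With a replication factor of 2, one possible explanation (not confirmed in this thread) is consistency level: if the writes and the cqlsh count both run at ONE (cqlsh's default; the 2.1 driver's default is assumed here to be ONE as well, which is worth checking), the count may be served by a replica that has not yet received every write. A read is only guaranteed to overlap an acknowledged write when R + W > RF, where QUORUM is floor(RF / 2) + 1. The arithmetic for RF = 2:

```java
public class ConsistencyOverlap {

    /** QUORUM within one datacenter: floor(RF / 2) + 1 replicas. */
    static int quorum(int rf) {
        return rf / 2 + 1;
    }

    /** A read is guaranteed to see an acknowledged write only if R + W > RF. */
    static boolean readSeesWrite(int writeReplicas, int readReplicas, int rf) {
        return writeReplicas + readReplicas > rf;
    }

    public static void main(String[] args) {
        int rf = 2;
        System.out.println("QUORUM at RF=2: " + quorum(rf));                           // 2
        System.out.println("ONE/ONE overlap: " + readSeesWrite(1, 1, rf));             // false
        System.out.println("QUORUM/ONE overlap: " + readSeesWrite(quorum(rf), 1, rf)); // true
        System.out.println("ONE/ALL overlap: " + readSeesWrite(1, rf, rf));            // true
    }
}
```

Note that at RF = 2, QUORUM equals ALL, so either side can be raised; running CONSISTENCY QUORUM in cqlsh before the count is a cheap way to test this hypothesis.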
cassandra01$ clockdiff cassandra02
..................................................
host=cassandra02 rtt=1(0)ms/1ms delta=4ms/4ms Tue Jun 23 20:26:15 2015
cassandra01$ clockdiff cassandra03
.
host=cassandra03 rtt=750(187)ms/0ms delta=4ms/4ms Tue Jun 23 20:26:21 2015
cassandra01$ clockdiff cassandra04
..
host=cassandra04 rtt=562(280)ms/0ms delta=4ms/4ms Tue Jun 23 20:26:28 2015
--
You received this message because you are subscribed to the Google Groups "DataStax Java Driver for Apache Cassandra User Mailing List" group.