I am working on a project in which I need to insert data into a Cassandra database, and I am using the Pelops client for that.
I have multithreaded code that inserts into the Cassandra database using the Pelops client, and I am using an ExecutorService to run it.
In my program, each thread works on its own range of IDs, for example:
Thread1 will work on 1 to 20
Thread2 will work on 21 to 40
...
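The range assignment described above can be sketched as follows. This is a minimal illustration, assuming `startRange` is 1 and `noOfTasks` is 20 (the values implied by the example ranges); the class name `RangeSplitDemo` and the helper `rangeFor` are hypothetical and not part of my actual program.

```java
public class RangeSplitDemo {

    // Returns {first, last} (both inclusive) for the thread at the given zero-based index.
    static int[] rangeFor(int threadIndex, int startRange, int noOfTasks) {
        int first = startRange + threadIndex * noOfTasks;
        int last = first + noOfTasks - 1;
        return new int[] {first, last};
    }

    public static void main(String[] args) {
        int startRange = 1;   // assumed value, taken from the example ranges
        int noOfTasks = 20;   // assumed value, taken from the example ranges
        int noOfThreads = 3;  // arbitrary value for the demo

        for (int i = 0; i < noOfThreads; i++) {
            int[] range = rangeFor(i, startRange, noOfTasks);
            System.out.println("Thread" + (i + 1) + " will work on " + range[0] + " to " + range[1]);
        }
    }
}
```

The ranges do not overlap, so no two threads write the same user ID.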
Below is the code I am using to insert into the Cassandra database:
public static void main(String[] args) {
    LOG.info("Loading data in Cassandra database..!!");
    ExecutorService service = Executors.newFixedThreadPool(noOfThreads);
    try {
        // queue some tasks
        for (int i = 0, nextId = startRange; i < noOfThreads; i++, nextId += noOfTasks) {
            service.submit(new CassandraTask(nextId, noOfTasks));
        }
        service.shutdown();
        service.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
    } catch (InterruptedException e) {
        LOG.warn("Threw a Interrupted Exception in" + CNAME + ".PelopsLnPClient: boss told me to stop...Not my fault!!");
    } catch (Exception e) {
        LOG.error("Threw a Exception in" + CNAME + e);
    }
}
Below is the CassandraTask class, which implements the Runnable interface:
class CassandraTask implements Runnable {
    private final int id;
    private final int noOfTasks;
    private final String nodes = "localhost";
    private final String thrift_connection_pool = "Test Cluster";
    private final String keyspace = "my_keyspace";
    private final String column_family = "PROFILE_USER";
    public static ConcurrentHashMap<Long, AtomicLong> insertHistogram = new ConcurrentHashMap<Long, AtomicLong>();

    public CassandraTask(int nextId, int noOfTasks) {
        // First user ID of the range handled by this task
        this.id = nextId;
        this.noOfTasks = noOfTasks;
    }

    public void run() {
        try {
            cassandraConnection();
            Mutator mutator = Pelops.createMutator(thrift_connection_pool);
            for (int userId = id; userId < id + noOfTasks; userId++) {
                mutator.writeColumns(column_family, String.valueOf(userId),
                    mutator.newColumnList(
                        mutator.newColumn("aount", "{\"lv\":[{\"v\":{\"ss\":null,\"userState\":null,\"userId\":" + userId + "},\"cn\":1}],\"lmd\":20130206211109}"),
                        mutator.newColumn("aising", "{\"lv\":[{\"v\":{\"ddd\":null,\"ebayAdsOnThirdParty\":null,\"userId\":" + userId + "},\"cn\":2}],\"lmd\":20130206211109}"),
                        mutator.newColumn("acats", "{\"lv\":[{\"v\":{\"ss\":null,\"userState\":null,\"userId\":" + userId + "},\"cn\":1}],\"lmd\":20130206211109}"),
                        mutator.newColumn("aules", "{\"lv\":[{\"v\":{\"ss\":null,\"userState\":null,\"userId\":" + userId + "},\"cn\":1}],\"lmd\":20130206211109}"),
                        mutator.newColumn("ahased", "{\"lv\":[{\"v\":{\"ss\":null,\"userState\":null,\"userId\":" + userId + "},\"cn\":1}],\"lmd\":20130206211109}"),
                        mutator.newColumn("aservice", "{\"lv\":[{\"v\":{\"ss\":null,\"userState\":null,\"userId\":" + userId + "},\"cn\":1}],\"lmd\":20130206211109}"),
                        mutator.newColumn("aphic", "{\"lv\":[{\"v\":{\"ss\":null,\"userState\":null,\"userId\":" + userId + "},\"cn\":1}],\"lmd\":20130206211109}"),
                        mutator.newColumn("arches", "{\"lv\":[{\"v\":{\"ss\":null,\"userState\":null,\"userId\":" + userId + "},\"cn\":1}],\"lmd\":20130206211109}")
                    ));
            }
            mutator.execute(ConsistencyLevel.ONE);
        } catch (Exception e) {
            System.err.println("Threw a Exception in " + e);
        } finally {
            Pelops.shutdown();
        }
    }

    /**
     * Making a Cassandra Connection by adding nodes
     */
    private void cassandraConnection() {
        Cluster cluster = new Cluster(nodes, 9160);
        Pelops.addPool(thrift_connection_pool, cluster, keyspace);
    }
}
Whenever I run the above program, I always get the exception below:
Threw a Exception in java.lang.RuntimeException: exception while registering MBean, com.scale7.cassandra.pelops.pool:type=PooledNode-my_keyspace-localhost
Can anyone help me figure out what I am doing wrong here? I believe I am making some minor mistake. Strangely, if I run the program slowly, that is, by stepping through it with breakpoints in the debugger, I don't get this exception.
I am working with Cassandra 1.2.3.
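For context on the message "exception while registering MBean": a JMX MBean server refuses a second registration under a name that is already in use. The sketch below shows that behavior using only JDK classes, not Pelops. The class name `MBeanCollisionDemo`, the method `secondRegistrationFails`, and the `demo.pool` object name are hypothetical. Whether Pelops runs into the same situation when several threads call `Pelops.addPool` with the same pool name is an assumption on my part, not something I have confirmed.

```java
import java.lang.management.ManagementFactory;
import javax.management.InstanceAlreadyExistsException;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.timer.Timer;

public class MBeanCollisionDemo {

    // Registers two MBeans under the same name and reports whether
    // the second registration was rejected by the MBean server.
    static boolean secondRegistrationFails(String name) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        ObjectName objectName = new ObjectName(name);

        // javax.management.timer.Timer is a ready-made MBean shipped with the JDK;
        // it stands in here for whatever a client library would register.
        server.registerMBean(new Timer(), objectName);
        try {
            server.registerMBean(new Timer(), objectName);
            return false;
        } catch (InstanceAlreadyExistsException e) {
            return true;
        } finally {
            // Clean up so the demo can be run repeatedly in the same JVM.
            server.unregisterMBean(objectName);
        }
    }

    public static void main(String[] args) throws Exception {
        // Hypothetical object name, loosely modeled on the one in the exception message.
        System.out.println(secondRegistrationFails("demo.pool:type=PooledNode-my_keyspace-localhost"));
    }
}
```

If something similar happens inside Pelops, it might explain why the error disappears when I step through slowly, since the tasks would then no longer set up the pool at the same moment.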