Cassandra Timing out when doing large write


Neutronis Consulting

Feb 16, 2016, 10:12:34 AM
to DataStax Java Driver for Apache Cassandra User Mailing List
My app needs to write millions of rows. It runs for a bit and then kicks out:

16/02/16 09:51:50 ERROR ControlConnection: [Control connection] Cannot connect to any host, scheduling retry in 1000 milliseconds
Exception in thread "main" com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (no host was tried)
at com.datastax.driver.core.exceptions.NoHostAvailableException.copy(NoHostAvailableException.java:84)
at com.datastax.driver.core.exceptions.NoHostAvailableException.copy(NoHostAvailableException.java:37)
at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37)
at com.datastax.driver.core.ArrayBackedResultSet$MultiPage.prepareNextRow(ArrayBackedResultSet.java:295)
at com.datastax.driver.core.ArrayBackedResultSet$MultiPage.isExhausted(ArrayBackedResultSet.java:255)
at com.datastax.driver.core.ArrayBackedResultSet$1.hasNext(ArrayBackedResultSet.java:134)
at com.neutronis.spark_reports.Spark_Reports.computeResults(Spark_Reports.java:86)
at com.neutronis.spark_reports.Spark_Reports.run(Spark_Reports.java:33)
at com.neutronis.spark_reports.Spark_Reports.main(Spark_Reports.java:168)
Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (no host was tried)
at com.datastax.driver.core.RequestHandler.reportNoMoreHosts(RequestHandler.java:218)
at com.datastax.driver.core.RequestHandler.access$1000(RequestHandler.java:43)
at com.datastax.driver.core.RequestHandler$SpeculativeExecution.sendRequest(RequestHandler.java:284)
at com.datastax.driver.core.RequestHandler$SpeculativeExecution$1.run(RequestHandler.java:406)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
16/02/16 09:51:51 ERROR ControlConnection: [Control connection] Cannot connect to any host, scheduling retry in 2000 milliseconds
16/02/16 09:51:53 ERROR ControlConnection: [Control connection] Cannot connect to any host, scheduling retry in 4000 milliseconds
16/02/16 09:51:55 INFO CassandraConnector: Disconnected from Cassandra cluster: cass_cl_01
16/02/16 09:51:55 INFO SparkContext: Invoking stop() from shutdown hook

Do I need to "throttle" the writes in some way?

Thank you,
Jason

Dom Garda

Feb 18, 2016, 12:04:03 PM
to java-dri...@lists.datastax.com
Where I work we can't request longer timeouts, because changing the yaml config would affect everything else. We are working on writing in chunks and recombining the data later.
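Roughly this shape, if it helps (just a sketch, the chunk size is made up):

    import java.util.ArrayList;
    import java.util.List;
    import com.datastax.driver.core.ResultSetFuture;
    import com.datastax.driver.core.Session;
    import com.datastax.driver.core.Statement;

    // Sketch: fire async writes in fixed-size chunks and drain each chunk
    // before starting the next, so only CHUNK_SIZE writes are ever pending.
    static final int CHUNK_SIZE = 1000;

    void writeInChunks(Session session, Iterable<Statement> statements) {
        List<ResultSetFuture> pending = new ArrayList<>();
        for (Statement s : statements) {
            pending.add(session.executeAsync(s));
            if (pending.size() >= CHUNK_SIZE) {
                for (ResultSetFuture f : pending) f.getUninterruptibly(); // wait out the chunk
                pending.clear();
            }
        }
        for (ResultSetFuture f : pending) f.getUninterruptibly(); // drain the remainder
    }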

Jack Krupansky

Feb 18, 2016, 12:16:33 PM
to java-dri...@lists.datastax.com
Superficially, it sounds like one of three possibilities:

1. The JVM is bogging down in garbage collection. Sometimes that means you need a larger heap, but sometimes it means you need a smaller one, so that each GC has less work to do when it runs, rather than a huge stop-the-world collection (which causes the timeouts).
2. (Manual) throttling is needed on the app side, even once you have the optimal JVM heap size; see the sketch below.
3. You are overloading the node and need to provision more nodes, so that the load on any given node is much lower.
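
For #2, the usual shape is to bound the number of in-flight async writes with a semaphore. Just a sketch with made-up names and numbers, not tested against your code:

    import java.util.concurrent.Semaphore;
    import com.google.common.util.concurrent.FutureCallback;
    import com.google.common.util.concurrent.Futures;
    import com.datastax.driver.core.ResultSet;
    import com.datastax.driver.core.ResultSetFuture;
    import com.datastax.driver.core.Session;
    import com.datastax.driver.core.Statement;

    // Cap concurrent async writes; 128 is only a starting point to tune.
    final Semaphore inFlight = new Semaphore(128);

    void throttledWrite(Session session, Statement bound) throws InterruptedException {
        inFlight.acquire();                      // blocks once 128 writes are pending
        ResultSetFuture f = session.executeAsync(bound);
        Futures.addCallback(f, new FutureCallback<ResultSet>() {
            public void onSuccess(ResultSet rs) { inFlight.release(); }
            public void onFailure(Throwable t) { inFlight.release(); /* log and retry */ }
        });
    }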

-- Jack Krupansky

Neutronis Consulting

Feb 18, 2016, 1:53:57 PM
to DataStax Java Driver for Apache Cassandra User Mailing List
I applied a bit of throttling, but am now getting:
16/02/18 13:40:00 ERROR ControlConnection: [Control connection] Cannot connect to any host, scheduling retry in 1000 milliseconds
Exception in thread "main" com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (no host was tried)
at com.datastax.driver.core.exceptions.NoHostAvailableException.copy(NoHostAvailableException.java:84)
at com.datastax.driver.core.exceptions.NoHostAvailableException.copy(NoHostAvailableException.java:37)
at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37)
at com.datastax.driver.core.ArrayBackedResultSet$MultiPage.prepareNextRow(ArrayBackedResultSet.java:295)
at com.datastax.driver.core.ArrayBackedResultSet$MultiPage.isExhausted(ArrayBackedResultSet.java:255)
at com.datastax.driver.core.ArrayBackedResultSet$1.hasNext(ArrayBackedResultSet.java:134)
at com.neutronis.spark_reports.Spark_Reports.computeResults(Spark_Reports.java:102)
at com.neutronis.spark_reports.Spark_Reports.run(Spark_Reports.java:35)
at com.neutronis.spark_reports.Spark_Reports.main(Spark_Reports.java:198)
Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (no host was tried)
at com.datastax.driver.core.RequestHandler.reportNoMoreHosts(RequestHandler.java:218)
at com.datastax.driver.core.RequestHandler.access$1000(RequestHandler.java:43)
at com.datastax.driver.core.RequestHandler$SpeculativeExecution.sendRequest(RequestHandler.java:284)
at com.datastax.driver.core.RequestHandler$SpeculativeExecution$1.run(RequestHandler.java:406)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
16/02/18 13:40:01 ERROR ControlConnection: [Control connection] Cannot connect to any host, scheduling retry in 2000 milliseconds
16/02/18 13:40:03 ERROR ControlConnection: [Control connection] Cannot connect to any host, scheduling retry in 4000 milliseconds


Should I be tweaking my connection parameters? Here is my connection class:


public static void main(String[] args) {
    if (args.length != 2) {
        System.err.println("Syntax: com.neutronis.Spark_Reports <Spark Master URL> <Cassandra contact point>");
        System.exit(1);
    }

    SparkConf conf = new SparkConf();
    conf.setAppName("Spark Reports");
    conf.setMaster(args[0]);
    conf.set("spark.cassandra.connection.host", args[1]);

    Spark_Reports app = new Spark_Reports(conf);
    app.run();
}



The code I'm using to throttle is as follows:

Session session = null;
try {
    session = connector.openSession();
} catch (Exception ex) {
    // .. moan and complain ..
    System.err.printf("Got %s trying to openSession - %s%n",
            ex.getClass().getCanonicalName(), ex.getMessage());
}

if (session != null) {

    // Prepared statement for Cassandra inserts
    PreparedStatement statement = session.prepare(
            "INSERT INTO model.base (channel, time_key, power) VALUES (?,?,?);");
    BoundStatement boundStatement = new BoundStatement(statement);

    // Backlog of in-flight async writes (declaration added for completeness)
    List<ResultSetFuture> backlogList = new ArrayList<>();

    // Query a Cassandra table that has capital letters in the column names
    ResultSet results = session.execute(
            "SELECT \"Time_Key\",\"Power\",\"Bandwidth\",\"Start_Frequency\" " +
            "FROM \"SB1000_49552019\".\"Measured_Value\" limit 800000;");

    // Get the variables from each row of Cassandra data
    for (Row row : results) {
        // Upper-case column names in Cassandra
        time_key = row.getLong("Time_Key");
        start_frequency = row.getDouble("Start_Frequency");
        power = row.getFloat("Power");
        bandwidth = row.getDouble("Bandwidth");

        // Create channel power buckets, bind them into the prepared
        // statement, and write to Cassandra.
        for (channel = 1.6000E8; channel <= channel_end; channel += increment) {
            if ((channel >= start_frequency) && (channel <= (start_frequency + bandwidth))) {

                ResultSetFuture rsf = session.executeAsync(boundStatement.bind(channel, time_key, power));
                backlogList.add(rsf);                 // put the new one at the end of the list
                if (backlogList.size() > 10000) {     // wait till we have a few
                    while (backlogList.size() > 5432) {  // then harvest about half of the oldest ones
                        rsf = backlogList.remove(0);
                        rsf.getUninterruptibly();
                    } // end while
                } // end if

            } // end if
        } // end channel for
    } // end "row" for

} // end session




Nate McCall

Feb 19, 2016, 4:29:37 PM
to java-dri...@lists.datastax.com


> //Query Cassandra Table that has capital letters in the column names
>         ResultSet results = session.execute("SELECT \"Time_Key\",\"Power\",\"Bandwidth\",\"Start_Frequency\" FROM \"SB1000_49552019\".\"Measured_Value\" limit 800000;");
       

That limit is way high. Just let the driver do the paging for you:
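Something like this (a sketch reusing the table and session from your post; tune the fetch size):

    // A modest fetch size plus plain iteration; the driver fetches the
    // next page transparently as the iterator advances.
    Statement stmt = new SimpleStatement(
            "SELECT \"Time_Key\",\"Power\",\"Bandwidth\",\"Start_Frequency\" " +
            "FROM \"SB1000_49552019\".\"Measured_Value\";");
    stmt.setFetchSize(1000);
    for (Row row : session.execute(stmt)) {
        // ... process the row ...
    }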
 

--
-----------------
Nate McCall
Austin, TX
@zznate

Co-Founder & Sr. Technical Consultant
Apache Cassandra Consulting
http://www.thelastpickle.com

Jason from Neutronis

Feb 19, 2016, 4:39:13 PM
to java-dri...@lists.datastax.com
I changed my code to the following:

//Build ResultSet from cassandra query for data manipulation.
Statement stmt = new SimpleStatement("SELECT \"Power\",\"Bandwidth\",\"Start_Frequency\" FROM \"SB1000_49552019\".\"Measured_Value\";");
//Statement stmt = new SimpleStatement("SELECT power, bandwidth, start_frequency FROM model.reports;");
stmt.setFetchSize(250000);
ResultSet results = session.execute(stmt);

// Get the variables from each row of Cassandra data
Multimap<Double, Float> data = LinkedListMultimap.create();
for (Row row : results) {
    // Pre-fetch the next page before the current one runs dry
    if (results.getAvailableWithoutFetching() == 1000 && !results.isFullyFetched()) {
        results.fetchMoreResults();
    } // end fetching if

    // Column names in Cassandra (case sensitive)
    start_frequency = row.getDouble("Start_Frequency");
    power = row.getFloat("Power");
    bandwidth = row.getDouble("Bandwidth");

    // Create channel power buckets and collect them for the DataFrame
    for (channel = 1.6000E8; channel <= channel_end; channel += increment) {
        if ((channel >= start_frequency) && (channel <= (start_frequency + bandwidth))) {
            data.put(channel, power);
        } // end if
    } // end channel for
} // end "row" for
        
        //System.out.println(data); 
   
// Create Spark List for DataFrame        
        List<Value> values = data.asMap().entrySet()
            .stream()
            .flatMap(x -> x.getValue()
                    .stream()
                    .map(y -> new Value(x.getKey(), y)))
            .collect(Collectors.toList());
    
// Create DataFrame and Calculate Results
    sqlContext.createDataFrame(sc.parallelize(values), Value.class).groupBy(col("channel"))
        .agg(min("power"), max("power"), avg("power"))
        .write().mode(SaveMode.Append)      
        .option("table", "results")
        .option("keyspace", "model")
        .format("org.apache.spark.sql.cassandra").save();
  
    } // end session
} // End Compute 


Nate McCall

Feb 19, 2016, 5:23:14 PM
to java-dri...@lists.datastax.com
 
>
>         stmt.setFetchSize(250000);
>         ResultSet results = session.execute(stmt);


This should be more like 1000, if that (and the getAvailableWithoutFetching threshold at ~10% of it, so 100).
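i.e. something shaped like this (the numbers are just a starting point):

    stmt.setFetchSize(1000);
    ResultSet results = session.execute(stmt);
    for (Row row : results) {
        // Kick off the next page in the background when ~10% of the
        // current page remains, so iteration never stalls on I/O.
        if (results.getAvailableWithoutFetching() == 100 && !results.isFullyFetched()) {
            results.fetchMoreResults();
        }
        // ... process the row ...
    }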
 
>
>    
> // Create Spark List for DataFrame        
>         List<Value> values = data.asMap().entrySet()
>             .stream()
>             .flatMap(x -> x.getValue()
>                     .stream()
>                     .map(y -> new Value(x.getKey(), y)))
>             .collect(Collectors.toList());


Wait, this is for Spark? Why are you building a direct CQL connection to Cassandra?

Just get the RDD for the table (I haven't looked at the connector's DataFrame API yet, so YMMV):
val rdd = sc.cassandraTable("SB1000_49552019", "Measured_Value").select("Power","Bandwidth","Start_Frequency");
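
If you want to stay in Java, the equivalent via the connector's japi is roughly this (signatures move around between connector versions, so treat it as a sketch):

    import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
    import static com.datastax.spark.connector.japi.CassandraJavaUtil.column;
    import com.datastax.spark.connector.japi.CassandraRow;
    import org.apache.spark.api.java.JavaRDD;

    JavaRDD<CassandraRow> rdd = javaFunctions(sc)
            .cassandraTable("SB1000_49552019", "Measured_Value")
            .select(column("Power"), column("Bandwidth"), column("Start_Frequency"));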

Jason from Neutronis

Feb 19, 2016, 6:47:00 PM
to java-dri...@lists.datastax.com
I don't know how to manipulate the RDD directly to do the same manipulation I'm doing in the for loop, to build the custom set I run the Spark statistics on.
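
My guess is it would have to be a flatMap shaped something like this (untested; Value, channel_end and increment come from my existing code, rdd from Nate's suggestion):

    import java.util.ArrayList;
    import java.util.List;

    // Untested sketch: push the bucket loop into the RDD so Spark does the
    // work in parallel, instead of pulling every row through one session.
    JavaRDD<Value> values = rdd.flatMap(row -> {
        double startFrequency = row.getDouble("Start_Frequency");
        float power = row.getFloat("Power");
        double bandwidth = row.getDouble("Bandwidth");
        List<Value> buckets = new ArrayList<>();
        for (double channel = 1.6000E8; channel <= channel_end; channel += increment) {
            if (channel >= startFrequency && channel <= startFrequency + bandwidth) {
                buckets.add(new Value(channel, power));
            }
        }
        return buckets;   // Spark 1.x flatMap expects an Iterable
    });

For reference, here is the loop as it stands: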

Statement stmt = new SimpleStatement("SELECT \"Power\",\"Bandwidth\",\"Start_Frequency\" FROM \"SB1000_47130646\".\"Measured_Value\";");
//Statement stmt = new SimpleStatement("SELECT power, bandwidth, start_frequency FROM model.reports;");
stmt.setFetchSize(10000);
ResultSet results = session.execute(stmt);

Multimap<Double, Float> data = LinkedListMultimap.create();
for (Row row : results) {
    // Pre-fetch the next page before the current one runs dry
    if (results.getAvailableWithoutFetching() == 1000 && !results.isFullyFetched()) {
        results.fetchMoreResults();
    } // end fetching if

    // Column names in Cassandra (case sensitive)
    start_frequency = row.getDouble("Start_Frequency");
    power = row.getFloat("Power");
    bandwidth = row.getDouble("Bandwidth");

    // Create channel power buckets
    for (channel = 1.6000E8; channel <= channel_end; channel += increment) {
        if ((channel >= start_frequency) && (channel <= (start_frequency + bandwidth))) {
            data.put(channel, power);
        } // end if
    } // end channel for
} // end "row" for