paging with Cassandra Async

572 views
Skip to first unread message

Nizar Alseddeg

unread,
Dec 14, 2016, 12:07:02 AM12/14/16
to DataStax Java Driver for Apache Cassandra User Mailing List
Hey Guys I need help, how to use Cassandra Asyc with paging.. not able to figure out how to fetch the result set... see my code below


public Iterator<?> fetchingMyData()
throws
MyException {

BoundStatement boundStatement = new BoundStatement(statementCache.getStatement(StatementCache.MY_QUERY));

boundStatement.setFetchSize(500);
    final MyCustomAsync customAsync = new MyCustomAsync();
ListenableFuture<ResultSet> future = Futures.transform(
executeAsyn(boundStatement
),
cassandraAsync.iterate(
1));

final List<Row> rows = new ArrayList<>();
// Do I really need this to fetch my result???
   // How Do I fetch the result set???
    try {
ResultSet rs= future.get();
for(Row r:rs){
rows.add(r);
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch
(ExecutionException e) {
e.printStackTrace();
}


if (rows == null || rows.isEmpty()) {
throw new MyException(errorMsg);
}

return new Iterator<?>() {

private Iterator<Row> iterator = rows.iterator();

@Override
public boolean hasNext() {
return iterator.hasNext();
}

@Override
public ? next() {
// my logic
       return ?;
}

@Override
public void remove() {
// Not implemented
}
};
}

// Custom Async Class

public class MyCustomAsync {

private static final Logger LOGGER = LoggerFactory.getLogger(MyCustomAsync.class);

private List<Row> result = new ArrayList<>();


public List<Row> getResult() {
return result;
}

protected AsyncFunction<ResultSet, ResultSet> iterate(final int page) {

return new AsyncFunction<ResultSet, ResultSet>() {
@Override
public ListenableFuture<ResultSet> apply(ResultSet rs) throws Exception {

// How far we can go without triggering the blocking fetch:
int remainingInPage = rs.getAvailableWithoutFetching();

LOGGER.info("Starting page %d (%d rows)%n", page, remainingInPage);

for (Row row : rs) {
LOGGER.info("[page %d - %d] row = %s%n", page, remainingInPage, row);
result.add(row);
if (--remainingInPage == 0)
break;
}
System.out.printf("Done page %d%n", page);

boolean wasLastPage = rs.getExecutionInfo().getPagingState() == null;
if (wasLastPage) {
LOGGER.info("Done iterating");
return Futures.immediateFuture(rs);
} else {
ListenableFuture<ResultSet> future = rs.fetchMoreResults();
return Futures.transform(future, iterate(page + 1));
}
}
};
}

}

DuyHai Doan

unread,
Dec 14, 2016, 8:28:31 AM12/14/16
to DataStax Java Driver for Apache Cassandra User Mailing List
Message has been deleted

Nizar Alseddeg

unread,
Dec 14, 2016, 12:37:09 PM12/14/16
to DataStax Java Driver for Apache Cassandra User Mailing List
Hi Doan, Thanks I know about this.. However I want to use session.executeAsync .. as it provides a better performance. I did follow this post 
and 
You can't use CallBack with paging as it will trigger sync call.. so a better way is to use getAvailableWithoutFetching to limit the iteration to the current page, and fetchMoreResults .

My question is how to fetch my result set?

DuyHai Doan

unread,
Dec 14, 2016, 2:43:39 PM12/14/16
to java-dri...@lists.datastax.com
"You can't use CallBack with paging as it will trigger sync call"

--> Right, that's quite sad indeed. If you want to use paging, the first call can be async but subsequent call will always be synchronous ... I hope there will be enhancement for this in the future, fingers crossed

--
You received this message because you are subscribed to a topic in the Google Groups "DataStax Java Driver for Apache Cassandra User Mailing List" group.
To unsubscribe from this topic, visit https://groups.google.com/a/lists.datastax.com/d/topic/java-driver-user/iqfAOoXpuuo/unsubscribe.
To unsubscribe from this group and all its topics, send an email to java-driver-user+unsubscribe@lists.datastax.com.

Andrew Tolbert

unread,
Dec 14, 2016, 3:15:24 PM12/14/16
to DataStax Java Driver for Apache Cassandra User Mailing List
--> Right, that's quite sad indeed. If you want to use paging, the first call can be async but subsequent call will always be synchronous ... I hope there will be enhancement for this in the future, fingers crossed

That's technically only true if you hit a page bounary while iterating over the ResultSet (causing the next page to be fetched in a blocking manner) in the callback.  The driver will actually throw an exception if you try to do that and the callback is executed on a netty executor thread (JAVA-1261).

As Nizar's code shows, fetchMoreResults returns a future and does not block, so you could use that as a means of fetching the next page asynchronously.  The async paging section of the java-driver-async-queries page that was previously linked shows a pattern for doing this.

I agree that it's not exactly easy and very error prone (JAVA-1302) as the user has to take care that while they are iterating over the result set that they don't hit a page boundary causing another fetch.  The driver definitely needs an API for paging in an asynchronous manner.  Iterable is not a good API for doing async.

Thanks,
Andy
To unsubscribe from this group and all its topics, send an email to java-driver-us...@lists.datastax.com.

Atul Saroha

unread,
Dec 28, 2016, 2:16:31 PM12/28/16
to java-dri...@lists.datastax.com
Hi Nizar,

If you are looking a performing utility in java for quick table scan. This is the link of utility. It works on same principle as spark does. You can throttle it accordingly. 



--
You received this message because you are subscribed to the Google Groups "DataStax Java Driver for Apache Cassandra User Mailing List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to java-driver-user+unsubscribe@lists.datastax.com.



--
Atul Saroha
Ph.No. 8884071271
Reply all
Reply to author
Forward
0 new messages