Hey Guys I need help, how to use Cassandra Asyc with paging.. not able to figure out how to fetch the result set... see my code below
public Iterator<?> fetchingMyData()
throws MyException {
BoundStatement boundStatement = new BoundStatement(statementCache.getStatement(StatementCache.MY_QUERY));
boundStatement.setFetchSize(500);
final MyCustomAsync customAsync = new MyCustomAsync();
ListenableFuture<ResultSet> future = Futures.transform(
executeAsyn(boundStatement),
cassandraAsync.iterate(1));
final List<Row> rows = new ArrayList<>();
// Do I really need this to fetch my result???
// How Do I fetch the result set???
try {
ResultSet rs= future.get();
for(Row r:rs){
rows.add(r);
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
if (rows == null || rows.isEmpty()) {
throw new MyException(errorMsg);
}
return new Iterator<?>() {
private Iterator<Row> iterator = rows.iterator();
@Override
public boolean hasNext() {
return iterator.hasNext();
}
@Override
public ? next() {
// my logic
return ?;
}
@Override
public void remove() {
// Not implemented
}
};
}
// Custom Async Class
public class MyCustomAsync {
private static final Logger LOGGER = LoggerFactory.getLogger(MyCustomAsync.class);
private List<Row> result = new ArrayList<>();
public List<Row> getResult() {
return result;
}
protected AsyncFunction<ResultSet, ResultSet> iterate(final int page) {
return new AsyncFunction<ResultSet, ResultSet>() {
@Override
public ListenableFuture<ResultSet> apply(ResultSet rs) throws Exception {
// How far we can go without triggering the blocking fetch:
int remainingInPage = rs.getAvailableWithoutFetching();
LOGGER.info("Starting page %d (%d rows)%n", page, remainingInPage);
for (Row row : rs) {
LOGGER.info("[page %d - %d] row = %s%n", page, remainingInPage, row);
result.add(row);
if (--remainingInPage == 0)
break;
}
System.out.printf("Done page %d%n", page);
boolean wasLastPage = rs.getExecutionInfo().getPagingState() == null;
if (wasLastPage) {
LOGGER.info("Done iterating");
return Futures.immediateFuture(rs);
} else {
ListenableFuture<ResultSet> future = rs.fetchMoreResults();
return Futures.transform(future, iterate(page + 1));
}
}
};
}
}