How to use dropwizard metrics for asynchronous callback


ANIRBAN GHOSH

May 9, 2018, 9:06:32 AM
to metrics-user

Hi, I want to use Dropwizard Metrics for asynchronous callbacks in my application. I am inserting records into Cassandra asynchronously.


I have a list of queries (BoundStatements) that are executed asynchronously. I have added a callback to track onSuccess() and onFailure() for each asynchronous execution. (A semaphore is also there, just to throttle the number of asynchronous insert queries in flight.)

Now I want to implement metrics correctly so that I can measure or monitor the following:
1. Number of insert requests/queries sent per minute.
2. Number of requests executed successfully.
3. Number of responses received after successful insertion (success count, success rate per minute).
4. Number of responses received after failed insertion (failure count, failure rate per minute).

Below is the code snippet in which I need to include the metrics. Please help.


    
    // Throttles the number of asynchronous insert queries in flight.
    final Semaphore queryPermits = new Semaphore(100);
    public List<ListenableFuture<ResultSet>> sendAsyncQueries(List<BoundStatement> boundStatements) throws CqlException, AdapterException {
        List<ListenableFuture<ResultSet>> futures = new ArrayList<>();
        final ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
        for (BoundStatement boundStatement : boundStatements) {
            try {
                queryPermits.acquire();
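            } catch (InterruptedException e) {
                LOG.error("InterruptedException while acquiring permit for asynchronous insertion", e);
                Thread.currentThread().interrupt(); // restore the interrupt flag
                break; // no permit was acquired, so do not submit (and later release for) this query
            }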
            try {
                final ListenableFuture<ResultSet> resultSetFuture = executeAsynchronously(boundStatement, ConsistencyLevel.QUORUM);//<- having 
                Futures.addCallback(resultSetFuture,
                        new FutureCallback<ResultSet>() {
                            @Override
                            public void onSuccess(ResultSet result) {
                                LOG.debug("Inserted cassandra row ");
                                queryPermits.release();
                            }
                            @Override
                            public void onFailure(Throwable t) {
                                LOG.error("Error while inserting in Cassandra", t);
                                queryPermits.release();
                            }
                        },
                        executor
                );
                futures.add(resultSetFuture);
            } catch (AdapterException e) {
                queryPermits.release();
                throw e;
            }
        }
        return futures;
    }
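
To make the goal concrete, here is a minimal sketch of where each count would be recorded relative to the semaphore and the callback. It is not Dropwizard code: it uses only the JDK so that it runs on its own, with `LongAdder` standing in for a metric and `CompletableFuture` standing in for the driver's `ListenableFuture`. The names `InsertMetrics`, `InstrumentedSender`, `sendAll`, and `InsertMetricsDemo` are hypothetical. With Dropwizard Metrics, each `LongAdder` would presumably become a `Meter` obtained from `MetricRegistry.meter(name)`, and `increment()` would become `mark()`; a `Meter` also keeps a total count and moving-average rates, which should cover the per-minute figures.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;

/** Hypothetical holder for the counts. Assumption: with Dropwizard Metrics each
 *  field would be a Meter and increment() would be replaced by mark(). */
class InsertMetrics {
    final LongAdder sent = new LongAdder();      // queries handed to the driver
    final LongAdder succeeded = new LongAdder(); // callbacks that completed normally
    final LongAdder failed = new LongAdder();    // callbacks that completed with an error
}

/** Stand-in for sendAsyncQueries(): throttles with a semaphore, records metrics in the callback. */
class InstrumentedSender {
    private final Semaphore queryPermits;
    private final InsertMetrics metrics;

    InstrumentedSender(int maxInFlight, InsertMetrics metrics) {
        this.queryPermits = new Semaphore(maxInFlight);
        this.metrics = metrics;
    }

    <Q> List<CompletableFuture<Void>> sendAll(List<Q> queries,
                                              Function<Q, CompletableFuture<Void>> executeAsync) {
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (Q query : queries) {
            try {
                queryPermits.acquire();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag and stop submitting
                break;
            }
            metrics.sent.increment(); // counted once per query actually submitted
            CompletableFuture<Void> future;
            try {
                future = executeAsync.apply(query);
            } catch (RuntimeException e) {
                // Submission itself failed: no callback will run, so account for it here.
                metrics.failed.increment();
                queryPermits.release();
                throw e;
            }
            // Exactly one branch runs per query, so succeeded + failed == completed queries.
            futures.add(future.whenComplete((result, error) -> {
                if (error == null) {
                    metrics.succeeded.increment();
                } else {
                    metrics.failed.increment();
                }
                queryPermits.release();
            }));
        }
        return futures;
    }

    int availablePermits() {
        return queryPermits.availablePermits();
    }
}

class InsertMetricsDemo {
    public static void main(String[] args) {
        InsertMetrics metrics = new InsertMetrics();
        InstrumentedSender sender = new InstrumentedSender(2, metrics);
        // Simulated async execution: queries starting with "bad" fail, the rest succeed.
        sender.sendAll(Arrays.asList("ok-1", "bad-1", "ok-2"), q -> {
            CompletableFuture<Void> f = new CompletableFuture<>();
            if (q.startsWith("bad")) {
                f.completeExceptionally(new IllegalStateException("simulated failure"));
            } else {
                f.complete(null);
            }
            return f;
        });
        System.out.println("sent=" + metrics.sent.sum()
                + " succeeded=" + metrics.succeeded.sum()
                + " failed=" + metrics.failed.sum());
    }
}
```

The point of the sketch is the placement: "sent" is recorded right after the permit is acquired, while success and failure are recorded inside the callback, next to the permit release, so every submitted query ends up in exactly one of the two outcome counts.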


