// Adds each aggregate's request count to its Redis counter and collects the
// refreshed aggregates. If anything inside the try block throws, the error is
// logged and updatedAggregates is left as null.
List<Aggregate> updatedAggregates = null;
try (StatefulRedisConnection<String, String> redis = this.redisClient.connect()) {
    RedisCommands<String, String> commands = redis.sync();

    // Log the time measured so far, then start the stopwatch again for the next phase.
    String connectElapsed = stopwatch.stop().toString();
    log.info("Connected to Redis in {}", connectElapsed);
    stopwatch.start();

    updatedAggregates = aggregates.stream()
            .map(source -> {
                String redisKey = source.getId();
                // The counter is keyed by the aggregate id; the value returned by
                // incrby is carried into the rebuilt aggregate.
                Long total = commands.incrby(redisKey, source.getRequestCount());
                // The TTL is re-applied on every update.
                // NOTE(review): issued as a separate command from incrby, so the
                // pair is not atomic -- confirm that is acceptable.
                commands.expire(redisKey, this.aggregateTtl);
                return new DefaultAggregate(
                        redisKey,
                        source.getBlockedValue(),
                        source.getTimeBucket(),
                        total,
                        source.getExpirationTime());
            })
            .collect(toList());
} catch (Exception ex) {
    log.error("Error updating aggregates", ex);
}