A trivial query makes the connection stop responding

Ondřej Nešpor

Feb 13, 2015, 4:51:16 AM
to java-dri...@lists.datastax.com
Hi everybody,

I spent the whole day yesterday trying to debug this and, honestly, I have no idea what is going on. I am running C* 2.0.12 as a single-node cluster for development. My application uses the 2.0.9.2 client and runs on the same Linux machine.

The problem is the following: the application starts, initializes prepared statements (it uses only prepared statements, no ad-hoc queries), runs for a while, and then something happens and it fails with "All host(s) tried for query failed (no host was tried)". I was able to work out that this is because the C* client thinks there was a read timeout for one of the queries (prepared statement executions) and shuts down the connection. And this is where it gets seriously weird.

I have set the logging level to TRACE for both the client and the C* server. I have also taken the 2.0.9.2 client and altered its logging (I added the stream ID to logged messages, because without that I wouldn't get very far) and enabled the LoggingHandler in the pipeline. Other than that, it is what is under the 2.0.9.2 tag in git. I have also set the application to use a single connection (setCoreConnectionsPerHost 1, setMaxConnectionsPerHost 1).

There is a table with the following structure:

CREATE TABLE t (
    a bigint,
    b ascii static,
    c text static,
    d ascii static,
    e ascii static,
    f decimal static,
    g decimal static,
    h decimal static,
    i ascii static,
    j boolean static,
    k timestamp static,
   
    l bigint,
    m decimal,
    n decimal,
    o bigint,
    p boolean,
    PRIMARY KEY (a, l)
) WITH CLUSTERING ORDER BY (l ASC);

and a prepared statement

SELECT * FROM t WHERE a = :id;

There are currently 4 rows in the table and all of them have the clustering part too (all of them have the "l" column not null).

And this is what happens:

The application starts and prepares statements (the above one and about 190 others). The testing server really isn't powerful, but during startup there are up to 50 in-flight requests to prepare statements and everything is snappy.
After a while, the application needs to load two rows from the t table above. It executes the prepared statement twice with different IDs. And this is what happens (reading from the application and C* logs):
  1. There is a request (stream ID #1) that has just received the response so number #1 is occupied.
  2. APP: At 08:19:43.214 the application sends an EXECUTE request to C* (stream ID #0) - this is the request to load the first row.
  3. APP: At 08:19:43.215 the application sends an EXECUTE request to C* (stream ID #2) - this is the request to load the second row.
  4. CAS: At 08:19:43,221 C* has received #0.
  5. APP: At 08:19:43.238 the application sends an EXECUTE request (of another statement) (#1).
  6. CAS: At 08:19:43,258 C* has received #2.
  7. CAS: At 08:19:43,258 C* has received #1.
  8. APP: At 08:19:43.308 the application sends EXECUTE #3.
  9. CAS: At 08:19:43,340 C* has received #3.
  10. APP: At 08:19:43.346 the application sends EXECUTE #4.
  11. CAS: At 08:19:43,354 C* has received #4.
  12. CAS: At 08:19:43,385 C* sends a response to #0 with the first row (looking at the data, it is really a correct response to that request).
  13. CAS: At 08:19:43,389 C* responds to #1 (no rows).
  14. APP: At 08:19:43.390 the application has received the #0 response (again, the response is correct).
  15. APP: At 08:19:43.433 the application sends EXECUTE #0 (reuses the stream ID for another request).
  16. CAS: At 08:19:43,504 C* responds to #3 (one row).
  17. CAS: At 08:19:43,516 C* sends a response to #2 with the second row.
  18. CAS: At 08:19:43,541 C* sends a response to #4 (one row).
  19. APP: The application tries to send further EXECUTE requests until #2 times out and the client shuts down the connection.

The weird part is that something went wrong after the application received the #0 response: although C* was sending responses, the application never received them. And vice versa: the application was sending requests but C* never received them (there were 24 in-flight requests at the moment #2 timed out). Basically, both sides think the connection is working and are writing to it, but the other side doesn't receive anything. And there are no communication-related exceptions in the logs (or any other exceptions, for that matter).
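For readers less familiar with the stream IDs in the timeline above: each in-flight request on a connection carries an ID that the server echoes back in its response, and the ID can be reused once that response has arrived (as with #0 in step 15). The sketch below illustrates this bookkeeping with a hypothetical StreamIdPool class. It is not the driver's actual implementation; the capacity of 128 and the lowest-free-ID policy are assumptions made for illustration.

```java
import java.util.BitSet;

/** Hypothetical per-connection pool of stream IDs (illustration only, not driver code). */
final class StreamIdPool {
    private final BitSet inUse = new BitSet();
    private final int capacity;

    StreamIdPool(int capacity) {
        this.capacity = capacity;
    }

    /** Returns the lowest free ID and marks it in use, or -1 if every ID is in flight. */
    synchronized int acquire() {
        int id = inUse.nextClearBit(0);
        if (id >= capacity) {
            return -1;
        }
        inUse.set(id);
        return id;
    }

    /** Frees an ID once its response has arrived, so a later request can reuse it. */
    synchronized void release(int id) {
        inUse.clear(id);
    }

    /** Acquires two IDs, releases the first, then acquires again to show the reuse. */
    static String demo() {
        StreamIdPool pool = new StreamIdPool(128); // assumed capacity
        int first = pool.acquire();
        int second = pool.acquire();
        pool.release(first);         // the response to the first request arrived
        int reused = pool.acquire(); // the freed ID is handed out again
        return first + " " + second + " " + reused;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

If an ID is never released because its response is never processed, that request eventually times out, which is what the client reported for #2.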

It could be a one-time network glitch or something, but the problem is that this error is 100% reproducible. Every time I start the application, the communication stops when the response to this particular request is received. And it behaves the same when I start the application in the NetBeans debugger under Windows (and connect to C* on the Linux box).

Has anybody seen anything like that?

I'll try to make a small application that will hopefully be able to reproduce it.


Thanks.

Andrew


Bottom line: I would consider it VERY USEFUL if both the client and C* logged stream IDs of requests and responses.

Olivier Michallat

Feb 13, 2015, 8:54:36 AM
to java-dri...@lists.datastax.com
Hi,

Did you customize any of the options when you initialize the Cluster object?

--

Olivier Michallat

Driver & tools engineer, DataStax



Ondřej Nešpor

Feb 14, 2015, 11:09:41 AM
to java-dri...@lists.datastax.com
Hi,

I've found the problem. It's not the query nor the table nor C*. It seems that the problem is somewhere inside the driver.

I have discovered the following: you call session.executeAsync() and get a ResultSetFuture. You want to process it somehow, so you call Futures.transform(future, function). If the transforming function contains another CQL call, the connection may stop responding. According to my observations, if you have more than 2 * (number of futures) connections (one for the async call, another for the transforming function), you are fine. Otherwise the connection may time out. Or maybe, if the server responds fast enough, you may be OK too.

I am attaching a simple class that demonstrates this. In my observation, if you set connections per host to 2 * ROW_COUNT, it runs fine. Otherwise it may hit the read timeout. I've also tried a workaround that does not transform the future but uses a future callback instead, and it works. However, the Futures.transform approach is much cleaner in some cases.

Is this something known or should I create a JIRA issue?


Andrew


On Friday, February 13, 2015 at 14:54:36 UTC+1, Olivier Michallat wrote:
Test.java

Andrew Tolbert

Feb 14, 2015, 8:02:51 PM
to java-dri...@lists.datastax.com
Hi,

Thanks for sharing the example code.  A few aspects of your code help bring about a situation like this:
  1. The call to Futures.transform does not provide an Executor.  When no Executor is provided, the thread that completes the future also invokes the function you pass to Futures.transform, and the future completion logic does not finish until your function does.  This matters because the driver does not release the connection processing the request until the future.set() call completes.  That prevents further requests from being processed (if the pool is maxed out), and in your case this can include the request your function issues (session.execute), which causes the thread to block indefinitely.  It is also problematic because this happens inside a Netty I/O worker thread, so another failure case is that if more responses complete at once than there are worker threads, you could completely block all I/O in the driver.  As you mentioned, increasing the number of connections per host seemed to alleviate the problem; that is because it allowed more simultaneous requests to be processed and you did not hit the scenario where session.execute hangs because no connections are available.
  2. The code doesn't govern how many requests are executed at a time.  executeAsync is called ROW_COUNT times, which submits many requests at once.  You should limit how many requests can be in flight at once if you are worried about overwhelming your Cassandra cluster.  When I increase ROW_COUNT to a larger number (e.g. 20000), it is very apparent that my Cassandra cluster is under stress.
  3. You are mixing synchronous and asynchronous calls.  The completion of an executeAsync query causes a session.execute query to be executed, blocking whatever thread calls it until the request completes.  While this isn't a big problem in itself, it does create some messy situations and in a way reduces the benefit of the async part of the code, since you are ultimately still blocking somewhere.  I'd recommend chaining executeAsync calls using Futures.transform with AsyncFunction.  This gives you back a future that completes when the second query finishes and does no heavy blocking I/O in between calls.

Providing a separate executor to run the function you pass to transform effectively alleviates this problem.  I'd also recommend limiting the number of simultaneous executeAsync executions (using something like a Semaphore), and changing the execute calls in your callbacks/transform functions to executeAsync.
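The Semaphore suggestion above can be sketched as follows. To keep it runnable without a cluster, this is a JDK-only analogue: CompletableFuture stands in for the driver's ResultSetFuture, and a small thread pool (the hypothetical fakeCluster) stands in for Cassandra. The class name, the pool size and the simulated latency are assumptions made for illustration, not driver code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

final class ThrottledAsync {

    /**
     * Issues `total` simulated async queries with at most `maxInFlight` outstanding,
     * and returns the highest number of queries observed in flight at once.
     */
    static int run(int total, int maxInFlight) {
        ExecutorService fakeCluster = Executors.newFixedThreadPool(8); // stands in for Cassandra
        Semaphore permits = new Semaphore(maxInFlight);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        List<CompletableFuture<Void>> done = new ArrayList<>();
        try {
            for (int i = 0; i < total; i++) {
                // Blocks the submitting thread (never an I/O thread) until a slot is free.
                permits.acquireUninterruptibly();
                peak.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                CompletableFuture<Void> query =
                        CompletableFuture.runAsync(ThrottledAsync::simulateQuery, fakeCluster);
                // Give the slot back when the query completes, whether it succeeded or failed.
                done.add(query.whenComplete((result, error) -> {
                    inFlight.decrementAndGet();
                    permits.release();
                }));
            }
            CompletableFuture.allOf(done.toArray(new CompletableFuture[0])).join();
            return peak.get();
        } finally {
            fakeCluster.shutdown();
        }
    }

    /** Simulated query latency; the value is arbitrary. */
    private static void simulateQuery() {
        try {
            Thread.sleep(2);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("peak in-flight queries: " + run(200, 16));
    }
}
```

With the real driver, the same shape would apply: acquire a permit before each executeAsync call and release it in a listener or callback registered on the returned future.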

Here's an update of your implementation that adds a separate executor and uses AsyncFunction with executeAsync to chain the queries together.  The only potentially long waiting is done in the for loop at the end that gets the value from the resulting future.

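    // Relies on the preparedStatements, STATEMENT and ROW_COUNT members of the attached Test.java.
    private static void loadAsync2Timeout(final Session session) throws Exception {
        // Dedicated pool for the transform function, so it never runs on a Netty I/O thread.
        ExecutorService executor = Executors.newFixedThreadPool(4);

        final Map<Long, ListenableFuture<ResultSet>> futures = Maps.newHashMap();

        // Chains the second query asynchronously instead of blocking in session.execute().
        final AsyncFunction<ResultSet, ResultSet> transformer = new AsyncFunction<ResultSet, ResultSet>() {

            @Override
            public ListenableFuture<ResultSet> apply(ResultSet input) {
                final long a = input.one().getLong("a");

                return session.executeAsync(
                    preparedStatements.get(STATEMENT.GET_ROW2).bind(a + 1)
                );
            }

        };

        // Submit all first-stage queries; each resulting future completes when its second query does.
        for (long i = 0; i < ROW_COUNT; ++i) {
            futures.put(
                i,
                Futures.transform(
                    session.executeAsync(preparedStatements.get(STATEMENT.GET_ROW1).bind(i)),
                    transformer,
                    executor
                )
            );
        }

        // The only potentially long wait: blocking on each chained future from the calling thread.
        for (final Map.Entry<Long, ListenableFuture<ResultSet>> entry : futures.entrySet()) {
            System.out.format("t1 %d async loaded with t2 value %d\n", entry.getKey(), entry.getValue().get().one().getLong("b"));
        }

        // Let the pool's threads exit once all results have been read.
        executor.shutdown();
    }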

Olivier wrote a good article explaining the workflow of how async queries are processed in the driver which also shows some examples.

---

The following thread stack trace, read from the bottom up, shows a response from Cassandra being received (Connection$Dispatcher.messageReceived), the apply method of your function being called (Test$1.apply), and then the subsequent session.execute call, which blocks waiting for your query to complete (AbstractSession.execute):

"New I/O worker #12" nid=23 state=WAITING
    - waiting on <0x381ba0c6> (a com.google.common.util.concurrent.AbstractFuture$Sync)
    - locked <0x381ba0c6> (a com.google.common.util.concurrent.AbstractFuture$Sync)
    at sun.misc.Unsafe.park(Native Method)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:994)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1303)
    at com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:292)
    at com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
    at com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:135)
    at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:177)
    at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:52)
    at com.datastax.cassandra.examples.Test$1.apply(Test.java:136)
    at com.datastax.cassandra.examples.Test$1.apply(Test.java:130)
    at com.google.common.util.concurrent.Futures$1.apply(Futures.java:720)
    at com.google.common.util.concurrent.Futures$ChainingListenableFuture.run(Futures.java:859)
    at com.google.common.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293)
    at com.google.common.util.concurrent.ExecutionList$RunnableExecutorPair.execute(ExecutionList.java:150)
    at com.google.common.util.concurrent.ExecutionList.execute(ExecutionList.java:135)
    at com.google.common.util.concurrent.AbstractFuture.set(AbstractFuture.java:185)
    at com.datastax.driver.core.DefaultResultSetFuture.onSet(DefaultResultSetFuture.java:109)
    at com.datastax.driver.core.RequestHandler.setFinalResult(RequestHandler.java:283)
    at com.datastax.driver.core.RequestHandler.onSet(RequestHandler.java:313)
    at com.datastax.driver.core.Connection$Dispatcher.messageReceived(Connection.java:715)
    at com.datastax.shaded.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
    at com.datastax.shaded.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at com.datastax.shaded.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
    at com.datastax.shaded.netty.channel.Channels.fireMessageReceived(Channels.java:296)
    at com.datastax.shaded.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream(OneToOneDecoder.java:70)
    at com.datastax.shaded.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at com.datastax.shaded.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
    at com.datastax.shaded.netty.channel.Channels.fireMessageReceived(Channels.java:296)
    at com.datastax.shaded.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462)
    at com.datastax.shaded.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443)
    at com.datastax.shaded.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303)
    at com.datastax.shaded.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
    at com.datastax.shaded.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at com.datastax.shaded.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
    at com.datastax.shaded.netty.channel.Channels.fireMessageReceived(Channels.java:268)
    at com.datastax.shaded.netty.channel.Channels.fireMessageReceived(Channels.java:255)
    at com.datastax.shaded.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
    at com.datastax.shaded.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
    at com.datastax.shaded.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318)
    at com.datastax.shaded.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
    at com.datastax.shaded.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
    at com.datastax.shaded.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
    at com.datastax.shaded.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
      - java.util.concurrent.ThreadPoolExecutor$Worker@e5297a7

Andy

Olivier Michallat

Feb 15, 2015, 2:58:33 PM
to java-dri...@lists.datastax.com
> the driver does not release the connection processing the request until the future.set() call completes

I'll note that this is something that changed in 2.0.9.2. The code used to release the connection before, but I moved it to a finally block (there's one place in the middle of the method where we don't want to release the connection).

Even though you should still follow Andy's recommendations, I think the code should have handled this better, so I'm going to revisit my changes (see JAVA-666).

Andrew Tolbert

Feb 15, 2015, 3:03:45 PM
to java-dri...@lists.datastax.com

> I'll note that this is something that changed in 2.0.9.2. The code used to release the connection before, but I moved it to a finally block (there's one place in the middle of the method where we don't want to release the connection).

Good catch Olivier!  I did not make the connection that the behavior changed in 2.0.9.2.  It does seem like a good idea to release the connection earlier if it can be.

Sylvain Lebresne

Feb 16, 2015, 3:22:28 AM
to java-dri...@lists.datastax.com
On Sun, Feb 15, 2015 at 9:03 PM, Andrew Tolbert <andrew....@datastax.com> wrote:

>> I'll note that this is something that changed in 2.0.9.2. The code used to release the connection before, but I moved it to a finally block (there's one place in the middle of the method where we don't want to release the connection).

> Good catch Olivier!  I did not make the connection that the behavior changed in 2.0.9.2.  It does seem like a good idea to release the connection earlier if it can be.

For the record, while it's not necessarily a bad idea to go back to releasing the connection sooner, this won't make much difference here. The problem is that if no executor is provided, callbacks are called on a Netty I/O thread (as they kind of should be), and that means that if you execute a request (and block on it) you will deadlock, no matter how soon we release the connection. This, btw, also means that doing an expensive operation in a callback without having passed an executor is a bad idea too, as it will kill your performance (it will occupy a Netty I/O thread for a long time, which is a big "don't do" for Netty).
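The deadlock described above can be reproduced in miniature with JDK classes only. In this sketch a single-threaded executor plays the role of the Netty I/O thread and CompletableFuture stands in for the driver's futures; the class and method names are hypothetical, and a 200 ms timeout is used so the demo terminates instead of hanging.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class IoThreadDeadlockDemo {

    /** Simulated async query: its response is always delivered on the single I/O thread. */
    static CompletableFuture<String> executeAsync(ExecutorService ioThread, String query) {
        CompletableFuture<String> result = new CompletableFuture<>();
        ioThread.execute(() -> result.complete("rows for " + query));
        return result;
    }

    /** Runs a two-step query chain, either blocking in the callback or chaining asynchronously. */
    static String run(boolean blockInCallback) {
        ExecutorService ioThread = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<String> first = new CompletableFuture<>();
            CompletableFuture<String> chained;
            if (blockInCallback) {
                // The callback runs on the thread that completes `first`: the I/O thread.
                chained = first.thenApply(rows -> {
                    CompletableFuture<String> second = executeAsync(ioThread, "q2");
                    try {
                        // Blocks the only I/O thread, so the response to q2 can never be delivered.
                        return second.get(200, TimeUnit.MILLISECONDS);
                    } catch (TimeoutException e) {
                        return "TIMEOUT";
                    } catch (InterruptedException | ExecutionException e) {
                        throw new IllegalStateException(e);
                    }
                });
            } else {
                // Non-blocking chaining: the I/O thread is freed as soon as q2 has been submitted.
                chained = first.thenCompose(rows -> executeAsync(ioThread, "q2"));
            }
            // The "response" to the first query arrives on the I/O thread.
            ioThread.execute(() -> first.complete("rows for q1"));
            return chained.join();
        } finally {
            ioThread.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("blocking callback: " + run(true));
        System.out.println("async chaining:    " + run(false));
    }
}
```

The blocking variant only ends because of the artificial timeout; without it the I/O thread would wait on a response that only it can deliver.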

It would probably be worth documenting in executeAsync() that you should always provide an executor for your callbacks unless they are very lightweight and non-blocking (even though this is already documented in Guava's docs).

--
Sylvain
 
 


Ondřej Nešpor

Feb 16, 2015, 4:01:32 AM
to java-dri...@lists.datastax.com
Hi guys. I agree it would be good to explicitly mention this in the driver docs because I'm quite sure there are more people who assume "the query runs asynchronously anyway, so why bother with a separate executor for its transformation" although it's obviously wrong :)

Thanks for the help!


Andrew


On Monday, February 16, 2015 at 9:22:28 UTC+1, Sylvain Lebresne wrote:

Matt Jurik

Feb 17, 2015, 12:27:48 AM
to java-dri...@lists.datastax.com
I'll second this. I just finished an investigation into why some of our queries were taking 12s and concluded that it was a driver bug (in fact, I found this thread while searching to see whether the bug had already been reported, before filing it myself). As mentioned here, we were doing a Futures.transform that simply read the columns and converted them into domain-specific objects (no additional queries, no expensive operations, etc). However, executing the transform in a separate executor solved the issue.

It would be great if the driver could do a better job of protecting users from this trap. Having queries mysteriously take 12s and having driver connections get reset and then recreated because of this is very confusing and will waste a lot of people's time.
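To make the effect of a separate executor visible, here is a small JDK-only sketch that reports which thread runs a transform. CompletableFuture stands in for the Guava/driver futures discussed in this thread, and the thread names "io-worker" and "callback-worker" are made up for the illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class OffloadedTransform {

    /** Returns the name of the thread that ran the transform. */
    static String transformThreadName(boolean offload) {
        ExecutorService ioThread =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "io-worker"));
        ExecutorService callbacks =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "callback-worker"));
        try {
            CompletableFuture<String> response = new CompletableFuture<>();
            CompletableFuture<String> transformed = offload
                    // With an executor, the transform is handed off to the callback pool.
                    ? response.thenApplyAsync(row -> Thread.currentThread().getName(), callbacks)
                    // Without one, it runs on whichever thread completes the future.
                    : response.thenApply(row -> Thread.currentThread().getName());
            // The "response" is completed on the I/O thread after the transform was registered.
            ioThread.execute(() -> response.complete("row"));
            return transformed.join();
        } finally {
            ioThread.shutdown();
            callbacks.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("without executor: " + transformThreadName(false));
        System.out.println("with executor:    " + transformThreadName(true));
    }
}
```

The Guava equivalent is the three-argument Futures.transform(future, function, executor) overload used in Andy's example earlier in the thread.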

-Matt