Java Client closes Connection after a while

Alexander Duque

Feb 3, 2021, 5:25:27 PM
to ksqldb-users
Hi,

I have the following code:

import io.confluent.ksql.api.client.Client;
import io.confluent.ksql.api.client.ClientOptions;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutionException;

public class KsqlDBClient {
    public static String KSQLDB_SERVER_HOST = "localhost";
    public static int KSQLDB_SERVER_HOST_PORT = 8088;

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ClientOptions options = ClientOptions.create()
            .setHost(KSQLDB_SERVER_HOST)
            .setPort(KSQLDB_SERVER_HOST_PORT);
        Client client = Client.create(options);

        Map<String, Object> properties = Collections.singletonMap("auto.offset.reset", "earliest");
        String events = "SELECT codigo, filtros FROM stream_evento EMIT CHANGES;";

        client.streamQuery(events, properties)
            .thenAccept(streamedQueryResult -> {
                System.out.println("Query has started. Query ID: " + streamedQueryResult.queryID());

                RowSubscriber subscriber = new RowSubscriber();
                streamedQueryResult.subscribe(subscriber);
            }).exceptionally(e -> {
                System.out.println("Request failed: " + e);
                return null;
            });
    }
}

I need to keep the connection between the client and the server alive, but after the connection has been idle for about 10 minutes I get the following error:

Received an error: java.lang.Exception: io.vertx.core.VertxException: Connection was closed

What can I do to fix it?

I hope you can help me.

Best regards,
Alex

Victoria Xia

Feb 8, 2021, 11:56:30 AM
to ksqldb-users
Hi Alex,

The 10-minute timeout is actually enforced on the server side (the behavior is not specific to the Java client), and I don't know of any workarounds today. I've filed a GitHub issue to add support for this: https://github.com/confluentinc/ksql/issues/6970 Feel free to chime in on the GitHub issue with your use case for longer push queries, and we can continue the conversation there.

Best,
Victoria

Alexander Duque

Feb 8, 2021, 12:28:32 PM
to ksqldb-users
Hi Victoria,

I really appreciate your response. I hope that in the future we can keep the connection for a push query alive for as long as we need it. For now I've worked around it temporarily by adding a second thread that lists the streams every 5 minutes.

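import io.confluent.ksql.api.client.Client;

import java.util.concurrent.ExecutionException;

class KeepAlive implements Runnable {
    private final Client client;
    Thread t;

    KeepAlive(Client client) {
        this.client = client;
        t = new Thread(this, "Thread");
        t.start();
    }

    public void run() {
        try {
            while (true) {
                // listStreams() is used here only to generate traffic on the connection.
                client.listStreams().get();
                // Wait 5 minutes, which stays under the 10-minute idle timeout.
                Thread.sleep(300000);
            }
        } catch (InterruptedException | ExecutionException e) {
            System.out.println("Keep-alive thread stopped: " + e);
        }
    }
}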

I know it is not the best way to deal with it.
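
A possible variation on the same idea, as a rough sketch rather than a recommended fix: the periodic request can be driven by a ScheduledExecutorService instead of a hand-written loop. The class name KeepAliveScheduler and the "ping" parameter are hypothetical names made up for this example. The ping is any Runnable that issues a cheap request; with the client above it could be () -> client.listStreams().join(). Using a daemon thread and logging failures instead of stopping are design choices of this sketch, not something the ksqlDB client requires.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: runs a "ping" action at a fixed rate so the
// connection does not sit idle long enough to hit the server timeout.
final class KeepAliveScheduler implements AutoCloseable {
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "ksqldb-keep-alive");
            t.setDaemon(true); // assumption: the ping should not keep the JVM alive by itself
            return t;
        });

    KeepAliveScheduler(Runnable ping, long period, TimeUnit unit) {
        scheduler.scheduleAtFixedRate(() -> {
            try {
                ping.run();
            } catch (RuntimeException e) {
                // An uncaught exception would cancel all further runs,
                // so log it and let the next scheduled run try again.
                System.out.println("Keep-alive request failed: " + e);
            }
        }, 0, period, unit);
    }

    @Override
    public void close() {
        // Stops the periodic task; call this when the push query is no longer needed.
        scheduler.shutdownNow();
    }
}
```

With the client from the first message, usage might look like new KeepAliveScheduler(() -> client.listStreams().join(), 5, TimeUnit.MINUTES), followed by close() once the query is done. Like the thread-based version, this only works around the server-side idle timeout by generating traffic.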

Regards,
Alex

Victoria Xia

Feb 8, 2021, 7:06:07 PM
to ksqldb-users
Hi Alex,

Thanks for sharing! I've linked to your workaround from the GitHub issue in case others need to work around this limitation before a better solution is implemented.

Thanks,
Victoria
