Hello Crossbar team,
We decided to make our node in java, everything works fine, till client loses connection to server.
I'we seen the reconnect example in android example but I couldn't recreate this for netty, so I wrote my own reconnecting mechanism but sometimes produces two sessions after connecting.
Here is the snippet of the code:
main.java:
    package ledinek.control;
import java.io.IOException;
import ledinek.control.helpers.PropUtils;
import ledinek.control.ws.WebStatusModel;
import ledinek.control.ws.WsConnection;
import org.apache.log4j.Logger;
public class Main {
static Logger logger = Logger.getLogger(Main.class);
public static void main(String[] args) {
logger.info("Entering application.");
try {
PropUtils.load(System.getenv("PROPERTIES"));
} catch (IOException e) {
e.printStackTrace();
}
//BasicConfigurator.configure();
//logger.setLevel(Level.OFF);
WebStatusModel webstatus = new WebStatusModel();
Object wssync = new Object();
String ws_url = PropUtils.properties.getProperty("websocket-url");
String ws_realm = PropUtils.properties.getProperty("websocket-realm");
WsConnection backendComm = new WsConnection(ws_url, ws_realm, webstatus, wssync);
final Thread wsc = new Thread(backendComm);
wsc.start();
}
}
WsConnection.java
  package ledinek.control.ws;
  Â
  import com.google.gson.Gson;
  import com.google.gson.JsonObject;
  import io.crossbar.autobahn.wamp.Client;
  import io.crossbar.autobahn.wamp.Session;
  import io.crossbar.autobahn.wamp.types.*;
  Â
  import java.util.concurrent.TimeUnit;
  import java.net.URI;
  import java.util.*;
  import java.util.concurrent.CompletableFuture;
  import java.util.logging.Logger;
  Â
  public class WsConnection implements Runnable {
  Â
    private static final Logger LOGGER = Logger.getLogger(WsConnection.class.getName());
    private String url;
    private String realm;
    private CompletableFuture<ExitInfo> connection;
    private Session session = null;
    private final PublishOptions publishOptions = new PublishOptions(true, false);
    private boolean connected = false;
    private int retry_attempts = 0;
    private int retry_sleep = 0;
    private int sleep = 0;
  Â
    private WebStatusModel webstatus;
    private Gson gson = new Gson();
    private URI uri;
    private Object wssync;
    private String prefix = "com.ledinek.control.";
    private Client client;
  Â
    public WsConnection(String url, String realm, WebStatusModel webstatus, Object wssync) {
      this.url = url;
      this.realm = realm;
      this.webstatus = webstatus;
      this.wssync = wssync;
  Â
    }
  Â
    @Override
    public void run() {
      while(true) {
        try {
          // reconnecting
          if (!connected){
            if (retry_attempts == 0 ){
              System.out.println("Connecting...");
            } else {
              if (retry_sleep > 6){retry_sleep = 1;}
              sleep = retry_sleep * retry_sleep;
              System.out.println("Connection attempt " + retry_attempts + " sleeping for " + sleep + " seconds");
            }
            retry_attempts++;
            retry_sleep++;
            System.out.println("Make new session");
            this.session = null;
            this.session = new Session();
            this.client = null;
            this.client = new Client(session, url, realm);
            this.session.addOnConnectListener(this::onConnect);
            this.session.addOnJoinListener(this::onJoin);
            this.session.addOnLeaveListener(this::onLeave);
            this.session.addOnDisconnectListener(this::onDisconnect);
            this.connection = null;
            this.connection = this.client.connect();
          }
          if (!connected){
            Thread.sleep(1000 * retry_sleep);
          } else {
            Thread.sleep(1000);
          }
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    }
  Â
    /*
     * Websocket client
     */
  Â
    private void onJoin(Session session, SessionDetails details) {
      this.session = session;
      this.retry_attempts = 0;
      this.retry_sleep=1;
      // Send to event_log
      LOGGER.info("Published restart to event_log");
      this.publish("event_log", "error","Control system restarted.");
  Â
      /*
       * RPC registrations
       */
      CompletableFuture<Registration> order_to_produce = session.register(prefix + "order_to_produce", this::order_to_produce);
      order_to_produce.thenAccept(reg -> LOGGER.info("Registered procedure: " + prefix + "order_to_produce"));
  Â
      /*
       * ADD SERVICE ON BACKEND COMPONENT
       */
  Â
      try {
        TimeUnit.SECONDS.sleep(1);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      CompletableFuture<CallResult> add_service = session.call("com.ledinek.backend.add_service",
          "control", session.getID());
      add_service.thenAccept(callResult ->{
  //      System.out.println(callResult.results.get(0).getClass());
        if (callResult.results.get(0).equals(true)){
          LOGGER.info("Service successfully added to list of services");
        } else {
          LOGGER.warning("Service is already added to list of online services. EXIT!");
        }
      }).exceptionally(ex -> {
        System.out.println(ex.getMessage());
        return null;
      });
  Â
  Â
    }
  Â
    private void onConnect(Session session) {
      connected = true;
      retry_attempts = 1;
      LOGGER.info("Session connected, ID=" + session.getID());
    }
  Â
    private void onLeave(Session session, CloseDetails detail) {
      connected = false;
      LOGGER.info(String.format("Left reason=%s, message=%s", detail.reason, detail.message));
    }
  Â
    private void onDisconnect(Session session, boolean wasClean) {
      connected = false;
      System.out.println(String.format("Session with ID=%s, disconnected.", session.getID()));
    }
  Â
    public void publish(String topic, String type, String message){
      CompletableFuture<Publication> p = session.publish(prefix + topic,
          type, message,System.currentTimeMillis() / 1000L, UUID.randomUUID().toString(), publishOptions);
      p.whenComplete((publication, throwable) -> {
        if (throwable != null) {
  Â
  //          LOGGER.info(String.format("published to %s with message %s ", prefix + topic, message));
          LOGGER.warning(String.format(" ERROR - published to %s with message %s - FAILED !", prefix + topic, message));
        }
      });
    }
  Â
  Â
    /*
     * RPC FUNCTIONS
     * ( you need to register them in function onJoin )
     */
  Â
    /**
     * @apiNote Call URI - com.ledinek.control.order_to_produce
     *
     * @param args - [ float maxMoistureLimit, float minMoistureLimit ]
     * @param details - call details
     * @return - Success / Fail
     *
     * curl -H "Content-Type: application/json" -d '{"procedure": "com.ledinek.control.order_to_produce": [0.12, 0.15]}' localhost:8080/caller
     */
    //TODO [ rpc on db_component that calls this function ]
    private String order_to_produce(List<LinkedHashMap> args, InvocationDetails details) {
    /*
    frontend call
    const response = yield saveItem('/api/v1/orders/all/check_to_production/', 'POST', a.items.map((item) => item.id));
     */
      try {
        Order order = null;
        synchronized(this.wssync) {
          order = this.gson.fromJson(gson.toJson(args.get(0)), Order.class);
          System.out.println(order.id);
        }
        if (order != null) {
          System.out.println("Produce order");
        }
        return "order_to_produce: True";
      } catch (Exception e) {
        e.printStackTrace();
      }
      return "order_to_produce: False";
    }
  Â
  }
and terminal output:
  Connecting...
  Make new session
  Init line started
  Trying to connect to PLC: 10.10.12.10 Init line finished
  Connection attempt 1 sleeping for 1 seconds
  Make new session
  Apr 13, 2018 11:43:49 AM ledinek.control.ws.WsConnection onConnect
  INFO: Session connected, ID=0
  Apr 13, 2018 11:43:49 AM ledinek.control.ws.WsConnection onConnect
  INFO: Session connected, ID=0
  ...
  Apr 13, 2018 11:44:54 AM io.crossbar.autobahn.wamp.transports.NettyWebSocketClientHandler
  INFO: WebSocket Client disconnected!
  Apr 13, 2018 11:44:54 AM io.crossbar.autobahn.wamp.transports.NettyWebSocketClientHandler
  INFO: WebSocket Client disconnected!
  Session with ID=7959461717222276, disconnected.
  Session with ID=5698032712161101, disconnected.
Any idea how this could be fixed ?
Regards,
Marko