How to reconnect properly in autobahn-java for non-Android (Netty) clients

80 views
Skip to first unread message

Marko Štumberger

unread,
Apr 13, 2018, 7:43:29 AM4/13/18
to Autobahn
Hello Crossbar team,

We decided to write our node in Java, and everything works fine until the client loses its connection to the server.
I've seen the reconnect example in the Android demo, but I couldn't recreate it for Netty, so I wrote my own reconnect mechanism. However, it sometimes produces two sessions after connecting.

Here is the snippet of the code:
main.java:

        package ledinek.control;

import java.io.IOException;

import ledinek.control.helpers.PropUtils;
import ledinek.control.ws.WebStatusModel;
import ledinek.control.ws.WsConnection;
import org.apache.log4j.Logger;

/**
 * Application entry point.
 *
 * <p>Loads the configuration through {@code PropUtils}, reads the WebSocket
 * settings from it and starts a {@link WsConnection} on its own thread.
 */
public class Main {
    static Logger logger = Logger.getLogger(Main.class);

    /**
     * Starts the application.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {

        logger.info("Entering application.");

        String propertiesLocation = System.getenv("PROPERTIES");
        try {
            PropUtils.load(propertiesLocation);
        } catch (IOException e) {
            // Report through the logger (with the stack trace) instead of
            // printStackTrace(), so the failure ends up where the rest of the
            // application's diagnostics go.
            logger.error("Failed to load properties (PROPERTIES=" + propertiesLocation + ")", e);
        }

        //BasicConfigurator.configure();
        //logger.setLevel(Level.OFF);

        WebStatusModel webstatus = new WebStatusModel();
        Object wssync = new Object();

        String ws_url = PropUtils.properties.getProperty("websocket-url");
        String ws_realm = PropUtils.properties.getProperty("websocket-realm");
        if (ws_url == null || ws_realm == null) {
            // Without both settings the connection loop could only fail over and
            // over, so stop here with a clear message instead.
            logger.fatal("Missing 'websocket-url' or 'websocket-realm' property; "
                    + "WebSocket connection not started.");
            return;
        }

        WsConnection backendComm = new WsConnection(ws_url, ws_realm, webstatus, wssync);
        // A named thread is easier to spot in thread dumps and log output.
        final Thread wsc = new Thread(backendComm, "ws-connection");
        wsc.start();
    }

}


WsConnection.java

    package ledinek.control.ws;
    
    import com.google.gson.Gson;
    import com.google.gson.JsonObject;
    import io.crossbar.autobahn.wamp.Client;
    import io.crossbar.autobahn.wamp.Session;
    import io.crossbar.autobahn.wamp.types.*;
    
    import java.util.concurrent.TimeUnit;
    import java.net.URI;
    import java.util.*;
    import java.util.concurrent.CompletableFuture;
    import java.util.logging.Logger;
    
    public class WsConnection implements Runnable {
    
        private static final Logger LOGGER = Logger.getLogger(WsConnection.class.getName());
        private String url;
        private String realm;
        private CompletableFuture<ExitInfo> connection;
        private Session session = null;
        private final PublishOptions publishOptions = new PublishOptions(true, false);
        private boolean connected = false;
        private int retry_attempts = 0;
        private int retry_sleep = 0;
        private int sleep = 0;
    
        private WebStatusModel webstatus;
        private Gson gson = new Gson();
        private URI uri;
        private Object wssync;
        private String prefix = "com.ledinek.control.";
        private Client client;
    
        public WsConnection(String url, String realm, WebStatusModel webstatus, Object wssync) {
            this.url = url;
            this.realm = realm;
            this.webstatus = webstatus;
            this.wssync = wssync;
    
        }
    
        @Override
        public void run() {
            while(true) {
                try {
                    // reconnecting
                    if (!connected){
                        if (retry_attempts == 0 ){
                            System.out.println("Connecting...");
                        } else {
                            if (retry_sleep > 6){retry_sleep = 1;}
                            sleep = retry_sleep * retry_sleep;
                            System.out.println("Connection attempt " + retry_attempts + " sleeping for " + sleep + " seconds");
                        }
                        retry_attempts++;
                        retry_sleep++;
                        System.out.println("Make new session");
                        this.session = null;
                        this.session = new Session();
                        this.client = null;
                        this.client = new Client(session, url, realm);
                        this.session.addOnConnectListener(this::onConnect);
                        this.session.addOnJoinListener(this::onJoin);
                        this.session.addOnLeaveListener(this::onLeave);
                        this.session.addOnDisconnectListener(this::onDisconnect);
                        this.connection = null;
                        this.connection = this.client.connect();
                    }
                    if (!connected){
                        Thread.sleep(1000 * retry_sleep);
                    } else {
                        Thread.sleep(1000);
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
        /*
         * Websocket client
         */
    
        private void onJoin(Session session, SessionDetails details) {
            this.session = session;
            this.retry_attempts = 0;
            this.retry_sleep=1;
            // Send to event_log
            LOGGER.info("Published restart to event_log");
            this.publish("event_log", "error","Control system restarted.");
    
            /*
             *  RPC registrations
             */
            CompletableFuture<Registration> order_to_produce = session.register(prefix + "order_to_produce", this::order_to_produce);
            order_to_produce.thenAccept(reg -> LOGGER.info("Registered procedure: " + prefix + "order_to_produce"));
    
            /*
             * ADD SERVICE ON BACKEND COMPONENT
             */
    
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            CompletableFuture<CallResult> add_service = session.call("com.ledinek.backend.add_service",
                    "control", session.getID());
            add_service.thenAccept(callResult ->{
    //            System.out.println(callResult.results.get(0).getClass());
                if (callResult.results.get(0).equals(true)){
                    LOGGER.info("Service successfully added to list of services");
                } else {
                    LOGGER.warning("Service is already added to list of online services. EXIT!");
                }
            }).exceptionally(ex -> {
                System.out.println(ex.getMessage());
                return null;
            });
    
    
        }
    
        private void onConnect(Session session) {
            connected = true;
            retry_attempts = 1;
            LOGGER.info("Session connected, ID=" + session.getID());
        }
    
        private void onLeave(Session session, CloseDetails detail) {
            connected = false;
            LOGGER.info(String.format("Left reason=%s, message=%s", detail.reason, detail.message));
        }
    
        private void onDisconnect(Session session, boolean wasClean) {
            connected = false;
            System.out.println(String.format("Session with ID=%s, disconnected.", session.getID()));
        }
    
        public void publish(String topic, String type, String message){
            CompletableFuture<Publication> p = session.publish(prefix + topic,
                    type, message,System.currentTimeMillis() / 1000L, UUID.randomUUID().toString(), publishOptions);
            p.whenComplete((publication, throwable) -> {
                if (throwable != null) {
    
    //                    LOGGER.info(String.format("published to %s with message %s ", prefix + topic, message));
                    LOGGER.warning(String.format(" ERROR - published to %s with message %s - FAILED !", prefix + topic, message));
                }
            });
        }
    
    
        /*
         *  RPC FUNCTIONS
         *  ( you need to register them in function onJoin )
         */
    
        /**
         * @apiNote Call URI - com.ledinek.control.order_to_produce
         *
         * @param args - [ float maxMoistureLimit, float minMoistureLimit ]
         * @param details - call details
         * @return - Success / Fail
         *
         * curl -H "Content-Type: application/json" -d '{"procedure": "com.ledinek.control.order_to_produce": [0.12, 0.15]}' localhost:8080/caller
         */
        //TODO [ rpc on db_component that calls this function ]
        private String order_to_produce(List<LinkedHashMap> args, InvocationDetails details) {
        /*
        frontend call
        const response = yield saveItem('/api/v1/orders/all/check_to_production/', 'POST', a.items.map((item) => item.id));
         */
            try {
                Order order = null;
                synchronized(this.wssync) {
                    order = this.gson.fromJson(gson.toJson(args.get(0)), Order.class);
                    System.out.println(order.id);
                }
                if (order != null) {
                    System.out.println("Produce order");
                }
                return "order_to_produce: True";
            } catch (Exception e) {
                e.printStackTrace();
            }
            return "order_to_produce: False";
        }
    
    }


and terminal output:

    Connecting...
    Make new session
    Init line started
    Trying to connect to PLC: 10.10.12.10 Init line finished
    Connection attempt 1 sleeping for 1 seconds
    Make new session
    Apr 13, 2018 11:43:49 AM ledinek.control.ws.WsConnection onConnect
    INFO: Session connected, ID=0
    Apr 13, 2018 11:43:49 AM ledinek.control.ws.WsConnection onConnect
    INFO: Session connected, ID=0
    ...
    Apr 13, 2018 11:44:54 AM io.crossbar.autobahn.wamp.transports.NettyWebSocketClientHandler
    INFO: WebSocket Client disconnected!
    Apr 13, 2018 11:44:54 AM io.crossbar.autobahn.wamp.transports.NettyWebSocketClientHandler
    INFO: WebSocket Client disconnected!
    Session with ID=7959461717222276, disconnected.
    Session with ID=5698032712161101, disconnected.


Any idea how this could be fixed ?

Regards,
Marko

Tobias Oberstein

unread,
Apr 13, 2018, 8:53:07 AM4/13/18
to autob...@googlegroups.com, Marko Štumberger
Hi,

in your while (true) loop, you need to actually wait for the exit future
returned by client.connect(). see here:

https://github.com/crossbario/autobahn-java/blob/master/demo-gallery/src/main/java/io/crossbar/autobahn/demogallery/netty/CIService.java#L56

Remember, Autobahn is all async land;) Hope this gets you on the right
track ..

Cheers,
/Tobias

Marko Štumberger

unread,
Apr 13, 2018, 1:41:00 PM4/13/18
to Autobahn
Hello Tobias,

Thank you for pointing me in the right direction!

I will keep that in mind :)

  
Dne petek, 13. april 2018 14.53.07 UTC+2 je oseba Tobias Oberstein napisala:
Reply all
Reply to author
Forward
0 new messages