Hello Crossbar team,
We decided to write our node in Java, and everything works fine until the client loses its connection to the server.
I've seen the reconnect example in the Android example, but I couldn't recreate it for Netty, so I wrote my own reconnecting mechanism; however, it sometimes produces two sessions after connecting.
Here is the snippet of the code:
Main.java:
package ledinek.control;
import java.io.IOException;
import ledinek.control.helpers.PropUtils;
import ledinek.control.ws.WebStatusModel;
import ledinek.control.ws.WsConnection;
import org.apache.log4j.Logger;
/**
 * Application entry point.
 *
 * <p>Loads the properties file named by the {@code PROPERTIES} environment variable, reads the
 * WebSocket URL and realm from it and starts a {@link WsConnection} on its own thread.
 */
public class Main {
    static Logger logger = Logger.getLogger(Main.class);

    /**
     * Starts the application.
     *
     * <p>Startup is aborted (with an error log entry) when the properties cannot be loaded or when
     * {@code websocket-url} / {@code websocket-realm} are missing, because the connection thread
     * cannot do anything useful without them.
     *
     * @param args command line arguments (unused)
     */
    public static void main(String[] args) {
        logger.info("Entering application.");
        try {
            PropUtils.load(System.getenv("PROPERTIES"));
        } catch (IOException e) {
            // Continuing without configuration would only fail later with a less helpful error.
            logger.error("Could not load the properties file named by the PROPERTIES environment variable.", e);
            return;
        }
        //BasicConfigurator.configure();
        //logger.setLevel(Level.OFF);

        String ws_url = PropUtils.properties.getProperty("websocket-url");
        String ws_realm = PropUtils.properties.getProperty("websocket-realm");
        if (ws_url == null || ws_realm == null) {
            logger.error("Properties 'websocket-url' and 'websocket-realm' must both be set.");
            return;
        }

        WebStatusModel webstatus = new WebStatusModel();
        Object wssync = new Object();
        WsConnection backendComm = new WsConnection(ws_url, ws_realm, webstatus, wssync);
        // A named thread is much easier to find in thread dumps and log output.
        final Thread wsc = new Thread(backendComm, "ws-connection");
        wsc.start();
    }
}
WsConnection.java
package ledinek.control.ws;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import io.crossbar.autobahn.wamp.Client;
import io.crossbar.autobahn.wamp.Session;
import io.crossbar.autobahn.wamp.types.*;
import java.util.concurrent.TimeUnit;
import java.net.URI;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.logging.Logger;
public class WsConnection implements Runnable {
private static final Logger LOGGER = Logger.getLogger(WsConnection.class.getName());
private String url;
private String realm;
private CompletableFuture<ExitInfo> connection;
private Session session = null;
private final PublishOptions publishOptions = new PublishOptions(true, false);
private boolean connected = false;
private int retry_attempts = 0;
private int retry_sleep = 0;
private int sleep = 0;
private WebStatusModel webstatus;
private Gson gson = new Gson();
private URI uri;
private Object wssync;
private String prefix = "com.ledinek.control.";
private Client client;
public WsConnection(String url, String realm, WebStatusModel webstatus, Object wssync) {
this.url = url;
this.realm = realm;
this.webstatus = webstatus;
this.wssync = wssync;
}
@Override
public void run() {
while(true) {
try {
// reconnecting
if (!connected){
if (retry_attempts == 0 ){
System.out.println("Connecting...");
} else {
if (retry_sleep > 6){retry_sleep = 1;}
sleep = retry_sleep * retry_sleep;
System.out.println("Connection attempt " + retry_attempts + " sleeping for " + sleep + " seconds");
}
retry_attempts++;
retry_sleep++;
System.out.println("Make new session");
this.session = null;
this.session = new Session();
this.client = null;
this.client = new Client(session, url, realm);
this.session.addOnConnectListener(this::onConnect);
this.session.addOnJoinListener(this::onJoin);
this.session.addOnLeaveListener(this::onLeave);
this.session.addOnDisconnectListener(this::onDisconnect);
this.connection = null;
this.connection = this.client.connect();
}
if (!connected){
Thread.sleep(1000 * retry_sleep);
} else {
Thread.sleep(1000);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/*
* Websocket client
*/
private void onJoin(Session session, SessionDetails details) {
this.session = session;
this.retry_attempts = 0;
this.retry_sleep=1;
// Send to event_log
LOGGER.info("Published restart to event_log");
this.publish("event_log", "error","Control system restarted.");
/*
* RPC registrations
*/
CompletableFuture<Registration> order_to_produce = session.register(prefix + "order_to_produce", this::order_to_produce);
order_to_produce.thenAccept(reg -> LOGGER.info("Registered procedure: " + prefix + "order_to_produce"));
/*
* ADD SERVICE ON BACKEND COMPONENT
*/
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
CompletableFuture<CallResult> add_service = session.call("com.ledinek.backend.add_service",
"control", session.getID());
add_service.thenAccept(callResult ->{
// System.out.println(callResult.results.get(0).getClass());
if (callResult.results.get(0).equals(true)){
LOGGER.info("Service successfully added to list of services");
} else {
LOGGER.warning("Service is already added to list of online services. EXIT!");
}
}).exceptionally(ex -> {
System.out.println(ex.getMessage());
return null;
});
}
private void onConnect(Session session) {
connected = true;
retry_attempts = 1;
LOGGER.info("Session connected, ID=" + session.getID());
}
private void onLeave(Session session, CloseDetails detail) {
connected = false;
LOGGER.info(String.format("Left reason=%s, message=%s", detail.reason, detail.message));
}
private void onDisconnect(Session session, boolean wasClean) {
connected = false;
System.out.println(String.format("Session with ID=%s, disconnected.", session.getID()));
}
public void publish(String topic, String type, String message){
CompletableFuture<Publication> p = session.publish(prefix + topic,
type, message,System.currentTimeMillis() / 1000L, UUID.randomUUID().toString(), publishOptions);
p.whenComplete((publication, throwable) -> {
if (throwable != null) {
// LOGGER.info(String.format("published to %s with message %s ", prefix + topic, message));
LOGGER.warning(String.format(" ERROR - published to %s with message %s - FAILED !", prefix + topic, message));
}
});
}
/*
* RPC FUNCTIONS
* ( you need to register them in function onJoin )
*/
/**
* @apiNote Call URI - com.ledinek.control.order_to_produce
*
* @param args - [ float maxMoistureLimit, float minMoistureLimit ]
* @param details - call details
* @return - Success / Fail
*
* curl -H "Content-Type: application/json" -d '{"procedure": "com.ledinek.control.order_to_produce": [0.12, 0.15]}' localhost:8080/caller
*/
//TODO [ rpc on db_component that calls this function ]
private String order_to_produce(List<LinkedHashMap> args, InvocationDetails details) {
/*
frontend call
const response = yield saveItem('/api/v1/orders/all/check_to_production/', 'POST', a.items.map((item) => item.id));
*/
try {
Order order = null;
synchronized(this.wssync) {
order = this.gson.fromJson(gson.toJson(args.get(0)), Order.class);
System.out.println(order.id);
}
if (order != null) {
System.out.println("Produce order");
}
return "order_to_produce: True";
} catch (Exception e) {
e.printStackTrace();
}
return "order_to_produce: False";
}
}
And the terminal output:
Connecting...
Make new session
Init line started
Trying to connect to PLC: 10.10.12.10 Init line finished
Connection attempt 1 sleeping for 1 seconds
Make new session
Apr 13, 2018 11:43:49 AM ledinek.control.ws.WsConnection onConnect
INFO: Session connected, ID=0
Apr 13, 2018 11:43:49 AM ledinek.control.ws.WsConnection onConnect
INFO: Session connected, ID=0
...
Apr 13, 2018 11:44:54 AM io.crossbar.autobahn.wamp.transports.NettyWebSocketClientHandler
INFO: WebSocket Client disconnected!
Apr 13, 2018 11:44:54 AM io.crossbar.autobahn.wamp.transports.NettyWebSocketClientHandler
INFO: WebSocket Client disconnected!
Session with ID=7959461717222276, disconnected.
Session with ID=5698032712161101, disconnected.
Any idea how this could be fixed?
Regards,
Marko