STOMP: how to react to a missing heartbeat or broken connection


César Varona

Nov 10, 2015, 1:02:03 PM
to vert.x

Hi,

I'm using vert.x + STOMP to build a server platform that will serve an arbitrary number of clients. Each client has a client id and subscribes to a STOMP queue named after that id to receive (individualized) messages from the server. For client-to-server communication, each client posts to another queue, also named after its id. This client-to-server queue has no STOMP subscriptions; it is connected to the vert.x event bus instead.

The server code would be like this:

import io.vertx.core.AbstractVerticle;
import io.vertx.core.impl.ConcurrentHashSet;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.stomp.*;

import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * @author César Varona
 */
public class StompBasedServer extends AbstractVerticle {

    public static final Pattern STOMP_QUEUE_NAME = Pattern.compile( "/queue/stomp/(\\w+)" );

    @Override
    public void start() throws Exception {

        Set<String> clients = new ConcurrentHashSet<>();

        StompServerHandler handler = StompServerHandler.create( vertx );
        handler.destinationFactory( ( v, name ) -> {

                System.out.println( "Create destination " + name );

                Matcher m = STOMP_QUEUE_NAME.matcher( name );
                boolean stomp = m.matches();
                if( stomp ) {
                    clients.add( name );
                    String busQueueName = "/queue/bus/" + m.group( 1 );
                    handler.getOrCreateDestination( busQueueName );

                    vertx.eventBus().consumer( busQueueName, event -> {
                        System.out.printf( "%s receives '%s'%n", busQueueName, event.body() );
                    } );
                }

                PermittedOptions pos = new PermittedOptions().setAddress( name );
                return Destination.bridge(
                    vertx,
                    stomp ? new BridgeOptions().addOutboundPermitted( pos ) :
                            new BridgeOptions().addInboundPermitted( pos )
                );
            }
        );

        StompServer srv = StompServer.create( vertx, options() ).handler( handler );
        srv.listen( 61613, ar -> {
            if( ar.failed() ) {
                ar.cause().printStackTrace();
            } else {
                System.out.println( "Ready to receive STOMP frames" );

                ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
                ses.scheduleAtFixedRate(
                    () -> {
                        clients.forEach(
                            client -> {
                                vertx.eventBus().send( client, String.format( "Dear %s, please note that right now it's %tc", client, System.currentTimeMillis() ) );
                            }
                        );
                    },
                    2, 5, TimeUnit.SECONDS
                );
            }
        } );
    }

    private StompServerOptions options() {
        return new StompServerOptions().
            setHeartbeat(
                new JsonObject().
                    put( "x", 1000 ).
                    put( "y", 1000 ) );
    }

    public static void main( String[] args ) {
        Utils.verticle( StompBasedServer.class );
    }
}

The client code looks something like this:

import io.vertx.core.AbstractVerticle;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.NetClientOptions;
import io.vertx.ext.stomp.StompClient;
import io.vertx.ext.stomp.StompClientOptions;

import java.util.UUID;

/**
 * @author César Varona
 */
public class StompBasedClient extends AbstractVerticle {

    @Override
    public void start() throws Exception {
        connect();
    }

    private void connect() {

        String clientId = UUID.randomUUID().toString().substring( 0, 8 );

        StompClient client = StompClient.create(
            vertx,
            new StompClientOptions().
                setHeartbeat(
                    new JsonObject().
                        put( "x", 10000 ).
                        put( "y", 10000 ) ) );

        client.connect(
            61613,
            "localhost",
            vertx.createNetClient( new NetClientOptions().setReconnectAttempts( Integer.MAX_VALUE ) ),
            ar -> {
                if( ar.succeeded() ) {

                    System.out.println( "Ready to send STOMP frames" );

                    ar.result().errorHandler( event -> {
                        System.out.println( "Error: " + event );
                    } );

                    ar.result().subscribe(
                        "/queue/stomp/" + clientId,
                        frame -> {
                            System.out.printf( "Msg from /queue/stomp/%s: '%s'%n", clientId, frame.getBody() );
                        }
                    );

                    vertx.setPeriodic( 5000, l -> {
                            try {
                                ar.result().send(
                                    "/queue/bus/" + clientId,
                                    Buffer.buffer( String.format( "This is %s at %tc", clientId, System.currentTimeMillis() ) )
                                );

                            } catch( Exception ex ) {
                                ex.printStackTrace();
                                vertx.cancelTimer( l );
                                client.close();
                                connect();
                            }
                        }
                    );

                } else {
                    ar.cause().printStackTrace();
                }
            } );
    }

    public static void main( String[] args ) {
        Utils.verticle( new StompBasedClient() );
    }
}

(When I test this prototype, I run the client in a virtual box and change the host accordingly.)

Now I would like to make both server and client resilient to network failures. By specifying the reconnect attempts in the client options, I can be sure that the client will retry connecting to the server on startup. How can I make it notice that either a) a heartbeat response is missing or b) the connection is broken, and react by starting a reconnect sequence? I usually test this by disconnecting the network adapter in the virtualized box, and I have seen that the following piece of code in StompClientConnectionImpl is involved:

...
if (pong > 0) {
    ponger = client.vertx().setPeriodic(pong, l -> {
        long delta = System.nanoTime() - lastServerActivity;
        final long deltaInMs = TimeUnit.MILLISECONDS.convert(delta, TimeUnit.NANOSECONDS);
        if (deltaInMs > pong * 2) {
            log.error("Disconnecting client " + client + " - no server activity detected in the last " + deltaInMs + " ms.");
            client.vertx().cancelTimer(ponger);
            disconnect();
        }
    });
}
...


I have no idea how I could use this heartbeat mechanism to get notified that something is going wrong without providing my own implementation. I'm also a bit surprised that the out-of-the-box reaction to a missing heartbeat is to send a disconnect request, but this is secondary. I've searched the documentation, examples and source code for how to do this, to no avail. Any help will be much appreciated.
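
For illustration only, the idle check in the fragment above can be reproduced in plain Java with a hook that is called when the heartbeat is considered missing. This is a minimal sketch, not vertx-stomp code: the class HeartbeatWatchdog, its methods and the callback are hypothetical names, and the clock is injected so the logic can be exercised without waiting. The threshold (idle time greater than twice the pong period) is the one used in the fragment.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;

/** Hypothetical helper, not part of vertx-stomp: mirrors the idle check quoted above. */
final class HeartbeatWatchdog {
    private final long pongMs;             // expected heartbeat period in milliseconds
    private final LongSupplier nanoClock;  // injectable clock, e.g. System::nanoTime
    private final LongConsumer onMissed;   // receives the observed idle time in milliseconds
    private long lastActivityNanos;
    private boolean tripped;

    HeartbeatWatchdog(long pongMs, LongSupplier nanoClock, LongConsumer onMissed) {
        this.pongMs = pongMs;
        this.nanoClock = nanoClock;
        this.onMissed = onMissed;
        this.lastActivityNanos = nanoClock.getAsLong();
    }

    /** Call whenever a frame or heartbeat arrives from the peer. */
    void recordActivity() {
        lastActivityNanos = nanoClock.getAsLong();
    }

    /** Call from a periodic timer; invokes the callback once when idle time exceeds 2 * pongMs. */
    boolean check() {
        if (tripped) {
            return false;
        }
        long deltaMs = TimeUnit.NANOSECONDS.toMillis(nanoClock.getAsLong() - lastActivityNanos);
        if (deltaMs > pongMs * 2) {
            tripped = true;
            onMissed.accept(deltaMs);
            return true;
        }
        return false;
    }
}
```

With a callback like this, the caller decides what a missed heartbeat means (log, disconnect, start reconnecting) instead of the reaction being fixed inside the timer.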

As for the server side, I have the same problem: I would like to detect when the connection or the heartbeat to a client has been lost, in order to remove the associated queue/consumer, but the pong handler is a fragment quite similar to that of the client connection implementation and cannot be customized. This would not be that important if I could take advantage of the regexp address feature provided by the STOMP bridge, but I don't know how to register a consumer on the event bus for a regexp address (if anybody can provide feedback on how to accomplish this, please do).
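
As a rough sketch of the server-side bookkeeping this would need, the plain-Java class below keeps one cleanup action per client id and runs it when the client is reported gone. It assumes the server gets some notification of the lost connection and can map it to a client id, which is exactly the open question here. ClientRegistry and its methods are hypothetical names; the Runnable stands in for whatever releases the resources, for example unregistering the event bus consumer created in the destination factory.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical bookkeeping helper, not part of vertx-stomp. */
final class ClientRegistry {
    // Same pattern as STOMP_QUEUE_NAME in the server code above.
    private static final Pattern STOMP_QUEUE_NAME = Pattern.compile("/queue/stomp/(\\w+)");

    private final Map<String, Runnable> cleanups = new ConcurrentHashMap<>();

    /** Returns the client id embedded in a destination name, or null if it does not match. */
    static String clientIdOf(String destination) {
        Matcher m = STOMP_QUEUE_NAME.matcher(destination);
        return m.matches() ? m.group(1) : null;
    }

    /** Remembers how to release the resources created for this client. */
    void register(String clientId, Runnable cleanup) {
        cleanups.put(clientId, cleanup);
    }

    /** Runs and forgets the cleanup for a client; returns false if the client is unknown. */
    boolean release(String clientId) {
        Runnable cleanup = cleanups.remove(clientId);
        if (cleanup == null) {
            return false;
        }
        cleanup.run();
        return true;
    }
}
```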

Kind regards

Kirk Stork

Nov 13, 2015, 8:50:47 PM
to vert.x
I wonder whether the SockJS handler receives a BridgeEventType.UNREGISTER or a BridgeEventType.SOCKET_CLOSED event when the connection fails due to a lost heartbeat. If so, the rawMessage in the event might have what you need.

Clement Escoffier

Nov 16, 2015, 5:25:41 AM
to ve...@googlegroups.com
Hi,

On both the server and client sides you can configure a closeHandler to be notified when a client/server is disconnected. However, there is no mechanism telling you that the disconnection comes from a heartbeat failure.
It could be interesting to invoke a handler in this case (disconnection following a missing pong message), especially on the client side. Would this be useful for you?

Clement
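
One way the client could react to such a close notification is to schedule a new connection attempt with a growing delay rather than reconnecting immediately. The plain-Java sketch below only computes the delays; ReconnectBackoff is a hypothetical name and the doubling-with-cap policy is an assumption, not something vertx-stomp provides.

```java
/** Hypothetical helper: capped exponential delays between reconnect attempts. */
final class ReconnectBackoff {
    private final long baseMs;  // delay before the first retry
    private final long maxMs;   // upper bound for any delay
    private int attempts;

    ReconnectBackoff(long baseMs, long maxMs) {
        this.baseMs = baseMs;
        this.maxMs = maxMs;
    }

    /** Delay before the next attempt: base, 2 * base, 4 * base, ... capped at maxMs. */
    long nextDelayMs() {
        int shift = Math.min(attempts, 30);  // bound the shift so the multiplier stays small
        attempts++;
        return Math.min(maxMs, baseMs * (1L << shift));
    }

    /** Call after a successful connection so the next failure starts from baseMs again. */
    void reset() {
        attempts = 0;
    }
}
```

In the client above, the close notification would then start a timer for nextDelayMs() that calls connect() again, and a successful connect would call reset().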