public class Global extends GlobalSettings {
private static final ExecutorService EXECUTOR_SERVICE = Executors.newSingleThreadScheduledExecutor();
@Override
public void onStart(Application application) {
EXECUTOR_SERVICE.submit(() -> {
startNSQConsumer();
});
super.onStart(application);
}
@Override
public void onStop(Application application) {
EXECUTOR_SERVICE.shutdown();
super.onStop(application);
}
private void startNSQConsumer() {
NSQLookup lookup = new DefaultNSQLookup();
Configuration config = Play.application().configuration();
List<String> topics = config.getStringList("TOPICS");
String nsqlookupdIP = config.getString("NSQLOOKUPD_IP");
int nsqlookupdPort = Integer.parseInt(config.getString("NSQLOOKUPD_PORT"));
lookup.addLookupAddress(nsqlookupdIP, nsqlookupdPort);
Logger.info("Using nsqlookup at " + nsqlookupdIP + ":" + nsqlookupdPort);
topics.forEach((topic) -> {
NSQConsumer consumer = new NSQConsumer(lookup, topic, topic + "_channel", (message) -> {
String receivedMessage = new String(message.getMessage());
//Logger.info(receivedMessage); // When I enable or disable this I only see 50% of the messages.
EventBusManager.getEventBus().publish(new MsgEnvelope(topic, receivedMessage)); // broadcast message to all Actors listening on a particular topic
message.finished();
}, new NSQConfig(), new NSQErrorCallback() {
@Override
public void error(NSQException x) {
Logger.error("Cause: " + x.getCause() + ", Message: " + x.getMessage());
}
});
consumer.start();
});
}
}
I wrote an app in play that consumes messages from message queue (In our case, we use NSQ (not Kafka) ) and send those messages to group of Web Socket clients that are listening to it. so I want the consumer to be initialized once for each topic and start running forever until the Application itself terminates. What happening is that when I send 10 messages to a topic I can only see 5 messages coming through but when I run the same exact method "startNSQConsumer()" from the code below in a separate Plain old Java application by itself I can see all the 10 messages. So clearly this has something to do with play and I am not sure what is going on. any ideas?
--
You received this message because you are subscribed to a topic in the Google Groups "Play Framework" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/play-framework/z_O3WgalYp0/unsubscribe.
To unsubscribe from this group and all its topics, send an email to play-framework+unsubscribe@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/play-framework/4580ab5b-80eb-4bba-9054-03e95839241f%40googlegroups.com.
Thanks for the reply, I am new to Play framework and Akka as well so I was just trying to see if there is a way where I can call a method only once in the entire application life time. In my case that is startNSQConsumer().I don't see any Camel bindings to NSQ but I like the idea. Also, I wanted to use Actors for modeling this but then again what is that I need to call startNSQConsumer() only once using Actors? For example I don't want it to be called for every WebSocket Connection or request. I can certainly put startNSQConsumer() under PreStart() Method of an Actor but I somehow need to make sure that Actor doesn't get created on every Web socket connection or request which I am not sure how to do it?