Hi Yosvany,
Just another CometD user here, but the app I'm working on is very heavy on Spring and Spring Boot. As far as I have found, there is no automatic Spring bean to CometD @Service injection; you have to write a bit of glue. Here is how I do it. I'm including way more than you strictly need, and I can't guarantee that there isn't a much better approach. I've meant to post my approach for some time to find out, but I kept telling myself I needed to build an example that I could share. My app cannot be shared, and frankly it's too big to be useful, so I cut, pasted, and edited parts of it.
I need to give back to this community so I'll wing it for you. Project committers, if I am giving bad advice, please say so.
Here is how I bridge between the Spring Boot/Spring bean world and the CometD @Service world. Again, you may not need all of what I'm doing, but I have extensions that need access to the Bayeux server. In essence, I have Spring beans that need to know about the BayeuxServerImpl underneath, and I need the Bayeux server involved in the injection of Spring beans into CometD @Service beans. The @Service stereotype exists in both worlds, so I'm careful to be specific.
Hope this isn't too much too fast:-) But here we go . . .
I have a Boot application called CometDHubApplication:
@SpringBootApplication(exclude = {SecurityAutoConfiguration.class})
@EnableWebSecurity
public class CometDHubApplication implements ServletContextInitializer {
private static Logger log = LoggerFactory.getLogger(CometDHubApplication.class);
// This gets built under Spring control
@Autowired
BayeuxServerImpl bayeux;
/**
* To run from Maven:
* mvn spring-boot:run -Dspring-boot.run.profiles=local
*
* @param args
*/
public static void main(String[] args) {
log.info("Running {}", CometDHubApplication.class.getName());
SpringApplication.run(CometDHubApplication.class, args);
}
/*
* (non-Javadoc)
*
* @see
* org.springframework.boot.web.servlet.ServletContextInitializer#onStartup(
* javax.servlet.ServletContext)
*/
@Override
public void onStartup(ServletContext servletContext) {
/*
* This cross-referencing between bayeux and the servletContext
* is imperative. Don't remove.
*
* NOTE that it occurs after the bayeux is constructed. So bayeux is not
* fully initialized or running yet.
*/
/*
* ServletContext meet Bayeux. Bayeux meet servletContext.
*/
servletContext.setAttribute(BayeuxServer.ATTRIBUTE, bayeux);
/*
* This is required in order to locate Cometd Annotated "@Service"
* service instances. These are not the Spring Service stereotypes.
*
*/
bayeux.setOption(ServletContext.class.getName(), servletContext);
}
}
My application is quite involved, so again I'm only including the important parts. My primary Boot configuration file is CometDHubConfiguration. I layer my config, so there are others.
@Configuration
/*
* TODO Need to test scan of service to see if it has value. For now using
* Cometd scanning.
*/
@ComponentScan({"things to scan"})
public class CometDHubConfiguration {
private static Logger log = LoggerFactory.getLogger(CometDHubConfiguration.class);
@Autowired
/* Used for getting secrets only */
private Environment env;
@Value("${ws.cometdURLMapping:/cometd/*}")
private String bayeuxMapping;
// Some of our internal stuff goes here.
. . .
/**
* Factory for Jetty.
*
* @return
*/
@Bean
public ConfigurableServletWebServerFactory webServerFactory() {
JettyServletWebServerFactory servletFactory = new JettyServletWebServerFactory();
log.info("Returning JettyServletWebServerFactory");
return servletFactory;
}
/**
* Cometd servlet registration and configuration
*
* Note that this servlet should always be loaded first.
*
* @return
*/
@Bean(name = "cometd")
public ServletRegistrationBean<HttpServlet> cometdServletReg() {
ServletRegistrationBean<HttpServlet> regBean = new ServletRegistrationBean<>(annotationCometDServlet(),
bayeuxMapping);
/*
* This is somehow defaulted but I felt it best to be explicit. As we
* use it to locate the MyApp cometd servlet.
*/
regBean.setName("MyAppAnnotationCometDServlet");
/*
* TODO *Still* bothered by the fact I have to list Services here. This
* technique is used in all the examples I saw. So MyApp
* routing capability now counts on it as well. so be aware of that
* should you find a better way.
*
*
* Hope this doesn't confuse, but I left this here to help explain why I
* was doing things the way I did.
*
* To use the MyApp framework in a more general way, the
* "services" (essentially specifically channel configuration )
* need to be separate from the hub. Yet they are dependent on the hub
* or at least the special "routing and auth" behaviors implemented
* there. Even if there is another artifact between them. So by
* definition we can't reference the services classes directly in
* the lib module. So we make the hub a straight jar and have a boot app
* with the services such that all the references can be resolved later.
*
* I haven't included real stuff here but I hope the names help you understand.
* This init parameter is used in MyAppAnnotationCometDServlet below that extends CometD's annotation servlet.
*/
regBean.addInitParameter(MyAppCometdConstants.COMETD_SERVICES_INIT_PARAMETER_KEY,
HelloService.class.getName() + ", " + ChatService.class.getName() + ", " + EchoRPC.class.getName()
+ ", " + Monitor.class.getName() + ", " + MyAppServiceOne.class.getName());
regBean.addInitParameter("ws.cometdURLMapping", bayeuxMapping);
regBean.setAsyncSupported(true);
regBean.setLoadOnStartup(1);
return regBean;
}
/**
*
* MyAppAnnotationCometDServlet construction.
*
* In order for proper "Spring Framework" wiring to occur, servlets must be
* constructed within the context of the bean factory. This means through
* scanning or Bean annotation. To programmatically configure(vs web.xml)
* the servlet itself you have to use a ServletRegistrationBean. We felt
* this was the most straightforward way to do it in the context of a potentially
* multi-servlet web application.
*
* For services labeled with CometD's Service annotation the
* MyAppAnnotationCometDServlet uses MyAppInjectables created below to inject Beans into
* those services via the standard Inject annotation used by CometD @Service annotated services.
*
* @return
*/
@Bean()
public AnnotationCometDServlet annotationCometDServlet() {
return new MyAppAnnotationCometDServlet();
}
/**
* Born in the Bayeux!
**/
@Bean
public BayeuxServerImpl bayeuxServerImpl() {
log.info("Creating BayeuxServerImpl");
BayeuxServerImpl bayeux = new BayeuxServerImpl();
bayeux.setTransports(new WebSocketTransport(bayeux), new AsyncJSONTransport(bayeux),
new JSONPTransport(bayeux));
bayeux.setAllowedTransports("websocket", "long-polling", "callback-polling");
bayeux.setOption("ws.cometdURLMapping", bayeuxMapping);
/*
* Add extensions
*/
bayeux.addExtension(new TimesyncExtension());
bayeux.addExtension(new AcknowledgedMessagesExtension());
bayeux.addExtension(new TimestampExtension("HH:mm:ss.SSS"));
// . . . More of our custom stuff.
// This is more local customization stuff that I left in place because, should you need it, the email referenced in the ticket may help.
/*
* Note: Per Simone Bordet Extensions can't support this properly at the moment. He
* asked me to file a bug which is #878
*
* He gave me a work around for now. See
* FilteringBroadcastToPublisherMessageListener
*/
// Note that I'm not providing the methods that construct my app's internal capabilities.
// bayeux.addExtension(new ReturnToSenderExtensionOutgoing());
// bayeux.addExtension(new ReturnToSenderExtensionSend());
FilteringBroadcastToPublisherSessionListener filteringBroadcastToPublisherSessionListener = filteringBroadcastToPublisherSessionListener();
bayeux.addListener(filteringBroadcastToPublisherSessionListener);
MyAppMsgReqRespRoutingSessionListener myAppMsgReqRespRoutingSessionListener = myAppMsgReqRespRoutingSessionListener();
// Note that my listener now has a reference to the Bayeux server.
myAppMsgReqRespRoutingSessionListener.setBayeux(bayeux);
// Now Bayeux will know about my listener. Circular reference, but it lives for the life of the app.
bayeux.addListener(myAppMsgReqRespRoutingSessionListener);
// * * * End custom stuff
/*
* Add our custom security.
* Not providing custom security method called here
*/
bayeux.setSecurityPolicy(appAuthenticator());
/*
* Clamp down the security. By default we authorize NOTHING.
*/
// Deny unless granted
bayeux.createChannelIfAbsent("/**",
(ServerChannel.Initializer) channel -> channel.addAuthorizer(GrantAuthorizer.GRANT_NONE));
/*
* We *will* allow anybody to handshake. The handshake is authenticated
* by @see AppAuthenticator which delegates to individual authenticators
* per "app" basis which is basically a application in the MyApp family of
* applications.
*/
bayeux.createChannelIfAbsent(ServerChannel.META_HANDSHAKE,
(ServerChannel.Initializer) channel -> channel.addAuthorizer(GrantAuthorizer.GRANT_PUBLISH));
return bayeux;
}
* * * End of Config Class.
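For anyone wondering why adding GrantAuthorizer.GRANT_NONE on "/**" clamps everything down: as I read the CometD documentation, an operation with no authorizers at all is granted, any explicit deny wins, and otherwise at least one authorizer has to grant. GRANT_NONE never grants, it only ignores, so its mere presence means nothing passes until some other authorizer grants. Below is a plain-Java sketch of that combination rule only. The class, enum, and method names are made up for illustration, and this is not CometD's code.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class AuthorizerSketch {
    /** Hypothetical stand-in for the three outcomes an authorizer can return. */
    public enum Result { GRANTED, DENIED, IGNORED }

    /**
     * Combines the results of all authorizers that apply to a channel:
     * no authorizers means granted, any DENIED wins, otherwise at least
     * one GRANTED is required.
     */
    public static boolean isAuthorized(List<Result> results) {
        if (results.isEmpty()) {
            return true;
        }
        boolean granted = false;
        for (Result result : results) {
            if (result == Result.DENIED) {
                return false;
            }
            if (result == Result.GRANTED) {
                granted = true;
            }
        }
        return granted;
    }

    public static void main(String[] args) {
        // Only a GRANT_NONE-style authorizer applies: it ignores, so the request is denied.
        System.out.println(isAuthorized(Arrays.asList(Result.IGNORED)));
        // A GRANT_NONE-style authorizer plus one that grants: allowed.
        System.out.println(isAuthorized(Arrays.asList(Result.IGNORED, Result.GRANTED)));
        // No authorizers at all: allowed, which is why the "/**" clamp matters.
        System.out.println(isAuthorized(Collections.<Result>emptyList()));
    }
}
```

That is why the handshake channel needs its own GRANT_PUBLISH authorizer in the config above.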
/**
* Note that @see MyAppInjectables takes Object... aka Object[] so you can
* inject anything you want and be sure to make them beans if you need deeply wired Spring
* objects to be injected into cometd services.
*
* @return
*/
@Bean()
public MyAppInjectables myAppInjectables() {
// I inject these 2 but they have other things wired into them. I prefer to keep this boundary small.
MyAppInjectables myAppInjectables = new MyAppInjectables(myAppBaseChannelAuthorizer(), myAppServiceUtil());
return myAppInjectables;
}
@Bean()
public MyAppBaseChannelAuthorizer myAppBaseChannelAuthorizer() {
MyAppBaseChannelAuthorizer myAppBaseChannelAuthorizer = new MyAppBaseChannelAuthorizer();
// Note that anywhere I call for the construction of the bayeuxServerImpl Spring will ensure it is a singleton because it's annotated as a Bean and all the deep wiring occurs.
myAppBaseChannelAuthorizer.setBayeux(bayeuxServerImpl());
return myAppBaseChannelAuthorizer;
}
@Bean()
public MyAppServiceUtil myAppServiceUtil() {
return new MyAppServiceUtil();
}
*** You don't need this unless you want a separate non-CometD servlet, brought into the application, that is Bayeux-aware
/**
* InternalServlet servlet registration and configuration
*
* Note that this servlet should not be loaded before cometd.
*
* @return
*/
@Bean(name = "internal")
public ServletRegistrationBean<HttpServlet> internalServletReg() {
ServletRegistrationBean<HttpServlet> regBean = new ServletRegistrationBean<>(internalCometDServlet(),
"/internal");
regBean.setAsyncSupported(true);
regBean.setLoadOnStartup(2);
return regBean;
}
/**
*
* InternalCometDServlet construction.
*
* In order for proper spring wiring to occur, servlets must be constructed
* within the context of the bean factory. This means through scanning or
* Bean annotation. To programmatically configure(vs web.xml) the servlet
* itself you have to use a ServletRegistrationBean. We felt this was the
* most straightforward way to do it in the context of a potentially multi-servlet web
* application in Boot style.
*
* @return
*/
@Bean()
public InternalCometDServlet internalCometDServlet() {
return new InternalCometDServlet();
}
*** end you do not need this }
/**
* MyApp wrapper for AnnotationCometDServlet that adds injectables from the Spring
* bean factory.
* Note that this extends CometD class
*/
public class MyAppAnnotationCometDServlet extends AnnotationCometDServlet {
private static final Logger log = LoggerFactory.getLogger(MyAppAnnotationCometDServlet.class);
private static final long serialVersionUID = 1L;
// Spring fills these in for us.
@Autowired
BayeuxServerImpl bayeux;
@Autowired
private MyAppInjectables injectables;
private ServletContext servletContext;
// Not strictly needed: this interface enforces methods that all our services must implement, but it could just as well be a marker interface, so I left it in.
private Set<MyAppReqRespService> myAppReqRespServices = new HashSet<>();
/*
* !!!!!!! important
* This is the last hook in the servlet api before the servlet container
* starts the servlet. This is the only place during startup that we can get
* at the CometD Services instantiated by the BayeuxServerImpl. The
* super.init() call ensures it. Previously I had this lazy detection in the
* MyAppMsgReqRespRoutingMsgListener which had to do a lazy check on every
* onMessage call. Now anything with a reference to the BayeuxServerImpl can
* get the myAppReqRespServices set once the servlet is up and running.
*
* (non-Javadoc)
*
* @see org.cometd.annotation.AnnotationCometDServlet#init()
*/
@Override
public void init() throws ServletException {
super.init();
if (log.isDebugEnabled()) {
log.debug("super init has completed");
}
servletContext = getServletContext();
initReqRespSvcs();
bayeux.setOption(MyAppCometdConstants.REQ_RESP_SERVICES_BAYEUX_OPTION_KEY, myAppReqRespServices);
if (log.isDebugEnabled()) {
log.debug("Found {} MyAppReqRespService implementations.", myAppReqRespServices.size());
}
}
/**
* Not strictly needed: the interface enforces methods that all our services must implement, but it could be a marker interface, so I left it.
* Accessor.
*
* Get the Services found that implement the MyAppReqRespService interface
*
* @return
*/
public final Set<MyAppReqRespService> getMyAppReqRespServices() {
return myAppReqRespServices;
}
/*
* This method is key for allowing stuff to be injected from Spring Beans
* factory using the Cometd injection annotations. A little bit of a hack but it
* provides the bridge between the 2 injection mechanisms. Important so you
* can run in spring boot.
*
* (non-Javadoc)
*
* @see org.cometd.annotation.AnnotationCometDServlet#
* newServerAnnotationProcessor(org.cometd.bayeux.server.BayeuxServer)
*/
@Override
protected ServerAnnotationProcessor newServerAnnotationProcessor(BayeuxServer bayeuxServer) {
return new ServerAnnotationProcessor(bayeuxServer, injectables.getInjectables());
}
protected final void initReqRespSvcs() {
/*
* The application sets this. In Configuration look for something
* like ServletRegistrationBean<HttpServlet> cometdServletReg()
*/
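// The name must match regBean.setName(...) in the configuration exactly; registration names are case-sensitive.
ServletRegistration servletReg = servletContext.getServletRegistration("MyAppAnnotationCometDServlet");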
String commaSepServices = servletReg.getInitParameter(MyAppCometdConstants.COMETD_SERVICES_INIT_PARAMETER_KEY);
myAppReqRespServices = Arrays.stream(commaSepServices.split(","))
.map(this::svcNmToMyAppReqRespService)
.filter(svc -> {
return svc != null;
})
.collect(Collectors.toSet());
}
// Prolly don't need this either.
private final MyAppReqRespService svcNmToMyAppReqRespService(String fullName) {
Object svc = servletContext.getAttribute(fullName.trim());
if (svc != null && svc instanceof MyAppReqRespService) {
return (MyAppReqRespService) svc;
}
return null;
}
}
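In case the stream in initReqRespSvcs() is hard to follow out of context, here is the same idea boiled down to plain Java: split the comma-separated class names, look each one up, and keep only the hits of the right type. A plain Map stands in for the ServletContext attributes, and ReqRespService and the com.example names are made up for the sketch, so treat it as an illustration rather than my real code.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class ServiceLookupSketch {
    /** Hypothetical marker interface standing in for MyAppReqRespService. */
    public interface ReqRespService {}

    /**
     * Splits a comma-separated list of names, looks each one up in the
     * attribute map, and keeps only values that implement ReqRespService.
     * Missing names and values of the wrong type are silently dropped.
     */
    public static Set<ReqRespService> resolve(String commaSeparatedNames, Map<String, Object> attributes) {
        return Arrays.stream(commaSeparatedNames.split(","))
                .map(String::trim)
                .map(attributes::get)
                .filter(ReqRespService.class::isInstance) // also rejects null
                .map(ReqRespService.class::cast)
                .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        Map<String, Object> attributes = new HashMap<>();
        attributes.put("com.example.HelloService", new ReqRespService() {});
        attributes.put("com.example.Monitor", new Object()); // present, but wrong type

        Set<ReqRespService> found = resolve(
                "com.example.HelloService, com.example.Monitor, com.example.Missing", attributes);
        System.out.println(found.size()); // prints 1
    }
}
```

The isInstance filter does the null check and the type check in one step, which is all svcNmToMyAppReqRespService really does.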
/**
* Simple holder class for multiple classes that can/should be injected into the
* services. They are passed to ServerAnnotationProcessor by the
* MyAppAnnotationCometDServlet;
*/
public class MyAppInjectables {
private Object[] injectables = new Object[0];
public MyAppInjectables(Object... injectables) {
this.injectables = injectables;
}
/**
* @return
*/
public Object[] getInjectables() {
return injectables;
}
}
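If it helps to picture what handing an Object[] of injectables to the annotation processor buys you, here is a dependency-free sketch of type-based field injection. To be clear, this is not CometD's ServerAnnotationProcessor, just my mental model of it: the local Inject annotation stands in for javax.inject.Inject, and ServiceUtil, DemoService, and inject(...) are hypothetical names.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;

public class InjectionSketch {
    /** Local stand-in for javax.inject.Inject so the sketch needs no dependencies. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.FIELD)
    public @interface Inject {}

    /** Hypothetical collaborator, e.g. a bean that Spring has already wired. */
    public static class ServiceUtil {}

    /** Hypothetical service with one injectable field. */
    public static class DemoService {
        @Inject
        private ServiceUtil util;

        public ServiceUtil getUtil() {
            return util;
        }
    }

    /**
     * For every @Inject field on the target (including superclasses), assigns
     * the first candidate that is an instance of the field's type.
     * Returns the number of fields that were set.
     */
    public static int inject(Object target, Object... candidates) {
        int injected = 0;
        for (Class<?> c = target.getClass(); c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field field : c.getDeclaredFields()) {
                if (!field.isAnnotationPresent(Inject.class)) {
                    continue;
                }
                for (Object candidate : candidates) {
                    if (field.getType().isInstance(candidate)) {
                        try {
                            field.setAccessible(true);
                            field.set(target, candidate);
                        } catch (IllegalAccessException e) {
                            throw new IllegalStateException("Cannot inject " + field, e);
                        }
                        injected++;
                        break;
                    }
                }
            }
        }
        return injected;
    }

    public static void main(String[] args) {
        ServiceUtil util = new ServiceUtil();
        DemoService service = new DemoService();
        int count = inject(service, "unrelated", util);
        System.out.println(count + " " + (service.getUtil() == util)); // prints "1 true"
    }
}
```

The point for the Spring bridge is that the candidates can be fully wired Spring beans, so anything you add to MyAppInjectables arrives in the service with its own dependencies already in place.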
* * * Not required see above
public interface MyAppReqRespService {
. . . My app required methods.
}
* * * All my services extend this, which has more to do with a meta protocol I created than a real need, so I hope this doesn't cause brain pain. It's included here because all of this is injected by CometD, not Spring. If you don't have a base class, these fields would be in your service class, and you could inject your DAOs into your service class by adding them to MyAppInjectables.
public abstract class MyAppServiceBase implements MyAppReqRespService {
private static final Logger log = LoggerFactory.getLogger(MyAppServiceBase.class);
@Inject
protected BayeuxServer bayeux;
@Inject
protected MyAppBaseChannelAuthorizer chanAuth;
@Session
protected LocalSession myLocalSession;
@Session
protected ServerSession myServerSession;
. . . Custom stuff omitted
. . . this could also be omitted
/**
* Left here as a learning exercise :-) Normal CometD services wouldn't do this :-)
* This is a heartbeat message from the server.
* For debugging mostly but could also be used for periodic server initiated
* messaging.
*
* Will only register one MessageMaker per channel. So if you have 100
* subscribers this basically returns.
*
* @param channelName
* @param message
*/
public void scheduleSendToAllServerSessionsOnChannel(String channelName, MessageMaker message) {
// Check inside the lock so two racing callers can't each create an executor.
synchronized (this) {
if (backgroundExecutor == null) {
this.backgroundExecutor = Executors.newScheduledThreadPool(1);
}
}
if (!schedChannels.containsKey(channelName)) {
/*
* We don't know about the channel so schedule a pinger.
*/
ScheduledFuture<?> task = backgroundExecutor.scheduleAtFixedRate(() -> {
ServerChannel svrChan = bayeux.getChannel(channelName);
if (svrChan != null) {
if (log.isDebugEnabled()) {
log.debug("Channel {} exists; sending to all subscribers.", channelName);
}
ClientSessionChannel clntChan = myLocalSession.getChannel(channelName);
clntChan.publish(message.makeMessage());
} else {
if (log.isDebugEnabled()) {
log.debug("Channel {} doesn't exist; can't send to subscribers.", channelName);
}
}
}, 15, 60, TimeUnit.SECONDS);
schedChannels.put(channelName, task);
}
}
/**
* Unschedule sends.
*
* @param channelName
*/
public void unscheduleSendToAllServerSessionsOnChannel(String channelName) {
if (schedChannels.containsKey(channelName)) {
ScheduledFuture<?> task = schedChannels.get(channelName);
task.cancel(true);
schedChannels.remove(channelName);
}
}
}
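Since the fields behind the pinger (backgroundExecutor and schedChannels) are in the part I omitted, here is a standalone sketch of the "one scheduled task per channel" idea. It is a variation rather than what my app does: it uses ConcurrentHashMap.computeIfAbsent so the check and the registration happen atomically, and ChannelPingScheduler with its method names is hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class ChannelPingScheduler {
    private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
    private final Map<String, ScheduledFuture<?>> tasks = new ConcurrentHashMap<>();

    /**
     * Schedules the task for the channel unless one is already registered.
     * Returns true only when a new task was scheduled.
     */
    public boolean schedule(String channelName, Runnable task, long initialDelaySeconds, long periodSeconds) {
        boolean[] created = {false};
        tasks.computeIfAbsent(channelName, name -> {
            created[0] = true;
            return executor.scheduleAtFixedRate(task, initialDelaySeconds, periodSeconds, TimeUnit.SECONDS);
        });
        return created[0];
    }

    /** Cancels and forgets the task for the channel. Returns false if none was registered. */
    public boolean unschedule(String channelName) {
        ScheduledFuture<?> task = tasks.remove(channelName);
        if (task == null) {
            return false;
        }
        task.cancel(true);
        return true;
    }

    /** Stops the background thread so the JVM can exit. */
    public void shutdown() {
        executor.shutdownNow();
    }

    public static void main(String[] args) {
        ChannelPingScheduler scheduler = new ChannelPingScheduler();
        Runnable ping = () -> System.out.println("ping");
        System.out.println(scheduler.schedule("/myapp/svcone/alice", ping, 15, 60)); // true
        System.out.println(scheduler.schedule("/myapp/svcone/alice", ping, 15, 60)); // false, already scheduled
        System.out.println(scheduler.unschedule("/myapp/svcone/alice"));             // true
        scheduler.shutdown();
    }
}
```

With 100 subscribers on one channel, every call after the first simply returns false, which matches the behaviour described in the Javadoc above.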
The actual CometD service, finally! Most of it is debug code, because for the most part services are configure-and-forget unless you start extending things or doing RPC stuff.
@Service("myAppSvcOne")
public class MyAppServiceOne extends MyAppServiceBase implements BayeuxServer.ChannelListener {
private static final Logger log = LoggerFactory.getLogger(MyAppServiceOne.class);
public static final String SVC_ONE_ROOT_CHANNEL = "/myapp/svcone/";
@Inject
private MyAppServiceUtil myAppServiceUtil;
public MyAppServiceOne() {
if (log.isDebugEnabled()) {
log.debug("MyAppServiceOne is being built.");
}
}
/*
* Note Parameter/template channels do not seem to be supported on Configure
*/
@Configure(SVC_ONE_ROOT_CHANNEL + "**")
public void configureSvcOne(ConfigurableServerChannel channel) {
log.info("Configuring {} ", channel.getChannelId());
/* svc one channels shall not publish to the original sender. This is all custom stuff. */
// note using injected util class.
myAppServiceUtil.disableBroadcastToPublisherViaListener(channel.getChannelId());
. . . more stuff working with filters.
/*
* some custom auth stuff we have
*/
AppAuthorizations appAuths = AppAuthorizations.builder(channel.getChannelId())
.addPublishers("appone", "apptwo")
.addSubscribers("appone", "apptwo", "appthree")
.build();
// chanAuth is the MyAppBaseChannelAuthorizer injected into the base class.
chanAuth.addChannelAppAuthorizations(appAuths);
// Normal configure stuff
DataFilterMessageListener filter = new DataFilterMessageListener(new NoMarkupFilter());
channel.setPersistent(true);
channel.addListener(filter);
channel.setLazy(false);
channel.addAuthorizer(chanAuth);
bayeux.addListener(this);
}
@Listener(SVC_ONE_ROOT_CHANNEL + "{user}")
public void monitorTinSync(ServerSession sendersSvrSession, ServerMessage message, @Param("user") String user) {
if (log.isDebugEnabled()) {
log.debug("Received data: {} on {} and ext: {} from user {}", message.getData(), message.getChannel(),
message.getExt(), user);
}
// Never do this live:-) unless you like slow stuff
if (log.isDebugEnabled()) {
sendersSvrSession.getSubscriptions()
.stream()
.forEach(svrChan -> {
log.debug("Channel {}:", svrChan.getChannelId()
.getId());
svrChan.getSubscribers()
.stream()
.forEach(svrSess -> {
log.debug("\tSubscriber {}:", svrSess.getId());
});
});
}
}
/*
* not needed
* (non-Javadoc)
*
* @see org.cometd.bayeux.server.ConfigurableServerChannel.Initializer#
* configureChannel(org.cometd.bayeux.server.ConfigurableServerChannel)
*/
@Override
public void configureChannel(ConfigurableServerChannel channel) {
// Handled in @Configure annotated method above.
}
/*
* More debug stuff to help learn things.
* (non-Javadoc)
*
* @see
* org.cometd.bayeux.server.BayeuxServer.ChannelListener#channelAdded(org.
* cometd.bayeux.server.ServerChannel)
*/
@Override
public void channelAdded(ServerChannel channel) {
if (channel.isWild()) {
return;
}
// this schedules those pings in the base class. Again learning stuff:-)
if (channel.getId()
.startsWith(SVC_ONE_ROOT_CHANNEL)) {
this.scheduleSendToAllServerSessionsOnChannel(channel.getId(),
new PingMessageMaker("Ping from TinSync Service."));
}
}
/*
* (non-Javadoc)
*
* @see
* org.cometd.bayeux.server.BayeuxServer.ChannelListener#channelRemoved(java
* .lang.String)
*/
@Override
public void channelRemoved(String channelId) {
if (new ChannelId(channelId).isWild()) {
return;
}
// this unschedules those pings in the base class. Again learning stuff:-)
if (channelId.startsWith(SVC_ONE_ROOT_CHANNEL)) {
log.debug("Channel {} doesn't exist; removing pinger.", channelId);
this.unscheduleSendToAllServerSessionsOnChannel(channelId);
}
}
}
*** separate servlet to live in your app if you need that sort of crazy thing:-)
/**
* This servlet demonstrates having a separate *non* comet *Service* annotated
* servlet with direct access to the BayeuxServer implementation.
*
* I (GAT) believe we don't need it, given the CometD Service annotations.
* But I'm leaving it here in case we need lower level access to the Bayeux
* Server.
*
*/
@Component
public class InternalCometDServlet extends HttpServlet {
private static final long serialVersionUID = 1L;
private static final Logger logger = LoggerFactory.getLogger(InternalCometDServlet.class);
@Override
public void init() throws ServletException {
logger.info("Initializing {}", InternalCometDServlet.class.getName());
super.init();
final BayeuxServerImpl bayeux = (BayeuxServerImpl) getServletContext().getAttribute(BayeuxServer.ATTRIBUTE);
if (bayeux == null) {
throw new UnavailableException("No BayeuxServer!");
}
if (logger.isDebugEnabled()) {
logger.debug("Creating /foo/bar/baz");
}
bayeux.createChannelIfAbsent("/foo/bar/baz", new ConfigurableServerChannel.Initializer.Persistent());
if (logger.isDebugEnabled()) {
logger.debug(bayeux.dump());
}
}
}