Need help with DeQueueListener


Ram C

Oct 15, 2019, 1:49:23 PM
to cometd-users
Hi Simon,

I created a listener that implements ServerSession.DeQueueListener and added it to the server session, but the listener is never called.

Here is the listener code, which prints the messages in the queue.


public class BayeuxSessionDeQueueListener implements DeQueueListener {
   
    private static final Logger logger = Logger.getLogger(BayeuxSessionDeQueueListener.class.getName());

    @Override
    public void deQueue(ServerSession session, Queue<ServerMessage> queue) {
        logger.info("About to send message to client " + session.toString());
        // Logger.info() does not interpret format specifiers such as %n.
        logger.info("Messages in the queue:");
        queue.forEach(msg -> {
            logger.info(msg.toString());
        });
    }
}


And here is the @Service code that adds the listener.


@Service
public class MyEventService implements Listener {
   
    private static final Logger logger = Logger.getLogger(MyEventService.class.getName());
   
    @Inject
    BayeuxServer bayeuxServer;

    @Session
    private LocalSession sender;


    public void attachDeQueueListener() {
        List<ServerSession> serverSessions = bayeuxServer.getSessions();
        BayeuxSessionDeQueueListener deqlistener = null;
        if (!serverSessions.isEmpty()) {
            deqlistener = new BayeuxSessionDeQueueListener();
            for (ServerSession svrSession: serverSessions) {
                sender.getServerSession().removeListener(deqlistener);
                sender.getServerSession().addListener(deqlistener);
                logger.info("DeQueueListener added");
            }
        }
    }


}


attachDeQueueListener() is called from a servlet that is configured to load on startup.

  <servlet>
    <servlet-name>initializer</servlet-name>
    <servlet-class>com.example.cometd.Initializer</servlet-class>
    <load-on-startup>2</load-on-startup>
  </servlet>

public class Initializer extends GenericServlet {

    @Override
    public void init() throws ServletException
    {
       
        // Retrieve the CometD service instantiated by AnnotationCometdServlet
        MyEventService service = (MyEventService)getServletContext().getAttribute(MyEventService.class.getName());
       
        // Register the service as a listener of the emitter
        EventReceiver.getListeners().add(service);
        service.attachDeQueueListener();
    }
}
 
Am I missing something? I have tried the sender.getServerSession().addListener() call.


Thank you!

Ram

   


Simone Bordet

Oct 15, 2019, 3:45:12 PM
to cometd-users
Hi,

On Tue, Oct 15, 2019 at 7:49 PM Ram C <time...@gmail.com> wrote:
>
> Hi Simon,
>
> I tried creating a listener implementing the ServerSession.DeQueueListener and then add that listener to the server session. But noticed that the listener is never called.

Do you publish messages on channels that session is subscribed to?

--
Simone Bordet
---
Finally, no matter how good the architecture and design are,
to deliver bug-free software with optimal performance and reliability,
the implementation technique must be flawless. Victoria Livschitz

Ram C

Oct 15, 2019, 4:47:31 PM
to cometd-users

Yes. Here is the publish code. The publishMsg method is called when the MDB receives a JMS message.


    public Future<Void> publishMsg(String channelName, short msgId, Object msg) {

        logger.fine("publishMsg: " + Thread.currentThread().getId() + ", msgId: " + msgId);
       
        bayeuxServer.createChannelIfAbsent(channelName, new ServerChannel.Initializer()
        {
            public void configureChannel(ConfigurableServerChannel channel)
            {
                channel.setPersistent(true);
                channel.setLazy(false); // Non-lazy channel: messages are delivered immediately instead of waiting up to maxLazyTimeout.
            }
        });
       
       
        ObjectMapper mapper = new ObjectMapper();
        EventMessage evMsg = new EventMessage();
        evMsg.setMsg(msg);
        evMsg.setMsgId(msgId);
        Map<String, Object> data = null;
        TypeReference<Map<String, Object>> ref = new TypeReference<Map<String, Object>>() {};
        String jsonData = "";
        try {
            jsonData = mapper.writeValueAsString(evMsg);
            data = mapper.readValue(jsonData, ref);
        } catch (IOException e) {
            e.printStackTrace();
        }

        logger.info("publishMsg: JSON MSG is\n" + jsonData);
       
        // Publish to all subscribers
        ServerChannel channel = bayeuxServer.getChannel(channelName);
        channel.publish(sender, data);
       
        logger.info("publishMsg: message published " + Utils.getEventName(msgId));
       
        return new AsyncResult<Void>(null);
    }



Simone Bordet

Oct 15, 2019, 4:59:24 PM
to cometd-users
Hi,

On Tue, Oct 15, 2019 at 10:47 PM Ram C <time...@gmail.com> wrote:
>
>
> yes. here is the publish code. publishMsg method gets called when MDB receive a JMS message.

My guess is there are no subscribers to that channel.

--
Simone Bordet
----
http://cometd.org
http://webtide.com
Developer advice, training, services and support
from the Jetty & CometD experts.

Ram C

Oct 15, 2019, 10:52:48 PM
to cometd-users
The Dojo client subscribes to a channel of the form /event/{id}, where id is a dynamic room id. The publish code creates the channel with the createChannelIfAbsent call, so the channel is created at runtime, and subscribers do exist. Messages do flow between CometD and the Dojo client. I just want to see all the messages in the queue before they are delivered, so I am trying to use the DeQueueListener.

Thanks.


Simone Bordet

Oct 16, 2019, 2:56:03 AM
to cometd-users
Hi,

On Tue, Oct 15, 2019 at 7:49 PM Ram C <time...@gmail.com> wrote:
> public void attachDeQueueListener() {
> List<ServerSession> serverSessions = bayeuxServer.getSessions();
> BayeuxSessionDeQueueListener deqlistener = null;
> if (!serverSessions.isEmpty()) {
> deqlistener = new BayeuxSessionDeQueueListener();
> for (ServerSession svrSession: serverSessions) {
> sender.getServerSession().removeListener(deqlistener);
> sender.getServerSession().addListener(deqlistener);

Here you are only adding the listener to "sender", not to "svrSession".
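
A minimal sketch of the corrected loop follows. It assumes simplified stand-in types: Session and DeQueueListener below are hypothetical placeholders for CometD's ServerSession and ServerSession.DeQueueListener, not the real interfaces, and AttachSketch is an illustrative name. The point is only that the listener is registered on the loop variable rather than on "sender".

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for ServerSession.DeQueueListener (not the real CometD type).
interface DeQueueListener {
    void deQueue(Session session, List<String> queue);
}

// Hypothetical stand-in for ServerSession: it only tracks its listeners.
class Session {
    private final List<DeQueueListener> listeners = new ArrayList<>();

    void addListener(DeQueueListener listener) {
        listeners.add(listener);
    }

    List<DeQueueListener> getListeners() {
        return listeners;
    }
}

class AttachSketch {
    // Registers the listener on every session in the list,
    // using the loop variable instead of one fixed "sender" session.
    static void attachToAll(List<Session> sessions, DeQueueListener listener) {
        for (Session svrSession : sessions) {
            svrSession.addListener(listener);
        }
    }
}
```

With the original code, every iteration would touch the same "sender" session, so the other sessions would never see the listener.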

Ram C

Oct 16, 2019, 1:21:04 PM
to cometd-users
Hi,

Yes, I fixed those two lines, but the listener code is still not called.

svrSession.addListener(deqlistener);




Simone Bordet

Oct 16, 2019, 1:37:00 PM
to cometd-users
Hi,

On Wed, Oct 16, 2019 at 7:21 PM Ram C <time...@gmail.com> wrote:
>
> Hi,
>
> yes I fix those two lines but still the listener code not called.
>
> svrSession.addListener(deqlistener);

All right.
The DeQueueListener mechanism in CometD obviously works, since the
acknowledgement extension uses a DeQueueListener, and if that did not
work we would have tons of test failures and tons of complaints from users.

You have to package a reproducible test case that I can run myself and
that shows your issue.
If you can reproduce it in an isolated test case, please open an issue
at https://github.com/cometd/cometd/issues and attach the test case
there.
It must be full code, buildable and runnable, not partial snippets of code.

Thanks!

Ram C

Oct 18, 2019, 12:27:11 PM
to cometd-users
Hi,

I debugged the application and noticed that the flush method never reaches takeQueue() -> notifyDeQueue(). It schedules the delivery and returns (lines 557-560 below).

thank you!



Simone Bordet

Oct 18, 2019, 1:39:41 PM
to cometd-users
Hi,

On Fri, Oct 18, 2019 at 6:27 PM Ram C <time...@gmail.com> wrote:
> Hi,
>
> I debug the application and noticed the flush method never reach takeQueue() -> notifyDeQueue(). it schedules the delivery and returns. (lines 557-560 below).

Yes that is expected.
The message is published on the server side, and scheduling the delivery will wake up the /meta/connect that will call deQueue() and deliver the messages.
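
The two-step flow described above can be pictured with a toy model. This is only a sketch under stated assumptions: QueueSketch, publish() and wakeUp() are hypothetical names and do not reflect CometD's actual implementation. It shows that publishing only enqueues and schedules, while the dequeue notification happens later, when the pending /meta/connect is woken up.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Toy model (not CometD code) of "schedule now, dequeue later".
class QueueSketch {
    private final Queue<String> queue = new ArrayDeque<>();
    private boolean deliveryScheduled;
    int deQueueNotifications; // counts how often dequeue listeners would be notified

    // Models the server-side publish: enqueue and schedule, then return.
    // No dequeue notification happens on this call path.
    void publish(String message) {
        queue.add(message);
        deliveryScheduled = true;
    }

    // Models the later wake-up of the pending /meta/connect.
    List<String> wakeUp() {
        List<String> delivered = new ArrayList<>();
        if (!deliveryScheduled) {
            return delivered;
        }
        deQueueNotifications++;   // listeners are notified before the queue is drained
        delivered.addAll(queue);
        queue.clear();
        deliveryScheduled = false;
        return delivered;
    }
}
```

In this model a breakpoint inside publish() would never show a dequeue notification, which matches the observation that flush schedules the delivery and returns.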

Ram C

Oct 21, 2019, 10:25:34 AM
to cometd-users
Yes, deQueue() gets called. I debugged the app further: the session object that is passed to takeQueue does not have the listener. Please check the screen captures below.

thanks.




Simone Bordet

Oct 21, 2019, 12:07:59 PM
to cometd-users
Hi,

On Mon, Oct 21, 2019 at 4:25 PM Ram C <time...@gmail.com> wrote:
>
> yes, the deQueue() get called.

Good.

> I further debug the app. the session object that passed to takeQueue does not have the listener.

You should add it then.

The best place to add listeners to a session is from a session listener.
Please see how the acknowledgement extension does it, since it also
adds a DeQueueListener.
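
A minimal sketch of the session-listener pattern follows, using simplified stand-in types. SketchServer, SketchSession, handshake() and addSessionListener() are hypothetical and are not the CometD API. In CometD the corresponding hook is BayeuxServer.SessionListener, whose exact method signatures depend on the CometD version in use. The sketch assumes that sessions can be created after startup, which would explain why a one-time loop over existing sessions misses them; the thread does not confirm this as the cause.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a server session; records the names of attached listeners.
class SketchSession {
    final List<String> listeners = new ArrayList<>();
}

// Hypothetical stand-in for the server: runs callbacks whenever a session is added.
class SketchServer {
    private final List<Consumer<SketchSession>> sessionListeners = new ArrayList<>();

    void addSessionListener(Consumer<SketchSession> onSessionAdded) {
        sessionListeners.add(onSessionAdded);
    }

    // Simulates a client handshake that creates a new session.
    SketchSession handshake() {
        SketchSession session = new SketchSession();
        for (Consumer<SketchSession> listener : sessionListeners) {
            listener.accept(session);
        }
        return session;
    }
}

class SessionListenerSketch {
    // Installed once at startup; the callback runs for each session as it is
    // created, so sessions that appear later still get the dequeue listener.
    static void install(SketchServer server) {
        server.addSessionListener(session -> session.listeners.add("deQueueListener"));
    }
}
```

The design choice is to react to session creation rather than to enumerate sessions at one point in time.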

Ram C

Oct 22, 2019, 2:00:38 PM
to cometd-users
Hi Simon,

When debugging the app, at the scheduler.schedule() line the debugger shows that the listener exists on the session object. But when I continue debugging, execution lands in takeQueue, where the session object does not have the listener.

Maybe I'm doing something wrong.

thank you.



Simone Bordet

Oct 22, 2019, 2:12:13 PM
to cometd-users
Hi,

On Tue, Oct 22, 2019 at 8:00 PM Ram C <time...@gmail.com> wrote:
> may be I'm doing something wrong.

Pack a reproducible test case that shows the issue.
Debugging, and the pauses involved in stopping at breakpoints, will
just make things worse because of the various timeouts that will
expire in the meantime.