A couple of questions about immutant.messaging request/response

Anders Rye

Oct 29, 2015, 12:20:56 PM
to Immutant
I'm running Immutant 2.1.0 in a Wildfly 9.0.1.Final container.

First: running request on a HornetQ thread.

I have a listener processing messages from a service, and I was trying to run request against another service within that message handler. The responder never seems to get the request, and the thread hangs waiting for the response.

This example hangs for me:

(let [q1 (queue "Queue1")
      q2 (queue "Queue2")]
  (respond q2 inc)
  (listen q1 #(do (println "q1 got" %) (println "q2 returned" @(request q2 %))))
  (publish q1 1))
q1 got 1
=> nil

From the container log: 

16:08:56,221 INFO  [org.hornetq.core.server] (ServerService Thread Pool -- 15) HQ221003: trying to deploy queue jms.queue.Queue1
16:08:56,221 INFO  [org.jboss.as.messaging] (ServerService Thread Pool -- 15) WFLYMSG0002: Bound messaging object to jndi name java:/jms/queue/Queue1
16:08:56,232 INFO  [org.hornetq.core.server] (ServerService Thread Pool -- 15) HQ221003: trying to deploy queue jms.queue.Queue2
16:08:56,233 INFO  [org.jboss.as.messaging] (ServerService Thread Pool -- 15) WFLYMSG0002: Bound messaging object to jndi name java:/jms/queue/Queue2
16:08:56,246 INFO  [org.projectodd.wunderboss.messaging] (nREPL-worker-0) Starting listener for 'Queue2' concurrency=8
16:08:56,256 INFO  [org.projectodd.wunderboss.messaging] (nREPL-worker-0) Starting listener for 'Queue1' concurrency=8
16:08:56,286 INFO  [org.projectodd.wunderboss.messaging] (Thread-0 (HornetQ-client-global-threads-997942435)) Starting listener for 'Queue2' concurrency=8

Then later, during shutdown:

16:14:59,953 WARN  [org.hornetq.core.client] (ServerService Thread Pool -- 97) HQ212002: Timed out waiting for handler to complete processing

Is this something you are not supposed to do?

If I wrap the request in another future, it works fine:

(let [q1 (queue "Queue1")
      q2 (queue "Queue2")]
  (respond q2 inc)
  (listen q1 #(do (println "q1 got" %) (println "q2 returned" @(future @(request q2 %)))))
  (publish q1 1))
q1 got 1
=> nil
q2 returned 2


Second: passing a local context to request

(with-open [c (context)]
  (let [q (queue "TestQueue")]
    (respond q inc)
    @(request q 1 :context c)))

This throws:

IllegalArgumentException Listening only accepts a remote context.  org.projectodd.wunderboss.messaging.jms.JMSDestination.listen (JMSDestination.java:62)

I was hoping this would work as it does with publish (the docs say request takes the same options as publish), since I want to reduce overhead when making a large number of requests.


Anders

Toby Crawley

Oct 29, 2015, 1:13:12 PM
to Anders Rye, Immutant
On Thu, Oct 29, 2015 at 11:51 AM, Anders Rye <ander...@gmail.com> wrote:
>
> I have a listener processing messages from a service, and I was trying to
> run request against another service within that message handler. The
> responder never seems to get the request, and the thread hangs waiting for
> the response.
>
> This example hangs for me:
>
> (let [q1 (queue "Queue1")
> q2 (queue "Queue2")]
> (respond q2 inc)
> (listen q1 #(do (println "q1 got" %) (println "q2 returned" @(request q2
> %))))
> (publish q1 1))
> q1 got 1
> => nil
>

There are two things happening here that, together, cause this to hang:

1) By default, there is a local JMS transaction around the listener's
function - this allows us to roll back the receipt of the message
and any publishes that use the context that is active during the
call.

2) Any publish within the listener's function will use that active
context (primarily for transactionality), and the publish won't
succeed until the context is committed, once the listener function
returns. `request` uses `publish`, so the deref hangs, because the
request message won't be published until the context is committed,
which won't happen until the listener's function returns.

If you need to escape the implicit transaction inside the listener,
you can set the `:mode` of the listener to something other than
`:transacted`:

(listen q1
  #(do (println "q1 got" %)
       (println "q2 returned" @(request q2 %)))
  :mode :auto-ack)
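
To make the cause of the hang concrete, here is a minimal sketch in plain Clojure that models points 1 and 2 without Immutant or a broker. It is not how Immutant is implemented: `make-context`, `publish!`, `commit!`, `request!`, and `run-listener` are hypothetical stand-ins, and the only behaviour modelled is that publishes are buffered until the context commits after the handler returns. The deref uses a timeout so the sketch returns instead of hanging.

```clojure
;; Pure-Clojure model of the hang; no Immutant or broker involved.
;; Every function here is a hypothetical stand-in, not an Immutant API.

(defn make-context
  "A transacted context: publishes are buffered until commit!."
  []
  (atom []))

(defn publish!
  "Buffer a delivery thunk on the context instead of running it."
  [ctx thunk]
  (swap! ctx conj thunk))

(defn commit!
  "Run every buffered delivery, then leave the buffer empty.
  Not atomic; good enough for this single-threaded sketch."
  [ctx]
  (let [pending @ctx]
    (reset! ctx [])
    (doseq [thunk pending] (thunk))))

(defn request!
  "Publish a request through ctx and return a promise for the reply.
  `responder` plays the role of the function given to respond."
  [ctx responder msg]
  (let [reply (promise)]
    (publish! ctx #(deliver reply (responder msg)))
    reply))

(defn run-listener
  "Call handler with a fresh context and commit only after it returns,
  as a transacted listener would."
  [handler msg]
  (let [ctx    (make-context)
        result (handler ctx msg)]
    (commit! ctx)
    result))

;; Dereferencing inside the handler: the request is still buffered,
;; so no reply can arrive before the timeout.
(def inside
  (run-listener
    (fn [ctx msg] (deref (request! ctx inc msg) 100 :timed-out))
    1))

;; Returning the promise and dereferencing after the commit works.
(def outside
  (deref (run-listener (fn [ctx msg] (request! ctx inc msg)) 1)
         100
         :timed-out))

(println "inside:" inside "outside:" outside)
```

In this model `inside` ends up as `:timed-out` and `outside` as `2`. With a real listener there is no timeout on `@`, so the same ordering shows up as the indefinite hang reported above.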

>
> Second: passing a local context to request
>
> (with-open [c (context)]
> (let [q (queue "TestQueue")]
> (respond q inc)
> @(request q 1 :context c)))
>
> Throws IllegalArgumentException Listening only accepts a remote context.
> org.projectodd.wunderboss.messaging.jms.JMSDestination.listen
> (JMSDestination.java:62)

This happens because, if a context is provided to `request`, that
context is reused to create a listener to get the response to the
request. I'm pretty sure this is a bug - we should only be reusing
that context if it is a *remote* context, which I believe was the
intent for adding this behaviour.

I've filed two issues in JIRA for this: [1] to improve the
documentation around `listen` to clarify the use of a local transaction,
and [2] to only reuse the context in `request` if it's remote,
assuming that we don't have another reason for reusing the context.

[1]: https://issues.jboss.org/browse/IMMUTANT-588
[2]: https://issues.jboss.org/browse/IMMUTANT-589

- Toby

Anders Rye

Oct 29, 2015, 5:15:05 PM
to Immutant
Aah, I see. Thanks for the swift reply!

Anders

Toby Crawley

Nov 3, 2015, 3:20:40 PM
to Anders Rye, Immutant
Anders:

The issue with not being able to pass a :context to request has been
fixed, and is available in incremental build #670 and newer if you
want to give it a try (see http://immutant.org/builds/2x/ for details
on using an incremental build). This will be in the next versioned
release, which I hope to have out in the next couple of weeks.

- Toby