activemq/camel: accumulating advisory queue connections

449 views
Skip to first unread message

Matthias L. Jugel

unread,
Sep 12, 2010, 8:10:52 AM9/12/10
to Akka User List
akka 0.10, activemq 5.4.0, camel-jms 2.4.0

I am not sure this is the right place to ask this, but I do not seem
to have any influence on these connections and I am using akka with
the camel add-on.

The systems structure is as follows.

I am querying a database through activemq by sending messages via
activemq to a query responder via a queue. The messages are sent using
a pool of query actors:

actor = pool.borrowObject
for{result: Any <- (actor !! query)} yield result
result.match { ... do something }
pool.returnObject(a)

From my understanding I create a pool of actors that I take out, send
a message, wait for reply or timeout and return too the pool. This
works well until I get "Too many open files" errors at the activemq
broker side. I have turned on the web console to look what happens and
as soon as I start using my query actors I am accumulating "advisory"
connections from between the host that sends the queries and the
activemq server. These connections are persistent and have one single
message enqueued, but never dequeued.

For some reason, even though I do re-use the actors, new queues are
created for these advisory messages. They look like this:

ActiveMQ.Advisory.Consumer.Queue.ID:champloo-59990-1284290690986-0:1342:1

Do I have something wrong in my activemq configuration (i use the
default configuration right now). What I don't understand that each
request creates a new advisory temporary queue. I do not do much after
returning the actor to the pool unless it had an error, but from my
logs this does not seem to be the case.

This is the pool configuration:

private val actorPool = new TSoftReferenceObjectPool[ActorRef](new
TPoolableObjectFactory[ActorRef] {
def passivateObject(a: ActorRef) = {}
def activateObject(a: ActorRef) = if (!a.isRunning && !
a.isBeingRestarted) a.start
def validateObject(a: ActorRef) = !a.isShutdown
def destroyObject(a: ActorRef) = if (!a.isShutdown) a.stop
def makeObject = Actor.actorOf[Q]
})

Any ideas?

Leo.

Martin Krasser

unread,
Sep 12, 2010, 9:34:34 AM9/12/10
to akka...@googlegroups.com
Try to turn off advisory messages as explained in
http://activemq.apache.org/advisory-message.html (section 'Disabling
advisory messages').

> def activateObject(a: ActorRef) = if (!a.isRunning&& !


> a.isBeingRestarted) a.start
> def validateObject(a: ActorRef) = !a.isShutdown
> def destroyObject(a: ActorRef) = if (!a.isShutdown) a.stop
> def makeObject = Actor.actorOf[Q]
> })
>
> Any ideas?
>
> Leo.
>
>


--
Martin Krasser

blog: http://krasserm.blogspot.com
code: http://github.com/krasserm
twitter: http://twitter.com/mrt1nz

Matthias L. Jugel

unread,
Sep 12, 2010, 10:21:42 AM9/12/10
to Akka User List
Before anyone digs deeper into this I am investigating a problem in
the creation of actors. Actually the pool may not be functioning as
expected. Also, turning off advisories led to communication errors as
the temp queues to replies where gone when the the answer was sent.

Leo.

On Sep 12, 3:34 pm, Martin Krasser <krass...@googlemail.com> wrote:
> Try to turn off advisory messages as explained inhttp://activemq.apache.org/advisory-message.html(section 'Disabling

Matthias L. Jugel

unread,
Sep 13, 2010, 3:00:26 AM9/13/10
to Akka User List
I fell prey to the non-singletoness of jersey. Now that the pool works
it works well. I am going to test it now with the many uncached
requests from our main application.

Thanks,

Leo.

On Sep 12, 4:21 pm, "Matthias L. Jugel" <matthias.ju...@gmail.com>
wrote:
Reply all
Reply to author
Forward
0 new messages