Issue with Camel HTTP producer when HTTP server not available

Raymond Roestenburg

Oct 1, 2010, 7:40:15 AM
to Akka User List
Hi HAkkers,

I'm on Akka (Master), specifically commit
2c522673c114aba985afaa3d56a0b61a1b593229
I think I see an error in the Camel HTTP Producer. We are testing what
happens if the HTTP server we are sending data to is not available or
sends back errors.

The following happens when the HTTP server we are sending to is not
available:

(I removed the specific server names, URLs and the message from the
stack trace since they are client-specific.)

2623 [qtp13198090-12] WARN org.eclipse.jetty.util.log - CONNECTION FAILED JettyContentExchange@1253877=POST//[server-name-removed]:8282/[url-removed]
java.net.ConnectException: Connection refused
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:574)
    at org.eclipse.jetty.io.nio.SelectorManager$SelectSet.doSelect(SelectorManager.java:684)
    at org.eclipse.jetty.io.nio.SelectorManager.doSelect(SelectorManager.java:195)
    at org.eclipse.jetty.client.SelectConnector.run(SelectConnector.java:180)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:436)
    at java.lang.Thread.run(Thread.java:619)
2648 [akka:event-driven:dispatcher:global-3] ERROR se.scalablesolutions.akka.actor.Actor$ - Exception when invoking
actor [Actor[actorname removed]
with message [FailureResult(Failure(org.apache.camel.CamelExchangeException: JettyClient failed cause by: Connection refused. Exchange[Message: [message removed]
Caused by: [java.net.ConnectException - Connection refused],Map()))]
scala.MatchError: FailureResult(Failure(org.apache.camel.CamelExchangeException: JettyClient failed cause by: Connection refused. Exchange[Message: [Message removed]
Caused by: [java.net.ConnectException - Connection refused],Map()))
    at se.scalablesolutions.akka.actor.Actor$$anonfun$2.apply(Actor.scala:465)
    at se.scalablesolutions.akka.actor.Actor$$anonfun$2.apply(Actor.scala:465)
    at se.scalablesolutions.akka.actor.Actor$class.apply(Actor.scala:461)
    at [apply at my Actor, removed reference]

It looks like I'm getting a MatchError on FailureResult, but that class
seems to be private, so I guess I can't match on it? The Failure case
class is not private (I was expecting to get that one).

// from code in akka source
private[camel] case class FailureResult(failure: Failure)
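
On the visibility question, a minimal plain-Scala sketch may help. It uses stand-in types only (the `camel` object, `asyncFailure` and `Client` are hypothetical names, not akka-camel code). It shows that a value of a restricted-visibility case class can still reach outside code typed as Any. The type cannot be named in a pattern there, and a handler without a fallback case throws scala.MatchError for it.

```scala
// Plain-Scala model; `camel`, `Failure`, `FailureResult` and `asyncFailure`
// are stand-ins for illustration, not the akka-camel source.
object camel {
  case class Failure(cause: Throwable)

  // Visible only inside `camel`, like the private[camel] wrapper quoted above.
  private[camel] case class FailureResult(failure: Failure)

  // Public entry point that hands the wrapped value to outside code as Any.
  def asyncFailure(cause: Throwable): Any = FailureResult(Failure(cause))
}

object Client {
  // Outside `camel` the wrapper cannot be named in a pattern:
  //   case camel.FailureResult(f) => ...   // does not compile here

  // No fallback case: any value that is not a Failure causes a MatchError.
  val strict: Any => String = {
    case f: camel.Failure => "failure: " + f.cause.getMessage
  }

  // A fallback case still sees the value, even though its type is hidden.
  val lenient: Any => String = {
    case f: camel.Failure => "failure: " + f.cause.getMessage
    case other: Product   => "unhandled wrapper: " + other.productPrefix
    case other            => "unhandled: " + other
  }
}
```

In this model, passing the result of `camel.asyncFailure(...)` to `Client.strict` throws a MatchError, while `Client.lenient` reaches its fallback case. So the MatchError says that no case covered the value, not that the private type blocked the match.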

Looks like it's here in the async handling (from akka source):

def done(doneSync: Boolean): Unit = {
  (doneSync, exchange.isFailed) match {
    case (true, true) =>
      dispatchSync(exchange.toFailureMessage(cmsg.headers(headersToCopy)))
    case (true, false) =>
      dispatchSync(exchange.toResponseMessage(cmsg.headers(headersToCopy)))
    case (false, true) =>
      dispatchAsync(FailureResult(exchange.toFailureMessage(cmsg.headers(headersToCopy))))
    case (false, false) =>
      dispatchAsync(MessageResult(exchange.toResponseMessage(cmsg.headers(headersToCopy))))
  }
}


Am I doing something wrong or is this a bug?
This is how I try to get the response:

override protected def receiveAfterProduce = {
  case msg: Message => {
    log.info("########## response = %s", msg.bodyAs[String])
    val headers = msg.headers
    for ((key, value) <- headers) {
      log.info("key = %s, value=%s", key, value)
    }
  }
  case f: Failure => {
    log.info("########## failure = %s", f.cause.getMessage)
    val headers = f.headers
    for ((key, value) <- headers) {
      log.info("key = %s, value=%s", key, value)
    }
  }
  case _ => log.info("######## Unexpected message")
}

Martin Krasser

Oct 1, 2010, 8:17:20 AM
to akka...@googlegroups.com
Hi Raymond,

just tried this locally:

class TempProducer extends Actor with Producer {
  def endpointUri = "http://localhost:8765/blah" // also tried jetty:http://localhost:8765/blah

  override def receiveAfterProduce = {
    case message: Message => println("received message")
    case failure: Failure => println("received failure")
  }
}

Usage:

CamelServiceManager.startCamelService
val producer = actorOf[TempProducer].start
producer ! "blah"

This prints

"received failure"

to stdout as expected (failure cause was a 'connection refused'). Can
you share your producer actor code with me so that I can reproduce your
problem?

Thanks,
Martin

On 01.10.10 13:40, Raymond Roestenburg wrote:


--
Martin Krasser

blog: http://krasserm.blogspot.com
code: http://github.com/krasserm
twitter: http://twitter.com/mrt1nz

Raymond Roestenburg

Oct 1, 2010, 10:02:20 AM
to Akka User List
Hi Martin,

it's code for a client so that is a bit tricky.

We got it to work by not using the "jetty:" prefix.

With the code below, the jetty URL does not work (the receive override
is basically the only difference from your example):

@Test def test {
  CamelServiceManager.startCamelService
  val producer = actorOf[TempProducer].start

  producer ! "blah"
}

class TempProducer() extends Actor with Producer {
  def endpointUri = "jetty:http://localhost:8765/blah" // Doesn't work
  //def endpointUri = "http://localhost:8765/blah" // works
  override def receive = {
    case msg: String => {
      produce(msg)
    }
  }
  override protected def receiveAfterProduce = {
    case message: Message =>
      println("##########################received message")
    case failure: Failure =>
      println("##########################received failure")
  }
}

Martin Krasser

Oct 1, 2010, 12:27:08 PM
to akka...@googlegroups.com
Why do you override Producer.receive? Producer actors should use the
default receive implementation from the Producer trait and do any custom
processing with receiveBeforeProduce and receiveAfterProduce.

Background: the jetty producer endpoint supports asynchronous
request/reply. Responses are added to the producer actor's mailbox
wrapped inside MessageResult/FailureResult. These are special wrappers
handled by Producer.receive to separate them from request messages sent
to the producer actor. The http producer endpoint only supports
synchronous request/reply and the response is directly passed to
receiveAfterProduce (without adding it to the producer actor's mailbox).
That's why you see this issue only with the jetty endpoint URI.
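
The wrapping and unwrapping described above can be sketched in plain Scala. This is a simplified model under stated assumptions, not the akka-camel source: `ProducerModel`, `HookProducer`, `OverridingProducer`, `send` and the queue-based mailbox are hypothetical names, and `produce` always simulates a refused connection on an asynchronous endpoint. Only the asynchronous path is modelled.

```scala
import scala.collection.mutable

// Simplified plain-Scala model of the behaviour described above. All names
// below are stand-ins for illustration; this is not the akka-camel source.
case class Message(body: Any)
case class Failure(cause: Throwable)
case class MessageResult(message: Message)   // wrapper for async replies
case class FailureResult(failure: Failure)   // wrapper for async failures

trait ProducerModel {
  type Receive = PartialFunction[Any, Unit]
  val mailbox = mutable.Queue.empty[Any]

  // Hooks meant for customization.
  protected def receiveBeforeProduce: PartialFunction[Any, Any] = { case msg => msg }
  protected def receiveAfterProduce: Receive

  // Stand-in for an asynchronous endpoint whose server is down: the failure
  // is wrapped and put into the mailbox instead of being returned directly.
  protected def produce(msg: Any): Unit =
    mailbox.enqueue(FailureResult(Failure(new java.net.ConnectException("Connection refused"))))

  // Default receive: unwraps async results, treats everything else as a request.
  def receive: Receive = {
    case MessageResult(m) => receiveAfterProduce(m)
    case FailureResult(f) => receiveAfterProduce(f)
    case msg              => produce(receiveBeforeProduce(msg))
  }

  // Deliver a request, then drain whatever the endpoint put into the mailbox.
  def send(msg: Any): Unit = {
    mailbox.enqueue(msg)
    while (mailbox.nonEmpty) receive(mailbox.dequeue())
  }
}

// Uses the hooks only and keeps the default receive.
class HookProducer extends ProducerModel {
  var log = List.empty[String]
  protected def receiveAfterProduce: Receive = {
    case _: Message => log ::= "received message"
    case _: Failure => log ::= "received failure"
  }
}

// Overrides receive, so the wrapper is never unwrapped.
class OverridingProducer extends HookProducer {
  override def receive: Receive = { case msg: String => produce(msg) }
}
```

In this model, `new HookProducer().send("blah")` ends up in the Failure case of receiveAfterProduce, while `new OverridingProducer().send("blah")` throws a scala.MatchError once the wrapped failure is taken from the mailbox, which mirrors the stack trace earlier in the thread.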

On 01.10.10 16:02, Raymond Roestenburg wrote:

Raymond Roestenburg

Oct 1, 2010, 1:27:07 PM
to akka...@googlegroups.com
Ah, didn't know I should have used receiveBeforeProduce. Thanks!


--
Raymond Roestenburg

Martin Krasser

Oct 2, 2010, 2:48:50 AM
to akka...@googlegroups.com
Maybe the akka-camel docs weren't very clear about that. I tried to make them clearer in

http://doc.akkasource.org/camel#x-Produce%20messages-Producer%20trait

Let me know if it needs further improvement.

On 01.10.10 19:27, Raymond Roestenburg wrote: