Re: [akka-user] akka with jetty-websockets

Patrik Nordwall

Nov 22, 2012, 3:19:20 AM
to akka...@googlegroups.com
I don't know what the problem can be. Can you try with the Akka Camel module in Akka 2.1-RC2?
http://doc.akka.io/docs/akka/2.1.0-RC2/java/camel.html

On Thu, Nov 22, 2012 at 12:08 AM, Bob Thorman <thor...@gmail.com> wrote:
I've written an akka-1.3 UntypedProducerActor and instantiated it with a jetty-websocket server as follows:

        wsp = actorOf(new UntypedActorFactory() {
                    public Actor create() {
                        return new webSocketProvider("ws://myHost:8000/myServer");
                    }
                }).start();

I also have a .jsp that connects to the same endpoint on document.onload by calling connect('websocket://myHost:8000/myServer'), using the following connect function:

function connect(host) {
    try {
        var socket = new WebSocket(host);
        socket.onopen = function() {
        };
        socket.onmessage = function(msg) {
                document.getElementById('#content').append('', '<p class="even">' + msg + '</p>');
        };
        socket.onclose = function() {
        };
    } catch(exception) {
    }
}

I can see the connection established in netstat -a each time the page loads, and the akka-dispatcher logging says the jetty server was started on myHost:8000. But when I use the producer's ActorRef to send a message, it never makes it to the client. The producer logs each message in its onReceiveBeforeProduce() method, but the client never receives it. The client's status also says the socket is open, and no errors are logged.

What am I missing? 

--
Patrik Nordwall
Typesafe - The software stack for applications that scale
Twitter: patriknw

Raymond Roestenburg

Nov 23, 2012, 12:55:47 PM
to akka...@googlegroups.com



On Fri, Nov 23, 2012 at 6:38 PM, Bob Thorman <thor...@gmail.com> wrote:
I updated to akka-2.1.0-RC2 and am now getting this exception. What is the connection key?

It is a specific setting required by the Camel websocket component:
WebsocketConstants.CONNECTION_KEY: sends the message to the client with the given connection key.

check:
 
82902 [SearchSystem-akka.actor.default-dispatcher-1] INFO org.apache.camel.impl.DefaultCamelContext - Apache Camel 2.10.0 (CamelContext: SearchSystem) is starting
82902 [SearchSystem-akka.actor.default-dispatcher-1] INFO org.apache.camel.impl.DefaultCamelContext - StreamCaching is enabled on CamelContext: SearchSystem
82902 [SearchSystem-akka.actor.default-dispatcher-1] INFO org.apache.camel.management.ManagementStrategyFactory - JMX is disabled.
82902 [SearchSystem-akka.actor.default-dispatcher-1] INFO org.apache.camel.impl.DefaultCamelContext - Total 0 routes, of which 0 is started.
82903 [SearchSystem-akka.actor.default-dispatcher-1] INFO org.apache.camel.impl.DefaultCamelContext - Apache Camel 2.10.0 (CamelContext: SearchSystem) started in 0.000 seconds
[ERROR] [11/23/2012 17:29:10.737] [SearchSystem-akka.actor.default-dispatcher-4] [akka://SearchSystem/user/$a] Failed to send message to single connection; connetion key not set.
97b3c210-adbf-4dd6-882d-2e733a632ee7akka.camel.AkkaCamelException: Failed to send message to single connection; connetion key not set.
    at akka.camel.ProducerSupport$$anonfun$produce$1.applyOrElse(Producer.scala:75)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at akka.camel.javaapi.UntypedProducerActor.onReceive(UntypedProducerActor.scala:51)
    at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:159)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:420)
    at akka.actor.ActorCell.invoke(ActorCell.scala:381)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:230)
    at akka.dispatch.Mailbox.run(Mailbox.scala:212)
    at akka.dispatch.ForkJoinExecutorConfigurator$MailboxExecutionTask.exec(AbstractDispatcher.scala:502)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:262)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:975)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1478)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:104)
Caused by: java.lang.IllegalArgumentException: Failed to send message to single connection; connetion key not set.
    at org.apache.camel.component.websocket.WebsocketProducer.process(WebsocketProducer.java:55)
    at org.apache.camel.util.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:61)
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
    at org.apache.camel.processor.SendProcessor$2.doInAsyncProducer(SendProcessor.java:120)
    at org.apache.camel.impl.ProducerCache.doInAsyncProducer(ProducerCache.java:292)
    at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:115)
    at akka.camel.ProducerSupport$ProducerChild.produce(Producer.scala:140)
    at akka.camel.ProducerSupport$ProducerChild$$anonfun$receive$1.applyOrElse(Producer.scala:113)
    ... 9 more



--
Raymond Roestenburg

Raymond Roestenburg

Nov 23, 2012, 1:30:26 PM
to akka...@googlegroups.com
Hi Bob,

I think you just need to add it as a header, but I've never used it.
You'll need to investigate the Camel websocket component a bit further to find out which settings you need.
Most of the time, getting a Camel component to work properly is all about setting the right query parameters on the URL to configure the component and using the right headers on the messages.


On Fri, Nov 23, 2012 at 7:11 PM, Bob Thorman <thor...@gmail.com> wrote:
Is there a way to assign this key when I create my UntypedProducerActor?

Bob Thorman

Nov 24, 2012, 9:48:50 AM
to akka...@googlegroups.com
I managed to get the websocket connection key inserted into the header just as you suggested.  Thanks Raymond.  But now I have an exception each time I send a message saying the body is null.  I created the message like this...

Map<String, Object> headers = new HashMap<String,Object>();
headers.put(WebsocketConstants.CONNECTION_KEY, WebsocketConstants.SEND_TO_ALL);

CamelMessage cm = new CamelMessage("<p>Hello Client!</p>", headers);

Any ideas?

On Friday, November 23, 2012 12:36:35 PM UTC-6, Bob Thorman wrote:
Yeah I found the attribute in the WebSocketServletConnection class but haven't figured out how to get the instance that's created by the akka-camel integration.  Working on finding that when the connection is established...

Bob Thorman

Nov 24, 2012, 12:06:16 PM
to akka...@googlegroups.com
Here's the exception I get with my akka producer sending a CamelMessage to the jetty.websocket endpoint...

[ERROR] [11/24/2012 17:03:25.163] [SearchSystem-akka.actor.default-dispatcher-7] [akka://SearchSystem/user/$b] No body available of type: java.lang.String on: Message: [Body is null]. Caused by: No type converter available to convert from type: null to the required type: java.lang.String with value null. Exchange[Message: [Body is null]]. Caused by: [org.apache.camel.NoTypeConversionAvailableException - No type converter available to convert from type: null to the required type: java.lang.String with value null]
c2ec7f48-3232-496f-b36b-c98216070ae1akka.camel.AkkaCamelException: No body available of type: java.lang.String on: Message: [Body is null]. Caused by: No type converter available to convert from type: null to the required type: java.lang.String with value null. Exchange[Message: [Body is null]]. Caused by: [org.apache.camel.NoTypeConversionAvailableException - No type converter available to convert from type: null to the required type: java.lang.String with value null]

    at akka.camel.ProducerSupport$$anonfun$produce$1.applyOrElse(Producer.scala:75)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at akka.camel.javaapi.UntypedProducerActor.onReceive(UntypedProducerActor.scala:51)
    at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:159)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:420)
    at akka.actor.ActorCell.invoke(ActorCell.scala:381)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:230)
    at akka.dispatch.Mailbox.run(Mailbox.scala:212)
    at akka.dispatch.ForkJoinExecutorConfigurator$MailboxExecutionTask.exec(AbstractDispatcher.scala:502)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:262)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:975)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1478)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:104)
Caused by: org.apache.camel.InvalidPayloadException: No body available of type: java.lang.String on: Message: [Body is null]. Caused by: No type converter available to convert from type: null to the required type: java.lang.String with value null. Exchange[Message: [Body is null]]. Caused by: [org.apache.camel.NoTypeConversionAvailableException - No type converter available to convert from type: null to the required type: java.lang.String with value null]
    at org.apache.camel.impl.MessageSupport.getMandatoryBody(MessageSupport.java:101)
    at org.apache.camel.component.websocket.WebsocketProducer.process(WebsocketProducer.java:43)

    at org.apache.camel.util.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:61)
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
    at org.apache.camel.processor.SendProcessor$2.doInAsyncProducer(SendProcessor.java:120)
    at org.apache.camel.impl.ProducerCache.doInAsyncProducer(ProducerCache.java:292)
    at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:115)
    at akka.camel.ProducerSupport$ProducerChild.produce(Producer.scala:140)
    at akka.camel.ProducerSupport$ProducerChild$$anonfun$receive$1.applyOrElse(Producer.scala:113)
    ... 9 more
Caused by: org.apache.camel.NoTypeConversionAvailableException: No type converter available to convert from type: null to the required type: java.lang.String with value null
    at org.apache.camel.impl.converter.BaseTypeConverterRegistry.mandatoryConvertTo(BaseTypeConverterRegistry.java:169)
    at org.apache.camel.impl.MessageSupport.getMandatoryBody(MessageSupport.java:99)
    ... 17 more

Raymond Roestenburg

Nov 24, 2012, 3:26:09 PM
to akka...@googlegroups.com
I have a weird hunch: try setting oneway to true on the producer and see if that works.


On Sat, Nov 24, 2012 at 6:06 PM, Bob Thorman <thor...@gmail.com> wrote:
No body available of type: java.lang.String on: Message: [Body is null]



Veebs

Nov 24, 2012, 4:46:07 PM
to akka...@googlegroups.com
If you want to broadcast to all connections, this worked for me:

val header = Map((WebsocketConstants.SEND_TO_ALL, "true"))

So you might want to try:

headers.put(WebsocketConstants.SEND_TO_ALL, "true");
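For comparison with the earlier snippet, where SEND_TO_ALL was passed as the value of the CONNECTION_KEY header, here is a minimal sketch of the two header maps with SEND_TO_ALL used as a header name instead. It uses only java.util so it runs without Camel on the classpath. The class name WebsocketHeaderSketch and its methods are hypothetical, and the two string values are assumed to match the constants in camel-websocket's WebsocketConstants; in real code, reference the constants directly.

```java
import java.util.HashMap;
import java.util.Map;

// Illustration only. The string values below are ASSUMED to equal
// WebsocketConstants.CONNECTION_KEY and WebsocketConstants.SEND_TO_ALL;
// use the real constants from camel-websocket in production code.
final class WebsocketHeaderSketch {
    static final String CONNECTION_KEY = "websocket.connectionKey"; // assumed value
    static final String SEND_TO_ALL = "websocket.sendToAll";        // assumed value

    private WebsocketHeaderSketch() {}

    // Broadcast: SEND_TO_ALL is the header name, "true" is its value.
    static Map<String, Object> broadcastHeaders() {
        Map<String, Object> headers = new HashMap<String, Object>();
        headers.put(SEND_TO_ALL, "true");
        return headers;
    }

    // Single client: CONNECTION_KEY is the header name, the client's key is its value.
    static Map<String, Object> singleClientHeaders(String connectionKey) {
        Map<String, Object> headers = new HashMap<String, Object>();
        headers.put(CONNECTION_KEY, connectionKey);
        return headers;
    }
}
```

Either map would then be passed as the headers argument when constructing the CamelMessage, as in the snippet earlier in the thread.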

Vibul

Bob Thorman

Nov 26, 2012, 2:40:58 PM
to akka...@googlegroups.com
I got WebsocketConstants.SEND_TO_ALL to work. Any idea how I get the connection key out of the UntypedProducerActor so that I can use the WebsocketConstants.CONNECTION_KEY option?

√iktor Ҡlang

Nov 26, 2012, 3:28:23 PM
to Akka User List
Hi!

Have you tried getting the WebsocketConstants.CONNECTION_KEY out of the headers of the inbound CamelMessage?
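A rough sketch of that idea, assuming the inbound message's headers are available as a plain java.util.Map: copy the connection key from the inbound headers into the headers of the outgoing message. The class ConnectionKeyReplySketch and its method are hypothetical, and the header name string is assumed to match WebsocketConstants.CONNECTION_KEY. Whether the key is actually present on inbound messages in this setup is not confirmed in this thread.

```java
import java.util.HashMap;
import java.util.Map;

// Illustration only, using plain maps instead of CamelMessage.
final class ConnectionKeyReplySketch {
    // ASSUMED to equal WebsocketConstants.CONNECTION_KEY in camel-websocket.
    static final String CONNECTION_KEY = "websocket.connectionKey";

    private ConnectionKeyReplySketch() {}

    // Builds outgoing headers that target the client that sent the inbound message.
    static Map<String, Object> replyHeaders(Map<String, Object> inboundHeaders) {
        Object key = inboundHeaders.get(CONNECTION_KEY);
        if (key == null) {
            throw new IllegalArgumentException("inbound headers carry no connection key");
        }
        Map<String, Object> outbound = new HashMap<String, Object>();
        outbound.put(CONNECTION_KEY, key);
        return outbound;
    }
}
```

The returned map could then be used as the headers of the CamelMessage sent through the producer.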

Cheers,
Viktor Klang

Akka Tech Lead
Typesafe - The software stack for applications that scale

Twitter: @viktorklang

Bob Thorman

Nov 26, 2012, 5:17:23 PM
to akka...@googlegroups.com
It's an InOnly exchange pattern that I'm using. But I guess I can send the server a "Hello Server" message to see if I can get that?