calling a nameko service from a Java client

236 views
Skip to first unread message

lauren...@gmail.com

unread,
Mar 2, 2018, 9:53:55 AM3/2/18
to nameko-dev

I have a simple service written with Python using Nameko based on RabbitMQ and RPC.

Here is the code of service.py

from nameko.rpc import rpc, RpcProxy

class Crawler(object):
    name = "crawler"

    @rpc
    def scrapit(self):
        return 'OK'

Then i start it using the command :

nameko run service --broker amqguest:guest@localhost

Finally i have a simple RPCCLient written in Java :

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;

public class RPCClient {

    private Connection connection;
    private Channel channel;
    private String requestQueueName = "rpc-crawler";
    private String replyQueueName;

    public RPCClient() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        connection = factory.newConnection();
        channel = connection.createChannel();

        replyQueueName = channel.queueDeclare().getQueue();
    }

    public String call(String message) throws IOException, InterruptedException {
        String corrId = UUID.randomUUID().toString();

        AMQP.BasicProperties props = new AMQP.BasicProperties
                .Builder()
                .correlationId(corrId)
                .replyTo(replyQueueName)
                .build();

        channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));

        final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);

        channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                if (properties.getCorrelationId().equals(corrId)) {
                    response.offer(new String(body, "UTF-8"));
                }
            }
        });

        return response.take();
    }

    public void close() throws IOException {
        connection.close();
    }

    //...
}

I call

 RPCClient client = new RPCClient();
 msg = client.call("crawler.scrapit()");

In my RabbitMQ management i see the message and a ack, but the client is blocking and i do not see the returned message "OK"

Any idea Thanks a lot . Laurent

Matt Yule-Bennett

unread,
Mar 4, 2018, 3:31:19 AM3/4/18
to nameko-dev
Does the service receive the request message and fire the entrypoint?

If so, do you see any message published to the RPC reply queue?

Any tracebacks anywhere?

There is a lot of detail on Nameko's AMQP-RPC protocol in http://izmailoff.github.io/architecture/rpc-revived/ and https://github.com/nameko/nameko/issues/354.

lauren...@gmail.com

unread,
Mar 4, 2018, 6:43:49 AM3/4/18
to nameko-dev
I only see a ack on the RabbitMQ management page... i see my message sent
I do not see answer in the reply queue
I think the entrypoint is hit because there is an ack,  but i do not understand why the replied message is not sent

Matt Yule-Bennett

unread,
Mar 4, 2018, 6:48:11 AM3/4/18
to nameko-dev
An ack for the request message means that it was received and processed without any error, including sending the reply.

The reply message is almost certainly being published but not routed to the appropriate reply queue. Looking at your code... is your reply queue bound to the RPC exchange? If not, that's your problem.

lauren...@gmail.com

unread,
Mar 4, 2018, 1:25:01 PM3/4/18
to nameko-dev
Yes but in my Java client i manage the replyQueue
I Use correlationId.
I started from the Java code in RabbitMQ RPC/Java/CorrelationId section https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/java/RPCClient.java

In my RabbitMQ management page i see the message sent from client , the Publisher confirm, and the ack now

As you say maybe it's a problem with the reply queue, and error in my client
Laurent

lauren...@gmail.com

unread,
Mar 4, 2018, 1:37:01 PM3/4/18
to nameko-dev
The problem is before the correlationId.

In the Java source code that is handleDelivery which blocks

Matt Yule-Bennett

unread,
Mar 4, 2018, 2:57:35 PM3/4/18
to nameko-dev
The RPC reply queue must be bound to the `nameko-rpc` exchange. The reply message is published to that exchange with the 'reply_to' as the routing key; if you haven't bound the reply queue to the correct exchange, the reply message will be discarded by the broker as not matching any routes.

Correlation ID has nothing to do with routing messages. It's only used to match replies up with requests so a client can have multiple requests in flight at the same time.

lauren...@gmail.com

unread,
Mar 4, 2018, 3:30:23 PM3/4/18
to nameko-dev
Thanks.

In my client, after
channel = connection.createChannel();

replyQueueName = channel.queueDeclare().getQueue();

I've added this line

channel.queueBind( replyQueueName , "nameko-rpc",  "reply_to");

But same results... the client is stil blocking
In fact i retested my scrapit service , in which i've added a log, from a Python nameko client...









from nameko.standalone.rpc import ClusterRpcProxy


CONFIG = {'AMQP_URI': "amqp://guest:guest@localhost"}



def scrapit():             

    with ClusterRpcProxy(CONFIG) as rpc:       

        result = rpc.crawler.scrapit()


if __name__ == '__main__':

  scrapit()

And i see the message from my consumer in the Terminal Window, all works fine

That's not the case with my Java client<.
So my consumer would not be hitten.

Are we sure the queue name is 'rpc-<service_name>'?

In my case my service is :








class Crawler(object):

    name = "crawler"

        

    @rpc

    def scrapit(self):

        print("Received a request send response")

        return 'OK'

So the queue would be 'rpc-crawler' right?

Laurent

Matt Yule-Bennett

unread,
Mar 4, 2018, 4:54:06 PM3/4/18
to nameko-dev


On Sunday, March 4, 2018 at 8:30:23 PM UTC, lauren...@gmail.com wrote:
Thanks.

In my client, after
channel = connection.createChannel();

replyQueueName = channel.queueDeclare().getQueue();

I've added this line

channel.queueBind( replyQueueName , "nameko-rpc",  "reply_to");


Have you bound your reply queue with the literal value "reply_to"? That won't work. It needs to be the value that you've specified in the "reply to" header in your request message. In your case that's replyQueueName.

 
But same results... the client is stil blocking

<snip>

So the queue would be 'rpc-crawler' right?


That is correct, yes. If it wasn't correct you would not see the request message being ack'd by your service.

lauren...@gmail.com

unread,
Mar 5, 2018, 3:27:32 AM3/5/18
to nameko-dev
I changed my client code :

First i declare the exchange:

private static final String EXCHANGE_NAME = "nameko-rpc";

Then after further tries i declare the exchange :
Channel channel = connection.createChannel();

channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);

Finally i publish my message directly to the exchange and not the requestQueueName ...found the code on RabbitMQ for Publish/Subscribe , section EmitLog client https://www.rabbitmq.com/tutorials/tutorial-three-java.html

channel.basicPublish(EXCHANGE_NAME, "", props, message.getBytes());


The message is a string to the nameko service.methode_name as it : crawler.scrapit()

I see the message in the RabbitMQ console management, no more the ack .
I have a log in my nameko service but nothing prints in the Terminal... as if the service is not hit

Matt Yule-Bennett

unread,
Mar 12, 2018, 5:41:31 AM3/12/18
to nameko-dev
I took a closer look at this and found more problems. I don't have a complete copy of your client so let me just explain in English how the RPC flow works.

* The listening service will have declared the "nameko-rpc" exchange, and a queue called "rpc-<service-name>", bound to it with the routing key "<service-name>.*" (it's a topic exchange)
* You must declare an RPC reply queue. The queue name can be anything but the nameko Python client names them "rpc.reply-<service-name>-<uuid>". It must be bound to the "nameko-rpc" exchange with a unique name; the Python client uses a uuid. 
* To make a call, you publish a message to the "nameko-rpc" exchange, with "<service-name>.<method-name>" as the routing key. The payload must be a serialised dict containing the key "args" and "kwargs", with the values being the parameters for the method call. You must specify a "reply_to" header with the routing key you've used to bind the reply queue to the RPC exchange. You should also specify a "correlation_id" header so your client can issue multiple concurrent requests and match the replies back up again. You also need to specify content-type and encoding headers so the service knows how to deserialize the incoming data. By default the listening service will be expecting JSON.
* The request message will be routed to the listening service's queue, consumed, processed and ack'd. The service will then reply by publishing to the RPC exchange using the "reply_to" as the routing key and including the "correlation_id header from the request message.
* The response message will be routed to the reply queue. Your client should consume it and use the correlation id to match it up with a pending request.

Looking at your first post, the client is not publishing the request message correctly. It's sending it directly to the service queue via the default exchange, without the correct routing key. Your service is probably consuming the message and replying with a MethodNotFound message, which was being lost because your reply queue wasn't bound correctly either. Your payload also isn't correct in the first version. It should be (assuming JSON serialization) '{"args": [], "kwargs": []}'
Message has been deleted
Message has been deleted
Message has been deleted

lauren...@gmail.com

unread,
Mar 13, 2018, 11:30:23 AM3/13/18
to nameko-dev
Thanks for this great explanation

I think the problem can come from the serialization of the payload in Java. In my code i just do a byte conversion from string (service.method)

In my last tries i sent the payload to the echange, with a reply queue (bit not named as you say)

I'm going to try what you wrote and come back

Thanks

lauren...@gmail.com

unread,
Mar 13, 2018, 11:30:46 AM3/13/18
to nameko-dev
Hi folks

How can you translate a payload : service-name-method_name in JSON with {args: [] , kwargs: []} notation?

Thanks

Laurent

lauren...@gmail.com

unread,
Mar 13, 2018, 11:31:33 AM3/13/18
to nameko-dev
Here my Nameko Java client : the sent message is "<service name>.<method name>" but should be json (args kwargs) formatted.

private static final String EXCHANGE_NAME = "nameko-rpc";
private String replyQueueName ;

private String requestQueueName = "rpc-crawler";
...
try {

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
    Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);

    String corrId = UUID.randomUUID().toString();
replyQueueName = "rpc.reply-crawler-" + corrId;


AMQP.BasicProperties props = new AMQP.BasicProperties
.Builder()
.correlationId(corrId)
.replyTo(replyQueueName)
.build();

    channel.basicPublish( EXCHANGE_NAME, "", props, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");


channel.close();
connection.close();
} catch (TimeoutException ioe) {
System.err.println(ioe.getMessage());
}

Le mardi 13 mars 2018 16:30:23 UTC+1, lauren...@gmail.com a écrit :

Matt Yule-Bennett

unread,
Mar 14, 2018, 12:23:55 PM3/14/18
to nameko-dev
The routing_key needs to be "<service-name>.<method-name>".
The payload needs to be a serialized dict in the form: {"args": <positional-arguments>, "kwargs": <keyword-arguments>}

For example, if I had a service:

class Service:
    name = "service"

    @rpc
    def method(self, a, b, c="C", d="D"):
        print(a, b, c, d)

And I published a message to the RPC exchange with:

routing_key: service.method
payload: {'args': ['hello', 'world', 'kwargs': {'c':'foo', 'd':'bar'}]}

The service would print:

hello world foo bar

lauren...@gmail.com

unread,
Mar 14, 2018, 1:01:26 PM3/14/18
to nameko-dev

Thanks

I found how to set the routing key. I missed it



Now with my Routing key, i have something new , i see a message queued

I check for the payload.

Thanks again

lauren...@gmail.com

unread,
Mar 14, 2018, 1:03:42 PM3/14/18
to nameko-dev
And i got a consumer ack.

lauren...@gmail.com

unread,
Mar 15, 2018, 3:11:35 AM3/15/18
to nameko-dev

Trying to play with payload.
I have a consumer ack already

my service is now
class Crawler(object):
    name = "crawler"
        
    @rpc
    def scrapit(self, a, b, c='C', d='D'):
        return (a,b,c,d)

and in my Java Client

 channel.basicPublish( EXCHANGE_NAME, "crawler.scrapit", props, "{'args': ['hello', 'world', 'kwargs': {'c':'foo', 'd':'bar'}]}".getBytes("UTF-8"));

Now i got a returned message like this

"Returned message from service:  {'args': ['hello', 'world', 'kwargs': {'c':'foo', 'd':'bar'}]} !"}

Is it normal? Or is it just an echo

Laurent

Matt Yule-Bennett

unread,
Mar 15, 2018, 5:56:58 AM3/15/18
to nameko-dev
Congratulations, your client is working.


On Thursday, March 15, 2018 at 7:11:35 AM UTC, Laurent Bois M/45/174/90/CF-L1 wrote:

Trying to play with payload.
I have a consumer ack already

my service is now
class Crawler(object):
    name = "crawler"
        
    @rpc
    def scrapit(self, a, b, c='C', d='D'):
        return (a,b,c,d)

and in my Java Client

 channel.basicPublish( EXCHANGE_NAME, "crawler.scrapit", props, "{'args': ['hello', 'world', 'kwargs': {'c':'foo', 'd':'bar'}]}".getBytes("UTF-8"));

Now i got a returned message like this

"Returned message from service:  {'args': ['hello', 'world', 'kwargs': {'c':'foo', 'd':'bar'}]} !"}

Is it normal? Or is it just an echo


This is what you've asked your service to return. The RPC entrypoint will serialize (if it can) whatever is returned from the decorated method and return that to the client.

lauren...@gmail.com

unread,
Mar 15, 2018, 6:04:33 AM3/15/18
to nameko-dev
My last question

Is the answer from the consumer (Nameko service in Python) formatted in JSON/dict?

Laurent

lauren...@gmail.com

unread,
Mar 15, 2018, 9:02:25 AM3/15/18
to nameko-dev
My client or my server seems not to work

Here is my server , a simple code doing a find against mongodb

class Mongo(object):
    name = "mongo"

    @rpc
    def findStudent(self, username):
        client = MongoClient('mongodb://admin:XXXX@localhost:2707?authSource=admin')

        db = client.local

        col = db.school
        
        result = col.find_one({'username': username})
        
        return ('{ username: "' + result['username'] + '", age: "' + str(result['age']) + '"}')
      
my client Java call routing kay : mongo.findStudent
Arguments  : "{'args': ['lbois']}"

And the returned response is an echo of the call:
{"content":"Returned message from service:  {'args': ['lbois']} !"}

I'm waiting about the database content and not the echo of my call

I think that doesn't work

Laurent

Matt Yule-Bennett

unread,
Mar 15, 2018, 1:01:37 PM3/15/18
to nameko-dev
Can you post your complete client?

lauren...@gmail.com

unread,
Mar 15, 2018, 1:10:29 PM3/15/18
to nameko-dev
Here it is

The class :



import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class EmitNameko {


private static final String EXCHANGE_NAME = "nameko-rpc";
    private String replyQueueName ;

public String call(String routingKey, String args)
throws java.io.IOException, InterruptedException, TimeoutException {


ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
            Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
            replyQueueName = channel.queueDeclare().getQueue();
channel.queueBind(replyQueueName, EXCHANGE_NAME, routingKey);


String corrId = UUID.randomUUID().toString();

AMQP.BasicProperties props = new AMQP.BasicProperties
.Builder()
.correlationId(corrId)
.replyTo(replyQueueName)
.build();

            channel.basicPublish( EXCHANGE_NAME, routingKey, props, args.getBytes("UTF-8"));
//System.out.println(" [x] Sent '" + message + "'");

final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);

channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
if (properties.getCorrelationId().equals(corrId)) {
response.offer(new String(body, "UTF-8"));
}
}
});

return response.take();


}
    //...
}

I call then "call" method with routingKey an args

client = new EmitNameko();
msg = client.call("mongo.findStudent", "{'args': ['lbois']}");

My service in Python is

Matt Yule-Bennett

unread,
Mar 20, 2018, 8:24:47 AM3/20/18
to nameko-dev
The problem is that you have bound your reply queue to the rpc exchange with the same routing key as the service . Your reply consumer is consuming the request message instead of the service.

The reply queue needs to be bound to the rpc exchange with a unique identifier (and that same identifier passed as the reply_to when publishing the request message). Change the line:

channel.queueBind(replyQueueName, EXCHANGE_NAME, routingKey);

to

channel.queueBind(replyQueueName, EXCHANGE_NAME, replyQueueName);

and it'll probably work.

lauren...@gmail.com

unread,
Mar 20, 2018, 11:19:02 AM3/20/18
to nameko-dev
Thanks it works

i see the error message {"content":"Returned message from service: {\"result\": null, \"error\": {\"exc_path\": \"exceptions.TypeError\", \"value\": \"string indices must be integers\", \"exc_type\": \"TypeError\", \"exc_args\": [\"string indices must be integers\"]}} !"} coming from my Py server

Matt Yule-Bennett

unread,
Mar 21, 2018, 12:49:33 PM3/21/18
to nameko-dev
Your service is throwing an error. You should be able to see a traceback in the service logs.

I guess that "result" on the line below is a string, not a dict as you're assuming.

return ('{ username: "' + result['username'] + '", age: "' + str(result['age']) + '"}')

Also note that the RPC entrypoint will attempt to serialize whatever your service method returns. So you don't need to do this string concatenation at all.

lauren...@gmail.com

unread,
Mar 22, 2018, 4:45:38 AM3/22/18
to nameko-dev
Hello

I increased the log level in the console with this config file

AMQP_URI: 'pyamqp://guest:guest@localhost'

WEB_SERVER_ADDRESS: '0.0.0.0:8000'

rpc_exchange: 'nameko-rpc'

max_workers: 10

parent_calls_tracked: 10


LOGGING:

    version: 1

    handlers:

        console:

            class: logging.StreamHandler

    root:

        level: DEBUG

        handlers: [console]


Here is what i see when i hit my service from my Java client 

Start from server, version: 0.9, properties: {u'information': u'Licensed under the MPL.  See http://www.rabbitmq.com/', u'product': u'RabbitMQ', u'copyright': u'Copyright (C) 2007-2018 Pivotal Software, Inc.', u'capabilities': {u'exchange_exchange_bindings': True, u'connection.blocked': True, u'authentication_failure_close': True, u'direct_reply_to': True, u'basic.nack': True, u'per_consumer_qos': True, u'consumer_priorities': True, u'consumer_cancel_notify': True, u'publisher_confirms': True}, u'cluster_name': u'rabbit@my-rabbit', u'platform': u'Erlang/OTP 20.2.2', u'version': u'3.7.3'}, mechanisms: [u'AMQPLAIN', u'PLAIN'], locales: [u'en_US']

Open OK!

using channel_id: 1

Channel open


So i do not see any error messages


My service is simple


from nameko.rpc import rpc, RpcProxy

import json


from StringIO import StringIO


class Crawler(object):

    name = "crawler"

        

    @rpc

    def scrapit(self, username):

        

        return ('message', username)


My Client do a call on crawler.scrapit using args/kwargs "{'args': ['lbois']}"


And my resulting error is :
{"content":"Returned message from service:  {\"result\": null, \"error\": {\"exc_path\": \"exceptions.TypeError\", \"value\": \"string indices must be integers\", \"exc_type\": \"TypeError\", \"exc_args\": [\"string indices must be integers\"]}} !"}

I think it comes from the args/kwargs not the returned value

Hope this helps

Matt Yule-Bennett

unread,
Mar 22, 2018, 12:37:59 PM3/22/18
to nameko-dev
Well this has been a fun exercise in me remembering how to program in Java...

You're right that the problem is with the input. There are three problems:

1. You're not specifying a content type. The message was treated as a string rather than being JSON-decoded back into an object.
2. Your payload is incomplete. You have to specify both "args" and "kwargs" keys.
3. Your payload is not valid JSON, which requires keys to be surrounded with double quotes.

The reason that you don't see a traceback in the service is that the RPC entrypoint handles these malformed request errors and returns them in the reply, rather than blowing up.

lauren...@gmail.com

unread,
Mar 22, 2018, 12:51:05 PM3/22/18
to nameko-dev
Matt

I thank you for the fix. 
Yes now it works

Happy to communicate with a Py service from Java.. I continue to explore

Thanks a lot
Reply all
Reply to author
Forward
0 new messages