I have a simple service written with Python using Nameko based on RabbitMQ and RPC.
Here is the code of service.py
from nameko.rpc import rpc, RpcProxy
class Crawler(object):
name = "crawler"
@rpc
def scrapit(self):
return 'OK'
Then i start it using the command :
nameko run service --broker amqguest:guest@localhost
Finally i have a simple RPCCLient written in Java :
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;
public class RPCClient {
private Connection connection;
private Channel channel;
private String requestQueueName = "rpc-crawler";
private String replyQueueName;
public RPCClient() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
replyQueueName = channel.queueDeclare().getQueue();
}
public String call(String message) throws IOException, InterruptedException {
String corrId = UUID.randomUUID().toString();
AMQP.BasicProperties props = new AMQP.BasicProperties
.Builder()
.correlationId(corrId)
.replyTo(replyQueueName)
.build();
channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);
channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
if (properties.getCorrelationId().equals(corrId)) {
response.offer(new String(body, "UTF-8"));
}
}
});
return response.take();
}
public void close() throws IOException {
connection.close();
}
//...
}
I call
RPCClient client = new RPCClient();
msg = client.call("crawler.scrapit()");
In my RabbitMQ management i see the message and a ack, but the client is blocking and i do not see the returned message "OK"
Any idea Thanks a lot . Laurent
channel = connection.createChannel();
replyQueueName = channel.queueDeclare().getQueue();
channel.queueBind( replyQueueName , "nameko-rpc", "reply_to");
But same results... the client is stil blocking
In fact i retested my scrapit service , in which i've added a log, from a Python nameko client...
from nameko.standalone.rpc import ClusterRpcProxy
CONFIG = {'AMQP_URI': "amqp://guest:guest@localhost"}
def scrapit():
with ClusterRpcProxy(CONFIG) as rpc:
result = rpc.crawler.scrapit()
if __name__ == '__main__':
scrapit()
And i see the message from my consumer in the Terminal Window, all works fine
That's not the case with my Java client<.
So my consumer would not be hitten.
Are we sure the queue name is 'rpc-<service_name>'?
In my case my service is :
class Crawler(object):
name = "crawler"
@rpc
def scrapit(self):
print("Received a request send response")
return 'OK'
So the queue would be 'rpc-crawler' right?
Laurent
Thanks.In my client, afterchannel = connection.createChannel();
replyQueueName = channel.queueDeclare().getQueue();I've added this linechannel.queueBind( replyQueueName , "nameko-rpc", "reply_to");
But same results... the client is stil blocking
<snip> So the queue would be 'rpc-crawler' right?
private static final String EXCHANGE_NAME = "nameko-rpc";
Then after further tries i declare the exchange :
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);Finally i publish my message directly to the exchange and not the requestQueueName ...found the code on RabbitMQ for Publish/Subscribe , section EmitLog client https://www.rabbitmq.com/tutorials/tutorial-three-java.htmlchannel.basicPublish(EXCHANGE_NAME, "", props, message.getBytes());The message is a string to the nameko service.methode_name as it : crawler.scrapit()I see the message in the RabbitMQ console management, no more the ack .I have a log in my nameko service but nothing prints in the Terminal... as if the service is not hit
private static final String EXCHANGE_NAME = "nameko-rpc";
private String replyQueueName ;
private String requestQueueName = "rpc-crawler";
...
try {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
String corrId = UUID.randomUUID().toString();
replyQueueName = "rpc.reply-crawler-" + corrId;
AMQP.BasicProperties props = new AMQP.BasicProperties
.Builder()
.correlationId(corrId)
.replyTo(replyQueueName)
.build();
channel.basicPublish( EXCHANGE_NAME, "", props, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
} catch (TimeoutException ioe) {
System.err.println(ioe.getMessage());
}
"Returned message from service: {'args': ['hello', 'world', 'kwargs': {'c':'foo', 'd':'bar'}]} !"}
Is it normal? Or is it just an echo
Laurent
Trying to play with payload.I have a consumer ack alreadymy service is nowclass Crawler(object):name = "crawler"@rpcdef scrapit(self, a, b, c='C', d='D'):return (a,b,c,d)and in my Java Clientchannel.basicPublish( EXCHANGE_NAME, "crawler.scrapit", props, "{'args': ['hello', 'world', 'kwargs': {'c':'foo', 'd':'bar'}]}".getBytes("UTF-8"));Now i got a returned message like this"Returned message from service: {'args': ['hello', 'world', 'kwargs': {'c':'foo', 'd':'bar'}]} !"}Is it normal? Or is it just an echo
{"content":"Returned message from service: {'args': ['lbois']} !"}
I'm waiting about the database content and not the echo of my call
I think that doesn't work
Laurent
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class EmitNameko {
private static final String EXCHANGE_NAME = "nameko-rpc";
private String replyQueueName ;
public String call(String routingKey, String args)
throws java.io.IOException, InterruptedException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
replyQueueName = channel.queueDeclare().getQueue();
channel.queueBind(replyQueueName, EXCHANGE_NAME, routingKey);
String corrId = UUID.randomUUID().toString();
AMQP.BasicProperties props = new AMQP.BasicProperties
.Builder()
.correlationId(corrId)
.replyTo(replyQueueName)
.build();
channel.basicPublish( EXCHANGE_NAME, routingKey, props, args.getBytes("UTF-8"));
//System.out.println(" [x] Sent '" + message + "'");
final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);
channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
if (properties.getCorrelationId().equals(corrId)) {
response.offer(new String(body, "UTF-8"));
}
}
});
return response.take();
}
//...
}
I call then "call" method with routingKey an args
client = new EmitNameko();
msg = client.call("mongo.findStudent", "{'args': ['lbois']}");My service in Python is
channel.queueBind(replyQueueName, EXCHANGE_NAME, routingKey);
channel.queueBind(replyQueueName, EXCHANGE_NAME, replyQueueName);
return ('{ username: "' + result['username'] + '", age: "' + str(result['age']) + '"}')
AMQP_URI: 'pyamqp://guest:guest@localhost'
WEB_SERVER_ADDRESS: '0.0.0.0:8000'
rpc_exchange: 'nameko-rpc'
max_workers: 10
parent_calls_tracked: 10
LOGGING:
version: 1
handlers:
console:
class: logging.StreamHandler
root:
level: DEBUG
handlers: [console]
Start from server, version: 0.9, properties: {u'information': u'Licensed under the MPL. See http://www.rabbitmq.com/', u'product': u'RabbitMQ', u'copyright': u'Copyright (C) 2007-2018 Pivotal Software, Inc.', u'capabilities': {u'exchange_exchange_bindings': True, u'connection.blocked': True, u'authentication_failure_close': True, u'direct_reply_to': True, u'basic.nack': True, u'per_consumer_qos': True, u'consumer_priorities': True, u'consumer_cancel_notify': True, u'publisher_confirms': True}, u'cluster_name': u'rabbit@my-rabbit', u'platform': u'Erlang/OTP 20.2.2', u'version': u'3.7.3'}, mechanisms: [u'AMQPLAIN', u'PLAIN'], locales: [u'en_US']
Open OK!
using channel_id: 1
Channel open
So i do not see any error messages
My service is simple
from nameko.rpc import rpc, RpcProxy
import json
from StringIO import StringIO
class Crawler(object):
name = "crawler"
@rpc
def scrapit(self, username):
return ('message', username)
My Client do a call on crawler.scrapit using args/kwargs "{'args': ['lbois']}"
{"content":"Returned message from service: {\"result\": null, \"error\": {\"exc_path\": \"exceptions.TypeError\", \"value\": \"string indices must be integers\", \"exc_type\": \"TypeError\", \"exc_args\": [\"string indices must be integers\"]}} !"}
I think it comes from the args/kwargs not the returned value
Hope this helps