Question regarding rabbitmq-objc-client

475 views
Skip to first unread message

Sasi

unread,
Jun 15, 2016, 4:38:49 PM6/15/16
to rabbitmq-users
Hello guys,
 Thanks for making the objc client. In the following code, I am not seeing the log messages being printed in the subscribe and consume blocks. I have no idea what is really happening here. The connection to the server is happening fine as there are no errors being printed in the connection delegate. I am also posting the equivalent swift code that used the old librabbitmq-c library. The swift code using the librabbitmq-c prints the response, but the objc code using the latest rabbitmq-objc-client doesn't print the logs.

Objective-C code using rabbitmq-objc-client:

id<RMQChannel> channel = [self.connection createChannel];

[channel basicConsume:self.queuename options:RMQBasicConsumeNoAck handler:^(RMQMessage *message) {

   
NSLog(@"Consumed message: %@", message);
}];


RMQBasicContentType *contentType = [[RMQBasicContentType alloc] init:@"text/json"];
RMQBasicDeliveryMode *deliveryMode = [[RMQBasicDeliveryMode alloc] init:2]; // Persistent mode is required
RMQBasicPriority *priority = [[RMQBasicPriority alloc] init:1];             // Priority should be 1
RMQBasicReplyTo *replyTo = [[RMQBasicReplyTo alloc] init:self.queuename];   // Queue name and reply queue is the same
RMQBasicUserId *userId = [[RMQBasicUserId alloc] init:username];            // Server needs username to auth

NSArray *properties = @[contentType, deliveryMode, priority, replyTo, userId];

RMQQueue *offersQueue = [channel queue:self.queuename options:RMQQueueDeclareExclusive];

[channel basicPublish:inputMessage routingKey:@"" exchange:self.exchangeName properties:properties options:RMQBasicPublishNoOptions];


[offersQueue subscribe:^(RMQMessage *message) {

   
NSLog(@"Subscribed message: %@", message);
}];


Swift code using librabbitmq-c - 


let routingKey
= ""

self.channelId += 1

amqp_channel_open
(self.connection, self.channelId)

if !self.checkLastOperation("Opening channel") {

   
self.isConnected = false
   callback
(result: nil)
   
return
}

defer
{
   amqp_channel_close
(connection, self.channelId, AMQP_REPLY_SUCCESS)
}

       
// Declare a queue

let r
: UnsafeMutablePointer<amqp_queue_declare_ok_t> = amqp_queue_declare(self.connection, self.channelId, amqp_cstring_bytes(queueName), 0, 0, 1, 0, amqp_empty_table)

if r == nil {

   fatalError
("Error: queue couldn't be declared")

}

let queue
= amqp_bytes_malloc_dup(r.memory.queue)

if queue.bytes == nil {

    debugPrint
("Out of memory while copying queue name")
   
return
}

// Basic consume

        let _
= amqp_basic_consume(self.connection, self.channelId, queue, amqp_empty_bytes, 0, 1, 0, amqp_empty_table)

       
if !self.checkLastOperation("Get reply after basic consume") { return }

       

       
// Set properties

       
var props: amqp_basic_properties_t = amqp_basic_properties_t()

        props
._flags = UInt32(

            AMQP_BASIC_CONTENT_TYPE_FLAG
|

            AMQP_BASIC_DELIVERY_MODE_FLAG
|

            AMQP_BASIC_PRIORITY_FLAG
|

            AMQP_BASIC_REPLY_TO_FLAG
|

            AMQP_BASIC_USER_ID_FLAG

       
)

        props
.content_type = amqp_cstring_bytes("text/json")

        props
.delivery_mode = 2     // 2 is persistent; 1 is non-persistent

        props
.priority = 1

        props
.reply_to = queue

       

        let username
= Session.sharedSession.token?.queueUsername.cStringUsingEncoding(NSUTF8StringEncoding)

        props
.user_id = amqp_cstring_bytes(username!)

        debugPrint
("Property.userId = \(username)")

       

       
// Publish message

        let publishPassOrFail
= amqp_basic_publish(self.connection, self.channelId, amqp_cstring_bytes(exchangeName), amqp_cstring_bytes(routingKey), 0, 0, &props, amqp_cstring_bytes(inputMessage))

        amqp_bytes_free
(queue)

       

       
if publishPassOrFail == AMQP_STATUS_OK.rawValue {

            debugPrint
("Publish success")

           

           
if !self.checkLastOperation("Get reply after basic publish") { return }

           

           
var frame: amqp_frame_t = amqp_frame_t()

           

           
while true {

               
var envelope = amqp_envelope_t()

               
//amqp_maybe_release_buffers(self.connection)

               

               
var amqpReply: amqp_rpc_reply_t = amqp_consume_message(self.connection, &envelope, nil, 0)

               
if amqpReply.reply_type != AMQP_RESPONSE_NORMAL {

                   

                   
if amqpReply.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION &&

                        amqpReply
.library_error == AMQP_STATUS_UNEXPECTED_STATE.rawValue {

                        debugPrint
("Response: Failed")

                       
if amqp_simple_wait_frame(self.connection, &frame) != AMQP_STATUS_OK.rawValue {

                           
return

                       
}

                       

                       
if Int32(frame.frame_type) == AMQP_FRAME_METHOD {

                            debugPrint
("Method.ID value = \(frame.payload.method.id)")

                           

                           
switch (Int(frame.payload.method.id)) {

                               

                           
case AMQPConstants.AMQP_BASIC_ACK_METHOD.rawValue:

                               
/* if we've turned publisher confirms on, and we've published a message

                                 * here is a message being confirmed

                                 */


                                debugPrint
("Our publish message has been acknowledged")

                               
return

                               

                           
case AMQPConstants.AMQP_BASIC_RETURN_METHOD.rawValue:

                               
/* if a published message couldn't be routed and the mandatory flag was set

                                 * this is what would be returned. The message then needs to be read.

                                 */


                                debugPrint
("Response: Basic return")

                               
var message: amqp_message_t = amqp_message_t()

                                amqpReply
= amqp_read_message(self.connection, frame.channel, &message, 0);

                               
if amqpReply.reply_type != AMQP_RESPONSE_NORMAL {

                                   
return

                               
}

                               

                                amqp_destroy_message
(&message);

                               
return

                               

                           
case AMQPConstants.AMQP_CHANNEL_CLOSE_METHOD.rawValue:

                               
/* a channel.close method happens when a channel exception occurs, this

                                 * can happen by publishing to an exchange that doesn't exist for example

                                 *

                                 * In this case you would need to open another channel redeclare any queues

                                 * that were declared auto-delete, and restart any consumers that were attached

                                 * to the previous channel

                                 */


                                debugPrint
("Response: Channel close")

                               
return

                               

                           
case AMQPConstants.AMQP_CONNECTION_CLOSE_METHOD.rawValue:

                               
/* a connection.close method happens when a connection exception occurs,

                                 * this can happen by trying to use a channel that isn't open for example.

                                 *

                                 * In this case the whole connection must be restarted.

                                 */


                               
self.isConnected = false
                                debugPrint("Response: Connection close")
                               
return

                           
default:
                                debugPrint
("Unexpected payload method received: \(frame.payload.method.id)")
                               
return
                           
}
                       
}
                   
}
               
}

               
else {

                    debugPrint
("Response: normal and envelope: \(envelope.message.body)")

                    let messageReplyString
: String = self.stringFromAmqpBytes(envelope.message.body)

                    let message
= self.decodeJSON(messageReplyString)

                    callback
(result: message)

                    amqp_destroy_envelope
(&envelope)

                   
break
               
}
           
}
       
}

       
else {
            debugPrint
("Publish failed")
       
}
   
}





Thanks for your help.


Michael Klishin

unread,
Jun 15, 2016, 5:05:41 PM6/15/16
to rabbitm...@googlegroups.com
Your Swift/librabbitmq-c code uses the routing key of amqp_cstring_bytes(routingKey) while
the Objective-C version uses an empty string. Unless the exchange is of type fanout, that can make all the difference in whether the message is routed or not. If messages are not routed you should be able to easily see this in the management UI.

--
You received this message because you are subscribed to the Google Groups "rabbitmq-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to rabbitmq-user...@googlegroups.com.
To post to this group, send email to rabbitm...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.



--
MK

Staff Software Engineer, Pivotal/RabbitMQ

Sasi

unread,
Jun 15, 2016, 11:57:34 PM6/15/16
to rabbitmq-users
Thanks for the quick reply, Michael. In the Swift/librabbitmq-c code, the routingKey is empty string. Does amqp_cstring_bytes("") is different from rabbitmq-objc-client's routing key @""? I do not know what the exchange type is set to on the server side.

Michael Klishin

unread,
Jun 16, 2016, 5:47:13 AM6/16/16
to rabbitm...@googlegroups.com
Empty string means the same thing in every client.

Michael Klishin

unread,
Jun 16, 2016, 5:50:10 AM6/16/16
to rabbitm...@googlegroups.com
Well, what do you know about the topology? Do you have access to the management UI?
You can do a traffic capture and see what basic.publish frames look like on the wire:

but quite often it is easy to notice a different in message flow in the UI with message rates enabled (and they are by default).

On Thu, Jun 16, 2016 at 6:57 AM, Sasi <kiran...@gmail.com> wrote:

Sasi

unread,
Jun 16, 2016, 12:54:44 PM6/16/16
to rabbitmq-users
Thanks Michael. I will request for the management UI access on the server side. If I don't get it, I will ask for the SSL certificate / key as it's an amqps:// protocol.

Michael Klishin

unread,
Jun 16, 2016, 3:29:25 PM6/16/16
to rabbitm...@googlegroups.com
I don't see how TLS could affect routing.

Sasi

unread,
Jun 20, 2016, 1:24:43 PM6/20/16
to rabbitmq-users
Update to this: I tried doing a packet capture using Wireshark 2.0.4 using the SSL certificate. The problem is, the packet list is not very readable (at least to me). The  protocol is being detected as TCP instead of AMQP. In the Info column, I do not see Class.Method names as suggested in the dissector plugin webpage, instead I am seeing TCP ACK and other packets. This is quite hard to debug and compare. Is there anything from the client side that I could do to get this working? I have a feeling that if the protocol gets identified as AMQP, I would be able to read Basic.Publish etc. frames. The port is also set to 443 (instead of 5671 / 5672). 

Getting access to RabbitMQ management console (for me) is a bit complicated as it is behind VPN. 

Michael Klishin

unread,
Jun 20, 2016, 3:31:56 PM6/20/16
to rabbitm...@googlegroups.com
Wireshark needs a private key in order to decipher TLS-encrypted traffic and dissect the wrapped
messaging protocol.

Sasi

unread,
Jun 20, 2016, 8:28:31 PM6/20/16
to rabbitmq-users
I have the private key PEM file. Under preferences for SSL protocol, I clicked edit routes and added the host name, port number, private key path and left the password empty as there was no password for it. I restarted the client and tried it again, i still see the same result where I am not able to read the communication strings from the intercepted packet data. So, I gave this up. 

I got access to the management UI with admin permission. What I found out was that when I run the app using the old C port, I am seeing the user and the connection in the "Connections" tab. However, when I run the new objc-code, I am not seeing the user at all. This is so strange. 

Sasi

unread,
Jun 21, 2016, 12:11:09 AM6/21/16
to rabbitmq-users
On further investigating, I noticed that there is a convenience start:^{ } apart from start() method on RMQConnection. When I make the connection object call start:^ { NSLog(@"Connected") } the block is never called. 

I have to start a connection with TLS and a specific username and password combination. Hence, I have to only use the lengthy designated initialiser to initialise the connection object. I do not see any examples anywhere where the designated initialiser's usage is shown. The parameters like commandQueue, waiterFactory and heartbeatSender are pretty confusing. I set all these values to self and implemented all the methods in the respective protocols by only printing a NSLog(), still nothing prints except the enqueueOperation block. The following is my code snippet, if you happen to find any obvious silly mistake, please do let me know. I have to create a connection with a username and password and the connection should be a TLS socket with verifyPeer set to false.

NSString *urlString = @"amqps://maskedurl.com";
RMQTLSOptions *tlsOptions = [RMQTLSOptions fromURI:urlString verifyPeer:NO];

RMQTCPSocketTransport *socket = [[RMQTCPSocketTransport alloc] initWithHost:@"maskedhostname.com" port:@443 tlsOptions:tlsOptions];

RMQCredentials *credentials = [[RMQCredentials alloc] initWithUsername:username password:password];

RMQConnectionConfig *config = [[RMQConnectionConfig alloc] initWithCredentials:credentials channelMax:@(RMQChannelLimit) frameMax:@131072 heartbeat:@0 vhost:@"/" authMechanism:@"PLAIN" recovery:self];

_waiter
= [[RWaiter alloc] init];
_channelAllocator
= [[RMQMultipleChannelAllocator alloc] initWithChannelSyncTimeout:@30];

_connection
= [[RMQConnection alloc] initWithTransport:socket config:config handshakeTimeout:@30 channelAllocator:self.channelAllocator frameHandler:self delegate:self commandQueue:self waiterFactory:self heartbeatSender:self];

[_connection start:^{
   
NSLog(@"Connection started");
   
// Sadly, this block is never executed.
}];


// RWaiter is nothing but the following


@interface RWaiter: NSObject <RMQWaiter>


@property (assign) BOOL doneCalled;

@property (assign) BOOL timesOutCalled;


@end

@implementation RWaiter


- (id)init {

   self = [super init];

   if (self) {

       _doneCalled = NO;

       _timesOutCalled = NO;

   }

   return self;

}


#pragma mark - RMQWaiter


- (void)done {

   self.doneCalled = YES;

}


- (BOOL)timesOut {

   self.timesOutCalled = YES;

   return NO;

}


@end


Any pointers on how to debug this issue? I see no connection being established on management console either. 

Thanks.

Michael Klishin

unread,
Jun 21, 2016, 6:57:32 AM6/21/16
to rabbitm...@googlegroups.com
Do you have access to server logs? Can you post them?

Sasi

unread,
Jun 21, 2016, 10:25:53 AM6/21/16
to rabbitmq-users
The ones that have the filenames "rab...@Dev-Rabbit-A-sasl.log", "rab...@Dev-Rabbit-B-sasl.log", "rab...@Dev-Rabbit-C-sasl.log", "rab...@Dev-Rabbit-D-sasl.log" ? I am not sure which ones. When I checked these logs, I only see a couple of crash reports dated back to March and nothing else from the last 2 days. 

Thanks.

Sasi

unread,
Jun 21, 2016, 11:47:09 AM6/21/16
to rabbitmq-users
Update: I was able to solve the connection start() issues. I am able to successfully start the connections now. The issue was that I was passing self for commandQueue, waitorFactory, heartbeatSender etc. I had to create new objects and pass it on. 

Now, I'm back to the original problem where I am not seeing a response after a publish. When I monitored closely both the new code and old code, it looks like the old code is creating a queue and consumer and then destroying it. In the new code, the new queue and consumer are not being shown on the management console. I guess I have to look and play more with the queue and consumer creation part. 

Thanks again for the help.

Sasi

unread,
Jun 21, 2016, 12:56:29 PM6/21/16
to rabbitmq-users
On further investigation, I found that the rabbitmq-objc-client code has no consumer for the channel that it is creating while the librabbitmq-c code is showing a consumer attached to the channel. 

I am calling 

[channel basicPublish:inputMessage routingKey:routingKey exchange:self.exchangeName properties:properties options:RMQBasicPublishNoOptions];

and still I do not understand why there is no consumer attached to the channel. Any idea what it could be?

Thanks.

Michael Klishin

unread,
Jun 21, 2016, 2:20:57 PM6/21/16
to rabbitm...@googlegroups.com
Why do you expect a consumer to appear after you call basic.publish? Consumers are registered
with the basic.consume method, the easiest way to do this is via RMQQueue#subscribe.

I'd recommend reading http://www.rabbitmq.com/tutorials/amqp-concepts.html because the Objective-C client
has some API sugar that still builds on the standard set of protocol methods (operations) and there effectively no
additional abstractions, unlike with some plugins not developed by our team.

Sasi

unread,
Jun 21, 2016, 3:16:43 PM6/21/16
to rabbitmq-users
I am calling subscribe first and then basicPublish. The channel's "consumers" array / dictionary is also empty before subscribing and also after subscribing. 

In the handle frameset delegate method, I am seeing a channelOK response followed by queueDeclareOK response. 

NSString *inputMessage = [self encodeJSON:dictionary];
NSString *routingKey = @"";

id
<RMQChannel> channel = [self.connection createChannel];
[channel activateWithDelegate:self];

RMQQueue *queue = [channel queue:self.queuename options:RMQQueueDeclareExclusive];

[queue subscribe:RMQBasicConsumeNoAck handler:^(RMQMessage *message) {
   
NSLog(@"Subscribed message: %@", message);
}];

RMQBasicContentType *contentType = [[RMQBasicContentType alloc] init:@"text/json"];
RMQBasicDeliveryMode *deliveryMode = [[RMQBasicDeliveryMode alloc] init:2]; // Persistent mode required
RMQBasicPriority *priority = [[RMQBasicPriority alloc] init:1];             // Priority should be 1
RMQBasicReplyTo *replyTo = [[RMQBasicReplyTo alloc] init:self.queuename];   // Queue name and reply queue is the same
RMQBasicUserId *userId = [[RMQBasicUserId alloc] init:username];            // Server needs username to auth

NSArray *properties = @[contentType, deliveryMode, priority, replyTo, userId];

[channel basicPublish:inputMessage routingKey:routingKey exchange:self.exchangeName properties:properties options:RMQBasicPublishNoOptions];

Michael Klishin

unread,
Jun 21, 2016, 3:51:18 PM6/21/16
to rabbitm...@googlegroups.com
Are you sure you need to use RMQChannel#activateWithDelegate?
Our tutorials only use createChannel, for example:

Michael Klishin

unread,
Jun 21, 2016, 3:51:53 PM6/21/16
to rabbitm...@googlegroups.com
Also, would you *please* post log files or traffic capture? Guessing is a very inefficient debugging technique.

Andrew Bruce

unread,
Jun 21, 2016, 4:47:00 PM6/21/16
to rabbitmq-users
Hi Kiran!

Firstly, apologies for not getting to this sooner - I had set up a filter that was effectively hiding everything on this list. D'oh!

The init methods you're using, as well as activateWithDelegate, are meant to be implementation details and should not normally be used.

The usual way to set username and password is via the URI, which will be parsed into RMQCredentials etc (again, implementation detail).

So, you could try the following init to get a connection instance:

RMQConnection *conn = [[RMQConnection alloc] initWithUri:@"amqps://myusername:mypas...@my.host.place:1234" verifyPeer:YES delegate:myDelegate];

I'd also recommend performing publishes from a queue or exchange instance obtained from a channel, e.g:

id<RMQChannel> ch = [conn createChannel];
q
= [ch queue:@"my-queue"];
[q publish:@"foo"]; // or other method beginning with 'publish'

...as that way you can't get the routing key wrong.

As everything is async, there's a good chance that you don't see anything happen because something terminates the process before it gets to work. Are you using this in an app with a GUI?

If you're still having problems tomorrow, please bug me in the RabbitMQ Slack channel: https://rabbitmq-slack.herokuapp.com/ - I work 9-6 BST.

P.S. you should feel free to use this from Swift. We're targeting Swift users, and all the tests are written in Swift!
To unsubscribe from this group and stop receiving emails from it, send an email to rabbitmq-users+unsubscribe@googlegroups.com.
To post to this group, send email to rabbitmq-users@googlegroups.com.

For more options, visit https://groups.google.com/d/optout.

Sasi

unread,
Jun 22, 2016, 10:58:13 AM6/22/16
to rabbitmq-users
Hello Michael and Andrew, 
 Since I do not own the server side, getting things done from that end is quite tedious. The server logs that were shared to me had only crash reports dated March and nothing regarding the recent requests / responses. When I tried to create a trace, it fails to start no matter what input I provide (wildcard or method name). 

Our project is a Swift project where librabbitmq-c was being used as a static library. I started this in an example project in Obj-C only to get the integration part done with the new rabbitmq-objc-client. 

The only reason I added activateWithDelegate:_ is because I was able to see log messages with replies such as socketOpenOK and queueDeclareOK. 

I still do not see a consumer attached to the queue even though there is a [queue subscribe:]. I also changed [channel basicPublish:] to [queue publish:] and see there is no difference. 

Ultimately, all this will happen in an app with UI. The example project has no UI though. 

Once again, thanks so much. I will hit you up on Slack channel.

Michael Klishin

unread,
Jun 22, 2016, 11:17:31 AM6/22/16
to rabbitm...@googlegroups.com
Is there anything in the log files? We cannot reason about what's going on without log files or a libpcap/Wireshark capture.
We have asked for logs several times before and every time this request is being ignored. Sorry but I'm running out of guesses
as to what else you may want to try.

Sasi kiran

unread,
Jun 22, 2016, 12:22:10 PM6/22/16
to rabbitm...@googlegroups.com
Hello Michael,
 I realised why the wireshark capture isn't working. The log had 
"ssl_decrypt_pre_master_secret: session uses Diffie-Hellman key exchange (cipher suite 0xC013 TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA) and cannot be decrypted using a RSA private key file."

I will ask the server side team to change it to use only RSA only cipher suites.

Thanks.

--
You received this message because you are subscribed to a topic in the Google Groups "rabbitmq-users" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/rabbitmq-users/D5s1gqtFd5o/unsubscribe.
To unsubscribe from this group and all its topics, send an email to rabbitmq-user...@googlegroups.com.
Reply all
Reply to author
Forward
0 new messages