// Request/reply over one exclusive queue: declare the queue, attach a single
// consumer to it, then publish a message whose reply-to names that same queue.
id<RMQChannel> channel = [self.connection createChannel];

// Declare the queue BEFORE consuming from it. The original issued basicConsume on
// self.queuename first, i.e. against a queue this channel had not declared yet.
RMQQueue *offersQueue = [channel queue:self.queuename options:RMQQueueDeclareExclusive];

// One consumer only. The original registered two consumers on the same queue
// (basicConsume + subscribe), so any given reply reached just one of the two handlers.
[offersQueue subscribe:RMQBasicConsumeNoAck handler:^(RMQMessage *message) {
    NSLog(@"Subscribed message: %@", message);
}];

RMQBasicContentType *contentType = [[RMQBasicContentType alloc] init:@"text/json"];
RMQBasicDeliveryMode *deliveryMode = [[RMQBasicDeliveryMode alloc] init:2]; // Persistent mode is required
RMQBasicPriority *priority = [[RMQBasicPriority alloc] init:1]; // Priority should be 1
RMQBasicReplyTo *replyTo = [[RMQBasicReplyTo alloc] init:self.queuename]; // Queue name and reply queue is the same
RMQBasicUserId *userId = [[RMQBasicUserId alloc] init:username]; // Server needs username to auth
NSArray *properties = @[contentType, deliveryMode, priority, replyTo, userId];

// Publish last, once the reply queue exists and its consumer is in place.
[channel basicPublish:inputMessage
           routingKey:@""
             exchange:self.exchangeName
           properties:properties
              options:RMQBasicPublishNoOptions];

Swift code using librabbitmq-c -
// Fragment of a request/reply routine written against librabbitmq-c (rabbitmq-c).
// The enclosing function's header is not part of this excerpt; the last `}` below closes it.
// Flow: open a fresh channel, declare a queue, start consuming from it, publish
// `inputMessage` with reply_to pointing at that queue, then loop until one message
// arrives and is passed to `callback`.
let routingKey = ""
// Use the next channel number for this request.
self.channelId += 1
amqp_channel_open(self.connection, self.channelId)
// checkLastOperation is defined elsewhere; presumably it inspects the library's last RPC reply — confirm.
if !self.checkLastOperation("Opening channel") {
self.isConnected = false
callback(result: nil)
return
}
// Registered after the open check, so every exit path from here on closes the channel.
defer {
amqp_channel_close(connection, self.channelId, AMQP_REPLY_SUCCESS)
}
// Declare a queue
// Flag arguments assumed to be passive=0, durable=0, exclusive=1, auto_delete=0 (order per rabbitmq-c's amqp.h — confirm).
let r: UnsafeMutablePointer<amqp_queue_declare_ok_t> = amqp_queue_declare(self.connection, self.channelId, amqp_cstring_bytes(queueName), 0, 0, 1, 0, amqp_empty_table)
if r == nil {
// NOTE(review): this aborts the process, whereas the channel-open failure above reports through callback(result: nil).
fatalError("Error: queue couldn't be declared")
}
// Take a private copy of the queue name returned by the broker; it is released with amqp_bytes_free further down.
let queue = amqp_bytes_malloc_dup(r.memory.queue)
if queue.bytes == nil {
debugPrint("Out of memory while copying queue name")
// NOTE(review): returns without invoking callback.
return
}
// Basic consume
// Flag arguments assumed to be no_local=0, no_ack=1, exclusive=0 (order per rabbitmq-c's amqp.h — confirm).
let _ = amqp_basic_consume(self.connection, self.channelId, queue, amqp_empty_bytes, 0, 1, 0, amqp_empty_table)
// NOTE(review): this early return skips the amqp_bytes_free(queue) below and does not invoke callback.
if !self.checkLastOperation("Get reply after basic consume") { return }
// Set properties
var props: amqp_basic_properties_t = amqp_basic_properties_t()
// _flags tells the library which of the property fields assigned below are populated.
props._flags = UInt32(
AMQP_BASIC_CONTENT_TYPE_FLAG |
AMQP_BASIC_DELIVERY_MODE_FLAG |
AMQP_BASIC_PRIORITY_FLAG |
AMQP_BASIC_REPLY_TO_FLAG |
AMQP_BASIC_USER_ID_FLAG
)
// NOTE(review): amqp_cstring_bytes presumably wraps the pointer without copying, and a Swift
// String passed as a C-string argument is only guaranteed valid for the duration of that call.
// content_type may therefore dangle by the time amqp_basic_publish reads it — verify.
props.content_type = amqp_cstring_bytes("text/json")
props.delivery_mode = 2 // 2 is persistent; 1 is non-persistent
props.priority = 1
// Replies are directed to the queue declared above.
props.reply_to = queue
// username is an optional C-string array; it is nil when token is nil or the encoding fails.
let username = Session.sharedSession.token?.queueUsername.cStringUsingEncoding(NSUTF8StringEncoding)
// NOTE(review): the force-unwrap traps when username is nil.
props.user_id = amqp_cstring_bytes(username!)
// Prints the optional CChar array itself, not a decoded string.
debugPrint("Property.userId = \(username)")
// Publish message
// The two 0 arguments are assumed to be mandatory=0, immediate=0 (order per rabbitmq-c's amqp.h — confirm).
// NOTE(review): exchangeName, routingKey and inputMessage go through amqp_cstring_bytes as call arguments here,
// so they stay alive for this call; the same is not true for values stored into props earlier.
let publishPassOrFail = amqp_basic_publish(self.connection, self.channelId, amqp_cstring_bytes(exchangeName), amqp_cstring_bytes(routingKey), 0, 0, &props, amqp_cstring_bytes(inputMessage))
// The copied queue name is not referenced after the publish call, so release it now.
amqp_bytes_free(queue)
if publishPassOrFail == AMQP_STATUS_OK.rawValue {
debugPrint("Publish success")
if !self.checkLastOperation("Get reply after basic publish") { return }
var frame: amqp_frame_t = amqp_frame_t()
// Loop until a message is delivered (break) or one of the method-frame cases returns.
while true {
var envelope = amqp_envelope_t()
//amqp_maybe_release_buffers(self.connection)
// A nil timeout presumably blocks until something arrives — confirm against rabbitmq-c.
var amqpReply: amqp_rpc_reply_t = amqp_consume_message(self.connection, &envelope, nil, 0)
if amqpReply.reply_type != AMQP_RESPONSE_NORMAL {
// UNEXPECTED_STATE: something other than a delivery is pending; read the frame and dispatch on it.
// NOTE(review): any other non-normal reply falls through and the loop retries immediately,
// with no exit and no callback — confirm this cannot spin (e.g. on a dropped connection).
if amqpReply.reply_type == AMQP_RESPONSE_LIBRARY_EXCEPTION &&
amqpReply.library_error == AMQP_STATUS_UNEXPECTED_STATE.rawValue {
debugPrint("Response: Failed")
if amqp_simple_wait_frame(self.connection, &frame) != AMQP_STATUS_OK.rawValue {
return
}
// Non-method frames are ignored and the loop goes around again.
// NOTE(review): every `return` inside this branch exits without invoking callback.
if Int32(frame.frame_type) == AMQP_FRAME_METHOD {
debugPrint("Method.ID value = \(frame.payload.method.id)")
switch (Int(frame.payload.method.id)) {
case AMQPConstants.AMQP_BASIC_ACK_METHOD.rawValue:
/* if we've turned publisher confirms on, and we've published a message
* here is a message being confirmed
*/
debugPrint("Our publish message has been acknowledged")
return
case AMQPConstants.AMQP_BASIC_RETURN_METHOD.rawValue:
/* if a published message couldn't be routed and the mandatory flag was set
* this is what would be returned. The message then needs to be read.
*/
debugPrint("Response: Basic return")
// Read the returned message off the wire and discard it.
var message: amqp_message_t = amqp_message_t()
amqpReply = amqp_read_message(self.connection, frame.channel, &message, 0);
if amqpReply.reply_type != AMQP_RESPONSE_NORMAL {
return
}
amqp_destroy_message(&message);
return
case AMQPConstants.AMQP_CHANNEL_CLOSE_METHOD.rawValue:
/* a channel.close method happens when a channel exception occurs, this
* can happen by publishing to an exchange that doesn't exist for example
*
* In this case you would need to open another channel redeclare any queues
* that were declared auto-delete, and restart any consumers that were attached
* to the previous channel
*/
debugPrint("Response: Channel close")
return
case AMQPConstants.AMQP_CONNECTION_CLOSE_METHOD.rawValue:
/* a connection.close method happens when a connection exception occurs,
* this can happen by trying to use a channel that isn't open for example.
*
* In this case the whole connection must be restarted.
*/
self.isConnected = false
debugPrint("Response: Connection close")
return
default:
debugPrint("Unexpected payload method received: \(frame.payload.method.id)")
return
}
}
}
}
else {
// A message was delivered: decode its body, hand it to the caller, release the envelope, stop looping.
debugPrint("Response: normal and envelope: \(envelope.message.body)")
let messageReplyString: String = self.stringFromAmqpBytes(envelope.message.body)
let message = self.decodeJSON(messageReplyString)
callback(result: message)
amqp_destroy_envelope(&envelope)
break
}
}
}
else {
// NOTE(review): publish failure is only logged; callback is not invoked.
debugPrint("Publish failed")
}
}
Thanks for your help.
--
You received this message because you are subscribed to the Google Groups "rabbitmq-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to rabbitmq-users+unsubscribe@googlegroups.com.
To post to this group, send email to rabbitmq-users@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
// Assemble an RMQConnection by hand: TLS transport, explicit config, and `self`
// supplied for every collaborator role (recovery, frame handler, delegate,
// command queue, waiter factory, heartbeat sender).
NSString *brokerURI = @"amqps://maskedurl.com";
// NOTE(review): verifyPeer:NO presumably turns off certificate verification — confirm before shipping.
RMQTLSOptions *tls = [RMQTLSOptions fromURI:brokerURI verifyPeer:NO];
RMQTCPSocketTransport *transport =
    [[RMQTCPSocketTransport alloc] initWithHost:@"maskedhostname.com"
                                           port:@443
                                     tlsOptions:tls];
RMQCredentials *login = [[RMQCredentials alloc] initWithUsername:username
                                                        password:password];
RMQConnectionConfig *connectionConfig =
    [[RMQConnectionConfig alloc] initWithCredentials:login
                                          channelMax:@(RMQChannelLimit)
                                            frameMax:@131072
                                           heartbeat:@0
                                               vhost:@"/"
                                       authMechanism:@"PLAIN"
                                            recovery:self];
_waiter = [[RWaiter alloc] init];
_channelAllocator = [[RMQMultipleChannelAllocator alloc] initWithChannelSyncTimeout:@30];
_connection = [[RMQConnection alloc] initWithTransport:transport
                                                config:connectionConfig
                                      handshakeTimeout:@30
                                      channelAllocator:self.channelAllocator
                                          frameHandler:self
                                              delegate:self
                                          commandQueue:self
                                         waiterFactory:self
                                       heartbeatSender:self];
[_connection start:^{
    NSLog(@"Connection started");
    // Sadly, this block is never executed.
}];
// RWaiter is nothing but the following
/// Minimal RMQWaiter stub: it only records which protocol methods were invoked.
/// It holds no semaphore or timer, so neither method ever blocks.
@interface RWaiter : NSObject <RMQWaiter>
/// YES once -done has been called.
@property (assign) BOOL doneCalled;
/// YES once -timesOut has been called.
@property (assign) BOOL timesOutCalled;
@end

@implementation RWaiter

// No -init override: +alloc zero-fills instance variables, so both flags already
// start as NO. The removed override returned `id` instead of `instancetype` and
// only re-assigned those zero values.

#pragma mark - RMQWaiter

/// Records that completion was signalled.
- (void)done {
    self.doneCalled = YES;
}

/// Records the call and reports "did not time out" immediately.
/// @return Always NO.
- (BOOL)timesOut {
    self.timesOutCalled = YES;
    // NOTE(review): returns at once without waiting for -done. If the RMQWaiter
    // contract expects this call to block until -done or a timeout, callers will
    // proceed before the awaited event has happened — confirm against RMQClient's
    // own waiter implementation.
    return NO;
}

@end
// Publish a JSON request and listen for the reply on the same exclusive queue.
// A stray copy of the final basicPublish call used to precede these declarations;
// it referenced inputMessage, routingKey, channel and properties before any of
// them existed and duplicated the publish at the end, so it has been dropped.
NSString *inputMessage = [self encodeJSON:dictionary];
NSString *routingKey = @"";
id<RMQChannel> channel = [self.connection createChannel];
[channel activateWithDelegate:self];

// Declare the reply queue and attach the consumer before anything is published.
RMQQueue *queue = [channel queue:self.queuename options:RMQQueueDeclareExclusive];
[queue subscribe:RMQBasicConsumeNoAck handler:^(RMQMessage *message) {
    NSLog(@"Subscribed message: %@", message);
}];

RMQBasicContentType *contentType = [[RMQBasicContentType alloc] init:@"text/json"];
RMQBasicDeliveryMode *deliveryMode = [[RMQBasicDeliveryMode alloc] init:2]; // Persistent mode required
RMQBasicPriority *priority = [[RMQBasicPriority alloc] init:1]; // Priority should be 1
RMQBasicReplyTo *replyTo = [[RMQBasicReplyTo alloc] init:self.queuename]; // Queue name and reply queue is the same
RMQBasicUserId *userId = [[RMQBasicUserId alloc] init:username]; // Server needs username to auth
NSArray *properties = @[contentType, deliveryMode, priority, replyTo, userId];
[channel basicPublish:inputMessage
           routingKey:routingKey
             exchange:self.exchangeName
           properties:properties
              options:RMQBasicPublishNoOptions];
// Simplest setup: let the convenience initializer build the transport, config and
// collaborators from a URI instead of wiring them by hand.
RMQConnection *conn = [[RMQConnection alloc] initWithUri:@"amqps://myusername:mypas...@my.host.place:1234"
                                              verifyPeer:YES
                                                delegate:myDelegate];
// NOTE(review): the original snippet never started the connection; RMQClient's
// documented usage calls -start before creating channels — confirm for the version in use.
[conn start];
id<RMQChannel> ch = [conn createChannel];
RMQQueue *q = [ch queue:@"my-queue"];
[q publish:@"foo"]; // or other method beginning with 'publish'

To unsubscribe from this group and stop receiving emails from it, send an email to rabbitmq-users+unsubscribe@googlegroups.com.
To post to this group, send email to rabbitmq-users@googlegroups.com.
--
You received this message because you are subscribed to a topic in the Google Groups "rabbitmq-users" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/rabbitmq-users/D5s1gqtFd5o/unsubscribe.
To unsubscribe from this group and all its topics, send an email to rabbitmq-users+unsubscribe@googlegroups.com.