Producer: 4 cores, 8 GB (EC2). This machine also hosts the RabbitMQ installation.
Consumer: 16 cores, 30 GB (EC2).
I'm using the Java API with a QoS of (0, 0, true) on each channel, 5 threads in a fixed thread pool per connection, and auto ack.
Durable queues and exchanges.
Messages are non-persistent.
12 fanout exchanges, each with about 3 queues, each queue with 3 binding routing keys, and one channel per binding.
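
For a sense of scale, the topology described above can be tallied in a few lines of Java. This is only a back-of-the-envelope sketch: the class and method names are hypothetical, and it assumes exactly 3 queues per exchange, whereas the description says "about 3".

```java
// Hypothetical helper for illustration; not part of the original application.
final class TopologyCount {

    // Total queues across all exchanges.
    static int queues(int exchanges, int queuesPerExchange) {
        return exchanges * queuesPerExchange;
    }

    // Total bindings; with one channel per binding this is also the channel count.
    static int bindings(int exchanges, int queuesPerExchange, int bindingsPerQueue) {
        return queues(exchanges, queuesPerExchange) * bindingsPerQueue;
    }

    public static void main(String[] args) {
        int exchanges = 12;
        int queuesPerExchange = 3; // assumption: "about 3" taken as exactly 3
        int bindingsPerQueue = 3;

        int queues = queues(exchanges, queuesPerExchange);
        int bindings = bindings(exchanges, queuesPerExchange, bindingsPerQueue);

        // Prints: queues=36, bindings=108, channels=108
        System.out.println("queues=" + queues + ", bindings=" + bindings + ", channels=" + bindings);
    }
}
```

Under those assumptions the consumer side holds roughly a hundred channels open, which is worth keeping in mind when reading the thread pool sizes.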
cat /etc/rabbitmq/rabbitmq-env.conf
export RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+A 128"
cat /etc/rabbitmq/rabbitmq.config
[
  {kernel, [
    {inet_default_connect_options, [{nodelay, true}]},
    {inet_default_listen_options, [{nodelay, true}]}
  ]},
  {rabbit, [
    {hipe_compile, true},
    {tcp_listen_options, [
      {backlog, 128},
      {nodelay, true},
      {sndbuf, 196608},
      {recbuf, 196608}
    ]}
  ]}
].
--
You received this message because you are subscribed to the Google Groups "rabbitmq-users" group.
To unsubscribe from this group and stop receiving emails from it, send an email to rabbitmq-user...@googlegroups.com.
To post to this group, send email to rabbitm...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
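// Producer side
public void send(AbstractMessage msg) throws IOException, TimeoutException {
    String routingKey = msg.getRoutingKey().getRoutingKeyString();
    String msgJson = gson.toJson(msg);

    // Rebuild the connection if it has dropped.
    if (!this.connection.isOpen()) {
        logger.debug("Connection has died. Creating new connection.");
        this.connection = buildConnection();
    }
    // Re-create the channel and re-declare the durable topic exchange if the channel has closed.
    if (!this.channel.isOpen()) {
        logger.debug("Channel is closed. Creating new channel.");
        this.channel = this.connection.createChannel();
        this.channel.exchangeDeclare(this.exchange, "topic", true);
    }
    // TEXT_PLAIN is a non-persistent delivery mode; encode explicitly as UTF-8.
    this.channel.basicPublish(this.exchange, routingKey,
            MessageProperties.TEXT_PLAIN, msgJson.getBytes(StandardCharsets.UTF_8));
}

// The executor passed here is the thread pool the connection uses for consumer callbacks.
public Connection buildConnection() throws IOException, TimeoutException {
    return factory.newConnection(Executors.newFixedThreadPool(3), addressArr);
}

// Consumer side
public void listen(String routingKey) throws IOException, TimeoutException {
    // Create the channel on the connection that was just built.
    Connection connection = buildConnection();
    Channel channel = connection.createChannel();
    channel.exchangeDeclare(this.exchange, "topic", true);
    // durable=false, exclusive=false, autoDelete=true
    channel.queueDeclare(this.queueName, false, false, true, null);
    // prefetchSize=0, prefetchCount=0 (no limit), global=true
    channel.basicQos(0, 0, true);
    channel.queueBind(this.queueName, this.exchange, routingKey);
    // autoAck=true
    channel.basicConsume(this.queueName, true, buildConsumer(channel));
}

private DefaultConsumer buildConsumer(Channel channel) {
    return new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope,
                AMQP.BasicProperties properties, byte[] body) throws IOException {
            // Decode with the same charset the producer used.
            String msgJson = new String(body, StandardCharsets.UTF_8);
            AbstractMessage absMsg = gson.fromJson(msgJson, AbstractMessage.class);
            handler.handleMessage(absMsg);
        }
    };
}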
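
Since handleDelivery calls handler.handleMessage directly, the handler runs on the fixed thread pool handed to newConnection, so the pool size caps how many deliveries one connection can process at the same time. The sketch below illustrates that cap using only the JDK. It does not use any RabbitMQ classes; PoolBoundDemo, peakConcurrency and the 5 ms sleep are hypothetical stand-ins for the real handler.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for consumer callbacks running on a fixed pool.
final class PoolBoundDemo {

    // Runs `tasks` simulated handlers on a pool of `poolSize` threads and
    // returns the highest number of handlers observed running at once.
    static int peakConcurrency(int poolSize, int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(tasks);

        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                int now = running.incrementAndGet();
                peak.accumulateAndGet(now, Math::max);
                try {
                    Thread.sleep(5); // simulated handler work (assumption)
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    running.decrementAndGet();
                    done.countDown();
                }
            });
        }

        try {
            done.await(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        // The peak never exceeds the pool size, however many tasks are queued.
        System.out.println("peak concurrent handlers: " + peakConcurrency(3, 50));
    }
}
```

If the handler is slow, a pool of 3 threads may therefore be the limiting factor on a 16-core consumer, regardless of how many channels are open on the connection.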