Hello!
I'm using a Spring Boot client application with a RabbitMQ cluster (run with docker-compose). Publishing new messages works fine, but when I try to consume messages I get this error:
17:29:33.878 [main] DEBUG com.rabbitmq.stream.impl.Client - Trying to create stream connection to localhost:5555
17:29:33.899 [main] DEBUG com.rabbitmq.stream.impl.Client - Connection tuned with max frame size 1048576 and heartbeat 60
17:29:33.900 [main] DEBUG com.rabbitmq.stream.impl.Utils - Expected client server7:5557, got server5:5555: failure
17:29:33.900 [main] DEBUG com.rabbitmq.stream.impl.Client - Closing client
17:29:33.901 [main] DEBUG com.rabbitmq.stream.impl.Client - Closing Netty channel
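To make the mismatch in the Utils line easier to see, here is a minimal sketch that pulls the two addresses out of that message. It assumes the message always has the shape shown in my log ("Expected client host:port, got host:port: outcome"). The class and method names (NodeMismatch, parse) are my own and are not part of the RabbitMQ client.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class NodeMismatch {

    // Assumed message shape, copied from the debug log in this post:
    // "Expected client <host>:<port>, got <host>:<port>: <outcome>"
    private static final Pattern LINE = Pattern.compile(
            "Expected client (\\S+):(\\d+), got (\\S+):(\\d+): ([a-z]+)");

    // Simple holder for the parsed values (hypothetical helper type).
    static final class Result {
        final String expected;
        final String actual;
        final String outcome;

        Result(String expected, String actual, String outcome) {
            this.expected = expected;
            this.actual = actual;
            this.outcome = outcome;
        }

        // True when the node that answered is the node the client wanted.
        boolean sameNode() {
            return expected.equals(actual);
        }
    }

    // Returns the parsed addresses, or empty if the line has another shape.
    static Optional<Result> parse(String logLine) {
        Matcher m = LINE.matcher(logLine);
        if (!m.find()) {
            return Optional.empty();
        }
        String expected = m.group(1) + ":" + m.group(2);
        String actual = m.group(3) + ":" + m.group(4);
        return Optional.of(new Result(expected, actual, m.group(5)));
    }

    public static void main(String[] args) {
        String line = "17:29:33.900 [main] DEBUG com.rabbitmq.stream.impl.Utils - "
                + "Expected client server7:5557, got server5:5555: failure";
        parse(line).ifPresent(r -> System.out.println(
                "expected=" + r.expected + " actual=" + r.actual
                        + " outcome=" + r.outcome + " sameNode=" + r.sameNode()));
    }
}
```

For my log this reports that the client wanted server7:5557 but the connection it opened through localhost:5555 reached server5:5555.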
This is my consumer source code:
    public static void main(String[] args) throws Exception {
        log("Connecting...");
        Address entryPoint = new Address("localhost", 5555);
        try (Environment environment = Environment.builder()
                .host(entryPoint.host())
                .port(entryPoint.port())
                .username("rabbit_admin")
                .password(".123-321.")
                .addressResolver(address -> entryPoint)
                .build()) {
            log("Connected");
            AtomicInteger messageConsumed = new AtomicInteger(0);
            long start = System.currentTimeMillis();
            log("Start consumer...");
            Consumer consumer = environment.consumerBuilder()
                    .stream("finance.eletronics")
                    .offset(OffsetSpecification.offset(0))
                    .messageHandler((context, message) -> {
                        messageConsumed.incrementAndGet();
                        System.out.println("Received: " + new String(message.getBodyAsBinary()));
                    })
                    .build();
            Utils.waitAtMost(60, () -> messageConsumed.get() >= 1_000_000);
            log("Consumed %,d messages in %s ms", messageConsumed.get(), (System.currentTimeMillis() - start));
            log("Closing environment...");
        }
        log("Environment closed");
    }
}
My rabbitmq.conf is the same on each server; only the ports and the number in the name change.
This is the one used on server2:
loopback_users.guest = true
stream.listeners.tcp.1 = 5552
stream.advertised_host = server2
stream.advertised_port = 5552
management.tcp.port = 15672
prometheus.tcp.port = 15692
listeners.tcp.default = 5672
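To show how the stream settings vary per node, here is a small sketch that renders them for server N. It assumes the stream port is 5550 + N, which matches server2:5552 above and server5:5555 and server7:5557 in the log. The other ports are left out because they are not part of this pattern. StreamConfSketch and its methods are hypothetical helper names, not anything from RabbitMQ.

```java
public class StreamConfSketch {

    // Assumption: server N exposes the stream protocol on port 5550 + N
    // (server2 -> 5552, server5 -> 5555, server7 -> 5557).
    static int streamPort(int n) {
        return 5550 + n;
    }

    // Renders only the stream-related rabbitmq.conf lines for server N.
    static String streamSettings(int n) {
        int port = streamPort(n);
        return String.join("\n",
                "stream.listeners.tcp.1 = " + port,
                "stream.advertised_host = server" + n,
                "stream.advertised_port = " + port);
    }

    public static void main(String[] args) {
        // Prints the three stream lines for server2, as in the file above.
        System.out.println(streamSettings(2));
    }
}
```

With these settings each node advertises its own hostname and port (for example server7:5557), which is the address that shows up as the "expected" one in the client log.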
A snippet of my docker-compose.yml for server2:
version: "3.2"
services:
  server2:
    image: rabbitmq:3.10.9-management
    hostname: server2
    container_name: 'server2'
    ports:
      - "5672:5672"
      - "15672:15672"
      - "5552:5552"
      - "15692:15692"