Trying to consume stream messages

Danilo

Oct 17, 2022, 4:40:57 PM
to rabbitmq-users
Hello!
I'm using a Spring Boot client application with a RabbitMQ cluster (run with docker-compose). Publishing new messages works fine, but when I try to consume messages, I get this error:

17:29:33.878 [main] DEBUG com.rabbitmq.stream.impl.Client - Trying to create stream connection to localhost:5555
17:29:33.899 [main] DEBUG com.rabbitmq.stream.impl.Client - Connection tuned with max frame size 1048576 and heartbeat 60
17:29:33.900 [main] DEBUG com.rabbitmq.stream.impl.Utils - Expected client server7:5557, got server5:5555: failure
17:29:33.900 [main] DEBUG com.rabbitmq.stream.impl.Client - Closing client
17:29:33.901 [main] DEBUG com.rabbitmq.stream.impl.Client - Closing Netty channel

This is my consumer source code:
        public static void main(String[] args) throws Exception {
            log("Connecting...");
            Address entryPoint = new Address("localhost", 5555);
            try (Environment environment = Environment.builder()
                    .host(entryPoint.host())
                    .port(entryPoint.port())
                    .username("rabbit_admin")
                    .password(".123-321.")
                    .addressResolver(address -> entryPoint)
                    .build()) {

                log("Connected");

                AtomicInteger messageConsumed = new AtomicInteger(0);
                long start = System.currentTimeMillis();
                log("Start consumer...");
                Consumer consumer = environment.consumerBuilder()
                        .stream("finance.eletronics")
                        .offset(OffsetSpecification.offset(0))
                        .messageHandler((context, message) -> {
                            messageConsumed.incrementAndGet();
                            System.out.println("Received: " + new String(message.getBodyAsBinary()));
                        })
                        .build();
                Utils.waitAtMost(60, () -> messageConsumed.get() >= 1_000_000);
                log("Consumed %,d messages in %s ms", messageConsumed.get(), (System.currentTimeMillis() - start));
                log("Closing environment...");
            }
            log("Environment closed");
        }
    }

My rabbitmq.conf on each server (only the ports and the number in the server name change).
This is the one used on server2:
loopback_users.guest = true
stream.listeners.tcp.1 = 5552
stream.advertised_host = server2
stream.advertised_port = 5552
management.tcp.port = 15672
prometheus.tcp.port = 15692
listeners.tcp.default = 5672


A snippet of my docker-compose.yml for server2:
version: "3.2"
services:
  server2:
    image: rabbitmq:3.10.9-management
    hostname: server2
    container_name: 'server2'
    ports:
      - "5672:5672"
      - "15672:15672"
      - "5552:5552"
      - "15692:15692"

Arnaud Cogoluègnes

Oct 18, 2022, 3:11:13 AM
to rabbitmq-users
You don't need to use ".addressResolver(address -> entryPoint)" if you're not using a load balancer. See the Java client documentation [1] for more information on when you do need to set an AddressResolver.
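
Without an address resolver, the client connects to the host and port that each node advertises, so those host names have to resolve from the machine running the client. The following is a minimal sketch of such a check using only the JDK. The class name AdvertisedHostCheck and the helper canResolve are made up for illustration, and the host names are the ones that appear in the log lines of this thread.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

public class AdvertisedHostCheck {

    // True if this machine can resolve the given host name to an IP address.
    public static boolean canResolve(String host) {
        try {
            InetAddress.getByName(host);
            return true;
        } catch (UnknownHostException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Host names taken from the log lines in this thread; adjust to your cluster.
        String[] advertisedHosts = {"server4", "server5", "server7"};
        for (String host : advertisedHosts) {
            System.out.printf("%s resolvable from this machine: %b%n", host, canResolve(host));
        }
    }
}
```

A name that does not resolve here is one the client will probably not be able to connect to either. Name resolution is only the first requirement: the advertised port also has to be reachable from the client.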

Danilo

Oct 18, 2022, 2:57:49 PM
to rabbitmq-users
That isn't the problem.
I removed this code and I still get this message: Expected client server4:5554, got server7:5557: failure

Danilo

Oct 18, 2022, 3:37:42 PM
to rabbitmq-users
I have found the problem.
In the rabbitmq.conf files of all brokers, I need to configure the same server:
stream.advertised_host = server5
stream.advertised_port = 5555
I'm still studying how to configure one for publishing and another for consuming.

Arnaud Cogoluègnes

Oct 19, 2022, 2:37:16 AM
to rabbitmq-users
The "connecting to streams" blog post [1] should help you figure out which solution is best for you.

Using the same settings for the advertised host and port on every node is not usually a good solution; the blog post explains why.
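
To make the "Expected client ..., got ...: failure" log line easier to read, here is a simplified model of the check behind it. It is a sketch, not the client's actual implementation. The client knows which node it wants from the advertised host and port. It connects through an entry point, compares what the reached node advertises with what it expected, and retries on a mismatch. The class AdvertisedHintSketch, the method connectTo and the server6 entry are made up for illustration.

```java
import java.util.function.Supplier;

public class AdvertisedHintSketch {

    // Connects through "entry" until the node reached advertises the expected host:port,
    // or the attempts run out. Returns the number of attempts used, or -1 on failure.
    public static int connectTo(String expected, Supplier<String> entry, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            String reached = entry.get(); // advertised host:port of the node we landed on
            if (reached.equals(expected)) {
                return attempt;
            }
            System.out.printf("Expected client %s, got %s: failure%n", expected, reached);
        }
        return -1;
    }

    public static void main(String[] args) {
        // Case 1: the entry point always leads to the same node (for example, an address
        // resolver pinned to a port that is mapped to server5 only).
        System.out.println(connectTo("server7:5557", () -> "server5:5555", 5)); // -1

        // Case 2: the entry point is a load balancer that rotates over the nodes.
        String[] cluster = {"server5:5555", "server6:5556", "server7:5557"};
        int[] next = {0};
        System.out.println(connectTo("server7:5557", () -> cluster[next[0]++ % cluster.length], 5)); // 3
    }
}
```

In the first case every attempt ends in the same mismatch, which matches the log in the original question. In the second case the retries eventually land on the expected node, which is the situation an AddressResolver pointing at a load balancer is meant for.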
