Super Streams Message Loss - Troubleshooting help


Ryan Riley

Jun 19, 2025, 2:31:22 PM
to rabbitmq-users
Hello,

I am looking for some guidance as to what I can do to troubleshoot some message loss. 

We have 2 Super Stream apps (both Java, using Spring, so Java Stream Client 0.23.0). We see application logs from our business logic suggesting that we are publishing data to the Super Stream. However, we are finding that some messages are not present in the stream. There are no logs in the producer or in RabbitMQ that would suggest something is going wrong.

What troubleshooting steps can I take to try to narrow down this problem? We estimate the data loss at 1% or less, so it's very hard to reproduce or characterize. We wonder if it has to do with message size, as some of the messages being dropped are relatively large (up to 100 MB in size).

As far as we know, publisher confirms are on by default in Streams (reading the ProducerBuilder code). 
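Since confirms are asynchronous, one way to make a sub-1% loss visible is to track every outbound message until its confirm callback fires. A minimal sketch, assuming you can attach a per-message id at send time; the ConfirmTracker class and the ids are illustrative, not part of the stream client API:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative tracker: record each message id at send time and clear it
// when the confirm callback reports success. Whatever remains after the
// confirm timeout is a candidate lost message.
public class ConfirmTracker {

    private final Set<String> outstanding = ConcurrentHashMap.newKeySet();

    public void markSent(String messageId) {
        outstanding.add(messageId);
    }

    public void markConfirmed(String messageId) {
        outstanding.remove(messageId);
    }

    // Call after your confirm timeout: anything still here never confirmed.
    public Set<String> unconfirmed() {
        return Set.copyOf(outstanding);
    }

    public static void main(String[] args) {
        ConfirmTracker tracker = new ConfirmTracker();
        tracker.markSent("msg-1");
        tracker.markSent("msg-2");
        tracker.markConfirmed("msg-1");
        System.out.println(tracker.unconfirmed()); // only msg-2 remains
    }
}
```

With the Java client used directly, the callback passed to producer.send(message, confirmationHandler) would drive markConfirmed; with Spring's RabbitStreamTemplate, the returned future plays the same role.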

Karl Nilsson

Jun 19, 2025, 3:14:51 PM
to rabbitm...@googlegroups.com
What version are you using? Are you using publisher deduplication?

Karl Nilsson



Ryan Riley

Jun 19, 2025, 3:31:37 PM
to rabbitmq-users
RabbitMQ version is 4.1.0. No, we are not using publisher deduplication to my knowledge, though I admit I had to look that up and was surprised to find that setting a producer name enables deduplication. Nevertheless, I don't think we set a producer name.

Arnaud Cogoluègnes

Jun 20, 2025, 2:47:34 AM
to rabbitmq-users
Please use the latest stream Java client version (1.1.0 as of today).

100 MB is big for a message; you should consider storing the message content in a database and storing only references in RabbitMQ.
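The database-reference idea is often called the claim-check pattern: persist the large payload externally and publish only a small key. A rough sketch, with an in-memory map standing in for the real database or object store; the ClaimCheck class and key format are illustrative, not an existing API:

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Claim-check sketch: the producer stores the payload and publishes only
// the key; the consumer resolves the key back to the payload. The
// ConcurrentHashMap stands in for the external store.
public class ClaimCheck {

    private final Map<String, byte[]> blobStore = new ConcurrentHashMap<>();

    // Producer side: persist the payload, return the small reference to publish.
    public String checkIn(byte[] largePayload) {
        String key = UUID.randomUUID().toString();
        blobStore.put(key, largePayload);
        return key;
    }

    // Consumer side: resolve the reference back to the payload.
    public byte[] checkOut(String key) {
        return blobStore.get(key);
    }

    public static void main(String[] args) {
        ClaimCheck store = new ClaimCheck();
        String ref = store.checkIn(new byte[10 * 1024 * 1024]); // 10 MB payload
        // Only the 36-character key needs to fit in a stream message.
        System.out.println(ref.length());
    }
}
```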

How do you handle publish confirms (retry, timeout, etc.)?

kjnilsson

Jun 20, 2025, 5:33:02 AM
to rabbitmq-users
We tested sending a 100 MB message, and by default it should throw an exception in the client unless the max frame size is negotiated to be 100 MB or more. Can you please share the exact configuration in place for the stream (segment size, retention, etc.), as well as how you are handling confirm callbacks and exceptions in your client applications? What frame size does your client negotiate?

Ryan Riley

Jun 20, 2025, 10:19:25 AM
to rabbitmq-users
We're absolutely considering storing messages in some other medium, and we know large messages are not good for performance. Thanks for the advice.

Server-side:
  • Super Stream, 40 partitions
  • Retention policy is x-max-age 7D, x-max-length-bytes 1_000_000_000_000, and x-stream-max-segment-size-bytes 1_000_000_000
  • This stream is smaller than anticipated, hovering around 2-3 segments
  • Our consumer_offset_lag is never high. The consuming app is up to date, and consumes as fast as messages are published

Client-side
  • We use Spring Integration AMQP (dependencies provided by Spring Boot 3.5.1, which uses Stream Client 0.23), using an inbound and outbound adapter in both the producing and consuming app
  • Note: We tried manually bumping the Stream Client to latest, but Spring does not support it in Boot 3.5.1.
  • We don't use producer de-duplication
  • Essentially, we create a default RabbitStreamTemplate from Spring, and then we have our own "customizeStreamTemplate" method that uses our own Spring Boot @ConfigurationProperties to adjust some fields on the producer. Since no properties are set, all the defaults are used. In short, we are using the Spring defaults, which in turn use the Java Stream Client defaults.

Producer code:
    @Bean
    public RabbitStreamTemplate streamTemplate(
            Environment environment,
            Function<Message, String> streamRoutingFunction,
            RabbitStreamTemplateConfigurer configurer,
            MyMessageConverter myMessageConverter,
            RabbitOutboundStreamProperties properties) {
        RabbitStreamTemplate rabbitStreamTemplate = new RabbitStreamTemplate(
                environment, properties.getStreamDefinition().getName());
        customizeStreamTemplate(
                rabbitStreamTemplate,
                properties.getProducerConfig(),
                configurer,
                streamRoutingFunction,
                myMessageConverter);
        return rabbitStreamTemplate;
    }
    @Bean
    public Function<Message, String> streamRoutingFunction() {
        return message -> {
            String headerVal = (String) message.getApplicationProperties().get("SomeHeaderHere");
            Assert.notNull(
                    headerVal,
                    "SuperStream routing cannot be determined because Header was not present in the AMQP message");
            return headerVal;
        };
    }

    @Bean
    StreamAdmin hypiStreamAdmin(Environment environment, RabbitOutboundStreamProperties properties) {
        return createStreamAdmin(environment, properties.getHypi().getStreamDefinition());
    }
    private static void customizeStreamTemplate(
            RabbitStreamTemplate rabbitStreamTemplate,
            RabbitOutboundStreamProperties.ProducerConfig producerConfig,
            RabbitStreamTemplateConfigurer configurer,
            Function<Message, String> streamRoutingFunction,
            MyMessageConverter myMessageConverter) {
        configurer.configure(rabbitStreamTemplate);
        rabbitStreamTemplate.setSuperStreamRouting(streamRoutingFunction);
        rabbitStreamTemplate.setMessageConverter(myMessageConverter);
        rabbitStreamTemplate.setProducerCustomizer((s, producerBuilder) -> {
            Optional.ofNullable(producerConfig.getBatchSize()).ifPresent(producerBuilder::batchSize);
            Optional.ofNullable(producerConfig.getBatchPublishingDelay())
                    .ifPresent(producerBuilder::batchPublishingDelay);
            Optional.ofNullable(producerConfig.getCompression()).ifPresent(producerBuilder::compression);
            Optional.ofNullable(producerConfig.getConfirmTimeout()).ifPresent(producerBuilder::confirmTimeout);
            Optional.ofNullable(producerConfig.getEnqueueTimeout()).ifPresent(producerBuilder::enqueueTimeout);
            Optional.ofNullable(producerConfig.getMaxUnconfirmedMessages())
                    .ifPresent(producerBuilder::maxUnconfirmedMessages);
            Optional.ofNullable(producerConfig.getSubEntrySize()).ifPresent(producerBuilder::subEntrySize);
        });
    }

    private static StreamAdmin createStreamAdmin(
            Environment environment, RabbitOutboundStreamProperties.StreamDefinition properties) {
        return new StreamAdmin(environment, streamCreator -> streamCreator
                .name(properties.getName())
                .superStream()
                .partitions(properties.getPartitionCount())
                .creator()
                .maxAge(properties.getMaxAge())
                .maxLengthBytes(properties.getMaxLength())
                .maxSegmentSizeBytes(properties.getMaxSegmentSize())
                .create());
    }


Consumer code is very similar, mostly defaults from Spring. Let me know if you need to see that.

Ryan Riley

Jun 20, 2025, 10:32:32 AM
to rabbitmq-users
Karl, sorry I didn't answer some of your questions:


"What frame size does your client negotiate" - I wasn't aware there was a negotiated frame size. Where would I find this? 

Ryan Riley

Jun 20, 2025, 11:17:38 AM
to rabbitmq-users
Sorry, I also wanted to add: we are seeing message loss on messages in the range of 50-500 KB. I know I mentioned large messages earlier, but that might be a misleading piece of information. The data loss is not exclusive to a given message size.

I'm still working to characterize the lost messages and see if there's a pattern, but we cannot find one. We send around 8000 messages per second, and every so often we seem to lose some number of them. We looked at the Prometheus metrics and there is no obvious dip in messages produced vs. consumed (i.e. it's not like we're constantly losing 20% of messages).

Arnaud Cogoluègnes

Jun 20, 2025, 12:24:02 PM
to rabbitmq-users
Thanks for the extra info. The default max frame size is 1 MB, so large messages should not go through if you did not change the default.

The max frame size the client requests cannot exceed the one set on the broker side, see [1].

Spring AMQP stream support returns a Future; what do you do with it in your application? The underlying stream Java client throws an exception right away if a message cannot fit in a frame, but the Spring support catches the exception and fails the future. If you ignore the future in your application, you won't know about messages that are too large for the system.
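To illustrate the point about the ignored future: a client-side rejection only surfaces if something inspects the CompletableFuture that send returns. A sketch, using a pre-failed future to stand in for what the template hands back when the stream client rejects an oversized message:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// If the future returned by send(...) is dropped on the floor, a failure
// such as "Message too big to fit in one frame" is never observed.
// Attaching a whenComplete callback surfaces it.
public class FutureHandling {

    public static String outcome(CompletableFuture<?> sendResult) {
        AtomicReference<String> seen = new AtomicReference<>("still pending");
        sendResult.whenComplete((ok, ex) -> {
            if (ex != null) {
                seen.set("send failed: " + ex.getMessage()); // log / alert / retry here
            } else {
                seen.set("confirmed");
            }
        });
        return seen.get();
    }

    public static void main(String[] args) {
        // Stands in for the template's result when the client rejects a message.
        CompletableFuture<Void> rejected = CompletableFuture.failedFuture(
                new IllegalArgumentException("Message too big to fit in one frame"));
        System.out.println(outcome(rejected));
        System.out.println(outcome(CompletableFuture.<Void>completedFuture(null)));
    }
}
```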

Ryan Riley

Jun 20, 2025, 3:18:07 PM
to rabbitmq-users
The part of Spring we are using is Spring Integration, which has "wrappers" for AMQP clients. The one that wraps all these operations is here: https://github.com/spring-projects/spring-integration/blob/f121637bb67758ed92292ac9441ae58ab576f97f/spring-integration-amqp/src/main/java/org/springframework/integration/amqp/outbound/RabbitStreamMessageHandler.java#L208

I'll be honest, I was letting the framework handle it. Now that I look at the source code, I'm not sure what to expect. I was hoping errors would be logged (we see no such failures in logs).

After digging through what logs we do have, though... it looks like some messages over 500 KB are getting dropped. I confirmed this by seeing the producer send messages larger than 500 KB that are then not present when we run a consumer on the Super Stream.

We have not altered or configured anything with regard to frame size, so I suspect our server is set to 1 MB as you say. Do batches have to fit inside a single frame, and is the batch thus subject to the max frame size? If we batch multiple messages and they add up to 1.1 MB, would the server throw an error?

I'm trying to dig through Spring Integration source to see if the errors are potentially being silently lost.

Ryan Riley

Jun 20, 2025, 3:51:45 PM
to rabbitmq-users
Aha!! In our app, we send several messages related to one "event" in our business logic. Filtering for all messages related to a single event, I can compare producer logs to consumer logs. I see that the successful messages range in size from 4823 to 535_415 bytes, but we lost messages ranging from 569_607 to 1_372_103 bytes.

So it does seem like size is the culprit, as there was 0% message loss in the lower range and 100% message loss among the larger messages.

I will look to increase the max frame size. I am still curious about my question above, though: Do batches have to fit inside a single frame, and is the batch thus subject to the max frame size? If we batch multiple messages and they add up to 1.1 MB, would the server throw an error?

Arnaud Cogoluègnes

Jun 23, 2025, 3:01:00 AM
to rabbitmq-users
I don't know what you call "a batch" in this context.

A message is rejected right away if it cannot fit in a single frame. The stream Java client accumulates messages and sends them asynchronously. When accumulated messages are flushed to the broker, the client packs as many messages as possible into publish frames and makes sure the frames do not exceed the max frame size.

You can increase the max frame size, but you should also make sure you know when a message does not make it to the broker because of its size.

Feel free to write a standalone and simple project that reproduces the issue and share it with us.

Ryan Riley

Jun 23, 2025, 1:58:28 PM
to rabbitmq-users
Here is a standalone app you can test yourself. It roughly resembles our code: https://github.com/metron2/streams-demo/tree/main

You can see the messages don't get published to RabbitMQ, nor are there any logs about it. We also tried making a cluster with stream.frame_max = 135000000 and it didn't make a difference.

Ryan Riley

Jun 23, 2025, 9:50:57 PM
to rabbitmq-users
So in that linked project, we've played around with a few things.

First off, we found an issue with Spring Integration AMQP (which we will report to them) where it wraps and catches the errors from the RabbitMQ Stream Java Client and then sends them nowhere. If you follow the code path here, it attempts to resolve a channel, but that ends up being a null-key lookup in a Map. This explains why we never saw the error. When we add an errorChannel in Spring manually, we can see the stack trace (again, all using the linked project above).

2025-06-23T14:51:47.178-04:00  INFO 159209 --- [demo] [   scheduling-1] c.example.demo.DemoWriterConfiguration   : Message sent! Size: 5242880B
2025-06-23T14:51:47.182-04:00 ERROR 159209 --- [demo] [   scheduling-1] o.s.integration.handler.LoggingHandler   : java.lang.IllegalArgumentException: Message too big to fit in one frame: 5243004
        at com.rabbitmq.stream.impl.Client.checkMessageFitsInFrame(Client.java:479)
        at com.rabbitmq.stream.impl.ProducerUtils$MessageAccumulatorHelper.entity(ProducerUtils.java:301)
        at com.rabbitmq.stream.impl.DynamicBatchMessageAccumulator.add(DynamicBatchMessageAccumulator.java:133)
        at com.rabbitmq.stream.impl.StreamProducer.doSend(StreamProducer.java:402)
        at com.rabbitmq.stream.impl.StreamProducer.send(StreamProducer.java:385)
        at com.rabbitmq.stream.impl.SuperStreamProducer.send(SuperStreamProducer.java:120)
        at org.springframework.rabbit.stream.producer.RabbitStreamTemplate.observeSend(RabbitStreamTemplate.java:276)
        at org.springframework.rabbit.stream.producer.RabbitStreamTemplate.send(RabbitStreamTemplate.java:265)
        at org.springframework.integration.amqp.outbound.RabbitStreamMessageHandler.handleMessageInternal(RabbitStreamMessageHandler.java:208)
        at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
        at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:132)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121)
        at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
        at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:436)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:343)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:312)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
        at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
        at org.springframework.integration.endpoint.SourcePollingChannelAdapter.handleMessage(SourcePollingChannelAdapter.java:234)
        at org.springframework.integration.endpoint.AbstractPollingEndpoint.messageReceived(AbstractPollingEndpoint.java:487)
        at org.springframework.integration.endpoint.SourcePollingChannelAdapter.messageReceived(SourcePollingChannelAdapter.java:267)
        at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:473)
        at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:423)
        at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$4(AbstractPollingEndpoint.java:358)
        at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:56)
        at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
        at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:54)
        at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$5(AbstractPollingEndpoint.java:351)
        at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
        at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:96)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
        at java.base/java.lang.Thread.run(Thread.java:1583)

We have been trying to set the stream.frame_max configuration property in RabbitMQ, but it doesn't seem to be working. Perhaps we're making a mistake in the way we set it. Does the client need to be configured for frame size as well, or is this "negotiated" with the server?
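For reference, the numbers in the trace above imply that the check compares the encoded message plus some framing overhead against the max frame size: the 5242880-byte payload needed a 5243004-byte frame, i.e. about 124 bytes of overhead in that sample. A rough sketch of that arithmetic; the FRAME_OVERHEAD constant is an assumption taken from this single observation, not the client's exact formula:

```java
// Rough reconstruction of the client-side size check seen in the log
// above: a 5_242_880-byte message needed a 5_243_004-byte frame, so
// FRAME_OVERHEAD is assumed to be ~124 bytes here. The real overhead
// varies with message annotations and is computed by the client.
public class FrameCheck {

    static final int DEFAULT_FRAME_MAX = 1_048_576; // ~1 MB default
    static final int FRAME_OVERHEAD = 124;          // assumed, not exact

    public static boolean fitsInFrame(int encodedMessageSize, int frameMax) {
        return encodedMessageSize + FRAME_OVERHEAD <= frameMax;
    }

    public static void main(String[] args) {
        System.out.println(fitsInFrame(535_415, DEFAULT_FRAME_MAX));   // largest delivered size
        System.out.println(fitsInFrame(1_372_103, DEFAULT_FRAME_MAX)); // one of the lost sizes
        System.out.println(fitsInFrame(5_242_880, DEFAULT_FRAME_MAX)); // the 5 MB sample above
    }
}
```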

Arnaud Cogoluègnes

Jun 24, 2025, 3:00:21 AM
to rabbitmq-users

Ryan Riley

Jun 24, 2025, 10:12:23 AM
to rabbitm...@googlegroups.com
Hi Arnaud,

We set the "stream.frame_max" size (as per the doc you linked) but this didn't seem to work. What did work was setting the frame_max size (no "stream." prefix). Is there perhaps a bug with the "stream.frame_max" setting not working?

We have not yet altered the client frame_max as you instructed.


Arnaud Cogoluègnes

Jun 24, 2025, 10:53:06 AM
to rabbitmq-users
Please use the stream.frame_max setting on the server side AND set the value on the client side as well. You can use the following command to check that the setting is enforced when the client application is connected:

rabbitmqctl list_stream_connections frame_max

I just gave it a try locally with stream-perf-test and it is working as expected.

Arnaud Cogoluègnes

Jul 2, 2025, 9:14:05 AM
to rabbitmq-users
Hi, Ryan.

Any feedback? Did you manage to increase the max frame size?
Message has been deleted
Message has been deleted

Ryan Riley

Jul 3, 2025, 8:39:45 AM
to rabbitmq-users
Hi Arnaud,

I had sent a message but it says it's been deleted. Using the project we linked above, setting stream.frame_max did not seem to help. We tried increasing the regular frame_max (which affects AMQP traffic) and that seemed to make the Streams work, which seems odd.

But then, and we're not sure why, we had 0% message loss on our larger messages but around 50% message loss on our smaller messages, with no error logs.

We ended up rolling back our change from Classic Queues to Streams. We'll keep poking at this.

Have you been able to use the linked project above and set the stream.frame_max size and have large messages sent successfully?

Ryan Riley

Jul 3, 2025, 8:39:49 AM
to rabbitmq-users
Hi Arnaud,

We ended up rolling back the Streams changes and went back to our traditional Queues for now. 

As I mentioned above, we tried setting stream.frame_max and that did not seem to work. Using the example project we linked, the messages would be dropped by the client, which threw the error noting the large frame size.

What did seem to work was setting the frame_max setting, but I understand this affects regular AMQP as well, so it's not ideal. Large messages could then be sent. However, this (or something else) resulted in us experiencing ~50% data loss among our smaller messages in the stream. We will try to dig into this and get more details for you.

Question: Using our sample project, are you able to set stream.frame_max and successfully send large messages?

Side note: I sent a commit to Spring Integration to fix the hidden client error messages. Prior to this PR, Spring was silently dropping errors thrown by the Java Stream Client: GH-10163: Fallback to default errorChannel in async mode · spring-projects/spring-integration@f4670e5

Thanks for following up with us.

Arnaud Cogoluègnes

Jul 4, 2025, 3:34:49 AM
to rabbitmq-users
> Question: Using our sample project, are you able to set stream.frame_max and successfully send large messages?

I am. The max frame size is not set on the Environment in your project, but it has to be, as I have repeated several times. I simplified the project a bit (local broker, no TLS, etc.).

Declare an environment customizer bean:

@Bean
EnvironmentBuilderCustomizer frameSizeEnvironmentBuilderCustomizer() {
    return builder -> builder.requestedMaxFrameSize((int) DataSize.ofMegabytes(6L).toBytes());
}

Create configuration files for a Docker container:
mkdir rabbitmq-configuration
echo '[rabbitmq_stream].' > rabbitmq-configuration/enabled_plugins
echo "loopback_users=none
stream.frame_max=6000000" > rabbitmq-configuration/rabbitmq.conf

Run the broker:

docker run -it --rm --name rabbitmq -p 5552:5552 -p 5672:5672 \
    -v "${PWD}"/rabbitmq-configuration:/etc/rabbitmq \
    -e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' \
    rabbitmq:4.1

Then run the modified program.

Confirm the max frame size setting is picked up:

docker exec rabbitmq rabbitmqctl list_stream_connections frame_max
Listing stream connections ...
┌───────────┐
│ frame_max │
├───────────┤
│ 6000000   │
├───────────┤
│ 6000000   │
└───────────┘

Confirm messages end up in the stream:

docker exec rabbitmq rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name    messages
DemoStream-1    49
DemoStream-3    0
DemoStream-4    0
DemoStream-2    0
DemoStream-0    0

I confirm messages were not published without those changes. The exception is swallowed on the client side, as you mentioned.