/**
 * Provides the function used to derive the super-stream routing key from an outgoing message.
 *
 * <p>The key is the value stored under the {@code "SomeHeaderHere"} application property. A
 * message that carries no application properties at all, or that lacks this entry, is rejected
 * through {@code Assert.notNull} with an explanatory message.
 *
 * @return a function mapping a message to its routing key; the function never returns
 *     {@code null}
 */
@Bean
public Function<Message, String> streamRoutingFunction() {
    return message -> {
        // The application-properties map itself may be null when the message was built without
        // any, so guard it to surface the assertion message below instead of a bare NPE.
        String headerVal = Optional.ofNullable(message.getApplicationProperties())
                .map(applicationProperties -> (String) applicationProperties.get("SomeHeaderHere"))
                .orElse(null);
        Assert.notNull(
                headerVal,
                "SuperStream routing cannot be determined because Header was not present in the AMQP message");
        return headerVal;
    };
}
/**
 * Declares the {@link StreamAdmin} bean for the "hypi" outbound stream.
 *
 * @param environment the stream environment handed to the admin
 * @param properties outbound stream properties; the stream definition is read from its
 *     {@code hypi} section
 * @return the admin built by {@code createStreamAdmin} for that definition
 */
@Bean
StreamAdmin hypiStreamAdmin(Environment environment, RabbitOutboundStreamProperties properties) {
    RabbitOutboundStreamProperties.StreamDefinition hypiDefinition =
            properties.getHypi().getStreamDefinition();
    return createStreamAdmin(environment, hypiDefinition);
}
/**
 * Applies the shared configuration to a {@link RabbitStreamTemplate}.
 *
 * <p>The configurer is applied first; the super-stream routing function, the message converter
 * and the producer customizer are then set on top of it.
 *
 * @param rabbitStreamTemplate the template to configure (mutated in place)
 * @param producerConfig optional producer tuning values; only non-null values are applied to
 *     the producer builder
 * @param configurer configurer applied to the template before the explicit settings
 * @param streamRoutingFunction function resolving the super-stream routing key of a message
 * @param myMessageConverter converter installed on the template
 */
private static void customizeStreamTemplate(
        RabbitStreamTemplate rabbitStreamTemplate,
        RabbitOutboundStreamProperties.ProducerConfig producerConfig,
        RabbitStreamTemplateConfigurer configurer,
        Function<Message, String> streamRoutingFunction,
        MyMessageConverter myMessageConverter) {
    configurer.configure(rabbitStreamTemplate);
    rabbitStreamTemplate.setSuperStreamRouting(streamRoutingFunction);
    // Use the converter passed in as a parameter; the previous code referenced a different
    // name (messageConverter) and left this argument unused.
    rabbitStreamTemplate.setMessageConverter(myMessageConverter);
    rabbitStreamTemplate.setProducerCustomizer((name, producerBuilder) -> {
        // Each setting is optional: a null value leaves the builder's own setting untouched.
        Optional.ofNullable(producerConfig.getBatchSize()).ifPresent(producerBuilder::batchSize);
        Optional.ofNullable(producerConfig.getBatchPublishingDelay())
                .ifPresent(producerBuilder::batchPublishingDelay);
        Optional.ofNullable(producerConfig.getCompression()).ifPresent(producerBuilder::compression);
        Optional.ofNullable(producerConfig.getConfirmTimeout()).ifPresent(producerBuilder::confirmTimeout);
        Optional.ofNullable(producerConfig.getEnqueueTimeout()).ifPresent(producerBuilder::enqueueTimeout);
        Optional.ofNullable(producerConfig.getMaxUnconfirmedMessages())
                .ifPresent(producerBuilder::maxUnconfirmedMessages);
        Optional.ofNullable(producerConfig.getSubEntrySize()).ifPresent(producerBuilder::subEntrySize);
    });
}
/**
 * Builds a {@link StreamAdmin} that declares a super stream from the given definition.
 *
 * <p>The definition's getters are read inside the callback, i.e. when the admin invokes it,
 * not when this method returns.
 *
 * @param environment the stream environment the admin operates on
 * @param properties definition supplying the stream name, partition count, max age, max
 *     length and max segment size
 * @return a new admin configured with the super-stream declaration callback
 */
private static StreamAdmin createStreamAdmin(
        Environment environment, RabbitOutboundStreamProperties.StreamDefinition properties) {
    return new StreamAdmin(environment, creator -> {
        creator.name(properties.getName())
                .superStream()
                .partitions(properties.getPartitionCount())
                .creator()
                .maxAge(properties.getMaxAge())
                .maxLengthBytes(properties.getMaxLength())
                .maxSegmentSizeBytes(properties.getMaxSegmentSize())
                .create();
    });
}