Hi everyone,
Is there any way to change the type of a received message? We want to implement transparent handling of messages with large payloads (I am aware of the explicit approach in MassTransit). To be more specific, here are some details.
We have different types of messages that can be sent through MassTransit. Some of them, called bulk events, can contain a collection of other events, and this collection can be pretty big (1-2 million objects):
    public interface IBulkEvent<out T> where T : IEvent
    {
        IEnumerable<T> Data { get; }
    }
So we serialize these events to JSON and store them in Amazon S3. Then we send the storage location and the actual message type using the IPayloadReference interface:
    public interface IPayloadReference
    {
        Uri Address { get; }
        Type Type { get; }
    }
We have already implemented everything we need in our code, but we want to separate it from the business logic and move it into infrastructure. I spent some time looking into the source code of MassTransit and GreenPipes. It looks like there is something that could help me (like DynamicFilter/DynamicRouter), but I got stuck:
    public async Task Send(ConsumeContext<T> context, IPipe<ConsumeContext<T>> next)
    {
        ConsumeContext<MessageWithPayload> consumeContext;
        if (!context.TryGetMessage(out consumeContext))
        {
            // Not a payload reference: pass the message through unchanged.
            await next.Send(context);
        }
        else
        {
            // Load the real message from storage.
            var message = await _repository.RetrieveMessage<T>(consumeContext.Message.PayloadId);
            await next.Send(???); // <-- this is where I am stuck :)
        }
    }
From the implementation of TransformFilter I understood that I can create a new ConsumeContext:
    if (result.IsNewValue)
    {
        var transformedContext = new MessageConsumeContext<T>(context, result.Value);
        return next.Send(transformedContext);
    }
But I need to change not the value but the type of the message.
I am relatively new to MassTransit and have never worked with it at such a low level. Maybe I am going in the wrong direction, so I would appreciate any help or suggestions.
Thanks