Change message type

48 views
Skip to first unread message

Volodymyr Kostochka

unread,
Oct 31, 2019, 9:15:53 AM10/31/19
to masstransit-discuss
Hi everyone,

Is there any way to change type of received message? We want to implement transparent handling of messages with big payload (I am aware about explicit approach in MassTransit). To be more specific I'll provide more details.

We have different type of messages that can be send through MassTransit. Some of them called bulk events can contain collection of other events. And this collection can be pretty big (1-2 millions of objects):

public interface IBulkEvent<out T> where T : IEvent
{
    IEnumerable<T> Data { get; }
}

So we serialize these events to JSON and store it in Amazon S3. Then we send information about storage location and actual message type using IPayloadReference interface:

public interface IPayloadReference
{
    Uri Address { get; }

    Type Type { get; }
}

We already implemented everything that we need in our code. But we want to separate this code from business logic code and move it somewhere in infrastructure. I spent time looking into source code of MassTransit and GreenPipes. Looks like there are something that can help me (like DynamicFilter/DynamicRouter) but actually I got stuck...


        public async Task Send(ConsumeContext<T> context, IPipe<ConsumeContext<T>> next)
        {
            ConsumeContext<MessageWithPayload> consumeContext;
            if (!context.TryGetMessage(out consumeContext))
            {
                await next.Send(context);
            }
            else
            {
                var message = await _repository.RetrieveMessage<T>(consumeContext.Message.PayloadId);

                await next.Send(???) <---------- The place where I stuck :)
            }
        }

From implementation of TransformFilter I understood that I can create new ConsumeContext:

if (result.IsNewValue)
{
    var transformedContext = new MessageConsumeContext<T>(context, result.Value);

    return next.Send(transformedContext);
}

But I need to change not the value but type of message.

I am relatively newbie in MassTransit and have not ever worked with it on such low level. May be I am going in wrong direction... So will appreciate any help or suggestions.

Thanks

Chris Patterson

unread,
Oct 31, 2019, 11:08:21 PM10/31/19
to masstrans...@googlegroups.com
You would need to intercept with a filter at the ConsumeContext and replace it with a consume context that supports TryGetMessage for your custom message type. That way, your consumer of that message type would be able to get the message and process it. I don't have a sample off hand, but if you look at how the JSON deserializer does it, you may be able to figure it out.


--
You received this message because you are subscribed to the Google Groups "masstransit-discuss" group.
To unsubscribe from this group and stop receiving emails from it, send an email to masstransit-dis...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/masstransit-discuss/0cf5a863-9d0a-4015-af4d-67c2c61ff645%40googlegroups.com.
Message has been deleted

Volodymyr Kostochka

unread,
Nov 1, 2019, 4:11:51 AM11/1/19
to masstransit-discuss
Thank you Chris. Your suggestion narrowed the area of research.
To unsubscribe from this group and stop receiving emails from it, send an email to masstransit-discuss+unsub...@googlegroups.com.

Volodymyr Kostochka

unread,
Nov 6, 2019, 1:19:50 PM11/6/19
to masstransit-discuss
Hi Chris.

I spent a lot of time trying understand you previous point but looks like I didn't get. I looked into JsonMessageDeserializerJsonConsumeContextMessageConsumeContext, etc. But I can't understand what exactly I should do.

Few more details. As a basis I took sample of middleware from here https://masstransit-project.com/MassTransit/advanced/middleware/custom.html (just renamed it to ) and registered it in this way:

services.AddMassTransit(x =>
{
    x.AddBus(context => Bus.Factory.CreateUsingRabbitMq(cfg =>
    {
        var host = cfg.Host(new Uri(messagingConfig.Uri), hst =>
        {
            hst.Username(messagingConfig.UserName);
            hst.Password(messagingConfig.Password);
        });

        cfg.UsePayloadReferenceHandler();

        //...
    }));
});

'Send' method looks like this:

public async Task Send(ConsumeContext context, IPipe<ConsumeContext> next)
{
    var result = context.TryGetMessage<IPayloadReference>(out var message);
    if (result)
    {
        var payloadType = GetPayloadType(message.Message);
        var value = repository.Get(message.Message.Address, payloadType);

        var newContext = (ConsumeContext)Activator.CreateInstance(typeof(MessageConsumeContext<>).MakeGenericType(payloadType), context, value);

        await next.Send(newContext);
    }
    else
    {
        await next.Send(context);
    }
}

The problem that if I inspect next filters I see that the last one is 'ConsumerMessageFilter<EventConsumer<IPayloadReference>, IPayloadReference>'. So when I am trying to execute 'next.Send(newContext)' I anyway get into the 'IPayloadReference' consumer instead of consumer of type 'payloadType'.

I understand that I missed something but can't understand what exactly. I will be really appreciate if you could provide more details when you will have time.

Thanks.

Chris Patterson

unread,
Nov 6, 2019, 1:30:33 PM11/6/19
to masstrans...@googlegroups.com
You're really close. The issue is that TryGetMessage() needs to be overloaded so that you can return the newly created message type when requested. The default MessageConsumeContext just passes that call back to the JsonConsumeContext - which doesn't know about your payload message type. You handle that yourself and it will properly dispatch to the correct consumer.


To unsubscribe from this group and stop receiving emails from it, send an email to masstransit-dis...@googlegroups.com.

--
You received this message because you are subscribed to the Google Groups "masstransit-discuss" group.
To unsubscribe from this group and stop receiving emails from it, send an email to masstransit-dis...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/masstransit-discuss/2626050b-1bc2-46f3-976c-c1d9d7f4e2e2%40googlegroups.com.

Volodymyr Kostochka

unread,
Nov 6, 2019, 2:15:43 PM11/6/19
to masstransit-discuss
I tried this but without success. I copied MessageConsumeContext into my project, renamed it to 'MessageConsumeContext1' and then used it:

var newContext = (ConsumeContext)Activator.CreateInstance(typeof(MessageConsumeContext1<>).MakeGenericType(payloadType), context, value);
await next.Send(newContext);

I put breakpoint at the beginning of the 'TryGetMessage' method but execution was not get to this method. I put breakpoints on all properties/methods in 'MessageConsumeContext1' to investigate execution flow and got this list of what was executed:

  1. PipeContext.CancellationToken
  2. bool PipeContext.TryGetPayload<T>(out T payload)
  3. bool PipeContext.TryGetPayload<T>(out T payload)
  4. T PipeContext.GetOrAddPayload<T>(PayloadFactory<T> payloadFactory)
  5. PipeContext.CancellationToken
  6. ConsumeContext.ReceiveContext
  7. bool PipeContext.TryGetPayload<T>(out T payload)
  8. bool PipeContext.TryGetPayload<T>(out T payload)
  9. bool PipeContext.TryGetPayload<T>(out T payload)
  10. bool PipeContext.TryGetPayload<T>(out T payload)
  11. bool PipeContext.TryGetPayload<T>(out T payload)
  12. bool PipeContext.TryGetPayload<T>(out T payload)
To unsubscribe from this group and stop receiving emails from it, send an email to masstransit-discuss+unsub...@googlegroups.com.

--
You received this message because you are subscribed to the Google Groups "masstransit-discuss" group.
To unsubscribe from this group and stop receiving emails from it, send an email to masstransit-discuss+unsub...@googlegroups.com.

Chris Patterson

unread,
Nov 6, 2019, 3:56:55 PM11/6/19
to masstrans...@googlegroups.com
This is the method you need to implement, it's what the pipeline uses to dispatch message types to consumers:



To unsubscribe from this group and stop receiving emails from it, send an email to masstransit-dis...@googlegroups.com.

--
You received this message because you are subscribed to the Google Groups "masstransit-discuss" group.
To unsubscribe from this group and stop receiving emails from it, send an email to masstransit-dis...@googlegroups.com.

--
You received this message because you are subscribed to the Google Groups "masstransit-discuss" group.
To unsubscribe from this group and stop receiving emails from it, send an email to masstransit-dis...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/masstransit-discuss/4283b7b5-b371-4e8f-9436-6c276d837732%40googlegroups.com.
Reply all
Reply to author
Forward
0 new messages