Override messageType - Publish messages from a dynamic typed system to statically typed consumers

3,564 views
Skip to first unread message

Mhano Harkness

unread,
Jan 27, 2017, 7:51:50 AM1/27/17
to masstransit-discuss
Looking for a way to specify messageType / interface name on a message published from a dynamic system (.net / C#).

The idea is to publish messages with dynamic aspects coming from a dynamically typed source (where the data structure is defined), but allow the consuming clients/services to simply implement a more strongly defined data structure / consumer for specific message types.

If we could override or add to the messageType header, we could specify a class/interface name specific to the type of message coming from the dynamic source system (where the data structures are defined and named externally to the code), consuming services could simply implement an interface to consume the messages/aspects of the messages they are interested in.

Any thoughts welcome.

Cheers,
Mhano

Chris Patterson

unread,
Jan 27, 2017, 1:17:18 PM1/27/17
to masstrans...@googlegroups.com
If I'm reading this correctly, it's exactly how MassTransit works. At least with transports that support it nicely (RabbitMQ).

If you publish a type (class) that implements several interfaces, all of those interfaces are included in the message body. Any subscribed consumers who are consuming any of those interfaces will get a copy of the message, and process it accordingly. It's polymorphic messaging, and it's very flexible.


This explains it in terms of versioning interface contracts:




--
You received this message because you are subscribed to the Google Groups "masstransit-discuss" group.
To unsubscribe from this group and stop receiving emails from it, send an email to masstransit-discuss+unsub...@googlegroups.com.
To post to this group, send email to masstransit-discuss@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/masstransit-discuss/41b3b66d-5d7d-4f8c-9eda-4f864479088e%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Message has been deleted

Mhano Harkness

unread,
Jan 28, 2017, 8:18:27 AM1/28/17
to masstransit-discuss

Hi Chris,

 

Thanks for the thoughts, definitely helpful in my investigations. There isn't a static type in the sending system that I can use, I wished to give the message a particular type name dynamically for the consumers. Both the sender and receiver are .net, fundamentally what I want to do is override the default behaviour in the sender to specify a message type / contract name instead of the one masstransit defaults / derives from the .net type of the message object provided to the mass transit publish method.

 

The docs on masstransit interop talk about the masstransit message wrapper field "messageType" which the .net consumers use during consumption / de-serialisation:

 

http://docs.masstransit-project.com/en/latest/advanced/interop.html

{

    "destinationAddress": "rabbitmq://localhost/input_queue",

    "headers": {},

    "message": {

        "value": "Some Value",

        "customerId": 27

    },

    "messageType": [

        "urn:message:MassTransit.Tests:ValueMessage"

    ]

}

 

Below is some example code sending a message, masstransit automatically includes two values in the message type header (note that RequestContext.Headers is different to the top level headers/wrapper in the raw message pushed to RabbitMQ):

  • urn:message:name.space:MyMessageObject
  • urn:message:name.space:IMyMessageObjectInterface

If I modify the code below to explicitly cast messageObject to IMyMessageObjectInterface masstransit omits the concrete type name from the message type header and only includes the interface name.

 

But there appears no way to override this behaviour (to specify a completely different / customised or additional message type value), the commented out line is what I want to do, but there is no access provided to modify messageType.

 

var request = await this.bus.PublishRequest(

    messageObject,

    requestContext =>

    {

        // requestContext.MessageType = new[] { "urn:message:Computershare.PetShopOneSvc.Contracts:IPetShopOneRequest" };

        requestContext.CorrelationId = Guid.NewGuid();

        response = requestContext.Handle<IResponseMessage>();

        exception = requestContext.Handle<IExceptionMessage>();

        fault = requestContext.Handle<Fault>();

        requestContext.Durable = false;

        requestContext.Timeout = TimeSpan.FromSeconds(10);

        requestContext.TimeToLive = TimeSpan.FromSeconds(60);

    }).ConfigureAwait(false);

await request.Task;

 

Thanks and Regards,

Mhano

 

Alexey Zimarev

unread,
Jan 28, 2017, 1:44:12 PM1/28/17
to masstransit-discuss
There are Publish<T>(object message) and Send<T>(object message) overloads that allow you to specify any time you want and use anonymous object as parameter.

Mhano Harkness

unread,
Jan 29, 2017, 1:32:23 AM1/29/17
to masstransit-discuss
Thanks Alexey,

Publish without a compiled type/interface throws an error (either from the passed object or specifically added in the call of the method).

On the below examples assume messageObject is something like: var messageObject = new { anonTypeParams.... } or a concrete class with base properties plus some dynamic / dictionary properties.

Publish(messageObject):

{"Message":"An error has occurred.","ExceptionMessage":"Anonymous types are not valid message types","ExceptionType":"MassTransit.MessageException","StackTrace":" at MassTransit.RabbitMqTransport.RabbitMqPublishEndpointProvider.CreateSendEndpoint(Type messageType)\r\n at System.Lazy`1.CreateValue()\r\n at System.Lazy`1.LazyInitValue()\r\n at MassTransit.RabbitMqTransport.RabbitMqPublishEndpointProvider.d__10.MoveNext()\r\n--- End of stack trace from previous location where ...

Explicitly calling Publish with an interface or type and an anon type/dynamic object works:

Publish<InterfaceName>(messageObject)

Using this doesn't solve the problem, as masstransit still needs a compiled type/interface with a specific name.

I did build a work-around with reflection.emit (dynamically create the type) and a lot of reflection to call masstransit publish with that dynamically created type argument (about 100 lines of horrible reflection code). Still looking for a cleaner solution :-)

Cheers,
Mhano

Usage:

var requestTask = await DynamicRequestPublisher.Publish(

    "Name.Space.DynamicInterface." + contextSpecificInterfaceName,

    bus, messageObject, requestContext =>

    {

        requestContext.CorrelationId = Guid.NewGuid();

        response = requestContext.Handle<IResponseInterface>();

        exception = requestContext.Handle<IExceptionInterface>();

        fault = requestContext.Handle<Fault>();

        requestContext.Durable = false;

        requestContext.Timeout = TimeSpan.FromSeconds(10);

        requestContext.TimeToLive = TimeSpan.FromSeconds(60);

    });

await requestTask;


Helper Class:

public class DynamicRequestPublisher

{

    private readonly Action<RequestContext> _callback;

 

    private DynamicRequestPublisher(Action<RequestContext> callback)

    {

        _callback = callback;

    }

 

    public static async Task<Task> Publish(

            string messageTypeName,

            IBus bus,

            GenericExtensibleMessageBaseObject message,

            Action<RequestContext> callback)

    {

        // TODO: implement caching of all this reflection/type emission for performance

 

        var aName = new AssemblyName("TemporayAssembly_" + Guid.NewGuid().ToString("N"));

        var appDomain = Thread.GetDomain();

        var aBuilder = appDomain.DefineDynamicAssembly(aName, AssemblyBuilderAccess.Run);

        var mBuilder = aBuilder.DefineDynamicModule(aName.Name);

 

        // Create a type with a specific name derrived from GenericExtensibleMessageBaseObject

        var tBuilder = mBuilder.DefineType(messageTypeName, TypeAttributes.Public | TypeAttributes.Class);

        tBuilder.SetParent(typeof(GenericExtensibleMessageBaseObject));

           

        var generatedType = tBuilder.CreateType();

 

        var messageObject = Activator.CreateInstance(generatedType);

        var nt = (GenericExtensibleMessageBaseObject)messageObject;

 

        // copy data elements to new object with specific dynamically named type

        nt.prop1 = message.prop1;

        nt.Data = message.Data;

 

        var callBackWrapper = new DynamicRequestPublisher(callback);

 

        var callbackTypeInner = typeof(RequestContext<>).MakeGenericType(messageObject.GetType());

        var callbackType = typeof(Action<>).MakeGenericType(typeof(RequestContext<>).MakeGenericType(messageObject.GetType()));

 

        var mtype = typeof(DynamicRequestPublisher).GetMethod("DynamicDelegateActionCallBack").MakeGenericMethod(callbackTypeInner);

        // target is the instance of the object for which methodName will be invoked

        var del = Delegate.CreateDelegate(callbackType, callBackWrapper, mtype);

 

        var mtRequestExtensionMethodType = typeof(RequestExtensions);

        var mtPublishRequestMethod = mtRequestExtensionMethodType.GetMethod("PublishRequest");

        var genericMtPublishRequestMethod = mtPublishRequestMethod.MakeGenericMethod(messageObject.GetType());

        var sendRequestTask = ((Task)genericMtPublishRequestMethod.Invoke(null, new[] { bus, messageObject, del, null }));

        var awaitConfiguredTask = sendRequestTask.ConfigureAwait(false);

 

        await awaitConfiguredTask;

 

        var sendResultTask = sendRequestTask.GetType().GetProperty("Result").GetValue(sendRequestTask);

        var resultType = typeof(Request<>);

        var genTypeDef = resultType.GetGenericTypeDefinition();

        var genType = genTypeDef.MakeGenericType(messageObject.GetType());

        var property = genType.GetRuntimeProperty("Task");//.GetValue(task3, null);

        var requestTask = (Task)property.GetValue(sendResultTask);

           

        return requestTask;

    }

 

    public void DynamicDelegateActionCallBack<T>(T conext)

    {

        _callback((RequestContext)conext);

    }

}

 


Alexey Zimarev

unread,
Jan 29, 2017, 4:12:15 AM1/29/17
to masstransit-discuss
I would like to get back to the nature of the problem. From what I remember you want to do, I understand that you have specific interfaces that consumers use. This means that you have a limited number of interfaces that are available or can be made available for the publisher only.

If this is the case, it reminds me on something that I recently did. Instead of diving into reflections and dynamic stuff, I have made a message factory that builds messages of a specific type. The factory itself uses a dictionary/map with some discriminator value as the key and either message interface type (so reflection can be used to create an instance) or simple Func<IBaseMessageType> or Func<object> factory function.

If you have a limited number of such types, like, I don't know, 20, it works fine, the only effect in the code is this map initialisation.

Mhano Harkness

unread,
Jan 29, 2017, 5:49:15 AM1/29/17
to masstransit-discuss
Thanks Alexey.

It's not a limited number of interfaces, the publisher/sender in this case is a plugin to a third party system, in that system users name and define custom data structures. These data structures make up a part of the message the plugin we will send.

It is the messages from this plugin and the custom data structures within that I would like developers of .net message consumers to be able to simply implement a set of interfaces to de-serialize the specific messages they are interested in.

Chris Patterson

unread,
Jan 29, 2017, 6:17:31 PM1/29/17
to masstrans...@googlegroups.com
You don't even need a type you can publish an interface and pass in an anonymous object whose properties match and it will initialize them for you.

It's one of several overloads to Send and Publish. 

__
Chris Patterson




--
You received this message because you are subscribed to the Google Groups "masstransit-discuss" group.
To unsubscribe from this group and stop receiving emails from it, send an email to masstransit-dis...@googlegroups.com.
To post to this group, send email to masstrans...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/masstransit-discuss/2d6348e5-ca62-44a2-80e3-5fd75bd96af6%40googlegroups.com.

Mhano Harkness

unread,
Feb 5, 2017, 8:22:23 PM2/5/17
to masstrans...@googlegroups.com
Thanks Chris,

The closest I could find to what you describe is below, it still requires a concrete type or a compile time interface.

Moving away from request/reply and using publish did simplify some things, but still ultimately needed to use Reflection.Emit to create a type/interface with a name based on the user supplied input (from system that this plugin will sit in).

Cheers,
Mhano


public static System.Threading.Tasks.Task Publish(this MassTransit.IPublishEndpoint endpoint, object message, System.Type messageType, System.Action<MassTransit.PublishContext> callback, [System.Threading.CancellationToken cancellationToken = null])
    Member of MassTransit.PublishContextExecuteExtensions

Summary:
Publish a message, using a callback to modify the publish context instead of building a pipe from scratch

Parameters:
endpoint: The endpoint to send the message
message: The message
messageType: The message type to send the object as
callback: The callback for the send context
cancellationToken: To cancel the send from happening

On 30 January 2017 at 10:17, Chris Patterson <ch...@phatboyg.com> wrote:
You don't even need a type you can publish an interface and pass in an anonymous object whose properties match and it will initialize them for you.

It's one of several overloads to Send and Publish. 

__
Chris Patterson




On Sun, Jan 29, 2017 at 2:49 AM -0800, "Mhano Harkness" <mhano....@gmail.com> wrote:

Thanks Alexey.

It's not a limited number of interfaces, the publisher/sender in this case is a plugin to a third party system, in that system users name and define custom data structures. These data structures make up a part of the message the plugin we will send.

It is the messages from this plugin and the custom data structures within that I would like developers of .net message consumers to be able to simply implement a set of interfaces to de-serialize the specific messages they are interested in.

--
You received this message because you are subscribed to the Google Groups "masstransit-discuss" group.
To unsubscribe from this group and stop receiving emails from it, send an email to masstransit-discuss+unsub...@googlegroups.com.
To post to this group, send email to masstransit-discuss@googlegroups.com.

--
You received this message because you are subscribed to a topic in the Google Groups "masstransit-discuss" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/masstransit-discuss/ruvwXR0AqnA/unsubscribe.
To unsubscribe from this group and all its topics, send an email to masstransit-discuss+unsub...@googlegroups.com.
To post to this group, send email to masstransit-discuss@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/masstransit-discuss/28202EC55A35A71B.D9F6BBD7-7CB3-4F28-85C9-41AEB33A6ADA%40mail.outlook.com.

Phillip

unread,
May 15, 2020, 4:18:56 AM5/15/20
to masstransit-discuss
I ran into the same issue the other day and came up with a solution that does not require dynamically created types.
Basically I'm overriding the Serializer to pass in the actual messageType:

public class DynamicJsonMessageSerializer : IMessageSerializer
    {
        private static readonly Lazy<Encoding> _encoding = new Lazy<Encoding>(() => new UTF8Encoding(false, true), LazyThreadSafetyMode.PublicationOnly);

        private readonly string _destinationExchange;

        public ContentType ContentType => JsonMessageSerializer.JsonContentType;

        public DynamicJsonMessageSerializer(string destinationExchange)
        {
            _destinationExchange = destinationExchange;
        }

        public void Serialize<T>(Stream stream, SendContext<T> context)
            where T : class
        {
            try
            {
                context.ContentType = ContentType;

                var envelope = new JsonMessageEnvelope(context, context.Message, new[] {"urn:message:" + _destinationExchange}); // this is where the magic happens

                using (var writer = new StreamWriter(stream, _encoding.Value, 1024, true))
                using (var jsonWriter = new JsonTextWriter(writer))
                {
                    jsonWriter.Formatting = Formatting.Indented;

                    JsonMessageSerializer.Serializer.Serialize(jsonWriter, envelope, typeof(MessageEnvelope));

                    jsonWriter.Flush();
                    writer.Flush();
                }
            }
            catch (SerializationException)
            {
                throw;
            }
            catch (Exception ex)
            {
                throw new SerializationException("Failed to serialize message", ex);
            }
        }
    }

Then, I tell MassTransit to use this Serializer in the Publish call:

            string destinationExchange = "My.Namespace:MyMessage";
            
            _bus.Publish(ereignisObj, context =>
            {
                var rabbitMqContext = (BasicPublishRabbitMqSendContext<JToken>) context;

                rabbitMqContext.Serializer = new DynamicJsonMessageSerializer(destinationExchange);

                rabbitMqContext.DestinationAddress = new Uri(context.DestinationAddress.ToString().Replace("Newtonsoft.Json.Linq:JToken", destinationExchange));

                // Hack, because rabbitMqContext.Exchange has no Setter
                var exchangeField = typeof(BasicPublishRabbitMqSendContext<JToken>).GetField("<Exchange>k__BackingField", BindingFlags.Instance | BindingFlags.NonPublic);
                exchangeField.SetValue(rabbitMqContext, destinationExchange);

            });

The only thing I'd consider a bit hacky is that I need to use reflection to set the read-only Property Exchange. In my opinion this property should be made writable, because DestinationAddress is also writeable and setting one without the other does not make sense.

Best

Phillip

unread,
Jun 6, 2020, 7:01:50 AM6/6/20
to masstransit-discuss
Meanwhile I've realized there's no need to use Publish. It can be achieved with Send much easier, eliminating all the hacky-ness.

            string destinationExchange = "My.Namespace:MyMessage";

            var se = await _bus.GetSendEndpoint(new Uri("exchange:" + destinationExchange));

            await se.Send(ereignisObj, context =>
            {
                var rabbitMqContext = (BasicPublishRabbitMqSendContext<JToken>) context;
                rabbitMqContext.Serializer = new DynamicJsonMessageSerializer(destinationExchange);
            });


Reply all
Reply to author
Forward
0 new messages