--
You received this message because you are subscribed to the Google Groups "masstransit-discuss" group.
To unsubscribe from this group and stop receiving emails from it, send an email to masstransit-discuss+unsub...@googlegroups.com.
To post to this group, send email to masstransit-discuss@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/masstransit-discuss/41b3b66d-5d7d-4f8c-9eda-4f864479088e%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
Hi Chris,
Thanks for the thoughts, they were definitely helpful in my investigations. There isn't a static type in the sending system that I can use; I want to give the message a particular type name dynamically for the consumers. Both the sender and receiver are .NET. Fundamentally, what I want to do is override the default behaviour in the sender and specify a message type / contract name, instead of the one MassTransit derives by default from the .NET type of the message object passed to the MassTransit publish method.
The docs on masstransit interop talk about the masstransit message wrapper field "messageType" which the .net consumers use during consumption / de-serialisation:
http://docs.masstransit-project.com/en/latest/advanced/interop.html
{
"destinationAddress": "rabbitmq://localhost/input_queue",
"headers": {},
"message": {
"value": "Some Value",
"customerId": 27
},
"messageType": [
"urn:message:MassTransit.Tests:ValueMessage"
]
}
Below is some example code sending a message, masstransit automatically includes two values in the message type header (note that RequestContext.Headers is different to the top level headers/wrapper in the raw message pushed to RabbitMQ):
If I modify the code below to explicitly cast messageObject to IMyMessageObjectInterface masstransit omits the concrete type name from the message type header and only includes the interface name.
But there appears to be no way to override this behaviour (to specify a completely different, customised, or additional message type value). The commented-out line is what I want to do, but no access is provided to modify messageType.
// Publish the request. The callback configures the request context (correlation id,
// response/fault handlers, durability and timeouts) before the message is sent.
var request = await this.bus
    .PublishRequest(messageObject, ctx =>
    {
        // Desired but not possible: the context offers no way to set the message type.
        // ctx.MessageType = new[] { "urn:message:Computershare.PetShopOneSvc.Contracts:IPetShopOneRequest" };
        ctx.CorrelationId = Guid.NewGuid();

        // Register a handler for each message type that may come back.
        response = ctx.Handle<IResponseMessage>();
        exception = ctx.Handle<IExceptionMessage>();
        fault = ctx.Handle<Fault>();

        ctx.Durable = false;
        ctx.Timeout = TimeSpan.FromSeconds(10);
        ctx.TimeToLive = TimeSpan.FromSeconds(60);
    })
    .ConfigureAwait(false);

// Wait for the request itself to complete.
await request.Task;
Thanks and Regards,
Mhano
// Publish through DynamicRequestPublisher so the message goes out under a type name
// built at run time from contextSpecificInterfaceName.
var requestTask = await DynamicRequestPublisher.Publish(
    "Name.Space.DynamicInterface." + contextSpecificInterfaceName,
    bus,
    messageObject,
    ctx =>
    {
        ctx.CorrelationId = Guid.NewGuid();

        // Register a handler for each message type that may come back.
        response = ctx.Handle<IResponseInterface>();
        exception = ctx.Handle<IExceptionInterface>();
        fault = ctx.Handle<Fault>();

        ctx.Durable = false;
        ctx.Timeout = TimeSpan.FromSeconds(10);
        ctx.TimeToLive = TimeSpan.FromSeconds(60);
    });

// Publish returns the request's own task; await it to wait for the request to complete.
await requestTask;
/// <summary>
/// Publishes a request under a message type whose name is chosen at run time.
/// A class with the requested name is emitted with Reflection.Emit as a subclass of
/// <see cref="GenericExtensibleMessageBaseObject"/>, the payload is copied onto an
/// instance of it, and <c>RequestExtensions.PublishRequest</c> is invoked for that
/// type through reflection.
/// </summary>
public class DynamicRequestPublisher
{
    // Emitted types, keyed by message type name. Assemblies defined with
    // AssemblyBuilderAccess.Run cannot be unloaded, so emitting a new one on every
    // Publish call would grow the process without bound; each name is emitted once
    // and reused. Under a race the factory may run twice for the same name; each run
    // uses its own assembly, so the losing result is simply discarded.
    private static readonly System.Collections.Concurrent.ConcurrentDictionary<string, Type> _generatedTypes =
        new System.Collections.Concurrent.ConcurrentDictionary<string, Type>(StringComparer.Ordinal);

    private readonly Action<RequestContext> _callback;

    private DynamicRequestPublisher(Action<RequestContext> callback)
    {
        _callback = callback;
    }

    /// <summary>
    /// Copies <paramref name="message"/> onto an instance of a run-time generated type named
    /// <paramref name="messageTypeName"/> and publishes it as a request on <paramref name="bus"/>.
    /// </summary>
    /// <param name="messageTypeName">Full name given to the generated message class.</param>
    /// <param name="bus">Bus passed to <c>PublishRequest</c>.</param>
    /// <param name="message">Source of the <c>prop1</c> and <c>Data</c> values that are copied.</param>
    /// <param name="callback">Invoked with the request context so the caller can configure the request.</param>
    /// <returns>
    /// Once the publish call has completed, the value of the <c>Task</c> property of the
    /// request object that <c>PublishRequest</c> produced.
    /// </returns>
    /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
    public static async Task<Task> Publish(
        string messageTypeName,
        IBus bus,
        GenericExtensibleMessageBaseObject message,
        Action<RequestContext> callback)
    {
        if (messageTypeName == null) throw new ArgumentNullException(nameof(messageTypeName));
        if (bus == null) throw new ArgumentNullException(nameof(bus));
        if (message == null) throw new ArgumentNullException(nameof(message));
        if (callback == null) throw new ArgumentNullException(nameof(callback));

        var generatedType = _generatedTypes.GetOrAdd(messageTypeName, EmitMessageType);

        // Copy data elements to a new object of the dynamically named type.
        var messageObject = Activator.CreateInstance(generatedType);
        var typedMessage = (GenericExtensibleMessageBaseObject)messageObject;
        typedMessage.prop1 = message.prop1;
        typedMessage.Data = message.Data;

        // PublishRequest<T> expects an Action<RequestContext<T>>. Build that delegate over
        // the generic adapter method, which forwards to the caller's non-generic callback.
        var callbackWrapper = new DynamicRequestPublisher(callback);
        var requestContextType = typeof(RequestContext<>).MakeGenericType(generatedType);
        var callbackType = typeof(Action<>).MakeGenericType(requestContextType);
        var adapterMethod = typeof(DynamicRequestPublisher)
            .GetMethod(nameof(DynamicDelegateActionCallBack))
            .MakeGenericMethod(requestContextType);
        var del = Delegate.CreateDelegate(callbackType, callbackWrapper, adapterMethod);

        // Invoke RequestExtensions.PublishRequest<generatedType>(bus, messageObject, del, null).
        var publishRequestMethod = typeof(RequestExtensions)
            .GetMethod("PublishRequest")
            .MakeGenericMethod(generatedType);
        var sendRequestTask = (Task)publishRequestMethod.Invoke(null, new object[] { bus, messageObject, del, null });
        await sendRequestTask.ConfigureAwait(false);

        // The task's result type is only known at run time, so read Result and then its
        // Task property through reflection.
        var sendResult = sendRequestTask.GetType().GetProperty("Result").GetValue(sendRequestTask);
        var requestType = typeof(Request<>).MakeGenericType(generatedType);
        var taskProperty = requestType.GetRuntimeProperty("Task");
        return (Task)taskProperty.GetValue(sendResult);
    }

    /// <summary>
    /// Adapter used as the target of the generic callback delegate: casts the context to
    /// <see cref="RequestContext"/> and passes it to the callback given to <c>Publish</c>.
    /// </summary>
    /// <typeparam name="T">Closed request context type the delegate was created for.</typeparam>
    /// <param name="conext">Context instance supplied when the delegate is invoked.</param>
    public void DynamicDelegateActionCallBack<T>(T conext)
    {
        _callback((RequestContext)conext);
    }

    // Emits a public class named messageTypeName, derived from
    // GenericExtensibleMessageBaseObject, in its own run-only dynamic assembly.
    private static Type EmitMessageType(string messageTypeName)
    {
        var assemblyName = new AssemblyName("TemporayAssembly_" + Guid.NewGuid().ToString("N"));
        var assemblyBuilder = Thread.GetDomain().DefineDynamicAssembly(assemblyName, AssemblyBuilderAccess.Run);
        var moduleBuilder = assemblyBuilder.DefineDynamicModule(assemblyName.Name);

        var typeBuilder = moduleBuilder.DefineType(messageTypeName, TypeAttributes.Public | TypeAttributes.Class);
        typeBuilder.SetParent(typeof(GenericExtensibleMessageBaseObject));
        return typeBuilder.CreateType();
    }
}
--
You received this message because you are subscribed to the Google Groups "masstransit-discuss" group.
To unsubscribe from this group and stop receiving emails from it, send an email to masstransit-dis...@googlegroups.com.
To post to this group, send email to masstrans...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/masstransit-discuss/2d6348e5-ca62-44a2-80e3-5fd75bd96af6%40googlegroups.com.
You don't even need a type: you can publish an interface and pass in an anonymous object whose properties match, and it will initialize them for you. It's one of several overloads to Send and Publish.
Chris Patterson
On Sun, Jan 29, 2017 at 2:49 AM -0800, "Mhano Harkness" <mhano....@gmail.com> wrote:
Thanks Alexey. It's not a limited number of interfaces: the publisher/sender in this case is a plugin to a third-party system, and in that system users name and define custom data structures. These data structures make up a part of the message that our plugin will send. It is the messages from this plugin, and the custom data structures within them, that I would like developers of .NET message consumers to be able to de-serialize simply by implementing a set of interfaces for the specific messages they are interested in.
You received this message because you are subscribed to the Google Groups "masstransit-discuss" group.
To unsubscribe from this group and stop receiving emails from it, send an email to masstransit-discuss+unsub...@googlegroups.com.
To post to this group, send email to masstransit-discuss@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/masstransit-discuss/2d6348e5-ca62-44a2-80e3-5fd75bd96af6%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
--
You received this message because you are subscribed to a topic in the Google Groups "masstransit-discuss" group.
To unsubscribe from this topic, visit https://groups.google.com/d/topic/masstransit-discuss/ruvwXR0AqnA/unsubscribe.
To unsubscribe from this group and all its topics, send an email to masstransit-discuss+unsub...@googlegroups.com.
To post to this group, send email to masstransit-discuss@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/masstransit-discuss/28202EC55A35A71B.D9F6BBD7-7CB3-4F28-85C9-41AEB33A6ADA%40mail.outlook.com.
/// <summary>
/// Message serializer that writes the JSON envelope with a single message type URN built
/// from the exchange name passed to the constructor, instead of URNs derived from the
/// message's .NET type.
/// </summary>
public class DynamicJsonMessageSerializer : IMessageSerializer
{
    // UTF-8 without a byte order mark, throwing on invalid bytes.
    private static readonly Lazy<Encoding> _encoding =
        new Lazy<Encoding>(() => new UTF8Encoding(false, true), LazyThreadSafetyMode.PublicationOnly);

    private readonly string _destinationExchange;

    /// <param name="destinationExchange">Text appended to <c>urn:message:</c> to form the message type.</param>
    public DynamicJsonMessageSerializer(string destinationExchange)
    {
        _destinationExchange = destinationExchange;
    }

    /// <summary>Same content type as <c>JsonMessageSerializer</c>.</summary>
    public ContentType ContentType
    {
        get { return JsonMessageSerializer.JsonContentType; }
    }

    /// <summary>
    /// Wraps the message of <paramref name="context"/> in a JSON envelope and writes it,
    /// indented, to <paramref name="stream"/>. The stream is left open.
    /// </summary>
    /// <exception cref="SerializationException">
    /// Serialization failed; any other exception type is wrapped in one.
    /// </exception>
    public void Serialize<T>(Stream stream, SendContext<T> context)
        where T : class
    {
        try
        {
            context.ContentType = ContentType;

            // this is where the magic happens
            var messageTypes = new[] {"urn:message:" + _destinationExchange};
            var envelope = new JsonMessageEnvelope(context, context.Message, messageTypes);

            using (var textWriter = new StreamWriter(stream, _encoding.Value, 1024, true))
            {
                using (var json = new JsonTextWriter(textWriter))
                {
                    json.Formatting = Formatting.Indented;

                    JsonMessageSerializer.Serializer.Serialize(json, envelope, typeof(MessageEnvelope));

                    json.Flush();
                    textWriter.Flush();
                }
            }
        }
        catch (SerializationException)
        {
            throw;
        }
        catch (Exception ex)
        {
            throw new SerializationException("Failed to serialize message", ex);
        }
    }
}
// Publish a JToken payload under a custom message type / exchange name by swapping the
// serializer and redirecting the destination on the RabbitMQ send context.
string destinationExchange = "My.Namespace:MyMessage";
_bus.Publish(ereignisObj, context =>
{
    var rabbitMqContext = (BasicPublishRabbitMqSendContext<JToken>) context;

    rabbitMqContext.Serializer = new DynamicJsonMessageSerializer(destinationExchange);

    rabbitMqContext.DestinationAddress = new Uri(context.DestinationAddress.ToString().Replace("Newtonsoft.Json.Linq:JToken", destinationExchange));

    // Hack, because rabbitMqContext.Exchange has no Setter.
    // The statements below must sit on their own lines: when they shared a line with the
    // comment above, they were part of the comment and the exchange was never changed.
    var exchangeField = typeof(BasicPublishRabbitMqSendContext<JToken>).GetField("<Exchange>k__BackingField", BindingFlags.Instance | BindingFlags.NonPublic);
    if (exchangeField == null)
    {
        // Fail with a clear message instead of a NullReferenceException if the field is not found.
        throw new InvalidOperationException("Backing field for BasicPublishRabbitMqSendContext<JToken>.Exchange was not found.");
    }

    exchangeField.SetValue(rabbitMqContext, destinationExchange);
});
// Send straight to the exchange named after the custom message type; only the serializer
// has to be replaced on the send context.
string destinationExchange = "My.Namespace:MyMessage";
var exchangeAddress = new Uri("exchange:" + destinationExchange);
var sendEndpoint = await _bus.GetSendEndpoint(exchangeAddress);

await sendEndpoint.Send(ereignisObj, sendContext =>
{
    var rabbitMqContext = (BasicPublishRabbitMqSendContext<JToken>) sendContext;
    rabbitMqContext.Serializer = new DynamicJsonMessageSerializer(destinationExchange);
});