How to use async/await in message handlers?


Manuel Pallier

Aug 25, 2016, 9:52:25 AM
to Particular Software
Helpful information to include
Product name: NServiceBus
Version: 5.2.15
Stacktrace: -
Description:

How can I use async/await in IHandleMessages.Handle methods without losing context information like IBus.CurrentMessageContext (to use IBus.Reply) or Transaction.Current (the current TransactionScope)?

I already figured out that async void methods won't work because NServiceBus can't know that the message handler hasn't finished yet. Therefore I tried putting the async code in a separate method and calling Task.Wait() in the message handler. This approach was supported by the following post: https://github.com/Particular/NServiceBus/issues/2220#issuecomment-49273832

But even with this method the context is lost after the first await call in the inner method. Sample:

    public void Handle(ObjectMessage message)
    {
        HandleCore(message).Wait();
    }

    private async Task HandleCore(ObjectMessage message)
    {
        log.Info("Message received, Returning");

        await Task.Delay(100);

        bus.Reply(new ObjectResponseMessage());
    }

Here both IBus.CurrentMessageContext and Transaction.Current are null after the await call, so the call to IBus.Reply throws an exception with the message "There is no current message being processed".


The only way I found to call async methods in message handlers is to always use Task.Wait or Task.Result instead of await. Is there really no better way? I know that NServiceBus 6 will solve this issue, but I can't upgrade before it is released.

Manuel Pallier

Aug 25, 2016, 11:11:07 AM
to Particular Software
I just realized that Task.Wait or Task.Result is no solution either. If it doesn't work at the first level (directly in the Handle method), it won't work deeper down either. But that would mean that I can't have an await call anywhere in a message handler, even if it's in some other library's code.

That can't be true, right? Because that would mean it is impossible to call async-only APIs from message handlers. I really hope that I'm missing something fundamental here.

Manuel Pallier

Aug 26, 2016, 6:46:12 AM
to Particular Software
Seems like I found a solution. I need to handle both problems separately:

1. IBus.Reply has to be called in the non-async Handle method because that is the only way to call it on the same thread that called the Handle method.
2. The TransactionScope can be made async-aware by creating a nested TransactionScope.

Example for both:

    public void Handle(ObjectMessage message)
    {
        using (var scope = new TransactionScope(TransactionScopeOption.Required, TransactionScopeAsyncFlowOption.Enabled))
        {
            HandleCore().Wait();

            bus.Reply(new ObjectResponseMessage());

            scope.Complete();
        }
    }

    private async Task HandleCore()
    {
        log.Info("Message received, Returning");

        await Task.Delay(100);
    }


Is this solution OK? Or does anyone know of any problems that could occur when handling messages this way?

Daniel Marbach

Aug 30, 2016, 3:51:22 AM
to particula...@googlegroups.com
Hi Manuel

Async and await in v5 can be complex, as you found out. That's why we are redesigning NServiceBus for v6 to be fully async. Let me give you a bit of background on why it doesn't work out of the box in v5.

The bus implementation uses a ThreadLocal to stack all currently executing contexts (one of these contexts will end up containing the information of the incoming message):

    ThreadLocal<Stack<BehaviorContext>> behaviorContextStack = new ThreadLocal<Stack<BehaviorContext>>(() => new Stack<BehaviorContext>());

In your first code example you wrote

        await Task.Delay(100);
        bus.Reply(new ObjectResponseMessage());

The bus.Reply part is a continuation of the preceding await statement. That continuation might be scheduled on any worker thread in the thread pool.

    public void Handle(ObjectMessage message)
    {
        // Thread 42
        HandleCore(message).Wait();
    }

    private async Task HandleCore(ObjectMessage message)
    {
        // Thread 42
        log.Info("Message received, Returning");
        // Thread 42 and released
        await Task.Delay(100);
        // Maybe Thread 42 or any other thread
        bus.Reply(new ObjectResponseMessage());
    }

So, depending on the actual scheduling, the code above might work if the continuation happens to be executed on Thread 42 (42 is just a silly number I came up with ;) ), or it might not work if another thread takes over the continuation. In that case the ThreadLocal I showed above gives that thread its own, empty stack, so for NServiceBus it looks like you are calling the Reply method outside the context of a handler. This produces the behavior you saw.
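
To see the thread-affinity effect in isolation, here is a minimal sketch that does not use NServiceBus at all. It assumes a stand-in stack of strings instead of BehaviorContext, and the class and method names are made up for illustration. A value pushed on one thread is simply not visible from another thread:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public static class ThreadLocalDemo
{
    // Stand-in for the bus's per-thread context stack (strings instead of BehaviorContext).
    static readonly ThreadLocal<Stack<string>> contextStack =
        new ThreadLocal<Stack<string>>(() => new Stack<string>());

    // Pushes a context onto the calling thread's stack.
    public static void Push(string context)
    {
        contextStack.Value.Push(context);
    }

    // Number of contexts visible to the calling thread.
    public static int CountSeenByThisThread()
    {
        return contextStack.Value.Count;
    }

    // Number of contexts visible to a freshly started thread.
    public static int CountSeenByOtherThread()
    {
        int seen = -1;
        var other = new Thread(() => seen = contextStack.Value.Count);
        other.Start();
        other.Join();
        return seen;
    }

    public static void Main()
    {
        Push("incoming message");
        Console.WriteLine("Handler thread sees: " + CountSeenByThisThread());
        Console.WriteLine("Other thread sees:   " + CountSeenByOtherThread());
    }
}
```

The handler thread reports one context and the other thread reports zero, which is the situation a continuation finds itself in when it resumes on a different thread.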

As you rightly pointed out, you have to restructure your code to create the transaction scope at the top level of the Handle method and block on the async methods with .Wait() or, better, .GetAwaiter().GetResult(), since the latter rethrows the original exception instead of wrapping it in an AggregateException.
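
The difference between the two blocking styles can be checked with a small stand-alone sketch. FailAsync and the other names here are hypothetical helpers, and the code assumes it runs without a SynchronizationContext (for example in a console application):

```csharp
using System;
using System.Threading.Tasks;

public static class UnwrapDemo
{
    // An async method that fails after its first await.
    static async Task FailAsync()
    {
        await Task.Yield();
        throw new InvalidOperationException("boom");
    }

    // Type name of the exception surfaced by Task.Wait().
    public static string CaughtByWait()
    {
        try { FailAsync().Wait(); return "none"; }
        catch (Exception ex) { return ex.GetType().Name; }
    }

    // Type name of the exception surfaced by GetAwaiter().GetResult().
    public static string CaughtByGetResult()
    {
        try { FailAsync().GetAwaiter().GetResult(); return "none"; }
        catch (Exception ex) { return ex.GetType().Name; }
    }

    public static void Main()
    {
        Console.WriteLine(CaughtByWait());      // AggregateException
        Console.WriteLine(CaughtByGetResult()); // InvalidOperationException
    }
}
```

With .Wait() a catch block for InvalidOperationException in the handler would never be hit, because the exception arrives wrapped; with .GetAwaiter().GetResult() it arrives as thrown.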

With that structure the transaction scope flows into the async continuations, and bus.Reply happens on the same thread that kicked off your handler.

    public void Handle(ObjectMessage message)
    {
        // Thread 42
        // TransactionScopeAsyncFlowOption only needed if HandleCore needs to see Transaction.Current
        using (var scope = new TransactionScope(TransactionScopeOption.Required, TransactionScopeAsyncFlowOption.Enabled))
        {
            // Thread 42
            HandleCore().GetAwaiter().GetResult();
            // Thread 42
            bus.Reply(new ObjectResponseMessage());
            // Thread 42
            scope.Complete();
        }
    }

    private async Task HandleCore()
    {
        // Thread 42
        log.Info("Message received, Returning");

        await Task.Delay(100);
        // Thread 42 or potentially any other thread
    }
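
As a rough check of what TransactionScopeAsyncFlowOption changes, the following sketch (stand-alone, no NServiceBus; the class and method names are invented for illustration) asks whether Transaction.Current is still visible after an await whose continuation runs on a pool thread:

```csharp
using System;
using System.Threading.Tasks;
using System.Transactions;

public static class TransactionFlowDemo
{
    // Reports whether an ambient transaction is visible after an await.
    static async Task<bool> SeesTransactionAfterAwaitAsync()
    {
        // The caller is blocked, so this continuation resumes on another thread.
        await Task.Delay(50).ConfigureAwait(false);
        return Transaction.Current != null;
    }

    // Opens a scope with the given flow option and blocks on the async check.
    public static bool Run(TransactionScopeAsyncFlowOption flow)
    {
        using (var scope = new TransactionScope(TransactionScopeOption.Required, flow))
        {
            bool seen = SeesTransactionAfterAwaitAsync().GetAwaiter().GetResult();
            scope.Complete();
            return seen;
        }
    }

    public static void Main()
    {
        Console.WriteLine("Enabled:  " + Run(TransactionScopeAsyncFlowOption.Enabled));
        Console.WriteLine("Suppress: " + Run(TransactionScopeAsyncFlowOption.Suppress));
    }
}
```

With Enabled the check should report True. With Suppress the ambient transaction is tied to the creating thread, so the continuation would not normally see it.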

Someone might argue that this is a drawback of async/await. I would argue it is actually a good thing. In your case it forces you to separate the "core" logic, which is async, from the actual message interaction, and it moves the messaging to the top level of the handler where it belongs. The code is simpler to reason about because it follows this structure:

    void Handle(message) {
        // messaging side effects
        // core logic / side effects
        // messaging side effects
    }

Even with v6, I would structure the code that way.

Hope that helps
Daniel

Daniel Marbach

Aug 30, 2016, 4:19:17 AM
to particula...@googlegroups.com
Hi Manuel

If you, despite my arguments ;), would like to mix core logic with bus operations you can achieve that by using the AsyncPump described here

http://blogs.msdn.com/b/pfxteam/archive/2012/01/20/10259049.aspx

The async pump forces the continuations onto a single-threaded synchronization context. You could use the code from your first example like this:

    public void Handle(ObjectMessage message)
    {
        AsyncPump.Run(() => HandleCore(message));
    }

    private async Task HandleCore(ObjectMessage message)
    {
        log.Info("Message received, Returning");

        await Task.Delay(100);

        bus.Reply(new ObjectResponseMessage());
    }

BUT there is a huge caveat. As soon as you throw in a ConfigureAwait(false), it will no longer work. So if you changed the above code to

    private async Task HandleCore(ObjectMessage message)
    {
        log.Info("Message received, Returning");

        await Task.Delay(100).ConfigureAwait(false);

        bus.Reply(new ObjectResponseMessage());
    }

it will start throwing the same exception you saw. Not even the async pump can solve this.
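
For readers who cannot reach the linked post, here is a minimal sketch of such a pump, modeled on the approach described there. Only the AsyncPump.Run name is taken from the example above; the other names and details are assumptions of this sketch, not the post's exact code:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncPump
{
    // Runs func and pumps its continuations on the calling thread until it completes.
    public static void Run(Func<Task> func)
    {
        var previous = SynchronizationContext.Current;
        var context = new SingleThreadSynchronizationContext();
        try
        {
            SynchronizationContext.SetSynchronizationContext(context);
            Task task = func();
            // Stop pumping once the task has finished, whatever its outcome.
            task.ContinueWith(_ => context.Complete(), TaskScheduler.Default);
            context.RunOnCurrentThread();
            task.GetAwaiter().GetResult(); // rethrow any failure unwrapped
        }
        finally
        {
            SynchronizationContext.SetSynchronizationContext(previous);
        }
    }

    sealed class SingleThreadSynchronizationContext : SynchronizationContext
    {
        readonly BlockingCollection<KeyValuePair<SendOrPostCallback, object>> queue =
            new BlockingCollection<KeyValuePair<SendOrPostCallback, object>>();

        // Continuations that captured this context are queued here.
        public override void Post(SendOrPostCallback d, object state)
        {
            queue.Add(new KeyValuePair<SendOrPostCallback, object>(d, state));
        }

        public override void Send(SendOrPostCallback d, object state)
        {
            throw new NotSupportedException("Synchronous sends are not supported.");
        }

        // Executes queued continuations on the calling thread until Complete is called.
        public void RunOnCurrentThread()
        {
            foreach (var item in queue.GetConsumingEnumerable())
                item.Key(item.Value);
        }

        public void Complete()
        {
            queue.CompleteAdding();
        }
    }
}

public static class AsyncPumpDemo
{
    // True when the code after the await ran on the thread that called AsyncPump.Run.
    public static bool ContinuationStaysOnCallingThread()
    {
        int caller = Thread.CurrentThread.ManagedThreadId;
        int after = -1;
        AsyncPump.Run(async () =>
        {
            await Task.Delay(100); // captures the pump's synchronization context
            after = Thread.CurrentThread.ManagedThreadId;
        });
        return caller == after;
    }

    public static void Main()
    {
        Console.WriteLine(ContinuationStaysOnCallingThread());
    }
}
```

The continuation comes back to the calling thread only because the await captured the pump's synchronization context. ConfigureAwait(false) opts out of that capture, which is why the caveat above applies.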

Daniel

Manuel Pallier

Aug 30, 2016, 5:21:58 AM
to Particular Software
Hi Daniel,

Thanks for your detailed answer! I'll definitely stay with the first approach.
Thanks also for the tip about GetAwaiter().GetResult(). Very nice to have the AggregateException unwrapped automatically.