Excel RTD issue when uses ExcelAsyncUtil.Run

Elvis Hsu

Jun 9, 2016, 1:06:39 PM
to Excel-DNA
Hi, Govert,

First of all, thank you so much for taking the time to help me out.

I have written a UDF that subscribes to an IExcelObservable to listen for stock ticks. It works perfectly, without any Excel RTD memory issue, because it keeps using the same topic keys.
For example, =MarketDataSubscribe(Ticker, Field, SourceFeed). This lets a set of ranges update with the stock values, so that users can submit their own calculations to our server.
The submission is done by another UDF, =MarketDataContribute(NewTicker, NewField, CalculatedValue, DestinationServer), which uses the following approach:

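private static HttpClient client = new HttpClient(new HttpClientHandler());

// Example function (queue, url and httpContentWithValue are defined elsewhere)
public static object MarketDataContribute(string ticker, string field, double calculatedValue, string server)
{
    // The caller parameters identify the async call to Excel-DNA
    var asyncResult = ExcelAsyncUtil.Run("MarketData.Contribute", $"{ticker}|{field}|{server}", handle =>
    {
        // The work is queued so that the UDF itself returns immediately
        queue.Enqueue(() =>
        {
            var result = client.PostAsJsonAsync(url, httpContentWithValue);
            handle.SetResult(result);
        });
    });

    // Show a placeholder while the async call is still pending
    return asyncResult.Equals(ExcelError.ExcelErrorNA)
        ? "#Processing..."
        : asyncResult;
}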


As you can see, the difference between MarketDataSubscribe and MarketDataContribute is that the parameters of MarketDataSubscribe never change, whereas those of MarketDataContribute do, because its value changes whenever the result of MarketDataSubscribe changes.
Due to the nature of the MarketDataContribute UDF, it is called whenever a MarketDataSubscribe result changes (e.g. through a formula such as =(A1+B1)/2). RTD then calls IRtdServer.DisconnectData(int topicId), so a new topic is created for MarketDataContribute with a new GUID as its key.

The reason I want to use ExcelAsyncUtil.Run is that it does not freeze the Excel GUI: the workbook may have hundreds of ranges calling MarketDataContribute, and each call may take 50-100 ms.

If a workbook executes 150-200 MarketDataContribute calls per second, it runs for about 5 to 6 hours before Excel complains that it is out of memory. In some cases, however, there may be 500-700 MarketDataContribute calls per second during peak hours (e.g. just after the market opens).

In short, what do you think is the best way for me to keep using the existing features of Excel-DNA without implementing my own RTD server? Or should I just extend your ExcelRtdServer and implement a fixed set of topicInfo?

Once again, thank you so much for your help.

Cheers,
Elvis

Elvis Hsu

Jun 10, 2016, 3:11:03 AM
to Excel-DNA

Govert van Drimmelen

Jun 10, 2016, 6:13:25 AM
to exce...@googlegroups.com
Hi Elvis,

With the RTD topic caching you've certainly hit on an interesting Excel quirk that I did not know about.

Thank you for the Microsoft links, particularly:

----

Regarding memory usage, there's a coming update to Excel that will make a huge difference to the amount of memory available to 32-bit instances: https://blogs.technet.microsoft.com/the_microsoft_excel_support_team_blog/2016/06/07/excel-2013-and-excel-2016-large-address-aware-updates/ 
Microsoft is officially enabling Excel 2013 and 2016 for Large Address Aware (LAA) mode (in the past it was possible by editing the Excel binary).

About our issue:
Could you show what your MarketDataSubscribe function looks like?
I only see MarketDataContribute, so it's tricky to follow your comments.

You must be careful in your assumptions about how Excel's RTD feature works. There is no guarantee that every update gets processed. 500 updates per second might well not all be processed by Excel, even if you set the RTD update interval to a low value or 0. 
A more reliable architecture would be to build the update pipeline using Reactive Extensions or TPL DataFlow, and have Excel only 'monitor' (every second or so) the latest values.
If you do need to use Excel as the calculation engine, you should probably control the update, calculate, read cycle yourself.
But that's quite separate to the issue here.
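
The "have Excel only monitor the latest values" idea can be sketched roughly as follows. This is only an illustration, not part of Excel-DNA: LatestValueCache, Publish and Snapshot are hypothetical names, and a real pipeline built on Reactive Extensions or TPL DataFlow would look different. The point is that a fast producer overwrites values per key, so a slow reader sees only the newest value and never a backlog.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical helper (not an Excel-DNA type): keeps only the newest value per key.
public sealed class LatestValueCache
{
    private readonly ConcurrentDictionary<string, double> _latest =
        new ConcurrentDictionary<string, double>();

    // Called by the fast producer (e.g. the tick feed); older values are overwritten.
    public void Publish(string key, double value)
    {
        _latest[key] = value;
    }

    // Called by the slow consumer (e.g. once a second); returns a copy of the latest values.
    public IReadOnlyDictionary<string, double> Snapshot()
    {
        var copy = new Dictionary<string, double>();
        foreach (var pair in _latest)
        {
            copy[pair.Key] = pair.Value;
        }
        return copy;
    }
}
```

With something like this in place, 500 updates per second for one ticker/field pair collapse into a single value per refresh, so the number of updates Excel has to process depends on the refresh interval rather than on the tick rate.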

If you want to make your own RTD server, I would still suggest that you base it on the ExcelRtdServer base class. This base class makes for a simpler interface, handles all the tricky threading issues, and does not interfere with the Excel topic caching that is the issue we're concerned with here.

On top of this, or on top of the existing IObservable support, one might implement something like the Excel-DNA async, but reusing the topic ids.

I'd even be happy to improve the Excel-DNA async to re-use async topic Guids. We'd keep track of TopicIds in use, and when they are disconnected we put them in a pool for reuse later?
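
A minimal sketch of the pooling idea described above, under stated assumptions: TopicKeyPool, Rent and Return are hypothetical names and not part of Excel-DNA, and the sketch assumes that Excel's topic cache growth is driven by the number of distinct topic keys it has seen, which is what the observations in this thread suggest but does not prove.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical pool of async topic keys (not an Excel-DNA type).
// A key that has been returned is handed out again before a new Guid is generated.
public sealed class TopicKeyPool
{
    private readonly ConcurrentQueue<Guid> _free = new ConcurrentQueue<Guid>();

    // Called when a new async topic is needed.
    public Guid Rent()
    {
        Guid key;
        return _free.TryDequeue(out key) ? key : Guid.NewGuid();
    }

    // Called when the topic using this key has been disconnected.
    public void Return(Guid key)
    {
        _free.Enqueue(key);
    }
}
```

In this sketch the number of distinct keys is bounded by the peak number of topics alive at the same time, rather than by the total number of calls made over the life of the workbook.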

-Govert






Govert van Drimmelen

Jun 10, 2016, 6:19:17 AM
to exce...@googlegroups.com
Yes - that does look like the same issue!

-Govert



Elvis Hsu

Jun 10, 2016, 8:16:49 AM
to Excel-DNA
Hi, Govert,

I have attached an updated version of the VS solution. The attachment also contains two test workbooks, which use two different implementations for contributions.

MarketDataContribution1 uses the "ExcelAsyncUtil.Run" overload.
MarketDataContribution2 uses my custom RTD server to update the range values.

The interesting part is that even when I pass a fixed set of parameters (the ticker/field combination) to ExcelAsyncUtil.Run, the topic still gets disconnected when the next call is executed, and hence a new GUID is generated.
However, when I use MarketDataContribution2, which calls XlCall.RTD with the ticker/field combination as the topic info, DisconnectData never gets called, which is what I expected.

I am still digging through your code, but I haven't figured out why Excel-DNA keeps generating a new topic even when the same AsyncInfo is provided.

Cheers,
Elvis


--- The following piece of code shows you the concept of the implementation that I am doing.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Runtime.InteropServices;
using System.Threading.Tasks;
using ExcelDna.Integration;
using ExcelDna.Integration.Rtd;
using System.Threading;

namespace ExcelDnaMemoryLeak
{
    public class Main: IExcelAddIn
    {
        public void AutoClose()
        {
            //throw new NotImplementedException();
        }

        public void AutoOpen()
        {
            //throw new NotImplementedException();
        }
    }

    public static class Functions
    {
        private static readonly Random _random = new Random();
        private static readonly ConcurrentDictionary<string, IObserver<object>> _observers = new ConcurrentDictionary<string, IObserver<object>>();
        private static readonly Timer _fakeMarketDataTicker = new Timer(FakeMarketDataTicking, null, 0, 500);
        private static long _demoCount;


        [ExcelFunction(Name = "MarketDataSubscibe")]
        public static object MarketDataSubscibe(string ticker, string field)
        {
            return RxExcel.Observe("MarketDataSubscription", $"{ticker}|{field}", GetObserver($"{ticker}|{field}"));
        }

        [ExcelFunction(Name = "MarketDataContribution1")]
        public static object MarketDataContribution1(string ticker, string field, object value)
        {
            var key = $"{ticker}|{field}";
            // field1 should be the key because it never changes
            var asyncResult = ExcelAsyncUtil.Run("MarketDataContribution", key, h =>
            {
                ProcessContributionTask(h, value);
            });

            return asyncResult.Equals(ExcelError.ExcelErrorNA)
                ? "#Processing..."
                : asyncResult;
        }

        [ExcelFunction(Name = "MarketDataContribution2")]
        public static object MarketDataContribution2(string ticker, string field, object value)
        {
            var key = $"{ticker}|{field}";
            MarketDataContriButionRtdServer.TopicInfoHolder[key] = value; // we mimic updating the value
            return XlCall.RTD("ExcelDnaMemoryLeak.MarketDataContriButionRtdServer", null, $"{ticker}|{field}");
        }

        private static void ProcessContributionTask(ExcelAsyncHandle handle, object value)
        {
            Task.Run(async () =>
            {
                // mimic some long run task. For example, http request
                await Task.Delay(_random.Next(200, 997));
                var result = $"LEAK OK: {DateTime.Now:yyyy-MM-dd HH:mm:ss} ({value})";
                handle.SetResult(result);
            });
        }

        public static Func<IObservable<object>> GetObserver(string key)
        {
            return () => Observable.Create<object>(observer =>
            {
                IObserver<object> obsEx;
                if (!_observers.TryGetValue(key, out obsEx))
                {
                    //Logger.Trace($"{LogCaption} Adding observer for: {key}");

                    if (_observers.TryAdd(key, observer))
                    {
                        observer.OnNext("#Loading...");
                    }
                }

                return Disposable.Create(() =>
                {
                    IObserver<object> obs;

                    if (_observers.TryRemove(key, out obs))
                    {
                        obs.OnCompleted();
                    }
                });
            });
        }

        private static void FakeMarketDataTicking(object state)
        {
            foreach (var observer in _observers)
            {
                if((int)(_random.NextDouble() * 100) % 5 == 0) // we just want some random ticks
                    observer.Value.OnNext(_random.NextDouble());
            }

            if (_demoCount == 0) // give it initial values
            {
                foreach (var observer in _observers)
                {
                    observer.Value.OnNext(_random.NextDouble());
                }
            }
            _demoCount++;
        }

    }

    [ComVisible(true)]
    public class MarketDataContriButionRtdServer : ExcelRtdServer
    {
        private readonly ConcurrentDictionary<int, Topic> _topics;
        private Timer _timer;
        private readonly Random _rand;
        public static ConcurrentDictionary<string, object> TopicInfoHolder = new ConcurrentDictionary<string, object>();
        public static ConcurrentDictionary<int, string> TopicIdHolder = new ConcurrentDictionary<int, string>();

        public MarketDataContriButionRtdServer()
        {
            _topics = new ConcurrentDictionary<int, Topic>();
            _rand = new Random();
            _timer = new Timer(delegate
            {
                foreach (var topic in _topics)
                {
                    string key;
                    if (TopicIdHolder.TryGetValue(topic.Key, out key))
                    {
                        object value;
                        if (TopicInfoHolder.TryGetValue(key, out value))
                        {
                            var result = $"NOLEAK OK: {DateTime.Now:yyyy-MM-dd HH:mm:ss} ({value})";
                            topic.Value.UpdateValue(result);
                        }
                    }
                }
            }, null, 0, 500);
        }

        protected override Topic CreateTopic(int topicId, IList<string> topicInfo)
        {
            TopicIdHolder[topicId] = topicInfo[0];
            return base.CreateTopic(topicId, topicInfo);
        }

        protected override object ConnectData(Topic topic, IList<string> topicInfo, ref bool newValues)
        {
            Debug.WriteLine("ConnectData: {0} - {{{1}}}", topic.TopicId, string.Join(", ", topicInfo));
            _topics[topic.TopicId] = topic;
            return ExcelErrorUtil.ToComError(ExcelError.ExcelErrorNA);
        }

        protected override void DisconnectData(Topic topic)
        {
            Topic t;
            if (_topics.TryRemove(topic.TopicId, out t))
            {
                string key;
                if (TopicIdHolder.TryRemove(topic.TopicId, out key))
                {
                    object value;
                    TopicInfoHolder.TryRemove(key, out value);
                }
            }
            Debug.WriteLine("DisconnectData: {0}", topic.TopicId);
        }
    }
    
    public static class RxExcel
    {
        public static IExcelObservable ToExcelObservable<T>(this IObservable<T> observable)
        {
            return new ExcelObservable<T>(observable);
        }

        public static object Observe<T>(string functionName, object parameters, Func<IObservable<T>> observableSource)
        {
            return ExcelAsyncUtil.Observe(functionName, parameters, () => observableSource().ToExcelObservable());
        }
    }

    public class ExcelObservable<T> : IExcelObservable
    {
        private readonly IObservable<T> _observable;

        public ExcelObservable(IObservable<T> observable)
        {
            _observable = observable;
        }

        public virtual IDisposable Subscribe(IExcelObserver observer)
        {
            return _observable.Subscribe(value => observer.OnNext(value), observer.OnError, observer.OnCompleted);
        }
    }
}


ExcelDnaMemoryLeak.zip

Govert van Drimmelen

Jun 10, 2016, 9:18:20 AM
to exce...@googlegroups.com
Hi Elvis,

ExcelAsyncUtil.Run is implemented as a single-result topic on top of the ExcelRtdObserver / IObservable implementation.
The implementation details are in the ThreadPoolDelegateObservable class (in ExcelRtdObserver.cs). There you see the work that is done looks like this:
                object result = _func();
                observer.OnNext(result);
                observer.OnCompleted();

The OnCompleted() is what makes the IObservable and hence the RTD topic eventually Disconnect.
A future call then becomes a new Topic, even with the same AsyncInfo, as you observe.
So that part is working as expected - every ExcelAsyncUtil.Run call makes a new topic.
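
To make the difference concrete, here is a simplified sketch of the two shapes of source. It is only an illustration under stated assumptions: it uses the standard System.IObservable<T> interface instead of Excel-DNA's IExcelObservable, it runs synchronously rather than on the thread pool, and SingleResultSource, LongLivedSource, RecordingObserver and NoopDisposable are hypothetical names. A source that completes after one value ends its subscription, while a source that never completes can keep pushing values to the same subscriber.

```csharp
using System;
using System.Collections.Generic;

// Records what the subscriber sees, standing in for the RTD topic.
public sealed class RecordingObserver : IObserver<object>
{
    public readonly List<string> Events = new List<string>();
    public void OnNext(object value) { Events.Add("next:" + value); }
    public void OnError(Exception error) { Events.Add("error"); }
    public void OnCompleted() { Events.Add("completed"); }
}

public sealed class NoopDisposable : IDisposable
{
    public void Dispose() { }
}

// One-shot shape: one value, then completion (same order as the snippet quoted above).
public sealed class SingleResultSource : IObservable<object>
{
    private readonly Func<object> _func;
    public SingleResultSource(Func<object> func) { _func = func; }

    public IDisposable Subscribe(IObserver<object> observer)
    {
        observer.OnNext(_func());
        observer.OnCompleted();
        return new NoopDisposable();
    }
}

// Long-lived shape: keeps the observer and never completes on its own.
public sealed class LongLivedSource : IObservable<object>
{
    private IObserver<object> _observer;

    public IDisposable Subscribe(IObserver<object> observer)
    {
        _observer = observer;
        return new NoopDisposable();
    }

    // Pushes a further value to the existing subscriber, if there is one.
    public void Push(object value)
    {
        if (_observer != null) _observer.OnNext(value);
    }
}
```

The first shape corresponds to the single-result behaviour described for ExcelAsyncUtil.Run; the second is the shape of an observable that stays subscribed and receives repeated results.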

-Govert




Elvis Hsu

Jun 10, 2016, 12:37:48 PM
to Excel-DNA
Hi, Govert,

I think I have found a workaround: it uses RxExcel.Observe with the GetObserver function from my earlier code (repeated below).
The key is NOT to return any other result (for example from validation logic) BEFORE calling RxExcel.Observe, as doing so makes Excel call DisconnectData, which forces Excel-DNA to create a new topic GUID.

Now everything works perfectly, as the topics are reused by Excel :)

Once again, thank you, Govert, for pointing me in the right direction. Do you think we should put this trick on the Git wiki? Or does information on this topic already exist?

Cheers,
Elvis

-- The Working version (MarketDataContribution3 is the working one in my case)
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Runtime.InteropServices;
using System.Threading.Tasks;
using ExcelDna.Integration;
using ExcelDna.Integration.Rtd;
using System.Threading;

namespace ExcelDnaMemoryLeak
{
    public class Main: IExcelAddIn
    {
        public void AutoClose()
        {
            //throw new NotImplementedException();
        }

        public void AutoOpen()
        {
            //throw new NotImplementedException();
        }
    }

    public static class Functions
    {
        private static readonly Random _random = new Random();
        private static readonly ConcurrentDictionary<string, IObserver<object>> _observers = new ConcurrentDictionary<string, IObserver<object>>();
        private static readonly Timer _fakeMarketDataTicker = new Timer(FakeMarketDataTicking, null, 0, 500);
        private static readonly ConcurrentDictionary<string, object> _processing = new ConcurrentDictionary<string, object>();
        private static long _demoCount;


        [ExcelFunction(Name = "MarketDataSubscibe")]
        public static object MarketDataSubscibe(string ticker, string field)
        {
            return RxExcel.Observe("MarketDataSubscription", $"{ticker}|{field}", GetObserver($"{ticker}|{field}"));
        }

        // This one makes Excel-DNA call OnCompleted(), after which Excel does not reuse the RTD topics
        [ExcelFunction(Name = "MarketDataContribution1")]
        public static object MarketDataContribution1(string ticker, string field, object value)
        {
            var key = $"{ticker}|{field}";
            // field1 should be the key because it never changes
            var asyncResult = ExcelAsyncUtil.Run("MarketDataContribution", key, h =>
            {
                ProcessContributionTask1(h, value);
            });

            return asyncResult.Equals(ExcelError.ExcelErrorNA)
                ? "#Processing..."
                : asyncResult;
        }

        // This one works perfectly, but you need to implement your own RtdServer
        [ExcelFunction(Name = "MarketDataContribution2")]
        public static object MarketDataContribution2(string ticker, string field, object value)
        {
            var key = $"{ticker}|{field}";
            MarketDataContriButionRtdServer.TopicInfoHolder[key] = value; // we mimic updating the value
            return XlCall.RTD("ExcelDnaMemoryLeak.MarketDataContriButionRtdServer", null, $"{ticker}|{field}");
        }
        
        // This one works perfectly but a little trick is needed
        [ExcelFunction(Name = "MarketDataContribution3")]
        public static object MarketDataContribution3(string ticker, string field, object value)
        {
            // put a "Contribution" prefix to avoid messing up with other observers
            var key = $"Contribution|{ticker}|{field}";
            var asyncResult = RxExcel.Observe("MarketDataContribution3", key, GetObserver(key));

            ////////////////////////////////////////////////////////////////////////////////////////////////////////
            //// If you have some validation logic, you have to put it after RxExcel.Observe.
            //// Otherwise Excel will call DisconnectData, and Excel-DNA will then call Dispose to kill the observer.
            ////////////////////////////////////////////////////////////////////////////////////////////////////////
            if (((double)value) < 0)
            {
                return "#Value cannot be less than zero";
            }

            if (!_processing.ContainsKey(key))
                ProcessContributionTask3(key, value);

            return asyncResult.Equals(ExcelError.ExcelErrorNA)
                ? "#Processing..."
                : asyncResult;
        }

        private static void ProcessContributionTask1(ExcelAsyncHandle handle, object value)
        {
            Task.Run(async () =>
            {
                // mimic some long run task. For example, http request
                await Task.Delay(_random.Next(200, 997));
                var result = $"LEAK OK: {DateTime.Now:yyyy-MM-dd HH:mm:ss} ({value})";
                handle.SetResult(result);
            });
        }

        private static void ProcessContributionTask3(string key, object value)
        {
            Task.Run(async () =>
            {
                // mimic some long run task. For example, http request
                IObserver<object> obs;
                if (_observers.TryGetValue(key, out obs))
                {
                    _processing[key] = null;
                    await Task.Delay(_random.Next(500, 997));
                    var result = $"Observer OK: {DateTime.Now:yyyy-MM-dd HH:mm:ss} ({value})";
                    obs.OnNext(result);

                    await Task.Delay(1);

                    object o;
                    _processing.TryRemove(key, out o);
                }
            });
        }

        public static Func<IObservable<object>> GetObserver(string key)
        {
            return () => Observable.Create<object>(observer =>
            {
                IObserver<object> obsEx;
                if (!_observers.TryGetValue(key, out obsEx))
                {
                    if (_observers.TryAdd(key, observer))
                    {
                        observer.OnNext(ExcelError.ExcelErrorNA);
                    }
                }

                return Disposable.Create(() =>
                {
                    IObserver<object> obs;

                    if (_observers.TryRemove(key, out obs))
                    {
                        obs.OnCompleted(); // This is not necessary but may help?
                    }
                });
            });
        }

        private static void FakeMarketDataTicking(object state)
        {
            foreach (var observer in _observers)
            {
                if((int)(_random.NextDouble() * 100) % 5 == 0) // we just want some random ticks
                    if (!observer.Key.StartsWith("Contribution"))
                        observer.Value.OnNext(_random.NextDouble());
            }

            if (_demoCount == 0) // give all initial values
            {
                foreach (var observer in _observers)
                {
                    if(!observer.Key.StartsWith("Contribution"))