// Single shared HTTP client reused by every contribution request.
private static readonly HttpClient client = new HttpClient(new HttpClientHandler());

/// <summary>
/// Example function: posts a contribution and reports the outcome to the calling cell
/// through an Excel-DNA async handle.
/// </summary>
/// <param name="ticker">Instrument identifier; part of the async call key.</param>
/// <param name="field">Field name; part of the async call key.</param>
/// <param name="calculatedValue">Value to contribute. It is not part of the async call key.</param>
/// <param name="server">Target server; part of the async call key.</param>
/// <returns>
/// "#Processing..." while the async call still reports #N/A, otherwise the value handed to the handle.
/// </returns>
/// <remarks>
/// NOTE(review): <c>queue</c>, <c>url</c> and <c>httpContentWithValue</c> are not defined in the
/// visible snippet; they are assumed to be supplied by the surrounding code.
/// </remarks>
public static object MarketDataContribute(string ticker, string field, double calculatedValue, string server)
{
    var asyncResult = ExcelAsyncUtil.Run("MarketData.Contribute", $"{ticker}|{field}|{server}", handle =>
    {
        queue.Enque(() =>
        {
            // The original handed the pending Task itself to SetResult. A Task is not a value
            // Excel can display, so the handle is completed only once the request has finished.
            client.PostAsJsonAsync(url, httpContentWithValue).ContinueWith(posted =>
            {
                if (posted.IsFaulted)
                {
                    handle.SetException(posted.Exception.GetBaseException());
                }
                else if (posted.IsCanceled)
                {
                    handle.SetCanceled();
                }
                else
                {
                    using (var response = posted.Result)
                    {
                        // Publish the HTTP status name (e.g. "OK") as the cell value.
                        handle.SetResult(response.StatusCode.ToString());
                    }
                }
            });
        });
    });

    return asyncResult.Equals(ExcelError.ExcelErrorNA) ? "#Processing..." : asyncResult;
}
using System;using System.Collections.Concurrent;using System.Collections.Generic;using System.Diagnostics;using System.Reactive.Disposables;using System.Reactive.Linq;using System.Runtime.InteropServices;using System.Threading.Tasks;using ExcelDna.Integration;using ExcelDna.Integration.Rtd;using System.Threading;
namespace ExcelDnaMemoryLeak
{
    /// <summary>
    /// Add-in class implementing <see cref="IExcelAddIn"/>; both hooks are intentionally empty.
    /// </summary>
    public class Main : IExcelAddIn
    {
        /// <summary>Does nothing.</summary>
        public void AutoClose()
        {
            // Intentionally empty. Placeholder kept from the original:
            // throw new NotImplementedException();
        }

        /// <summary>Does nothing.</summary>
        public void AutoOpen()
        {
            // Intentionally empty. Placeholder kept from the original:
            // throw new NotImplementedException();
        }
    }
/// <summary>
/// Worksheet functions of the demo: a fake market-data subscription plus two ways of
/// "contributing" a value (an async call and a custom RTD server).
/// </summary>
public static class Functions
{
    // System.Random is not thread-safe and is used from timer and task threads,
    // so every access goes through the locked helpers below.
    private static readonly Random _random = new Random();

    // Observers currently registered, keyed by "ticker|field".
    private static readonly ConcurrentDictionary<string, IObserver<object>> _observers = new ConcurrentDictionary<string, IObserver<object>>();

    // Fires FakeMarketDataTicking immediately and then every 500 ms.
    private static readonly Timer _fakeMarketDataTicker = new Timer(FakeMarketDataTicking, null, 0, 500);

    // Number of timer ticks processed so far.
    private static long _demoCount;

    /// <summary>Subscribes the calling cell to the fake ticks published for "ticker|field".</summary>
    /// <param name="ticker">Instrument identifier.</param>
    /// <param name="field">Field name.</param>
    /// <returns>The value returned by <c>RxExcel.Observe</c> for this topic.</returns>
    [ExcelFunction(Name = "MarketDataSubscibe")]
    public static object MarketDataSubscibe(string ticker, string field)
    {
        var key = $"{ticker}|{field}";
        return RxExcel.Observe("MarketDataSubscription", key, GetObserver(key));
    }

    /// <summary>Contributes a value through <c>ExcelAsyncUtil.Run</c>.</summary>
    /// <param name="ticker">Instrument identifier; part of the async call key.</param>
    /// <param name="field">Field name; part of the async call key.</param>
    /// <param name="value">Value to contribute; deliberately not part of the key.</param>
    /// <returns>"#Processing..." while the call reports #N/A, otherwise the async result.</returns>
    [ExcelFunction(Name = "MarketDataContribution1")]
    public static object MarketDataContribution1(string ticker, string field, object value)
    {
        // field1 should be the key because it never changes
        var key = $"{ticker}|{field}";
        var asyncResult = ExcelAsyncUtil.Run("MarketDataContribution", key, h => { ProcessContributionTask(h, value); });

        return asyncResult.Equals(ExcelError.ExcelErrorNA) ? "#Processing..." : asyncResult;
    }

    /// <summary>Contributes a value through the custom RTD server.</summary>
    /// <param name="ticker">Instrument identifier; part of the RTD topic.</param>
    /// <param name="field">Field name; part of the RTD topic.</param>
    /// <param name="value">Value stored for the topic before the RTD call is made.</param>
    /// <returns>The value returned by <c>XlCall.RTD</c> for the "ticker|field" topic.</returns>
    [ExcelFunction(Name = "MarketDataContribution2")]
    public static object MarketDataContribution2(string ticker, string field, object value)
    {
        var key = $"{ticker}|{field}";
        MarketDataContriButionRtdServer.TopicInfoHolder[key] = value; // we mimic updating the value
        return XlCall.RTD("ExcelDnaMemoryLeak.MarketDataContriButionRtdServer", null, key);
    }

    /// <summary>
    /// Starts a background task that waits a random 200-996 ms and then completes the handle.
    /// </summary>
    /// <param name="handle">Async handle to complete.</param>
    /// <param name="value">Contributed value, echoed in the result text.</param>
    private static void ProcessContributionTask(ExcelAsyncHandle handle, object value)
    {
        Task.Run(async () =>
        {
            try
            {
                // mimic some long run task. For example, http request
                await Task.Delay(NextRandomInt(200, 997));
                var result = $"LEAK OK: {DateTime.Now:yyyy-MM-dd HH:mm:ss} ({value})";
                handle.SetResult(result);
            }
            catch (Exception ex)
            {
                // Without this the cell would stay on "#Processing..." if the work failed.
                handle.SetException(ex);
            }
        });
    }

    /// <summary>
    /// Builds an observable factory that registers its subscriber under <paramref name="key"/>.
    /// </summary>
    /// <param name="key">Key under which the subscriber is stored in the observer table.</param>
    /// <returns>A factory producing the observable for <paramref name="key"/>.</returns>
    public static Func<IObservable<object>> GetObserver(string key)
    {
        return () => Observable.Create<object>(observer =>
        {
            // Only the first subscriber for a key is registered; TryAdd is a no-op otherwise.
            if (_observers.TryAdd(key, observer))
            {
                //Logger.Trace($"{LogCaption} Adding observer for: {key}");
                observer.OnNext("#Loading...");
            }

            return Disposable.Create(() =>
            {
                // Remove the entry only when it still holds THIS observer. The original removed
                // whatever was stored under the key, so disposing an unregistered subscriber
                // silently completed and unregistered somebody else's observer.
                var registration = new KeyValuePair<string, IObserver<object>>(key, observer);
                if (((ICollection<KeyValuePair<string, IObserver<object>>>)_observers).Remove(registration))
                {
                    observer.OnCompleted();
                }
            });
        });
    }

    /// <summary>
    /// Timer callback: pushes a random value to a random subset of observers, and to every
    /// observer on the very first tick.
    /// </summary>
    /// <param name="state">Unused timer state.</param>
    private static void FakeMarketDataTicking(object state)
    {
        foreach (var observer in _observers)
        {
            // we just want some random ticks
            if ((int)(NextRandomDouble() * 100) % 5 == 0)
            {
                observer.Value.OnNext(NextRandomDouble());
            }
        }

        // Atomic increment: timer callbacks can overlap, and exactly one of them must be "first".
        if (Interlocked.Increment(ref _demoCount) == 1)
        {
            // give it initial values
            foreach (var observer in _observers)
            {
                observer.Value.OnNext(NextRandomDouble());
            }
        }
    }

    // Thread-safe wrapper around Random.Next(min, max).
    private static int NextRandomInt(int minValue, int maxValue)
    {
        lock (_random)
        {
            return _random.Next(minValue, maxValue);
        }
    }

    // Thread-safe wrapper around Random.NextDouble().
    private static double NextRandomDouble()
    {
        lock (_random)
        {
            return _random.NextDouble();
        }
    }
}
/// <summary>
/// RTD server that republishes, every 500 ms, the latest value stored for each connected topic.
/// </summary>
[ComVisible(true)]
public class MarketDataContriButionRtdServer : ExcelRtdServer
{
    // Topics currently connected, keyed by topic id.
    private readonly ConcurrentDictionary<int, Topic> _topics;

    // Publishing timer; disposed in ServerTerminate.
    private Timer _timer;

    // Latest contributed value per topic key (written by the worksheet function).
    public static ConcurrentDictionary<string, object> TopicInfoHolder = new ConcurrentDictionary<string, object>();

    // Topic id -> topic key (first topic string), filled in CreateTopic.
    public static ConcurrentDictionary<int, string> TopicIdHolder = new ConcurrentDictionary<int, string>();

    /// <summary>Creates the server and starts the publishing timer (first tick immediately).</summary>
    public MarketDataContriButionRtdServer()
    {
        _topics = new ConcurrentDictionary<int, Topic>();
        _timer = new Timer(PublishTopics, null, 0, 500);
    }

    // Timer callback: pushes a timestamped status line to every topic that has a stored value.
    private void PublishTopics(object state)
    {
        foreach (var topic in _topics)
        {
            string key;
            if (!TopicIdHolder.TryGetValue(topic.Key, out key))
            {
                continue;
            }

            object value;
            if (!TopicInfoHolder.TryGetValue(key, out value))
            {
                continue;
            }

            var result = $"NOLEAK OK: {DateTime.Now:yyyy-MM-dd HH:mm:ss} ({value})";
            topic.Value.UpdateValue(result);
        }
    }

    /// <summary>
    /// Stops the publishing timer when the server is terminated. The original never disposed
    /// it, so the timer kept firing after shutdown.
    /// </summary>
    protected override void ServerTerminate()
    {
        var timer = Interlocked.Exchange(ref _timer, null);
        if (timer != null)
        {
            timer.Dispose();
        }

        base.ServerTerminate();
    }

    /// <summary>Records the topic key (first topic string) for the new topic id.</summary>
    protected override Topic CreateTopic(int topicId, IList<string> topicInfo)
    {
        TopicIdHolder[topicId] = topicInfo[0];
        return base.CreateTopic(topicId, topicInfo);
    }

    /// <summary>Tracks the connected topic and returns #N/A as its initial value.</summary>
    protected override object ConnectData(Topic topic, IList<string> topicInfo, ref bool newValues)
    {
        Debug.WriteLine("ConnectData: {0} - {{{1}}}", topic.TopicId, string.Join(", ", topicInfo));
        _topics[topic.TopicId] = topic;
        return ExcelErrorUtil.ToComError(ExcelError.ExcelErrorNA);
    }

    /// <summary>Forgets the topic together with its id mapping and stored value.</summary>
    protected override void DisconnectData(Topic topic)
    {
        Topic removedTopic;
        if (_topics.TryRemove(topic.TopicId, out removedTopic))
        {
            string key;
            if (TopicIdHolder.TryRemove(topic.TopicId, out key))
            {
                object value;
                TopicInfoHolder.TryRemove(key, out value);
            }
        }

        Debug.WriteLine("DisconnectData: {0}", topic.TopicId);
    }
}

/// <summary>Helpers bridging Rx observables to Excel-DNA observables.</summary>
public static class RxExcel
{
    /// <summary>Wraps an <see cref="IObservable{T}"/> as an <see cref="IExcelObservable"/>.</summary>
    public static IExcelObservable ToExcelObservable<T>(this IObservable<T> observable)
    {
        return new ExcelObservable<T>(observable);
    }

    /// <summary>
    /// Calls <c>ExcelAsyncUtil.Observe</c> with a source that wraps the observable produced by
    /// <paramref name="observableSource"/>.
    /// </summary>
    public static object Observe<T>(string functionName, object parameters, Func<IObservable<T>> observableSource)
    {
        return ExcelAsyncUtil.Observe(functionName, parameters, () => observableSource().ToExcelObservable());
    }
}
/// <summary>
/// Adapter exposing an <see cref="IObservable{T}"/> through the <see cref="IExcelObservable"/> interface.
/// </summary>
public class ExcelObservable<T> : IExcelObservable
{
    // Wrapped source sequence.
    private readonly IObservable<T> _observable;

    /// <summary>Wraps <paramref name="observable"/>.</summary>
    public ExcelObservable(IObservable<T> observable)
    {
        _observable = observable;
    }

    /// <summary>
    /// Subscribes <paramref name="observer"/> to the wrapped sequence, forwarding values,
    /// errors and completion.
    /// </summary>
    /// <returns>The subscription returned by the wrapped sequence.</returns>
    public virtual IDisposable Subscribe(IExcelObserver observer)
    {
        Action<T> forwardValue = item => observer.OnNext(item);
        Action<Exception> forwardError = observer.OnError;
        Action forwardCompleted = observer.OnCompleted;

        return _observable.Subscribe(forwardValue, forwardError, forwardCompleted);
    }
}
}
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Runtime.InteropServices;
using System.Threading.Tasks;
using ExcelDna.Integration;
using ExcelDna.Integration.Rtd;
using System.Threading;
namespace ExcelDnaMemoryLeak
{
    /// <summary>
    /// Add-in class implementing <see cref="IExcelAddIn"/>; both hooks are intentionally empty.
    /// </summary>
    public class Main : IExcelAddIn
    {
        /// <summary>Does nothing.</summary>
        public void AutoClose()
        {
            // Intentionally empty. Placeholder kept from the original:
            // throw new NotImplementedException();
        }

        /// <summary>Does nothing.</summary>
        public void AutoOpen()
        {
            // Intentionally empty. Placeholder kept from the original:
            // throw new NotImplementedException();
        }
    }
/// <summary>
/// Worksheet functions of the demo: fake market-data subscription and three contribution variants.
/// </summary>
public static class Functions
{
    // Random source for tick values and simulated delays.
    private static readonly Random _random = new Random();

    // Observers currently registered, keyed by topic key.
    private static readonly ConcurrentDictionary<string, IObserver<object>> _observers =
        new ConcurrentDictionary<string, IObserver<object>>();

    // Fires FakeMarketDataTicking immediately and then every 500 ms.
    private static readonly Timer _fakeMarketDataTicker =
        new Timer(FakeMarketDataTicking, null, 0, 500);

    // Keys with a contribution task in flight (the values are unused).
    private static readonly ConcurrentDictionary<string, object> _processing =
        new ConcurrentDictionary<string, object>();

    // Number of timer ticks processed so far.
    private static long _demoCount;
/// <summary>Subscribes the calling cell to the fake ticks published for "ticker|field".</summary>
/// <param name="ticker">Instrument identifier.</param>
/// <param name="field">Field name.</param>
/// <returns>The value returned by <c>RxExcel.Observe</c> for this topic.</returns>
[ExcelFunction(Name = "MarketDataSubscibe")]
public static object MarketDataSubscibe(string ticker, string field)
{
    // The same text serves as the Observe parameter and as the observer-table key.
    var topicKey = $"{ticker}|{field}";
    var source = GetObserver(topicKey);

    return RxExcel.Observe("MarketDataSubscription", topicKey, source);
}
// This one will force ExcelDna to call OnCompleted() and then Excel will not reuse the RTD topics
/// <summary>Contributes a value through <c>ExcelAsyncUtil.Run</c>.</summary>
/// <param name="ticker">Instrument identifier; part of the async call key.</param>
/// <param name="field">Field name; part of the async call key.</param>
/// <param name="value">Value to contribute; deliberately not part of the key.</param>
/// <returns>"#Processing..." while the call reports #N/A, otherwise the async result.</returns>
[ExcelFunction(Name = "MarketDataContribution1")]
public static object MarketDataContribution1(string ticker, string field, object value)
{
    // field1 should be the key because it never changes
    var key = $"{ticker}|{field}";
    var asyncResult = ExcelAsyncUtil.Run("MarketDataContribution", key, h => { ProcessContributionTask1(h, value); });

    return asyncResult.Equals(ExcelError.ExcelErrorNA) ? "#Processing..." : asyncResult;
}
// This one works perfectly, but you need to implement your own RtdServer
/// <summary>Contributes a value through the custom RTD server.</summary>
/// <param name="ticker">Instrument identifier; part of the RTD topic.</param>
/// <param name="field">Field name; part of the RTD topic.</param>
/// <param name="value">Value stored for the topic before the RTD call is made.</param>
/// <returns>The value returned by <c>XlCall.RTD</c> for the "ticker|field" topic.</returns>
[ExcelFunction(Name = "MarketDataContribution2")]
public static object MarketDataContribution2(string ticker, string field, object value)
{
    var key = $"{ticker}|{field}";
    MarketDataContriButionRtdServer.TopicInfoHolder[key] = value; // we mimic updating the value
    return XlCall.RTD("ExcelDnaMemoryLeak.MarketDataContriButionRtdServer", null, key);
}

// This one works perfectly, but a little trick is needed
/// <summary>Contributes a value through an observable registered under a "Contribution|" key.</summary>
/// <param name="ticker">Instrument identifier; part of the observer key.</param>
/// <param name="field">Field name; part of the observer key.</param>
/// <param name="value">Value to contribute; negative numbers are rejected.</param>
/// <returns>
/// The validation message for a negative number, "#Processing..." while the observable
/// reports #N/A, otherwise the latest observed value.
/// </returns>
[ExcelFunction(Name = "MarketDataContribution3")]
public static object MarketDataContribution3(string ticker, string field, object value)
{
    // put a "Contribution" prefix to avoid messing up with other observers
    var key = $"Contribution|{ticker}|{field}";
    var asyncResult = RxExcel.Observe("MarketDataContribution3", key, GetObserver(key));

    ////////////////////////////////////////////////////////////////////////////////////////////////////////
    //// If you have some validation logic, you have to put it after RxExcel.Observe
    //// Otherwise Excel will call DisconnectData, hence ExcelDna will call Dispose to kill the observer
    ////////////////////////////////////////////////////////////////////////////////////////////////////////
    // Type-check before unboxing: a cell may hand over a string, bool or error value, and the
    // original bare (double) cast threw InvalidCastException for those.
    if (value is double && (double)value < 0)
    {
        return "#Value cannot be less than zero";
    }

    // Do not start a second task while one is still in flight for this key.
    if (!_processing.ContainsKey(key))
    {
        ProcessContributionTask3(key, value);
    }

    return asyncResult.Equals(ExcelError.ExcelErrorNA) ? "#Processing..." : asyncResult;
}
/// <summary>
/// Starts a background task that waits a random 200-996 ms and then completes the handle.
/// </summary>
/// <param name="handle">Async handle to complete.</param>
/// <param name="value">Contributed value, echoed in the result text.</param>
private static void ProcessContributionTask1(ExcelAsyncHandle handle, object value)
{
    Task.Run(async () =>
    {
        try
        {
            // mimic some long run task. For example, http request
            await Task.Delay(_random.Next(200, 997));
            var result = $"LEAK OK: {DateTime.Now:yyyy-MM-dd HH:mm:ss} ({value})";
            handle.SetResult(result);
        }
        catch (Exception ex)
        {
            // Without this the cell would stay on "#Processing..." if the work failed.
            handle.SetException(ex);
        }
    });
}
/// <summary>
/// Starts a background task that, if an observer is registered for <paramref name="key"/>,
/// waits a random 500-996 ms and pushes a result line to that observer.
/// </summary>
/// <param name="key">Observer key; also used as the in-flight marker in the processing table.</param>
/// <param name="value">Contributed value, echoed in the result text.</param>
private static void ProcessContributionTask3(string key, object value)
{
    Task.Run(async () =>
    {
        IObserver<object> obs;
        if (!_observers.TryGetValue(key, out obs))
        {
            return;
        }

        _processing[key] = null;
        try
        {
            // mimic some long run task. For example, http request
            await Task.Delay(_random.Next(500, 997));
            var result = $"Observer OK: {DateTime.Now:yyyy-MM-dd HH:mm:ss} ({value})";
            obs.OnNext(result);

            // NOTE(review): the short pause presumably keeps the in-flight marker set while the
            // pushed value is picked up; confirm the intent with the author.
            await Task.Delay(1);
        }
        finally
        {
            // Always clear the marker; otherwise a failure above would block this key forever.
            object removed;
            _processing.TryRemove(key, out removed);
        }
    });
}
/// <summary>
/// Builds an observable factory that registers its subscriber under <paramref name="key"/>.
/// </summary>
/// <param name="key">Key under which the subscriber is stored in the observer table.</param>
/// <returns>A factory producing the observable for <paramref name="key"/>.</returns>
public static Func<IObservable<object>> GetObserver(string key)
{
    return () => Observable.Create<object>(observer =>
    {
        // Only the first subscriber for a key is registered; TryAdd is a no-op otherwise.
        if (_observers.TryAdd(key, observer))
        {
            observer.OnNext(ExcelError.ExcelErrorNA);
        }

        return Disposable.Create(() =>
        {
            // Remove the entry only when it still holds THIS observer. The original removed
            // whatever was stored under the key, so disposing an unregistered subscriber
            // silently completed and unregistered somebody else's observer.
            var registration = new KeyValuePair<string, IObserver<object>>(key, observer);
            if (((ICollection<KeyValuePair<string, IObserver<object>>>)_observers).Remove(registration))
            {
                observer.OnCompleted(); // This is not necessary but may help?
            }
        });
    });
}
private static void FakeMarketDataTicking(object state) { foreach (var observer in _observers) { if((int)(_random.NextDouble() * 100) % 5 == 0) // we just want some random ticks if (!observer.Key.StartsWith("Contribution")) observer.Value.OnNext(_random.NextDouble()); }
if (_demoCount == 0) // give all initial values { foreach (var observer in _observers) { if(!observer.Key.StartsWith("Contribution"))