subscribe to stream via windows service, project events and persist data

Christian Setzkorn

Dec 8, 2014, 8:18:16 AM
to event...@googlegroups.com
Hi,

Is anyone aware of some .NET (C#) code for a Windows service that subscribes to a stream, projects certain events, and ultimately persists them in a database? I guess the Windows service would call ReadAllEventsBackwardAsync according to some heartbeat. It would also catch up if it has been (re)started. Any feedback would be very much appreciated. Thanks.

Christian

Raif Harik

Dec 8, 2014, 10:03:48 AM
to event...@googlegroups.com
Hey, are you looking for some code that implements all of that? As far as I know, there is no full ES infrastructure package. I think many people have come up with their own event dispatchers (which listen to events and hand them off to the appropriate handlers). I have, and it took some thinking. I intend to either make mine available or ask for a code review of it sometime soon.
R
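
A minimal sketch of the dispatcher idea described above, assuming events are identified by a type-name string and carry a raw byte payload. The `EventDispatcher` class and its `Register`/`Dispatch` methods are hypothetical names for illustration, not part of any Event Store package.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical dispatcher: maps an event type name to a handler for its raw payload.
public class EventDispatcher
{
    private readonly Dictionary<string, Action<byte[]>> _handlers =
        new Dictionary<string, Action<byte[]>>();

    // Registers the handler for one event type name, replacing any earlier registration.
    public void Register(string eventType, Action<byte[]> handler)
    {
        _handlers[eventType] = handler;
    }

    // Hands the payload to the matching handler.
    // Returns false for event types nobody registered, so the caller can log or ignore them.
    public bool Dispatch(string eventType, byte[] data)
    {
        Action<byte[]> handler;
        if (!_handlers.TryGetValue(eventType, out handler))
            return false;
        handler(data);
        return true;
    }
}
```

With the .NET client, a subscription callback could forward `evt.Event.EventType` and `evt.Event.Data` to `Dispatch`, which keeps the routing table in one place instead of a growing switch statement.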

Chris Ray

Dec 8, 2014, 11:21:34 AM
to event...@googlegroups.com
Christian, I wrote a starter project for a C#-to-SQL projection (it remembers the last position) that I copy and modify as needed for whatever particular scenario. It's by no means production ready. To make it a working out-of-the-box solution, it uses ES-Embedded, but you would obviously change that. Other dependencies are Topshelf (for the Windows service) and Json.NET (for serialization).

As an aside, all of the plumbing surrounding storing the position in SQL is moot once Competing-Consumers (now in dev) makes it to production.

Hope it helps:

using System;
using System.Collections.Generic;
using System.Data.SqlClient;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading.Tasks;
using System.Timers;
using EventStore.ClientAPI;
using EventStore.ClientAPI.Embedded;
using EventStore.Core;
using Newtonsoft.Json;
using Topshelf;

namespace EsSvcProjection
{
    class Program
    {
        static readonly bool RESTART_SQL = true;//true = completely recreate database tables, false = from last position
        static readonly string CONNECTION_STRING = "<<INSERT_CONNECTION_STRING_HERE>>";//replace with your SQL Server connection string
        static void Main(string[] args)
        {
            HostFactory.Run(x =>
            {
                x.Service<EsSvcProjectionApp>(s =>
                {
                    s.ConstructUsing(name => new EsSvcProjectionApp(RESTART_SQL, CONNECTION_STRING));
                    s.WhenStarted(tc => tc.Start());
                    s.WhenStopped(tc => tc.Stop());
                });
                x.RunAsLocalSystem();
                x.SetDescription("EsSvcProjectionApp Description");
                x.SetDisplayName("EsSvcProjectionApp Display Name");
                x.SetServiceName("EsSvcProjectionApp");
            });
        }
    }

    public class EsSvcProjectionApp
    {
        readonly ClusterVNode _embeddedEventStore;
        readonly IEventStoreConnection _connection;
        readonly bool _restartSql;
        readonly string _connectionString;
        readonly Random _rnd = new Random();
        readonly Timer _timer;

        public EsSvcProjectionApp(bool restartSql, string connectionString)
        {
            _restartSql = restartSql;
            _connectionString = connectionString;
            _embeddedEventStore = EmbeddedVNodeBuilder.AsSingleNode().OnDefaultEndpoints().RunInMemory().Build();
            _connection = EmbeddedEventStoreConnection.Create(_embeddedEventStore);
            _timer = new Timer(1500);
            _timer.Elapsed += timerElapsed;
        }

        public void Start()
        {
            _embeddedEventStore.Start();
            _connection.ConnectAsync().Wait();
            // Wait for the seed events to be written so that a failure surfaces here.
            _connection.AppendToStreamAsync("bankaccount", ExpectedVersion.Any, getEvents(3)).Wait();

            if (_restartSql)
                rebuildSql();

            Console.WriteLine("Total before subscription: {0}", sqlScalar("select amount from bankaccount"));

            _connection.SubscribeToStreamFrom("bankaccount", getLastPos(), false, eventAppeared, liveProcessingStarted);
            _timer.Start();
        }

        public void Stop()
        {
            _timer.Stop(); // stop appending test events before the connection goes away
            _connection.Dispose();
        }

        private void timerElapsed(object sender, ElapsedEventArgs e)
        {
            _connection.AppendToStreamAsync("bankaccount", ExpectedVersion.Any, getEvents(1));
        }

        private void liveProcessingStarted(EventStoreCatchUpSubscription obj)
        {
            Console.WriteLine("live!");
        }

        private void eventAppeared(EventStoreCatchUpSubscription sub, ResolvedEvent evt)
        {
            var type = evt.Event.EventType;
            switch (type)
            {
                case "MoneyDeposited": moneyDeposited(evt.Event); break;
                case "MoneyWithdrawn": moneyWithdrawn(evt.Event); break;
            }
        }

        private void moneyDeposited(RecordedEvent recordedEvent)
        {
            var evt = JsonConvert.DeserializeObject<MoneyDeposited>(Encoding.UTF8.GetString(recordedEvent.Data));
            var total = sqlAddToBankAccount(evt.Amount, recordedEvent.EventNumber);
            Console.WriteLine("Money Deposited: {0}. Total: {1}", evt.Amount, total);
        }

        private void moneyWithdrawn(RecordedEvent recordedEvent)
        {
            var evt = JsonConvert.DeserializeObject<MoneyWithdrawn>(Encoding.UTF8.GetString(recordedEvent.Data));
            var total = sqlAddToBankAccount(-evt.Amount, recordedEvent.EventNumber);
            Console.WriteLine("Money Withdrawn: {0}. Total: {1}", evt.Amount, total);
        }

        private void rebuildSql()
        {
            sqlScalar("drop table position", true);
            sqlScalar("drop table bankaccount", true);
            sqlScalar("create table position (pos int not null default(0))");
            sqlScalar("create table bankaccount (amount int not null)");
            sqlScalar("insert into bankaccount (amount) values (0)");
        }

        private IEnumerable<EventData> getEvents(int count)
        {
            var retVal = new List<EventData>();
            for (int i = 0; i < count; i++)
            {
                var amount = _rnd.Next(1, 100);
                if (amount % 2 == 0)
                    retVal.Add(new EventData(Guid.NewGuid(), "MoneyDeposited", true, Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(new MoneyDeposited { Amount = amount })), null));
                else
                    retVal.Add(new EventData(Guid.NewGuid(), "MoneyWithdrawn", true, Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(new MoneyWithdrawn { Amount = amount })), null));
            }
            return retVal;
        }

        private int? getLastPos()
        {
            return (int?)sqlScalar("select pos from position");
        }

        private object sqlScalar(string stmt, bool suppressError = false)
        {
            using (var conn = new SqlConnection(_connectionString))
            {
                conn.Open();

                try
                {
                    using (var cmd = conn.CreateCommand())
                    {
                        cmd.CommandText = stmt;
                        cmd.CommandType = System.Data.CommandType.Text;
                        return cmd.ExecuteScalar();
                    }
                }
                catch (Exception ex)
                {
                    Console.WriteLine(ex.Message);
                    if (!suppressError)
                        throw;
                    else
                        return ex.Message;
                }
                finally
                {
                    conn.Close();
                }
            }
        }

        private int sqlAddToBankAccount(int amount, int pos, bool suppressError = false)
        {
            using (var conn = new SqlConnection(_connectionString))
            {
                conn.Open();
                var trans = conn.BeginTransaction();
                try
                {
                    using (var cmd = conn.CreateCommand())
                    {
                        cmd.Transaction = trans;
                        cmd.CommandText = "delete position";
                        cmd.CommandType = System.Data.CommandType.Text;
                        cmd.ExecuteNonQuery();
                    }

                    using (var cmd = conn.CreateCommand())
                    {
                        cmd.Transaction = trans;
                        cmd.CommandText = "insert into position (pos) values (@pos)";
                        cmd.Parameters.AddWithValue("@pos", pos);
                        cmd.CommandType = System.Data.CommandType.Text;
                        cmd.ExecuteNonQuery();
                    }

                    using (var cmd = conn.CreateCommand())
                    {
                        cmd.Transaction = trans;
                        cmd.CommandText = "update bankaccount set amount = amount + @amount";
                        cmd.Parameters.AddWithValue("@amount", amount);
                        cmd.CommandType = System.Data.CommandType.Text;
                        cmd.ExecuteNonQuery();
                    }
                    trans.Commit();
                }
                catch (Exception ex)
                {
                    trans.Rollback();
                    Console.WriteLine(ex.Message);
                    if (!suppressError)
                        throw;
                }
                finally
                {
                    conn.Close();
                }
            }

            return (int)sqlScalar("select amount from bankaccount");
        }
    }

    public class MoneyDeposited
    {
        public int Amount { get; set; }
    }

    public class MoneyWithdrawn
    {
        public int Amount { get; set; }
    }
}


Chris



Greg Young

Dec 8, 2014, 11:50:21 AM
to event...@googlegroups.com
This exists; it's called a catch-up subscription in the client API. Just
host it in your Windows service. You would store the number of the last
event you persisted and pass it back on startup (the subscription will
continue from that point).



--
Studying for the Turing test
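
A minimal sketch of the "store the last persisted event and pass it back on startup" idea, assuming an in-memory projection for brevity. `CheckpointedBalance` and its members are hypothetical names; in a real service the amount and the event number would be saved together in the same database transaction, as the SQL code earlier in the thread does.

```csharp
using System;

// Hypothetical in-memory projection that remembers the last event number it applied.
public class CheckpointedBalance
{
    public int Amount { get; private set; }

    // Null means nothing has been processed yet, so a subscription would start from the beginning.
    public int? LastEventNumber { get; private set; }

    // Applies a delta unless the event was already applied (for example, redelivered after a restart).
    // Returns true when the event changed the projection.
    public bool Apply(int eventNumber, int delta)
    {
        if (LastEventNumber.HasValue && eventNumber <= LastEventNumber.Value)
            return false;
        Amount += delta;
        LastEventNumber = eventNumber;
        return true;
    }
}
```

On startup, `LastEventNumber` plays the role of `getLastPos()` in the code above: it is the value handed to `SubscribeToStreamFrom` as the last checkpoint. The guard in `Apply` makes a redelivered event harmless, so the projection does not depend on exactly which event the subscription resumes with.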

Greg Young

Dec 8, 2014, 11:50:47 AM
to event...@googlegroups.com
You probably don't want to use competing consumers for projections.

Raif Harik

Dec 8, 2014, 12:02:04 PM
to event...@googlegroups.com
@Greg, is this because of datastore cluster replication? It's difficult or impossible to be sure that a write to the datastore has been replicated to the other nodes before a competing consumer reads the same record from the datastore and updates it. That's what I was running into last week.

Greg Young

Dec 8, 2014, 12:04:19 PM
to event...@googlegroups.com
Because of our replication? I'm confused by the response. You should never
get a message that has not yet been replicated.

Competing consumers can quite easily deliver messages out of order, whereas
a catch-up subscription delivers them in order.

Cheers,

Greg

Raif Harik

Dec 8, 2014, 12:13:05 PM
to event...@googlegroups.com
Rather, when you make an update to a clustered instance of, say, Elastic, which has a 1 sec replication time, you could be reading and acting on the next message in the queue before the replication is complete. If that message updates the same record as the last message, this could result in querying the db, getting the stale data, updating it, and persisting it. There are ways, I discovered last week, of mitigating this; however, they are not possible when you have competing consumers on the projection side.
I guess that is ultimately the same issue as out-of-order messages, just with a different cause.
R
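
A small simulation of the stale-read problem described above, assuming a store whose reads lag behind its writes. `LaggingStore` and `StaleReadDemo` are hypothetical names, and waiting for replication is used here only to show the contrast; it is not necessarily one of the mitigations mentioned in the message.

```csharp
using System;

// Hypothetical store whose reads lag behind writes, standing in for a clustered
// datastore that has not finished replicating.
public class LaggingStore
{
    private int _primary;   // latest written value
    private int _replica;   // value visible to readers until Replicate() runs

    public int Read() { return _replica; }
    public void Write(int value) { _primary = value; }
    public void Replicate() { _replica = _primary; }
}

public static class StaleReadDemo
{
    // Applies each delta as a read-modify-write. When waitForReplication is false,
    // every read after the first still sees the value from before the earlier writes.
    public static int Run(int[] deltas, bool waitForReplication)
    {
        var store = new LaggingStore();
        foreach (var delta in deltas)
        {
            var current = store.Read();
            store.Write(current + delta);
            if (waitForReplication)
                store.Replicate();
        }
        store.Replicate();
        return store.Read();
    }
}
```

With deltas of 10 and 5, the run that does not wait ends at 5 because the second update starts from the stale value 0, while the run that waits ends at 15.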

Greg Young

Dec 8, 2014, 12:17:57 PM
to event...@googlegroups.com
Ah, OK, so you are referring to the store you are writing to. That makes sense.

Competing consumers will do their best to deliver in order, but a single
slow message or zombied TCP socket could cause out-of-order messages (the
message waits until it is retried).
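
To make the ordering concern concrete, here is a minimal sketch of how a projection could at least detect late arrivals by comparing event numbers. `OrderGuard` is a hypothetical helper, not part of the client API, and it assumes each message carries the event number from its stream.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical guard that records event numbers which arrive after a higher number was seen.
public class OrderGuard
{
    private int _highestSeen = -1;
    private readonly List<int> _outOfOrder = new List<int>();

    // Event numbers that arrived late, in arrival order.
    public IList<int> OutOfOrder { get { return _outOfOrder; } }

    // Returns true when the event number is newer than everything seen so far.
    public bool Observe(int eventNumber)
    {
        if (eventNumber > _highestSeen)
        {
            _highestSeen = eventNumber;
            return true;
        }
        _outOfOrder.Add(eventNumber);
        return false;
    }
}
```

Detection alone does not repair a projection whose updates depend on order, which is why an ordered catch-up subscription is the simpler fit for projections.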

Raif Harik

Dec 8, 2014, 12:18:51 PM
to event...@googlegroups.com
Yes, sorry if that wasn't clear.

Chris Ray

Dec 8, 2014, 12:23:05 PM
to event...@googlegroups.com
That's a good point. I'll stick to catch-up subscriptions for projections.