Alternative C# Library for Stomp?

123 views
Skip to first unread message

Legolash2o

unread,
Oct 21, 2022, 4:08:58 PM10/21/22
to A gathering place for the Open Rail Data community
Hi,

Does anyone know an alternative nuget library for accessing Stomp / ActiveMQ for the feeds please?

I currently use Apache.NMS (v1.5.1), Apache.NMS.ActieMQ (v1.5.6) and Apache.NMS.Stomp (v1.5.4) but they don't seem to work with the latest (approved) feeds. The latest versions of those libraries don't work with either the current or latest feeds.

Hope someone can help, thank you in advance.

Message has been deleted

WantStuff

unread,
Oct 22, 2022, 9:52:12 AM10/22/22
to A gathering place for the Open Rail Data community
Trying to post again - for some reason my first was immediately deleted upon posting.

I use Apache.NMS 1.8.0 and Apache.NMS.ActiveMQ 1.8.0, with the AtiveMQ/OpenWire protocol (port 61619). Both are NET Standard 2.0.
Do NOT use higher versions as they are incomplete - the C# implementation of the project appears to be dead.

I believe Apache.NMS.Stomp 1.5.4 is compatible with Apache.NMS 1.8.0, but have not tested beyond a quick POC before I settled for ActiveMQ.

Legolash2o

unread,
Oct 23, 2022, 3:53:00 AM10/23/22
to A gathering place for the Open Rail Data community
Thanks for providing the information, it's been a great help. I've updated my nuget packages and managed to connect to via port 61619, including the fallover. 

The issue I'm running into now is that it is complaining that 'The archive entry was compressed using an unsupported compression method.'

Can you please provide a small code sample for how you connect to the feed and receive a message? Below is some of my code.
-----
//Creating the connection
Console.WriteLine($"Creating ConnectionFactory...");
                    factory = new ConnectionFactory(new Uri($"{currentConnection}?transport.useInactivityMonitor=false"), Credentials.ClientId);

                    Console.WriteLine($"Creating Connection...");
                    connection = factory.CreateConnection(Credentials.Username, Credentials.Password);
                   
                    Console.WriteLine($"Adding Interupts...");
                    connection.ConnectionInterruptedListener += ConnectionInterruptedListener;
                    connection.ConnectionResumedListener += ConnectionResumedListener;
                    connection.ExceptionListener += ConnectionOnExceptionListener;
                    connection.ClientId = $"{Credentials.ClientId}";

                    Console.WriteLine($"Creating Session...");
                    session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
                    Console.WriteLine("Connection Starting.");
                    connection.Start();
                    connected = true;
                    IsConnecting = false;
                    ConnectedSince = DateTime.UtcNow;

                    Console.WriteLine("Connection Complete.");

                    foreach(_TrustFeed feed in _feeds) {
                        Console.WriteLine($"Starting feed: {feed.Topic}");
                        await feed.Listen(session, $"{Credentials.ClientId}");
                    }

//Listening
    /// <summary>
        ///     Starts listening.
        /// </summary>
        public async Task Listen(ISession session, string clientId) {
            if(session == null)
                return;

            try {
                Session = session;                
                Console.WriteLine($"Setting Up Topic: {Topic}");

                // ActiveMQTopic
                ActiveMQTopic topicd = new ActiveMQTopic($"{Topic}");

                lock(_lock) {
                    Consumer = Session.CreateDurableConsumer(topicd, $"{clientId}-{Topic}", null, false);
                    Consumer.Listener += OnMessage;
                }
                Console.WriteLine($"Consumer started for {Topic}, waiting for messages...");

            }
            catch(Exception e) {
                Console.WriteLine($"Error for {Topic}: {e.ToString()}");
                throw;
            }
        }

//OnMessage
    /// <summary>
        ///     The event that fires when data has been received from the feeds.
        /// </summary>
        /// <param name="message"></param>
        protected void OnMessage(IMessage message) {
            try {
                Task.Factory.StartNew(() => {
                    //DO NOT PUT ANYTHING BEFORE THIS REGION. THE CODE WITHIN THIS REGION IS PRIORITSED TO STORE
                    //A COPY OF THE MESSAGE ON DISK AS SOON AS POSSIBLE. DATABASE SAVING SHOULD BE DONE AFTER.
                    #region IMPORTANT
                    ITextMessage msg = (ITextMessage)message;
                    ProcessTime time = ImportantProcessing(msg.NMSTimestamp, msg.Text); //CRASHES HERE

                    if(!Program.bClosing)
                        message.Acknowledge();

                    Console.WriteLine($"{DateTime.Now:G}-{Topic} [{time.Logs.Length:N0} in {time.TotalTime}ms]");
                    Debug.WriteLine($"{DateTime.Now:G}-{Topic} [GL:{time.GetLogs}ms LC:{time.LogCounting}ms S:{time.SaveTime}ms T:{time.TotalTime}ms]");
                    #endregion

                });
            }
            catch(Exception ex) {
                Console.WriteLine($"{DateTime.Now:G}-{Topic} [ERROR]");
                Logging.WriteError(FeedLogType.MessageException, ex.Message, $"Process Error\r\n{message}", ex);
                _lastError = DateTime.UtcNow;
                _totalErrors++;
            }
        }

Thank you again!

WantStuff

unread,
Oct 23, 2022, 6:52:00 AM10/23/22
to A gathering place for the Open Rail Data community
Ah yes, I remember this one!

The problem is because the response is zipped in a "multi-member" Gzip format and the Apache implementation only handles "single-member" Gzip formats. However, we can switch it up.

1) First create a new "Compression Policy" that handles multi-member gzips.
Add the Ionic.Zlib.Core v1.0 library (it's an old library but is NET Standard 2.0), and create the class:

using System;
using System.IO;
using Apache.NMS.ActiveMQ;
using Ionic.Zlib;

. . .

    public class MultiMemberCompressionPolicy : ICompressionPolicy, ICloneable
    {
        public Stream CreateCompressionStream(Stream data)
        {
            return new ZlibStream(data, CompressionMode.Compress);
        }

        public Stream CreateDecompressionStream(Stream data)
        {
            return new ZlibStream(data, CompressionMode.Decompress);
        }

        public object Clone() => MemberwiseClone();
    }


2) Add this to the connection before you create the session (after you set the ClientID is fine):
// Use our own CompressionPolicy that supports multi-member Gzip files.
connection.SetCompressionPolicy(new MultiMemberCompressionPolicy());


You should be good to go!

Legolash2o

unread,
Oct 23, 2022, 10:10:51 AM10/23/22
to A gathering place for the Open Rail Data community
That worked perfectly, thank you!
Reply all
Reply to author
Forward
0 new messages