Thanks for providing the information, it's been a great help. I've updated my NuGet packages and managed to connect via port 61619, including the failover.
The issue I'm running into now is that it is complaining that 'The archive entry was compressed using an unsupported compression method.'
Can you please provide a small code sample for how you connect to the feed and receive a message? Below is some of my code.
-----
// Creating the connection: build the factory, open the connection, wire up the
// listeners, create the session, start the connection, then start every feed.
Console.WriteLine($"Creating ConnectionFactory...");
// The broker URI is currentConnection with the "transport.useInactivityMonitor=false"
// option appended; the client id is handed to the factory as well.
factory = new ConnectionFactory(new Uri($"{currentConnection}?transport.useInactivityMonitor=false"), Credentials.ClientId);
Console.WriteLine($"Creating Connection...");
connection = factory.CreateConnection(Credentials.Username, Credentials.Password);
Console.WriteLine($"Adding Interupts...");
// Listeners are attached before connection.Start() is called further down.
connection.ConnectionInterruptedListener += ConnectionInterruptedListener;
connection.ConnectionResumedListener += ConnectionResumedListener;
connection.ExceptionListener += ConnectionOnExceptionListener;
// NOTE(review): the factory was already constructed with Credentials.ClientId;
// confirm whether assigning it again on the connection is required.
connection.ClientId = $"{Credentials.ClientId}";
Console.WriteLine($"Creating Session...");
// The session is created with AcknowledgementMode.ClientAcknowledge.
session = connection.CreateSession(AcknowledgementMode.ClientAcknowledge);
Console.WriteLine("Connection Starting.");
connection.Start();
// Record the connected state and the UTC time at which the connection was started.
connected = true;
IsConnecting = false;
ConnectedSince = DateTime.UtcNow;
Console.WriteLine("Connection Complete.");
// Start each feed in turn on the shared session; each Listen call is awaited
// before the next feed is started.
foreach(_TrustFeed feed in _feeds) {
Console.WriteLine($"Starting feed: {feed.Topic}");
await feed.Listen(session, $"{Credentials.ClientId}");
}
//Listening
/// <summary>
/// Registers a durable consumer for this feed's topic on the supplied session and
/// attaches <see cref="OnMessage"/> as its listener.
/// </summary>
/// <param name="session">Session to consume on; when it is null the method returns without doing anything.</param>
/// <param name="clientId">Client id used as the prefix of the durable subscription name.</param>
/// <returns>A task representing the call. The body contains no awaits.</returns>
public async Task Listen(ISession session, string clientId) {
    // Nothing to subscribe on without a session.
    if(session == null) {
        return;
    }

    try {
        Session = session;
        Console.WriteLine($"Setting Up Topic: {Topic}");

        // Destination for this feed's topic.
        ActiveMQTopic destination = new ActiveMQTopic($"{Topic}");

        // Consumer creation and listener registration happen together under the lock.
        lock(_lock) {
            string subscriptionName = $"{clientId}-{Topic}";
            Consumer = Session.CreateDurableConsumer(destination, subscriptionName, null, false);
            Consumer.Listener += OnMessage;
        }

        Console.WriteLine($"Consumer started for {Topic}, waiting for messages...");
    }
    catch(Exception failure) {
        // Report the failure on the console, then let it propagate to the caller.
        Console.WriteLine($"Error for {Topic}: {failure}");
        throw;
    }
}
//OnMessage
/// <summary>
/// The event that fires when data has been received from the feeds. The work is handed
/// to a background task, which processes the message, acknowledges it (unless the
/// program is closing) and writes timing information to the console and debug output.
/// </summary>
/// <remarks>
/// Exception handling lives inside the task body. A try/catch placed around the call
/// that merely schedules the task cannot observe exceptions thrown later by the task
/// itself, so processing failures would otherwise never be logged or counted.
/// </remarks>
/// <param name="message">The received message; it is cast to <see cref="ITextMessage"/>.</param>
protected void OnMessage(IMessage message) {
    Task.Run(() => {
        try {
            //DO NOT PUT ANYTHING BEFORE THIS REGION. THE CODE WITHIN THIS REGION IS PRIORITSED TO STORE
            //A COPY OF THE MESSAGE ON DISK AS SOON AS POSSIBLE. DATABASE SAVING SHOULD BE DONE AFTER.
            #region IMPORTANT
            ITextMessage msg = (ITextMessage)message;
            // NOTE(review): the reported failure is raised on the next line (either while
            // reading msg.Text or inside ImportantProcessing) - confirm which one throws.
            ProcessTime time = ImportantProcessing(msg.NMSTimestamp, msg.Text); //CRASHES HERE
            // Acknowledge only while the program is not shutting down.
            if(!Program.bClosing)
                message.Acknowledge();
            Console.WriteLine($"{DateTime.Now:G}-{Topic} [{time.Logs.Length:N0} in {time.TotalTime}ms]");
            Debug.WriteLine($"{DateTime.Now:G}-{Topic} [GL:{time.GetLogs}ms LC:{time.LogCounting}ms S:{time.SaveTime}ms T:{time.TotalTime}ms]");
            #endregion
        }
        catch(Exception ex) {
            // Runs on the task's thread, so failures in the processing above are
            // actually reported instead of ending up in an unobserved task.
            Console.WriteLine($"{DateTime.Now:G}-{Topic} [ERROR]");
            Logging.WriteError(FeedLogType.MessageException, ex.Message, $"Process Error\r\n{message}", ex);
            // NOTE(review): these updates are not synchronized; several message tasks
            // may fail at the same time - confirm whether exact counts matter.
            _lastError = DateTime.UtcNow;
            _totalErrors++;
        }
    });
}
Thank you again!