Problems with limiting an endpoint to only one consumer.

38 views
Skip to first unread message

Travis Lagnese

unread,
Dec 14, 2016, 5:32:48 PM12/14/16
to masstransit-discuss
Hi all,

Bear with me, I'm very new to masstransit.

I have a consumer that I have defined who has the sole purpose of popping one message from it's queue at a time, completing some work, and grabbing the next message.

Because the work that this consumer is doing is using a resource that can only process one piece of information at a time, I can only have one consumer doing the work on each message on the queue, one message at a time.

We're using masstransit 3.5.2, and my code looks like this:

            configurator.ReceiveEndpoint(host, "search_people_delta_queue", e =>
            {
                e.UseConcurrencyLimit(1, null, null);
                e.Consumer(() => new SearchPeopleCoreDeltaConsumer());
                e.PrefetchCount = 1;
            }); 
This is a mish-mash of many examples that I've tried to emulate from this discussion group, and the only code that I've been able to compile.

Any help would be greatly appreciated!

Thanks,
Travis
 

Travis Lagnese

unread,
Dec 14, 2016, 5:38:07 PM12/14/16
to masstransit-discuss
I want to add the actual problem I'm having. Looking at the logs, I see all messages in the queue firing right away as soon as the search engine is ready to perform another index. The consumer is forced to Task.Sleep(5000) until the search engine is ready to perform another index. As soon as it's ready, it appears that many consumers who were all Task.Sleep(5000)ing immediately try to ask for a delta.

Here's my consumer:

public Task Consume(ConsumeContext<SolrPeopleDeltaMessage> context)
        {
            var successfullyWaited = Infrastructure.Utils.SolrUtility.WaitForSolr(Cores.People);

            var solrParameter = context.Message.SolrParameter;
            var solrIdParameter = (IdParameters)Enum.Parse(typeof(IdParameters), solrParameter);
            var solrParameterValue = context.Message.SolrParameterValue;

            if (!successfullyWaited)
            {
                _loggingService.Log(string.Format("Solr People Core was not ready in time to receive delta index request for {0} = {1}", solrIdParameter, solrParameterValue), LogLevel.Info);
            }

            else
            {
                _loggingService.Log(string.Format("Solr People Core reports ready to index. Delta indexing {0} = {1}", solrIdParameter, solrParameterValue), LogLevel.Info);

                string peopleCoreDeltaUrl = SolrUtility.DeltaUrlFor(Cores.People, solrIdParameter, solrParameterValue);

                var response = SolrUtility.GetResponse(peopleCoreDeltaUrl);
            }

            return context.CompleteTask;
        }
 
And here's the WaitForSolr method:
        public static bool WaitForSolr(Cores core)
        {
            bool solrIsCurrent = false;
            int numberAttempts = 0;

            int secondsUntilRetry = 5;

            //Minutes x 60 = seconds, divide by seconds until retry = maximum allowed number of attempts.
            int maxNumberAttempts = (int) Math.Ceiling(double.Parse(AppSettings.SolrMaximumDeltaWaitMinutes) * 60 / secondsUntilRetry);

            // Attempts will be made every few seconds. The above calculations tell us how many attempts we get based
            // on the maximum amount of minutes we're willing to wait.
            while (!solrIsCurrent && numberAttempts <= maxNumberAttempts)
            {
                var solrStatus = Core.Data.Solr.SolrUtility.GetCoreStatus(core);
                var solrStatusXml = XDocument.Parse(solrStatus);

                // I happen to know that the XML that comes back from Solr has an element that
                // looks like <bool name="current">true or false</bool>
                // We want to know when this is "true", because that means Solr is current and
                // that means we can now do another index.
                var currentValue = solrStatusXml.Descendants("bool")
                    .Single(x => (string)x.Attribute("name") == "current")
                    .Value;

                solrIsCurrent = bool.Parse(currentValue);

                // If Solr is not current, wait a few seconds before asking again.
                if (!solrIsCurrent)
                {
                    Thread.Sleep(secondsUntilRetry * 1000);
                    numberAttempts++;
                }
            }

            return solrIsCurrent;


Reply all
Reply to author
Forward
0 new messages