RavenDB async and non-async session TCP/IP connection issue

17 views
Skip to first unread message

jonatan bouillon

unread,
Oct 7, 2021, 2:20:18 AM10/7/21
to rav...@googlegroups.com
We currently have TCP/IP-related issues with our various web applications and web jobs that connect to RavenDB 5.2.

Today I found some strange behavior with StreamAsync, and I'm not sure where the issue lies in the code, which runs in a .NET Framework 4.8 console app.

With the following async code (Advanced.StreamAsync), when I launch the program the async stream creates a couple of new TCP/IP connections for each item of the loop.

The number of local TCP connections starts at 500 and goes up each time the query is executed, and the program ends up taking all of my TCP/IP internet connections ...

When we list the connections in PowerShell, we find that a bunch (up to 30,000) of new TCP/IP connections have been established with my local RavenDB.

127.0.0.1                           60414     127.0.0.1                           8080       Established Internet       40856
127.0.0.1                           60413     127.0.0.1                           8080       Established Internet       40856
127.0.0.1                           60412     127.0.0.1                           8080       Established Internet       40856
127.0.0.1                           60411     127.0.0.1                           8080       Established Internet       40856
127.0.0.1                           60410     127.0.0.1                           8080       Established Internet       40856
127.0.0.1                           60409     127.0.0.1                           8080       Established Internet       40856
127.0.0.1                           60408     127.0.0.1                           8080       Established Internet       40856
127.0.0.1                           60407     127.0.0.1                           8080       Established Internet       40856
127.0.0.1                           60238     127.0.0.1                           8080       CloseWait   Internet       40856
127.0.0.1                           60237     127.0.0.1                           8080       CloseWait   Internet       40856
127.0.0.1                           60236     127.0.0.1                           8080       CloseWait   Internet       40856
127.0.0.1                           60235     127.0.0.1                           8080       CloseWait   Internet       40856
127.0.0.1                           60234     127.0.0.1                           8080       CloseWait   Internet       40856
127.0.0.1                           60233     127.0.0.1                           8080       CloseWait   Internet       40856
127.0.0.1                           60232     127.0.0.1                           8080       CloseWait   Internet       40856
127.0.0.1                           60231     127.0.0.1                           8080       CloseWait   Internet       40856
127.0.0.1                           60230     127.0.0.1                           8080       CloseWait   Internet       40856

But if I use the non-async version below, I don't have the problem, and the number of connections stays around the same amount (500) + 5.

I can't figure out why the async version does not reuse the same TCP/IP connection. We use async streaming in a lot of places, so I think our issues are probably related to this same problem.

// 1 - async code session 

 IList<Member> allMembers = new List<Member>();

using (var session this.documentStore.OpenAsyncSession(this.ParametresExecution.Tenant))
{
       allMembers = await session.Query<Member, MemberSearchIndex>()
                    .Where(x => x.Tags.ContainsAny(new[] { "Local" }))
                    .ToStreamedListAsync(session);
}

            var nowYear = parametres.AnneeFermee ? DateTime.Now.Year : DateTime.Now.Year - 
            var memberOrderedByLastName = allMembers.OrderBy(f => f.LastName);

            foreach (var member in memberOrderedByLastName)
            {
                try
                {
                 
                    using (IAsyncDocumentSession subSessionAsync = this.documentStore.OpenAsyncSession(this.ParametresExecution.Tenant))
                    {
                    
                  
                        var yearStartDate = new DateTime(nowYear, 1, 1);

                        var payments = subSessionAsync
                            .Query<CCQLocal9Payment, CCQLocal9PaymentsIndex>()
                            .Where(p => p.MemberId == member.Id && p.Period < yearStartDate);

                        var results = new List<CCQLocal9Payment>();
                        var resultsStream = await subSessionAsync.Advanced.StreamAsync(payments.ProjectInto<CCQLocal9Payment>());
                        while (await resultsStream.MoveNextAsync())
                        {
                            results.Add(resultsStream.Current.Document);
                        }
                       

                }
       
            }


// 2 - non async
     
                    using (IDocumentSession session = this.documentStore.OpenSession(this.ParametresExecution.Tenant))
                    {                                      
                        var yearStartDate = new DateTime(nowYear, 1, 1);

                        var payments =  session 
                            .Query<CCQLocal9Payment, CCQLocal9PaymentsIndex>()
                            .Where(p => p.MemberId == member.Id && p.Period < yearStartDate);

                        var results = new List<CCQLocal9Payment>();
                        var resultsStream = session   .Advanced.StreamAsync(payments.ProjectInto<CCQLocal9Payment>());
                        while (resultsStream.MoveNext())
                        {
                            results.Add(resultsStream.Current.Document);
                        }
                       

                }


// index use
/// <summary>
/// Index definition over <c>CCQLocal9Payment</c> documents. It is the index the
/// payment queries in this thread run against.
/// </summary>
public class CCQLocal9PaymentsIndex : AbstractIndexCreationTask<CCQLocal9Payment>, ITenantIndex
{
    /// <summary>
    /// Sets the map to project the member id, period, payment date, amount, id,
    /// method and cancellation flag of each payment.
    /// </summary>
    public CCQLocal9PaymentsIndex()
    {
        // Query syntax compiles to the same payments.Select(p => new { ... }) call.
        Map = payments =>
            from p in payments
            select new
            {
                p.MemberId,
                p.Period,
                p.PaymentDate,
                p.Amount,
                p.Id,
                p.Method,
                p.IsCancelled
            };
    }
}


Oren Eini (Ayende Rahien)

unread,
Oct 7, 2021, 4:58:01 AM10/7/21
to ravendb
What is ToStreamedListAsync()?
You might need to dispose the value from the stream.

You are mixing StreamAsync() and MoveNext(), which isn't possible on the non-async version. So this isn't actually the real code (it won't compile).

Can you send a repro?

--
You received this message because you are subscribed to the Google Groups "RavenDB - an awesome database" group.
To unsubscribe from this group and stop receiving emails from it, send an email to ravendb+u...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/ravendb/CAKdJJEzuzC5r0VJsCOXwMWaPpEunAn40ZbpRbowaQ-eR8%2BEeOw%40mail.gmail.com.


--
Oren Eini
CEO   /   Hibernating Rhinos LTD
Skype:  ayenderahien
Support:  sup...@ravendb.net
  

jonatan bouillon

unread,
Oct 7, 2021, 9:56:26 AM10/7/21
to rav...@googlegroups.com


I'm preparing a repo that I will send soon.


The code first gets the Members; in my case that returns around 20,000 of them.

Then, for each one, I get the CCQLocal9Payment documents. Some members have around 10-50 payments.


ToStreamedListAsync is an extension method that we use to convert the streamed objects into a list (of Member or of CCQLocal9Payment). I think it is equivalent to doing:


                 var members = session.Query<Member, MemberSearchIndex>().
                    Where(x => x.Status == MemberStatus.Active)
                    .Where(x => x.Tags.ContainsAny(new[] { "Local - 0009" }))
                    .Where(x => x.SeniorityDate <= parametres.Dateminimum);

                var resultsStream = await session.Advanced.StreamAsync(members.ProjectInto<Member>(), default);
                while (await resultsStream.MoveNextAsync())
                {
                    allMembers.Add(resultsStream.Current.Document);
                }


You can find here ToStreamedListAsync =>

 

       public static async Task<IList<TResult>> ToStreamedListAsync<TResult>(this IQueryable<TResult> query, IAsyncDocumentSession session, CancellationToken cancellationToken)
        {
            var results = new List<TResult>();
            var resultsStream = await session.Advanced.StreamAsync(query.ProjectInto<TResult>(), cancellationToken);
            while (await resultsStream.MoveNextAsync())
            {
                results.Add(resultsStream.Current.Document);
            }

            return results;
        }

        public static async Task<IList<TResult>> ToStreamedListAsync<TResult>(this IQueryable<TResult> query, IAsyncDocumentSession session)
        {
            return await query.ToStreamedListAsync(session, CancellationToken.None);
        }

 

---


   IList<Member> allMembers = new List<Member>();

            var parametres = this.GetRapportParametres<RapportParametres>();

 

            using (var session = this.documentStore.OpenAsyncSession(this.ParametresExecution.Tenant))

            {

                var members = session.Query<Member, MemberSearchIndex>().

                    Where(x => x.Status == MemberStatus.Active)
                    .Where(x => x.Tags.ContainsAny(new[] { "Local - 0009" }))
                    .Where(x => x.SeniorityDate <= parametres.Dateminimum);


                    var resultsStream = await subSessionAsync.Advanced.StreamAsync(payments.ProjectInto<Member>(), default);

                        while (await resultsStream.MoveNextAsync())

                        {

                            results.Add(resultsStream.Current.Document);

                        }

            }

 

            var membresAssures = new List<MembresAssures>();

 

            var nowYear = DateTime.Now;

 

            var memberOrderedByLastName = allMembers.OrderBy(f => f.LastName);

 

            var progress = 1;

            var total = memberOrderedByLastName.Count();

 

            foreach (var member in memberOrderedByLastName)

            {

                try

                {

                    DrawProgressBar(progress, total);

                    progress++;

 

                    // async  -> tcpip problem

                    using (IAsyncDocumentSession subSessionAsync = this.documentStore.OpenAsyncSession(this.ParametresExecution.Tenant))

                    {

                        var payments = subSessionAsync

                            .Query<CCQLocal9Payment, CCQLocal9PaymentsIndex>()

                            .Where(p => p.MemberId == member.Id && p.Period < nowYear);

 

                        var results = new List<CCQLocal9Payment>();

 

                        var resultsStream = await subSessionAsync.Advanced.StreamAsync(payments.ProjectInto<CCQLocal9Payment>(), default);

                        while (await resultsStream.MoveNextAsync())

                        {

                            results.Add(resultsStream.Current.Document);

                        }

                    }

 

                    // nonasync  -> no tcpip problem

                    //using (IDocumentSession session = this.documentStore.OpenSession(this.ParametresExecution.Tenant))

                    //{

                    //    var payments = session

                    //        .Query<CCQLocal9Payment, CCQLocal9PaymentsIndex>()

                    //        .Where(p => p.MemberId == member.Id && p.Period < nowYear);

 

                    //    var results = new List<CCQLocal9Payment>();

                    //    var resultsStream = session.Advanced.Stream(payments.ProjectInto<CCQLocal9Payment>());

                    //    while (resultsStream.MoveNext())

                    //    {

                    //        results.Add(resultsStream.Current.Document);

                    //    }

                    //}

 

                }

                catch (Exception e)

                {

                    var objet = new MembresAssures

                    {

                        Nas = member.MemberNumber,

                        Adresse = e.ToString(),

                    };

 

                    membresAssures.Add(objet);

                    Console.WriteLine(member.MemberNumber + " - error - use non async");

                    Log.Error(member.MemberNumber + " - error - use non async");

                }

            }

 

 


Reply all
Reply to author
Forward
0 new messages