NHibernate async await


Hakan karaoğlu

Oct 30, 2020, 1:46:48 PM
to nhusers
Hi,

I'm using RabbitMQ and NHibernate in a .NET Core project, and some of my consumers run asynchronous queries against the database. When I send a single message to the queue, it is processed successfully, but when I send 100 messages, the database operations start failing after a while.

I reproduced this with a similar structure. Running the database operations 100 times in a sequential loop gave no error, but running them in a parallel loop gave the same kind of error as with RabbitMQ.

Is this caused by NHibernate, or is something else going on? Is NHibernate not thread-safe?

Sample code that produces the error:

Parallel.For(0, 100, async act =>
{
    var ws = await _workspaceRepository.GetWorkspaceByIdAsync(66);
    System.Console.WriteLine($"GetWorkspaceByIdAsync : {ws.Name}");
    var keywords = await _workspaceKeywordRepository.GetForbiddenKeywordsByWorkspaceAsync(66);
    System.Console.WriteLine($"GetForbiddenKeywordsByWorkspaceAsync : {keywords.Count}");
    var IsforbiddenKeyword = await HasForbiddenKeywordAsync(66, "facebook", "instagram");
    System.Console.WriteLine($"HasForbiddenKeywordAsync : {IsforbiddenKeyword}");
});

Paulo Quicoli

Oct 30, 2020, 3:57:18 PM
to nhusers
Hi

Open and close a session inside your foreach loop and you will be fine.

--
You received this message because you are subscribed to the Google Groups "nhusers" group.
To unsubscribe from this group and stop receiving emails from it, send an email to nhusers+u...@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/nhusers/15a15084-855f-4d1c-9ebd-ea1b62cf9a0cn%40googlegroups.com.

Gunnar Liljas

Oct 30, 2020, 4:04:11 PM
to nhu...@googlegroups.com
You really shouldn't combine Parallel.For and async. Ever. And NHibernate's session isn't thread safe, so it shouldn't be used from multiple threads.

/G
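
To illustrate the first point, here is a minimal sketch that uses only the .NET base class library. The delays and counters are made up for illustration and are not taken from the code in this thread. `Parallel.For` accepts an `Action<int>`, so an `async` lambda passed to it is compiled as `async void`: the loop has no task to wait on and returns as soon as each body reaches its first incomplete `await`. Starting one `Task` per item and awaiting `Task.WhenAll` is one awaitable alternative.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Parallel.For takes an Action<int>, so this async lambda is "async void".
// The loop returns once every body has reached its first incomplete await.
int finishedInsideLoop = 0;
Parallel.For(0, 10, async i =>
{
    await Task.Delay(500);                        // stand-in for an awaited query
    Interlocked.Increment(ref finishedInsideLoop);
});
int seenAfterParallelFor = Volatile.Read(ref finishedInsideLoop);
Console.WriteLine($"Completed when Parallel.For returned: {seenAfterParallelFor}");

// Awaitable alternative: start one Task per item and await them all.
int finishedWithWhenAll = 0;
await Task.WhenAll(Enumerable.Range(0, 10).Select(async i =>
{
    await Task.Delay(50);                         // stand-in for an awaited query
    Interlocked.Increment(ref finishedWithWhenAll);
}));
Console.WriteLine($"Completed when WhenAll returned: {finishedWithWhenAll}");
```

On a typical run the first line reports 0, because none of the bodies has got past its `await` when `Parallel.For` returns, while the second line reports 10.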


Hakan karaoğlu

Oct 30, 2020, 8:44:20 PM
to nhusers
The parallel loop is just one example. I am calling these methods from within a RabbitMQ consumer.

For example:

public async Task Consume(ConsumeContext<WorkspaceDispatcherMessage> context)
        {
            var wsMessage = context.Message;
            try
            {
                var feedDispatcher = FeedDispatcherFactory.CreateDispatcher(wsMessage.FeedProxy, wsMessage.Keywords);               
                var workspaceAsync = await feedDispatcher.GetWorkspaceAsync(wsMessage.FeedProxy, wsMessage.Keywords);                

                foreach (var workspaceId in workspaceAsync.Keys)
                {
                    var sentimentDetectionMessage = new SentimentDetectionMessage
                    {
                        ConversationId = wsMessage.ConversationId,
                        FeedProxy = wsMessage.FeedProxy,
                        FeederProxy = wsMessage.FeederProxy,
                        Keywords = workspaceAsync[workspaceId],
                        WorkspaceId = workspaceId
                    };
                    await _bus.Publish(sentimentDetectionMessage);
                }
            }
            catch (Exception exp)
            {                
                throw; // rethrow without resetting the stack trace
            }
        }

On Friday, October 30, 2020 at 23:04:11 UTC+3, Gunnar Liljas wrote:

satis...@gmail.com

Oct 30, 2020, 8:44:20 PM
to nhusers
Nhibernate session is not thread safe.

Hakan karaoğlu

Oct 30, 2020, 8:44:20 PM
to nhusers
Hi,
First of all, thank you so much for the reply.
My project's NHibernate configuration is as follows. Is there anything wrong here, or is there a point I'm missing?

public static IServiceCollection AddSniffactorNhibernate(this IServiceCollection services, string connectionString)
        {
            Check.NotNull(connectionString);

            ISessionFactory _sessionFactory = BuildSessionFactory(connectionString);

            services.AddSingleton(_sessionFactory);
            services.AddScoped<ISession>(factory => factory.GetServices<ISessionFactory>().First().OpenSession());
            services.AddScoped(typeof(IRepository<>), typeof(NhRepositoryBase<>));
            services.AddSingleton(typeof(IUnitOfWork), typeof(NhUnitOfWork));

            ConsoleHelper.WriteWarningCommandLine(">>> NHibernate configuration completed.");

            return services;
        }

        private static ISessionFactory BuildSessionFactory(string connectionString)
        {
            FluentConfiguration configuration = Fluently.Configure()
                .Database(OracleManagedDataClientConfiguration.Oracle9.ConnectionString(connectionString))
                .Mappings(m => m.FluentMappings
                    .AddFromAssemblyOf<ISniffactorMapping>()
                    .Conventions.Add(
                        ConventionBuilder.Property.When(criteria => criteria.Expect(x => x.Nullable, Is.Not.Set), x => x.Not.Nullable()))
                    .Conventions.Add<OtherConversions>()
                );

            return configuration.BuildSessionFactory();
        }

private class OtherConversions : IHasManyConvention, IReferenceConvention
        {
            public void Apply(IOneToManyCollectionInstance instance)
            {
                instance.LazyLoad();
                instance.AsBag();
                instance.Cascade.SaveUpdate();
                instance.Inverse();
            }

            public void Apply(IManyToOneInstance instance)
            {
                instance.LazyLoad(Laziness.Proxy);
                instance.Cascade.None();
                instance.Not.Nullable();
            }
        }
On Friday, October 30, 2020 at 22:57:18 UTC+3, Quicoli wrote:

Gunnar Liljas

Oct 31, 2020, 8:09:38 PM
to nhu...@googlegroups.com
What is the error you're getting?


Hakan karaoğlu

Nov 2, 2020, 4:59:58 PM
to nhu...@googlegroups.com
The stack trace is below; the error it gives is an "object reference error". Interestingly, the same code works without errors in a normal for loop.

 at OracleInternal.ServiceObjects.OracleCommandImpl.ExtractAccessorValuesIntoParam(OracleParameterCollection paramColl, OracleConnection connection, Int32 paramCount, String commandText, Int64 longFetchSize, Int64 clientInitialLOBFS, Int64 internalInitialLOBFS, Int64[] scnFromExecution, Boolean bCallFromExecuteReader)
   at Oracle.ManagedDataAccess.Client.OracleCommand.ExecuteReader(Boolean requery, Boolean fillRequest, CommandBehavior behavior)
   at Oracle.ManagedDataAccess.Client.OracleCommand.ExecuteDbDataReader(CommandBehavior behavior)
   at System.Data.Common.DbCommand.ExecuteReader()
   at NHibernate.AdoNet.AbstractBatcher.ExecuteReader(DbCommand cmd)
   at NHibernate.Loader.Loader.GetResultSet(DbCommand st, QueryParameters queryParameters, ISessionImplementor session, IResultTransformer forcedResultTransformer)
   at NHibernate.Loader.Loader.DoQuery(ISessionImplementor session, QueryParameters queryParameters, Boolean returnProxies, IResultTransformer forcedResultTransformer)
   at NHibernate.Loader.Loader.DoQueryAndInitializeNonLazyCollections(ISessionImplementor session, QueryParameters queryParameters, Boolean returnProxies, IResultTransformer forcedResultTransformer)
   at NHibernate.Loader.Loader.DoList(ISessionImplementor session, QueryParameters queryParameters, IResultTransformer forcedResultTransformer)



On Sun, Nov 1, 2020 at 03:09, Gunnar Liljas <gunnar...@gmail.com> wrote:

Hakan karaoğlu

Nov 2, 2020, 5:11:23 PM
to nhu...@googlegroups.com
By the way, I create the NHibernate session as follows:

public static class SniffactorNHibernateExtension
    {

        public static IServiceCollection AddSniffactorNhibernate(this IServiceCollection services, string connectionString)
        {
            Check.NotNull(connectionString);

            ISessionFactory _sessionFactory = Fluently.Configure()
                .Database(OracleManagedDataClientConfiguration.Oracle9
                    .ConnectionString(connectionString)
                    .DefaultSchema("IZCI")
                 )

                .Mappings(m => m.FluentMappings.AddFromAssemblyOf<ISniffactorMapping>())
                .BuildSessionFactory();

            services.AddSingleton(_sessionFactory);
            services.AddScoped(factory => { return _sessionFactory.OpenSession(); });
            services.AddScoped(typeof(IRepository<>), typeof(NhRepositoryBase<>));

            return services;
        }

        private static ISessionFactory BuildSessionFactory(string connectionString)
        {
            FluentConfiguration configuration = Fluently.Configure()
                .Database(OracleManagedDataClientConfiguration.Oracle9.ConnectionString(connectionString).DefaultSchema("IZCI"))

                .Mappings(m => m.FluentMappings
                    .AddFromAssemblyOf<ISniffactorMapping>()
                .Conventions.Add(
                    ConventionBuilder.Property.When(criteria => criteria.Expect(x => x.Nullable, Is.Not.Set), x => x.Not.Nullable()))
                .Conventions.Add<OtherConversions>()
                );

            return configuration.BuildSessionFactory();
        }

        private class OtherConversions : IHasManyConvention, IReferenceConvention
        {
            public void Apply(IOneToManyCollectionInstance instance)
            {
                instance.LazyLoad();
                instance.AsBag();
                instance.Cascade.SaveUpdate();
                instance.Inverse();
            }

            public void Apply(IManyToOneInstance instance)
            {
                instance.LazyLoad(Laziness.Proxy);
                instance.Cascade.None();
                instance.Not.Nullable();
            }
        }
    }

On Tue, Nov 3, 2020 at 00:59, Hakan karaoğlu <hknn.k...@gmail.com> wrote:

Gunnar Liljas

Nov 2, 2020, 6:40:25 PM
to nhu...@googlegroups.com
When you say "normal for loop", what is not normal with this one? 

/G

Hakan karaoğlu

Nov 3, 2020, 12:42:52 PM
to nhu...@googlegroups.com

I'm sorry I couldn't explain my problem clearly. Let me try again. The problem is that the NHibernate session does not work correctly in async methods, and I could not work out how to configure it. When I send 1 message to the RabbitMQ consumers, it works without errors, but when I send 100 messages, I get an error. So I tested a similar scenario with a synchronous loop and an asynchronous loop.

For example, here are two similar loops, one sync and the other async. When I run them separately, the sync loop finishes successfully without an error, but the parallel loop gives the attached error. My NHibernate configuration is in the attached file.

I hope I was able to explain my problem this time :) By the way, thank you for your support, @Gunnar Liljas.

            for (int i = 1; i < 101; i++)
            {
                var ws = _workspaceRepository.GetWorkspaceByIdAsync(66);
                System.Console.WriteLine($"GetWorkspaceByIdAsync : {ws.Name}");
                var keywords = _workspaceKeywordRepository.GetForbiddenKeywordsByWorkspaceAsync(66);
                System.Console.WriteLine($"GetForbiddenKeywordsByWorkspaceAsync : {keywords.Count}");
                var IsforbiddenKeyword = HasForbiddenKeywordAsync(66, "Test Postman", "Test Postman");
                System.Console.WriteLine($"HasForbiddenKeywordAsync : {IsforbiddenKeyword}");
                System.Console.WriteLine($"------------------>{i}");
            }

            Parallel.For(0, 100, async act =>
            {
                var ws = _workspaceRepository.GetWorkspaceByIdAsync(66);
                System.Console.WriteLine($"GetWorkspaceByIdAsync : {ws.Name}");
                var keywords = _workspaceKeywordRepository.GetForbiddenKeywordsByWorkspaceAsync(66);
                System.Console.WriteLine($"GetForbiddenKeywordsByWorkspaceAsync : {keywords.Count}");
                var IsforbiddenKeyword = HasForbiddenKeywordAsync(66, "Test Postman", "Test Postman");
                System.Console.WriteLine($"HasForbiddenKeywordAsync : {IsforbiddenKeyword}");
                System.Console.WriteLine($"------------------>{act}");
            });  

            System.Console.ReadKey();


On Tue, Nov 3, 2020 at 02:40, Gunnar Liljas <gunnar...@gmail.com> wrote:

Hakan karaoğlu

Nov 3, 2020, 12:46:21 PM
to nhu...@googlegroups.com


On Tue, Nov 3, 2020 at 20:42, Hakan karaoğlu <hknn.k...@gmail.com> wrote:
[Attachments: nhibernateconfiguration.PNG, asynclooperror.PNG]

Hakan karaoğlu

Nov 3, 2020, 12:51:09 PM
to nhu...@googlegroups.com
I did some research and read that the Session object is not thread safe. They say you should use ISessionFactory instead. How true is that?

On Tue, Nov 3, 2020 at 20:42, Hakan karaoğlu <hknn.k...@gmail.com> wrote:

Oskar Berggren

Nov 3, 2020, 2:40:21 PM
to nhusers
Yes, the NHibernate session does a huge amount of internal state tracking, and is not designed to be concurrently used by multiple threads, as was already mentioned previously in this thread:

On Sat, Oct 31, 2020 at 1:44 AM satis...@gmail.com <satis...@gmail.com> wrote:
Nhibernate session is not thread safe.





In fact, because of proxy objects, automatic dirty tracking, flushing and so on, you are better off considering the entire unit of work as owned by a single thread (at a time). That means, for example, that you should not load two objects from a single session and then have two different threads work concurrently on those objects while they are still associated with an open session. Doing so could, for example, trigger lazy loading on both threads, and both would hit the same session.
 

> They say you should use ISessionFactory instead. How true is that?

The statement is a little weird. It's like saying you should use a car factory instead of a car. A car factory won't actually take you anywhere. It's not "instead of". However, if you need more sessions, you should of course use the session factory to create them.
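
As a rough sketch of "use the factory to create more sessions", the helper below gives every unit of work its own session and disposes it afterwards, so no session is ever shared between concurrent operations. The helper name `InOwnSessionAsync` is hypothetical, and `MemoryStream` is only a stand-in for a disposable session so that the sketch runs without a database. With NHibernate, `openSession` would be `sessionFactory.OpenSession` (an `ISession` is disposable) and `work` would run the queries for that one unit of work.

```csharp
using System;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Opens a fresh "session" for one unit of work and disposes it afterwards,
// so no session instance is shared between concurrently running operations.
static async Task<TResult> InOwnSessionAsync<TSession, TResult>(
    Func<TSession> openSession,
    Func<TSession, Task<TResult>> work)
    where TSession : IDisposable
{
    using (var session = openSession())
    {
        return await work(session);
    }
}

// Demo: MemoryStream is only a stand-in for a disposable session object.
int opened = 0;
int[] results = await Task.WhenAll(Enumerable.Range(1, 5).Select(n =>
    InOwnSessionAsync(
        () => { Interlocked.Increment(ref opened); return new MemoryStream(); },
        async session => { await Task.Delay(10); return n * 2; })));

// prints "5 units of work, 5 sessions opened"
Console.WriteLine($"{results.Length} units of work, {opened} sessions opened");
```

The point of the design is that the session's lifetime is tied to the unit of work rather than to a shared field, so five concurrent units of work open five separate sessions.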


Oskar Berggren

Nov 3, 2020, 2:51:02 PM
to nhusers
From a design perspective, I should point out one more thing here.

If you have a loop because you want to do something with 100 objects, and you try to speed it up by having multiple concurrent threads each work on one object at a time, that is probably not the best solution. Issuing repeated SQL statements like that has a lot of overhead, as it would still issue 100 SQL statements and wait for each one to complete. If you can, try instead to use fewer SQL statements to fetch multiple objects.

A single SELECT where id IN (... 100 identifiers ...) is likely to be much faster than 100 separate SELECT where Id = aSingleId.
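
The single-query idea can be sketched with LINQ as follows. The `Workspace` entity and the `LoadByIds` helper are hypothetical, and the in-memory array only stands in for a mapped table so that the sketch runs on its own. With NHibernate you would pass `session.Query<Workspace>()` as the source, in which case the `Contains` call is what the LINQ provider turns into an `IN (...)` clause.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// One query for many ids: a LINQ provider can translate ids.Contains(...)
// into "WHERE Id IN (...)" instead of issuing one SELECT per id.
static List<Workspace> LoadByIds(IQueryable<Workspace> source, List<int> ids) =>
    source.Where(w => ids.Contains(w.Id)).ToList();

// In-memory stand-in for a mapped table, just to make the sketch runnable.
var table = new[]
{
    new Workspace { Id = 1, Name = "alpha" },
    new Workspace { Id = 2, Name = "beta" },
    new Workspace { Id = 3, Name = "gamma" },
}.AsQueryable();

List<Workspace> found = LoadByIds(table, new List<int> { 1, 3 });
Console.WriteLine(string.Join(", ", found.Select(w => w.Name)));   // prints "alpha, gamma"

// Hypothetical entity; members are virtual, as NHibernate proxies require.
public class Workspace
{
    public virtual int Id { get; set; }
    public virtual string Name { get; set; }
}
```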

Another way to do it is to use session.Load(anId) 100 times to create 100 proxies cached in a list _before_ proceeding to work on them. When your code begins to work on 1 of them, NHibernate will automatically fetch several of them using a single SQL statement. This is controlled by the batch size setting and mapping property:
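
A minimal sketch of that approach with Fluent NHibernate might look like the following. The `Workspace` entity, its mapping and the batch size of 25 are assumptions made for illustration; `BatchSize` is the per-class mapping option, and the NHibernate configuration property `default_batch_fetch_size` sets a default for all mappings.

```csharp
using System.Collections.Generic;
using System.Linq;
using FluentNHibernate.Mapping;
using NHibernate;

// Hypothetical entity used only for this sketch.
public class Workspace
{
    public virtual int Id { get; protected set; }
    public virtual string Name { get; set; }
}

public class WorkspaceMap : ClassMap<Workspace>
{
    public WorkspaceMap()
    {
        Id(x => x.Id);
        Map(x => x.Name);
        BatchSize(25);   // initialize up to 25 uninitialized proxies per SELECT (arbitrary value)
    }
}

public static class BatchLoadingSketch
{
    public static List<string> ReadNames(ISession session, IEnumerable<int> ids)
    {
        // Load() returns uninitialized proxies and does not query the database yet.
        var proxies = ids.Select(id => session.Load<Workspace>(id)).ToList();

        // Touching the first proxy triggers a batched SELECT for several of them.
        return proxies.Select(w => w.Name).ToList();
    }
}
```

All of this still happens on one session, so it should run on a single thread at a time, as discussed earlier in the thread.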

Hakan karaoğlu

Nov 3, 2020, 2:59:10 PM
to nhu...@googlegroups.com
Oskar, thank you for the reply.
The loops are just an example; the business logic that will actually run is as follows.

        public async Task Consume(ConsumeContext<WorkspaceDispatcherMessage> context)
        {
            var wsMessage = context.Message;
            try
            {
                _logger.Information("{@conversationId} {@message} {methodname}", wsMessage.ConversationId, wsMessage, "WorkspaceDispatcherConsumer");


                var feedDispatcher = FeedDispatcherFactory.CreateDispatcher(wsMessage.FeedProxy, wsMessage.Keywords);
                Console.WriteLine($"Dispatcher : {feedDispatcher.GetType()}");
                var workspaceAsync = feedDispatcher.GetWorkspaceAsync(wsMessage.FeedProxy, wsMessage.Keywords);
                Console.WriteLine($"workspaceAsync : {workspaceAsync.Count}");


                foreach (var workspaceId in workspaceAsync.Keys)
                {
                    var sentimentDetectionMessage = new SentimentDetectionMessage
                    {
                        ConversationId = wsMessage.ConversationId,
                        FeedProxy = wsMessage.FeedProxy,
                        FeederProxy = wsMessage.FeederProxy,
                        Keywords = workspaceAsync[workspaceId],
                        WorkspaceId = workspaceId
                    };
                  await  _bus.Publish(sentimentDetectionMessage);
                }
            }
            catch (Exception exp)
            {
                _logger.Error("{@conversationId} {@message} {@stackTrace} {errorMessage}", wsMessage.ConversationId, wsMessage, exp.StackTrace, exp.Message);
                throw; // rethrow without resetting the stack trace
            }
        }

On Tue, Nov 3, 2020 at 22:50, Oskar Berggren <oskar.b...@gmail.com> wrote:

Gunnar Liljas

Nov 3, 2020, 8:24:37 PM
to nhu...@googlegroups.com
First things first. A Parallel.For loop is not async, it's multithreaded. Those are different things. And as mentioned, the session is not thread safe, so you can't use it in a multithreaded scenario. If you create the session inside the Parallel.For it would work, but using async methods within a Parallel.For is really not recommended, in general. It's not an NHibernate thing.

/G

Hakan karaoğlu

Nov 4, 2020, 12:39:23 AM
to nhu...@googlegroups.com
Gunnar, thank you for the info. I just want to learn how to do this correctly.

Now, forget the loops. As you know, RabbitMQ consumers are methods that return Task. My main problem is here: I am getting similar errors. As I mentioned before, there is no error with a single message, but when I send 10 or more messages, the problems begin. As you said, should I create the session in a using block, rather than through the repository pattern, and do the database operations in that block? What do you think is wrong with the consumer below?

An example consumer:

public Task Consume(ConsumeContext<WorkspaceDispatcherMessage> context)
        {
            var wsMessage = context.Message;
            try
            {
                _logger.Information("{@conversationId} {@message} {methodname}", wsMessage.ConversationId, wsMessage, "WorkspaceDispatcherConsumer");

                var feedDispatcher = FeedDispatcherFactory.CreateDispatcher(wsMessage.FeedProxy, wsMessage.Keywords);
                Console.WriteLine($"Dispatcher : {feedDispatcher.GetType()}");
                var workspace = feedDispatcher.GetWorkspace(wsMessage.FeedProxy, wsMessage.Keywords);
                Console.WriteLine($"workspace : {workspace.Count}");

                if (workspace!=null && workspace.Count>0)
                {
                    foreach (var workspaceId in workspace.Keys)

                    {
                        var sentimentDetectionMessage = new SentimentDetectionMessage
                        {
                            ConversationId = wsMessage.ConversationId,
                            FeedProxy = wsMessage.FeedProxy,
                            FeederProxy = wsMessage.FeederProxy,
                            Keywords = workspace[workspaceId],
                            WorkspaceId = workspaceId
                        };
                        _bus.Publish(sentimentDetectionMessage);
                    }
                }
               
                return Task.CompletedTask;

            }
            catch (Exception exp)
            {
                _logger.Error("{@conversationId} {@message} {@stackTrace} {errorMessage}", wsMessage.ConversationId, wsMessage, exp.StackTrace, exp.Message);
                throw; // rethrow without resetting the stack trace
            }
        }

public override Dictionary<int, List<FeedKeywordProxy>> GetWorkspace(FeedProxy feedProxy, string[] tags)
        {
            if (tags != null && tags.Length > 0)
            {
                var feedKeywords = new List<FeedKeywordProxy>();
                for (int i = 0; i < tags.Length; i++)
                {
                    var tag = tags[i];
                    var keywords = _keywordDetectionViewRepository.GetKeywords(feedProxy.Source, tag, feedProxy.Language);
                    List<KeywordDetectionView> workspaces = keywords;
                    foreach (var ws in workspaces)
                    {
                        var keywordDetectionView = ws;
                        var feedKeyword = CreateFeedKeywordInstance(keywordDetectionView);
                        feedKeywords.Add(feedKeyword);
                    }
                }
                Dictionary<int, List<FeedKeywordProxy>> result = GroupFeedKeywordsByWorkspaceId(feedKeywords);
                return result;
            }
            return new Dictionary<int, List<FeedKeywordProxy>>();
        }

public class KeywordDetectionViewRepository : NhRepositoryBase<KeywordDetectionView>, IKeywordDetectionViewRepository
    {
        private readonly ISession _session;

        public KeywordDetectionViewRepository(ISession session) : base(session)
        {
            _session = session;
        }

        public List<KeywordDetectionView> GetKeywords(string source, string tag, LanguageTypes languageTypes)
        {
            var query = _session.CreateCriteria<KeywordDetectionView>();
            int keywordId = int.Parse(tag.Substring(2));

            if (tag.StartsWith("k_"))
            {
                query.Add(Restrictions.Eq("KeywordId", keywordId));
            }
            else if (tag.StartsWith("v_"))
            {
                query.Add(Restrictions.Eq("VariationId", keywordId));
            }
            query.Add(Restrictions.Eq("SourceCode", source));
            query.Add(Restrictions.Or(Restrictions.Eq("LanguageCode", languageTypes.ToString()), Restrictions.IsNull("LanguageCode")));
            return (List<KeywordDetectionView>)
               query.SetCacheable(false).List();
        }
    }

On Wed, Nov 4, 2020 at 04:24, Gunnar Liljas <gunnar...@gmail.com> wrote: