RavenDB 3.0 client hangs on custom factory task scheduler

169 views
Skip to first unread message

Ruslan Hnyp

unread,
Feb 14, 2015, 8:45:04 AM2/14/15
to rav...@googlegroups.com

In a project I am using LimitedConcurrencyLevelTaskScheduler

In ravenDB version 2.0 everything works fine. But now a simple query hangs at  Raven.Client.Extensions.TaskExtensions2.ResultUnwrap(Task`1 task)

Test method Tests.DbTests.TestMethod threw exception:

System.AggregateException: One or more errors occurred. ---> System.InvalidOperationException: Query failed. See inner exception for details. ---> System.Threading.Tasks.TaskCanceledException: A task was canceled.
    at
Raven.Client.Extensions.TaskExtensions2.ResultUnwrap(Task`1 task) in c:\Builds\RavenDB-3.0-Unstable\Raven.Client.Lightweight\Extensions\TaskExtensions2.cs: line 59
   at Raven.Client.Connection.ServerClient.Query(String index, IndexQuery query, String[] includes, Boolean metadataOnly, Boolean indexEntriesOnly) in c:\Builds\RavenDB-3.0-Unstable\Raven.Client.Lightweight\Connection\ServerClient.cs: line 256
 --- End of inner exception stack trace ---
    at Raven.Client.Connection.ServerClient.Query(String index, IndexQuery query, String[] includes, Boolean metadataOnly, Boolean indexEntriesOnly) in c:\Builds\RavenDB-3.0-Unstable\Raven.Client.Lightweight\Connection\ServerClient.cs: line 263
   at Raven.Client.Document.AbstractDocumentQuery`
2.ExecuteActualQuery() in c:\Builds\RavenDB-3.0-Unstable\Raven.Client.Lightweight\Document\AbstractDocumentQuery.cs: line 685
   at
Raven.Client.Document.AbstractDocumentQuery`2.InitSync() in c:\Builds\RavenDB-3.0-Unstable\Raven.Client.Lightweight\Document\AbstractDocumentQuery.cs: line 666
   at Raven.Client.Document.AbstractDocumentQuery`
2.ExecuteQueryOperation(Int32 take) in c:\Builds\RavenDB-3.0-Unstable\Raven.Client.Lightweight\Document\AbstractDocumentQuery.cs: line 1112
   at
Raven.Client.Document.AbstractDocumentQuery`2.SingleOrDefault() in c:\Builds\RavenDB-3.0-Unstable\Raven.Client.Lightweight\Document\AbstractDocumentQuery.cs: line 1104
   at Raven.Client.Linq.RavenQueryProviderProcessor`
1.GetQueryResult(IDocumentQuery`1 finalQuery) in c:\Builds\RavenDB-3.0-Unstable\Raven.Client.Lightweight\Linq\RavenQueryProviderProcessor.cs: line 1694
   at Raven.Client.Linq.RavenQueryProviderProcessor`
1.ExecuteQuery() in c:\Builds\RavenDB-3.0-Unstable\Raven.Client.Lightweight\Linq\RavenQueryProviderProcessor.cs: line 1623
   at
Raven.Client.Linq.RavenQueryProviderProcessor`1.Execute(Expression expression) in c:\Builds\RavenDB-3.0-Unstable\Raven.Client.Lightweight\Linq\RavenQueryProviderProcessor.cs: line 1597
   at Raven.Client.Linq.RavenQueryProvider`
1.Execute(Expression expression) in c:\Builds\RavenDB-3.0-Unstable\Raven.Client.Lightweight\Linq\RavenQueryProvider.cs: line 154
   at
Raven.Client.Linq.RavenQueryProvider`1.System.Linq.IQueryProvider.Execute(Expression expression) in c:\Builds\RavenDB-3.0-Unstable\Raven.Client.Lightweight\Linq\RavenQueryProvider.cs: line 200
   at System.Linq.Queryable.SingleOrDefault(IQueryable`
1 source, Expression`1 predicate)
   at Tests.DbTests.SearchById() in DbTests.cs: line 53
   at Tests.DbTests.<TestMethod>b__1() in DbTests.cs: line 43
   at System.Threading.Tasks.Task`
1.InnerInvoke()
   at
System.Threading.Tasks.Task.Execute()
 
--- End of inner exception stack trace ---
    at
System.Threading.Tasks.Task.ThrowIfExceptional(Boolean includeTaskCanceledExceptions)
   at
System.Threading.Tasks.Task.Wait(Int32 millisecondsTimeout, CancellationToken cancellationToken)
   at
System.Threading.Tasks.Task.Wait()
   at
Tests.DbTests.TestMethod() in DbTests.cs: line 44


Below is sample test code: 

using System;
using System.Collections.Generic;

using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Raven.Client.Embedded;


namespace Tests
{
   
[TestClass]
   
public class DbTests
   
{
       
private EmbeddableDocumentStore _documentStore;


       
public interface IIdentifier
       
{
           
string Id { get; set; }
       
}


       
public class Entity : IIdentifier
       
{
           
public string Id { get; set; }
       
}


       
[TestInitialize]
       
public void Initialize()
       
{
            _documentStore
= new EmbeddableDocumentStore
           
{
               
UseEmbeddedHttpServer = true,
               
RunInMemory = true,
           
};
            _documentStore
.Initialize();
       
}


       
[TestMethod]
       
public void TestMethod()
       
{
           
var scheduler = new LimitedTasksScheduler(1);
           
var taskFactory = new TaskFactory(scheduler);
           
           
var task = taskFactory.StartNew(() => SearchById<Entity>());
            task
.Wait();
           
           
Assert.AreEqual(null,task.Result);
       
}


       
private T SearchById<T>() where T:IIdentifier
       
{
           
using (var session = _documentStore.OpenSession())
           
{
               
return session.Query<T>().SingleOrDefault(x => x.Id == string.Empty);
           
}
       
}
   
}


   
/// <summary>
   
/// Provides a task scheduler that ensures a maximum concurrency level while
   
/// running on top of the ThreadPool.
   
/// </summary>
   
public class LimitedTasksScheduler : TaskScheduler
   
{
       
/// <summary>Whether the current thread is processing work items.</summary>
       
[ThreadStatic]
       
private static bool _currentThreadIsProcessingItems;
       
/// <summary>The list of tasks to be executed.</summary>
       
private readonly LinkedList<Task> _tasks = new LinkedList<Task>(); // protected by lock(_tasks)
       
/// <summary>The maximum concurrency level allowed by this scheduler.</summary>
       
private readonly int _maxDegreeOfParallelism;
       
/// <summary>Whether the scheduler is currently processing work items.</summary>
       
private int _delegatesQueuedOrRunning = 0; // protected by lock(_tasks)


       
/// <summary>
       
/// Initializes an instance of the LimitedConcurrencyLevelTaskScheduler class with the
       
/// specified degree of parallelism.
       
/// </summary>
       
/// <param name="maxDegreeOfParallelism">The maximum degree of parallelism provided by this scheduler.</param>
       
public LimitedTasksScheduler(int maxDegreeOfParallelism)
       
{
           
if (maxDegreeOfParallelism < 1) throw new ArgumentOutOfRangeException("maxDegreeOfParallelism");
            _maxDegreeOfParallelism
= maxDegreeOfParallelism;
       
}


       
/// <summary>Queues a task to the scheduler.</summary>
       
/// <param name="task">The task to be queued.</param>
       
protected sealed override void QueueTask(Task task)
       
{
           
// Add the task to the list of tasks to be processed.  If there aren't enough
           
// delegates currently queued or running to process tasks, schedule another.
           
lock (_tasks)
           
{
                _tasks
.AddFirst(task);
               
if (_delegatesQueuedOrRunning >= _maxDegreeOfParallelism) return;


               
++_delegatesQueuedOrRunning;
               
NotifyThreadPoolOfPendingWork();
           
}
       
}


       
/// <summary>
       
/// Informs the ThreadPool that there's work to be executed for this scheduler.
       
/// </summary>
       
private void NotifyThreadPoolOfPendingWork()
       
{
           
ThreadPool.UnsafeQueueUserWorkItem(_ =>
           
{
               
// Note that the current thread is now processing work items.
               
// This is necessary to enable inlining of tasks into this thread.
                _currentThreadIsProcessingItems
= true;
               
try
               
{
                   
// Process all available items in the queue.
                   
while (true)
                   
{
                       
Task item;
                       
lock (_tasks)
                       
{
                           
// When there are no more items to be processed,
                           
// note that we're done processing, and get out.
                           
if (_tasks.Count == 0)
                           
{
                               
--_delegatesQueuedOrRunning;
                               
break;
                           
}


                           
// Get the next item from the queue
                            item
= _tasks.First.Value;
                            _tasks
.RemoveFirst();
                       
}


                       
// Execute the task we pulled out of the queue
                       
TryExecuteTask(item);
                   
}
               
}
               
// We're done processing items on the current thread
               
finally { _currentThreadIsProcessingItems = false; }
           
}, null);
       
}


       
/// <summary>Attempts to execute the specified task on the current thread.</summary>
       
/// <param name="task">The task to be executed.</param>
       
/// <param name="taskWasPreviouslyQueued"></param>
       
/// <returns>Whether the task could be executed on the current thread.</returns>
       
protected sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
       
{
           
// If this thread isn't already processing a task, we don't support inlining
           
if (!_currentThreadIsProcessingItems) return false;


           
// If the task was previously queued, remove it from the queue
           
if (taskWasPreviouslyQueued) TryDequeue(task);


           
// Try to run the task.
           
return TryExecuteTask(task);
       
}


       
/// <summary>Attempts to remove a previously scheduled task from the scheduler.</summary>
       
/// <param name="task">The task to be removed.</param>
       
/// <returns>Whether the task could be found and removed.</returns>
       
protected sealed override bool TryDequeue(Task task)
       
{
           
lock (_tasks) return _tasks.Remove(task);
       
}


       
/// <summary>Gets the maximum concurrency level supported by this scheduler.</summary>
       
public sealed override int MaximumConcurrencyLevel { get { return _maxDegreeOfParallelism; } }


       
/// <summary>Gets an enumerable of the tasks currently scheduled on this scheduler.</summary>
       
/// <returns>An enumerable of the tasks currently scheduled.</returns>
       
protected sealed override IEnumerable<Task> GetScheduledTasks()
       
{
           
var lockTaken = false;
           
try
           
{
               
Monitor.TryEnter(_tasks, ref lockTaken);
               
if (lockTaken) return _tasks.ToArray();
               
else throw new NotSupportedException();
           
}
           
finally
           
{
               
if (lockTaken) Monitor.Exit(_tasks);
           
}
       
}
   
}
}



Oren Eini (Ayende Rahien)

unread,
Feb 16, 2015, 1:20:08 AM2/16/15
to ravendb
We make heavy use of the TPL internally, and we assume that async operation can run in a background thread, while the currently operation can wait for it when you are using sync.
The problem here is that you are running a task, that is waiting for an async operation, that can't complete because the only thread that would run it is waiting for the async operation to complete.
Set the limited concurrency to 2.

Hibernating Rhinos Ltd  

Oren Eini l CEO Mobile: + 972-52-548-6969

Office: +972-4-622-7811 l Fax: +972-153-4-622-7811

 


--
You received this message because you are subscribed to the Google Groups "RavenDB - 2nd generation document database" group.
To unsubscribe from this group and stop receiving emails from it, send an email to ravendb+u...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Ramon Smits

unread,
Jun 30, 2015, 2:47:28 PM6/30/15
to rav...@googlegroups.com

Hi Oren and Ruslan,

Increasing the limit with 1 will not help. If the limit is reached the blocking/deadlocking will occur.

What seems to happen is that tasks created in the RavenDB client are using the scheduler that was used on the task that called the RavenDB client instead of using the default scheduler.

The solution is to create tasks with the custom scheduler and specify on the task that the current scheduler should be hidden. This prevents it from being used by child tasks.

 Task.Factory
                        .StartNew(
                            () => Handle(context),
                            cancellationToken,
                            TaskCreationOptions.HideScheduler,
                            scheduler
                            );


This way tasks created by RavenDB client will use the default scheduler.


Hope this helps.


Regards,
Ramon
...

 
 

Oren Eini (Ayende Rahien)

unread,
Jul 1, 2015, 2:24:08 AM7/1/15
to ravendb
Yes, this is already handled in the latest unstable.

--

Ramon Smits

unread,
Jul 1, 2015, 2:57:00 AM7/1/15
to rav...@googlegroups.com

Hi Oren,

What did you change? That tasks are scheduled with the default scheduler?

Regards,
Ramon
...

Oren Eini (Ayende Rahien)

unread,
Jul 1, 2015, 3:00:50 AM7/1/15
to ravendb
With our own scheduler.

--
Reply all
Reply to author
Forward
0 new messages