Concurrency in 3.6 again

Igor Fedurkin

May 16, 2014, 6:51:50 PM
to rhino...@googlegroups.com
Hello society,
Bad news to begin with - sorry.

Something like this will deadlock (pseudo language):

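class ListProcessor<TIn, TOut> : IValueProcessor<List<TIn>, List<TOut>>
{
     IValueProcessor<TIn, TOut> _itemProcessor;
     ILog _unsuccesfulEnd;

     public List<TOut> Process(List<TIn> input)
     {
         // SelectMany flattens the zero-or-one-element results into one list
         return input.AsParallel().SelectMany(TryExecute).ToList();
     }

     IEnumerable<TOut> TryExecute(TIn input)
     {
         try
         {
              return new[] { _itemProcessor.Process(input) };
         }
         catch (Exception ex)
         {
              // failed items are logged and skipped
              _unsuccesfulEnd.Error(ex);
              return new TOut[0];
         }
     }
}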

class Test:
   expected:
      itemProcessor.Expect(Process(1)).Return("1")
      itemProcessor.Expect(Process(2)).Throw(ex)
      _unsuccesfulEnd.Error(ex)
      itemProcessor.Expect(Process(3)).Throw(ex)
      _unsuccesfulEnd.Error(ex)
      itemProcessor.Expect(Process(4)).Return("4")
      repository.Replay()

      var result = sut.Process([1,2,3,4])

expected: ["1", "4"]

The actual result is a test that hangs forever.

I will try to debug it again tomorrow, but it doesn't look like a simple one.

Any help appreciated.

Regards,
Igor


Igor Fedurkin

May 22, 2014, 7:46:23 AM
to rhino...@googlegroups.com
Hello,
Ok - next attempt to describe what's happening:

Test code:
        public void If_error_supression_is_on_then_problematic_items_will_be_skipped()
        {
            var exception1 = new Exception();
            var exception2 = new ArgumentException();
            var itemProcessor = Mock<IValueProcessor<int, string>>();
            var logger = Mock<ILog>();

            itemProcessor
                .Expect(p => p.Process(1))
                .Return("1");

            itemProcessor
                .Expect(p => p.Process(2))
                .Throw(exception1);

            logger.Error(exception1);

            itemProcessor
                .Expect(p => p.Process(3))
                .Throw(exception2);

            logger.Error(exception2);

            itemProcessor
                .Expect(p => p.Process(4))
                .Return("4");

            Replay();

            var processor = new ParallelArrayProcessor<int, string>(itemProcessor, true, logger,
                //TODO [IF] - find a way around this deadlock in rhino mocks. I hope Ayende could help to solve it quickly.
                true);

            var result = processor.Process(List(1, 2, 3, 4).ToArray());

            CollectionAssert.AreEquivalent(List("1", "4"), result.ToList());
        }

Here Mock<T>() asks the repository instance to create a strict mock, and Replay() asks the repository instance to replay all. VerifyAll happens in the test teardown.
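
For readers who have not seen these helpers, here is a minimal sketch of what such a base fixture might look like. It assumes NUnit and one MockRepository per test; the class name MockingTestBase and the method names CreateRepository and VerifyExpectations are hypothetical and are not taken from the real code base. Only StrictMock<T>(), ReplayAll() and VerifyAll() are actual Rhino Mocks calls.

```csharp
using NUnit.Framework;
using Rhino.Mocks;

// Hypothetical base fixture: the names here are assumptions for illustration.
public abstract class MockingTestBase
{
    private MockRepository _repository;

    [SetUp]
    public void CreateRepository()
    {
        // A fresh repository per test keeps expectations from leaking between tests.
        _repository = new MockRepository();
    }

    // Strict mock: any call that has no recorded expectation fails the test.
    protected T Mock<T>() where T : class
    {
        return _repository.StrictMock<T>();
    }

    // Moves every mock created by this repository from record mode to replay mode.
    protected void Replay()
    {
        _repository.ReplayAll();
    }

    [TearDown]
    public void VerifyExpectations()
    {
        // Fails the test if any recorded expectation was not met.
        _repository.VerifyAll();
    }
}
```

With a fixture like this, the direct calls logger.Error(exception1) and logger.Error(exception2) in the test above are made while the mocks are still in record mode, so they are recorded as expectations rather than executed.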

Processor code:
    public class ParallelArrayProcessor<TIn, TOut> : IValueProcessor<ReadOnlyArray<TIn>, ReadOnlyArray<TOut>>
    {
        private readonly ILog _logger;
        private readonly bool _supressErrors;
        private readonly bool _runSequentially;
        private readonly IValueProcessor<TIn, TOut> _itemProcessor;

        public ParallelArrayProcessor(IValueProcessor<TIn, TOut> itemProcessor, bool supressErrors, ILog logger, bool runSequentially)
        {
            _itemProcessor = itemProcessor.NotNull("itemProcessor");
            _supressErrors = supressErrors;
            _runSequentially = runSequentially;
            _logger = logger.NotNull("logger");
        }

        public ParallelArrayProcessor(IValueProcessor<TIn, TOut> itemProcessor, bool supressErrors)
            : this(itemProcessor, supressErrors,
                LogManager.GetLogger(typeof(ParallelArrayProcessor<TIn, TOut>)), false){}

        public ReadOnlyArray<TOut> Process(ReadOnlyArray<TIn> input)
        {
            return MakeParallel(input)
                .SelectMany(TryProcess)
                .ToArray();
        }

        //TODO [IF] - this nasty workaround was done because Rhino Mocks will end up with a deadlock if we just leave AsParallel() here
        private ParallelQuery<TIn> MakeParallel(ReadOnlyArray<TIn> input)
        {
            var result = input.AsParallel();
           
            return _runSequentially ? result.WithDegreeOfParallelism(1) : result;
        }

        private IEnumerable<TOut> TryProcess(TIn value)
        {
            try
            {
                return new[] {_itemProcessor.Process(value)};
            }
            catch (Exception ex)
            {
                if (!_supressErrors)
                    throw;

                _logger.Error(ex);
                return new TOut[0];
            }
        }
    }


