Adding runnables to a dispatch queue very quickly causes them all to run in the same thread


Jakub Neubauer

Feb 13, 2012, 8:34:33 AM
to hawtdispatch
Hello,
I tried to implement a parallel algorithm which basically starts
10 Runnables with Dispatch.getGlobalQueue().execute(Runnable) to
process 10 chunks of data, and then waits for them to finish.
The self-describing output is as follows (each line starts with the thread name):

hawtdispatch-DEFAULT-1: chunk 0 done in 645 ms
hawtdispatch-DEFAULT-1: chunk 1 done in 644 ms
hawtdispatch-DEFAULT-1: chunk 2 done in 642 ms
hawtdispatch-DEFAULT-1: chunk 3 done in 654 ms
hawtdispatch-DEFAULT-1: chunk 4 done in 646 ms
hawtdispatch-DEFAULT-1: chunk 5 done in 641 ms
hawtdispatch-DEFAULT-1: chunk 6 done in 668 ms
hawtdispatch-DEFAULT-1: chunk 7 done in 645 ms
hawtdispatch-DEFAULT-1: chunk 8 done in 642 ms
hawtdispatch-DEFAULT-1: chunk 9 done in 642 ms
Result: 3.1415926545898296
Time: 6474 ms
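
For readers who want to reproduce this kind of measurement, here is a minimal sketch of such a test. It is not the original program: the integrand (4/(1+x^2), which integrates to pi), the class name ChunkedPi and the step counts are assumptions, and a plain java.util.concurrent thread pool stands in for Dispatch.getGlobalQueue() so that the sketch runs without the library.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.DoubleAdder;

class ChunkedPi {
    // Approximates pi by integrating 4/(1+x^2) over [0,1] with the midpoint
    // rule, splitting the interval into 'chunks' independent tasks.
    static double compute(int chunks, int stepsPerChunk) {
        // Stand-in executor (assumption); the original used Dispatch.getGlobalQueue().
        ExecutorService pool = Executors.newFixedThreadPool(chunks);
        CountDownLatch done = new CountDownLatch(chunks);
        DoubleAdder sum = new DoubleAdder();
        long totalSteps = (long) chunks * stepsPerChunk;
        double h = 1.0 / totalSteps;
        for (int c = 0; c < chunks; c++) {
            final int chunk = c;
            pool.execute(() -> {
                long start = System.currentTimeMillis();
                double partial = 0.0;
                long first = (long) chunk * stepsPerChunk;
                for (long i = first; i < first + stepsPerChunk; i++) {
                    double x = (i + 0.5) * h;
                    partial += 4.0 / (1.0 + x * x);
                }
                sum.add(partial * h);
                // Printing the thread name shows which worker ran the chunk.
                System.out.println(Thread.currentThread().getName() + ": chunk " + chunk
                        + " done in " + (System.currentTimeMillis() - start) + " ms");
                done.countDown();
            });
        }
        try {
            done.await(); // wait for all chunks to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return sum.sum();
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        System.out.println("Result: " + compute(10, 1_000_000));
        System.out.println("Time: " + (System.currentTimeMillis() - start) + " ms");
    }
}
```

If every line shows the same thread name and the total time is roughly the sum of the chunk times, the chunks ran sequentially, which is the symptom shown above.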

The problem is that all Runnables are processed sequentially in the same
thread.

I think the problem is in
org.fusesource.hawtdispatch.internal.NioManager.isSelecting(), which
can still return true for a short time after wakeup() was called (see also
the method NioManager.select(long)).
As a result, when SimplePool.execute(Runnable) is called repeatedly in very
quick succession, it wakes up just the first thread several times.

I'm sure that in real production use of the hawtdispatch library, such as
processing server connections, this is not a real issue, since the
requests are with high probability better distributed in time. But for
usage like mine, it is a problem.

I propose a patch, which is in fact quite simple. It was created
against a Git clone from 02/13/2012, 13:00.

I apologize, but I cannot see "Submit new" in the issue tracker
http://www.assembla.com/spaces/hawtdispatch/support/tickets so I am adding
the patch here, because it is really short.

output of git diff:

diff --git a/hawtdispatch/src/main/java/org/fusesource/hawtdispatch/internal/NioManager.java b/hawtdispatch/src/main/java/org/fusesource/hawtdispatch/internal/NioManager.java
index 4f971f3..b6679fe 100644
--- a/hawtdispatch/src/main/java/org/fusesource/hawtdispatch/internal/NioManager.java
+++ b/hawtdispatch/src/main/java/org/fusesource/hawtdispatch/internal/NioManager.java
@@ -163,7 +163,7 @@ public class NioManager {
     }
 
     public boolean isSelecting() {
-        return selecting;
+        return selecting && wakeupCounter == selectCounter;
     }
 
     /**
@@ -196,8 +196,8 @@ public class NioManager {
                     selectStrategy.select(timeout);
                 }
             } finally {
-                selectCounter = wakeupCounter;
                 selecting=false;
+                selectCounter = wakeupCounter;
             }
         }
     } catch (CancelledKeyException e) {
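
To illustrate the effect of the isSelecting() change, here is a small single-threaded model. It is a sketch under assumptions, not HawtDispatch code: the class WakeupModel and its Worker are hypothetical, wakeup() is assumed to only increment wakeupCounter, and the pool is assumed to wake the first worker that reports it is selecting. The model submits several tasks back to back, before any worker has had time to leave select().

```java
import java.util.ArrayList;
import java.util.List;

class WakeupModel {
    // Simplified stand-in for the per-thread state discussed above (hypothetical).
    static class Worker {
        final int id;
        boolean selecting = true; // the worker is parked in select()
        int wakeupCounter = 0;
        int selectCounter = 0;

        Worker(int id) { this.id = id; }

        // Check before the patch.
        boolean isSelectingOld() { return selecting; }

        // Check after the patch.
        boolean isSelectingPatched() { return selecting && wakeupCounter == selectCounter; }

        // Assumed behaviour: only the counter changes here; 'selecting' stays
        // true until the worker thread itself returns from select().
        void wakeup() { wakeupCounter++; }
    }

    // Submits 'tasks' in quick succession and returns, for each task,
    // the id of the worker that was woken (-1 if none was found).
    static List<Integer> submit(int workers, int tasks, boolean patched) {
        List<Worker> pool = new ArrayList<>();
        for (int i = 0; i < workers; i++) pool.add(new Worker(i));
        List<Integer> woken = new ArrayList<>();
        for (int t = 0; t < tasks; t++) {
            int chosen = -1;
            for (Worker w : pool) {
                boolean idle = patched ? w.isSelectingPatched() : w.isSelectingOld();
                if (idle) {
                    w.wakeup();
                    chosen = w.id;
                    break;
                }
            }
            woken.add(chosen);
        }
        return woken;
    }

    public static void main(String[] args) {
        System.out.println("old:     " + submit(4, 3, false)); // prints "old:     [0, 0, 0]"
        System.out.println("patched: " + submit(4, 3, true));  // prints "patched: [0, 1, 2]"
    }
}
```

In this model the old check keeps choosing worker 0, while the patched check skips a worker that has already been woken but has not yet left select().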

Hiram Chirino

Feb 13, 2012, 10:49:50 AM
to hawtdi...@googlegroups.com
Looks like good analysis and patch.  I'm applying the patch and deploying a new snapshot.  I'll also benchmark an app which uses HawtDispatch against this change to see what the effect is.  Hopefully it's an improvement!
--

Hiram Chirino

Software Fellow | FuseSource Corp.

chi...@fusesource.com | fusesource.com

skype: hiramchirino | twitter: @hiramchirino

blog: Hiram Chirino's Bit Mojo




Hiram Chirino

Feb 13, 2012, 2:33:33 PM
to hawtdi...@googlegroups.com
OK, I just completed my benchmarks. I saw no major change for my use case, so your fix is staying in.  Thanks for the contribution!

Jakub Neubauer

Feb 13, 2012, 2:46:21 PM
to hawtdispatch
In fact, the problem remains when, for example, two threads add
new tasks to the queue at the same time. It is then still
possible that they wake up the same thread, so the tasks
run sequentially although there are enough free threads.
But in the current implementation I can't see an easy fix without
synchronized sections, so I think this is a good balance between
performance and the "user experience" of the API.
Thank you for such a quick response.
Jakub

Hiram Chirino

Feb 13, 2012, 3:07:39 PM
to hawtdi...@googlegroups.com
If we made

    final protected AtomicInteger wakeupCounter = new AtomicInteger();

and

    public boolean wakeupIfSelecting() {
        int wc = wakeupCounter.get();
        if( selecting && wc == selectCounter && wakeupCounter.compareAndSet(wc, wc+1)) {
            selector.wakeup();
            return true;
        }
        return false;
    }

Then I think we could cover that case too.  What do you think?
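
A rough way to check the idea in isolation is the sketch below. It is not the NioManager code: the class CasWakeup, the race() helper and the 'wakeups' counter that stands in for selector.wakeup() are all hypothetical, and the worker is simply assumed to stay parked in select() for the whole run. Several threads call wakeupIfSelecting() on the same worker at once, and the compareAndSet should let only one of them win.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class CasWakeup {
    private final AtomicInteger wakeupCounter = new AtomicInteger();
    private volatile int selectCounter = 0;
    private volatile boolean selecting = true; // assumption: worker stays parked in select()
    final AtomicInteger wakeups = new AtomicInteger(); // stands in for selector.wakeup()

    boolean wakeupIfSelecting() {
        int wc = wakeupCounter.get();
        // Only the caller whose compareAndSet succeeds performs the wakeup.
        if (selecting && wc == selectCounter && wakeupCounter.compareAndSet(wc, wc + 1)) {
            wakeups.incrementAndGet();
            return true;
        }
        return false;
    }

    // Lets 'callers' threads race on one parked worker and returns how many
    // of them got 'true' back from wakeupIfSelecting().
    static int race(int callers) {
        CasWakeup worker = new CasWakeup();
        CountDownLatch start = new CountDownLatch(1);
        AtomicInteger winners = new AtomicInteger();
        Thread[] threads = new Thread[callers];
        for (int i = 0; i < callers; i++) {
            threads[i] = new Thread(() -> {
                try {
                    start.await(); // release all callers at the same moment
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                if (worker.wakeupIfSelecting()) winners.incrementAndGet();
            });
            threads[i].start();
        }
        start.countDown();
        try {
            for (Thread t : threads) t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return winners.get();
    }

    public static void main(String[] args) {
        System.out.println("winners: " + race(8)); // prints "winners: 1"
    }
}
```

A caller that gets false back would then move on and try the next idle thread, which is the behaviour the two-submitter case needs.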

Hiram Chirino

Feb 13, 2012, 8:52:30 PM
to hawtdi...@googlegroups.com
Hi Jakub,

I've just committed a version of what I proposed, see: https://github.com/fusesource/hawtdispatch/commit/e76e4f59af9e1d36d18...

If you get a chance, could you validate that it also handles the case where
two threads add new tasks to the queue at the same time?

Thanks,
Hiram

Jakub Neubauer

Feb 14, 2012, 3:25:21 PM
to hawtdispatch
Great. I was thinking about AtomicInteger too, but couldn't work out a
solution.
I'll look at it tomorrow. Thank you.


Jakub Neubauer

Feb 15, 2012, 8:08:10 AM
to hawtdispatch
I don't see a problem in your commit. But now I'm wondering why this
test fails. Maybe there is a mistake in it... (Don't be confused,
there are 2 tests; I'm talking about test1.)

http://dl.dropbox.com/u/14750848/DispatchExample.java

Sometimes the test finishes, but sometimes it hangs.

Jakub

On Feb 14, 2:52 am, Hiram Chirino <hi...@hiramchirino.com> wrote:
> I've just committed a version of what I proposed,  see: https://github.com/fusesource/hawtdispatch/commit/e76e4f59af9e1d36d18...

Hiram Chirino

Feb 15, 2012, 10:23:04 AM
to hawtdi...@googlegroups.com
Nice test!  The following commit seems to fix it for me: https://github.com/fusesource/hawtdispatch/commit/036f8c26d513295c55ed7127b09574083c719786

Jakub Neubauer

Feb 24, 2012, 4:13:50 AM
to hawtdispatch
Although I don't completely understand why the previous version didn't
work, this one seems to be working. Thank you.
