http://hg.openjdk.java.net/jdk7/hotspot/jdk/rev/025f9e57566a
I don't know if this is a bug in the jdk or if lamina might be doing
something incorrect with the semaphore used to guard an observer.
-Jeff
java.lang.Error: Maximum permit count exceeded
at java.util.concurrent.Semaphore$Sync.tryReleaseShared(Semaphore.java:197)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.releaseShared(AbstractQueuedSynchronizer.java:1340)
at java.util.concurrent.Semaphore.release(Semaphore.java:615)
at lamina.core.observable.Observable.close(observable.clj:71)
at lamina.core.observable$siphon$fn__17034$fn__17038.invoke(observable.clj:234)
at lamina.core.observable$observer$reify__16758.on_close(observable.clj:43)
at lamina.core.observable.Observable.close(observable.clj:125)
at lamina.core.channel$close.invoke(channel.clj:127)
at aleph.tcp$basic_server_pipeline$fn__22437.invoke(tcp.clj:94)
at aleph.netty$channel_close_stage$fn__21499.invoke(netty.clj:156)
at aleph.netty$upstream_stage$reify__21482.handleUpstream(netty.clj:122)
at aleph.netty$upstream_stage$reify__21482.handleUpstream(netty.clj:124)
at aleph.netty$upstream_stage$reify__21482.handleUpstream(netty.clj:123)
at org.jboss.netty.handler.codec.frame.FrameDecoder.cleanup(FrameDecoder.java:344)
at org.jboss.netty.handler.codec.frame.FrameDecoder.channelDisconnected(FrameDecoder.java:226)
at aleph.netty$upstream_stage$reify__21482.handleUpstream(netty.clj:124)
at org.jboss.netty.channel.Channels.fireChannelDisconnected(Channels.java:360)
at org.jboss.netty.channel.socket.nio.NioWorker.close(NioWorker.java:587)
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:356)
at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:281)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:201)
at org.jboss.netty.util.internal.IoWorkerRunnable.run(IoWorkerRunnable.java:46)
at java.util.concurrent.ThreadPoolExecutor.runWork
Zach
user=> (/ (* 1e6 5) 1000 60.0)
83.33333333333333
That's roughly an hour and a half.
Why does calling a closed handler result in something so non-deterministic?
But really, a million iterations is probably overkill. Feel free to
stop it sooner if you're not seeing the issue.
Zach
I'd be interested to understand your thinking about the locking
though. For example, in the implementation of close in the Observable
defrecord form, it closes, locks, calls the on-close handlers, and
then unsubscribes the handlers, but I would have thought you could
read the value of @observers once, call the associated handlers, and
then unsubscribe them, without locking. New subscribers will fail
anytime after the compare-and-set! of closed? so they don't need to be
locked out, right? If @observers is dereferenced once after the
compare-and-set! at the top of the function then it will be consistent
for both calling the on-closed handlers and unsubscribing them. Also,
I noticed that unsubscribe passes false for the closed? parameter to
modify-observers, which in the case of being called inside close is
not true.
I am still just feeling my way around so I'm sure I don't grok all of
what's happening here, as I've only used semaphores to guard pools of
objects and I've never used thread locals. Can you explain how this
works? Clearly it is working well, but I don't understand the need
for both the thread-local counter and the semaphore, which itself
maintains a counter.
Thanks,
Jeff
Thread 1 checks closed?, which is false
Thread 2 sets closed? to true
Thread 2 triggers the on-closed event for all subscribers
Thread 1 adds a new subscriber which will never get the on-closed event
If you take out the locks, the test-on-closed function you're running
will generally fail 1 time in 100. Similar issues can occur with
sending a message after on-closed callbacks have been triggered.
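
To make the interleaving above concrete, here is a rough sketch that
replays the four steps on a single thread so the outcome is
deterministic. It is written in plain Java rather than Clojure, and
UnlockedObservable and MissedCloseReplay are made-up names for
illustration, not anything in lamina.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for an observable with no locking around close.
final class UnlockedObservable {
    final AtomicBoolean closed = new AtomicBoolean(false);
    final List<Runnable> onClosed = new ArrayList<>();

    // Flip closed? once, then notify whoever is subscribed at that moment.
    void closeAndNotify() {
        if (closed.compareAndSet(false, true)) {
            for (Runnable callback : new ArrayList<>(onClosed)) {
                callback.run();
            }
        }
    }
}

final class MissedCloseReplay {
    // Replays the race step by step; returns whether the late subscriber was notified.
    static boolean lateSubscriberNotified() {
        UnlockedObservable obs = new UnlockedObservable();
        boolean[] notified = {false};

        boolean sawClosed = obs.closed.get();   // "Thread 1" checks closed?, sees false
        obs.closeAndNotify();                   // "Thread 2" closes and fires on-closed
        if (!sawClosed) {                       // "Thread 1" acts on its stale check
            obs.onClosed.add(() -> notified[0] = true);
        }
        return notified[0];
    }
}
```

MissedCloseReplay.lateSubscriberNotified() returns false: the
subscriber is registered, but nothing will ever call it.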
What we want, then, is to be sure that nothing else is going on when
we close an observable. However, we don't want to serialize
everything; we're perfectly happy having two threads sending a message
through the observable at the same time, or one thread adding a
subscriber while another sends a message.
The solution I use for this is to create a semaphore with
Integer/MAX_VALUE permits, have each subscribe and message operation
take a single permit, and have the close operation take all the
permits. This means that an arbitrary number of messages and
subscriptions can happen concurrently, but a close operation will wait
for all pending messages to clear, and then all further messages will
have to wait until it's complete.
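
As a minimal sketch of that idea in plain Java (not lamina's actual
code; CloseGate and its method names are hypothetical), the semaphore
ends up behaving like a many-readers/one-closer gate:

```java
import java.util.concurrent.Semaphore;

// Hypothetical illustration of the "take one permit vs. take them all" scheme.
final class CloseGate {
    private static final int ALL = Integer.MAX_VALUE;
    private final Semaphore permits = new Semaphore(ALL);

    // Subscribe and message operations hold one permit each,
    // so any number of them can run at the same time.
    void shared(Runnable op) {
        permits.acquireUninterruptibly();
        try {
            op.run();
        } finally {
            permits.release();
        }
    }

    // Close takes every permit: it waits for in-flight operations to finish
    // and keeps new ones out until it has released the permits again.
    void exclusive(Runnable op) {
        permits.acquireUninterruptibly(ALL);
        try {
            op.run();
        } finally {
            permits.release(ALL);
        }
    }

    int available() {
        return permits.availablePermits();
    }
}
```

Every release here is paired with an acquire of the same size, so the
permit count always returns to Integer.MAX_VALUE. Note that this
sketch is not re-entrant: calling exclusive from inside shared on the
same thread would block forever.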
But there's an additional problem: what if the close operation occurs
inside a message callback? The message operation has already consumed
a single permit (and who knows how deeply we're nested), so if we
simply have (close ...) consume Integer/MAX_VALUE permits, it will
hang forever. My solution for this is to have a ThreadLocal variable
keeping track of how many permits have been consumed in the context of
the current thread, and only consume the difference between that value
and Integer/MAX_VALUE.
This re-entrant locking mechanism also allows for stranger behavior,
like trying to send a message into a channel inside an on-close
callback, or trying to close the channel a second time within an
on-close callback, and so on.
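
Roughly, and only as a sketch under my own simplifying assumptions
(ReentrantGate is a hypothetical name, and here every operation, not
just close, acquires only the permits the current thread doesn't
already hold), the thread-local bookkeeping looks like this in plain
Java:

```java
import java.util.concurrent.Semaphore;

// Hypothetical sketch of a re-entrant version of the permit scheme.
final class ReentrantGate {
    private static final int ALL = Integer.MAX_VALUE;
    private final Semaphore permits = new Semaphore(ALL);
    // Number of permits the calling thread currently holds.
    private final ThreadLocal<Integer> held = ThreadLocal.withInitial(() -> 0);

    void shared(Runnable op) {
        withPermits(1, op);
    }

    void exclusive(Runnable op) {
        withPermits(ALL, op);
    }

    private void withPermits(int wanted, Runnable op) {
        int before = held.get();
        // Only take the difference between what we want and what we already hold.
        int extra = Math.max(0, wanted - before);
        if (extra > 0) {
            permits.acquireUninterruptibly(extra);
        }
        held.set(before + extra);
        try {
            op.run();
        } finally {
            // Give back exactly what this call took, never more.
            held.set(before);
            if (extra > 0) {
                permits.release(extra);
            }
        }
    }

    int available() {
        return permits.availablePermits();
    }
}
```

With this, a close nested inside a message callback acquires
Integer.MAX_VALUE - 1 further permits instead of hanging, and a
message sent from inside an on-close callback acquires nothing at all
because the thread already holds every permit.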
The exception is being thrown because the permit count is overflowing
past Integer/MAX_VALUE to a negative value. This check only exists in
JDK 7, so it's likely that it was silently overflowing previously, and
it's a flaw in my code. I've identified one scenario where that could
happen, but as far as I can tell it would only be when a thread is
interrupted. Regardless, I've pushed a fix. If this stress test
doesn't cause any errors, please pull the latest and let me know if
you're still seeing the issue.
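
For reference, the error itself is easy to trigger in isolation on
JDK 7 or later. This is just a sketch of the overflow check with a
hypothetical class name; it says nothing about which code path in
lamina released the extra permit.

```java
import java.util.concurrent.Semaphore;

// Hypothetical demo: one release with no matching acquire on a full semaphore.
final class PermitOverflowDemo {
    // Returns the message of the Error thrown by the unmatched release,
    // or null if the release went through without an Error.
    static String releaseWithoutAcquire() {
        Semaphore permits = new Semaphore(Integer.MAX_VALUE);
        try {
            permits.release(); // count would go past Integer.MAX_VALUE
            return null;
        } catch (Error e) {
            return e.getMessage();
        }
    }
}
```

On JDK 7 and later this returns "Maximum permit count exceeded", the
same message as in the stack trace.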
Thanks,
Zach
Anyways, great stuff, thanks for Lamina.
-Jeff