Durable message queues

Alexey Abashev

Apr 22, 2012, 6:01:02 PM
to jetla...@googlegroups.com
Hello everybody,

A few days ago I found this library, and I was really impressed by how easy it makes implementing the actor pattern. But I ran into one problem that I can't solve. I've built a prototype with an actor-per-entity design, and it would be nice if all messages for an entity were still processed even after a shutdown. So I thought I would create one fiber per 'entity' plus a few channels for its messages, but I can't get a list of the processed and unprocessed messages.
Here is my attempt, which saves all messages in a synchronized list:

    private class DurableChannelSubscription<T> extends BaseSubscription<T> {
        private final Callback<T> _receiveMethod;

        private final List<T> messages = Collections.synchronizedList(new ArrayList<T>());

        public DurableChannelSubscription(DisposingExecutor queue, Callback<T> receiveMethod) {
            this(queue, receiveMethod, null);
        }

        public DurableChannelSubscription(DisposingExecutor fiber, Callback<T> receiveMethod, Filter<T> filter) {
            super(fiber, filter);
            this._receiveMethod = receiveMethod;
        }

        /**
         * Records the message as pending, then queues its delivery on the target fiber.
         * The message is removed from the pending list once the callback has returned.
         */
        @Override
        protected void onMessageOnProducerThread(final T msg) {
            messages.add(msg);

            Runnable asyncExec = new Runnable() {
                public void run() {
                    try {
                        _receiveMethod.onMessage(msg);
                    } finally {
                        messages.remove(msg);
                    }
                }

                @Override
                public String toString() {
                    return _receiveMethod.toString() + "(" + msg + ")";
                }
            };

            getQueue().execute(asyncExec);
        }
    }

And here is how I use this subscription:

        ExecutorService service = Executors.newFixedThreadPool(3);
        PoolFiberFactory fact = new PoolFiberFactory(service);
        Fiber receiver = fact.create();
        receiver.start();

        Channel<String> channel = new MemoryChannel<String>();

        Callback<String> onMsg = new Callback<String>() {
            public void onMessage(String message) {
                System.out.println(Thread.currentThread().getName() + " " + message);

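                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag instead of silently swallowing it.
                    Thread.currentThread().interrupt();
                }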
            }
        };

        DurableChannelSubscription<String> sub = new DurableChannelSubscription<String>(receiver, onMsg);
        channel.subscribe(sub);

        for (int i = 0; i < 10; i++) {
            channel.publish("Hello" + i);
        }

        Thread.sleep(500);

        receiver.dispose();
        fact.dispose();
        service.shutdown();

        try {
            if (!service.awaitTermination(60, TimeUnit.SECONDS)) {
                service.shutdownNow();

                if (!service.awaitTermination(60, TimeUnit.SECONDS))
                    System.err.println("Pool did not terminate");
            }
        } catch (InterruptedException ie) {
            service.shutdownNow();

            Thread.currentThread().interrupt();
        }

        System.out.println(sub.messages.toString());

And when I run it, I see this exception:

Exception in thread "pool-2-thread-2"
java.util.concurrent.RejectedExecutionException: Task org.jetlang.fibers.PoolFiber$1@f0548c rejected from java.util.concurrent.ThreadPoolExecutor@3ec646[Shutting down, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 1]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2001)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:816)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1337)
at org.jetlang.fibers.PoolFiber.flushIfNotPending(PoolFiber.java:57)
at org.jetlang.fibers.PoolFiber.flush(PoolFiber.java:69)
at org.jetlang.fibers.PoolFiber.access$000(PoolFiber.java:19)
at org.jetlang.fibers.PoolFiber$1.run(PoolFiber.java:36)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
at java.lang.Thread.run(Thread.java:722)

Did I miss something? Or is there a better way to save messages at shutdown?

William la Forge

Apr 22, 2012, 10:37:38 PM
to jetla...@googlegroups.com
Oh ouch. For myself I very much prefer the crash-proof approach where you can pull the plug at any time and, on startup, everything recovers nicely. Things are always happening, and this becomes especially noticeable when you have a large farm of servers. Anything can happen at any time, so there is never any assurance of a proper close or a final flush to disk. 

So I feel that it is better to focus on the unpredictable immediate halt or even a partial halt like a disk or network controller failure. Any fancy logic to add graceful recoveries is harder to test and consequently likely to contain bugs that will bite you in production when you are least prepared for them.

Bill

William la Forge

Apr 22, 2012, 10:39:46 PM
to jetla...@googlegroups.com
Sorry, I meant to say this:

Any fancy logic to add a graceful >>>shutdown<<< is harder to test and consequently likely to contain bugs that will bite you in production when you are least prepared for them.

peter royal

Apr 22, 2012, 10:55:02 PM
to jetla...@googlegroups.com
I mostly agree with Bill. 

the graceful shutdown approach I use (which is exercised very frequently, so as not to hit the unexpected case) is:

1) first gracefully stop anything that may be producing messages. 
2) send each fiber a 'checkpoint' message that decrements a shared CountDownLatch
3) when the latch has fully counted down, you know all the fibers have reached the same point and can continue to bring the system down.

obviously, it's possible that messages in the queue between 1 and 2 could cause further work, but our system is written such that those messages are ok to get dropped (everything important that generates work that requires record keeping is stopped in step 1)
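
The three steps above can be sketched with plain java.util.concurrent types. This is only an illustration under assumptions: each fiber is treated as a java.util.concurrent.Executor that runs its tasks one at a time in submission order (single-thread executors stand in for Jetlang fibers here), and the names CheckpointShutdown and awaitCheckpoint are made up for the example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: one shared latch, one checkpoint task per fiber. */
final class CheckpointShutdown {

    /**
     * Step 2: enqueue a checkpoint task behind whatever is already queued on
     * each fiber. Step 3: wait until every fiber has run its checkpoint.
     * Returns false if the timeout elapsed before all fibers got there.
     */
    static boolean awaitCheckpoint(List<? extends Executor> fibers, long timeout, TimeUnit unit)
            throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(fibers.size());
        for (Executor fiber : fibers) {
            fiber.execute(latch::countDown);
        }
        return latch.await(timeout, unit);
    }

    public static void main(String[] args) throws InterruptedException {
        // Stand-ins for two fibers (assumption: sequential, FIFO execution).
        ExecutorService a = Executors.newSingleThreadExecutor();
        ExecutorService b = Executors.newSingleThreadExecutor();

        // Step 1 (stopping the producers) is application specific and not shown.
        a.execute(() -> System.out.println("a: work queued before the checkpoint"));
        b.execute(() -> System.out.println("b: work queued before the checkpoint"));

        boolean reached = awaitCheckpoint(Arrays.asList(a, b), 10, TimeUnit.SECONDS);
        System.out.println("all fibers reached the checkpoint: " + reached);

        a.shutdown();
        b.shutdown();
    }
}
```

Anything enqueued after the checkpoint task is not covered by the latch, which matches the caveat about messages that may be dropped.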

-pete

-- 
(peter.royal|osi)@pobox.com - http://fotap.org/~osi

Mike Rettig

Apr 24, 2012, 10:14:30 PM
to jetla...@googlegroups.com
For most of the apps I work on, I first close the external connections
as these are most likely the critical event producers. At that point, I
actually don't care to process every event. I just want the app to
stop. Fibers are stopped in the reverse order of creation which
usually results in producers stopping before consumers.
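
A minimal sketch of stopping things in reverse order of creation, assuming each component can be stopped through a plain Runnable (for a Jetlang fiber that could be a call to its dispose() method). The class name ReverseOrderShutdown and its methods are hypothetical and not part of Jetlang.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical registry: remembers stop actions and runs them last-registered-first. */
final class ReverseOrderShutdown {
    private final Deque<Runnable> stopActions = new ArrayDeque<>();

    /** Call this right after creating a component, passing the action that stops it. */
    synchronized void register(Runnable stopAction) {
        stopActions.push(stopAction); // push adds at the head of the deque
    }

    /** Runs every stop action, newest first; a failing action does not block the rest. */
    synchronized void stopAll() {
        while (!stopActions.isEmpty()) {
            Runnable stop = stopActions.pop(); // pop removes from the head, so order is LIFO
            try {
                stop.run();
            } catch (RuntimeException e) {
                System.err.println("stop action failed: " + e);
            }
        }
    }

    public static void main(String[] args) {
        ReverseOrderShutdown shutdown = new ReverseOrderShutdown();
        // Consumers are created first, producers last, so producers stop first.
        shutdown.register(() -> System.out.println("stopping consumer"));
        shutdown.register(() -> System.out.println("stopping producer"));
        shutdown.stopAll();
    }
}
```

This only gives the producer-before-consumer order when consumers are created before the producers that feed them.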

Also the systems have on and off states. Prior to shutdown, systems
are turned 'off'. Once a system is off, I know it is not creating any
critical events so I can safely turn it off and I really don't have to
worry about trying to flush every event out of the system.

In the event I need to process every event, I use an approach similar
to Peter's. I create a flush event with a countdown latch, pass it to
the fiber, and wait for it to be triggered when it is run. Once it is
run, I know the fiber has been fully flushed as long as there are no
other producers.
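
A minimal sketch of such a flush event, under the assumption that the fiber can be used as a java.util.concurrent.Executor that runs tasks sequentially in submission order (a single-thread executor stands in for it below). FlushEvent is a hypothetical name, not a Jetlang class.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical flush event: a marker task that signals once the fiber reaches it. */
final class FlushEvent implements Runnable {
    private final CountDownLatch reached = new CountDownLatch(1);

    @Override
    public void run() {
        // Runs on the fiber, after everything that was queued ahead of it.
        reached.countDown();
    }

    /** Blocks until the fiber has run this event; false if the timeout elapsed first. */
    boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return reached.await(timeout, unit);
    }

    /** Enqueues a flush event on the fiber and waits for it to be run. */
    static boolean flush(Executor fiber, long timeout, TimeUnit unit) throws InterruptedException {
        FlushEvent event = new FlushEvent();
        fiber.execute(event);
        return event.await(timeout, unit);
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService fiber = Executors.newSingleThreadExecutor(); // stand-in for a fiber
        List<String> handled = new CopyOnWriteArrayList<>();
        for (int i = 0; i < 10; i++) {
            String msg = "Hello" + i;
            fiber.execute(() -> handled.add(msg));
        }
        boolean flushed = flush(fiber, 10, TimeUnit.SECONDS);
        System.out.println("flushed=" + flushed + ", handled=" + handled.size());
        fiber.shutdown();
    }
}
```

In the program from the first message, a call such as FlushEvent.flush(receiver, 60, TimeUnit.SECONDS) placed before receiver.dispose() could take the place of Thread.sleep(500). That assumes the fiber accepts a plain Runnable through execute, as getQueue().execute(asyncExec) in that code suggests.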

Mike
