From: Alexey Abashev <ale...@abashev.ru>
Date: Sun, 22 Apr 2012 15:01:02 -0700 (PDT)
Local: Sun, Apr 22 2012 6:01 pm
Subject: Durable message queues

Hello everybody,

A few days ago I found this library, and I was really impressed by how easy
it is to implement the actor pattern with it. But I have run into one problem
that I can't solve. I've built a prototype with an actor-per-entity design in
mind, and it would be nice if all messages for one entity were processed even
after a shutdown. So I thought I would create one fiber for each 'entity' and
a few channels for its messages, but I can't get the lists of processed and
unprocessed messages. Here is how I'm trying to save all messages into a
synchronized list:

    private class DurableChannelSubscription<T> extends BaseSubscription<T> {
        private final Callback<T> _receiveMethod;

        private final List<T> messages = Collections.synchronizedList(new ArrayList<T>());

        public DurableChannelSubscription(DisposingExecutor queue, Callback<T> receiveMethod) {
            this(queue, receiveMethod, null);
        }

        public DurableChannelSubscription(DisposingExecutor fiber, Callback<T> receiveMethod, Filter<T> filter) {
            super(fiber, filter);
            this._receiveMethod = receiveMethod;
        }

        /**
         * Receives the event and queues the execution on the target executor.
         */
        @Override
        protected void onMessageOnProducerThread(final T msg) {
            messages.add(msg);

            Runnable asyncExec = new Runnable() {
                public void run() {
                    try {
                        _receiveMethod.onMessage(msg);
                    } finally {
                        messages.remove(msg);
                    }
                }

                @Override
                public String toString() {
                    return _receiveMethod.toString() + "(" + msg + ")";
                }
            };

            getQueue().execute(asyncExec);
        }
    }

And here is how I'm using this subscription:

        ExecutorService service = Executors.newFixedThreadPool(3);
        PoolFiberFactory fact = new PoolFiberFactory(service);
        Fiber receiver = fact.create();
        receiver.start();

        Channel<String> channel = new MemoryChannel<String>();

        Callback<String> onMsg = new Callback<String>() {
            public void onMessage(String message) {
                System.out.println(Thread.currentThread().getName() + " " + message);

                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                }
            }
        };

        DurableChannelSubscription<String> sub = new DurableChannelSubscription<String>(receiver, onMsg);
        channel.subscribe(sub);

        for (int i = 0; i < 10; i++) {
            channel.publish("Hello" + i);
        }

        Thread.sleep(500);

        receiver.dispose();
        fact.dispose();
        service.shutdown();

        try {
            if (!service.awaitTermination(60, TimeUnit.SECONDS)) {
                service.shutdownNow();

                if (!service.awaitTermination(60, TimeUnit.SECONDS))
                    System.err.println("Pool did not terminate");
            }
        } catch (InterruptedException ie) {
            service.shutdownNow();

            Thread.currentThread().interrupt();
        }

        System.out.println(sub.messages.toString());

And when I run it, I see this exception:

Exception in thread "pool-2-thread-2" java.util.concurrent.RejectedExecutionException: Task org.jetlang.fibers.PoolFiber$1@f0548c rejected from java.util.concurrent.ThreadPoolExecutor@3ec646[Shutting down, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 1]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2001)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:816)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1337)
    at org.jetlang.fibers.PoolFiber.flushIfNotPending(PoolFiber.java:57)
    at org.jetlang.fibers.PoolFiber.flush(PoolFiber.java:69)
    at org.jetlang.fibers.PoolFiber.access$000(PoolFiber.java:19)
    at org.jetlang.fibers.PoolFiber$1.run(PoolFiber.java:36)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
    at java.lang.Thread.run(Thread.java:722)
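
Reading the trace from the bottom up, the rejection seems to happen when a task that is already running on a pool thread tries to hand more work to the same pool after shutdown() has been called. The following is only a minimal sketch of that situation using plain java.util.concurrent, with no Jetlang involved; the class name RejectedAfterShutdown and the method resubmitRejected are made up for illustration, and it does not claim to mirror PoolFiber's internals.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

class RejectedAfterShutdown {

    // Returns true if a task running inside the pool was refused
    // when it tried to submit follow-up work after shutdown().
    static boolean resubmitRejected() throws InterruptedException {
        final ExecutorService pool = Executors.newFixedThreadPool(1);
        final CountDownLatch shutdownCalled = new CountDownLatch(1);
        final CountDownLatch done = new CountDownLatch(1);
        final AtomicBoolean rejected = new AtomicBoolean(false);

        pool.execute(() -> {
            try {
                // Wait until the main thread has called shutdown().
                shutdownCalled.await();
                // Stand-in for "flush the rest of my queue on the pool".
                pool.execute(() -> { });
            } catch (RejectedExecutionException e) {
                rejected.set(true);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                done.countDown();
            }
        });

        // shutdown() lets the running task finish but refuses new tasks.
        pool.shutdown();
        shutdownCalled.countDown();
        done.await();
        return rejected.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("rejected: " + resubmitRejected());
    }
}
```

In this sketch the exception is caught and recorded; in the trace above nothing catches it, so it surfaces as an uncaught exception on the pool thread.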

Did I miss something? Or is there a better way to save messages for shutdown?
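
For comparison, here is a rough sketch of one JDK-only way to find out which messages were never started: ExecutorService.shutdownNow() returns the tasks that were still waiting in the queue, so a task object that keeps a reference to its message can be read back afterwards. The names PendingMessagesSketch, MessageTask and unprocessedAfterShutdown are hypothetical, and this deliberately sidesteps Jetlang fibers and channels, so it is an illustration of the idea rather than a drop-in replacement for the subscription above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class PendingMessagesSketch {

    // Hypothetical task type that remembers the message it carries.
    static final class MessageTask implements Runnable {
        final String message;
        private final CountDownLatch started;

        MessageTask(String message, CountDownLatch started) {
            this.message = message;
            this.started = started;
        }

        public void run() {
            started.countDown();
            try {
                Thread.sleep(10_000); // simulate slow processing
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    // Queues `count` messages, then shuts down while the first is still
    // being processed and returns the messages that never started.
    static List<String> unprocessedAfterShutdown(int count) throws InterruptedException {
        ExecutorService single = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        for (int i = 0; i < count; i++) {
            // execute(), not submit(), so the queue holds our own objects.
            single.execute(new MessageTask("Hello" + i, started));
        }
        started.await(); // the first task is now running, the rest are queued

        List<String> pending = new ArrayList<String>();
        for (Runnable r : single.shutdownNow()) {
            pending.add(((MessageTask) r).message);
        }
        single.awaitTermination(5, TimeUnit.SECONDS);
        return pending;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(unprocessedAfterShutdown(5));
    }
}
```

Note that the message being processed at the moment of shutdown is interrupted and does not appear in the returned list, so an in-flight message would still need separate tracking, for example along the lines of the synchronized list used in the subscription above.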


 