Hello everybody,
A few days ago I found this library, and I was really impressed by how easy it makes implementing the actor pattern. But I've run into one problem that I can't solve. I've built a prototype with an actor-per-entity design in mind, and it would be nice if all messages for an entity were still processed even after shutdown. So I thought I would create one fiber for each 'entity' and a few channels for its messages, but I can't get a list of the processed and unprocessed messages.
Here I'm trying to save all messages in a synchronized list:
private class DurableChannelSubscription<T> extends BaseSubscription<T> {

    private final Callback<T> _receiveMethod;
    private final List<T> messages = Collections.synchronizedList(new ArrayList<T>());

    public DurableChannelSubscription(DisposingExecutor queue, Callback<T> receiveMethod) {
        this(queue, receiveMethod, null);
    }

    public DurableChannelSubscription(DisposingExecutor fiber, Callback<T> receiveMethod, Filter<T> filter) {
        super(fiber, filter);
        this._receiveMethod = receiveMethod;
    }

    /**
     * Receives the event and queues the execution on the target execute.
     */
    @Override
    protected void onMessageOnProducerThread(final T msg) {
        messages.add(msg);
        Runnable asyncExec = new Runnable() {
            public void run() {
                try {
                    _receiveMethod.onMessage(msg);
                } finally {
                    messages.remove(msg);
                }
            }

            @Override
            public String toString() {
                return _receiveMethod.toString() + "(" + msg + ")";
            }
        };
        getQueue().execute(asyncExec);
    }
}
And here is how I'm using this subscription:
ExecutorService service = Executors.newFixedThreadPool(3);
PoolFiberFactory fact = new PoolFiberFactory(service);
Fiber receiver = fact.create();
receiver.start();

Channel<String> channel = new MemoryChannel<String>();
Callback<String> onMsg = new Callback<String>() {
    public void onMessage(String message) {
        System.out.println(Thread.currentThread().getName() + " " + message);
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of silently swallowing it
            Thread.currentThread().interrupt();
        }
    }
};
DurableChannelSubscription<String> sub = new DurableChannelSubscription<String>(receiver, onMsg);
channel.subscribe(sub);

for (int i = 0; i < 10; i++) {
    channel.publish("Hello" + i);
}
Thread.sleep(500);

receiver.dispose();
fact.dispose();
service.shutdown();
try {
    if (!service.awaitTermination(60, TimeUnit.SECONDS)) {
        service.shutdownNow();
        if (!service.awaitTermination(60, TimeUnit.SECONDS))
            System.err.println("Pool did not terminate");
    }
} catch (InterruptedException ie) {
    service.shutdownNow();
    Thread.currentThread().interrupt();
}
System.out.println(sub.messages.toString());
When I run it, I see this exception:
Exception in thread "pool-2-thread-2" java.util.concurrent.RejectedExecutionException: Task org.jetlang.fibers.PoolFiber$1@f0548c rejected from java.util.concurrent.ThreadPoolExecutor@3ec646[Shutting down, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 1]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2001)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:816)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1337)
    at org.jetlang.fibers.PoolFiber.flushIfNotPending(PoolFiber.java:57)
    at org.jetlang.fibers.PoolFiber.flush(PoolFiber.java:69)
    at org.jetlang.fibers.PoolFiber.access$000(PoolFiber.java:19)
    at org.jetlang.fibers.PoolFiber$1.run(PoolFiber.java:36)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
    at java.lang.Thread.run(Thread.java:722)
Did I miss something? Or is there a better way to save messages on shutdown?
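
To make the goal concrete, here is a minimal sketch of the bookkeeping I have in mind, written against plain java.util.concurrent rather than Jetlang. The PendingTracker class and its submit and shutdownAndDrain methods are hypothetical names introduced only for illustration. The sketch assumes Java 8 or later (lambdas), and it assumes a message should count as processed only when its handler returns normally, which differs from the finally-based removal in DurableChannelSubscription. It is not a fix for the PoolFiber exception, just one possible way to end up with a list of unprocessed messages at shutdown.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical helper, not part of Jetlang: remembers every message that was
// accepted but whose handler has not (yet) returned normally.
class PendingTracker<T> {
    private final Queue<T> pending = new ConcurrentLinkedQueue<T>();
    private final ExecutorService executor;
    private final Consumer<T> handler;

    PendingTracker(ExecutorService executor, Consumer<T> handler) {
        this.executor = executor;
        this.handler = handler;
    }

    void submit(final T msg) {
        pending.add(msg); // record before handing off, so nothing is lost in between
        try {
            executor.execute(() -> {
                try {
                    handler.accept(msg);
                    pending.remove(msg); // reached only if the handler returned normally
                } catch (RuntimeException e) {
                    // Handler failed or was interrupted: keep msg in pending.
                }
            });
        } catch (RejectedExecutionException e) {
            // Executor is already shut down: msg simply stays in pending.
        }
    }

    // Shuts the executor down (gracefully first, then forcibly) and returns a
    // snapshot of the messages that were never processed successfully.
    List<T> shutdownAndDrain(long timeout, TimeUnit unit) {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(timeout, unit)) {
                executor.shutdownNow(); // drops queued tasks, interrupts the running one
                executor.awaitTermination(timeout, unit);
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return new ArrayList<T>(pending);
    }

    public static void main(String[] args) {
        PendingTracker<String> tracker = new PendingTracker<String>(
                Executors.newSingleThreadExecutor(),
                message -> {
                    System.out.println(Thread.currentThread().getName() + " " + message);
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        // Report the interruption as a failure so the message stays pending.
                        throw new IllegalStateException(e);
                    }
                });
        for (int i = 0; i < 10; i++) {
            tracker.submit("Hello" + i);
        }
        // How many messages are left depends on timing.
        System.out.println("Unprocessed: " + tracker.shutdownAndDrain(500, TimeUnit.MILLISECONDS));
    }
}
```

The list returned by shutdownAndDrain could then be persisted and replayed on the next start. Whether a message interrupted halfway through should be replayed is a design choice that depends on whether the handler is idempotent.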