Hi Hiram
Software: Apache Camel 2.3-SNAPSHOT (trunk)
http://camel.apache.org/
Unit tests from Camel component: camel-hawtdb
HawtDB 1.0.0
You can let the Camel aggregator use a completion interval, which means
that every X period the currently in-flight aggregated messages are
completed and published out of the aggregator.
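Just to illustrate the idea, here is a minimal sketch of interval-based completion (the class and method names are made up for illustration; this is not Camel's actual implementation): a background task fires every X period and drains whatever is currently aggregated.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: in-flight aggregates keyed by correlation key,
// flushed every 'intervalMillis' by a scheduled background task.
class IntervalAggregator {
    private final Map<String, String> inFlight = new ConcurrentHashMap<String, String>();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    // starts the background completion task
    void start(long intervalMillis) {
        scheduler.scheduleAtFixedRate(new Runnable() {
            public void run() {
                completeAll();
            }
        }, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
    }

    void stop() {
        scheduler.shutdownNow();
    }

    void add(String key, String body) {
        inFlight.put(key, body);
    }

    int pending() {
        return inFlight.size();
    }

    // Completes every in-flight aggregate and returns what was "published".
    List<String> completeAll() {
        List<String> published = new ArrayList<String>();
        for (Map.Entry<String, String> e : inFlight.entrySet()) {
            published.add(e.getValue());
            inFlight.remove(e.getKey());
        }
        return published;
    }
}
```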
To do this there is a background task which reads the keys from the
hawtdb file. It uses the following code:
public Set<String> getKeys() {
    final Set<String> keys = new LinkedHashSet<String>();
    hawtDBFile.execute(new Work<Buffer>() {
        public Buffer execute(Transaction tx) {
            // the interval task could potentially be running while we are shutting down, so check for that
            if (!isRunAllowed()) {
                return null;
            }

            Index<Buffer, Buffer> indexCompleted = hawtDBFile.getRepositoryIndex(tx, getRepositoryName());
            Iterator<Map.Entry<Buffer, Buffer>> it = indexCompleted.iterator();

            // the scan could potentially be running while we are shutting down, so check for that
            while (it.hasNext() && isRunAllowed()) {
                Map.Entry<Buffer, Buffer> entry = it.next();
                Buffer keyBuffer = entry.getKey();
                String key;
                try {
                    key = marshaller.unmarshallKey(keyBuffer);
                } catch (IOException e) {
                    throw new RuntimeException("Error unmarshalling key: " + keyBuffer, e);
                }
                if (key != null) {
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("getKey [" + key + "]");
                    }
                    keys.add(key);
                }
            }
            return null;
        }

        @Override
        public String toString() {
            return "getKeys";
        }
    });
    return Collections.unmodifiableSet(keys);
}
You can also check the source code of HawtDBAggregationRepository.
However, I have seen this in the logs when running a test with it:
2010-04-20 12:34:03,907 [eTimeoutChecker] WARN  HawtDBFile - Error executing work getKeys will do rollback.
org.fusesource.hawtdb.api.OptimisticUpdateException
	at org.fusesource.hawtdb.internal.page.Commit.commitCheck(Commit.java:77)
	at org.fusesource.hawtdb.internal.page.SnapshotHead.commitCheck(SnapshotHead.java:148)
	at org.fusesource.hawtdb.internal.page.HawtTxPageFile.commit(HawtTxPageFile.java:323)
	at org.fusesource.hawtdb.internal.page.HawtTransaction.commit(HawtTransaction.java:250)
	at org.apache.camel.component.hawtdb.HawtDBFile.execute(HawtDBFile.java:111)
	at org.apache.camel.component.hawtdb.HawtDBAggregationRepository.getKeys(HawtDBAggregationRepository.java:226)
	at org.apache.camel.processor.aggregate.AggregateProcessor$AggregationIntervalTask.run(AggregateProcessor.java:599)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:417)
	at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:280)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:135)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:65)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:146)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:170)
	at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:651)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:676)
	at java.lang.Thread.run(Thread.java:613)
I am only doing a read-only operation here, so no unit of work is
needed. However, I am puzzled why the commit check throws that
exception. In the AggregateProcessor I use a Lock (from the JDK) so
that only one thread at any time works with the aggregation
repository.
Maybe you can shed some light?
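For what it's worth, the usual way to cope with optimistic concurrency is to retry the unit of work when the commit-time conflict check fails. A generic sketch of that pattern (the OptimisticConflictException type below is a stand-in, not the real HawtDB class, and this is not how camel-hawtdb currently behaves):

```java
import java.util.concurrent.Callable;

// Stand-in for an optimistic-conflict failure at commit time
// (not the real org.fusesource.hawtdb.api.OptimisticUpdateException).
class OptimisticConflictException extends RuntimeException {
}

class OptimisticRetry {
    // Runs the unit of work, retrying up to maxAttempts times if the
    // commit-time conflict check fails. maxAttempts must be >= 1.
    static <T> T execute(Callable<T> work, int maxAttempts) throws Exception {
        OptimisticConflictException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return work.call();
            } catch (OptimisticConflictException e) {
                last = e; // another transaction won; roll back and try again
            }
        }
        throw last;
    }
}
```

Whether that is the right remedy here, or whether a read-only scan should simply not be subject to the commit check at all, is exactly what I am asking about.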