[hawtdb] org.fusesource.hawtdb.api.OptimisticUpdateException


davsclaus

Apr 22, 2010, 5:15:14 AM
to hawtdb
Hi Hiram

Software: Apache Camel 2.3-SNAPSHOT (trunk)
http://camel.apache.org/

Unit tests from Camel component: camel-hawtdb

HawtDB 1.0.0



You can let the Camel aggregator use a completion interval, which means
that at a fixed interval the current in-flight aggregated messages will
be completed and published out of the aggregator.
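
For context, such a route could be configured roughly like this (a
minimal sketch; the endpoint URIs, the correlation header, the file
path, and MyAggregationStrategy are placeholders, not from the actual
tests):

import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.hawtdb.HawtDBAggregationRepository;

public class IntervalRoute extends RouteBuilder {
    public void configure() {
        from("direct:start")
            // correlate messages by a header and combine them with a custom strategy
            .aggregate(header("myCorrelationKey"), new MyAggregationStrategy())
                // persist in-flight aggregates in a HawtDB file
                .aggregationRepository(new HawtDBAggregationRepository("repo", "target/data/hawtdb.dat"))
                // complete all current in-flight aggregates every 5 seconds
                .completionInterval(5000)
            .to("mock:result");
    }
}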

To do this there is a background task which reads the keys from the
HawtDB file. It uses the following code:

public Set<String> getKeys() {
    final Set<String> keys = new LinkedHashSet<String>();

    hawtDBFile.execute(new Work<Buffer>() {
        public Buffer execute(Transaction tx) {
            // interval task could potentially be running while we are shutting down so check for that
            if (!isRunAllowed()) {
                return null;
            }

            Index<Buffer, Buffer> indexCompleted = hawtDBFile.getRepositoryIndex(tx, getRepositoryName());

            Iterator<Map.Entry<Buffer, Buffer>> it = indexCompleted.iterator();
            // scan could potentially be running while we are shutting down so check for that
            while (it.hasNext() && isRunAllowed()) {
                Map.Entry<Buffer, Buffer> entry = it.next();
                Buffer keyBuffer = entry.getKey();

                String key;
                try {
                    key = marshaller.unmarshallKey(keyBuffer);
                } catch (IOException e) {
                    throw new RuntimeException("Error unmarshalling key: " + keyBuffer, e);
                }
                if (key != null) {
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("getKey [" + key + "]");
                    }
                    keys.add(key);
                }
            }
            return null;
        }

        @Override
        public String toString() {
            return "getKeys";
        }
    });

    return Collections.unmodifiableSet(keys);
}


You can also check the source code of HawtDBAggregationRepository.

However, I have seen this in the logs when running a test with it:



2010-04-20 12:34:03,907 [eTimeoutChecker] WARN  HawtDBFile - Error executing work getKeys will do rollback.
org.fusesource.hawtdb.api.OptimisticUpdateException
    at org.fusesource.hawtdb.internal.page.Commit.commitCheck(Commit.java:77)
    at org.fusesource.hawtdb.internal.page.SnapshotHead.commitCheck(SnapshotHead.java:148)
    at org.fusesource.hawtdb.internal.page.HawtTxPageFile.commit(HawtTxPageFile.java:323)
    at org.fusesource.hawtdb.internal.page.HawtTransaction.commit(HawtTransaction.java:250)
    at org.apache.camel.component.hawtdb.HawtDBFile.execute(HawtDBFile.java:111)
    at org.apache.camel.component.hawtdb.HawtDBAggregationRepository.getKeys(HawtDBAggregationRepository.java:226)
    at org.apache.camel.processor.aggregate.AggregateProcessor$AggregationIntervalTask.run(AggregateProcessor.java:599)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:417)
    at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:280)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:135)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:65)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:146)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:170)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:651)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:676)
    at java.lang.Thread.run(Thread.java:613)


I am only doing a read-only operation here, so no unit of work is
needed. However, I am puzzled why the commit check throws that
exception?

In the AggregateProcessor I use a Lock (from the JDK) so that only one
thread at a time works with the aggregation repository.
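
That guard is roughly this standard pattern (a sketch, not the actual
AggregateProcessor code):

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// only one thread at a time may touch the aggregation repository
private final Lock lock = new ReentrantLock();

private void withRepository(Runnable task) {
    lock.lock();
    try {
        task.run();      // e.g. add/remove/getKeys against the repository
    } finally {
        lock.unlock();   // always release, even if the task throws
    }
}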


Maybe you can shed some light?



Hiram Chirino

Apr 22, 2010, 5:22:38 AM
to haw...@googlegroups.com
It seems like you're not doing any updates.. but the following stack
trace shows that you actually are doing an update.
Notice that the HawtDBFile.getRepositoryIndex() method creates a new
index if it does not already exist.

Camel Thread 1 - AggregateTimeoutChecker@984 daemon, prio=5, in group 'main', status: 'RUNNING'
at org.fusesource.hawtdb.internal.page.HawtTransaction.getUpdates(HawtTransaction.java:303)
at org.fusesource.hawtdb.internal.page.HawtTransaction.access$000(HawtTransaction.java:40)
at org.fusesource.hawtdb.internal.page.HawtTransaction$1.alloc(HawtTransaction.java:75)
at org.apache.camel.component.hawtdb.HawtDBFile.getRepositoryIndex(HawtDBFile.java:128)
at org.apache.camel.component.hawtdb.HawtDBAggregationRepository$5.execute(HawtDBAggregationRepository.java:234)
at org.apache.camel.component.hawtdb.HawtDBAggregationRepository$5.execute(HawtDBAggregationRepository.java:227)
at org.apache.camel.component.hawtdb.HawtDBFile.execute(HawtDBFile.java:106)
at org.apache.camel.component.hawtdb.HawtDBAggregationRepository.getKeys(HawtDBAggregationRepository.java:227)
at org.apache.camel.processor.aggregate.AggregateProcessor$AggregationIntervalTask.run(AggregateProcessor.java:599)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:181)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:205)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:637)
--
Regards,
Hiram

Blog: http://hiramchirino.com

Open Source SOA
http://fusesource.com/

davsclaus

Apr 22, 2010, 5:39:27 AM
to hawtdb
Hi

Are you saying that when I just want to iterate the index in a sort of
read-only mode, HawtDB may decide to create a new index?
And because of that it's an update, even though I just read the keys
and do NOT do any actual writes?

Why does HawtDB want to create a new index when I am just reading from
it?

And the error is nondeterministic and hard to reproduce.



On Apr 22, 11:22 am, Hiram Chirino <chir...@gmail.com> wrote:
> It seems like you're not doing any updates.. but the following stack
> trace shows that you actually are doing an update.
> Notice that the HawtDBFile.getRepositoryIndex() method creates a new
> index if it does not already exist.
>
> [stack trace snipped]

Hiram Chirino

Apr 22, 2010, 5:49:48 AM
to haw...@googlegroups.com
On Thu, Apr 22, 2010 at 5:39 AM, davsclaus <claus...@gmail.com> wrote:
> Hi
>
> Are you saying that when I just want to iterate the index in a sort of
> read-only mode, HawtDB may decide to create a new index?
> And because of that it's an update, even though I just read the keys
> and do NOT do any actual writes?

No.. I'm saying your code is not just iterating. Peek at
org.apache.camel.component.hawtdb.HawtDBFile.getRepositoryIndex(HawtDBFile.java:128)
and you'll see your code is doing an update.

>
> Why does HawtDB want to create a new index when I am just reading from
> it?
>

You may then need to implement a 'real' read-only version of
org.apache.camel.component.hawtdb.HawtDBFile.getRepositoryIndex.
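
Something along these lines might work: a lookup that never touches the
allocator, so iterating the keys cannot turn into an update (a sketch;
getReadOnlyRepositoryIndex is a hypothetical name, and ROOT_INDEXES_FACTORY
and INDEX_FACTORY are the existing factories in HawtDBFile):

public Index<Buffer, Buffer> getReadOnlyRepositoryIndex(Transaction tx, String name) {
    Index<String, Integer> indexes = ROOT_INDEXES_FACTORY.open(tx, 0);
    Integer location = indexes.get(name);
    if (location == null) {
        // no index has been created yet; do NOT allocate one, just report it missing
        return null;
    }
    return INDEX_FACTORY.open(tx, location);
}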

davsclaus

Apr 22, 2010, 6:03:07 AM
to hawtdb

Line 128 is as follows:

    Integer location = indexes.get(name);

All it has done at that point is open the index. Yes, later in the code
it creates the index if it did not exist, but the stack trace shows that
we haven't got that far down in the code.


And the full source code for that method is:

public Index<Buffer, Buffer> getRepositoryIndex(Transaction tx, String name) {
    Index<Buffer, Buffer> answer;

    Index<String, Integer> indexes = ROOT_INDEXES_FACTORY.open(tx, 0);
    Integer location = indexes.get(name);

    if (location == null) {
        // create it..
        int page = tx.allocator().alloc(1);
        Index<Buffer, Buffer> created = INDEX_FACTORY.create(tx, page);

        // add it to indexes so we can find it the next time
        indexes.put(name, page);

        if (LOG.isDebugEnabled()) {
            LOG.debug("Created new repository index with name " + name + " at location " + page);
        }

        answer = created;
    } else {
        if (LOG.isTraceEnabled()) {
            LOG.trace("Repository index with name " + name + " at location " + location);
        }
        answer = INDEX_FACTORY.open(tx, location);
    }

    if (LOG.isTraceEnabled()) {
        LOG.trace("Repository index with name " + name + " -> " + answer);
    }
    return answer;
}




Hiram Chirino

Apr 22, 2010, 6:13:53 AM
to haw...@googlegroups.com
The line numbers may be off a little.. but notice in the stack trace
that the getRepositoryIndex() method is calling the alloc() method.

davsclaus

Apr 22, 2010, 6:31:42 AM
to hawtdb
Hi

Thanks a lot for the help.

For my read-only operations I will now ensure the index exists before I
try to read from it. Well, in fact I just break out, because the index
doesn't exist yet.

Then only the write operations will create the index if it is missing.
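
In getKeys() that could look roughly like this, with
getReadOnlyRepositoryIndex being the hypothetical read-only variant
sketched earlier in the thread:

Index<Buffer, Buffer> indexCompleted = hawtDBFile.getReadOnlyRepositoryIndex(tx, getRepositoryName());
if (indexCompleted == null) {
    // nothing has been persisted yet, so there are no keys to scan; break out
    return null;
}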

davsclaus

Apr 22, 2010, 6:34:53 AM
to hawtdb
Hi

I can get this exception in another area.

This time it is actually within a write operation, as we want to remove
a key from the repo.


2010-04-22 12:30:46,008 [pool-2-thread-1] WARN  HawtDBFile - Error executing work Removing key [E] will do rollback.
org.fusesource.hawtdb.api.OptimisticUpdateException
    at org.fusesource.hawtdb.internal.page.Commit.commitCheck(Commit.java:77)
    at org.fusesource.hawtdb.internal.page.SnapshotHead.commitCheck(SnapshotHead.java:148)
    at org.fusesource.hawtdb.internal.page.HawtTxPageFile.commit(HawtTxPageFile.java:323)
    at org.fusesource.hawtdb.internal.page.HawtTransaction.commit(HawtTransaction.java:250)
    at org.apache.camel.component.hawtdb.HawtDBFile.execute(HawtDBFile.java:111)
    at org.apache.camel.component.hawtdb.HawtDBAggregationRepository.remove(HawtDBAggregationRepository.java:179)
    at org.apache.camel.processor.aggregate.AggregateProcessor.onCompletion(AggregateProcessor.java:338)
    at org.apache.camel.processor.aggregate.AggregateProcessor.doAggregation(AggregateProcessor.java:250)
    at org.apache.camel.processor.aggregate.AggregateProcessor.process(AggregateProcessor.java:180)
    at org.apache.camel.processor.DelegateProcessor.processNext(DelegateProcessor.java:53)
    at org.apache.camel.processor.DelegateProcessor.proceed(DelegateProcessor.java:82)
    at org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:97)
    at org.apache.camel.processor.RedeliveryErrorHandler.processExchange(RedeliveryErrorHandler.java:177)
    at org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:143)
    at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:88)
    at org.apache.camel.processor.DefaultErrorHandler.process(DefaultErrorHandler.java:49)
    at org.apache.camel.processor.DefaultChannel.process(DefaultChannel.java:228)
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:75)
    at org.apache.camel.processor.UnitOfWorkProcessor.processNext(UnitOfWorkProcessor.java:66)
    at org.apache.camel.processor.DelegateProcessor.process(DelegateProcessor.java:48)
    at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:67)
    at org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:44)
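
Since OptimisticUpdateException signals that a concurrent transaction
committed a conflicting change first, one common way to cope is to
retry the whole unit of work after the rollback. A minimal sketch,
assuming HawtDBFile.execute() rethrows the exception after rolling back
(executeWithRetry is a hypothetical helper, not existing API):

private <T> T executeWithRetry(Work<T> work, int maxAttempts) {
    for (int attempt = 1; ; attempt++) {
        try {
            return hawtDBFile.execute(work);
        } catch (OptimisticUpdateException e) {
            if (attempt >= maxAttempts) {
                throw e;  // give up after too many conflicts
            }
            // a concurrent transaction won the race; re-run the work on a fresh snapshot
        }
    }
}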


