ConcurrentModificationException when individually inserting entries into Hazelcast Lucene Directory


Kevin Jordan

Jan 28, 2011, 10:47:17 PM
to Hazelcast
I'm trying to use the Hazelcast Lucene Directory with Hibernate
Search. To avoid a full reindex every time I bring up the cluster, I
periodically copy the index to disk and then, when bringing the
cluster back up, insert only the data modified since that copy.
Unfortunately I get the following exception:

2011-01-28 21:20:49,349 [org.hibernate.search.exception.impl.LogErrorHandler] ERROR: Exception occurred com.hazelcast.lucene.HazelcastDirectoryException: Index [myIndex] file [segments_2]: Failed to flush buckets

com.hazelcast.lucene.HazelcastDirectoryException: Index [myIndex] file [segments_2]: Failed to flush buckets
    at com.hazelcast.lucene.FlushOnCloseHazelcastIndexOutput.close(FlushOnCloseHazelcastIndexOutput.java:115)
    at org.apache.lucene.store.ChecksumIndexOutput.close(ChecksumIndexOutput.java:58)
    at org.apache.lucene.index.SegmentInfos.finishCommit(SegmentInfos.java:809)
    at org.apache.lucene.index.IndexWriter.finishCommit(IndexWriter.java:3549)
    at org.apache.lucene.index.IndexWriter.commit(IndexWriter.java:3470)
    at org.apache.lucene.index.IndexWriter.closeInternal(IndexWriter.java:1736)
    at org.apache.lucene.index.IndexWriter.close(IndexWriter.java:1680)
    at org.apache.lucene.index.IndexWriter.close(IndexWriter.java:1644)
    at org.hibernate.search.backend.Workspace.closeIndexWriter(Workspace.java:211)
    at org.hibernate.search.backend.impl.lucene.PerDPQueueProcessor.run(PerDPQueueProcessor.java:113)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
    at java.util.concurrent.FutureTask.run(FutureTask.java:138)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:885)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
    at java.lang.Thread.run(Thread.java:619)
Caused by: java.lang.RuntimeException: java.lang.reflect.InvocationTargetException
    at com.hazelcast.impl.FactoryImpl$MProxyImpl$DynamicInvoker.invoke(FactoryImpl.java:1900)
    at $Proxy662.putMulti(Unknown Source)
    at com.hazelcast.impl.FactoryImpl$MProxyImpl.putMulti(FactoryImpl.java:2233)
    at com.hazelcast.impl.FactoryImpl$MultiMapProxy$MultiMapBase.put(FactoryImpl.java:1778)
    at com.hazelcast.impl.FactoryImpl$MultiMapProxy.put(FactoryImpl.java:1628)
    at com.hazelcast.lucene.FlushOnCloseHazelcastIndexOutput.close(FlushOnCloseHazelcastIndexOutput.java:111)
    ... 15 more
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.GeneratedMethodAccessor326.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at com.hazelcast.impl.FactoryImpl$MProxyImpl$DynamicInvoker.invoke(FactoryImpl.java:1893)
    ... 20 more
Caused by: java.util.ConcurrentModificationException: Another thread holds a lock for the key : com.hazelcast.lucene.data.FileListKey@118268a
    at com.hazelcast.impl.BaseManager.throwCME(BaseManager.java:1281)
    at com.hazelcast.impl.ConcurrentMapManager$MPutMulti.put(ConcurrentMapManager.java:743)
    at com.hazelcast.impl.FactoryImpl$MProxyImpl$MProxyReal.putMulti(FactoryImpl.java:2316)
    ... 24 more


The code runs from a Spring bean's @PostConstruct method and is as
follows (some of my internal class names are changed, but it is
otherwise a direct copy):

import java.io.File;
import java.util.Date;
import java.util.List;
import java.util.Set;

import javax.annotation.PostConstruct;
import javax.persistence.EntityManagerFactory;
import javax.persistence.PersistenceUnit;

import org.apache.log4j.Logger;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.hibernate.envers.AuditReader;
import org.hibernate.envers.AuditReaderFactory;
import org.hibernate.envers.RevisionType;
import org.hibernate.envers.query.AuditEntity;
import org.hibernate.search.MassIndexer;
import org.hibernate.search.jpa.FullTextEntityManager;
import org.hibernate.search.jpa.Search;
import org.springframework.transaction.annotation.Transactional;

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.Member;

public class LuceneReIndexImpl implements LuceneReIndex {

    @PersistenceUnit
    private EntityManagerFactory emf;
    private final Logger logger = Logger.getLogger(LuceneReIndexImpl.class);

    @SuppressWarnings("unchecked")
    @Override
    @Transactional
    @PostConstruct
    public void reindex() throws Exception {
        Set<Member> members = Hazelcast.getCluster().getMembers();
        logger.info("We currently have: " + members.size());
        FullTextEntityManager manager =
                Search.getFullTextEntityManager(emf.createEntityManager());
        AuditReader reader = AuditReaderFactory.get(emf.createEntityManager());
        if (members.size() == 1) {
            logger.info("Starting index at: " + new Date());
            Class<?>[] clazzes = { MyClass.class };
            for (Class<?> clazz : clazzes) {
                File existingIndex = new File("/tmp/" + clazz.getSimpleName() + ".idx");
                if (existingIndex.exists() && existingIndex.isDirectory()) {
                    Directory hazelcastDir =
                            manager.getSearchFactory().getDirectoryProviders(clazz)[0].getDirectory();
                    Directory.copy(FSDirectory.open(existingIndex), hazelcastDir, true);
                    hazelcastDir.close();
                    Date lastModified = new Date(existingIndex.lastModified());
                    logger.info("Finding entities of type: " + clazz.getName()
                            + " modified after: " + lastModified);
                    List<Object[]> results = reader.createQuery()
                            .forRevisionsOfEntity(clazz, false, true)
                            .add(AuditEntity.revisionProperty("timestamp")
                                    .gt(existingIndex.lastModified()))
                            .getResultList();
                    logger.info("Got " + results.size() + " revisions of class "
                            + clazz.getName() + " to reindex");
                    for (Object[] result : results) {
                        Object entity = result[0];
                        RevisionType revType = (RevisionType) result[2];
                        manager.getTransaction().begin();
                        try {
                            switch (revType) {
                            case ADD:
                            case MOD:
                                MyInterface pojo = manager.find(
                                        (Class<? extends MyInterface>) clazz,
                                        ((MyInterface) entity).getId());
                                if (pojo != null) { // if null, it must have been deleted; ignore
                                    logger.info("Indexing " + pojo.getId()
                                            + " of type " + clazz.getName());
                                    manager.index(pojo);
                                }
                                break;
                            case DEL:
                                logger.info("Purging " + ((MyInterface) entity).getId()
                                        + " of type " + clazz.getName());
                                manager.purge((Class<? extends MyInterface>) clazz,
                                        ((MyInterface) entity).getId());
                                break;
                            }
                            if (manager.getTransaction().isActive()) {
                                logger.info("Committing transaction");
                                manager.getTransaction().commit();
                            }
                        } catch (Throwable e) {
                            logger.error("ERROR", e);
                            if (manager.getTransaction().isActive()) {
                                manager.getTransaction().rollback();
                            }
                            throw new Exception(e);
                        }
                    }

                    Directory.copy(
                            manager.getSearchFactory().getDirectoryProviders(clazz)[0].getDirectory(),
                            FSDirectory.open(new File("/tmp/" + clazz.getSimpleName() + ".idx")),
                            false);
                } else {
                    MassIndexer indexer = manager.createIndexer(clazz);
                    logger.info("Got MassIndexer of type: " + indexer.getClass().getName());
                    indexer.startAndWait();

                    Directory.copy(
                            manager.getSearchFactory().getDirectoryProviders(clazz)[0].getDirectory(),
                            FSDirectory.open(new File("/tmp/" + clazz.getSimpleName() + ".idx")),
                            false);
                }
            }
            logger.info("Done reindexing");
        } else {
            logger.info("Have more than one member, not reindexing");
        }
    }
}


I've tried moving things around quite a bit, but I get this exception
no matter where I put the code.

Kevin Jordan

Jan 28, 2011, 10:49:09 PM
to Hazelcast
I get no problems in the case where it uses

MassIndexer indexer = manager.createIndexer(clazz);
indexer.startAndWait();

but that's very slow and reindexes all the entries in my database.

Talip Ozturk

Jan 29, 2011, 5:24:08 AM
to haze...@googlegroups.com
The default transaction timeout is 8 seconds, which is not enough for
this operation to acquire the lock, so it throws
ConcurrentModificationException. You should change
com.hazelcast.impl.Constants.Timeouts.DEFAULT_TXN_TIMEOUT and rebuild
Hazelcast, as this variable is not configurable.
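For reference, the edit Talip describes might look roughly like the sketch below. Only the constant name comes from this thread; the surrounding class layout and the original/new values (8 seconds raised to, say, 60 seconds, stored in milliseconds) are assumptions, so check them against the actual Hazelcast source before rebuilding the jar.

```java
// Hypothetical sketch of com.hazelcast.impl.Constants.Timeouts.
// Class layout and units are assumed; only DEFAULT_TXN_TIMEOUT is
// named in the thread. The idea: raise the value so the bulk index
// flush can acquire the distributed lock before the txn times out.
public class Constants {
    public static class Timeouts {
        // was (assumed): 8 * 1000L, i.e. 8 seconds in milliseconds
        public static final long DEFAULT_TXN_TIMEOUT = 60 * 1000L;
    }

    public static void main(String[] args) {
        System.out.println(Timeouts.DEFAULT_TXN_TIMEOUT);
    }
}
```

After changing the constant you would rebuild the Hazelcast jar and put the patched build on the classpath in place of the released one.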

That also means, in the future:
1. we have to make this variable configurable
2. we should take it further and make the transaction timeout
configurable per transaction: Transaction.setTimeout etc.


http://twitter.com/oztalip


Kevin Jordan

Jan 30, 2011, 4:36:53 PM
to Hazelcast
That worked, thanks!