I'm trying to use the Hazelcast Directory on here with Hibernate
Search. To avoid having to reindex every time I need to bring up the
entire cluster, I copy the index to disk occasionally and then just
insert the modified data when bringing the cluster back up.
Unfortunately, I get the following exception:
2011-01-28 21:20:49,349
[org.hibernate.search.exception.impl.LogErrorHandler] ERROR: Exception
occurred com.hazelcast.lucene.HazelcastDirectoryException: Index
[myIndex] file [segments_2]: Failed to flush buckets
com.hazelcast.lucene.HazelcastDirectoryException: Index [myIndex] file
[segments_2]: Failed to flush buckets
at
com.hazelcast.lucene.FlushOnCloseHazelcastIndexOutput.close(FlushOnCloseHazelcastIndexOutput.java:
115)
at
org.apache.lucene.store.ChecksumIndexOutput.close(ChecksumIndexOutput.java:
58)
at
org.apache.lucene.index.SegmentInfos.finishCommit(SegmentInfos.java:
809)
at
org.apache.lucene.index.IndexWriter.finishCommit(IndexWriter.java:
3549)
at org.apache.lucene.index.IndexWriter.commit(IndexWriter.java:
3470)
at
org.apache.lucene.index.IndexWriter.closeInternal(IndexWriter.java:
1736)
at org.apache.lucene.index.IndexWriter.close(IndexWriter.java:
1680)
at org.apache.lucene.index.IndexWriter.close(IndexWriter.java:
1644)
at
org.hibernate.search.backend.Workspace.closeIndexWriter(Workspace.java:
211)
at
org.hibernate.search.backend.impl.lucene.PerDPQueueProcessor.run(PerDPQueueProcessor.java:
113)
at java.util.concurrent.Executors
$RunnableAdapter.call(Executors.java:441)
at java.util.concurrent.FutureTask
$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at java.util.concurrent.ThreadPoolExecutor
$Worker.runTask(ThreadPoolExecutor.java:885)
at java.util.concurrent.ThreadPoolExecutor
$Worker.run(ThreadPoolExecutor.java:907)
at java.lang.Thread.run(Thread.java:619)
Caused by: java.lang.RuntimeException:
java.lang.reflect.InvocationTargetException
at com.hazelcast.impl.FactoryImpl$MProxyImpl
$DynamicInvoker.invoke(FactoryImpl.java:1900)
at $Proxy662.putMulti(Unknown Source)
at com.hazelcast.impl.FactoryImpl
$MProxyImpl.putMulti(FactoryImpl.java:2233)
at com.hazelcast.impl.FactoryImpl$MultiMapProxy
$MultiMapBase.put(FactoryImpl.java:1778)
at com.hazelcast.impl.FactoryImpl
$MultiMapProxy.put(FactoryImpl.java:1628)
at
com.hazelcast.lucene.FlushOnCloseHazelcastIndexOutput.close(FlushOnCloseHazelcastIndexOutput.java:
111)
... 15 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.GeneratedMethodAccessor326.invoke(Unknown
Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:
25)
at java.lang.reflect.Method.invoke(Method.java:597)
at com.hazelcast.impl.FactoryImpl$MProxyImpl
$DynamicInvoker.invoke(FactoryImpl.java:1893)
... 20 more
Caused by: java.util.ConcurrentModificationException: Another thread
holds a lock for the key :
com.hazelcast.lucene.data.FileListKey@118268a
at com.hazelcast.impl.BaseManager.throwCME(BaseManager.java:
1281)
at com.hazelcast.impl.ConcurrentMapManager
$MPutMulti.put(ConcurrentMapManager.java:743)
at com.hazelcast.impl.FactoryImpl$MProxyImpl
$MProxyReal.putMulti(FactoryImpl.java:2316)
... 24 more
The code, which runs from a Spring bean's @PostConstruct method, is as
follows (some of my internal class names have been changed, but
otherwise it is a direct copy):
import java.io.File;
import java.util.Date;
import java.util.List;
import java.util.Set;

import javax.annotation.PostConstruct;
import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import javax.persistence.PersistenceUnit;

import org.apache.log4j.Logger;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.hibernate.envers.AuditReader;
import org.hibernate.envers.AuditReaderFactory;
import org.hibernate.envers.RevisionType;
import org.hibernate.envers.query.AuditEntity;
import org.hibernate.search.MassIndexer;
import org.hibernate.search.jpa.FullTextEntityManager;
import org.hibernate.search.jpa.Search;
import org.springframework.transaction.annotation.Transactional;

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.Member;
public class LuceneReIndexImpl implements LuceneReIndex {
@PersistenceUnit
private EntityManagerFactory emf;
private final Logger logger =
Logger.getLogger(LuceneReIndexImpl.class);
@SuppressWarnings("unchecked")
@Override
@Transactional
@PostConstruct
public void reindex() throws Exception {
Set<Member> members = Hazelcast.getCluster().getMembers();
logger.info("We currently have: " + members.size());
FullTextEntityManager manager =
Search.getFullTextEntityManager(emf.createEntityManager());
AuditReader reader =
AuditReaderFactory.get(emf.createEntityManager());
if (members.size() == 1) {
logger.info("Starting index at: " + new Date());
Class<?>[] clazzes = { MyClass.class };
for (Class<?> clazz : clazzes) {
File existingIndex = new File("/tmp/" + clazz.getSimpleName() +
".idx");
if (existingIndex.exists() && existingIndex.isDirectory()) {
Directory hazelcastDir =
manager.getSearchFactory().getDirectoryProviders(clazz)
[0].getDirectory();
Directory.copy(FSDirectory.open(existingIndex), hazelcastDir,
true);
hazelcastDir.close();
Date lastModified = new Date(existingIndex.lastModified());
logger.info("Finding entities of type: " + clazz.getName() + "
modified after: " + lastModified);
List<Object[]> results =
reader.createQuery().forRevisionsOfEntity(clazz, false,
true).add(AuditEntity.revisionProperty("timestamp").gt(existingIndex.lastModified())).getResultList();
logger.info("Got " + results.size() + " revisions of class " +
clazz.getName() + " to reindex");
for (Object[] result : results) {
Object entity = result[0];
RevisionType revType = (RevisionType) result[2];
manager.getTransaction().begin();
try {
switch (revType) {
case ADD:
case MOD:
MyInterface pojo = manager.find((Class<? extends MyInterface>)
clazz, ((MyInterface) entity).getId());
if (pojo != null) { // Must have been
// deleted, ignore
logger.info("Indexing " + pojo.getId() + " of type " +
clazz.getName());
manager.index(pojo);
}
break;
case DEL:
logger.info("Purging " + ((MyInterface) entity).getId() + " of
type " + clazz.getName());
manager.purge((Class<? extends MyInterface>) clazz,
((MyInterface) entity).getId());
break;
}
if (manager.getTransaction().isActive()) {
logger.info("Committing transaction");
manager.getTransaction().commit();
}
} catch (Throwable e) {
logger.error("ERROR", e);
if (manager.getTransaction().isActive()) {
manager.getTransaction().rollback();
}
throw new Exception(e);
}
}
Directory.copy(manager.getSearchFactory().getDirectoryProviders(clazz)
[0].getDirectory(), FSDirectory.open(new File("/tmp/" +
clazz.getSimpleName() + ".idx")), false);
} else {
MassIndexer indexer = manager.createIndexer(clazz);
logger.info("Got MassIndexer of type: " +
indexer.getClass().getName());
indexer.startAndWait();
Directory.copy(manager.getSearchFactory().getDirectoryProviders(clazz)
[0].getDirectory(), FSDirectory.open(new File("/tmp/" +
clazz.getSimpleName() + ".idx")), false);
}
}
logger.info("Done reindexing");
} else {
logger.info("Have more than one member, not reindexing");
}
}
}
I've tried rearranging the code quite a bit, but I get this exception
no matter how it is arranged.