You could use Concatenate to produce the final index for all the portions. In the following code sample I merge the batches produced by our system (these are different from the batches produced by MG4J itself).
HTH
...
/**
 * Merges the last valid merged index (if any) together with all pending batch indexes
 * into the next merged folder, field by field, using MG4J's {@code Concatenate}.
 *
 * @param ic indexing context supplying the previous merge folder, the batches to merge
 *           and the target folder for the new merge
 * @return {@code true} on success (also when there is nothing to merge), {@code false}
 *         is never returned normally — on failure the partial output is deleted and the
 *         underlying exception propagates
 * @throws IOException and the reflective/configuration exceptions raised by the MG4J
 *         tools and the term-map construction
 */
public boolean mergeIndex(final IndexingContext ic) throws ConfigurationException, ClassNotFoundException,
SecurityException, InstantiationException, IllegalAccessException, InvocationTargetException,
NoSuchMethodException, IOException, URISyntaxException {
    boolean result = false;
    final Path previousMergeFolder = ic.getLastValidMergedFolder();
    final LongList batchesToMerge = ic.getBatchesToMerge();
    if (batchesToMerge == null || batchesToMerge.isEmpty()) {
        LOG.info("No batches found to merge");
        return true;
    }
    final Path mg4jIndexDir = ic.getNextMergedFolder();
    FileUtils7.prepareCleanDirectory(mg4jIndexDir);
    LOG.debug("Merge semantic index of batch[es]: {} into {}", batchesToMerge, mg4jIndexDir);
    final Path outputBasename = mg4jIndexDir.resolve("output-");

    // The previous merge result (if present) goes first, followed by the new batches.
    final int numberOfPathsToMerge = batchesToMerge.size() + (previousMergeFolder != null ? 1 : 0);
    final Path[] pathsToMerge = new Path[numberOfPathsToMerge];
    int pathIndex = 0;
    if (previousMergeFolder != null) {
        pathsToMerge[pathIndex++] = previousMergeFolder;
    }
    for (final long batchId : batchesToMerge) {
        pathsToMerge[pathIndex++] = fs.getBatchDirPath(batchId);
    }
    LOG.info("Concatenating old indexes and newly created for batches {}", batchesToMerge);
    try {
        for (final Mg4jFields field : Mg4jFields.values()) {
            final String name = field.getIndexName();
            LOG.info("Merging field {}", name);
            // Each input folder contributes one per-field index basename.
            final String[] inputFieldNameBases = new String[numberOfPathsToMerge];
            for (int p = 0; p < pathsToMerge.length; p++) {
                inputFieldNameBases[p] = pathsToMerge[p].resolve("output-" + name).toString();
            }
            final String outputFieldNameBase = outputBasename + name;
            // TODO DVP restrict input paths array size (process by portions) to avoid reaching limit of open files in
            // process (this didn't happen so far)
            // TEXT fields are written in the quasi-succinct format; all other fields stay interleaved.
            final IndexType indexType = field.getFieldType() == DocumentFactory.FieldType.TEXT
                    ? IndexType.QUASI_SUCCINCT
                    : IndexType.INTERLEAVED;
            new Concatenate(
                    outputFieldNameBase,
                    inputFieldNameBases,
                    NOT_METADATA_ONLY,
                    Combine.DEFAULT_BUFFER_SIZE,
                    field.getWriterFlags(),
                    indexType,
                    SKIPS,
                    QUANTUM,
                    HEIGHT,
                    SKIP_BUFFER_SIZE,
                    LOG_INTERVAL).run();
            // Rebuild the (synchronized) term map from the merged .terms file so term lookups
            // work against the new index.
            final String termsFileName = outputFieldNameBase + DiskBasedIndex.TERMS_EXTENSION;
            BinIO.storeObject(
                    StringMaps.synchronize(TERM_MAP_CLASS.getConstructor(Iterable.class).newInstance(
                            new FileLinesCollection(termsFileName, "UTF-8"))),
                    outputFieldNameBase + DiskBasedIndex.TERMMAP_EXTENSION);
            LOG.debug("Created term maps (class: {}) for field {}", TERM_MAP_CLASS.getSimpleName(),
                    field.getIndexName());
        }
        LOG.info("All the index fields are successfully merged...");
        LOG.info("Saving resulting sentence ids");
        Mg4jFSUtils.append(mg4jIndexDir.resolve(Mg4jFSConfig.SENTENCE_IDS_BIN),
                Mg4jFSUtils.resolve(pathsToMerge, Mg4jFSConfig.SENTENCE_IDS_BIN));
        result = true;
        LOG.info("Index is successfully built");
    }
    finally {
        // On any failure, remove the half-written merge folder so it can never be
        // mistaken for a valid merged index.
        if (!result) {
            FileUtils7.forceDelete(mg4jIndexDir);
        }
    }
    return result;
}
...