We are using JanusGraph with a Cassandra + Elasticsearch (ES) combination. We ingest data in OLAP mode: because we submit batch requests, we call the commit method to commit the transaction and thereby start the actual data ingestion. The problem is that the Cassandra commit is performed first, and if there is any issue during that commit the JanusGraph core classes perform a rollback; however, in the same JanusGraph core classes, no rollback is performed when there are issues during the ES commit. Below is the JanusGraph class where we identified the issue.
In janusgraph-core.jar, this is the method that gets called internally, in StandardJanusGraph.class.
If you look at the code highlighted below, even if there are errors during the ES commit, nothing is re-thrown: those errors are stored in a Map and afterwards are only printed as log statements. This leads to data inconsistency, because in the same code below, when the Cassandra commit raises an exception the transaction is rolled back so that the changes are undone, but this does not happen for ES.
When we look at the latest code for StandardJanusGraph.class, no exception is re-thrown there either, and a comment in the code mentions that this needs to be cleaned up. Does that mean this is something that has not been completely implemented?
Is this an issue with the JanusGraph core jars? Has a fix already been made?
/**
 * Persists the relations added and deleted by {@code tx}.
 *
 * <p>The method is a no-op when both collections are empty. Otherwise it proceeds in this order:
 * <ol>
 *   <li>assigns a commit time (if none is set) and a transaction id, and assigns ids to the added
 *       relations unless the transaction is configured to assign ids immediately;</li>
 *   <li>writes a {@code PRECOMMIT} entry to the system transaction log when transaction logging
 *       is enabled and batch loading is not;</li>
 *   <li>if the change set contains schema elements and the store has no transaction isolation,
 *       prepares and commits those schema elements through a separate backend transaction;</li>
 *   <li>prepares the remaining mutations on the transaction's own backend handle and commits
 *       them to storage via {@code commitStorage()};</li>
 *   <li>if there is secondary persistence (index mutations and/or a user log), commits the indexes
 *       and writes the user log, then records the secondary status in the transaction log.</li>
 * </ol>
 *
 * <p><b>Failure semantics.</b> Any {@link Throwable} that escapes the steps above is logged, the
 * backend transaction is asked to roll back, and the throwable is re-thrown (wrapped in a
 * {@code JanusGraphException} unless it already is a {@link RuntimeException}). Index failures that
 * {@code commitIndexes()} <em>returns</em> in its map, and failures while writing the user log, are
 * NOT re-thrown: they are logged, the status becomes {@code SECONDARY_FAILURE}, and that status is
 * written to the transaction log when logging is enabled. At that point {@code commitStorage()} has
 * already returned, so the caller sees a successful commit.
 * NOTE(review): presumably the {@code SECONDARY_FAILURE} log entry is meant to be consumed by a
 * separate recovery process - confirm against the JanusGraph failure and recovery documentation.
 *
 * @param addedRelations   relations created in the transaction
 * @param deletedRelations relations removed in the transaction
 * @param tx               the transaction being committed
 * @throws RuntimeException if preparing or committing fails; non-runtime causes are wrapped in a
 *                          {@code JanusGraphException} with the message "Unexpected exception"
 */
public void commit(Collection<InternalRelation> addedRelations, Collection<InternalRelation> deletedRelations, StandardJanusGraphTx tx) {
    // Guard clause: nothing was added or removed, so there is nothing to persist.
    if (addedRelations.isEmpty() && deletedRelations.isEmpty()) {
        return;
    }

    log.debug("Saving transaction. Added {}, removed {}", addedRelations.size(), deletedRelations.size());

    // Fix the commit timestamp once so every later step sees the same instant.
    if (!tx.getConfiguration().hasCommitTime()) {
        tx.getConfiguration().setCommitTime(this.times.getTime());
    }
    final Instant txTimestamp = tx.getConfiguration().getCommitTime();
    final long transactionId = this.txCounter.incrementAndGet();

    // Ids were not handed out eagerly, so assign them now before anything is serialized.
    if (!tx.getConfiguration().hasAssignIDsImmediately()) {
        this.idAssigner.assignIDs(addedRelations);
    }

    final BackendTransaction mutator = tx.getTxHandle();
    final boolean acquireLocks = tx.getConfiguration().hasAcquireLocks();
    final boolean hasTxIsolation = this.backend.getStoreFeatures().hasTxIsolation();
    // Transaction logging is skipped for batch-loading transactions.
    final boolean logTransaction = this.config.hasLogTransactions() && !tx.getConfiguration().hasEnabledBatchLoading();
    final KCVSLog txLog = logTransaction ? this.backend.getSystemTxLog() : null;
    final TransactionLogHeader txLogHeader = new TransactionLogHeader(transactionId, txTimestamp, this.times);

    try {
        // 1. Record the intended modifications before touching storage.
        if (logTransaction) {
            Preconditions.checkNotNull(txLog, "Transaction log is null");
            txLog.add(txLogHeader.serializeModifications(this.serializer, LogTxStatus.PRECOMMIT, tx, addedRelations, deletedRelations), txLogHeader.getLogKey());
        }

        // 2. Schema elements may only be written with locking enabled and batch loading disabled.
        final boolean hasSchemaElements = !Iterables.isEmpty(Iterables.filter(deletedRelations, SCHEMA_FILTER)) || !Iterables.isEmpty(Iterables.filter(addedRelations, SCHEMA_FILTER));
        Preconditions.checkArgument(!hasSchemaElements || (!tx.getConfiguration().hasEnabledBatchLoading() && acquireLocks), "Attempting to create schema elements in inconsistent state");

        // Without transaction isolation, schema elements are committed first through their own
        // backend transaction; the main commit below then excludes them (NO_SCHEMA_FILTER).
        if (hasSchemaElements && !hasTxIsolation) {
            final BackendTransaction schemaMutator = this.openBackendTransaction(tx);
            try {
                final StandardJanusGraph.ModificationSummary schemaSummary = this.prepareCommit(addedRelations, deletedRelations, SCHEMA_FILTER, schemaMutator, tx, acquireLocks);
                assert schemaSummary.hasModifications && !schemaSummary.has2iModifications;
            } catch (Throwable prepareFailure) {
                schemaMutator.rollback();
                throw prepareFailure;
            }
            try {
                schemaMutator.commit();
            } catch (Throwable schemaCommitFailure) {
                log.error("Could not commit transaction [" + transactionId + "] due to storage exception in system-commit", schemaCommitFailure);
                throw schemaCommitFailure;
            }
        }

        // 3. Prepare the remaining (or all, with isolation) mutations on the transaction's handle.
        final StandardJanusGraph.ModificationSummary summary = this.prepareCommit(addedRelations, deletedRelations, hasTxIsolation ? NO_FILTER : NO_SCHEMA_FILTER, mutator, tx, acquireLocks);

        if (!summary.hasModifications) {
            // No modifications were prepared: commit the backend transaction in one go.
            mutator.commit();
        } else {
            final String logTxIdentifier = tx.getConfiguration().getLogIdentifier();
            // Secondary persistence = a user log to write and/or index mutations to apply.
            final boolean hasSecondaryPersistence = logTxIdentifier != null || summary.has2iModifications;

            if (logTransaction) {
                final LogTxStatus primaryStatus = hasSecondaryPersistence ? LogTxStatus.PRIMARY_SUCCESS : LogTxStatus.COMPLETE_SUCCESS;
                txLog.add(txLogHeader.serializePrimary(this.serializer, primaryStatus), txLogHeader.getLogKey(), mutator.getTxLogPersistor());
            }

            // 4. Primary storage commit; a failure here reaches the outer catch and triggers rollback.
            try {
                mutator.commitStorage();
            } catch (Throwable storageFailure) {
                log.error("Could not commit transaction [" + transactionId + "] due to storage exception in commit", storageFailure);
                throw storageFailure;
            }

            if (!hasSecondaryPersistence) {
                mutator.commitIndexes();
            } else {
                // 5. Secondary persistence. Failures reported here are logged and recorded,
                //    not re-thrown; storage has already been committed above.
                LogTxStatus status = LogTxStatus.SECONDARY_SUCCESS;
                Map<String, Throwable> indexFailures = ImmutableMap.of();
                boolean userlogSuccess = true;
                try {
                    indexFailures = mutator.commitIndexes();
                    if (!indexFailures.isEmpty()) {
                        status = LogTxStatus.SECONDARY_FAILURE;
                        for (Map.Entry<String, Throwable> indexFailure : indexFailures.entrySet()) {
                            log.error("Error while committing index mutations for transaction [" + transactionId + "] on index: " + indexFailure.getKey(), indexFailure.getValue());
                        }
                    }

                    if (logTxIdentifier != null) {
                        try {
                            // Pessimistically mark the user log as failed until the write went through.
                            userlogSuccess = false;
                            final Log userLog = this.backend.getUserLog(logTxIdentifier);
                            final Future<Message> env = userLog.add(txLogHeader.serializeModifications(this.serializer, LogTxStatus.USER_LOG, tx, addedRelations, deletedRelations));
                            // Only inspect the result if it is already available; do not block on it.
                            if (env.isDone()) {
                                try {
                                    env.get();
                                } catch (ExecutionException wrapped) {
                                    throw wrapped.getCause();
                                }
                            }
                            userlogSuccess = true;
                        } catch (Throwable userLogFailure) {
                            status = LogTxStatus.SECONDARY_FAILURE;
                            log.error("Could not user-log committed transaction [" + transactionId + "] to " + logTxIdentifier, userLogFailure);
                        }
                    }
                } finally {
                    // Always record the secondary outcome, even if commitIndexes() itself threw.
                    if (logTransaction) {
                        try {
                            txLog.add(txLogHeader.serializeSecondary(this.serializer, status, indexFailures, userlogSuccess), txLogHeader.getLogKey());
                        } catch (Throwable txLogFailure) {
                            log.error("Could not tx-log secondary persistence status on transaction [" + transactionId + "]", txLogFailure);
                        }
                    }
                }
            }
        }
    } catch (Throwable commitFailure) {
        log.error("Could not commit transaction [" + transactionId + "] due to exception", commitFailure);
        // Best-effort rollback; a rollback failure is logged but must not mask the original error.
        try {
            mutator.rollback();
        } catch (Throwable rollbackFailure) {
            log.error("Could not roll-back transaction [" + transactionId + "] after failure due to exception", rollbackFailure);
        }
        if (commitFailure instanceof RuntimeException) {
            throw (RuntimeException) commitFailure;
        }
        throw new JanusGraphException("Unexpected exception", commitFailure);
    }
}