Simplify InternalEngine#innerIndex (#22721)

Today `InternalEngine#innerIndex` is a pretty big method (> 150 SLoC). This
commit merges `#index` and `#innerIndex` and splits the result into smaller
self-contained methods.
This commit is contained in:
Simon Willnauer 2017-01-21 08:51:35 +01:00 committed by GitHub
parent 8028578305
commit 3ad6d6ebcc
1 changed file with 149 additions and 156 deletions

View File

@ -496,30 +496,6 @@ public class InternalEngine extends Engine {
return currentVersion;
}
@Override
public IndexResult index(Index index) throws IOException {
IndexResult result;
try (ReleasableLock ignored = readLock.acquire()) {
ensureOpen();
if (index.origin().isRecovery()) {
// Don't throttle recovery operations
result = innerIndex(index);
} else {
try (Releasable r = throttle.acquireThrottle()) {
result = innerIndex(index);
}
}
} catch (RuntimeException | IOException e) {
try {
maybeFailEngine("index", e);
} catch (Exception inner) {
e.addSuppressed(inner);
}
throw e;
}
return result;
}
private boolean canOptimizeAddDocument(Index index) {
if (index.getAutoGeneratedIdTimestamp() != IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP) {
assert index.getAutoGeneratedIdTimestamp() >= 0 : "autoGeneratedIdTimestamp must be positive but was: "
@ -559,148 +535,165 @@ public class InternalEngine extends Engine {
return true;
}
private IndexResult innerIndex(Index index) throws IOException {
// TODO we gotta split this method up it's too big!
assert assertSequenceNumber(index.origin(), index.seqNo());
final Translog.Location location;
long seqNo = index.seqNo();
try (Releasable ignored = acquireLock(index.uid())) {
lastWriteNanos = index.startTime();
/* if we have an autoGeneratedID that comes into the engine we can potentially optimize
* and just use addDocument instead of updateDocument and skip the entire version and index lookup across the board.
* Yet, we have to deal with multiple document delivery, for this we use a property of the document that is added
* to detect if it has potentially been added before. We use the documents timestamp for this since it's something
* that:
* - doesn't change per document
* - is preserved in the transaction log
* - and is assigned before we start to index / replicate
* NOTE: it's not important for this timestamp to be consistent across nodes etc. it's just a number that is in the common
* case increasing and can be used in the failure case when we retry and resend documents to establish a happens-before relationship.
* for instance:
* - doc A has autoGeneratedIdTimestamp = 10, isRetry = false
* - doc B has autoGeneratedIdTimestamp = 9, isRetry = false
*
* while both docs are in flight, we disconnect on one node, reconnect and send doc A again
* - now doc A' has autoGeneratedIdTimestamp = 10, isRetry = true
*
* if A' arrives on the shard first we update maxUnsafeAutoIdTimestamp to 10 and use update document. All subsequent
* documents that arrive (A and B) will also use updateDocument since their timestamps are less than maxUnsafeAutoIdTimestamp.
* While this is not strictly needed for doc B it is just much simpler to implement since it will just de-optimize some doc in the worst case.
*
* if A arrives on the shard first we use addDocument since maxUnsafeAutoIdTimestamp is < 10. A' will then just be skipped or calls
* updateDocument.
*/
long currentVersion;
final boolean deleted;
// if anything is fishy here ie. there is a retry we go and force updateDocument below so we are updating the document in the
// lucene index without checking the version map but we still do the version check
final boolean forceUpdateDocument;
final boolean canOptimizeAddDocument = canOptimizeAddDocument(index);
if (canOptimizeAddDocument) {
long deOptimizeTimestamp = maxUnsafeAutoIdTimestamp.get();
if (index.isRetry()) {
forceUpdateDocument = true;
do {
deOptimizeTimestamp = maxUnsafeAutoIdTimestamp.get();
if (deOptimizeTimestamp >= index.getAutoGeneratedIdTimestamp()) {
break;
}
} while (maxUnsafeAutoIdTimestamp.compareAndSet(deOptimizeTimestamp,
index.getAutoGeneratedIdTimestamp()) == false);
assert maxUnsafeAutoIdTimestamp.get() >= index.getAutoGeneratedIdTimestamp();
} else {
// in this case we force
forceUpdateDocument = deOptimizeTimestamp >= index.getAutoGeneratedIdTimestamp();
}
currentVersion = Versions.NOT_FOUND;
deleted = true;
} else {
// update the document
forceUpdateDocument = false; // we don't force it - it depends on the version
final VersionValue versionValue = versionMap.getUnderLock(index.uid());
assert incrementVersionLookup();
if (versionValue == null) {
currentVersion = loadCurrentVersionFromIndex(index.uid());
deleted = currentVersion == Versions.NOT_FOUND;
} else {
currentVersion = checkDeletedAndGCed(versionValue);
deleted = versionValue.delete();
}
}
final long expectedVersion = index.version();
Optional<IndexResult> resultOnVersionConflict;
try {
final boolean isVersionConflict = checkVersionConflict(index, currentVersion, expectedVersion, deleted);
resultOnVersionConflict = isVersionConflict ? Optional.of(new IndexResult(currentVersion, index.seqNo(), false))
: Optional.empty();
} catch (IllegalArgumentException | VersionConflictEngineException ex) {
resultOnVersionConflict = Optional.of(new IndexResult(ex, currentVersion, index.seqNo()));
}
final IndexResult indexResult;
if (resultOnVersionConflict.isPresent()) {
indexResult = resultOnVersionConflict.get();
} else {
// no version conflict
if (index.origin() == Operation.Origin.PRIMARY) {
seqNo = seqNoService().generateSeqNo();
}
/*
* Update the document's sequence number and primary term; the sequence number here is derived here from either the sequence
* number service if this is on the primary, or the existing document's sequence number if this is on the replica. The
* primary term here has already been set, see IndexShard#prepareIndex where the Engine$Index operation is created.
@Override
public IndexResult index(Index index) throws IOException {
final boolean doThrottle = index.origin().isRecovery() == false;
try (ReleasableLock releasableLock = readLock.acquire()) {
ensureOpen();
assert assertSequenceNumber(index.origin(), index.seqNo());
final Translog.Location location;
long seqNo = index.seqNo();
try (Releasable ignored = acquireLock(index.uid());
Releasable indexThrottle = doThrottle ? () -> {} : throttle.acquireThrottle()) {
lastWriteNanos = index.startTime();
/* if we have an autoGeneratedID that comes into the engine we can potentially optimize
* and just use addDocument instead of updateDocument and skip the entire version and index lookup across the board.
* Yet, we have to deal with multiple document delivery, for this we use a property of the document that is added
* to detect if it has potentially been added before. We use the documents timestamp for this since it's something
* that:
* - doesn't change per document
* - is preserved in the transaction log
* - and is assigned before we start to index / replicate
* NOTE: it's not important for this timestamp to be consistent across nodes etc. it's just a number that is in the common
* case increasing and can be used in the failure case when we retry and resend documents to establish a happens-before relationship.
* for instance:
* - doc A has autoGeneratedIdTimestamp = 10, isRetry = false
* - doc B has autoGeneratedIdTimestamp = 9, isRetry = false
*
* while both docs are in flight, we disconnect on one node, reconnect and send doc A again
* - now doc A' has autoGeneratedIdTimestamp = 10, isRetry = true
*
* if A' arrives on the shard first we update maxUnsafeAutoIdTimestamp to 10 and use update document. All subsequent
* documents that arrive (A and B) will also use updateDocument since their timestamps are less than maxUnsafeAutoIdTimestamp.
* While this is not strictly needed for doc B it is just much simpler to implement since it will just de-optimize some doc in the worst case.
*
* if A arrives on the shard first we use addDocument since maxUnsafeAutoIdTimestamp is < 10. A' will then just be skipped or calls
* updateDocument.
*/
index.parsedDoc().updateSeqID(seqNo, index.primaryTerm());
final long updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion);
index.parsedDoc().version().setLongValue(updatedVersion);
IndexResult innerIndexResult;
try {
if (currentVersion == Versions.NOT_FOUND && forceUpdateDocument == false) {
// document does not exists, we can optimize for create, but double check if assertions are running
assert assertDocDoesNotExist(index, canOptimizeAddDocument == false);
index(index.docs(), indexWriter);
long currentVersion;
final boolean deleted;
// if anything is fishy here ie. there is a retry we go and force updateDocument below so we are updating the document in the
// lucene index without checking the version map but we still do the version check
final boolean forceUpdateDocument;
final boolean canOptimizeAddDocument = canOptimizeAddDocument(index);
if (canOptimizeAddDocument) {
forceUpdateDocument = isForceUpdateDocument(index);
currentVersion = Versions.NOT_FOUND;
deleted = true;
} else {
// update the document
forceUpdateDocument = false; // we don't force it - it depends on the version
final VersionValue versionValue = versionMap.getUnderLock(index.uid());
assert incrementVersionLookup();
if (versionValue == null) {
currentVersion = loadCurrentVersionFromIndex(index.uid());
deleted = currentVersion == Versions.NOT_FOUND;
} else {
update(index.uid(), index.docs(), indexWriter);
}
versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion));
innerIndexResult = new IndexResult(updatedVersion, seqNo, deleted);
} catch (Exception ex) {
if (indexWriter.getTragicException() == null) {
/* There is no tragic event recorded so this must be a document failure.
*
* The handling inside IW doesn't guarantee that an tragic / aborting exception
* will be used as THE tragicEventException since if there are multiple exceptions causing an abort in IW
* only one wins. Yet, only the one that wins will also close the IW and in turn fail the engine such that
* we can potentially handle the exception before the engine is failed.
* Bottom line is that we can only rely on the fact that if it's a document failure then
* `indexWriter.getTragicException()` will be null otherwise we have to rethrow and treat it as fatal or rather
* non-document failure
*/
innerIndexResult = new IndexResult(ex, currentVersion, index.seqNo());
} else {
throw ex;
currentVersion = checkDeletedAndGCed(versionValue);
deleted = versionValue.delete();
}
}
assert innerIndexResult != null;
indexResult = innerIndexResult;
final long expectedVersion = index.version();
Optional<IndexResult> resultOnVersionConflict;
try {
final boolean isVersionConflict = checkVersionConflict(index, currentVersion, expectedVersion, deleted);
resultOnVersionConflict = isVersionConflict ? Optional.of(new IndexResult(currentVersion, index.seqNo(), false))
: Optional.empty();
} catch (IllegalArgumentException | VersionConflictEngineException ex) {
resultOnVersionConflict = Optional.of(new IndexResult(ex, currentVersion, index.seqNo()));
}
final IndexResult indexResult;
if (resultOnVersionConflict.isPresent()) {
indexResult = resultOnVersionConflict.get();
} else {
// no version conflict
if (index.origin() == Operation.Origin.PRIMARY) {
seqNo = seqNoService().generateSeqNo();
}
indexResult = indexIntoLucene(index, seqNo, currentVersion, deleted, forceUpdateDocument, canOptimizeAddDocument, expectedVersion);
}
if (indexResult.hasFailure() == false) {
location = index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY
? translog.add(new Translog.Index(index, indexResult))
: null;
indexResult.setTranslogLocation(location);
}
indexResult.setTook(System.nanoTime() - index.startTime());
indexResult.freeze();
return indexResult;
} finally {
if (seqNo != SequenceNumbersService.UNASSIGNED_SEQ_NO) {
seqNoService().markSeqNoAsCompleted(seqNo);
}
}
if (!indexResult.hasFailure()) {
location = index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY
? translog.add(new Translog.Index(index, indexResult))
: null;
indexResult.setTranslogLocation(location);
} catch (RuntimeException | IOException e) {
try {
maybeFailEngine("index", e);
} catch (Exception inner) {
e.addSuppressed(inner);
}
indexResult.setTook(System.nanoTime() - index.startTime());
indexResult.freeze();
return indexResult;
} finally {
if (seqNo != SequenceNumbersService.UNASSIGNED_SEQ_NO) {
seqNoService().markSeqNoAsCompleted(seqNo);
throw e;
}
}
private IndexResult indexIntoLucene(Index index, long seqNo, long currentVersion, boolean deleted, boolean forceUpdateDocument, boolean canOptimizeAddDocument, long expectedVersion) throws IOException {
/* Update the document's sequence number and primary term; the sequence number here is derived here from either the sequence
* number service if this is on the primary, or the existing document's sequence number if this is on the replica. The
* primary term here has already been set, see IndexShard#prepareIndex where the Engine$Index operation is created.
*/
index.parsedDoc().updateSeqID(seqNo, index.primaryTerm());
final long updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion);
index.parsedDoc().version().setLongValue(updatedVersion);
try {
if (currentVersion == Versions.NOT_FOUND && forceUpdateDocument == false) {
// document does not exists, we can optimize for create, but double check if assertions are running
assert assertDocDoesNotExist(index, canOptimizeAddDocument == false);
index(index.docs(), indexWriter);
} else {
update(index.uid(), index.docs(), indexWriter);
}
versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion));
return new IndexResult(updatedVersion, seqNo, deleted);
} catch (Exception ex) {
if (indexWriter.getTragicException() == null) {
/* There is no tragic event recorded so this must be a document failure.
*
* The handling inside IW doesn't guarantee that an tragic / aborting exception
* will be used as THE tragicEventException since if there are multiple exceptions causing an abort in IW
* only one wins. Yet, only the one that wins will also close the IW and in turn fail the engine such that
* we can potentially handle the exception before the engine is failed.
* Bottom line is that we can only rely on the fact that if it's a document failure then
* `indexWriter.getTragicException()` will be null otherwise we have to rethrow and treat it as fatal or rather
* non-document failure
*/
return new IndexResult(ex, currentVersion, index.seqNo());
} else {
throw ex;
}
}
}
private boolean isForceUpdateDocument(Index index) {
boolean forceUpdateDocument;
long deOptimizeTimestamp = maxUnsafeAutoIdTimestamp.get();
if (index.isRetry()) {
forceUpdateDocument = true;
do {
deOptimizeTimestamp = maxUnsafeAutoIdTimestamp.get();
if (deOptimizeTimestamp >= index.getAutoGeneratedIdTimestamp()) {
break;
}
} while (maxUnsafeAutoIdTimestamp.compareAndSet(deOptimizeTimestamp,
index.getAutoGeneratedIdTimestamp()) == false);
assert maxUnsafeAutoIdTimestamp.get() >= index.getAutoGeneratedIdTimestamp();
} else {
// in this case we force
forceUpdateDocument = deOptimizeTimestamp >= index.getAutoGeneratedIdTimestamp();
}
return forceUpdateDocument;
}
private static void index(final List<ParseContext.Document> docs, final IndexWriter indexWriter) throws IOException {
if (docs.size() > 1) {
indexWriter.addDocuments(docs);