From 3ad6d6ebcccd06969f7d1fc22d6ad45655f3a349 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Sat, 21 Jan 2017 08:51:35 +0100 Subject: [PATCH] Simplify InternalEngine#innerIndex (#22721) Today `InternalEngine#innerIndex` is a pretty big method (> 150 SLoC). This commit merged `#index` and `#innerIndex` and splits it up into smaller contained methods. --- .../index/engine/InternalEngine.java | 305 +++++++++--------- 1 file changed, 149 insertions(+), 156 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index fb041603a38..d0000da2d11 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -496,30 +496,6 @@ public class InternalEngine extends Engine { return currentVersion; } - @Override - public IndexResult index(Index index) throws IOException { - IndexResult result; - try (ReleasableLock ignored = readLock.acquire()) { - ensureOpen(); - if (index.origin().isRecovery()) { - // Don't throttle recovery operations - result = innerIndex(index); - } else { - try (Releasable r = throttle.acquireThrottle()) { - result = innerIndex(index); - } - } - } catch (RuntimeException | IOException e) { - try { - maybeFailEngine("index", e); - } catch (Exception inner) { - e.addSuppressed(inner); - } - throw e; - } - return result; - } - private boolean canOptimizeAddDocument(Index index) { if (index.getAutoGeneratedIdTimestamp() != IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP) { assert index.getAutoGeneratedIdTimestamp() >= 0 : "autoGeneratedIdTimestamp must be positive but was: " @@ -559,148 +535,165 @@ public class InternalEngine extends Engine { return true; } - private IndexResult innerIndex(Index index) throws IOException { - // TODO we gotta split this method up it's too big! - assert assertSequenceNumber(index.origin(), index.seqNo()); - final Translog.Location location; - long seqNo = index.seqNo(); - try (Releasable ignored = acquireLock(index.uid())) { - lastWriteNanos = index.startTime(); - /* if we have an autoGeneratedID that comes into the engine we can potentially optimize - * and just use addDocument instead of updateDocument and skip the entire version and index lookup across the board. - * Yet, we have to deal with multiple document delivery, for this we use a property of the document that is added - * to detect if it has potentially been added before. We use the documents timestamp for this since it's something - * that: - * - doesn't change per document - * - is preserved in the transaction log - * - and is assigned before we start to index / replicate - * NOTE: it's not important for this timestamp to be consistent across nodes etc. it's just a number that is in the common - * case increasing and can be used in the failure case when we retry and resent documents to establish a happens before relationship. - * for instance: - * - doc A has autoGeneratedIdTimestamp = 10, isRetry = false - * - doc B has autoGeneratedIdTimestamp = 9, isRetry = false - * - * while both docs are in in flight, we disconnect on one node, reconnect and send doc A again - * - now doc A' has autoGeneratedIdTimestamp = 10, isRetry = true - * - * if A' arrives on the shard first we update maxUnsafeAutoIdTimestamp to 10 and use update document. All subsequent - * documents that arrive (A and B) will also use updateDocument since their timestamps are less than maxUnsafeAutoIdTimestamp. 
- * While this is not strictly needed for doc B it is just much simpler to implement since it will just de-optimize some doc in the worst case. - * - * if A arrives on the shard first we use addDocument since maxUnsafeAutoIdTimestamp is < 10. A` will then just be skipped or calls - * updateDocument. - */ - long currentVersion; - final boolean deleted; - // if anything is fishy here ie. there is a retry we go and force updateDocument below so we are updating the document in the - // lucene index without checking the version map but we still do the version check - final boolean forceUpdateDocument; - final boolean canOptimizeAddDocument = canOptimizeAddDocument(index); - if (canOptimizeAddDocument) { - long deOptimizeTimestamp = maxUnsafeAutoIdTimestamp.get(); - if (index.isRetry()) { - forceUpdateDocument = true; - do { - deOptimizeTimestamp = maxUnsafeAutoIdTimestamp.get(); - if (deOptimizeTimestamp >= index.getAutoGeneratedIdTimestamp()) { - break; - } - } while (maxUnsafeAutoIdTimestamp.compareAndSet(deOptimizeTimestamp, - index.getAutoGeneratedIdTimestamp()) == false); - assert maxUnsafeAutoIdTimestamp.get() >= index.getAutoGeneratedIdTimestamp(); - } else { - // in this case we force - forceUpdateDocument = deOptimizeTimestamp >= index.getAutoGeneratedIdTimestamp(); - } - currentVersion = Versions.NOT_FOUND; - deleted = true; - } else { - // update the document - forceUpdateDocument = false; // we don't force it - it depends on the version - final VersionValue versionValue = versionMap.getUnderLock(index.uid()); - assert incrementVersionLookup(); - if (versionValue == null) { - currentVersion = loadCurrentVersionFromIndex(index.uid()); - deleted = currentVersion == Versions.NOT_FOUND; - } else { - currentVersion = checkDeletedAndGCed(versionValue); - deleted = versionValue.delete(); - } - } - final long expectedVersion = index.version(); - Optional resultOnVersionConflict; - try { - final boolean isVersionConflict = checkVersionConflict(index, currentVersion, expectedVersion, deleted); - resultOnVersionConflict = isVersionConflict ? Optional.of(new IndexResult(currentVersion, index.seqNo(), false)) - : Optional.empty(); - } catch (IllegalArgumentException | VersionConflictEngineException ex) { - resultOnVersionConflict = Optional.of(new IndexResult(ex, currentVersion, index.seqNo())); - } - - final IndexResult indexResult; - if (resultOnVersionConflict.isPresent()) { - indexResult = resultOnVersionConflict.get(); - } else { - // no version conflict - if (index.origin() == Operation.Origin.PRIMARY) { - seqNo = seqNoService().generateSeqNo(); - } - - /* - * Update the document's sequence number and primary term; the sequence number here is derived here from either the sequence - * number service if this is on the primary, or the existing document's sequence number if this is on the replica. The - * primary term here has already been set, see IndexShard#prepareIndex where the Engine$Index operation is created. + @Override + public IndexResult index(Index index) throws IOException { + final boolean doThrottle = index.origin().isRecovery() == false; + try (ReleasableLock releasableLock = readLock.acquire()) { + ensureOpen(); + assert assertSequenceNumber(index.origin(), index.seqNo()); + final Translog.Location location; + long seqNo = index.seqNo(); + try (Releasable ignored = acquireLock(index.uid()); + Releasable indexThrottle = doThrottle ? 
throttle.acquireThrottle() : () -> {}) {
+                lastWriteNanos = index.startTime();
+                /* if we have an autoGeneratedID that comes into the engine we can potentially optimize
+                 * and just use addDocument instead of updateDocument and skip the entire version and index lookup across the board.
+                 * Yet, we have to deal with multiple document delivery, for this we use a property of the document that is added
+                 * to detect if it has potentially been added before. We use the document's timestamp for this since it's something
+                 * that:
+                 *  - doesn't change per document
+                 *  - is preserved in the transaction log
+                 *  - and is assigned before we start to index / replicate
+                 * NOTE: it's not important for this timestamp to be consistent across nodes etc. it's just a number that is in the common
+                 * case increasing and can be used in the failure case when we retry and resend documents to establish a happens-before relationship.
+                 * for instance:
+                 *  - doc A has autoGeneratedIdTimestamp = 10, isRetry = false
+                 *  - doc B has autoGeneratedIdTimestamp = 9, isRetry = false
+                 *
+                 * while both docs are in flight, we disconnect on one node, reconnect and send doc A again
+                 *  - now doc A' has autoGeneratedIdTimestamp = 10, isRetry = true
+                 *
+                 * if A' arrives on the shard first we update maxUnsafeAutoIdTimestamp to 10 and use updateDocument. All subsequent
+                 * documents that arrive (A and B) will also use updateDocument since their timestamps are less than maxUnsafeAutoIdTimestamp.
+                 * While this is not strictly needed for doc B it is just much simpler to implement since it will just de-optimize some doc in the worst case.
+                 *
+                 * if A arrives on the shard first we use addDocument since maxUnsafeAutoIdTimestamp is < 10. A' will then just be skipped or call
+                 * updateDocument.
                  */
-                index.parsedDoc().updateSeqID(seqNo, index.primaryTerm());
-                final long updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion);
-                index.parsedDoc().version().setLongValue(updatedVersion);
-                IndexResult innerIndexResult;
-                try {
-                    if (currentVersion == Versions.NOT_FOUND && forceUpdateDocument == false) {
-                        // document does not exists, we can optimize for create, but double check if assertions are running
-                        assert assertDocDoesNotExist(index, canOptimizeAddDocument == false);
-                        index(index.docs(), indexWriter);
+                long currentVersion;
+                final boolean deleted;
+                // if anything is fishy here, i.e. 
there is a retry, we go and force updateDocument below so we are updating the document in the
+                // lucene index without checking the version map, but we still do the version check
+                final boolean forceUpdateDocument;
+                final boolean canOptimizeAddDocument = canOptimizeAddDocument(index);
+                if (canOptimizeAddDocument) {
+                    forceUpdateDocument = isForceUpdateDocument(index);
+                    currentVersion = Versions.NOT_FOUND;
+                    deleted = true;
+                } else {
+                    // update the document
+                    forceUpdateDocument = false; // we don't force it - it depends on the version
+                    final VersionValue versionValue = versionMap.getUnderLock(index.uid());
+                    assert incrementVersionLookup();
+                    if (versionValue == null) {
+                        currentVersion = loadCurrentVersionFromIndex(index.uid());
+                        deleted = currentVersion == Versions.NOT_FOUND;
                     } else {
-                        update(index.uid(), index.docs(), indexWriter);
-                    }
-                    versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion));
-                    innerIndexResult = new IndexResult(updatedVersion, seqNo, deleted);
-                } catch (Exception ex) {
-                    if (indexWriter.getTragicException() == null) {
-                        /* There is no tragic event recorded so this must be a document failure.
-                         *
-                         * The handling inside IW doesn't guarantee that an tragic / aborting exception
-                         * will be used as THE tragicEventException since if there are multiple exceptions causing an abort in IW
-                         * only one wins. Yet, only the one that wins will also close the IW and in turn fail the engine such that
-                         * we can potentially handle the exception before the engine is failed.
-                         * Bottom line is that we can only rely on the fact that if it's a document failure then
-                         * `indexWriter.getTragicException()` will be null otherwise we have to rethrow and treat it as fatal or rather
-                         * non-document failure
-                         */
-                        innerIndexResult = new IndexResult(ex, currentVersion, index.seqNo());
-                    } else {
-                        throw ex;
+                        currentVersion = checkDeletedAndGCed(versionValue);
+                        deleted = versionValue.delete();
                     }
                 }
-                assert innerIndexResult != null;
-                indexResult = innerIndexResult;
+                final long expectedVersion = index.version();
+                Optional<IndexResult> resultOnVersionConflict;
+                try {
+                    final boolean isVersionConflict = checkVersionConflict(index, currentVersion, expectedVersion, deleted);
+                    resultOnVersionConflict = isVersionConflict ? Optional.of(new IndexResult(currentVersion, index.seqNo(), false))
+                        : Optional.empty();
+                } catch (IllegalArgumentException | VersionConflictEngineException ex) {
+                    resultOnVersionConflict = Optional.of(new IndexResult(ex, currentVersion, index.seqNo()));
+                }
+
+                final IndexResult indexResult;
+                if (resultOnVersionConflict.isPresent()) {
+                    indexResult = resultOnVersionConflict.get();
+                } else {
+                    // no version conflict
+                    if (index.origin() == Operation.Origin.PRIMARY) {
+                        seqNo = seqNoService().generateSeqNo();
+                    }
+                    indexResult = indexIntoLucene(index, seqNo, currentVersion, deleted, forceUpdateDocument, canOptimizeAddDocument, expectedVersion);
+                }
+                if (indexResult.hasFailure() == false) {
+                    location = index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY
+                        ? translog.add(new Translog.Index(index, indexResult))
+                        : null;
+                    indexResult.setTranslogLocation(location);
+                }
+                indexResult.setTook(System.nanoTime() - index.startTime());
+                indexResult.freeze();
+                return indexResult;
+            } finally {
+                if (seqNo != SequenceNumbersService.UNASSIGNED_SEQ_NO) {
+                    seqNoService().markSeqNoAsCompleted(seqNo);
+                }
             }
-            if (!indexResult.hasFailure()) {
-                location = index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY
-                    ? 
translog.add(new Translog.Index(index, indexResult))
-                    : null;
-                indexResult.setTranslogLocation(location);
+        } catch (RuntimeException | IOException e) {
+            try {
+                maybeFailEngine("index", e);
+            } catch (Exception inner) {
+                e.addSuppressed(inner);
             }
-            indexResult.setTook(System.nanoTime() - index.startTime());
-            indexResult.freeze();
-            return indexResult;
-        } finally {
-            if (seqNo != SequenceNumbersService.UNASSIGNED_SEQ_NO) {
-                seqNoService().markSeqNoAsCompleted(seqNo);
+            throw e;
+        }
+    }
+
+    private IndexResult indexIntoLucene(Index index, long seqNo, long currentVersion, boolean deleted, boolean forceUpdateDocument, boolean canOptimizeAddDocument, long expectedVersion) throws IOException {
+        /* Update the document's sequence number and primary term; the sequence number here is derived from either the sequence
+         * number service if this is on the primary, or the existing document's sequence number if this is on the replica. The
+         * primary term here has already been set, see IndexShard#prepareIndex where the Engine$Index operation is created.
+         */
+        index.parsedDoc().updateSeqID(seqNo, index.primaryTerm());
+        final long updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion);
+        index.parsedDoc().version().setLongValue(updatedVersion);
+        try {
+            if (currentVersion == Versions.NOT_FOUND && forceUpdateDocument == false) {
+                // document does not exist, we can optimize for create, but double check if assertions are running
+                assert assertDocDoesNotExist(index, canOptimizeAddDocument == false);
+                index(index.docs(), indexWriter);
+            } else {
+                update(index.uid(), index.docs(), indexWriter);
+            }
+            versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion));
+            return new IndexResult(updatedVersion, seqNo, deleted);
+        } catch (Exception ex) {
+            if (indexWriter.getTragicException() == null) {
+                /* There is no tragic event recorded so this must be a document failure.
+                 *
+                 * The handling inside IW doesn't guarantee that a tragic / aborting exception
+                 * will be used as THE tragicEventException since if there are multiple exceptions causing an abort in IW
+                 * only one wins. Yet, only the one that wins will also close the IW and in turn fail the engine such that
+                 * we can potentially handle the exception before the engine is failed.
+                 * Bottom line is that we can only rely on the fact that if it's a document failure then
+                 * `indexWriter.getTragicException()` will be null; otherwise we have to rethrow and treat it as a fatal or rather
+                 * non-document failure.
+                 */
+                return new IndexResult(ex, currentVersion, index.seqNo());
+            } else {
+                throw ex;
             }
         }
     }

+    private boolean isForceUpdateDocument(Index index) {
+        boolean forceUpdateDocument;
+        long deOptimizeTimestamp = maxUnsafeAutoIdTimestamp.get();
+        if (index.isRetry()) {
+            forceUpdateDocument = true;
+            do {
+                deOptimizeTimestamp = maxUnsafeAutoIdTimestamp.get();
+                if (deOptimizeTimestamp >= index.getAutoGeneratedIdTimestamp()) {
+                    break;
+                }
+            } while (maxUnsafeAutoIdTimestamp.compareAndSet(deOptimizeTimestamp,
+                index.getAutoGeneratedIdTimestamp()) == false);
+            assert maxUnsafeAutoIdTimestamp.get() >= index.getAutoGeneratedIdTimestamp();
+        } else {
+            // in this case we force
+            forceUpdateDocument = deOptimizeTimestamp >= index.getAutoGeneratedIdTimestamp();
+        }
+        return forceUpdateDocument;
+    }
+
     private static void index(final List<ParseContext.Document> docs, final IndexWriter indexWriter) throws IOException {
         if (docs.size() > 1) {
             indexWriter.addDocuments(docs);
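// Illustrative sketch (not part of the patch): the refactored index() above folds the old
// "throttle vs. don't throttle recovery" if/else into a single try-with-resources block that
// acquires either the real throttle or a no-op Releasable. The Throttle and Releasable types
// below are simplified stand-ins for InternalEngine.IndexThrottle and
// org.elasticsearch.common.lease.Releasable, assumed here only to show why
// `doThrottle ? throttle.acquireThrottle() : () -> {}` works as a resource.
import java.util.concurrent.locks.ReentrantLock;

class ConditionalThrottleSketch {

    // Releasable is a single-method AutoCloseable, so a lambda or method reference fits it.
    interface Releasable extends AutoCloseable {
        @Override
        void close();
    }

    // Simplified throttle: acquiring it blocks while another thread holds the underlying lock.
    static class Throttle {
        private final ReentrantLock lock = new ReentrantLock();

        Releasable acquireThrottle() {
            lock.lock();
            return lock::unlock; // released when the try-with-resources block exits
        }
    }

    static void indexOperation(Throttle throttle, boolean isRecovery) {
        final boolean doThrottle = isRecovery == false;
        // Recovery operations must not be throttled, so they get a no-op Releasable; everything
        // else acquires the throttle. Both cases then share exactly one code path.
        try (Releasable indexThrottle = doThrottle ? throttle.acquireThrottle() : () -> {}) {
            // ... version checks, Lucene indexing, and the translog write would happen here ...
        }
    }

    public static void main(String[] args) {
        Throttle throttle = new Throttle();
        indexOperation(throttle, false); // regular indexing: goes through the throttle
        indexOperation(throttle, true);  // recovery: no-op Releasable, never throttled
    }
}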