From 4f49a261a748b6fa75e0da46b11f4f4a8011ab47 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 21 Jun 2016 19:07:03 -0400 Subject: [PATCH] Refactor InternalEngine inner methods This commit refactors InternalEngine#innerIndex and InternalEngine#innerDelete to collapse some common logic into a single method. This has the advantage that it shrinks the bytecode size of InternalEngine#innerIndex so that it can be inlined. --- .../elasticsearch/index/engine/Engine.java | 8 + .../index/engine/InternalEngine.java | 204 ++++++++++-------- .../index/engine/LiveVersionMap.java | 9 +- 3 files changed, 127 insertions(+), 94 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java index 87ffb9331a6..5c1a20c16e3 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -840,6 +840,10 @@ public abstract class Engine implements Closeable { public long endTime() { return this.endTime; } + + abstract String type(); + + abstract String id(); } public static class Index extends Operation { @@ -863,10 +867,12 @@ public abstract class Engine implements Closeable { return this.doc; } + @Override public String type() { return this.doc.type(); } + @Override public String id() { return this.doc.id(); } @@ -929,10 +935,12 @@ public abstract class Engine implements Closeable { this(template.type(), template.id(), template.uid(), template.version(), versionType, template.origin(), template.startTime(), template.found()); } + @Override public String type() { return this.type; } + @Override public String id() { return this.id; } diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 15667e79421..449c04ce4d5 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -314,7 +314,7 @@ public class InternalEngine extends Engine { try (ReleasableLock lock = readLock.acquire()) { ensureOpen(); if (get.realtime()) { - VersionValue versionValue = versionMap.getUnderLock(get.uid().bytes()); + VersionValue versionValue = versionMap.getUnderLock(get.uid()); if (versionValue != null) { if (versionValue.delete()) { return GetResult.NOT_EXISTS; @@ -336,6 +336,59 @@ public class InternalEngine extends Engine { } } + private boolean checkVersionConflict( + final Operation op, + final long currentVersion, + final long expectedVersion, + final boolean deleted) { + if (op.versionType().isVersionConflictForWrites(currentVersion, expectedVersion, deleted)) { + if (op.origin().isRecovery()) { + // version conflict, but okay + return true; + } else { + // fatal version conflict + throw new VersionConflictEngineException(shardId, op.type(), op.id(), + op.versionType().explainConflictForWrites(currentVersion, expectedVersion, deleted)); + } + } + return false; + } + + private long checkDeletedAndGCed(VersionValue versionValue) { + long currentVersion; + if (engineConfig.isEnableGcDeletes() && versionValue.delete() && (engineConfig.getThreadPool().estimatedTimeInMillis() - versionValue.time()) > getGcDeletesInMillis()) { + currentVersion = Versions.NOT_FOUND; // deleted, and GC + } else { + currentVersion = versionValue.version(); + } + return currentVersion; + } + + private static VersionValueSupplier NEW_VERSION_VALUE = (u, t, l) -> new VersionValue(u, l); + + @FunctionalInterface + private interface VersionValueSupplier { + VersionValue apply(long updatedVersion, long time, Translog.Location location); + } + + private void maybeAddToTranslog( + final T op, + final long updatedVersion, + final Function toTranslogOp, + final VersionValueSupplier toVersionValue) throws IOException { + if (op.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) { + final Translog.Location translogLocation = translog.add(toTranslogOp.apply(op)); + op.setTranslogLocation(translogLocation); + versionMap.putUnderLock(op.uid().bytes(), toVersionValue.apply(updatedVersion, engineConfig.getThreadPool().estimatedTimeInMillis(), op.getTranslogLocation())); + } else { + // we do not replay in to the translog, so there is no + // translog location; that is okay because real-time + // gets are not possible during recovery and we will + // flush when the recovery is complete + versionMap.putUnderLock(op.uid().bytes(), toVersionValue.apply(updatedVersion, engineConfig.getThreadPool().estimatedTimeInMillis(), null)); + } + } + @Override public boolean index(Index index) { final boolean created; @@ -361,58 +414,56 @@ public class InternalEngine extends Engine { lastWriteNanos = index.startTime(); final long currentVersion; final boolean deleted; - VersionValue versionValue = versionMap.getUnderLock(index.uid().bytes()); + final VersionValue versionValue = versionMap.getUnderLock(index.uid()); if (versionValue == null) { currentVersion = loadCurrentVersionFromIndex(index.uid()); deleted = currentVersion == Versions.NOT_FOUND; } else { + currentVersion = checkDeletedAndGCed(versionValue); deleted = versionValue.delete(); - if (engineConfig.isEnableGcDeletes() && versionValue.delete() && (engineConfig.getThreadPool().estimatedTimeInMillis() - versionValue.time()) > getGcDeletesInMillis()) { - currentVersion = Versions.NOT_FOUND; // deleted, and GC - } else { - currentVersion = versionValue.version(); - } } - long expectedVersion = index.version(); - if (isVersionConflictForWrites(index, currentVersion, deleted, expectedVersion)) { - if (!index.origin().isRecovery()) { - throw new VersionConflictEngineException(shardId, index.type(), index.id(), - index.versionType().explainConflictForWrites(currentVersion, expectedVersion, deleted)); - } - return false; - } - long updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion); + final long expectedVersion = index.version(); + if (checkVersionConflict(index, currentVersion, expectedVersion, deleted)) return false; - final boolean created; - index.updateVersion(updatedVersion); + final long updatedVersion = updateVersion(index, currentVersion, expectedVersion); - if (currentVersion == Versions.NOT_FOUND) { - // document does not exists, we can optimize for create - created = true; - index(index, indexWriter); - } else { - created = update(index, versionValue, indexWriter); - } + final boolean created = indexOrUpdate(index, currentVersion, versionValue); - if (index.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) { - final Translog.Location translogLocation = translog.add(new Translog.Index(index)); - index.setTranslogLocation(translogLocation); - versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion, index.getTranslogLocation())); - } else { - // we do not replay in to the translog, so there is no - // translog location; that is okay because real-time - // gets are not possible during recovery and we will - // flush when the recovery is complete - versionMap.putUnderLock(index.uid().bytes(), new VersionValue(updatedVersion, null)); - } + maybeAddToTranslog(index, updatedVersion, Translog.Index::new, NEW_VERSION_VALUE); return created; } } - private static boolean update(Index index, VersionValue versionValue, IndexWriter indexWriter) throws IOException { - boolean created; + private long updateVersion(Engine.Operation op, long currentVersion, long expectedVersion) { + final long updatedVersion = op.versionType().updateVersion(currentVersion, expectedVersion); + op.updateVersion(updatedVersion); + return updatedVersion; + } + + private boolean indexOrUpdate(final Index index, final long currentVersion, final VersionValue versionValue) throws IOException { + final boolean created; + if (currentVersion == Versions.NOT_FOUND) { + // document does not exists, we can optimize for create + created = true; + index(index, indexWriter); + } else { + created = update(index, versionValue, indexWriter); + } + return created; + } + + private static void index(final Index index, final IndexWriter indexWriter) throws IOException { + if (index.docs().size() > 1) { + indexWriter.addDocuments(index.docs()); + } else { + indexWriter.addDocument(index.docs().get(0)); + } + } + + private static boolean update(final Index index, final VersionValue versionValue, final IndexWriter indexWriter) throws IOException { + final boolean created; if (versionValue != null) { created = versionValue.delete(); // we have a delete which is not GC'ed... } else { @@ -426,18 +477,6 @@ public class InternalEngine extends Engine { return created; } - private static void index(Index index, IndexWriter indexWriter) throws IOException { - if (index.docs().size() > 1) { - indexWriter.addDocuments(index.docs()); - } else { - indexWriter.addDocument(index.docs().get(0)); - } - } - - private boolean isVersionConflictForWrites(Index index, long currentVersion, boolean deleted, long expectedVersion) { - return index.versionType().isVersionConflictForWrites(currentVersion, expectedVersion, deleted); - } - @Override public void delete(Delete delete) throws EngineException { try (ReleasableLock lock = readLock.acquire()) { @@ -465,59 +504,44 @@ public class InternalEngine extends Engine { lastWriteNanos = delete.startTime(); final long currentVersion; final boolean deleted; - VersionValue versionValue = versionMap.getUnderLock(delete.uid().bytes()); + final VersionValue versionValue = versionMap.getUnderLock(delete.uid()); if (versionValue == null) { currentVersion = loadCurrentVersionFromIndex(delete.uid()); deleted = currentVersion == Versions.NOT_FOUND; } else { + currentVersion = checkDeletedAndGCed(versionValue); deleted = versionValue.delete(); - if (engineConfig.isEnableGcDeletes() && versionValue.delete() && (engineConfig.getThreadPool().estimatedTimeInMillis() - versionValue.time()) > getGcDeletesInMillis()) { - currentVersion = Versions.NOT_FOUND; // deleted, and GC - } else { - currentVersion = versionValue.version(); - } } - long updatedVersion; - long expectedVersion = delete.version(); - if (delete.versionType().isVersionConflictForWrites(currentVersion, expectedVersion, deleted)) { - if (delete.origin().isRecovery()) { - return; - } else { - throw new VersionConflictEngineException(shardId, delete.type(), delete.id(), - delete.versionType().explainConflictForWrites(currentVersion, expectedVersion, deleted)); - } - } - updatedVersion = delete.versionType().updateVersion(currentVersion, expectedVersion); - final boolean found; - if (currentVersion == Versions.NOT_FOUND) { - // doc does not exist and no prior deletes - found = false; - } else if (versionValue != null && versionValue.delete()) { - // a "delete on delete", in this case, we still increment the version, log it, and return that version - found = false; - } else { - // we deleted a currently existing document - indexWriter.deleteDocuments(delete.uid()); - found = true; - } + final long expectedVersion = delete.version(); + if (checkVersionConflict(delete, currentVersion, expectedVersion, deleted)) return; + + final long updatedVersion = updateVersion(delete, currentVersion, expectedVersion); + + final boolean found = deleteIfFound(delete, currentVersion, deleted, versionValue); delete.updateVersion(updatedVersion, found); - if (delete.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) { - final Translog.Location translogLocation = translog.add(new Translog.Delete(delete)); - delete.setTranslogLocation(translogLocation); - versionMap.putUnderLock(delete.uid().bytes(), new DeleteVersionValue(updatedVersion, engineConfig.getThreadPool().estimatedTimeInMillis(), delete.getTranslogLocation())); - } else { - // we do not replay in to the translog, so there is no - // translog location; that is okay because real-time - // gets are not possible during recovery and we will - // flush when the recovery is complete - versionMap.putUnderLock(delete.uid().bytes(), new DeleteVersionValue(updatedVersion, engineConfig.getThreadPool().estimatedTimeInMillis(), null)); - } + maybeAddToTranslog(delete, updatedVersion, Translog.Delete::new, DeleteVersionValue::new); } } + private boolean deleteIfFound(Delete delete, long currentVersion, boolean deleted, VersionValue versionValue) throws IOException { + final boolean found; + if (currentVersion == Versions.NOT_FOUND) { + // doc does not exist and no prior deletes + found = false; + } else if (versionValue != null && deleted) { + // a "delete on delete", in this case, we still increment the version, log it, and return that version + found = false; + } else { + // we deleted a currently existing document + indexWriter.deleteDocuments(delete.uid()); + found = true; + } + return found; + } + @Override public void refresh(String source) throws EngineException { // we obtain a read lock here, since we don't want a flush to happen while we are refreshing diff --git a/core/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java b/core/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java index f962d31bf8b..3cf3c83749c 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java +++ b/core/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java @@ -19,6 +19,7 @@ package org.elasticsearch.index.engine; +import org.apache.lucene.index.Term; import org.apache.lucene.search.ReferenceManager; import org.apache.lucene.util.Accountable; import org.apache.lucene.util.BytesRef; @@ -126,21 +127,21 @@ class LiveVersionMap implements ReferenceManager.RefreshListener, Accountable { } /** Returns the live version (add or delete) for this uid. */ - VersionValue getUnderLock(BytesRef uid) { + VersionValue getUnderLock(final Term uid) { Maps currentMaps = maps; // First try to get the "live" value: - VersionValue value = currentMaps.current.get(uid); + VersionValue value = currentMaps.current.get(uid.bytes()); if (value != null) { return value; } - value = currentMaps.old.get(uid); + value = currentMaps.old.get(uid.bytes()); if (value != null) { return value; } - return tombstones.get(uid); + return tombstones.get(uid.bytes()); } /** Adds this uid/version to the pending adds map. */