diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index d97fc47af25..0b27b4c037c 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -57,8 +57,7 @@ import org.elasticsearch.transport.TransportService; import java.util.Map; -import static org.elasticsearch.action.delete.TransportDeleteAction.executeDeleteRequestOnPrimary; -import static org.elasticsearch.action.delete.TransportDeleteAction.executeDeleteRequestOnReplica; +import static org.elasticsearch.action.delete.TransportDeleteAction.*; import static org.elasticsearch.action.index.TransportIndexAction.executeIndexRequestOnPrimary; import static org.elasticsearch.action.index.TransportIndexAction.executeIndexRequestOnReplica; import static org.elasticsearch.action.support.replication.ReplicationOperation.ignoreReplicaException; @@ -125,44 +124,44 @@ public class TransportShardBulkAction extends TransportWriteAction) () -> new ParameterizedMessage("{} failed to execute bulk item ({}) {}", request.shardId(), docWriteRequest.opType().getLowercase(), request), failure); @@ -214,11 +213,11 @@ public class TransportShardBulkAction extends TransportWriteAction= 0 && tookInNanos > indexWarnThreshold) { - indexLogger.warn("{}", new SlowLogParsedDocumentPrinter(index, doc, tookInNanos, reformat, maxSourceCharsToLog)); - } else if (indexInfoThreshold >= 0 && tookInNanos > indexInfoThreshold) { - indexLogger.info("{}", new SlowLogParsedDocumentPrinter(index, doc, tookInNanos, reformat, maxSourceCharsToLog)); - } else if (indexDebugThreshold >= 0 && tookInNanos > indexDebugThreshold) { - indexLogger.debug("{}", new SlowLogParsedDocumentPrinter(index, doc, tookInNanos, reformat, maxSourceCharsToLog)); - } else if (indexTraceThreshold >= 0 && tookInNanos > indexTraceThreshold) { - indexLogger.trace("{}", new SlowLogParsedDocumentPrinter(index, doc, tookInNanos, reformat, maxSourceCharsToLog)); - } + public void postIndex(Engine.Index index, boolean created) { + final long took = index.endTime() - index.startTime(); + postIndexing(index.parsedDoc(), took); + } + + + private void postIndexing(ParsedDocument doc, long tookInNanos) { + if (indexWarnThreshold >= 0 && tookInNanos > indexWarnThreshold) { + indexLogger.warn("{}", new SlowLogParsedDocumentPrinter(index, doc, tookInNanos, reformat, maxSourceCharsToLog)); + } else if (indexInfoThreshold >= 0 && tookInNanos > indexInfoThreshold) { + indexLogger.info("{}", new SlowLogParsedDocumentPrinter(index, doc, tookInNanos, reformat, maxSourceCharsToLog)); + } else if (indexDebugThreshold >= 0 && tookInNanos > indexDebugThreshold) { + indexLogger.debug("{}", new SlowLogParsedDocumentPrinter(index, doc, tookInNanos, reformat, maxSourceCharsToLog)); + } else if (indexTraceThreshold >= 0 && tookInNanos > indexTraceThreshold) { + indexLogger.trace("{}", new SlowLogParsedDocumentPrinter(index, doc, tookInNanos, reformat, maxSourceCharsToLog)); } } diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java index 11e9a375c78..137775e7a84 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -278,9 +278,105 @@ public abstract class Engine implements Closeable { } } - public abstract void index(Index operation); + public abstract IndexResult index(Index operation); - public abstract void delete(Delete delete); + public abstract DeleteResult delete(Delete delete); + + public abstract static class Result { + private final Operation.TYPE operationType; + private final Translog.Location location; + private final long version; + private final Exception failure; + private final long took; + private final int estimatedSizeInBytes; + + private Result(Operation.TYPE operationType, Translog.Location location, Exception failure, + long version, long took, int estimatedSizeInBytes) { + this.operationType = operationType; + this.location = location; + this.failure = failure; + this.version = version; + this.took = took; + this.estimatedSizeInBytes = estimatedSizeInBytes; + } + + protected Result(Operation.TYPE operationType, Translog.Location location, + long version, long took, int estimatedSizeInBytes) { + this(operationType, location, null, version, took, estimatedSizeInBytes); + } + + protected Result(Operation.TYPE operationType, Exception failure, + long version, long took, int estimatedSizeInBytes) { + this(operationType, null, failure, version, took, estimatedSizeInBytes); + } + + public boolean hasFailure() { + return failure != null; + } + + public long getVersion() { + return version; + } + + public Translog.Location getLocation() { + return location; + } + + public Exception getFailure() { + return failure; + } + + public long getTook() { + return took; + } + + public Operation.TYPE getOperationType() { + return operationType; + } + + public int getSizeInBytes() { + if (location != null) { + return location.size; + } + return estimatedSizeInBytes; + } + } + + public static class IndexResult extends Result { + private final boolean created; + + public IndexResult(Translog.Location location, long version, boolean created, long took, int estimatedSizeInBytes) { + super(Operation.TYPE.INDEX, location, version, took, estimatedSizeInBytes); + this.created = created; + } + + public IndexResult(Exception failure, long version, long took, int estimatedSizeInBytes) { + super(Operation.TYPE.INDEX, failure, version, took, estimatedSizeInBytes); + this.created = false; + } + + public boolean isCreated() { + return created; + } + } + + public static class DeleteResult extends Result { + private final boolean found; + + public DeleteResult(Translog.Location location, long version, boolean found, long took, int estimatedSizeInBytes) { + super(Operation.TYPE.DELETE, location, version, took, estimatedSizeInBytes); + this.found = found; + } + + DeleteResult(Exception failure, long version, long took, int estimatedSizeInBytes) { + super(Operation.TYPE.DELETE, failure, version, took, estimatedSizeInBytes); + this.found = false; + } + + public boolean isFound() { + return found; + } + } /** * Attempts to do a special commit where the given syncID is put into the commit data. The attempt @@ -771,7 +867,7 @@ public abstract class Engine implements Closeable { /** type of operation (index, delete), subclasses use static types */ public enum TYPE { - INDEX, DELETE, FAILURE; + INDEX, DELETE; private final String lowercase; @@ -785,13 +881,10 @@ public abstract class Engine implements Closeable { } private final Term uid; - private long version; + private final long version; private final VersionType versionType; private final Origin origin; - private Translog.Location location; - private Exception failure; private final long startTime; - private long endTime; public Operation(Term uid, long version, VersionType versionType, Origin origin, long startTime) { this.uid = uid; @@ -824,39 +917,7 @@ public abstract class Engine implements Closeable { return this.version; } - public void updateVersion(long version) { - this.version = version; - } - - public void setTranslogLocation(Translog.Location location) { - this.location = location; - } - - public Translog.Location getTranslogLocation() { - return this.location; - } - - public Exception getFailure() { - return failure; - } - - public void setFailure(Exception failure) { - this.failure = failure; - } - - public boolean hasFailure() { - return failure != null; - } - - public int sizeInBytes() { - if (location != null) { - return location.size; - } else { - return estimatedSizeInBytes(); - } - } - - protected abstract int estimatedSizeInBytes(); + public abstract int estimatedSizeInBytes(); public VersionType versionType() { return this.versionType; @@ -869,24 +930,11 @@ public abstract class Engine implements Closeable { return this.startTime; } - public void endTime(long endTime) { - this.endTime = endTime; - } - - /** - * Returns operation end time in nanoseconds. - */ - public long endTime() { - return this.endTime; - } - public abstract String type(); abstract String id(); - public abstract TYPE operationType(); - - public abstract String toString(); + abstract TYPE operationType(); } public static class Index extends Operation { @@ -894,7 +942,6 @@ public abstract class Engine implements Closeable { private final ParsedDocument doc; private final long autoGeneratedIdTimestamp; private final boolean isRetry; - private boolean created; public Index(Term uid, ParsedDocument doc, long version, VersionType versionType, Origin origin, long startTime, long autoGeneratedIdTimestamp, boolean isRetry) { @@ -927,7 +974,7 @@ public abstract class Engine implements Closeable { } @Override - public TYPE operationType() { + TYPE operationType() { return TYPE.INDEX; } @@ -943,12 +990,6 @@ public abstract class Engine implements Closeable { return this.doc.ttl(); } - @Override - public void updateVersion(long version) { - super.updateVersion(version); - this.doc.version().setLongValue(version); - } - public String parent() { return this.doc.parent(); } @@ -961,16 +1002,8 @@ public abstract class Engine implements Closeable { return this.doc.source(); } - public boolean isCreated() { - return created; - } - - public void setCreated(boolean created) { - this.created = created; - } - @Override - protected int estimatedSizeInBytes() { + public int estimatedSizeInBytes() { return (id().length() + type().length()) * 2 + source().length() + 12; } @@ -991,31 +1024,25 @@ public abstract class Engine implements Closeable { return isRetry; } - @Override - public String toString() { - return "index [{" + type() + "}][{" + id()+ "}] [{" + docs() + "}]"; - } } public static class Delete extends Operation { private final String type; private final String id; - private boolean found; - public Delete(String type, String id, Term uid, long version, VersionType versionType, Origin origin, long startTime, boolean found) { + public Delete(String type, String id, Term uid, long version, VersionType versionType, Origin origin, long startTime) { super(uid, version, versionType, origin, startTime); this.type = type; this.id = id; - this.found = found; } public Delete(String type, String id, Term uid) { - this(type, id, uid, Versions.MATCH_ANY, VersionType.INTERNAL, Origin.PRIMARY, System.nanoTime(), false); + this(type, id, uid, Versions.MATCH_ANY, VersionType.INTERNAL, Origin.PRIMARY, System.nanoTime()); } public Delete(Delete template, VersionType versionType) { - this(template.type(), template.id(), template.uid(), template.version(), versionType, template.origin(), template.startTime(), template.found()); + this(template.type(), template.id(), template.uid(), template.version(), versionType, template.origin(), template.startTime()); } @Override @@ -1029,74 +1056,16 @@ public abstract class Engine implements Closeable { } @Override - public TYPE operationType() { + TYPE operationType() { return TYPE.DELETE; } @Override - public String toString() { - return "delete [{"+ uid().text() +"}]"; - } - - public void updateVersion(long version, boolean found) { - updateVersion(version); - this.found = found; - } - - public boolean found() { - return this.found; - } - - @Override - protected int estimatedSizeInBytes() { + public int estimatedSizeInBytes() { return (uid().field().length() + uid().text().length()) * 2 + 20; } } - public static class Failure extends Operation { - - private final String type; - private final String id; - - public Failure(String type, String id, long version, VersionType versionType, Origin origin, - long startTime, Exception failure) { - super(null, version, versionType, origin, startTime); - this.type = type; - this.id = id; - setFailure(failure); - } - - @Override - public Term uid() { - throw new UnsupportedOperationException("failure operation doesn't have uid"); - } - - @Override - protected int estimatedSizeInBytes() { - return 0; - } - - @Override - public String type() { - return type; - } - - @Override - protected String id() { - return id; - } - - @Override - public TYPE operationType() { - return TYPE.FAILURE; - } - - @Override - public String toString() { - return "failure [{" + type() + "}][{" + id()+ "}]"; - } - } - public static class Get { private final boolean realtime; private final Term uid; diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 6e9bfc0e91c..7ec35d73a38 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -57,7 +57,7 @@ import org.elasticsearch.common.util.concurrent.KeyedLock; import org.elasticsearch.common.util.concurrent.ReleasableLock; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.VersionType; -import org.elasticsearch.index.engine.Engine.Operation; +import org.elasticsearch.index.mapper.ParseContext; import org.elasticsearch.index.mapper.Uid; import org.elasticsearch.index.merge.MergeStats; import org.elasticsearch.index.merge.OnGoingMerge; @@ -394,44 +394,47 @@ public class InternalEngine extends Engine { VersionValue apply(long updatedVersion, long time); } - private void maybeAddToTranslog( + private Translog.Location maybeAddToTranslog( final T op, final long updatedVersion, final Function toTranslogOp, final VersionValueSupplier toVersionValue) throws IOException { + Translog.Location location = null; if (op.origin() != Operation.Origin.LOCAL_TRANSLOG_RECOVERY) { - final Translog.Location translogLocation = translog.add(toTranslogOp.apply(op)); - op.setTranslogLocation(translogLocation); + location = translog.add(toTranslogOp.apply(op)); } versionMap.putUnderLock(op.uid().bytes(), toVersionValue.apply(updatedVersion, engineConfig.getThreadPool().estimatedTimeInMillis())); - + return location; } @Override - public void index(Index index) { + public IndexResult index(Index index) { + IndexResult result; try (ReleasableLock lock = readLock.acquire()) { ensureOpen(); if (index.origin().isRecovery()) { // Don't throttle recovery operations - innerIndex(index); + result = innerIndex(index); } else { try (Releasable r = throttle.acquireThrottle()) { - innerIndex(index); + result = innerIndex(index); } } } catch (Exception e) { - handleOperationFailure(index, e); + Exception transientOperationFailure = handleOperationFailure(index, e); + result = new IndexResult(transientOperationFailure, index.version(), index.startTime() - System.nanoTime()); } + return result; } /** * Handle failures executing write operations, distinguish persistent engine (environment) failures * from document (request) specific failures. * Write failures that fail the engine as a side-effect, are thrown wrapped in {@link OperationFailedEngineException} - * and document specific failures are captured through {@link Operation#setFailure(Exception)} to be handled + * and document specific failures are returned to be set on the {@link Engine.Result} to be handled * at the transport level. */ - private void handleOperationFailure(final Operation operation, final Exception failure) { + private Exception handleOperationFailure(final Operation operation, final Exception failure) { boolean isEnvironmentFailure; try { // When indexing a document into Lucene, Lucene distinguishes between environment related errors @@ -451,7 +454,7 @@ public class InternalEngine extends Engine { throw new OperationFailedEngineException(shardId, operation.operationType().getLowercase(), operation.type(), operation.id(), failure); } else { - operation.setFailure(failure); + return failure; } } @@ -479,7 +482,9 @@ public class InternalEngine extends Engine { return false; } - private void innerIndex(Index index) throws IOException { + private IndexResult innerIndex(Index index) throws IOException { + final Translog.Location location; + final long updatedVersion; try (Releasable ignored = acquireLock(index.uid())) { lastWriteNanos = index.startTime(); /* if we have an autoGeneratedID that comes into the engine we can potentially optimize @@ -544,54 +549,52 @@ public class InternalEngine extends Engine { } final long expectedVersion = index.version(); if (checkVersionConflict(index, currentVersion, expectedVersion, deleted)) { - index.setCreated(false); - return; - } - final long updatedVersion = updateVersion(index, currentVersion, expectedVersion); - index.setCreated(deleted); - if (currentVersion == Versions.NOT_FOUND && forceUpdateDocument == false) { - // document does not exists, we can optimize for create - index(index, indexWriter); + // skip index operation because of version conflict on recovery + return new IndexResult(null, expectedVersion, false, index.startTime() - System.nanoTime()); } else { - update(index, indexWriter); + updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion); + index.parsedDoc().version().setLongValue(updatedVersion); + if (currentVersion == Versions.NOT_FOUND && forceUpdateDocument == false) { + // document does not exists, we can optimize for create + index(index.docs(), indexWriter); + } else { + update(index.uid(), index.docs(), indexWriter); + } + location = maybeAddToTranslog(index, updatedVersion, Translog.Index::new, NEW_VERSION_VALUE); + return new IndexResult(location, updatedVersion, deleted, index.startTime() - System.nanoTime()); } - maybeAddToTranslog(index, updatedVersion, Translog.Index::new, NEW_VERSION_VALUE); } } - private long updateVersion(Operation op, long currentVersion, long expectedVersion) { - final long updatedVersion = op.versionType().updateVersion(currentVersion, expectedVersion); - op.updateVersion(updatedVersion); - return updatedVersion; - } - - private static void index(final Index index, final IndexWriter indexWriter) throws IOException { - if (index.docs().size() > 1) { - indexWriter.addDocuments(index.docs()); + private static void index(final List docs, final IndexWriter indexWriter) throws IOException { + if (docs.size() > 1) { + indexWriter.addDocuments(docs); } else { - indexWriter.addDocument(index.docs().get(0)); + indexWriter.addDocument(docs.get(0)); } } - private static void update(final Index index, final IndexWriter indexWriter) throws IOException { - if (index.docs().size() > 1) { - indexWriter.updateDocuments(index.uid(), index.docs()); + private static void update(final Term uid, final List docs, final IndexWriter indexWriter) throws IOException { + if (docs.size() > 1) { + indexWriter.updateDocuments(uid, docs); } else { - indexWriter.updateDocument(index.uid(), index.docs().get(0)); + indexWriter.updateDocument(uid, docs.get(0)); } } @Override - public void delete(Delete delete) { + public DeleteResult delete(Delete delete) { + DeleteResult result; try (ReleasableLock lock = readLock.acquire()) { ensureOpen(); // NOTE: we don't throttle this when merges fall behind because delete-by-id does not create new segments: - innerDelete(delete); + result = innerDelete(delete); } catch (Exception e) { - handleOperationFailure(delete, e); + Exception transientOperationFailure = handleOperationFailure(delete, e); + result = new DeleteResult(transientOperationFailure, delete.version(), delete.startTime() - System.nanoTime()); } - maybePruneDeletedTombstones(); + return result; } private void maybePruneDeletedTombstones() { @@ -602,7 +605,10 @@ public class InternalEngine extends Engine { } } - private void innerDelete(Delete delete) throws IOException { + private DeleteResult innerDelete(Delete delete) throws IOException { + final Translog.Location location; + final long updatedVersion; + final boolean found; try (Releasable ignored = acquireLock(delete.uid())) { lastWriteNanos = delete.startTime(); final long currentVersion; @@ -618,19 +624,19 @@ public class InternalEngine extends Engine { } final long expectedVersion = delete.version(); - if (checkVersionConflict(delete, currentVersion, expectedVersion, deleted)) return; - - final long updatedVersion = updateVersion(delete, currentVersion, expectedVersion); - - final boolean found = deleteIfFound(delete, currentVersion, deleted, versionValue); - - delete.updateVersion(updatedVersion, found); - - maybeAddToTranslog(delete, updatedVersion, Translog.Delete::new, DeleteVersionValue::new); + if (checkVersionConflict(delete, currentVersion, expectedVersion, deleted)) { + // skip executing delete because of version conflict on recovery + return new DeleteResult(null, expectedVersion, true, delete.startTime() - System.nanoTime()); + } else { + updatedVersion = delete.versionType().updateVersion(currentVersion, expectedVersion); + found = deleteIfFound(delete.uid(), currentVersion, deleted, versionValue); + location = maybeAddToTranslog(delete, updatedVersion, Translog.Delete::new, DeleteVersionValue::new); + return new DeleteResult(location, updatedVersion, found, delete.startTime() - System.nanoTime()); + } } } - private boolean deleteIfFound(Delete delete, long currentVersion, boolean deleted, VersionValue versionValue) throws IOException { + private boolean deleteIfFound(Term uid, long currentVersion, boolean deleted, VersionValue versionValue) throws IOException { final boolean found; if (currentVersion == Versions.NOT_FOUND) { // doc does not exist and no prior deletes @@ -640,7 +646,7 @@ public class InternalEngine extends Engine { found = false; } else { // we deleted a currently existing document - indexWriter.deleteDocuments(delete.uid()); + indexWriter.deleteDocuments(uid); found = true; } return found; diff --git a/core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java b/core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java index 317d6f89408..d84f03e83dd 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java @@ -106,12 +106,12 @@ public class ShadowEngine extends Engine { @Override - public void index(Index index) { + public IndexResult index(Index index) { throw new UnsupportedOperationException(shardId + " index operation not allowed on shadow engine"); } @Override - public void delete(Delete delete) { + public DeleteResult delete(Delete delete) { throw new UnsupportedOperationException(shardId + " delete operation not allowed on shadow engine"); } diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index cfb7414c904..442adc98196 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -93,7 +93,6 @@ import org.elasticsearch.index.get.ShardGetService; import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.DocumentMapperForType; import org.elasticsearch.index.mapper.MappedFieldType; -import org.elasticsearch.index.mapper.MapperParsingException; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.SourceToParse; @@ -500,30 +499,24 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl return previousState; } - public Engine.Operation prepareIndexOnPrimary(SourceToParse source, long version, VersionType versionType, long autoGeneratedIdTimestamp, + public Engine.Index prepareIndexOnPrimary(SourceToParse source, long version, VersionType versionType, long autoGeneratedIdTimestamp, boolean isRetry) { try { verifyPrimary(); return prepareIndex(docMapper(source.type()), source, version, versionType, Engine.Operation.Origin.PRIMARY, autoGeneratedIdTimestamp, isRetry); - } catch (MapperParsingException | IllegalArgumentException e) { - return new Engine.Failure(source.type(), source.id(), version, versionType, Engine.Operation.Origin.PRIMARY, - System.nanoTime(), e); } catch (Exception e) { verifyNotClosed(e); throw e; } } - public Engine.Operation prepareIndexOnReplica(SourceToParse source, long version, VersionType versionType, long autoGeneratedIdTimestamp, + public Engine.Index prepareIndexOnReplica(SourceToParse source, long version, VersionType versionType, long autoGeneratedIdTimestamp, boolean isRetry) { try { verifyReplicationTarget(); return prepareIndex(docMapper(source.type()), source, version, versionType, Engine.Operation.Origin.REPLICA, autoGeneratedIdTimestamp, isRetry); - } catch (MapperParsingException | IllegalArgumentException e) { - return new Engine.Failure(source.type(), source.id(), version, versionType, Engine.Operation.Origin.PRIMARY, - System.nanoTime(), e); } catch (Exception e) { verifyNotClosed(e); throw e; @@ -543,37 +536,31 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl return new Engine.Index(uid, doc, version, versionType, origin, startTime, autoGeneratedIdTimestamp, isRetry); } - public void execute(Engine.Operation operation) { - ensureWriteAllowed(operation); + public Engine.IndexResult index(Engine.Index index) { + ensureWriteAllowed(index); Engine engine = getEngine(); - execute(engine, operation); + return index(engine, index); } - private void execute(Engine engine, Engine.Operation operation) { + private Engine.IndexResult index(Engine engine, Engine.Index index) { active.set(true); - indexingOperationListeners.preOperation(operation); + final Engine.IndexResult result; + index = indexingOperationListeners.preIndex(index); try { if (logger.isTraceEnabled()) { - logger.trace(operation.toString()); + logger.trace("index [{}][{}]{}", index.type(), index.id(), index.docs()); } - switch (operation.operationType()) { - case INDEX: - engine.index(((Engine.Index) operation)); - break; - case DELETE: - engine.delete(((Engine.Delete) operation)); - break; - } - operation.endTime(System.nanoTime()); + result = engine.index(index); } catch (Exception e) { - indexingOperationListeners.postOperation(operation, e); + indexingOperationListeners.postIndex(index, e); throw e; } - if (operation.hasFailure()) { - indexingOperationListeners.postOperation(operation, operation.getFailure()); + if (result.hasFailure()) { + indexingOperationListeners.postIndex(index, result.getFailure()); } else { - indexingOperationListeners.postOperation(operation); + indexingOperationListeners.postIndex(index, result); } + return result; } public Engine.Delete prepareDeleteOnPrimary(String type, String id, long version, VersionType versionType) { @@ -595,7 +582,34 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl static Engine.Delete prepareDelete(String type, String id, Term uid, long version, VersionType versionType, Engine.Operation.Origin origin) { long startTime = System.nanoTime(); - return new Engine.Delete(type, id, uid, version, versionType, origin, startTime, false); + return new Engine.Delete(type, id, uid, version, versionType, origin, startTime); + } + + public Engine.DeleteResult delete(Engine.Delete delete) { + ensureWriteAllowed(delete); + Engine engine = getEngine(); + return delete(engine, delete); + } + + private Engine.DeleteResult delete(Engine engine, Engine.Delete delete) { + active.set(true); + final Engine.DeleteResult result; + delete = indexingOperationListeners.preDelete(delete); + try { + if (logger.isTraceEnabled()) { + logger.trace("delete [{}]", delete.uid().text()); + } + result = engine.delete(delete); + } catch (Exception e) { + indexingOperationListeners.postDelete(delete, e); + throw e; + } + if (result.hasFailure()) { + indexingOperationListeners.postDelete(delete, result.getFailure()); + } else { + indexingOperationListeners.postDelete(delete, result); + } + return result; } public Engine.GetResult get(Engine.Get get) { @@ -1829,12 +1843,12 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl @Override protected void index(Engine engine, Engine.Index engineIndex) { - IndexShard.this.execute(engine, engineIndex); + IndexShard.this.index(engine, engineIndex); } @Override protected void delete(Engine engine, Engine.Delete engineDelete) { - IndexShard.this.execute(engine, engineDelete); + IndexShard.this.delete(engine, engineDelete); } } } diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexingOperationListener.java b/core/src/main/java/org/elasticsearch/index/shard/IndexingOperationListener.java index ec0fab2629f..042ddec924e 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexingOperationListener.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexingOperationListener.java @@ -25,19 +25,49 @@ import org.elasticsearch.index.engine.Engine; import java.util.List; -/** An engine operation listener for index and delete execution. */ +/** + * An indexing listener for indexing, delete, events. + */ public interface IndexingOperationListener { - /** Called before executing index or delete operation */ - default void preOperation(Engine.Operation operation) {} + /** + * Called before the indexing occurs. + */ + default Engine.Index preIndex(Engine.Index operation) { + return operation; + } - /** Called after executing index or delete operation */ - default void postOperation(Engine.Operation operation) {} + /** + * Called after the indexing operation occurred. + */ + default void postIndex(Engine.Index index, boolean created) {} - /** Called after index or delete operation failed with exception */ - default void postOperation(Engine.Operation operation, Exception ex) {} + /** + * Called after the indexing operation occurred with exception. + */ + default void postIndex(Engine.Index index, Exception ex) {} - /** A Composite listener that multiplexes calls to each of the listeners methods. */ + /** + * Called before the delete occurs. + */ + default Engine.Delete preDelete(Engine.Delete delete) { + return delete; + } + + + /** + * Called after the delete operation occurred. + */ + default void postDelete(Engine.Delete delete) {} + + /** + * Called after the delete operation occurred with exception. + */ + default void postDelete(Engine.Delete delete, Exception ex) {} + + /** + * A Composite listener that multiplexes calls to each of the listeners methods. + */ final class CompositeListener implements IndexingOperationListener{ private final List listeners; private final Logger logger; @@ -48,40 +78,79 @@ public interface IndexingOperationListener { } @Override - public void preOperation(Engine.Operation operation) { + public Engine.Index preIndex(Engine.Index operation) { assert operation != null; for (IndexingOperationListener listener : listeners) { try { - listener.preOperation(operation); + listener.preIndex(operation); } catch (Exception e) { - logger.warn((Supplier) () -> new ParameterizedMessage("preOperation listener [{}] failed", listener), e); + logger.warn((Supplier) () -> new ParameterizedMessage("preIndex listener [{}] failed", listener), e); + } + } + return operation; + } + + @Override + public void postIndex(Engine.Index index, boolean created) { + assert index != null; + for (IndexingOperationListener listener : listeners) { + try { + listener.postIndex(index, created); + } catch (Exception e) { + logger.warn((Supplier) () -> new ParameterizedMessage("postIndex listener [{}] failed", listener), e); } } } @Override - public void postOperation(Engine.Operation operation) { - assert operation != null; + public void postIndex(Engine.Index index, Exception ex) { + assert index != null && ex != null; for (IndexingOperationListener listener : listeners) { try { - listener.postOperation(operation); - } catch (Exception e) { - logger.warn((Supplier) () -> new ParameterizedMessage("postOperation listener [{}] failed", listener), e); - } - } - } - - @Override - public void postOperation(Engine.Operation operation, Exception ex) { - assert operation != null && ex != null; - for (IndexingOperationListener listener : listeners) { - try { - listener.postOperation(operation, ex); + listener.postIndex(index, ex); } catch (Exception inner) { inner.addSuppressed(ex); - logger.warn((Supplier) () -> new ParameterizedMessage("postOperation listener [{}] failed", listener), inner); + logger.warn((Supplier) () -> new ParameterizedMessage("postIndex listener [{}] failed", listener), inner); + } + } + } + + @Override + public Engine.Delete preDelete(Engine.Delete delete) { + assert delete != null; + for (IndexingOperationListener listener : listeners) { + try { + listener.preDelete(delete); + } catch (Exception e) { + logger.warn((Supplier) () -> new ParameterizedMessage("preDelete listener [{}] failed", listener), e); + } + } + return delete; + } + + @Override + public void postDelete(Engine.Delete delete) { + assert delete != null; + for (IndexingOperationListener listener : listeners) { + try { + listener.postDelete(delete); + } catch (Exception e) { + logger.warn((Supplier) () -> new ParameterizedMessage("postDelete listener [{}] failed", listener), e); + } + } + } + + @Override + public void postDelete(Engine.Delete delete, Exception ex) { + assert delete != null && ex != null; + for (IndexingOperationListener listener : listeners) { + try { + listener.postDelete(delete, ex); + } catch (Exception inner) { + inner.addSuppressed(ex); + logger.warn((Supplier) () -> new ParameterizedMessage("postDelete listener [{}] failed", listener), inner); } } } } -} \ No newline at end of file +} diff --git a/core/src/main/java/org/elasticsearch/index/shard/InternalIndexingStats.java b/core/src/main/java/org/elasticsearch/index/shard/InternalIndexingStats.java index cd1b1526e0c..f62b8f7fe3c 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/InternalIndexingStats.java +++ b/core/src/main/java/org/elasticsearch/index/shard/InternalIndexingStats.java @@ -65,60 +65,63 @@ final class InternalIndexingStats implements IndexingOperationListener { } @Override - public void preOperation(Engine.Operation operation) { + public Engine.Index preIndex(Engine.Index operation) { if (!operation.origin().isRecovery()) { - StatsHolder statsHolder = typeStats(operation.type()); - switch (operation.operationType()) { - case INDEX: - totalStats.indexCurrent.inc(); - statsHolder.indexCurrent.inc(); - break; - case DELETE: - totalStats.deleteCurrent.inc(); - statsHolder.deleteCurrent.inc(); - break; - } + totalStats.indexCurrent.inc(); + typeStats(operation.type()).indexCurrent.inc(); + } + return operation; + } + + @Override + public void postIndex(Engine.Index index, boolean created) { + if (!index.origin().isRecovery()) { + long took = index.endTime() - index.startTime(); + totalStats.indexMetric.inc(took); + totalStats.indexCurrent.dec(); + StatsHolder typeStats = typeStats(index.type()); + typeStats.indexMetric.inc(took); + typeStats.indexCurrent.dec(); } } @Override - public void postOperation(Engine.Operation operation) { - if (!operation.origin().isRecovery()) { - long took = operation.endTime() - operation.startTime(); - StatsHolder typeStats = typeStats(operation.type()); - switch (operation.operationType()) { - case INDEX: - totalStats.indexMetric.inc(took); - totalStats.indexCurrent.dec(); - typeStats.indexMetric.inc(took); - typeStats.indexCurrent.dec(); - break; - case DELETE: - totalStats.deleteMetric.inc(took); - totalStats.deleteCurrent.dec(); - typeStats.deleteMetric.inc(took); - typeStats.deleteCurrent.dec(); - break; - } + public void postIndex(Engine.Index index, Exception ex) { + if (!index.origin().isRecovery()) { + totalStats.indexCurrent.dec(); + typeStats(index.type()).indexCurrent.dec(); + totalStats.indexFailed.inc(); + typeStats(index.type()).indexFailed.inc(); } } @Override - public void postOperation(Engine.Operation operation, Exception ex) { - if (!operation.origin().isRecovery()) { - StatsHolder statsHolder = typeStats(operation.type()); - switch (operation.operationType()) { - case INDEX: - totalStats.indexCurrent.dec(); - statsHolder.indexCurrent.dec(); - totalStats.indexFailed.inc(); - statsHolder.indexFailed.inc(); - break; - case DELETE: - totalStats.deleteCurrent.dec(); - statsHolder.deleteCurrent.dec(); - break; - } + public Engine.Delete preDelete(Engine.Delete delete) { + if (!delete.origin().isRecovery()) { + totalStats.deleteCurrent.inc(); + typeStats(delete.type()).deleteCurrent.inc(); + } + return delete; + + } + + @Override + public void postDelete(Engine.Delete delete) { + if (!delete.origin().isRecovery()) { + long took = delete.endTime() - delete.startTime(); + totalStats.deleteMetric.inc(took); + totalStats.deleteCurrent.dec(); + StatsHolder typeStats = typeStats(delete.type()); + typeStats.deleteMetric.inc(took); + typeStats.deleteCurrent.dec(); + } + } + + @Override + public void postDelete(Engine.Delete delete, Exception ex) { + if (!delete.origin().isRecovery()) { + totalStats.deleteCurrent.dec(); + typeStats(delete.type()).deleteCurrent.dec(); } } @@ -155,5 +158,10 @@ final class InternalIndexingStats implements IndexingOperationListener { deleteMetric.count(), TimeUnit.NANOSECONDS.toMillis(deleteMetric.sum()), deleteCurrent.count(), noopUpdates.count(), isThrottled, TimeUnit.MILLISECONDS.toMillis(currentThrottleMillis)); } + + void clear() { + indexMetric.clear(); + deleteMetric.clear(); + } } } diff --git a/core/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java b/core/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java index 64ae0c77007..5e5d2a84131 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java +++ b/core/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java @@ -170,7 +170,7 @@ public class TranslogRecoveryPerformer { logger.trace("[translog] recover [delete] op of [{}][{}]", uid.type(), uid.id()); } final Engine.Delete engineDelete = new Engine.Delete(uid.type(), uid.id(), delete.uid(), delete.version(), - delete.versionType().versionTypeForReplicationAndRecovery(), origin, System.nanoTime(), false); + delete.versionType().versionTypeForReplicationAndRecovery(), origin, System.nanoTime()); delete(engine, engineDelete); break; default: diff --git a/core/src/main/java/org/elasticsearch/indices/IndexingMemoryController.java b/core/src/main/java/org/elasticsearch/indices/IndexingMemoryController.java index da5a9b7c28e..3b4258a8bdf 100644 --- a/core/src/main/java/org/elasticsearch/indices/IndexingMemoryController.java +++ b/core/src/main/java/org/elasticsearch/indices/IndexingMemoryController.java @@ -189,6 +189,11 @@ public class IndexingMemoryController extends AbstractComponent implements Index statusChecker.run(); } + /** called by IndexShard to record that this many bytes were written to translog */ + public void bytesWritten(int bytes) { + statusChecker.bytesWritten(bytes); + } + /** Asks this shard to throttle indexing to one thread */ protected void activateThrottling(IndexShard shard) { shard.activateThrottling(); @@ -200,8 +205,17 @@ public class IndexingMemoryController extends AbstractComponent implements Index } @Override - public void postOperation(Engine.Operation operation) { - statusChecker.bytesWritten(operation.sizeInBytes()); + public void postIndex(Engine.Index index, boolean created) { + recordOperationBytes(index); + } + + @Override + public void postDelete(Engine.Delete delete) { + recordOperationBytes(delete); + } + + private void recordOperationBytes(Engine.Operation op) { + bytesWritten(op.sizeInBytes()); } private static final class ShardAndBytesUsed implements Comparable { diff --git a/core/src/test/java/org/elasticsearch/index/IndexModuleTests.java b/core/src/test/java/org/elasticsearch/index/IndexModuleTests.java index 1cf56c50234..5e9d1ffaf9e 100644 --- a/core/src/test/java/org/elasticsearch/index/IndexModuleTests.java +++ b/core/src/test/java/org/elasticsearch/index/IndexModuleTests.java @@ -233,10 +233,9 @@ public class IndexModuleTests extends ESTestCase { AtomicBoolean executed = new AtomicBoolean(false); IndexingOperationListener listener = new IndexingOperationListener() { @Override - public void preOperation(Engine.Operation operation) { - if (operation.operationType() == Engine.Operation.TYPE.INDEX) { - executed.set(true); - } + public Engine.Index preIndex(Engine.Index operation) { + executed.set(true); + return operation; } }; module.addIndexOperationListener(listener); @@ -252,7 +251,7 @@ public class IndexModuleTests extends ESTestCase { Engine.Index index = new Engine.Index(new Term("_uid", "1"), null); for (IndexingOperationListener l : indexService.getIndexOperationListeners()) { - l.preOperation(index); + l.preIndex(index); } assertTrue(executed.get()); indexService.close("simon says", false); diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java index 70f326a3ff0..0b40d4cea52 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java @@ -334,11 +334,11 @@ public class InternalEngineTests extends ESTestCase { // create two docs and refresh ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null); Engine.Index first = new Engine.Index(newUid("1"), doc); - engine.index(first); + Engine.IndexResult firstResult = engine.index(first); ParsedDocument doc2 = testParsedDocument("2", "2", "test", null, -1, -1, testDocumentWithTextField(), B_2, null); Engine.Index second = new Engine.Index(newUid("2"), doc2); - engine.index(second); - assertThat(second.getTranslogLocation(), greaterThan(first.getTranslogLocation())); + Engine.IndexResult secondResult = engine.index(second); + assertThat(secondResult.getLocation(), greaterThan(firstResult.getLocation())); engine.refresh("test"); segments = engine.segments(false); @@ -628,7 +628,7 @@ public class InternalEngineTests extends ESTestCase { operations.add(operation); initialEngine.index(operation); } else { - final Engine.Delete operation = new Engine.Delete("test", "1", newUid("test#1"), i, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), false); + final Engine.Delete operation = new Engine.Delete("test", "1", newUid("test#1"), i, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime()); operations.add(operation); initialEngine.delete(operation); } @@ -1039,82 +1039,82 @@ public class InternalEngineTests extends ESTestCase { public void testVersioningNewCreate() { ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null); Engine.Index create = new Engine.Index(newUid("1"), doc, Versions.MATCH_DELETED); - engine.index(create); - assertThat(create.version(), equalTo(1L)); + Engine.IndexResult indexResult = engine.index(create); + assertThat(indexResult.getVersion(), equalTo(1L)); - create = new Engine.Index(newUid("1"), doc, create.version(), create.versionType().versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false); - replicaEngine.index(create); - assertThat(create.version(), equalTo(1L)); + create = new Engine.Index(newUid("1"), doc, indexResult.getVersion(), create.versionType().versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false); + indexResult = replicaEngine.index(create); + assertThat(indexResult.getVersion(), equalTo(1L)); } public void testVersioningNewIndex() { ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null); Engine.Index index = new Engine.Index(newUid("1"), doc); - engine.index(index); - assertThat(index.version(), equalTo(1L)); + Engine.IndexResult indexResult = engine.index(index); + assertThat(indexResult.getVersion(), equalTo(1L)); - index = new Engine.Index(newUid("1"), doc, index.version(), index.versionType().versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false); - replicaEngine.index(index); - assertThat(index.version(), equalTo(1L)); + index = new Engine.Index(newUid("1"), doc, indexResult.getVersion(), index.versionType().versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false); + indexResult = replicaEngine.index(index); + assertThat(indexResult.getVersion(), equalTo(1L)); } public void testExternalVersioningNewIndex() { ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null); Engine.Index index = new Engine.Index(newUid("1"), doc, 12, VersionType.EXTERNAL, PRIMARY, 0, -1, false); - engine.index(index); - assertThat(index.version(), equalTo(12L)); + Engine.IndexResult indexResult = engine.index(index); + assertThat(indexResult.getVersion(), equalTo(12L)); - index = new Engine.Index(newUid("1"), doc, index.version(), index.versionType().versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false); - replicaEngine.index(index); - assertThat(index.version(), equalTo(12L)); + index = new Engine.Index(newUid("1"), doc, indexResult.getVersion(), index.versionType().versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false); + indexResult = replicaEngine.index(index); + assertThat(indexResult.getVersion(), equalTo(12L)); } public void testVersioningIndexConflict() { ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null); Engine.Index index = new Engine.Index(newUid("1"), doc); - engine.index(index); - assertThat(index.version(), equalTo(1L)); + Engine.IndexResult indexResult = engine.index(index); + assertThat(indexResult.getVersion(), equalTo(1L)); index = new Engine.Index(newUid("1"), doc); - engine.index(index); - assertThat(index.version(), equalTo(2L)); + indexResult = engine.index(index); + assertThat(indexResult.getVersion(), equalTo(2L)); index = new Engine.Index(newUid("1"), doc, 1L, VersionType.INTERNAL, Engine.Operation.Origin.PRIMARY, 0, -1, false); - engine.index(index); - assertTrue(index.hasFailure()); - assertThat(index.getFailure(), instanceOf(VersionConflictEngineException.class)); + indexResult = engine.index(index); + assertTrue(indexResult.hasFailure()); + assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class)); // future versions should not work as well index = new Engine.Index(newUid("1"), doc, 3L, VersionType.INTERNAL, PRIMARY, 0, -1, false); - engine.index(index); - assertTrue(index.hasFailure()); - assertThat(index.getFailure(), instanceOf(VersionConflictEngineException.class)); + indexResult = engine.index(index); + assertTrue(indexResult.hasFailure()); + assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class)); } public void testExternalVersioningIndexConflict() { ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null); Engine.Index index = new Engine.Index(newUid("1"), doc, 12, VersionType.EXTERNAL, PRIMARY, 0, -1, false); - engine.index(index); - assertThat(index.version(), equalTo(12L)); + Engine.IndexResult indexResult = engine.index(index); + assertThat(indexResult.getVersion(), equalTo(12L)); index = new Engine.Index(newUid("1"), doc, 14, VersionType.EXTERNAL, PRIMARY, 0, -1, false); - engine.index(index); - assertThat(index.version(), equalTo(14L)); + indexResult = engine.index(index); + assertThat(indexResult.getVersion(), equalTo(14L)); index = new Engine.Index(newUid("1"), doc, 13, VersionType.EXTERNAL, PRIMARY, 0, -1, false); - engine.index(index); - assertTrue(index.hasFailure()); - assertThat(index.getFailure(), instanceOf(VersionConflictEngineException.class)); + indexResult = engine.index(index); + assertTrue(indexResult.hasFailure()); + assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class)); } public void testForceVersioningNotAllowedExceptForOlderIndices() throws Exception { ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null); Engine.Index index = new Engine.Index(newUid("1"), doc, 42, VersionType.FORCE, PRIMARY, 0, -1, false); - engine.index(index); - assertTrue(index.hasFailure()); - assertThat(index.getFailure(), instanceOf(IllegalArgumentException.class)); - assertThat(index.getFailure().getMessage(), containsString("version type [FORCE] may not be used for indices created after 6.0")); + Engine.IndexResult indexResult = engine.index(index); + assertTrue(indexResult.hasFailure()); + assertThat(indexResult.getFailure(), instanceOf(IllegalArgumentException.class)); + assertThat(indexResult.getFailure().getMessage(), containsString("version type [FORCE] may not be used for indices created after 6.0")); IndexSettings oldIndexSettings = IndexSettingsModule.newIndexSettings("test", Settings.builder() .put(IndexMetaData.SETTING_VERSION_CREATED, Version.V_5_0_0_beta1) @@ -1122,58 +1122,58 @@ public class InternalEngineTests extends ESTestCase { try (Store store = createStore(); Engine engine = createEngine(oldIndexSettings, store, createTempDir(), NoMergePolicy.INSTANCE)) { index = new Engine.Index(newUid("1"), doc, 84, VersionType.FORCE, PRIMARY, 0, -1, false); - engine.index(index); - assertTrue(index.hasFailure()); - assertThat(index.getFailure(), instanceOf(IllegalArgumentException.class)); - assertThat(index.getFailure().getMessage(), containsString("version type [FORCE] may not be used for non-translog operations")); + Engine.IndexResult result = engine.index(index); + assertTrue(result.hasFailure()); + assertThat(result.getFailure(), instanceOf(IllegalArgumentException.class)); + assertThat(result.getFailure().getMessage(), containsString("version type [FORCE] may not be used for non-translog operations")); index = new Engine.Index(newUid("1"), doc, 84, VersionType.FORCE, Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY, 0, -1, false); - engine.index(index); - assertThat(index.version(), equalTo(84L)); + result = engine.index(index); + assertThat(result.getVersion(), equalTo(84L)); } } public void testVersioningIndexConflictWithFlush() { ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null); Engine.Index index = new Engine.Index(newUid("1"), doc); - engine.index(index); - assertThat(index.version(), equalTo(1L)); + Engine.IndexResult indexResult = engine.index(index); + assertThat(indexResult.getVersion(), equalTo(1L)); index = new Engine.Index(newUid("1"), doc); - engine.index(index); - assertThat(index.version(), equalTo(2L)); + indexResult = engine.index(index); + assertThat(indexResult.getVersion(), equalTo(2L)); engine.flush(); index = new Engine.Index(newUid("1"), doc, 1L, VersionType.INTERNAL, PRIMARY, 0, -1, false); - engine.index(index); - assertTrue(index.hasFailure()); - assertThat(index.getFailure(), instanceOf(VersionConflictEngineException.class)); + indexResult = engine.index(index); + assertTrue(indexResult.hasFailure()); + assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class)); // future versions should not work as well index = new Engine.Index(newUid("1"), doc, 3L, VersionType.INTERNAL, PRIMARY, 0, -1, false); - engine.index(index); - assertTrue(index.hasFailure()); - assertThat(index.getFailure(), instanceOf(VersionConflictEngineException.class)); + indexResult = engine.index(index); + assertTrue(indexResult.hasFailure()); + assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class)); } public void testExternalVersioningIndexConflictWithFlush() { ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null); Engine.Index index = new Engine.Index(newUid("1"), doc, 12, VersionType.EXTERNAL, PRIMARY, 0, -1, false); - engine.index(index); - assertThat(index.version(), equalTo(12L)); + Engine.IndexResult indexResult = engine.index(index); + assertThat(indexResult.getVersion(), equalTo(12L)); index = new Engine.Index(newUid("1"), doc, 14, VersionType.EXTERNAL, PRIMARY, 0, -1, false); - engine.index(index); - assertThat(index.version(), equalTo(14L)); + indexResult = engine.index(index); + assertThat(indexResult.getVersion(), equalTo(14L)); engine.flush(); index = new Engine.Index(newUid("1"), doc, 13, VersionType.EXTERNAL, PRIMARY, 0, -1, false); - engine.index(index); - assertTrue(index.hasFailure()); - assertThat(index.getFailure(), instanceOf(VersionConflictEngineException.class)); + indexResult = engine.index(index); + assertTrue(indexResult.hasFailure()); + assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class)); } public void testForceMerge() throws IOException { @@ -1274,202 +1274,202 @@ public class InternalEngineTests extends ESTestCase { public void testVersioningDeleteConflict() { ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null); Engine.Index index = new Engine.Index(newUid("1"), doc); - engine.index(index); - assertThat(index.version(), equalTo(1L)); + Engine.IndexResult indexResult = engine.index(index); + assertThat(indexResult.getVersion(), equalTo(1L)); index = new Engine.Index(newUid("1"), doc); - engine.index(index); - assertThat(index.version(), equalTo(2L)); + indexResult = engine.index(index); + assertThat(indexResult.getVersion(), equalTo(2L)); - Engine.Delete delete = new Engine.Delete("test", "1", newUid("1"), 1L, VersionType.INTERNAL, PRIMARY, 0, false); - engine.delete(delete); - assertTrue(delete.hasFailure()); - assertThat(delete.getFailure(), instanceOf(VersionConflictEngineException.class)); + Engine.Delete delete = new Engine.Delete("test", "1", newUid("1"), 1L, VersionType.INTERNAL, PRIMARY, 0); + Engine.DeleteResult result = engine.delete(delete); + assertTrue(result.hasFailure()); + assertThat(result.getFailure(), instanceOf(VersionConflictEngineException.class)); // future versions should not work as well - delete = new Engine.Delete("test", "1", newUid("1"), 3L, VersionType.INTERNAL, PRIMARY, 0, false); - engine.delete(delete); - assertTrue(delete.hasFailure()); - assertThat(delete.getFailure(), instanceOf(VersionConflictEngineException.class)); + delete = new Engine.Delete("test", "1", newUid("1"), 3L, VersionType.INTERNAL, PRIMARY, 0); + result = engine.delete(delete); + assertTrue(result.hasFailure()); + assertThat(result.getFailure(), instanceOf(VersionConflictEngineException.class)); // now actually delete - delete = new Engine.Delete("test", "1", newUid("1"), 2L, VersionType.INTERNAL, PRIMARY, 0, false); - engine.delete(delete); - assertThat(delete.version(), equalTo(3L)); + delete = new Engine.Delete("test", "1", newUid("1"), 2L, VersionType.INTERNAL, PRIMARY, 0); + result = engine.delete(delete); + assertThat(result.getVersion(), equalTo(3L)); // now check if we can index to a delete doc with version index = new Engine.Index(newUid("1"), doc, 2L, VersionType.INTERNAL, PRIMARY, 0, -1, false); - engine.index(index); - assertTrue(index.hasFailure()); - assertThat(index.getFailure(), instanceOf(VersionConflictEngineException.class)); + indexResult = engine.index(index); + assertTrue(indexResult.hasFailure()); + assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class)); } public void testVersioningDeleteConflictWithFlush() { ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null); Engine.Index index = new Engine.Index(newUid("1"), doc); - engine.index(index); - assertThat(index.version(), equalTo(1L)); + Engine.IndexResult indexResult = engine.index(index); + assertThat(indexResult.getFailure(), equalTo(1L)); index = new Engine.Index(newUid("1"), doc); - engine.index(index); - assertThat(index.version(), equalTo(2L)); + indexResult = engine.index(index); + assertThat(indexResult.getVersion(), equalTo(2L)); engine.flush(); - Engine.Delete delete = new Engine.Delete("test", "1", newUid("1"), 1L, VersionType.INTERNAL, PRIMARY, 0, false); - engine.delete(delete); - assertTrue(delete.hasFailure()); - assertThat(delete.getFailure(), instanceOf(VersionConflictEngineException.class)); + Engine.Delete delete = new Engine.Delete("test", "1", newUid("1"), 1L, VersionType.INTERNAL, PRIMARY, 0); + Engine.DeleteResult deleteResult = engine.delete(delete); + assertTrue(deleteResult.hasFailure()); + assertThat(deleteResult.getFailure(), instanceOf(VersionConflictEngineException.class)); // future versions should not work as well - delete = new Engine.Delete("test", "1", newUid("1"), 3L, VersionType.INTERNAL, PRIMARY, 0, false); - engine.delete(delete); - assertTrue(delete.hasFailure()); - assertThat(delete.getFailure(), instanceOf(VersionConflictEngineException.class)); + delete = new Engine.Delete("test", "1", newUid("1"), 3L, VersionType.INTERNAL, PRIMARY, 0); + deleteResult = engine.delete(delete); + assertTrue(deleteResult.hasFailure()); + assertThat(deleteResult.getFailure(), instanceOf(VersionConflictEngineException.class)); engine.flush(); // now actually delete - delete = new Engine.Delete("test", "1", newUid("1"), 2L, VersionType.INTERNAL, PRIMARY, 0, false); - engine.delete(delete); - assertThat(delete.version(), equalTo(3L)); + delete = new Engine.Delete("test", "1", newUid("1"), 2L, VersionType.INTERNAL, PRIMARY, 0); + deleteResult = engine.delete(delete); + assertThat(deleteResult.getVersion(), equalTo(3L)); engine.flush(); // now check if we can index to a delete doc with version index = new Engine.Index(newUid("1"), doc, 2L, VersionType.INTERNAL, PRIMARY, 0, -1, false); - engine.index(index); - assertTrue(index.hasFailure()); - assertThat(index.getFailure(), instanceOf(VersionConflictEngineException.class)); + indexResult = engine.index(index); + assertTrue(indexResult.hasFailure()); + assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class)); } public void testVersioningCreateExistsException() { ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null); Engine.Index create = new Engine.Index(newUid("1"), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, 0, -1, false); - engine.index(create); - assertThat(create.version(), equalTo(1L)); + Engine.IndexResult indexResult = engine.index(create); + assertThat(indexResult.getVersion(), equalTo(1L)); create = new Engine.Index(newUid("1"), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, 0, -1, false); - engine.index(create); - assertTrue(create.hasFailure()); - assertThat(create.getFailure(), instanceOf(VersionConflictEngineException.class)); + indexResult = engine.index(create); + assertTrue(indexResult.hasFailure()); + assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class)); } public void testVersioningCreateExistsExceptionWithFlush() { ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null); Engine.Index create = new Engine.Index(newUid("1"), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, 0, -1, false); - engine.index(create); - assertThat(create.version(), equalTo(1L)); + Engine.IndexResult indexResult = engine.index(create); + assertThat(indexResult.getVersion(), equalTo(1L)); engine.flush(); create = new Engine.Index(newUid("1"), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, 0, -1, false); - engine.index(create); - assertTrue(create.hasFailure()); - assertThat(create.getFailure(), instanceOf(VersionConflictEngineException.class)); + indexResult = engine.index(create); + assertTrue(indexResult.hasFailure()); + assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class)); } public void testVersioningReplicaConflict1() { ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null); Engine.Index index = new Engine.Index(newUid("1"), doc); - engine.index(index); - assertThat(index.version(), equalTo(1L)); + Engine.IndexResult indexResult = engine.index(index); + assertThat(indexResult.getVersion(), equalTo(1L)); index = new Engine.Index(newUid("1"), doc); - engine.index(index); - assertThat(index.version(), equalTo(2L)); + indexResult = engine.index(index); + assertThat(indexResult.getVersion(), equalTo(2L)); // apply the second index to the replica, should work fine - index = new Engine.Index(newUid("1"), doc, index.version(), VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false); - replicaEngine.index(index); - assertThat(index.version(), equalTo(2L)); + index = new Engine.Index(newUid("1"), doc, indexResult.getVersion(), VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false); + indexResult = replicaEngine.index(index); + assertThat(indexResult.getVersion(), equalTo(2L)); // now, the old one should not work index = new Engine.Index(newUid("1"), doc, 1L, VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false); - replicaEngine.index(index); - assertTrue(index.hasFailure()); - assertThat(index.getFailure(), instanceOf(VersionConflictEngineException.class)); + indexResult = replicaEngine.index(index); + assertTrue(indexResult.hasFailure()); + assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class)); // second version on replica should fail as well index = new Engine.Index(newUid("1"), doc, 2L , VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false); - replicaEngine.index(index); - assertThat(index.version(), equalTo(2L)); - assertThat(index.getFailure(), instanceOf(VersionConflictEngineException.class)); + indexResult = replicaEngine.index(index); + assertThat(indexResult.getVersion(), equalTo(2L)); + assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class)); } public void testVersioningReplicaConflict2() { ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null); Engine.Index index = new Engine.Index(newUid("1"), doc); - engine.index(index); - assertThat(index.version(), equalTo(1L)); + Engine.IndexResult indexResult = engine.index(index); + assertThat(indexResult.getVersion(), equalTo(1L)); // apply the first index to the replica, should work fine index = new Engine.Index(newUid("1"), doc, 1L , VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false); - replicaEngine.index(index); - assertThat(index.version(), equalTo(1L)); + indexResult = replicaEngine.index(index); + assertThat(indexResult.getVersion(), equalTo(1L)); // index it again index = new Engine.Index(newUid("1"), doc); - engine.index(index); - assertThat(index.version(), equalTo(2L)); + indexResult = engine.index(index); + assertThat(indexResult.getVersion(), equalTo(2L)); // now delete it Engine.Delete delete = new Engine.Delete("test", "1", newUid("1")); - engine.delete(delete); - assertThat(delete.version(), equalTo(3L)); + Engine.DeleteResult deleteResult = engine.delete(delete); + assertThat(deleteResult.getVersion(), equalTo(3L)); // apply the delete on the replica (skipping the second index) delete = new Engine.Delete("test", "1", newUid("1"), 3L - , VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0, false); - replicaEngine.delete(delete); - assertThat(delete.version(), equalTo(3L)); + , VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0); + deleteResult = replicaEngine.delete(delete); + assertThat(deleteResult.getVersion(), equalTo(3L)); // second time delete with same version should fail delete = new Engine.Delete("test", "1", newUid("1"), 3L - , VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0, false); - replicaEngine.delete(delete); - assertTrue(delete.hasFailure()); - assertThat(delete.getFailure(), instanceOf(VersionConflictEngineException.class)); + , VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0); + deleteResult = replicaEngine.delete(delete); + assertTrue(deleteResult.hasFailure()); + assertThat(deleteResult.getFailure(), instanceOf(VersionConflictEngineException.class)); // now do the second index on the replica, it should fail index = new Engine.Index(newUid("1"), doc, 2L, VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false); - replicaEngine.index(index); - assertTrue(index.hasFailure()); - assertThat(index.getFailure(), instanceOf(VersionConflictEngineException.class)); + indexResult = replicaEngine.index(index); + assertTrue(indexResult.hasFailure()); + assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class)); } public void testBasicCreatedFlag() { ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null); Engine.Index index = new Engine.Index(newUid("1"), doc); - engine.index(index); - assertTrue(index.isCreated()); + Engine.IndexResult indexResult = engine.index(index); + assertTrue(indexResult.isCreated()); index = new Engine.Index(newUid("1"), doc); - engine.index(index); - assertFalse(index.isCreated()); + indexResult = engine.index(index); + assertFalse(indexResult.isCreated()); engine.delete(new Engine.Delete(null, "1", newUid("1"))); index = new Engine.Index(newUid("1"), doc); - engine.index(index); - assertTrue(index.isCreated()); + indexResult = engine.index(index); + assertTrue(indexResult.isCreated()); } public void testCreatedFlagAfterFlush() { ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null); Engine.Index index = new Engine.Index(newUid("1"), doc); - engine.index(index); - assertTrue(index.isCreated()); + Engine.IndexResult indexResult = engine.index(index); + assertTrue(indexResult.isCreated()); engine.delete(new Engine.Delete(null, "1", newUid("1"))); engine.flush(); index = new Engine.Index(newUid("1"), doc); - engine.index(index); - assertTrue(index.isCreated()); + indexResult = engine.index(index); + assertTrue(indexResult.isCreated()); } private static class MockAppender extends AbstractAppender { @@ -1572,7 +1572,7 @@ public class InternalEngineTests extends ESTestCase { engine.index(new Engine.Index(newUid("1"), doc, 1, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), -1, false)); // Delete document we just added: - engine.delete(new Engine.Delete("test", "1", newUid("1"), 10, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), false)); + engine.delete(new Engine.Delete("test", "1", newUid("1"), 10, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime())); // Get should not find the document Engine.GetResult getResult = engine.get(new Engine.Get(true, newUid("1"))); @@ -1586,7 +1586,7 @@ public class InternalEngineTests extends ESTestCase { } // Delete non-existent document - engine.delete(new Engine.Delete("test", "2", newUid("2"), 10, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), false)); + engine.delete(new Engine.Delete("test", "2", newUid("2"), 10, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime())); // Get should not find the document (we never indexed uid=2): getResult = engine.get(new Engine.Get(true, newUid("2"))); @@ -1594,9 +1594,9 @@ public class InternalEngineTests extends ESTestCase { // Try to index uid=1 with a too-old version, should fail: Engine.Index index = new Engine.Index(newUid("1"), doc, 2, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), -1, false); - engine.index(index); - assertTrue(index.hasFailure()); - assertThat(index.getFailure(), instanceOf(VersionConflictEngineException.class)); + Engine.IndexResult indexResult = engine.index(index); + assertTrue(indexResult.hasFailure()); + assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class)); // Get should still not find the document getResult = engine.get(new Engine.Get(true, newUid("1"))); @@ -1604,9 +1604,9 @@ public class InternalEngineTests extends ESTestCase { // Try to index uid=2 with a too-old version, should fail: Engine.Index index1 = new Engine.Index(newUid("2"), doc, 2, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), -1, false); - engine.index(index1); - assertTrue(index1.hasFailure()); - assertThat(index1.getFailure(), instanceOf(VersionConflictEngineException.class)); + indexResult = engine.index(index1); + assertTrue(indexResult.hasFailure()); + assertThat(indexResult.getFailure(), instanceOf(VersionConflictEngineException.class)); // Get should not find the document getResult = engine.get(new Engine.Get(true, newUid("2"))); @@ -1702,8 +1702,8 @@ public class InternalEngineTests extends ESTestCase { for (int i = 0; i < numDocs; i++) { ParsedDocument doc = testParsedDocument(Integer.toString(i), Integer.toString(i), "test", null, -1, -1, testDocument(), new BytesArray("{}"), null); Engine.Index firstIndexRequest = new Engine.Index(newUid(Integer.toString(i)), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false); - engine.index(firstIndexRequest); - assertThat(firstIndexRequest.version(), equalTo(1L)); + Engine.IndexResult indexResult = engine.index(firstIndexRequest); + assertThat(indexResult.getVersion(), equalTo(1L)); } engine.refresh("test"); try (Engine.Searcher searcher = engine.acquireSearcher("test")) { @@ -1752,8 +1752,8 @@ public class InternalEngineTests extends ESTestCase { for (int i = 0; i < numDocs; i++) { ParsedDocument doc = testParsedDocument(Integer.toString(i), Integer.toString(i), "test", null, -1, -1, testDocument(), new BytesArray("{}"), null); Engine.Index firstIndexRequest = new Engine.Index(newUid(Integer.toString(i)), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false); - engine.index(firstIndexRequest); - assertThat(firstIndexRequest.version(), equalTo(1L)); + Engine.IndexResult indexResult = engine.index(firstIndexRequest); + assertThat(indexResult.getVersion(), equalTo(1L)); } engine.refresh("test"); try (Engine.Searcher searcher = engine.acquireSearcher("test")) { @@ -1842,8 +1842,8 @@ public class InternalEngineTests extends ESTestCase { for (int i = 0; i < numExtraDocs; i++) { ParsedDocument doc = testParsedDocument("extra" + Integer.toString(i), "extra" + Integer.toString(i), "test", null, -1, -1, testDocument(), new BytesArray("{}"), null); Engine.Index firstIndexRequest = new Engine.Index(newUid(Integer.toString(i)), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false); - engine.index(firstIndexRequest); - assertThat(firstIndexRequest.version(), equalTo(1L)); + Engine.IndexResult indexResult = engine.index(firstIndexRequest); + assertThat(indexResult.getVersion(), equalTo(1L)); } engine.refresh("test"); try (Engine.Searcher searcher = engine.acquireSearcher("test")) { @@ -1871,8 +1871,8 @@ public class InternalEngineTests extends ESTestCase { for (int i = 0; i < numDocs; i++) { ParsedDocument doc = testParsedDocument(Integer.toString(i), Integer.toString(i), "test", null, -1, -1, testDocument(), new BytesArray("{}"), null); Engine.Index firstIndexRequest = new Engine.Index(newUid(Integer.toString(i)), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false); - engine.index(firstIndexRequest); - assertThat(firstIndexRequest.version(), equalTo(1L)); + Engine.IndexResult indexResult = engine.index(firstIndexRequest); + assertThat(indexResult.getVersion(), equalTo(1L)); } engine.refresh("test"); try (Engine.Searcher searcher = engine.acquireSearcher("test")) { @@ -2165,25 +2165,25 @@ public class InternalEngineTests extends ESTestCase { Engine.Index operation = randomAppendOnly(1, doc, false); Engine.Index retry = randomAppendOnly(1, doc, true); if (randomBoolean()) { - engine.index(operation); + Engine.IndexResult indexResult = engine.index(operation); assertFalse(engine.indexWriterHasDeletions()); assertEquals(0, engine.getNumVersionLookups()); - assertNotNull(operation.getTranslogLocation()); - engine.index(retry); + assertNotNull(indexResult.getLocation()); + Engine.IndexResult retryResult = engine.index(retry); assertTrue(engine.indexWriterHasDeletions()); assertEquals(0, engine.getNumVersionLookups()); - assertNotNull(retry.getTranslogLocation()); - assertTrue(retry.getTranslogLocation().compareTo(operation.getTranslogLocation()) > 0); + assertNotNull(retryResult.getLocation()); + assertTrue(retryResult.getLocation().compareTo(indexResult.getLocation()) > 0); } else { - engine.index(retry); + Engine.IndexResult retryResult = engine.index(retry); assertTrue(engine.indexWriterHasDeletions()); assertEquals(0, engine.getNumVersionLookups()); - assertNotNull(retry.getTranslogLocation()); - engine.index(operation); + assertNotNull(retryResult.getLocation()); + Engine.IndexResult indexResult = engine.index(operation); assertTrue(engine.indexWriterHasDeletions()); assertEquals(0, engine.getNumVersionLookups()); - assertNotNull(retry.getTranslogLocation()); - assertTrue(retry.getTranslogLocation().compareTo(operation.getTranslogLocation()) < 0); + assertNotNull(retryResult.getLocation()); + assertTrue(retryResult.getLocation().compareTo(indexResult.getLocation()) < 0); } engine.refresh("test"); @@ -2194,17 +2194,17 @@ public class InternalEngineTests extends ESTestCase { operation = randomAppendOnly(1, doc, false); retry = randomAppendOnly(1, doc, true); if (randomBoolean()) { - engine.index(operation); - assertNotNull(operation.getTranslogLocation()); - engine.index(retry); - assertNotNull(retry.getTranslogLocation()); - assertTrue(retry.getTranslogLocation().compareTo(operation.getTranslogLocation()) > 0); + Engine.IndexResult indexResult = engine.index(operation); + assertNotNull(indexResult.getLocation()); + Engine.IndexResult retryResult = engine.index(retry); + assertNotNull(retryResult.getLocation()); + assertTrue(retryResult.getLocation().compareTo(indexResult.getLocation()) > 0); } else { - engine.index(retry); - assertNotNull(retry.getTranslogLocation()); - engine.index(operation); - assertNotNull(retry.getTranslogLocation()); - assertTrue(retry.getTranslogLocation().compareTo(operation.getTranslogLocation()) < 0); + Engine.IndexResult retryResult = engine.index(retry); + assertNotNull(retryResult.getLocation()); + Engine.IndexResult indexResult = engine.index(operation); + assertNotNull(retryResult.getLocation()); + assertTrue(retryResult.getLocation().compareTo(indexResult.getLocation()) < 0); } engine.refresh("test"); @@ -2265,8 +2265,8 @@ public class InternalEngineTests extends ESTestCase { isRetry = false; Engine.Index secondIndexRequest = new Engine.Index(newUid("1"), doc, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime(), autoGeneratedIdTimestamp, isRetry); - engine.index(secondIndexRequest); - assertTrue(secondIndexRequest.isCreated()); + Engine.IndexResult indexResult = engine.index(secondIndexRequest); + assertTrue(indexResult.isCreated()); engine.refresh("test"); try (Engine.Searcher searcher = engine.acquireSearcher("test")) { TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), 10); diff --git a/core/src/test/java/org/elasticsearch/index/mapper/TextFieldMapperTests.java b/core/src/test/java/org/elasticsearch/index/mapper/TextFieldMapperTests.java index 7313b7ec9bf..846d2c56669 100644 --- a/core/src/test/java/org/elasticsearch/index/mapper/TextFieldMapperTests.java +++ b/core/src/test/java/org/elasticsearch/index/mapper/TextFieldMapperTests.java @@ -219,7 +219,7 @@ public class TextFieldMapperTests extends ESSingleNodeTestCase { assertEquals("b", fields[1].stringValue()); IndexShard shard = indexService.getShard(0); - shard.execute(new Engine.Index(new Term("_uid", "1"), doc)); + shard.index(new Engine.Index(new Term("_uid", "1"), doc)); shard.refresh("test"); try (Engine.Searcher searcher = shard.acquireSearcher("test")) { LeafReader leaf = searcher.getDirectoryReader().leaves().get(0).reader(); @@ -258,7 +258,7 @@ public class TextFieldMapperTests extends ESSingleNodeTestCase { assertEquals("b", fields[1].stringValue()); IndexShard shard = indexService.getShard(0); - shard.execute(new Engine.Index(new Term("_uid", "1"), doc)); + shard.index(new Engine.Index(new Term("_uid", "1"), doc)); shard.refresh("test"); try (Engine.Searcher searcher = shard.acquireSearcher("test")) { LeafReader leaf = searcher.getDirectoryReader().leaves().get(0).reader(); diff --git a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index caf7fdf335b..89487c7aa0c 100644 --- a/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -366,19 +366,19 @@ public abstract class ESIndexLevelReplicationTestCase extends IndexShardTestCase @Override protected PrimaryResult performOnPrimary(IndexShard primary, IndexRequest request) throws Exception { - final Engine.Operation operation = executeIndexRequestOnPrimary(request, primary, + final Engine.IndexResult indexResult = executeIndexRequestOnPrimary(request, primary, null); request.primaryTerm(primary.getPrimaryTerm()); - TransportWriteActionTestHelper.performPostWriteActions(primary, request, operation.getTranslogLocation(), logger); - IndexResponse response = new IndexResponse(primary.shardId(), request.type(), request.id(), operation.version(), - ((Engine.Index) operation).isCreated()); + TransportWriteActionTestHelper.performPostWriteActions(primary, request, indexResult.getLocation(), logger); + IndexResponse response = new IndexResponse(primary.shardId(), request.type(), request.id(), indexResult.getVersion(), + indexResult.isCreated()); return new PrimaryResult(request, response); } @Override protected void performOnReplica(IndexRequest request, IndexShard replica) { - final Engine.Operation operation = executeIndexRequestOnReplica(request, replica); - TransportWriteActionTestHelper.performPostWriteActions(replica, request, operation.getTranslogLocation(), logger); + final Engine.IndexResult result = executeIndexRequestOnReplica(request, replica); + TransportWriteActionTestHelper.performPostWriteActions(replica, request, result.getLocation(), logger); } } } diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index 900bea75724..2248ff156ac 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -321,7 +321,7 @@ public class IndexShardIT extends ESSingleNodeTestCase { ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, new ParseContext.Document(), new BytesArray(new byte[]{1}), null); Engine.Index index = new Engine.Index(new Term("_uid", "1"), doc); - shard.execute(index); + shard.index(index); assertTrue(shard.shouldFlush()); assertEquals(2, shard.getEngine().getTranslog().totalOperations()); client().prepareIndex("test", "test", "2").setSource("{}").setRefreshPolicy(randomBoolean() ? IMMEDIATE : NONE).get(); @@ -406,8 +406,23 @@ public class IndexShardIT extends ESSingleNodeTestCase { AtomicReference shardRef = new AtomicReference<>(); List failures = new ArrayList<>(); IndexingOperationListener listener = new IndexingOperationListener() { + @Override - public void postOperation(Engine.Operation operation) { + public void postIndex(Engine.Index index, boolean created) { + try { + assertNotNull(shardRef.get()); + // this is all IMC needs to do - check current memory and refresh + assertTrue(shardRef.get().getIndexBufferRAMBytesUsed() > 0); + shardRef.get().refresh("test"); + } catch (Exception e) { + failures.add(e); + throw e; + } + } + + + @Override + public void postDelete(Engine.Delete delete) { try { assertNotNull(shardRef.get()); // this is all IMC needs to do - check current memory and refresh diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 8ab0566158e..9c6f21c6958 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -558,43 +558,40 @@ public class IndexShardTests extends IndexShardTestCase { shard.close("simon says", true); shard = reinitShard(shard, new IndexingOperationListener() { @Override - public void preOperation(Engine.Operation operation) { - switch (operation.operationType()) { - case INDEX: - preIndex.incrementAndGet(); - break; - case DELETE: - preDelete.incrementAndGet(); - break; + public Engine.Index preIndex(Engine.Index operation) { + preIndex.incrementAndGet(); + return operation; + } + + @Override + public void postIndex(Engine.Index index, boolean created) { + if (created) { + postIndexCreate.incrementAndGet(); + } else { + postIndexUpdate.incrementAndGet(); } } @Override - public void postOperation(Engine.Operation operation) { - switch (operation.operationType()) { - case INDEX: - if (((Engine.Index) operation).isCreated()) { - postIndexCreate.incrementAndGet(); - } else { - postIndexUpdate.incrementAndGet(); - } - break; - case DELETE: - postDelete.incrementAndGet(); - break; - } + public void postIndex(Engine.Index index, Exception ex) { + postIndexException.incrementAndGet(); } @Override - public void postOperation(Engine.Operation operation, Exception ex) { - switch (operation.operationType()) { - case INDEX: - postIndexException.incrementAndGet(); - break; - case DELETE: - postDeleteException.incrementAndGet(); - break; - } + public Engine.Delete preDelete(Engine.Delete delete) { + preDelete.incrementAndGet(); + return delete; + } + + @Override + public void postDelete(Engine.Delete delete) { + postDelete.incrementAndGet(); + } + + @Override + public void postDelete(Engine.Delete delete, Exception ex) { + postDeleteException.incrementAndGet(); + } }); recoveryShardFromStore(shard); @@ -602,7 +599,7 @@ public class IndexShardTests extends IndexShardTestCase { ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, new ParseContext.Document(), new BytesArray(new byte[]{1}), null); Engine.Index index = new Engine.Index(new Term("_uid", "1"), doc); - shard.execute(index); + shard.index(index); assertEquals(1, preIndex.get()); assertEquals(1, postIndexCreate.get()); assertEquals(0, postIndexUpdate.get()); @@ -611,7 +608,7 @@ public class IndexShardTests extends IndexShardTestCase { assertEquals(0, postDelete.get()); assertEquals(0, postDeleteException.get()); - shard.execute(index); + shard.index(index); assertEquals(2, preIndex.get()); assertEquals(1, postIndexCreate.get()); assertEquals(1, postIndexUpdate.get()); @@ -621,7 +618,7 @@ public class IndexShardTests extends IndexShardTestCase { assertEquals(0, postDeleteException.get()); Engine.Delete delete = new Engine.Delete("test", "1", new Term("_uid", "1")); - shard.execute(delete); + shard.delete(delete); assertEquals(2, preIndex.get()); assertEquals(1, postIndexCreate.get()); @@ -635,7 +632,7 @@ public class IndexShardTests extends IndexShardTestCase { shard.state = IndexShardState.STARTED; // It will generate exception try { - shard.execute(index); + shard.index(index); fail(); } catch (IllegalIndexShardStateException e) { @@ -649,7 +646,7 @@ public class IndexShardTests extends IndexShardTestCase { assertEquals(1, postDelete.get()); assertEquals(0, postDeleteException.get()); try { - shard.execute(delete); + shard.delete(delete); fail(); } catch (IllegalIndexShardStateException e) { @@ -1124,27 +1121,26 @@ public class IndexShardTests extends IndexShardTestCase { final AtomicInteger postDelete = new AtomicInteger(); IndexingOperationListener listener = new IndexingOperationListener() { @Override - public void preOperation(Engine.Operation operation) { - switch (operation.operationType()) { - case INDEX: - preIndex.incrementAndGet(); - break; - case DELETE: - preDelete.incrementAndGet(); - break; - } + public Engine.Index preIndex(Engine.Index operation) { + preIndex.incrementAndGet(); + return operation; } @Override - public void postOperation(Engine.Operation operation) { - switch (operation.operationType()) { - case INDEX: - postIndex.incrementAndGet(); - break; - case DELETE: - postDelete.incrementAndGet(); - break; - } + public void postIndex(Engine.Index index, boolean created) { + postIndex.incrementAndGet(); + } + + @Override + public Engine.Delete preDelete(Engine.Delete delete) { + preDelete.incrementAndGet(); + return delete; + } + + @Override + public void postDelete(Engine.Delete delete) { + postDelete.incrementAndGet(); + } }; final IndexShard newShard = reinitShard(shard, listener); diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexingOperationListenerTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexingOperationListenerTests.java index bb652c6630b..d1cf8b32f58 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexingOperationListenerTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexingOperationListenerTests.java @@ -40,55 +40,63 @@ public class IndexingOperationListenerTests extends ESTestCase{ AtomicInteger postDeleteException = new AtomicInteger(); IndexingOperationListener listener = new IndexingOperationListener() { @Override - public void preOperation(Engine.Operation operation) { - switch (operation.operationType()) { - case INDEX: - preIndex.incrementAndGet(); - break; - case DELETE: - preDelete.incrementAndGet(); - break; - } + public Engine.Index preIndex(Engine.Index operation) { + preIndex.incrementAndGet(); + return operation; } @Override - public void postOperation(Engine.Operation operation) { - switch (operation.operationType()) { - case INDEX: - postIndex.incrementAndGet(); - break; - case DELETE: - postDelete.incrementAndGet(); - break; - } + public void postIndex(Engine.Index index, boolean created) { + postIndex.incrementAndGet(); } @Override - public void postOperation(Engine.Operation operation, Exception ex) { - switch (operation.operationType()) { - case INDEX: - postIndexException.incrementAndGet(); - break; - case DELETE: - postDeleteException.incrementAndGet(); - break; - } + public void postIndex(Engine.Index index, Exception ex) { + postIndexException.incrementAndGet(); + } + + @Override + public Engine.Delete preDelete(Engine.Delete delete) { + preDelete.incrementAndGet(); + return delete; + } + + @Override + public void postDelete(Engine.Delete delete) { + postDelete.incrementAndGet(); + } + + @Override + public void postDelete(Engine.Delete delete, Exception ex) { + postDeleteException.incrementAndGet(); } }; IndexingOperationListener throwingListener = new IndexingOperationListener() { @Override - public void preOperation(Engine.Operation operation) { + public Engine.Index preIndex(Engine.Index operation) { throw new RuntimeException(); } @Override - public void postOperation(Engine.Operation operation) { + public void postIndex(Engine.Index index, boolean created) { + throw new RuntimeException(); } + + @Override + public void postIndex(Engine.Index index, Exception ex) { + throw new RuntimeException(); } + + @Override + public Engine.Delete preDelete(Engine.Delete delete) { throw new RuntimeException(); } @Override - public void postOperation(Engine.Operation operation, Exception ex) { + public void postDelete(Engine.Delete delete) { + throw new RuntimeException(); } + + @Override + public void postDelete(Engine.Delete delete, Exception ex) { throw new RuntimeException(); } }; @@ -103,7 +111,7 @@ public class IndexingOperationListenerTests extends ESTestCase{ IndexingOperationListener.CompositeListener compositeListener = new IndexingOperationListener.CompositeListener(indexingOperationListeners, logger); Engine.Delete delete = new Engine.Delete("test", "1", new Term("_uid", "1")); Engine.Index index = new Engine.Index(new Term("_uid", "1"), null); - compositeListener.postOperation(delete); + compositeListener.postDelete(delete); assertEquals(0, preIndex.get()); assertEquals(0, postIndex.get()); assertEquals(0, postIndexException.get()); @@ -111,7 +119,7 @@ public class IndexingOperationListenerTests extends ESTestCase{ assertEquals(2, postDelete.get()); assertEquals(0, postDeleteException.get()); - compositeListener.postOperation(delete, new RuntimeException()); + compositeListener.postDelete(delete, new RuntimeException()); assertEquals(0, preIndex.get()); assertEquals(0, postIndex.get()); assertEquals(0, postIndexException.get()); @@ -119,7 +127,7 @@ public class IndexingOperationListenerTests extends ESTestCase{ assertEquals(2, postDelete.get()); assertEquals(2, postDeleteException.get()); - compositeListener.preOperation(delete); + compositeListener.preDelete(delete); assertEquals(0, preIndex.get()); assertEquals(0, postIndex.get()); assertEquals(0, postIndexException.get()); @@ -127,7 +135,7 @@ public class IndexingOperationListenerTests extends ESTestCase{ assertEquals(2, postDelete.get()); assertEquals(2, postDeleteException.get()); - compositeListener.postOperation(index); + compositeListener.postIndex(index, false); assertEquals(0, preIndex.get()); assertEquals(2, postIndex.get()); assertEquals(0, postIndexException.get()); @@ -135,7 +143,7 @@ public class IndexingOperationListenerTests extends ESTestCase{ assertEquals(2, postDelete.get()); assertEquals(2, postDeleteException.get()); - compositeListener.postOperation(index, new RuntimeException()); + compositeListener.postIndex(index, new RuntimeException()); assertEquals(0, preIndex.get()); assertEquals(2, postIndex.get()); assertEquals(2, postIndexException.get()); @@ -143,7 +151,7 @@ public class IndexingOperationListenerTests extends ESTestCase{ assertEquals(2, postDelete.get()); assertEquals(2, postDeleteException.get()); - compositeListener.preOperation(index); + compositeListener.preIndex(index); assertEquals(2, preIndex.get()); assertEquals(2, postIndex.get()); assertEquals(2, postIndexException.get()); diff --git a/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java b/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java index 05147d4a72a..f0f53d9fdc9 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java @@ -137,14 +137,14 @@ public class RefreshListenersTests extends ESTestCase { public void testTooMany() throws Exception { assertFalse(listeners.refreshNeeded()); - Engine.Index index = index("1"); + Engine.IndexResult index = index("1"); // Fill the listener slots List nonForcedListeners = new ArrayList<>(maxListeners); for (int i = 0; i < maxListeners; i++) { DummyRefreshListener listener = new DummyRefreshListener(); nonForcedListeners.add(listener); - listeners.addOrNotify(index.getTranslogLocation(), listener); + listeners.addOrNotify(index.getLocation(), listener); assertTrue(listeners.refreshNeeded()); } @@ -155,7 +155,7 @@ public class RefreshListenersTests extends ESTestCase { // Add one more listener which should cause a refresh. DummyRefreshListener forcingListener = new DummyRefreshListener(); - listeners.addOrNotify(index.getTranslogLocation(), forcingListener); + listeners.addOrNotify(index.getLocation(), forcingListener); assertTrue("Forced listener wasn't forced?", forcingListener.forcedRefresh.get()); forcingListener.assertNoError(); @@ -168,7 +168,7 @@ public class RefreshListenersTests extends ESTestCase { } public void testAfterRefresh() throws Exception { - Engine.Index index = index("1"); + Engine.IndexResult index = index("1"); engine.refresh("I said so"); if (randomBoolean()) { index(randomFrom("1" /* same document */, "2" /* different document */)); @@ -178,7 +178,7 @@ public class RefreshListenersTests extends ESTestCase { } DummyRefreshListener listener = new DummyRefreshListener(); - assertTrue(listeners.addOrNotify(index.getTranslogLocation(), listener)); + assertTrue(listeners.addOrNotify(index.getLocation(), listener)); assertFalse(listener.forcedRefresh.get()); listener.assertNoError(); } @@ -198,9 +198,9 @@ public class RefreshListenersTests extends ESTestCase { refresher.start(); try { for (int i = 0; i < 1000; i++) { - Engine.Index index = index("1"); + Engine.IndexResult index = index("1"); DummyRefreshListener listener = new DummyRefreshListener(); - boolean immediate = listeners.addOrNotify(index.getTranslogLocation(), listener); + boolean immediate = listeners.addOrNotify(index.getLocation(), listener); if (immediate) { assertNotNull(listener.forcedRefresh.get()); } else { @@ -234,18 +234,18 @@ public class RefreshListenersTests extends ESTestCase { for (int iteration = 1; iteration <= 50; iteration++) { try { String testFieldValue = String.format(Locale.ROOT, "%s%04d", threadId, iteration); - Engine.Index index = index(threadId, testFieldValue); - assertEquals(iteration, index.version()); + Engine.IndexResult index = index(threadId, testFieldValue); + assertEquals(iteration, index.getVersion()); DummyRefreshListener listener = new DummyRefreshListener(); - listeners.addOrNotify(index.getTranslogLocation(), listener); + listeners.addOrNotify(index.getLocation(), listener); assertBusy(() -> assertNotNull("listener never called", listener.forcedRefresh.get())); if (threadCount < maxListeners) { assertFalse(listener.forcedRefresh.get()); } listener.assertNoError(); - Engine.Get get = new Engine.Get(false, index.uid()); + Engine.Get get = new Engine.Get(false, new Term("_uid", "test:"+threadId)); try (Engine.GetResult getResult = engine.get(get)) { assertTrue("document not found", getResult.exists()); assertEquals(iteration, getResult.version()); @@ -267,11 +267,11 @@ public class RefreshListenersTests extends ESTestCase { refresher.cancel(); } - private Engine.Index index(String id) { + private Engine.IndexResult index(String id) { return index(id, "test"); } - private Engine.Index index(String id, String testFieldValue) { + private Engine.IndexResult index(String id, String testFieldValue) { String type = "test"; String uid = type + ":" + id; Document document = new Document(); @@ -283,8 +283,7 @@ public class RefreshListenersTests extends ESTestCase { BytesReference source = new BytesArray(new byte[] { 1 }); ParsedDocument doc = new ParsedDocument(versionField, id, type, null, -1, -1, Arrays.asList(document), source, null); Engine.Index index = new Engine.Index(new Term("_uid", uid), doc); - engine.index(index); - return index; + return engine.index(index); } private static class DummyRefreshListener implements Consumer { diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/CancelTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/CancelTests.java index dd1250999ba..8da47f1eeaf 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/CancelTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/CancelTests.java @@ -197,14 +197,25 @@ public class CancelTests extends ReindexTestCase { } public static class BlockingOperationListener implements IndexingOperationListener { + @Override - public void preOperation(Engine.Operation operation) { - if ((TYPE.equals(operation.type()) == false) || (operation.origin() != Origin.PRIMARY)) { - return; + public Engine.Index preIndex(Engine.Index index) { + return preCheck(index, index.type()); + } + + @Override + public Engine.Delete preDelete(Engine.Delete delete) { + return preCheck(delete, delete.type()); + } + + private T preCheck(T operation, String type) { + if ((TYPE.equals(type) == false) || (operation.origin() != Origin.PRIMARY)) { + return operation; } + try { if (ALLOWED_OPERATIONS.tryAcquire(30, TimeUnit.SECONDS)) { - return; + return operation; } } catch (InterruptedException e) { throw new RuntimeException(e); diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index d86c08bbb05..c73f22fa73b 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -451,7 +451,7 @@ public abstract class IndexShardTestCase extends ESTestCase { SourceToParse.source(SourceToParse.Origin.PRIMARY, shard.shardId().getIndexName(), type, id, new BytesArray(source)), 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false); } - shard.execute(index); + shard.index(index); return ((Engine.Index) index); } @@ -462,7 +462,7 @@ public abstract class IndexShardTestCase extends ESTestCase { } else { delete = shard.prepareDeleteOnPrimary(type, id, 1, VersionType.EXTERNAL); } - shard.execute(delete); + shard.delete(delete); return delete; }