From 1bdeada8aa9e699d5c864d7257fea926426ce474 Mon Sep 17 00:00:00 2001 From: Areek Zillur Date: Fri, 14 Oct 2016 17:50:43 -0400 Subject: [PATCH] Generify index shard method to execute engine write operation Now index and delete methods in index shard share code for indexing stats. This commit collapses seperate methods for index and delete operations into a generic execute method for performing engine write operations. As an added benefit, this commit cleans up the interface for indexing operation listener making it more simple and concise to use. --- .../action/bulk/TransportShardBulkAction.java | 3 +- .../action/delete/TransportDeleteAction.java | 4 +- .../action/index/TransportIndexAction.java | 22 +-- .../elasticsearch/index/IndexingSlowLog.java | 29 ++-- .../elasticsearch/index/engine/Engine.java | 19 ++- .../elasticsearch/index/shard/IndexShard.java | 61 +++------ .../shard/IndexingOperationListener.java | 125 ++++-------------- .../index/shard/InternalIndexingStats.java | 98 +++++++------- .../indices/IndexingMemoryController.java | 18 +-- .../elasticsearch/index/IndexModuleTests.java | 9 +- .../index/mapper/TextFieldMapperTests.java | 4 +- .../index/shard/IndexShardIT.java | 19 +-- .../index/shard/IndexShardTests.java | 102 +++++++------- .../shard/IndexingOperationListenerTests.java | 80 +++++------ .../index/reindex/CancelTests.java | 19 +-- .../index/shard/IndexShardTestCase.java | 4 +- 16 files changed, 243 insertions(+), 373 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 3021361b23e..7401fc6f63c 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -57,7 +57,8 @@ import org.elasticsearch.transport.TransportService; import java.util.Map; -import static org.elasticsearch.action.delete.TransportDeleteAction.*; +import static org.elasticsearch.action.delete.TransportDeleteAction.executeDeleteRequestOnPrimary; +import static org.elasticsearch.action.delete.TransportDeleteAction.executeDeleteRequestOnReplica; import static org.elasticsearch.action.index.TransportIndexAction.executeIndexRequestOnPrimary; import static org.elasticsearch.action.support.replication.ReplicationOperation.ignoreReplicaException; import static org.elasticsearch.action.support.replication.ReplicationOperation.isConflictException; diff --git a/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java b/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java index 3cfb2930447..5b83f288b07 100644 --- a/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java +++ b/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java @@ -129,7 +129,7 @@ public class TransportDeleteAction extends TransportWriteAction executeDeleteRequestOnPrimary(DeleteRequest request, IndexShard primary) { Engine.Delete delete = primary.prepareDeleteOnPrimary(request.type(), request.id(), request.version(), request.versionType()); - primary.delete(delete); + primary.execute(delete); if (delete.hasFailure()) { return new PrimaryOperationResult<>(delete.getFailure()); } else { @@ -145,7 +145,7 @@ public class TransportDeleteAction extends TransportWriteAction executeIndexRequestOnPrimary(IndexRequest request, IndexShard indexShard, + public static PrimaryOperationResult executeIndexRequestOnPrimary(IndexRequest request, IndexShard primary, MappingUpdatedAction mappingUpdatedAction) throws Exception { Engine.Index operation; try { - operation = prepareIndexOperationOnPrimary(request, indexShard); + operation = prepareIndexOperationOnPrimary(request, primary); } catch (MapperParsingException | IllegalArgumentException e) { return new PrimaryOperationResult<>(e); } Mapping update = operation.parsedDoc().dynamicMappingsUpdate(); - final ShardId shardId = indexShard.shardId(); + final ShardId shardId = primary.shardId(); if (update != null) { try { // can throw timeout exception when updating mappings or ISE for attempting to update default mappings // which are bubbled up mappingUpdatedAction.updateMappingOnMaster(shardId.getIndex(), request.type(), update); - operation = prepareIndexOperationOnPrimary(request, indexShard); + operation = prepareIndexOperationOnPrimary(request, primary); } catch (MapperParsingException | IllegalArgumentException e) { return new PrimaryOperationResult<>(e); } @@ -208,7 +208,7 @@ public class TransportIndexAction extends TransportWriteAction(operation.getFailure()); } else { diff --git a/core/src/main/java/org/elasticsearch/index/IndexingSlowLog.java b/core/src/main/java/org/elasticsearch/index/IndexingSlowLog.java index 513e87878d6..d2ebc4e78de 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexingSlowLog.java +++ b/core/src/main/java/org/elasticsearch/index/IndexingSlowLog.java @@ -133,23 +133,20 @@ public final class IndexingSlowLog implements IndexingOperationListener { this.reformat = reformat; } - @Override - public void postIndex(Engine.Index index, boolean created) { - final long took = index.endTime() - index.startTime(); - postIndexing(index.parsedDoc(), took); - } - - - private void postIndexing(ParsedDocument doc, long tookInNanos) { - if (indexWarnThreshold >= 0 && tookInNanos > indexWarnThreshold) { - indexLogger.warn("{}", new SlowLogParsedDocumentPrinter(index, doc, tookInNanos, reformat, maxSourceCharsToLog)); - } else if (indexInfoThreshold >= 0 && tookInNanos > indexInfoThreshold) { - indexLogger.info("{}", new SlowLogParsedDocumentPrinter(index, doc, tookInNanos, reformat, maxSourceCharsToLog)); - } else if (indexDebugThreshold >= 0 && tookInNanos > indexDebugThreshold) { - indexLogger.debug("{}", new SlowLogParsedDocumentPrinter(index, doc, tookInNanos, reformat, maxSourceCharsToLog)); - } else if (indexTraceThreshold >= 0 && tookInNanos > indexTraceThreshold) { - indexLogger.trace("{}", new SlowLogParsedDocumentPrinter(index, doc, tookInNanos, reformat, maxSourceCharsToLog)); + public void postOperation(Engine.Operation operation) { + if (operation.operationType() == Engine.Operation.TYPE.INDEX) { + final long tookInNanos = operation.endTime() - operation.startTime(); + ParsedDocument doc = ((Engine.Index) operation).parsedDoc(); + if (indexWarnThreshold >= 0 && tookInNanos > indexWarnThreshold) { + indexLogger.warn("{}", new SlowLogParsedDocumentPrinter(index, doc, tookInNanos, reformat, maxSourceCharsToLog)); + } else if (indexInfoThreshold >= 0 && tookInNanos > indexInfoThreshold) { + indexLogger.info("{}", new SlowLogParsedDocumentPrinter(index, doc, tookInNanos, reformat, maxSourceCharsToLog)); + } else if (indexDebugThreshold >= 0 && tookInNanos > indexDebugThreshold) { + indexLogger.debug("{}", new SlowLogParsedDocumentPrinter(index, doc, tookInNanos, reformat, maxSourceCharsToLog)); + } else if (indexTraceThreshold >= 0 && tookInNanos > indexTraceThreshold) { + indexLogger.trace("{}", new SlowLogParsedDocumentPrinter(index, doc, tookInNanos, reformat, maxSourceCharsToLog)); + } } } diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java index 8b4150dc1b4..cfb2532280e 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -880,11 +880,13 @@ public abstract class Engine implements Closeable { return this.endTime; } - abstract String type(); + public abstract String type(); abstract String id(); - abstract TYPE operationType(); + public abstract TYPE operationType(); + + public abstract String toString(); } public static class Index extends Operation { @@ -925,7 +927,7 @@ public abstract class Engine implements Closeable { } @Override - TYPE operationType() { + public TYPE operationType() { return TYPE.INDEX; } @@ -989,6 +991,10 @@ public abstract class Engine implements Closeable { return isRetry; } + @Override + public String toString() { + return "index [{" + type() + "}][{" + id()+ "}] [{" + docs() + "}]"; + } } public static class Delete extends Operation { @@ -1023,10 +1029,15 @@ public abstract class Engine implements Closeable { } @Override - TYPE operationType() { + public TYPE operationType() { return TYPE.DELETE; } + @Override + public String toString() { + return "delete [{"+ uid().text() +"}]"; + } + public void updateVersion(long version, boolean found) { updateVersion(version); this.found = found; diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 0eb8d1b076e..adc1fa876cb 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -536,29 +536,36 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl return new Engine.Index(uid, doc, version, versionType, origin, startTime, autoGeneratedIdTimestamp, isRetry); } - public void index(Engine.Index index) { - ensureWriteAllowed(index); + public void execute(Engine.Operation operation) { + ensureWriteAllowed(operation); Engine engine = getEngine(); - index(engine, index); + execute(engine, operation); } - private void index(Engine engine, Engine.Index index) { + private void execute(Engine engine, Engine.Operation operation) { active.set(true); - index = indexingOperationListeners.preIndex(index); + indexingOperationListeners.preOperation(operation); try { if (logger.isTraceEnabled()) { - logger.trace("index [{}][{}]{}", index.type(), index.id(), index.docs()); + logger.trace(operation.toString()); } - engine.index(index); - index.endTime(System.nanoTime()); + switch (operation.operationType()) { + case INDEX: + engine.index(((Engine.Index) operation)); + break; + case DELETE: + engine.delete(((Engine.Delete) operation)); + break; + } + operation.endTime(System.nanoTime()); } catch (Exception e) { - indexingOperationListeners.postIndex(index, e); + indexingOperationListeners.postOperation(operation, e); throw e; } - if (index.hasFailure()) { - indexingOperationListeners.postIndex(index, index.getFailure()); + if (operation.hasFailure()) { + indexingOperationListeners.postOperation(operation, operation.getFailure()); } else { - indexingOperationListeners.postIndex(index, index.isCreated()); + indexingOperationListeners.postOperation(operation); } } @@ -584,32 +591,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl return new Engine.Delete(type, id, uid, version, versionType, origin, startTime, false); } - public void delete(Engine.Delete delete) { - ensureWriteAllowed(delete); - Engine engine = getEngine(); - delete(engine, delete); - } - - private void delete(Engine engine, Engine.Delete delete) { - active.set(true); - delete = indexingOperationListeners.preDelete(delete); - try { - if (logger.isTraceEnabled()) { - logger.trace("delete [{}]", delete.uid().text()); - } - engine.delete(delete); - delete.endTime(System.nanoTime()); - } catch (Exception e) { - indexingOperationListeners.postDelete(delete, e); - throw e; - } - if (delete.hasFailure()) { - indexingOperationListeners.postDelete(delete, delete.getFailure()); - } else { - indexingOperationListeners.postDelete(delete); - } - } - public Engine.GetResult get(Engine.Get get) { readAllowed(); return getEngine().get(get, this::acquireSearcher); @@ -1841,12 +1822,12 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl @Override protected void index(Engine engine, Engine.Index engineIndex) { - IndexShard.this.index(engine, engineIndex); + IndexShard.this.execute(engine, engineIndex); } @Override protected void delete(Engine engine, Engine.Delete engineDelete) { - IndexShard.this.delete(engine, engineDelete); + IndexShard.this.execute(engine, engineDelete); } } } diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexingOperationListener.java b/core/src/main/java/org/elasticsearch/index/shard/IndexingOperationListener.java index 042ddec924e..ec0fab2629f 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexingOperationListener.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexingOperationListener.java @@ -25,49 +25,19 @@ import org.elasticsearch.index.engine.Engine; import java.util.List; -/** - * An indexing listener for indexing, delete, events. - */ +/** An engine operation listener for index and delete execution. */ public interface IndexingOperationListener { - /** - * Called before the indexing occurs. - */ - default Engine.Index preIndex(Engine.Index operation) { - return operation; - } + /** Called before executing index or delete operation */ + default void preOperation(Engine.Operation operation) {} - /** - * Called after the indexing operation occurred. - */ - default void postIndex(Engine.Index index, boolean created) {} + /** Called after executing index or delete operation */ + default void postOperation(Engine.Operation operation) {} - /** - * Called after the indexing operation occurred with exception. - */ - default void postIndex(Engine.Index index, Exception ex) {} + /** Called after index or delete operation failed with exception */ + default void postOperation(Engine.Operation operation, Exception ex) {} - /** - * Called before the delete occurs. - */ - default Engine.Delete preDelete(Engine.Delete delete) { - return delete; - } - - - /** - * Called after the delete operation occurred. - */ - default void postDelete(Engine.Delete delete) {} - - /** - * Called after the delete operation occurred with exception. - */ - default void postDelete(Engine.Delete delete, Exception ex) {} - - /** - * A Composite listener that multiplexes calls to each of the listeners methods. - */ + /** A Composite listener that multiplexes calls to each of the listeners methods. */ final class CompositeListener implements IndexingOperationListener{ private final List listeners; private final Logger logger; @@ -78,79 +48,40 @@ public interface IndexingOperationListener { } @Override - public Engine.Index preIndex(Engine.Index operation) { + public void preOperation(Engine.Operation operation) { assert operation != null; for (IndexingOperationListener listener : listeners) { try { - listener.preIndex(operation); + listener.preOperation(operation); } catch (Exception e) { - logger.warn((Supplier) () -> new ParameterizedMessage("preIndex listener [{}] failed", listener), e); - } - } - return operation; - } - - @Override - public void postIndex(Engine.Index index, boolean created) { - assert index != null; - for (IndexingOperationListener listener : listeners) { - try { - listener.postIndex(index, created); - } catch (Exception e) { - logger.warn((Supplier) () -> new ParameterizedMessage("postIndex listener [{}] failed", listener), e); + logger.warn((Supplier) () -> new ParameterizedMessage("preOperation listener [{}] failed", listener), e); } } } @Override - public void postIndex(Engine.Index index, Exception ex) { - assert index != null && ex != null; + public void postOperation(Engine.Operation operation) { + assert operation != null; for (IndexingOperationListener listener : listeners) { try { - listener.postIndex(index, ex); + listener.postOperation(operation); + } catch (Exception e) { + logger.warn((Supplier) () -> new ParameterizedMessage("postOperation listener [{}] failed", listener), e); + } + } + } + + @Override + public void postOperation(Engine.Operation operation, Exception ex) { + assert operation != null && ex != null; + for (IndexingOperationListener listener : listeners) { + try { + listener.postOperation(operation, ex); } catch (Exception inner) { inner.addSuppressed(ex); - logger.warn((Supplier) () -> new ParameterizedMessage("postIndex listener [{}] failed", listener), inner); - } - } - } - - @Override - public Engine.Delete preDelete(Engine.Delete delete) { - assert delete != null; - for (IndexingOperationListener listener : listeners) { - try { - listener.preDelete(delete); - } catch (Exception e) { - logger.warn((Supplier) () -> new ParameterizedMessage("preDelete listener [{}] failed", listener), e); - } - } - return delete; - } - - @Override - public void postDelete(Engine.Delete delete) { - assert delete != null; - for (IndexingOperationListener listener : listeners) { - try { - listener.postDelete(delete); - } catch (Exception e) { - logger.warn((Supplier) () -> new ParameterizedMessage("postDelete listener [{}] failed", listener), e); - } - } - } - - @Override - public void postDelete(Engine.Delete delete, Exception ex) { - assert delete != null && ex != null; - for (IndexingOperationListener listener : listeners) { - try { - listener.postDelete(delete, ex); - } catch (Exception inner) { - inner.addSuppressed(ex); - logger.warn((Supplier) () -> new ParameterizedMessage("postDelete listener [{}] failed", listener), inner); + logger.warn((Supplier) () -> new ParameterizedMessage("postOperation listener [{}] failed", listener), inner); } } } } -} +} \ No newline at end of file diff --git a/core/src/main/java/org/elasticsearch/index/shard/InternalIndexingStats.java b/core/src/main/java/org/elasticsearch/index/shard/InternalIndexingStats.java index f62b8f7fe3c..cd1b1526e0c 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/InternalIndexingStats.java +++ b/core/src/main/java/org/elasticsearch/index/shard/InternalIndexingStats.java @@ -65,63 +65,60 @@ final class InternalIndexingStats implements IndexingOperationListener { } @Override - public Engine.Index preIndex(Engine.Index operation) { + public void preOperation(Engine.Operation operation) { if (!operation.origin().isRecovery()) { - totalStats.indexCurrent.inc(); - typeStats(operation.type()).indexCurrent.inc(); - } - return operation; - } - - @Override - public void postIndex(Engine.Index index, boolean created) { - if (!index.origin().isRecovery()) { - long took = index.endTime() - index.startTime(); - totalStats.indexMetric.inc(took); - totalStats.indexCurrent.dec(); - StatsHolder typeStats = typeStats(index.type()); - typeStats.indexMetric.inc(took); - typeStats.indexCurrent.dec(); + StatsHolder statsHolder = typeStats(operation.type()); + switch (operation.operationType()) { + case INDEX: + totalStats.indexCurrent.inc(); + statsHolder.indexCurrent.inc(); + break; + case DELETE: + totalStats.deleteCurrent.inc(); + statsHolder.deleteCurrent.inc(); + break; + } } } @Override - public void postIndex(Engine.Index index, Exception ex) { - if (!index.origin().isRecovery()) { - totalStats.indexCurrent.dec(); - typeStats(index.type()).indexCurrent.dec(); - totalStats.indexFailed.inc(); - typeStats(index.type()).indexFailed.inc(); + public void postOperation(Engine.Operation operation) { + if (!operation.origin().isRecovery()) { + long took = operation.endTime() - operation.startTime(); + StatsHolder typeStats = typeStats(operation.type()); + switch (operation.operationType()) { + case INDEX: + totalStats.indexMetric.inc(took); + totalStats.indexCurrent.dec(); + typeStats.indexMetric.inc(took); + typeStats.indexCurrent.dec(); + break; + case DELETE: + totalStats.deleteMetric.inc(took); + totalStats.deleteCurrent.dec(); + typeStats.deleteMetric.inc(took); + typeStats.deleteCurrent.dec(); + break; + } } } @Override - public Engine.Delete preDelete(Engine.Delete delete) { - if (!delete.origin().isRecovery()) { - totalStats.deleteCurrent.inc(); - typeStats(delete.type()).deleteCurrent.inc(); - } - return delete; - - } - - @Override - public void postDelete(Engine.Delete delete) { - if (!delete.origin().isRecovery()) { - long took = delete.endTime() - delete.startTime(); - totalStats.deleteMetric.inc(took); - totalStats.deleteCurrent.dec(); - StatsHolder typeStats = typeStats(delete.type()); - typeStats.deleteMetric.inc(took); - typeStats.deleteCurrent.dec(); - } - } - - @Override - public void postDelete(Engine.Delete delete, Exception ex) { - if (!delete.origin().isRecovery()) { - totalStats.deleteCurrent.dec(); - typeStats(delete.type()).deleteCurrent.dec(); + public void postOperation(Engine.Operation operation, Exception ex) { + if (!operation.origin().isRecovery()) { + StatsHolder statsHolder = typeStats(operation.type()); + switch (operation.operationType()) { + case INDEX: + totalStats.indexCurrent.dec(); + statsHolder.indexCurrent.dec(); + totalStats.indexFailed.inc(); + statsHolder.indexFailed.inc(); + break; + case DELETE: + totalStats.deleteCurrent.dec(); + statsHolder.deleteCurrent.dec(); + break; + } } } @@ -158,10 +155,5 @@ final class InternalIndexingStats implements IndexingOperationListener { deleteMetric.count(), TimeUnit.NANOSECONDS.toMillis(deleteMetric.sum()), deleteCurrent.count(), noopUpdates.count(), isThrottled, TimeUnit.MILLISECONDS.toMillis(currentThrottleMillis)); } - - void clear() { - indexMetric.clear(); - deleteMetric.clear(); - } } } diff --git a/core/src/main/java/org/elasticsearch/indices/IndexingMemoryController.java b/core/src/main/java/org/elasticsearch/indices/IndexingMemoryController.java index 3b4258a8bdf..da5a9b7c28e 100644 --- a/core/src/main/java/org/elasticsearch/indices/IndexingMemoryController.java +++ b/core/src/main/java/org/elasticsearch/indices/IndexingMemoryController.java @@ -189,11 +189,6 @@ public class IndexingMemoryController extends AbstractComponent implements Index statusChecker.run(); } - /** called by IndexShard to record that this many bytes were written to translog */ - public void bytesWritten(int bytes) { - statusChecker.bytesWritten(bytes); - } - /** Asks this shard to throttle indexing to one thread */ protected void activateThrottling(IndexShard shard) { shard.activateThrottling(); @@ -205,17 +200,8 @@ public class IndexingMemoryController extends AbstractComponent implements Index } @Override - public void postIndex(Engine.Index index, boolean created) { - recordOperationBytes(index); - } - - @Override - public void postDelete(Engine.Delete delete) { - recordOperationBytes(delete); - } - - private void recordOperationBytes(Engine.Operation op) { - bytesWritten(op.sizeInBytes()); + public void postOperation(Engine.Operation operation) { + statusChecker.bytesWritten(operation.sizeInBytes()); } private static final class ShardAndBytesUsed implements Comparable { diff --git a/core/src/test/java/org/elasticsearch/index/IndexModuleTests.java b/core/src/test/java/org/elasticsearch/index/IndexModuleTests.java index 5e9d1ffaf9e..1cf56c50234 100644 --- a/core/src/test/java/org/elasticsearch/index/IndexModuleTests.java +++ b/core/src/test/java/org/elasticsearch/index/IndexModuleTests.java @@ -233,9 +233,10 @@ public class IndexModuleTests extends ESTestCase { AtomicBoolean executed = new AtomicBoolean(false); IndexingOperationListener listener = new IndexingOperationListener() { @Override - public Engine.Index preIndex(Engine.Index operation) { - executed.set(true); - return operation; + public void preOperation(Engine.Operation operation) { + if (operation.operationType() == Engine.Operation.TYPE.INDEX) { + executed.set(true); + } } }; module.addIndexOperationListener(listener); @@ -251,7 +252,7 @@ public class IndexModuleTests extends ESTestCase { Engine.Index index = new Engine.Index(new Term("_uid", "1"), null); for (IndexingOperationListener l : indexService.getIndexOperationListeners()) { - l.preIndex(index); + l.preOperation(index); } assertTrue(executed.get()); indexService.close("simon says", false); diff --git a/core/src/test/java/org/elasticsearch/index/mapper/TextFieldMapperTests.java b/core/src/test/java/org/elasticsearch/index/mapper/TextFieldMapperTests.java index 846d2c56669..7313b7ec9bf 100644 --- a/core/src/test/java/org/elasticsearch/index/mapper/TextFieldMapperTests.java +++ b/core/src/test/java/org/elasticsearch/index/mapper/TextFieldMapperTests.java @@ -219,7 +219,7 @@ public class TextFieldMapperTests extends ESSingleNodeTestCase { assertEquals("b", fields[1].stringValue()); IndexShard shard = indexService.getShard(0); - shard.index(new Engine.Index(new Term("_uid", "1"), doc)); + shard.execute(new Engine.Index(new Term("_uid", "1"), doc)); shard.refresh("test"); try (Engine.Searcher searcher = shard.acquireSearcher("test")) { LeafReader leaf = searcher.getDirectoryReader().leaves().get(0).reader(); @@ -258,7 +258,7 @@ public class TextFieldMapperTests extends ESSingleNodeTestCase { assertEquals("b", fields[1].stringValue()); IndexShard shard = indexService.getShard(0); - shard.index(new Engine.Index(new Term("_uid", "1"), doc)); + shard.execute(new Engine.Index(new Term("_uid", "1"), doc)); shard.refresh("test"); try (Engine.Searcher searcher = shard.acquireSearcher("test")) { LeafReader leaf = searcher.getDirectoryReader().leaves().get(0).reader(); diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index 2248ff156ac..900bea75724 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -321,7 +321,7 @@ public class IndexShardIT extends ESSingleNodeTestCase { ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, new ParseContext.Document(), new BytesArray(new byte[]{1}), null); Engine.Index index = new Engine.Index(new Term("_uid", "1"), doc); - shard.index(index); + shard.execute(index); assertTrue(shard.shouldFlush()); assertEquals(2, shard.getEngine().getTranslog().totalOperations()); client().prepareIndex("test", "test", "2").setSource("{}").setRefreshPolicy(randomBoolean() ? IMMEDIATE : NONE).get(); @@ -406,23 +406,8 @@ public class IndexShardIT extends ESSingleNodeTestCase { AtomicReference shardRef = new AtomicReference<>(); List failures = new ArrayList<>(); IndexingOperationListener listener = new IndexingOperationListener() { - @Override - public void postIndex(Engine.Index index, boolean created) { - try { - assertNotNull(shardRef.get()); - // this is all IMC needs to do - check current memory and refresh - assertTrue(shardRef.get().getIndexBufferRAMBytesUsed() > 0); - shardRef.get().refresh("test"); - } catch (Exception e) { - failures.add(e); - throw e; - } - } - - - @Override - public void postDelete(Engine.Delete delete) { + public void postOperation(Engine.Operation operation) { try { assertNotNull(shardRef.get()); // this is all IMC needs to do - check current memory and refresh diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index b531aae0532..99ab4e03a9c 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -560,40 +560,43 @@ public class IndexShardTests extends IndexShardTestCase { shard.close("simon says", true); shard = reinitShard(shard, new IndexingOperationListener() { @Override - public Engine.Index preIndex(Engine.Index operation) { - preIndex.incrementAndGet(); - return operation; - } - - @Override - public void postIndex(Engine.Index index, boolean created) { - if (created) { - postIndexCreate.incrementAndGet(); - } else { - postIndexUpdate.incrementAndGet(); + public void preOperation(Engine.Operation operation) { + switch (operation.operationType()) { + case INDEX: + preIndex.incrementAndGet(); + break; + case DELETE: + preDelete.incrementAndGet(); + break; } } @Override - public void postIndex(Engine.Index index, Exception ex) { - postIndexException.incrementAndGet(); + public void postOperation(Engine.Operation operation) { + switch (operation.operationType()) { + case INDEX: + if (((Engine.Index) operation).isCreated()) { + postIndexCreate.incrementAndGet(); + } else { + postIndexUpdate.incrementAndGet(); + } + break; + case DELETE: + postDelete.incrementAndGet(); + break; + } } @Override - public Engine.Delete preDelete(Engine.Delete delete) { - preDelete.incrementAndGet(); - return delete; - } - - @Override - public void postDelete(Engine.Delete delete) { - postDelete.incrementAndGet(); - } - - @Override - public void postDelete(Engine.Delete delete, Exception ex) { - postDeleteException.incrementAndGet(); - + public void postOperation(Engine.Operation operation, Exception ex) { + switch (operation.operationType()) { + case INDEX: + postIndexException.incrementAndGet(); + break; + case DELETE: + postDeleteException.incrementAndGet(); + break; + } } }); recoveryShardFromStore(shard); @@ -601,7 +604,7 @@ public class IndexShardTests extends IndexShardTestCase { ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, new ParseContext.Document(), new BytesArray(new byte[]{1}), null); Engine.Index index = new Engine.Index(new Term("_uid", "1"), doc); - shard.index(index); + shard.execute(index); assertEquals(1, preIndex.get()); assertEquals(1, postIndexCreate.get()); assertEquals(0, postIndexUpdate.get()); @@ -610,7 +613,7 @@ public class IndexShardTests extends IndexShardTestCase { assertEquals(0, postDelete.get()); assertEquals(0, postDeleteException.get()); - shard.index(index); + shard.execute(index); assertEquals(2, preIndex.get()); assertEquals(1, postIndexCreate.get()); assertEquals(1, postIndexUpdate.get()); @@ -620,7 +623,7 @@ public class IndexShardTests extends IndexShardTestCase { assertEquals(0, postDeleteException.get()); Engine.Delete delete = new Engine.Delete("test", "1", new Term("_uid", "1")); - shard.delete(delete); + shard.execute(delete); assertEquals(2, preIndex.get()); assertEquals(1, postIndexCreate.get()); @@ -634,7 +637,7 @@ public class IndexShardTests extends IndexShardTestCase { shard.state = IndexShardState.STARTED; // It will generate exception try { - shard.index(index); + shard.execute(index); fail(); } catch (IllegalIndexShardStateException e) { @@ -648,7 +651,7 @@ public class IndexShardTests extends IndexShardTestCase { assertEquals(1, postDelete.get()); assertEquals(0, postDeleteException.get()); try { - shard.delete(delete); + shard.execute(delete); fail(); } catch (IllegalIndexShardStateException e) { @@ -1123,26 +1126,27 @@ public class IndexShardTests extends IndexShardTestCase { final AtomicInteger postDelete = new AtomicInteger(); IndexingOperationListener listener = new IndexingOperationListener() { @Override - public Engine.Index preIndex(Engine.Index operation) { - preIndex.incrementAndGet(); - return operation; + public void preOperation(Engine.Operation operation) { + switch (operation.operationType()) { + case INDEX: + preIndex.incrementAndGet(); + break; + case DELETE: + preDelete.incrementAndGet(); + break; + } } @Override - public void postIndex(Engine.Index index, boolean created) { - postIndex.incrementAndGet(); - } - - @Override - public Engine.Delete preDelete(Engine.Delete delete) { - preDelete.incrementAndGet(); - return delete; - } - - @Override - public void postDelete(Engine.Delete delete) { - postDelete.incrementAndGet(); - + public void postOperation(Engine.Operation operation) { + switch (operation.operationType()) { + case INDEX: + postIndex.incrementAndGet(); + break; + case DELETE: + postDelete.incrementAndGet(); + break; + } } }; final IndexShard newShard = reinitShard(shard, listener); diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexingOperationListenerTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexingOperationListenerTests.java index d1cf8b32f58..bb652c6630b 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexingOperationListenerTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexingOperationListenerTests.java @@ -40,63 +40,55 @@ public class IndexingOperationListenerTests extends ESTestCase{ AtomicInteger postDeleteException = new AtomicInteger(); IndexingOperationListener listener = new IndexingOperationListener() { @Override - public Engine.Index preIndex(Engine.Index operation) { - preIndex.incrementAndGet(); - return operation; + public void preOperation(Engine.Operation operation) { + switch (operation.operationType()) { + case INDEX: + preIndex.incrementAndGet(); + break; + case DELETE: + preDelete.incrementAndGet(); + break; + } } @Override - public void postIndex(Engine.Index index, boolean created) { - postIndex.incrementAndGet(); + public void postOperation(Engine.Operation operation) { + switch (operation.operationType()) { + case INDEX: + postIndex.incrementAndGet(); + break; + case DELETE: + postDelete.incrementAndGet(); + break; + } } @Override - public void postIndex(Engine.Index index, Exception ex) { - postIndexException.incrementAndGet(); - } - - @Override - public Engine.Delete preDelete(Engine.Delete delete) { - preDelete.incrementAndGet(); - return delete; - } - - @Override - public void postDelete(Engine.Delete delete) { - postDelete.incrementAndGet(); - } - - @Override - public void postDelete(Engine.Delete delete, Exception ex) { - postDeleteException.incrementAndGet(); + public void postOperation(Engine.Operation operation, Exception ex) { + switch (operation.operationType()) { + case INDEX: + postIndexException.incrementAndGet(); + break; + case DELETE: + postDeleteException.incrementAndGet(); + break; + } } }; IndexingOperationListener throwingListener = new IndexingOperationListener() { @Override - public Engine.Index preIndex(Engine.Index operation) { + public void preOperation(Engine.Operation operation) { throw new RuntimeException(); } @Override - public void postIndex(Engine.Index index, boolean created) { - throw new RuntimeException(); } - - @Override - public void postIndex(Engine.Index index, Exception ex) { - throw new RuntimeException(); } - - @Override - public Engine.Delete preDelete(Engine.Delete delete) { + public void postOperation(Engine.Operation operation) { throw new RuntimeException(); } @Override - public void postDelete(Engine.Delete delete) { - throw new RuntimeException(); } - - @Override - public void postDelete(Engine.Delete delete, Exception ex) { + public void postOperation(Engine.Operation operation, Exception ex) { throw new RuntimeException(); } }; @@ -111,7 +103,7 @@ public class IndexingOperationListenerTests extends ESTestCase{ IndexingOperationListener.CompositeListener compositeListener = new IndexingOperationListener.CompositeListener(indexingOperationListeners, logger); Engine.Delete delete = new Engine.Delete("test", "1", new Term("_uid", "1")); Engine.Index index = new Engine.Index(new Term("_uid", "1"), null); - compositeListener.postDelete(delete); + compositeListener.postOperation(delete); assertEquals(0, preIndex.get()); assertEquals(0, postIndex.get()); assertEquals(0, postIndexException.get()); @@ -119,7 +111,7 @@ public class IndexingOperationListenerTests extends ESTestCase{ assertEquals(2, postDelete.get()); assertEquals(0, postDeleteException.get()); - compositeListener.postDelete(delete, new RuntimeException()); + compositeListener.postOperation(delete, new RuntimeException()); assertEquals(0, preIndex.get()); assertEquals(0, postIndex.get()); assertEquals(0, postIndexException.get()); @@ -127,7 +119,7 @@ public class IndexingOperationListenerTests extends ESTestCase{ assertEquals(2, postDelete.get()); assertEquals(2, postDeleteException.get()); - compositeListener.preDelete(delete); + compositeListener.preOperation(delete); assertEquals(0, preIndex.get()); assertEquals(0, postIndex.get()); assertEquals(0, postIndexException.get()); @@ -135,7 +127,7 @@ public class IndexingOperationListenerTests extends ESTestCase{ assertEquals(2, postDelete.get()); assertEquals(2, postDeleteException.get()); - compositeListener.postIndex(index, false); + compositeListener.postOperation(index); assertEquals(0, preIndex.get()); assertEquals(2, postIndex.get()); assertEquals(0, postIndexException.get()); @@ -143,7 +135,7 @@ public class IndexingOperationListenerTests extends ESTestCase{ assertEquals(2, postDelete.get()); assertEquals(2, postDeleteException.get()); - compositeListener.postIndex(index, new RuntimeException()); + compositeListener.postOperation(index, new RuntimeException()); assertEquals(0, preIndex.get()); assertEquals(2, postIndex.get()); assertEquals(2, postIndexException.get()); @@ -151,7 +143,7 @@ public class IndexingOperationListenerTests extends ESTestCase{ assertEquals(2, postDelete.get()); assertEquals(2, postDeleteException.get()); - compositeListener.preIndex(index); + compositeListener.preOperation(index); assertEquals(2, preIndex.get()); assertEquals(2, postIndex.get()); assertEquals(2, postIndexException.get()); diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/CancelTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/CancelTests.java index 8da47f1eeaf..dd1250999ba 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/CancelTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/CancelTests.java @@ -197,25 +197,14 @@ public class CancelTests extends ReindexTestCase { } public static class BlockingOperationListener implements IndexingOperationListener { - @Override - public Engine.Index preIndex(Engine.Index index) { - return preCheck(index, index.type()); - } - - @Override - public Engine.Delete preDelete(Engine.Delete delete) { - return preCheck(delete, delete.type()); - } - - private T preCheck(T operation, String type) { - if ((TYPE.equals(type) == false) || (operation.origin() != Origin.PRIMARY)) { - return operation; + public void preOperation(Engine.Operation operation) { + if ((TYPE.equals(operation.type()) == false) || (operation.origin() != Origin.PRIMARY)) { + return; } - try { if (ALLOWED_OPERATIONS.tryAcquire(30, TimeUnit.SECONDS)) { - return operation; + return; } } catch (InterruptedException e) { throw new RuntimeException(e); diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index fbb87d9f8d1..c520e3fbbfc 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -451,7 +451,7 @@ public abstract class IndexShardTestCase extends ESTestCase { SourceToParse.source(SourceToParse.Origin.PRIMARY, shard.shardId().getIndexName(), type, id, new BytesArray(source)), 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false); } - shard.index(index); + shard.execute(index); return index; } @@ -462,7 +462,7 @@ public abstract class IndexShardTestCase extends ESTestCase { } else { delete = shard.prepareDeleteOnPrimary(type, id, 1, VersionType.EXTERNAL); } - shard.delete(delete); + shard.execute(delete); return delete; }