diff --git a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 3021361b23e..7401fc6f63c 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -57,7 +57,8 @@ import org.elasticsearch.transport.TransportService; import java.util.Map; -import static org.elasticsearch.action.delete.TransportDeleteAction.*; +import static org.elasticsearch.action.delete.TransportDeleteAction.executeDeleteRequestOnPrimary; +import static org.elasticsearch.action.delete.TransportDeleteAction.executeDeleteRequestOnReplica; import static org.elasticsearch.action.index.TransportIndexAction.executeIndexRequestOnPrimary; import static org.elasticsearch.action.support.replication.ReplicationOperation.ignoreReplicaException; import static org.elasticsearch.action.support.replication.ReplicationOperation.isConflictException; diff --git a/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java b/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java index 3cfb2930447..5b83f288b07 100644 --- a/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java +++ b/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java @@ -129,7 +129,7 @@ public class TransportDeleteAction extends TransportWriteAction executeDeleteRequestOnPrimary(DeleteRequest request, IndexShard primary) { Engine.Delete delete = primary.prepareDeleteOnPrimary(request.type(), request.id(), request.version(), request.versionType()); - primary.delete(delete); + primary.execute(delete); if (delete.hasFailure()) { return new PrimaryOperationResult<>(delete.getFailure()); } else { @@ -145,7 +145,7 @@ public class TransportDeleteAction extends TransportWriteAction executeIndexRequestOnPrimary(IndexRequest request, IndexShard indexShard, + public static PrimaryOperationResult executeIndexRequestOnPrimary(IndexRequest request, IndexShard primary, MappingUpdatedAction mappingUpdatedAction) throws Exception { Engine.Index operation; try { - operation = prepareIndexOperationOnPrimary(request, indexShard); + operation = prepareIndexOperationOnPrimary(request, primary); } catch (MapperParsingException | IllegalArgumentException e) { return new PrimaryOperationResult<>(e); } Mapping update = operation.parsedDoc().dynamicMappingsUpdate(); - final ShardId shardId = indexShard.shardId(); + final ShardId shardId = primary.shardId(); if (update != null) { try { // can throw timeout exception when updating mappings or ISE for attempting to update default mappings // which are bubbled up mappingUpdatedAction.updateMappingOnMaster(shardId.getIndex(), request.type(), update); - operation = prepareIndexOperationOnPrimary(request, indexShard); + operation = prepareIndexOperationOnPrimary(request, primary); } catch (MapperParsingException | IllegalArgumentException e) { return new PrimaryOperationResult<>(e); } @@ -208,7 +208,7 @@ public class TransportIndexAction extends TransportWriteAction(operation.getFailure()); } else { diff --git a/core/src/main/java/org/elasticsearch/index/IndexingSlowLog.java b/core/src/main/java/org/elasticsearch/index/IndexingSlowLog.java index 513e87878d6..d2ebc4e78de 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexingSlowLog.java +++ b/core/src/main/java/org/elasticsearch/index/IndexingSlowLog.java @@ -133,23 +133,20 @@ public final class IndexingSlowLog implements IndexingOperationListener { this.reformat = reformat; } - @Override - public void postIndex(Engine.Index index, boolean created) { - final long took = index.endTime() - index.startTime(); - postIndexing(index.parsedDoc(), took); - } - - - private void postIndexing(ParsedDocument doc, long tookInNanos) { - if (indexWarnThreshold >= 0 && tookInNanos > indexWarnThreshold) { - indexLogger.warn("{}", new SlowLogParsedDocumentPrinter(index, doc, tookInNanos, reformat, maxSourceCharsToLog)); - } else if (indexInfoThreshold >= 0 && tookInNanos > indexInfoThreshold) { - indexLogger.info("{}", new SlowLogParsedDocumentPrinter(index, doc, tookInNanos, reformat, maxSourceCharsToLog)); - } else if (indexDebugThreshold >= 0 && tookInNanos > indexDebugThreshold) { - indexLogger.debug("{}", new SlowLogParsedDocumentPrinter(index, doc, tookInNanos, reformat, maxSourceCharsToLog)); - } else if (indexTraceThreshold >= 0 && tookInNanos > indexTraceThreshold) { - indexLogger.trace("{}", new SlowLogParsedDocumentPrinter(index, doc, tookInNanos, reformat, maxSourceCharsToLog)); + public void postOperation(Engine.Operation operation) { + if (operation.operationType() == Engine.Operation.TYPE.INDEX) { + final long tookInNanos = operation.endTime() - operation.startTime(); + ParsedDocument doc = ((Engine.Index) operation).parsedDoc(); + if (indexWarnThreshold >= 0 && tookInNanos > indexWarnThreshold) { + indexLogger.warn("{}", new SlowLogParsedDocumentPrinter(index, doc, tookInNanos, reformat, maxSourceCharsToLog)); + } else if (indexInfoThreshold >= 0 && tookInNanos > indexInfoThreshold) { + indexLogger.info("{}", new SlowLogParsedDocumentPrinter(index, doc, tookInNanos, reformat, maxSourceCharsToLog)); + } else if (indexDebugThreshold >= 0 && tookInNanos > indexDebugThreshold) { + indexLogger.debug("{}", new SlowLogParsedDocumentPrinter(index, doc, tookInNanos, reformat, maxSourceCharsToLog)); + } else if (indexTraceThreshold >= 0 && tookInNanos > indexTraceThreshold) { + indexLogger.trace("{}", new SlowLogParsedDocumentPrinter(index, doc, tookInNanos, reformat, maxSourceCharsToLog)); + } } } diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java index 8b4150dc1b4..cfb2532280e 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -880,11 +880,13 @@ public abstract class Engine implements Closeable { return this.endTime; } - abstract String type(); + public abstract String type(); abstract String id(); - abstract TYPE operationType(); + public abstract TYPE operationType(); + + public abstract String toString(); } public static class Index extends Operation { @@ -925,7 +927,7 @@ public abstract class Engine implements Closeable { } @Override - TYPE operationType() { + public TYPE operationType() { return TYPE.INDEX; } @@ -989,6 +991,10 @@ public abstract class Engine implements Closeable { return isRetry; } + @Override + public String toString() { + return "index [{" + type() + "}][{" + id()+ "}] [{" + docs() + "}]"; + } } public static class Delete extends Operation { @@ -1023,10 +1029,15 @@ public abstract class Engine implements Closeable { } @Override - TYPE operationType() { + public TYPE operationType() { return TYPE.DELETE; } + @Override + public String toString() { + return "delete [{"+ uid().text() +"}]"; + } + public void updateVersion(long version, boolean found) { updateVersion(version); this.found = found; diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 0eb8d1b076e..adc1fa876cb 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -536,29 +536,36 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl return new Engine.Index(uid, doc, version, versionType, origin, startTime, autoGeneratedIdTimestamp, isRetry); } - public void index(Engine.Index index) { - ensureWriteAllowed(index); + public void execute(Engine.Operation operation) { + ensureWriteAllowed(operation); Engine engine = getEngine(); - index(engine, index); + execute(engine, operation); } - private void index(Engine engine, Engine.Index index) { + private void execute(Engine engine, Engine.Operation operation) { active.set(true); - index = indexingOperationListeners.preIndex(index); + indexingOperationListeners.preOperation(operation); try { if (logger.isTraceEnabled()) { - logger.trace("index [{}][{}]{}", index.type(), index.id(), index.docs()); + logger.trace(operation.toString()); } - engine.index(index); - index.endTime(System.nanoTime()); + switch (operation.operationType()) { + case INDEX: + engine.index(((Engine.Index) operation)); + break; + case DELETE: + engine.delete(((Engine.Delete) operation)); + break; + } + operation.endTime(System.nanoTime()); } catch (Exception e) { - indexingOperationListeners.postIndex(index, e); + indexingOperationListeners.postOperation(operation, e); throw e; } - if (index.hasFailure()) { - indexingOperationListeners.postIndex(index, index.getFailure()); + if (operation.hasFailure()) { + indexingOperationListeners.postOperation(operation, operation.getFailure()); } else { - indexingOperationListeners.postIndex(index, index.isCreated()); + indexingOperationListeners.postOperation(operation); } } @@ -584,32 +591,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl return new Engine.Delete(type, id, uid, version, versionType, origin, startTime, false); } - public void delete(Engine.Delete delete) { - ensureWriteAllowed(delete); - Engine engine = getEngine(); - delete(engine, delete); - } - - private void delete(Engine engine, Engine.Delete delete) { - active.set(true); - delete = indexingOperationListeners.preDelete(delete); - try { - if (logger.isTraceEnabled()) { - logger.trace("delete [{}]", delete.uid().text()); - } - engine.delete(delete); - delete.endTime(System.nanoTime()); - } catch (Exception e) { - indexingOperationListeners.postDelete(delete, e); - throw e; - } - if (delete.hasFailure()) { - indexingOperationListeners.postDelete(delete, delete.getFailure()); - } else { - indexingOperationListeners.postDelete(delete); - } - } - public Engine.GetResult get(Engine.Get get) { readAllowed(); return getEngine().get(get, this::acquireSearcher); @@ -1841,12 +1822,12 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl @Override protected void index(Engine engine, Engine.Index engineIndex) { - IndexShard.this.index(engine, engineIndex); + IndexShard.this.execute(engine, engineIndex); } @Override protected void delete(Engine engine, Engine.Delete engineDelete) { - IndexShard.this.delete(engine, engineDelete); + IndexShard.this.execute(engine, engineDelete); } } } diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexingOperationListener.java b/core/src/main/java/org/elasticsearch/index/shard/IndexingOperationListener.java index 042ddec924e..ec0fab2629f 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexingOperationListener.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexingOperationListener.java @@ -25,49 +25,19 @@ import org.elasticsearch.index.engine.Engine; import java.util.List; -/** - * An indexing listener for indexing, delete, events. - */ +/** An engine operation listener for index and delete execution. */ public interface IndexingOperationListener { - /** - * Called before the indexing occurs. - */ - default Engine.Index preIndex(Engine.Index operation) { - return operation; - } + /** Called before executing index or delete operation */ + default void preOperation(Engine.Operation operation) {} - /** - * Called after the indexing operation occurred. - */ - default void postIndex(Engine.Index index, boolean created) {} + /** Called after executing index or delete operation */ + default void postOperation(Engine.Operation operation) {} - /** - * Called after the indexing operation occurred with exception. - */ - default void postIndex(Engine.Index index, Exception ex) {} + /** Called after index or delete operation failed with exception */ + default void postOperation(Engine.Operation operation, Exception ex) {} - /** - * Called before the delete occurs. - */ - default Engine.Delete preDelete(Engine.Delete delete) { - return delete; - } - - - /** - * Called after the delete operation occurred. - */ - default void postDelete(Engine.Delete delete) {} - - /** - * Called after the delete operation occurred with exception. - */ - default void postDelete(Engine.Delete delete, Exception ex) {} - - /** - * A Composite listener that multiplexes calls to each of the listeners methods. - */ + /** A Composite listener that multiplexes calls to each of the listeners methods. */ final class CompositeListener implements IndexingOperationListener{ private final List listeners; private final Logger logger; @@ -78,79 +48,40 @@ public interface IndexingOperationListener { } @Override - public Engine.Index preIndex(Engine.Index operation) { + public void preOperation(Engine.Operation operation) { assert operation != null; for (IndexingOperationListener listener : listeners) { try { - listener.preIndex(operation); + listener.preOperation(operation); } catch (Exception e) { - logger.warn((Supplier) () -> new ParameterizedMessage("preIndex listener [{}] failed", listener), e); - } - } - return operation; - } - - @Override - public void postIndex(Engine.Index index, boolean created) { - assert index != null; - for (IndexingOperationListener listener : listeners) { - try { - listener.postIndex(index, created); - } catch (Exception e) { - logger.warn((Supplier) () -> new ParameterizedMessage("postIndex listener [{}] failed", listener), e); + logger.warn((Supplier) () -> new ParameterizedMessage("preOperation listener [{}] failed", listener), e); } } } @Override - public void postIndex(Engine.Index index, Exception ex) { - assert index != null && ex != null; + public void postOperation(Engine.Operation operation) { + assert operation != null; for (IndexingOperationListener listener : listeners) { try { - listener.postIndex(index, ex); + listener.postOperation(operation); + } catch (Exception e) { + logger.warn((Supplier) () -> new ParameterizedMessage("postOperation listener [{}] failed", listener), e); + } + } + } + + @Override + public void postOperation(Engine.Operation operation, Exception ex) { + assert operation != null && ex != null; + for (IndexingOperationListener listener : listeners) { + try { + listener.postOperation(operation, ex); } catch (Exception inner) { inner.addSuppressed(ex); - logger.warn((Supplier) () -> new ParameterizedMessage("postIndex listener [{}] failed", listener), inner); - } - } - } - - @Override - public Engine.Delete preDelete(Engine.Delete delete) { - assert delete != null; - for (IndexingOperationListener listener : listeners) { - try { - listener.preDelete(delete); - } catch (Exception e) { - logger.warn((Supplier) () -> new ParameterizedMessage("preDelete listener [{}] failed", listener), e); - } - } - return delete; - } - - @Override - public void postDelete(Engine.Delete delete) { - assert delete != null; - for (IndexingOperationListener listener : listeners) { - try { - listener.postDelete(delete); - } catch (Exception e) { - logger.warn((Supplier) () -> new ParameterizedMessage("postDelete listener [{}] failed", listener), e); - } - } - } - - @Override - public void postDelete(Engine.Delete delete, Exception ex) { - assert delete != null && ex != null; - for (IndexingOperationListener listener : listeners) { - try { - listener.postDelete(delete, ex); - } catch (Exception inner) { - inner.addSuppressed(ex); - logger.warn((Supplier) () -> new ParameterizedMessage("postDelete listener [{}] failed", listener), inner); + logger.warn((Supplier) () -> new ParameterizedMessage("postOperation listener [{}] failed", listener), inner); } } } } -} +} \ No newline at end of file diff --git a/core/src/main/java/org/elasticsearch/index/shard/InternalIndexingStats.java b/core/src/main/java/org/elasticsearch/index/shard/InternalIndexingStats.java index f62b8f7fe3c..cd1b1526e0c 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/InternalIndexingStats.java +++ b/core/src/main/java/org/elasticsearch/index/shard/InternalIndexingStats.java @@ -65,63 +65,60 @@ final class InternalIndexingStats implements IndexingOperationListener { } @Override - public Engine.Index preIndex(Engine.Index operation) { + public void preOperation(Engine.Operation operation) { if (!operation.origin().isRecovery()) { - totalStats.indexCurrent.inc(); - typeStats(operation.type()).indexCurrent.inc(); - } - return operation; - } - - @Override - public void postIndex(Engine.Index index, boolean created) { - if (!index.origin().isRecovery()) { - long took = index.endTime() - index.startTime(); - totalStats.indexMetric.inc(took); - totalStats.indexCurrent.dec(); - StatsHolder typeStats = typeStats(index.type()); - typeStats.indexMetric.inc(took); - typeStats.indexCurrent.dec(); + StatsHolder statsHolder = typeStats(operation.type()); + switch (operation.operationType()) { + case INDEX: + totalStats.indexCurrent.inc(); + statsHolder.indexCurrent.inc(); + break; + case DELETE: + totalStats.deleteCurrent.inc(); + statsHolder.deleteCurrent.inc(); + break; + } } } @Override - public void postIndex(Engine.Index index, Exception ex) { - if (!index.origin().isRecovery()) { - totalStats.indexCurrent.dec(); - typeStats(index.type()).indexCurrent.dec(); - totalStats.indexFailed.inc(); - typeStats(index.type()).indexFailed.inc(); + public void postOperation(Engine.Operation operation) { + if (!operation.origin().isRecovery()) { + long took = operation.endTime() - operation.startTime(); + StatsHolder typeStats = typeStats(operation.type()); + switch (operation.operationType()) { + case INDEX: + totalStats.indexMetric.inc(took); + totalStats.indexCurrent.dec(); + typeStats.indexMetric.inc(took); + typeStats.indexCurrent.dec(); + break; + case DELETE: + totalStats.deleteMetric.inc(took); + totalStats.deleteCurrent.dec(); + typeStats.deleteMetric.inc(took); + typeStats.deleteCurrent.dec(); + break; + } } } @Override - public Engine.Delete preDelete(Engine.Delete delete) { - if (!delete.origin().isRecovery()) { - totalStats.deleteCurrent.inc(); - typeStats(delete.type()).deleteCurrent.inc(); - } - return delete; - - } - - @Override - public void postDelete(Engine.Delete delete) { - if (!delete.origin().isRecovery()) { - long took = delete.endTime() - delete.startTime(); - totalStats.deleteMetric.inc(took); - totalStats.deleteCurrent.dec(); - StatsHolder typeStats = typeStats(delete.type()); - typeStats.deleteMetric.inc(took); - typeStats.deleteCurrent.dec(); - } - } - - @Override - public void postDelete(Engine.Delete delete, Exception ex) { - if (!delete.origin().isRecovery()) { - totalStats.deleteCurrent.dec(); - typeStats(delete.type()).deleteCurrent.dec(); + public void postOperation(Engine.Operation operation, Exception ex) { + if (!operation.origin().isRecovery()) { + StatsHolder statsHolder = typeStats(operation.type()); + switch (operation.operationType()) { + case INDEX: + totalStats.indexCurrent.dec(); + statsHolder.indexCurrent.dec(); + totalStats.indexFailed.inc(); + statsHolder.indexFailed.inc(); + break; + case DELETE: + totalStats.deleteCurrent.dec(); + statsHolder.deleteCurrent.dec(); + break; + } } } @@ -158,10 +155,5 @@ final class InternalIndexingStats implements IndexingOperationListener { deleteMetric.count(), TimeUnit.NANOSECONDS.toMillis(deleteMetric.sum()), deleteCurrent.count(), noopUpdates.count(), isThrottled, TimeUnit.MILLISECONDS.toMillis(currentThrottleMillis)); } - - void clear() { - indexMetric.clear(); - deleteMetric.clear(); - } } } diff --git a/core/src/main/java/org/elasticsearch/indices/IndexingMemoryController.java b/core/src/main/java/org/elasticsearch/indices/IndexingMemoryController.java index 3b4258a8bdf..da5a9b7c28e 100644 --- a/core/src/main/java/org/elasticsearch/indices/IndexingMemoryController.java +++ b/core/src/main/java/org/elasticsearch/indices/IndexingMemoryController.java @@ -189,11 +189,6 @@ public class IndexingMemoryController extends AbstractComponent implements Index statusChecker.run(); } - /** called by IndexShard to record that this many bytes were written to translog */ - public void bytesWritten(int bytes) { - statusChecker.bytesWritten(bytes); - } - /** Asks this shard to throttle indexing to one thread */ protected void activateThrottling(IndexShard shard) { shard.activateThrottling(); @@ -205,17 +200,8 @@ public class IndexingMemoryController extends AbstractComponent implements Index } @Override - public void postIndex(Engine.Index index, boolean created) { - recordOperationBytes(index); - } - - @Override - public void postDelete(Engine.Delete delete) { - recordOperationBytes(delete); - } - - private void recordOperationBytes(Engine.Operation op) { - bytesWritten(op.sizeInBytes()); + public void postOperation(Engine.Operation operation) { + statusChecker.bytesWritten(operation.sizeInBytes()); } private static final class ShardAndBytesUsed implements Comparable { diff --git a/core/src/test/java/org/elasticsearch/index/IndexModuleTests.java b/core/src/test/java/org/elasticsearch/index/IndexModuleTests.java index 5e9d1ffaf9e..1cf56c50234 100644 --- a/core/src/test/java/org/elasticsearch/index/IndexModuleTests.java +++ b/core/src/test/java/org/elasticsearch/index/IndexModuleTests.java @@ -233,9 +233,10 @@ public class IndexModuleTests extends ESTestCase { AtomicBoolean executed = new AtomicBoolean(false); IndexingOperationListener listener = new IndexingOperationListener() { @Override - public Engine.Index preIndex(Engine.Index operation) { - executed.set(true); - return operation; + public void preOperation(Engine.Operation operation) { + if (operation.operationType() == Engine.Operation.TYPE.INDEX) { + executed.set(true); + } } }; module.addIndexOperationListener(listener); @@ -251,7 +252,7 @@ public class IndexModuleTests extends ESTestCase { Engine.Index index = new Engine.Index(new Term("_uid", "1"), null); for (IndexingOperationListener l : indexService.getIndexOperationListeners()) { - l.preIndex(index); + l.preOperation(index); } assertTrue(executed.get()); indexService.close("simon says", false); diff --git a/core/src/test/java/org/elasticsearch/index/mapper/TextFieldMapperTests.java b/core/src/test/java/org/elasticsearch/index/mapper/TextFieldMapperTests.java index 846d2c56669..7313b7ec9bf 100644 --- a/core/src/test/java/org/elasticsearch/index/mapper/TextFieldMapperTests.java +++ b/core/src/test/java/org/elasticsearch/index/mapper/TextFieldMapperTests.java @@ -219,7 +219,7 @@ public class TextFieldMapperTests extends ESSingleNodeTestCase { assertEquals("b", fields[1].stringValue()); IndexShard shard = indexService.getShard(0); - shard.index(new Engine.Index(new Term("_uid", "1"), doc)); + shard.execute(new Engine.Index(new Term("_uid", "1"), doc)); shard.refresh("test"); try (Engine.Searcher searcher = shard.acquireSearcher("test")) { LeafReader leaf = searcher.getDirectoryReader().leaves().get(0).reader(); @@ -258,7 +258,7 @@ public class TextFieldMapperTests extends ESSingleNodeTestCase { assertEquals("b", fields[1].stringValue()); IndexShard shard = indexService.getShard(0); - shard.index(new Engine.Index(new Term("_uid", "1"), doc)); + shard.execute(new Engine.Index(new Term("_uid", "1"), doc)); shard.refresh("test"); try (Engine.Searcher searcher = shard.acquireSearcher("test")) { LeafReader leaf = searcher.getDirectoryReader().leaves().get(0).reader(); diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java index 2248ff156ac..900bea75724 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java @@ -321,7 +321,7 @@ public class IndexShardIT extends ESSingleNodeTestCase { ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, new ParseContext.Document(), new BytesArray(new byte[]{1}), null); Engine.Index index = new Engine.Index(new Term("_uid", "1"), doc); - shard.index(index); + shard.execute(index); assertTrue(shard.shouldFlush()); assertEquals(2, shard.getEngine().getTranslog().totalOperations()); client().prepareIndex("test", "test", "2").setSource("{}").setRefreshPolicy(randomBoolean() ? IMMEDIATE : NONE).get(); @@ -406,23 +406,8 @@ public class IndexShardIT extends ESSingleNodeTestCase { AtomicReference shardRef = new AtomicReference<>(); List failures = new ArrayList<>(); IndexingOperationListener listener = new IndexingOperationListener() { - @Override - public void postIndex(Engine.Index index, boolean created) { - try { - assertNotNull(shardRef.get()); - // this is all IMC needs to do - check current memory and refresh - assertTrue(shardRef.get().getIndexBufferRAMBytesUsed() > 0); - shardRef.get().refresh("test"); - } catch (Exception e) { - failures.add(e); - throw e; - } - } - - - @Override - public void postDelete(Engine.Delete delete) { + public void postOperation(Engine.Operation operation) { try { assertNotNull(shardRef.get()); // this is all IMC needs to do - check current memory and refresh diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index b531aae0532..99ab4e03a9c 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -560,40 +560,43 @@ public class IndexShardTests extends IndexShardTestCase { shard.close("simon says", true); shard = reinitShard(shard, new IndexingOperationListener() { @Override - public Engine.Index preIndex(Engine.Index operation) { - preIndex.incrementAndGet(); - return operation; - } - - @Override - public void postIndex(Engine.Index index, boolean created) { - if (created) { - postIndexCreate.incrementAndGet(); - } else { - postIndexUpdate.incrementAndGet(); + public void preOperation(Engine.Operation operation) { + switch (operation.operationType()) { + case INDEX: + preIndex.incrementAndGet(); + break; + case DELETE: + preDelete.incrementAndGet(); + break; } } @Override - public void postIndex(Engine.Index index, Exception ex) { - postIndexException.incrementAndGet(); + public void postOperation(Engine.Operation operation) { + switch (operation.operationType()) { + case INDEX: + if (((Engine.Index) operation).isCreated()) { + postIndexCreate.incrementAndGet(); + } else { + postIndexUpdate.incrementAndGet(); + } + break; + case DELETE: + postDelete.incrementAndGet(); + break; + } } @Override - public Engine.Delete preDelete(Engine.Delete delete) { - preDelete.incrementAndGet(); - return delete; - } - - @Override - public void postDelete(Engine.Delete delete) { - postDelete.incrementAndGet(); - } - - @Override - public void postDelete(Engine.Delete delete, Exception ex) { - postDeleteException.incrementAndGet(); - + public void postOperation(Engine.Operation operation, Exception ex) { + switch (operation.operationType()) { + case INDEX: + postIndexException.incrementAndGet(); + break; + case DELETE: + postDeleteException.incrementAndGet(); + break; + } } }); recoveryShardFromStore(shard); @@ -601,7 +604,7 @@ public class IndexShardTests extends IndexShardTestCase { ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, new ParseContext.Document(), new BytesArray(new byte[]{1}), null); Engine.Index index = new Engine.Index(new Term("_uid", "1"), doc); - shard.index(index); + shard.execute(index); assertEquals(1, preIndex.get()); assertEquals(1, postIndexCreate.get()); assertEquals(0, postIndexUpdate.get()); @@ -610,7 +613,7 @@ public class IndexShardTests extends IndexShardTestCase { assertEquals(0, postDelete.get()); assertEquals(0, postDeleteException.get()); - shard.index(index); + shard.execute(index); assertEquals(2, preIndex.get()); assertEquals(1, postIndexCreate.get()); assertEquals(1, postIndexUpdate.get()); @@ -620,7 +623,7 @@ public class IndexShardTests extends IndexShardTestCase { assertEquals(0, postDeleteException.get()); Engine.Delete delete = new Engine.Delete("test", "1", new Term("_uid", "1")); - shard.delete(delete); + shard.execute(delete); assertEquals(2, preIndex.get()); assertEquals(1, postIndexCreate.get()); @@ -634,7 +637,7 @@ public class IndexShardTests extends IndexShardTestCase { shard.state = IndexShardState.STARTED; // It will generate exception try { - shard.index(index); + shard.execute(index); fail(); } catch (IllegalIndexShardStateException e) { @@ -648,7 +651,7 @@ public class IndexShardTests extends IndexShardTestCase { assertEquals(1, postDelete.get()); assertEquals(0, postDeleteException.get()); try { - shard.delete(delete); + shard.execute(delete); fail(); } catch (IllegalIndexShardStateException e) { @@ -1123,26 +1126,27 @@ public class IndexShardTests extends IndexShardTestCase { final AtomicInteger postDelete = new AtomicInteger(); IndexingOperationListener listener = new IndexingOperationListener() { @Override - public Engine.Index preIndex(Engine.Index operation) { - preIndex.incrementAndGet(); - return operation; + public void preOperation(Engine.Operation operation) { + switch (operation.operationType()) { + case INDEX: + preIndex.incrementAndGet(); + break; + case DELETE: + preDelete.incrementAndGet(); + break; + } } @Override - public void postIndex(Engine.Index index, boolean created) { - postIndex.incrementAndGet(); - } - - @Override - public Engine.Delete preDelete(Engine.Delete delete) { - preDelete.incrementAndGet(); - return delete; - } - - @Override - public void postDelete(Engine.Delete delete) { - postDelete.incrementAndGet(); - + public void postOperation(Engine.Operation operation) { + switch (operation.operationType()) { + case INDEX: + postIndex.incrementAndGet(); + break; + case DELETE: + postDelete.incrementAndGet(); + break; + } } }; final IndexShard newShard = reinitShard(shard, listener); diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexingOperationListenerTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexingOperationListenerTests.java index d1cf8b32f58..bb652c6630b 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexingOperationListenerTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexingOperationListenerTests.java @@ -40,63 +40,55 @@ public class IndexingOperationListenerTests extends ESTestCase{ AtomicInteger postDeleteException = new AtomicInteger(); IndexingOperationListener listener = new IndexingOperationListener() { @Override - public Engine.Index preIndex(Engine.Index operation) { - preIndex.incrementAndGet(); - return operation; + public void preOperation(Engine.Operation operation) { + switch (operation.operationType()) { + case INDEX: + preIndex.incrementAndGet(); + break; + case DELETE: + preDelete.incrementAndGet(); + break; + } } @Override - public void postIndex(Engine.Index index, boolean created) { - postIndex.incrementAndGet(); + public void postOperation(Engine.Operation operation) { + switch (operation.operationType()) { + case INDEX: + postIndex.incrementAndGet(); + break; + case DELETE: + postDelete.incrementAndGet(); + break; + } } @Override - public void postIndex(Engine.Index index, Exception ex) { - postIndexException.incrementAndGet(); - } - - @Override - public Engine.Delete preDelete(Engine.Delete delete) { - preDelete.incrementAndGet(); - return delete; - } - - @Override - public void postDelete(Engine.Delete delete) { - postDelete.incrementAndGet(); - } - - @Override - public void postDelete(Engine.Delete delete, Exception ex) { - postDeleteException.incrementAndGet(); + public void postOperation(Engine.Operation operation, Exception ex) { + switch (operation.operationType()) { + case INDEX: + postIndexException.incrementAndGet(); + break; + case DELETE: + postDeleteException.incrementAndGet(); + break; + } } }; IndexingOperationListener throwingListener = new IndexingOperationListener() { @Override - public Engine.Index preIndex(Engine.Index operation) { + public void preOperation(Engine.Operation operation) { throw new RuntimeException(); } @Override - public void postIndex(Engine.Index index, boolean created) { - throw new RuntimeException(); } - - @Override - public void postIndex(Engine.Index index, Exception ex) { - throw new RuntimeException(); } - - @Override - public Engine.Delete preDelete(Engine.Delete delete) { + public void postOperation(Engine.Operation operation) { throw new RuntimeException(); } @Override - public void postDelete(Engine.Delete delete) { - throw new RuntimeException(); } - - @Override - public void postDelete(Engine.Delete delete, Exception ex) { + public void postOperation(Engine.Operation operation, Exception ex) { throw new RuntimeException(); } }; @@ -111,7 +103,7 @@ public class IndexingOperationListenerTests extends ESTestCase{ IndexingOperationListener.CompositeListener compositeListener = new IndexingOperationListener.CompositeListener(indexingOperationListeners, logger); Engine.Delete delete = new Engine.Delete("test", "1", new Term("_uid", "1")); Engine.Index index = new Engine.Index(new Term("_uid", "1"), null); - compositeListener.postDelete(delete); + compositeListener.postOperation(delete); assertEquals(0, preIndex.get()); assertEquals(0, postIndex.get()); assertEquals(0, postIndexException.get()); @@ -119,7 +111,7 @@ public class IndexingOperationListenerTests extends ESTestCase{ assertEquals(2, postDelete.get()); assertEquals(0, postDeleteException.get()); - compositeListener.postDelete(delete, new RuntimeException()); + compositeListener.postOperation(delete, new RuntimeException()); assertEquals(0, preIndex.get()); assertEquals(0, postIndex.get()); assertEquals(0, postIndexException.get()); @@ -127,7 +119,7 @@ public class IndexingOperationListenerTests extends ESTestCase{ assertEquals(2, postDelete.get()); assertEquals(2, postDeleteException.get()); - compositeListener.preDelete(delete); + compositeListener.preOperation(delete); assertEquals(0, preIndex.get()); assertEquals(0, postIndex.get()); assertEquals(0, postIndexException.get()); @@ -135,7 +127,7 @@ public class IndexingOperationListenerTests extends ESTestCase{ assertEquals(2, postDelete.get()); assertEquals(2, postDeleteException.get()); - compositeListener.postIndex(index, false); + compositeListener.postOperation(index); assertEquals(0, preIndex.get()); assertEquals(2, postIndex.get()); assertEquals(0, postIndexException.get()); @@ -143,7 +135,7 @@ public class IndexingOperationListenerTests extends ESTestCase{ assertEquals(2, postDelete.get()); assertEquals(2, postDeleteException.get()); - compositeListener.postIndex(index, new RuntimeException()); + compositeListener.postOperation(index, new RuntimeException()); assertEquals(0, preIndex.get()); assertEquals(2, postIndex.get()); assertEquals(2, postIndexException.get()); @@ -151,7 +143,7 @@ public class IndexingOperationListenerTests extends ESTestCase{ assertEquals(2, postDelete.get()); assertEquals(2, postDeleteException.get()); - compositeListener.preIndex(index); + compositeListener.preOperation(index); assertEquals(2, preIndex.get()); assertEquals(2, postIndex.get()); assertEquals(2, postIndexException.get()); diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/CancelTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/CancelTests.java index 8da47f1eeaf..dd1250999ba 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/CancelTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/CancelTests.java @@ -197,25 +197,14 @@ public class CancelTests extends ReindexTestCase { } public static class BlockingOperationListener implements IndexingOperationListener { - @Override - public Engine.Index preIndex(Engine.Index index) { - return preCheck(index, index.type()); - } - - @Override - public Engine.Delete preDelete(Engine.Delete delete) { - return preCheck(delete, delete.type()); - } - - private T preCheck(T operation, String type) { - if ((TYPE.equals(type) == false) || (operation.origin() != Origin.PRIMARY)) { - return operation; + public void preOperation(Engine.Operation operation) { + if ((TYPE.equals(operation.type()) == false) || (operation.origin() != Origin.PRIMARY)) { + return; } - try { if (ALLOWED_OPERATIONS.tryAcquire(30, TimeUnit.SECONDS)) { - return operation; + return; } } catch (InterruptedException e) { throw new RuntimeException(e); diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index fbb87d9f8d1..c520e3fbbfc 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -451,7 +451,7 @@ public abstract class IndexShardTestCase extends ESTestCase { SourceToParse.source(SourceToParse.Origin.PRIMARY, shard.shardId().getIndexName(), type, id, new BytesArray(source)), 1, VersionType.EXTERNAL, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, false); } - shard.index(index); + shard.execute(index); return index; } @@ -462,7 +462,7 @@ public abstract class IndexShardTestCase extends ESTestCase { } else { delete = shard.prepareDeleteOnPrimary(type, id, 1, VersionType.EXTERNAL); } - shard.delete(delete); + shard.execute(delete); return delete; }