From bb785483ae41e30a756a17f3fb6cdb5d06a1ad6c Mon Sep 17 00:00:00 2001 From: Areek Zillur Date: Tue, 25 Oct 2016 09:33:04 -0400 Subject: [PATCH] cleanup indexing operation listener --- .../elasticsearch/index/shard/IndexShard.java | 12 ++--- .../shard/IndexingOperationListener.java | 47 +++++-------------- .../index/shard/InternalIndexingStats.java | 7 +-- .../elasticsearch/index/IndexModuleTests.java | 3 +- .../index/shard/IndexShardTests.java | 12 ++--- .../shard/IndexingOperationListenerTests.java | 10 ++-- .../index/reindex/CancelTests.java | 14 +++--- 7 files changed, 37 insertions(+), 68 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 442adc98196..9413fb5bcc6 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -536,16 +536,16 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl return new Engine.Index(uid, doc, version, versionType, origin, startTime, autoGeneratedIdTimestamp, isRetry); } - public Engine.IndexResult index(Engine.Index index) { + public Engine.IndexResult index(final Engine.Index index) { ensureWriteAllowed(index); Engine engine = getEngine(); return index(engine, index); } - private Engine.IndexResult index(Engine engine, Engine.Index index) { + private Engine.IndexResult index(final Engine engine, final Engine.Index index) { active.set(true); final Engine.IndexResult result; - index = indexingOperationListeners.preIndex(index); + indexingOperationListeners.preIndex(index); try { if (logger.isTraceEnabled()) { logger.trace("index [{}][{}]{}", index.type(), index.id(), index.docs()); @@ -585,16 +585,16 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl return new Engine.Delete(type, id, uid, version, versionType, origin, startTime); } - public Engine.DeleteResult delete(Engine.Delete delete) { + public Engine.DeleteResult delete(final Engine.Delete delete) { ensureWriteAllowed(delete); Engine engine = getEngine(); return delete(engine, delete); } - private Engine.DeleteResult delete(Engine engine, Engine.Delete delete) { + private Engine.DeleteResult delete(final Engine engine, final Engine.Delete delete) { active.set(true); final Engine.DeleteResult result; - delete = indexingOperationListeners.preDelete(delete); + indexingOperationListeners.preDelete(delete); try { if (logger.isTraceEnabled()) { logger.trace("delete [{}]", delete.uid().text()); diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexingOperationListener.java b/core/src/main/java/org/elasticsearch/index/shard/IndexingOperationListener.java index 0e605954248..9c870859650 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexingOperationListener.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexingOperationListener.java @@ -30,44 +30,25 @@ import java.util.List; */ public interface IndexingOperationListener { - /** - * Called before the indexing occurs. - */ - default Engine.Index preIndex(Engine.Index operation) { - return operation; - } + /** Called before the indexing occurs */ + default void preIndex(Engine.Index operation) {} - /** - * Called after the indexing operation occurred. - */ + /** Called after the indexing operation occurred */ default void postIndex(Engine.Index index, Engine.IndexResult result) {} - /** - * Called after the indexing operation occurred with exception. - */ + /** Called after the indexing operation occurred with exception */ default void postIndex(Engine.Index index, Exception ex) {} - /** - * Called before the delete occurs. - */ - default Engine.Delete preDelete(Engine.Delete delete) { - return delete; - } + /** Called before the delete occurs */ + default void preDelete(Engine.Delete delete) {} - - /** - * Called after the delete operation occurred. - */ + /** Called after the delete operation occurred */ default void postDelete(Engine.Delete delete, Engine.DeleteResult result) {} - /** - * Called after the delete operation occurred with exception. - */ + /** Called after the delete operation occurred with exception */ default void postDelete(Engine.Delete delete, Exception ex) {} - /** - * A Composite listener that multiplexes calls to each of the listeners methods. - */ + /** A Composite listener that multiplexes calls to each of the listeners methods */ final class CompositeListener implements IndexingOperationListener{ private final List listeners; private final Logger logger; @@ -78,7 +59,7 @@ public interface IndexingOperationListener { } @Override - public Engine.Index preIndex(Engine.Index operation) { + public void preIndex(Engine.Index operation) { assert operation != null; for (IndexingOperationListener listener : listeners) { try { @@ -87,12 +68,11 @@ public interface IndexingOperationListener { logger.warn((Supplier) () -> new ParameterizedMessage("preIndex listener [{}] failed", listener), e); } } - return operation; } @Override public void postIndex(Engine.Index index, Engine.IndexResult result) { - assert index != null; + assert index != null && result != null; for (IndexingOperationListener listener : listeners) { try { listener.postIndex(index, result); @@ -116,7 +96,7 @@ public interface IndexingOperationListener { } @Override - public Engine.Delete preDelete(Engine.Delete delete) { + public void preDelete(Engine.Delete delete) { assert delete != null; for (IndexingOperationListener listener : listeners) { try { @@ -125,12 +105,11 @@ public interface IndexingOperationListener { logger.warn((Supplier) () -> new ParameterizedMessage("preDelete listener [{}] failed", listener), e); } } - return delete; } @Override public void postDelete(Engine.Delete delete, Engine.DeleteResult result) { - assert delete != null; + assert delete != null && result != null; for (IndexingOperationListener listener : listeners) { try { listener.postDelete(delete, result); diff --git a/core/src/main/java/org/elasticsearch/index/shard/InternalIndexingStats.java b/core/src/main/java/org/elasticsearch/index/shard/InternalIndexingStats.java index 39a415ca8eb..61cfe2fc4b5 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/InternalIndexingStats.java +++ b/core/src/main/java/org/elasticsearch/index/shard/InternalIndexingStats.java @@ -65,12 +65,11 @@ final class InternalIndexingStats implements IndexingOperationListener { } @Override - public Engine.Index preIndex(Engine.Index operation) { + public void preIndex(Engine.Index operation) { if (!operation.origin().isRecovery()) { totalStats.indexCurrent.inc(); typeStats(operation.type()).indexCurrent.inc(); } - return operation; } @Override @@ -96,13 +95,11 @@ final class InternalIndexingStats implements IndexingOperationListener { } @Override - public Engine.Delete preDelete(Engine.Delete delete) { + public void preDelete(Engine.Delete delete) { if (!delete.origin().isRecovery()) { totalStats.deleteCurrent.inc(); typeStats(delete.type()).deleteCurrent.inc(); } - return delete; - } @Override diff --git a/core/src/test/java/org/elasticsearch/index/IndexModuleTests.java b/core/src/test/java/org/elasticsearch/index/IndexModuleTests.java index 5e9d1ffaf9e..8dcad1b8158 100644 --- a/core/src/test/java/org/elasticsearch/index/IndexModuleTests.java +++ b/core/src/test/java/org/elasticsearch/index/IndexModuleTests.java @@ -233,9 +233,8 @@ public class IndexModuleTests extends ESTestCase { AtomicBoolean executed = new AtomicBoolean(false); IndexingOperationListener listener = new IndexingOperationListener() { @Override - public Engine.Index preIndex(Engine.Index operation) { + public void preIndex(Engine.Index operation) { executed.set(true); - return operation; } }; module.addIndexOperationListener(listener); diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 2b417c10ae7..901dcc3b724 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -558,9 +558,8 @@ public class IndexShardTests extends IndexShardTestCase { shard.close("simon says", true); shard = reinitShard(shard, new IndexingOperationListener() { @Override - public Engine.Index preIndex(Engine.Index operation) { + public void preIndex(Engine.Index operation) { preIndex.incrementAndGet(); - return operation; } @Override @@ -578,9 +577,8 @@ public class IndexShardTests extends IndexShardTestCase { } @Override - public Engine.Delete preDelete(Engine.Delete delete) { + public void preDelete(Engine.Delete delete) { preDelete.incrementAndGet(); - return delete; } @Override @@ -1121,9 +1119,8 @@ public class IndexShardTests extends IndexShardTestCase { final AtomicInteger postDelete = new AtomicInteger(); IndexingOperationListener listener = new IndexingOperationListener() { @Override - public Engine.Index preIndex(Engine.Index operation) { + public void preIndex(Engine.Index operation) { preIndex.incrementAndGet(); - return operation; } @Override @@ -1132,9 +1129,8 @@ public class IndexShardTests extends IndexShardTestCase { } @Override - public Engine.Delete preDelete(Engine.Delete delete) { + public void preDelete(Engine.Delete delete) { preDelete.incrementAndGet(); - return delete; } @Override diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexingOperationListenerTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexingOperationListenerTests.java index 15b40e4e09c..d3cf1640baa 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexingOperationListenerTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexingOperationListenerTests.java @@ -40,9 +40,8 @@ public class IndexingOperationListenerTests extends ESTestCase{ AtomicInteger postDeleteException = new AtomicInteger(); IndexingOperationListener listener = new IndexingOperationListener() { @Override - public Engine.Index preIndex(Engine.Index operation) { + public void preIndex(Engine.Index operation) { preIndex.incrementAndGet(); - return operation; } @Override @@ -56,9 +55,8 @@ public class IndexingOperationListenerTests extends ESTestCase{ } @Override - public Engine.Delete preDelete(Engine.Delete delete) { + public void preDelete(Engine.Delete delete) { preDelete.incrementAndGet(); - return delete; } @Override @@ -74,7 +72,7 @@ public class IndexingOperationListenerTests extends ESTestCase{ IndexingOperationListener throwingListener = new IndexingOperationListener() { @Override - public Engine.Index preIndex(Engine.Index operation) { + public void preIndex(Engine.Index operation) { throw new RuntimeException(); } @@ -89,7 +87,7 @@ public class IndexingOperationListenerTests extends ESTestCase{ } @Override - public Engine.Delete preDelete(Engine.Delete delete) { + public void preDelete(Engine.Delete delete) { throw new RuntimeException(); } diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/CancelTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/CancelTests.java index 8da47f1eeaf..21b3a1aa7a6 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/CancelTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/CancelTests.java @@ -199,23 +199,23 @@ public class CancelTests extends ReindexTestCase { public static class BlockingOperationListener implements IndexingOperationListener { @Override - public Engine.Index preIndex(Engine.Index index) { - return preCheck(index, index.type()); + public void preIndex(Engine.Index index) { + preCheck(index, index.type()); } @Override - public Engine.Delete preDelete(Engine.Delete delete) { - return preCheck(delete, delete.type()); + public void preDelete(Engine.Delete delete) { + preCheck(delete, delete.type()); } - private T preCheck(T operation, String type) { + private void preCheck(Engine.Operation operation, String type) { if ((TYPE.equals(type) == false) || (operation.origin() != Origin.PRIMARY)) { - return operation; + return; } try { if (ALLOWED_OPERATIONS.tryAcquire(30, TimeUnit.SECONDS)) { - return operation; + return; } } catch (InterruptedException e) { throw new RuntimeException(e);