From b253af36f3bafe929fc0a91c8f94c330c78588c8 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Wed, 29 Jan 2020 12:08:18 +0100 Subject: [PATCH] The watcher indexing listener didn't handle document level exceptions. (#51466) Prior to the change the watcher index listener didn't implement the `postIndex(ShardId, Engine.Index, Engine.IndexResult)` method. This caused document level exceptions like VersionConflictEngineException to be ignored. This commit fixes this. The watcher indexing listener did implement the `postIndex(ShardId, Engine.Index, Exception)` method, but that only handles engine level exceptions. This change also unmutes the SmokeTestWatcherTestSuiteIT#testMonitorClusterHealth test again. Relates to #32299 --- .../watcher/WatcherIndexingListener.java | 15 +++++++- .../watcher/WatcherIndexingListenerTests.java | 38 ++++++++++++++++++- .../SmokeTestWatcherTestSuiteIT.java | 1 - 3 files changed, 49 insertions(+), 5 deletions(-) diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherIndexingListener.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherIndexingListener.java index 9e29c3110bc..ab03b7e4dd3 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherIndexingListener.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherIndexingListener.java @@ -135,8 +135,19 @@ final class WatcherIndexingListener implements IndexingOperationListener, Cluste } /** - * - * In case of an error, we have to ensure that the triggerservice does not leave anything behind + * In case of a document related failure (for example version conflict), then clean up resources for a watch + * in the trigger service. + */ + @Override + public void postIndex(ShardId shardId, Engine.Index index, Engine.IndexResult result) { + if (result.getResultType() == Engine.Result.Type.FAILURE) { + assert result.getFailure() != null; + postIndex(shardId, index, result.getFailure()); + } + } + + /** + * In case of an engine related error, we have to ensure that the triggerservice does not leave anything behind * * TODO: If the configuration changes between preindex and postindex methods and we add a * watch, that could not be indexed diff --git a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherIndexingListenerTests.java b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherIndexingListenerTests.java index ef3275c3f9a..5b1a5a196a1 100644 --- a/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherIndexingListenerTests.java +++ b/x-pack/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/WatcherIndexingListenerTests.java @@ -45,6 +45,7 @@ import org.junit.Before; import java.io.IOException; import java.time.ZonedDateTime; import java.util.ArrayList; +import java.util.Arrays; import java.util.BitSet; import java.util.Collections; import java.util.HashMap; @@ -200,7 +201,40 @@ public class WatcherIndexingListenerTests extends ESTestCase { assertThat(exc.getMessage(), containsString(id)); } - public void testPostIndexRemoveTriggerOnException() throws Exception { + public void testPostIndexRemoveTriggerOnDocumentRelatedException() throws Exception { + when(operation.id()).thenReturn("_id"); + when(result.getResultType()).thenReturn(Engine.Result.Type.FAILURE); + when(result.getFailure()).thenReturn(new RuntimeException()); + when(shardId.getIndexName()).thenReturn(Watch.INDEX); + + listener.postIndex(shardId, operation, result); + verify(triggerService).remove(eq("_id")); + } + + public void testPostIndexRemoveTriggerOnDocumentRelatedException_ignoreOtherEngineResultTypes() throws Exception { + List types = new ArrayList<>(Arrays.asList(Engine.Result.Type.values())); + types.remove(Engine.Result.Type.FAILURE); + + when(operation.id()).thenReturn("_id"); + when(result.getResultType()).thenReturn(randomFrom(types)); + when(result.getFailure()).thenReturn(new RuntimeException()); + when(shardId.getIndexName()).thenReturn(Watch.INDEX); + + listener.postIndex(shardId, operation, result); + verifyZeroInteractions(triggerService); + } + + public void testPostIndexRemoveTriggerOnDocumentRelatedException_ignoreNonWatcherDocument() throws Exception { + when(operation.id()).thenReturn("_id"); + when(result.getResultType()).thenReturn(Engine.Result.Type.FAILURE); + when(result.getFailure()).thenReturn(new RuntimeException()); + when(shardId.getIndexName()).thenReturn(randomAlphaOfLength(4)); + + listener.postIndex(shardId, operation, result); + verifyZeroInteractions(triggerService); + } + + public void testPostIndexRemoveTriggerOnEngineLevelException() throws Exception { when(operation.id()).thenReturn("_id"); when(shardId.getIndexName()).thenReturn(Watch.INDEX); @@ -208,7 +242,7 @@ public class WatcherIndexingListenerTests extends ESTestCase { verify(triggerService).remove(eq("_id")); } - public void testPostIndexDontInvokeForOtherDocuments() throws Exception { + public void testPostIndexRemoveTriggerOnEngineLevelException_ignoreNonWatcherDocument() throws Exception { when(operation.id()).thenReturn("_id"); when(shardId.getIndexName()).thenReturn("anything"); when(result.getResultType()).thenReturn(Engine.Result.Type.SUCCESS); diff --git a/x-pack/qa/smoke-test-watcher/src/test/java/org/elasticsearch/smoketest/SmokeTestWatcherTestSuiteIT.java b/x-pack/qa/smoke-test-watcher/src/test/java/org/elasticsearch/smoketest/SmokeTestWatcherTestSuiteIT.java index e1d79858f35..1bae1e02d01 100644 --- a/x-pack/qa/smoke-test-watcher/src/test/java/org/elasticsearch/smoketest/SmokeTestWatcherTestSuiteIT.java +++ b/x-pack/qa/smoke-test-watcher/src/test/java/org/elasticsearch/smoketest/SmokeTestWatcherTestSuiteIT.java @@ -110,7 +110,6 @@ public class SmokeTestWatcherTestSuiteIT extends ESRestTestCase { return Settings.builder().put(ThreadContext.PREFIX + ".Authorization", token).build(); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/32299") public void testMonitorClusterHealth() throws Exception { final String watchId = "cluster_health_watch";