diff --git a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherIndexingListener.java b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherIndexingListener.java index 54baf94e8d5..9e29c3110bc 100644 --- a/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherIndexingListener.java +++ b/x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/WatcherIndexingListener.java @@ -167,6 +167,7 @@ final class WatcherIndexingListener implements IndexingOperationListener, Cluste @Override public Engine.Delete preDelete(ShardId shardId, Engine.Delete delete) { if (isWatchDocument(shardId.getIndexName())) { + logger.debug("removing watch [{}] to trigger service via delete", delete.id()); triggerService.remove(delete.id()); } diff --git a/x-pack/qa/smoke-test-watcher/src/test/java/org/elasticsearch/smoketest/SmokeTestWatcherTestSuiteIT.java b/x-pack/qa/smoke-test-watcher/src/test/java/org/elasticsearch/smoketest/SmokeTestWatcherTestSuiteIT.java index b64a20db78f..e1d79858f35 100644 --- a/x-pack/qa/smoke-test-watcher/src/test/java/org/elasticsearch/smoketest/SmokeTestWatcherTestSuiteIT.java +++ b/x-pack/qa/smoke-test-watcher/src/test/java/org/elasticsearch/smoketest/SmokeTestWatcherTestSuiteIT.java @@ -7,11 +7,13 @@ package org.elasticsearch.smoketest; import org.elasticsearch.client.Request; import org.elasticsearch.client.Response; +import org.elasticsearch.client.ResponseException; import org.elasticsearch.common.Strings; import org.elasticsearch.common.settings.SecureString; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.test.rest.ESRestTestCase; import org.elasticsearch.test.rest.yaml.ObjectPath; import org.elasticsearch.xpack.test.rest.XPackRestTestConstants; @@ -110,7 +112,7 @@ public class SmokeTestWatcherTestSuiteIT extends ESRestTestCase { @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/32299") public void testMonitorClusterHealth() throws Exception { - String watchId = "cluster_health_watch"; + final String watchId = "cluster_health_watch"; // get master publish address Response clusterStateResponse = adminClient().performRequest(new Request("GET", "/_cluster/state")); @@ -124,7 +126,7 @@ public class SmokeTestWatcherTestSuiteIT extends ESRestTestCase { assertThat(address, is(notNullValue())); String[] splitAddress = address.split(":", 2); String host = splitAddress[0]; - int port = Integer.valueOf(splitAddress[1]); + int port = Integer.parseInt(splitAddress[1]); // put watch try (XContentBuilder builder = jsonBuilder()) { @@ -156,11 +158,17 @@ public class SmokeTestWatcherTestSuiteIT extends ESRestTestCase { // check watch history ObjectPath objectPath = getWatchHistoryEntry(watchId); - boolean conditionMet = objectPath.evaluate("hits.hits.0._source.result.condition.met"); - assertThat(conditionMet, is(true)); + Boolean conditionMet = objectPath.evaluate("hits.hits.0._source.result.condition.met"); + String historyEntriesAsString = Strings.toString(objectPath.toXContentBuilder(XContentType.JSON.xContent())); + assertThat("condition not met in response [" + historyEntriesAsString + "]", conditionMet, is(true)); deleteWatch(watchId); - assertWatchCount(0); + // Wrap inside an assertBusy(...), because watch may execute just after being deleted, + // This tries to re-add the watch which fails, because of version conflict, + // but for a moment the watch count from watcher stats api may be incorrect. + // (via WatcherIndexingListener#preIndex) + // The WatcherIndexingListener#postIndex() detects this version conflict and corrects the watch count. + assertBusy(() -> assertWatchCount(0)); } private void indexWatch(String watchId, XContentBuilder builder) throws Exception { @@ -182,16 +190,54 @@ public class SmokeTestWatcherTestSuiteIT extends ESRestTestCase { private ObjectPath getWatchHistoryEntry(String watchId) throws Exception { final AtomicReference objectPathReference = new AtomicReference<>(); assertBusy(() -> { - client().performRequest(new Request("POST", "/.watcher-history-*/_refresh")); + try { + client().performRequest(new Request("POST", "/.watcher-history-*/_refresh")); + } catch (ResponseException e) { + final String err = "Failed to perform refresh of watcher history"; + logger.error(err, e); + throw new AssertionError(err, e); + } try (XContentBuilder builder = jsonBuilder()) { builder.startObject(); - builder.startObject("query").startObject("bool").startArray("must"); - builder.startObject().startObject("term").startObject("watch_id").field("value", watchId).endObject().endObject() - .endObject(); - builder.endArray().endObject().endObject(); - builder.startArray("sort").startObject().startObject("trigger_event.triggered_time").field("order", "desc").endObject() - .endObject().endArray(); + { + builder.startObject("query"); + { + builder.startObject("bool"); + builder.startArray("must"); + builder.startObject(); + { + builder.startObject("term"); + builder.startObject("watch_id"); + builder.field("value", watchId); + builder.endObject(); + builder.endObject(); + } + builder.endObject(); + builder.startObject(); + { + builder.startObject("term"); + builder.startObject("state"); + builder.field("value", "executed"); + builder.endObject(); + builder.endObject(); + } + builder.endObject(); + builder.endArray(); + builder.endObject(); + } + builder.endObject(); + builder.startArray("sort"); + builder.startObject(); + { + + builder.startObject("result.execution_time"); + builder.field("order", "desc"); + builder.endObject(); + } + builder.endObject(); + builder.endArray(); + } builder.endObject(); Request searchRequest = new Request("POST", "/.watcher-history-*/_search"); @@ -204,6 +250,10 @@ public class SmokeTestWatcherTestSuiteIT extends ESRestTestCase { String watchid = objectPath.evaluate("hits.hits.0._source.watch_id"); assertThat(watchid, is(watchId)); objectPathReference.set(objectPath); + } catch (ResponseException e) { + final String err = "Failed to perform search of watcher history"; + logger.error(err, e); + throw new AssertionError(err, e); } }); return objectPathReference.get();