From f709fcd0833b80e276a2ad92e42c1dff71f8c27d Mon Sep 17 00:00:00 2001 From: Alexander Reelsen Date: Thu, 5 Apr 2018 09:33:28 +0200 Subject: [PATCH] Watcher: Refactor/Fix TransportWatcherServiceAction (elastic/x-pack-elasticsearch#4240) This commit fixes several issues with the current implementation of starting & stopping watcher 1. The WatcherServiceResponse was always returning a message, that the request was acknowledged, completely independent from the fact if it was or not. 2. A new cluster state instance was always returned, regardless if the state had changed or not (which is explicitely mentioned in the javadocs to check for this) 3. The AckedClusterStateUpdateTask now returns a proper WatcherServiceResponse 4. A failure now gets logged Relates elastic/x-pack-elasticsearch#4225 (this is just a hunch for now) Original commit: elastic/x-pack-elasticsearch@f4c1749f95a574dee3211c59250df3d206256b3c --- .../smoketest/XDocsClientYamlTestSuiteIT.java | 24 +++++-- .../TransportWatcherServiceAction.java | 37 ++++++---- .../AbstractWatcherIntegrationTestCase.java | 1 + .../build.gradle | 6 -- ...cherWithSecurityClientYamlTestSuiteIT.java | 68 +++++++++++++------ 5 files changed, 89 insertions(+), 47 deletions(-) diff --git a/docs/src/test/java/org/elasticsearch/smoketest/XDocsClientYamlTestSuiteIT.java b/docs/src/test/java/org/elasticsearch/smoketest/XDocsClientYamlTestSuiteIT.java index 612bcf20c50..269c12030cf 100644 --- a/docs/src/test/java/org/elasticsearch/smoketest/XDocsClientYamlTestSuiteIT.java +++ b/docs/src/test/java/org/elasticsearch/smoketest/XDocsClientYamlTestSuiteIT.java @@ -5,8 +5,8 @@ */ package org.elasticsearch.smoketest; -import org.apache.http.HttpHost; import com.carrotsearch.randomizedtesting.annotations.Name; +import org.apache.http.HttpHost; import org.elasticsearch.Version; import org.elasticsearch.client.RestClient; import org.elasticsearch.common.settings.SecureString; @@ -78,10 +78,24 @@ public class XDocsClientYamlTestSuiteIT extends XPackRestIT { ClientYamlTestResponse response = getAdminExecutionContext().callApi("xpack.watcher.stats", emptyMap(), emptyList(), emptyMap()); String state = (String) response.evaluate("stats.0.watcher_state"); - if (state.equals("started") == false || state.equals("starting") == false) { - getAdminExecutionContext().callApi("xpack.watcher.start", emptyMap(), emptyList(), emptyMap()); + + switch (state) { + case "stopped": + ClientYamlTestResponse startResponse = + getAdminExecutionContext().callApi("xpack.watcher.start", emptyMap(), emptyList(), emptyMap()); + boolean isAcknowledged = (boolean) startResponse.evaluate("acknowledged"); + assertThat(isAcknowledged, is(true)); + break; + case "stopping": + throw new AssertionError("waiting until stopping state reached stopped state to start again"); + case "starting": + throw new AssertionError("waiting until starting state reached started state"); + case "started": + // all good here, we are done + break; + default: + throw new AssertionError("unknown state[" + state + "]"); } - assertThat(state, is("started")); }); } } @@ -89,7 +103,7 @@ public class XDocsClientYamlTestSuiteIT extends XPackRestIT { @Override protected boolean isWatcherTest() { String testName = getTestName(); - return testName != null && testName.contains("watcher"); + return testName != null && testName.contains("watcher/"); } @Override diff --git a/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/service/TransportWatcherServiceAction.java b/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/service/TransportWatcherServiceAction.java index 14510c9097d..fa78208494f 100644 --- a/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/service/TransportWatcherServiceAction.java +++ b/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/transport/actions/service/TransportWatcherServiceAction.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.watcher.transport.actions.service; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.AcknowledgedRequest; @@ -61,7 +62,7 @@ public class TransportWatcherServiceAction extends TransportMasterNodeAction listener) throws Exception { + ActionListener listener) { switch (request.getCommand()) { case STOP: setWatcherMetaDataAndWait(true, listener); @@ -72,30 +73,38 @@ public class TransportWatcherServiceAction extends TransportMasterNodeAction listener) { + private void setWatcherMetaDataAndWait(boolean manuallyStopped, final ActionListener listener) { String source = manuallyStopped ? "update_watcher_manually_stopped" : "update_watcher_manually_started"; clusterService.submitStateUpdateTask(source, - new AckedClusterStateUpdateTask(ackedRequest, - ActionListener.wrap(b -> listener.onResponse(new WatcherServiceResponse(true)), listener::onFailure)) { + new AckedClusterStateUpdateTask(ackedRequest, listener) { @Override - protected Boolean newResponse(boolean result) { - return result; + protected WatcherServiceResponse newResponse(boolean acknowledged) { + return new WatcherServiceResponse(acknowledged); } @Override - public ClusterState execute(ClusterState clusterState) throws Exception { - ClusterState.Builder builder = new ClusterState.Builder(clusterState); - builder.metaData(MetaData.builder(clusterState.getMetaData()) - .putCustom(WatcherMetaData.TYPE, new WatcherMetaData(manuallyStopped))); - return builder.build(); + public ClusterState execute(ClusterState clusterState) { + WatcherMetaData newWatcherMetaData = new WatcherMetaData(manuallyStopped); + WatcherMetaData currentMetaData = clusterState.metaData().custom(WatcherMetaData.TYPE); + + // adhere to the contract of returning the original state if nothing has changed + if (newWatcherMetaData.equals(currentMetaData)) { + return clusterState; + } else { + ClusterState.Builder builder = new ClusterState.Builder(clusterState); + builder.metaData(MetaData.builder(clusterState.getMetaData()) + .putCustom(WatcherMetaData.TYPE, newWatcherMetaData)); + return builder.build(); + } } @Override - public void onFailure(String source, Exception throwable) { - listener.onFailure(throwable); + public void onFailure(String source, Exception e) { + logger.error(new ParameterizedMessage("could not update watcher stopped status to [{}], source [{}]", + manuallyStopped, source), e); + listener.onFailure(e); } }); } diff --git a/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java b/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java index 2a59d993fcd..c6d73b03508 100644 --- a/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java +++ b/plugin/watcher/src/test/java/org/elasticsearch/xpack/watcher/test/AbstractWatcherIntegrationTestCase.java @@ -459,6 +459,7 @@ public abstract class AbstractWatcherIntegrationTestCase extends ESIntegTestCase String message = String.format(Locale.ROOT, "Expected watcher to be started, but state was %s", currentStatesFromStatsRequest); assertThat(message, states, everyItem(is(WatcherState.STARTED))); + assertThat(watcherStatsResponse.watcherMetaData().manuallyStopped(), is(false)); }); } diff --git a/qa/smoke-test-watcher-with-security/build.gradle b/qa/smoke-test-watcher-with-security/build.gradle index f00bc805064..3ad8c117337 100644 --- a/qa/smoke-test-watcher-with-security/build.gradle +++ b/qa/smoke-test-watcher-with-security/build.gradle @@ -12,12 +12,6 @@ task copyWatcherRestTests(type: Copy) { include 'rest-api-spec/test/watcher/**' } -integTestRunner { - systemProperty 'tests.rest.blacklist', - ['hijack/10_basic/*', - 'getting_started/10_monitor_cluster_health/Getting started - Monitor cluster health'].join(',') -} - integTestCluster { dependsOn copyWatcherRestTests setting 'xpack.monitoring.enabled', 'false' diff --git a/qa/smoke-test-watcher-with-security/src/test/java/org/elasticsearch/smoketest/SmokeTestWatcherWithSecurityClientYamlTestSuiteIT.java b/qa/smoke-test-watcher-with-security/src/test/java/org/elasticsearch/smoketest/SmokeTestWatcherWithSecurityClientYamlTestSuiteIT.java index 75ac3149af3..a989bb47611 100644 --- a/qa/smoke-test-watcher-with-security/src/test/java/org/elasticsearch/smoketest/SmokeTestWatcherWithSecurityClientYamlTestSuiteIT.java +++ b/qa/smoke-test-watcher-with-security/src/test/java/org/elasticsearch/smoketest/SmokeTestWatcherWithSecurityClientYamlTestSuiteIT.java @@ -20,7 +20,6 @@ import org.elasticsearch.xpack.core.watcher.support.WatcherIndexTemplateRegistry import org.junit.After; import org.junit.Before; -import java.io.IOException; import java.util.Collections; import static java.util.Collections.emptyList; @@ -55,21 +54,34 @@ public class SmokeTestWatcherWithSecurityClientYamlTestSuiteIT extends ESClientY assertThat(resp.getStatusLine().getStatusCode(), is(201)); assertBusy(() -> { - try { - getAdminExecutionContext().callApi("xpack.watcher.start", emptyMap(), emptyList(), emptyMap()); + ClientYamlTestResponse response = + getAdminExecutionContext().callApi("xpack.watcher.stats", emptyMap(), emptyList(), emptyMap()); + String state = (String) response.evaluate("stats.0.watcher_state"); - for (String template : WatcherIndexTemplateRegistryField.TEMPLATE_NAMES) { - ClientYamlTestResponse templateExistsResponse = getAdminExecutionContext().callApi("indices.exists_template", - singletonMap("name", template), emptyList(), emptyMap()); - assertThat(templateExistsResponse.getStatusCode(), is(200)); - } + switch (state) { + case "stopped": + ClientYamlTestResponse startResponse = + getAdminExecutionContext().callApi("xpack.watcher.start", emptyMap(), emptyList(), emptyMap()); + boolean isAcknowledged = (boolean) startResponse.evaluate("acknowledged"); + assertThat(isAcknowledged, is(true)); + break; + case "stopping": + throw new AssertionError("waiting until stopping state reached stopped state to start again"); + case "starting": + throw new AssertionError("waiting until starting state reached started state"); + case "started": + // all good here, we are done + break; + default: + throw new AssertionError("unknown state[" + state + "]"); + } + }); - ClientYamlTestResponse response = - getAdminExecutionContext().callApi("xpack.watcher.stats", emptyMap(), emptyList(), emptyMap()); - String state = (String) response.evaluate("stats.0.watcher_state"); - assertThat(state, is("started")); - } catch (IOException e) { - throw new AssertionError(e); + assertBusy(() -> { + for (String template : WatcherIndexTemplateRegistryField.TEMPLATE_NAMES) { + ClientYamlTestResponse templateExistsResponse = getAdminExecutionContext().callApi("indices.exists_template", + singletonMap("name", template), emptyList(), emptyMap()); + assertThat(templateExistsResponse.getStatusCode(), is(200)); } }); } @@ -77,14 +89,26 @@ public class SmokeTestWatcherWithSecurityClientYamlTestSuiteIT extends ESClientY @After public void stopWatcher() throws Exception { assertBusy(() -> { - try { - getAdminExecutionContext().callApi("xpack.watcher.stop", emptyMap(), emptyList(), emptyMap()); - ClientYamlTestResponse response = - getAdminExecutionContext().callApi("xpack.watcher.stats", emptyMap(), emptyList(), emptyMap()); - String state = (String) response.evaluate("stats.0.watcher_state"); - assertThat(state, is("stopped")); - } catch (IOException e) { - throw new AssertionError(e); + ClientYamlTestResponse response = + getAdminExecutionContext().callApi("xpack.watcher.stats", emptyMap(), emptyList(), emptyMap()); + String state = (String) response.evaluate("stats.0.watcher_state"); + + switch (state) { + case "stopped": + // all good here, we are done + break; + case "stopping": + throw new AssertionError("waiting until stopping state reached stopped state"); + case "starting": + throw new AssertionError("waiting until starting state reached started state to stop"); + case "started": + ClientYamlTestResponse stopResponse = + getAdminExecutionContext().callApi("xpack.watcher.stop", emptyMap(), emptyList(), emptyMap()); + boolean isAcknowledged = (boolean) stopResponse.evaluate("acknowledged"); + assertThat(isAcknowledged, is(true)); + break; + default: + throw new AssertionError("unknown state[" + state + "]"); } }); }