Watcher: Refactor/Fix TransportWatcherServiceAction (elastic/x-pack-elasticsearch#4240)

This commit fixes several issues with the current implementation of
starting & stopping watcher

1. The WatcherServiceResponse was always returning a message, that the
   request was acknowledged, completely independent from the fact if it was
   or not.
2. A new cluster state instance was always returned, regardless if the
   state had changed or not (which is explicitely mentioned in the
   javadocs to check for this)
3. The AckedClusterStateUpdateTask now returns a proper WatcherServiceResponse
4. A failure now gets logged

Relates elastic/x-pack-elasticsearch#4225 (this is just a hunch for now)

Original commit: elastic/x-pack-elasticsearch@f4c1749f95
This commit is contained in:
Alexander Reelsen 2018-04-05 09:33:28 +02:00 committed by GitHub
parent 3852b41330
commit f709fcd083
5 changed files with 89 additions and 47 deletions

View File

@ -5,8 +5,8 @@
*/
package org.elasticsearch.smoketest;
import org.apache.http.HttpHost;
import com.carrotsearch.randomizedtesting.annotations.Name;
import org.apache.http.HttpHost;
import org.elasticsearch.Version;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.common.settings.SecureString;
@ -78,10 +78,24 @@ public class XDocsClientYamlTestSuiteIT extends XPackRestIT {
ClientYamlTestResponse response =
getAdminExecutionContext().callApi("xpack.watcher.stats", emptyMap(), emptyList(), emptyMap());
String state = (String) response.evaluate("stats.0.watcher_state");
if (state.equals("started") == false || state.equals("starting") == false) {
getAdminExecutionContext().callApi("xpack.watcher.start", emptyMap(), emptyList(), emptyMap());
switch (state) {
case "stopped":
ClientYamlTestResponse startResponse =
getAdminExecutionContext().callApi("xpack.watcher.start", emptyMap(), emptyList(), emptyMap());
boolean isAcknowledged = (boolean) startResponse.evaluate("acknowledged");
assertThat(isAcknowledged, is(true));
break;
case "stopping":
throw new AssertionError("waiting until stopping state reached stopped state to start again");
case "starting":
throw new AssertionError("waiting until starting state reached started state");
case "started":
// all good here, we are done
break;
default:
throw new AssertionError("unknown state[" + state + "]");
}
assertThat(state, is("started"));
});
}
}
@ -89,7 +103,7 @@ public class XDocsClientYamlTestSuiteIT extends XPackRestIT {
@Override
protected boolean isWatcherTest() {
String testName = getTestName();
return testName != null && testName.contains("watcher");
return testName != null && testName.contains("watcher/");
}
@Override

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.watcher.transport.actions.service;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
@ -61,7 +62,7 @@ public class TransportWatcherServiceAction extends TransportMasterNodeAction<Wat
@Override
protected void masterOperation(WatcherServiceRequest request, ClusterState state,
ActionListener<WatcherServiceResponse> listener) throws Exception {
ActionListener<WatcherServiceResponse> listener) {
switch (request.getCommand()) {
case STOP:
setWatcherMetaDataAndWait(true, listener);
@ -72,30 +73,38 @@ public class TransportWatcherServiceAction extends TransportMasterNodeAction<Wat
}
}
private void setWatcherMetaDataAndWait(boolean manuallyStopped, final ActionListener
<WatcherServiceResponse> listener) {
private void setWatcherMetaDataAndWait(boolean manuallyStopped, final ActionListener<WatcherServiceResponse> listener) {
String source = manuallyStopped ? "update_watcher_manually_stopped" : "update_watcher_manually_started";
clusterService.submitStateUpdateTask(source,
new AckedClusterStateUpdateTask<Boolean>(ackedRequest,
ActionListener.wrap(b -> listener.onResponse(new WatcherServiceResponse(true)), listener::onFailure)) {
new AckedClusterStateUpdateTask<WatcherServiceResponse>(ackedRequest, listener) {
@Override
protected Boolean newResponse(boolean result) {
return result;
protected WatcherServiceResponse newResponse(boolean acknowledged) {
return new WatcherServiceResponse(acknowledged);
}
@Override
public ClusterState execute(ClusterState clusterState) throws Exception {
ClusterState.Builder builder = new ClusterState.Builder(clusterState);
builder.metaData(MetaData.builder(clusterState.getMetaData())
.putCustom(WatcherMetaData.TYPE, new WatcherMetaData(manuallyStopped)));
return builder.build();
public ClusterState execute(ClusterState clusterState) {
WatcherMetaData newWatcherMetaData = new WatcherMetaData(manuallyStopped);
WatcherMetaData currentMetaData = clusterState.metaData().custom(WatcherMetaData.TYPE);
// adhere to the contract of returning the original state if nothing has changed
if (newWatcherMetaData.equals(currentMetaData)) {
return clusterState;
} else {
ClusterState.Builder builder = new ClusterState.Builder(clusterState);
builder.metaData(MetaData.builder(clusterState.getMetaData())
.putCustom(WatcherMetaData.TYPE, newWatcherMetaData));
return builder.build();
}
}
@Override
public void onFailure(String source, Exception throwable) {
listener.onFailure(throwable);
public void onFailure(String source, Exception e) {
logger.error(new ParameterizedMessage("could not update watcher stopped status to [{}], source [{}]",
manuallyStopped, source), e);
listener.onFailure(e);
}
});
}

View File

@ -459,6 +459,7 @@ public abstract class AbstractWatcherIntegrationTestCase extends ESIntegTestCase
String message = String.format(Locale.ROOT, "Expected watcher to be started, but state was %s", currentStatesFromStatsRequest);
assertThat(message, states, everyItem(is(WatcherState.STARTED)));
assertThat(watcherStatsResponse.watcherMetaData().manuallyStopped(), is(false));
});
}

View File

@ -12,12 +12,6 @@ task copyWatcherRestTests(type: Copy) {
include 'rest-api-spec/test/watcher/**'
}
integTestRunner {
systemProperty 'tests.rest.blacklist',
['hijack/10_basic/*',
'getting_started/10_monitor_cluster_health/Getting started - Monitor cluster health'].join(',')
}
integTestCluster {
dependsOn copyWatcherRestTests
setting 'xpack.monitoring.enabled', 'false'

View File

@ -20,7 +20,6 @@ import org.elasticsearch.xpack.core.watcher.support.WatcherIndexTemplateRegistry
import org.junit.After;
import org.junit.Before;
import java.io.IOException;
import java.util.Collections;
import static java.util.Collections.emptyList;
@ -55,21 +54,34 @@ public class SmokeTestWatcherWithSecurityClientYamlTestSuiteIT extends ESClientY
assertThat(resp.getStatusLine().getStatusCode(), is(201));
assertBusy(() -> {
try {
getAdminExecutionContext().callApi("xpack.watcher.start", emptyMap(), emptyList(), emptyMap());
ClientYamlTestResponse response =
getAdminExecutionContext().callApi("xpack.watcher.stats", emptyMap(), emptyList(), emptyMap());
String state = (String) response.evaluate("stats.0.watcher_state");
for (String template : WatcherIndexTemplateRegistryField.TEMPLATE_NAMES) {
ClientYamlTestResponse templateExistsResponse = getAdminExecutionContext().callApi("indices.exists_template",
singletonMap("name", template), emptyList(), emptyMap());
assertThat(templateExistsResponse.getStatusCode(), is(200));
}
switch (state) {
case "stopped":
ClientYamlTestResponse startResponse =
getAdminExecutionContext().callApi("xpack.watcher.start", emptyMap(), emptyList(), emptyMap());
boolean isAcknowledged = (boolean) startResponse.evaluate("acknowledged");
assertThat(isAcknowledged, is(true));
break;
case "stopping":
throw new AssertionError("waiting until stopping state reached stopped state to start again");
case "starting":
throw new AssertionError("waiting until starting state reached started state");
case "started":
// all good here, we are done
break;
default:
throw new AssertionError("unknown state[" + state + "]");
}
});
ClientYamlTestResponse response =
getAdminExecutionContext().callApi("xpack.watcher.stats", emptyMap(), emptyList(), emptyMap());
String state = (String) response.evaluate("stats.0.watcher_state");
assertThat(state, is("started"));
} catch (IOException e) {
throw new AssertionError(e);
assertBusy(() -> {
for (String template : WatcherIndexTemplateRegistryField.TEMPLATE_NAMES) {
ClientYamlTestResponse templateExistsResponse = getAdminExecutionContext().callApi("indices.exists_template",
singletonMap("name", template), emptyList(), emptyMap());
assertThat(templateExistsResponse.getStatusCode(), is(200));
}
});
}
@ -77,14 +89,26 @@ public class SmokeTestWatcherWithSecurityClientYamlTestSuiteIT extends ESClientY
@After
public void stopWatcher() throws Exception {
assertBusy(() -> {
try {
getAdminExecutionContext().callApi("xpack.watcher.stop", emptyMap(), emptyList(), emptyMap());
ClientYamlTestResponse response =
getAdminExecutionContext().callApi("xpack.watcher.stats", emptyMap(), emptyList(), emptyMap());
String state = (String) response.evaluate("stats.0.watcher_state");
assertThat(state, is("stopped"));
} catch (IOException e) {
throw new AssertionError(e);
ClientYamlTestResponse response =
getAdminExecutionContext().callApi("xpack.watcher.stats", emptyMap(), emptyList(), emptyMap());
String state = (String) response.evaluate("stats.0.watcher_state");
switch (state) {
case "stopped":
// all good here, we are done
break;
case "stopping":
throw new AssertionError("waiting until stopping state reached stopped state");
case "starting":
throw new AssertionError("waiting until starting state reached started state to stop");
case "started":
ClientYamlTestResponse stopResponse =
getAdminExecutionContext().callApi("xpack.watcher.stop", emptyMap(), emptyList(), emptyMap());
boolean isAcknowledged = (boolean) stopResponse.evaluate("acknowledged");
assertThat(isAcknowledged, is(true));
break;
default:
throw new AssertionError("unknown state[" + state + "]");
}
});
}