diff --git a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/WatchBackwardsCompatibilityIT.java b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/WatchBackwardsCompatibilityIT.java index 5c2f9e2ac67..4ef57a77d0b 100644 --- a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/WatchBackwardsCompatibilityIT.java +++ b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/WatchBackwardsCompatibilityIT.java @@ -137,19 +137,17 @@ public class WatchBackwardsCompatibilityIT extends ESRestTestCase { } public void testWatcherRestart() throws Exception { - executeAgainstRandomNode(client -> { - assertOK(client.performRequest("POST", "/_xpack/watcher/_stop")); - assertBusy(() -> { - try (InputStream is = client.performRequest("GET", "_xpack/watcher/stats").getEntity().getContent()) { - // TODO once the serialization fix is in here, we can check for concrete fields if the run against a 5.x or a 6.x node - // using a checkedbiconsumer, that provides info against which node the request runs - String responseBody = Streams.copyToString(new InputStreamReader(is, Charsets.UTF_8)); - assertThat(responseBody, not(containsString("\"watcher_state\":\"starting\""))); - assertThat(responseBody, not(containsString("\"watcher_state\":\"started\""))); - assertThat(responseBody, not(containsString("\"watcher_state\":\"stopping\""))); - } - }); - }); + executeAgainstRandomNode(client -> assertOK(client.performRequest("POST", "/_xpack/watcher/_stop"))); + executeAgainstMasterNode(client -> assertBusy(() -> { + try (InputStream is = client.performRequest("GET", "_xpack/watcher/stats").getEntity().getContent()) { + // TODO once the serialization fix is in here, we can check for concrete fields if the run against a 5.x or a 6.x node + // using a checkedbiconsumer, that provides info against which node the request runs + String responseBody = Streams.copyToString(new InputStreamReader(is, Charsets.UTF_8)); + assertThat(responseBody, not(containsString("\"watcher_state\":\"starting\""))); + assertThat(responseBody, not(containsString("\"watcher_state\":\"started\""))); + assertThat(responseBody, not(containsString("\"watcher_state\":\"stopping\""))); + } + })); // currently the triggered watches index is not checked by the upgrade API, resulting in an existing index // that has not configured the `index.format: 6`, resulting in watcher not starting @@ -160,19 +158,17 @@ public class WatchBackwardsCompatibilityIT extends ESRestTestCase { executeUpgradeIfNeeded(); - executeAgainstRandomNode(client -> { - assertOK(client.performRequest("POST", "/_xpack/watcher/_start")); - assertBusy(() -> { - try (InputStream is = client.performRequest("GET", "_xpack/watcher/stats").getEntity().getContent()) { - // TODO once the serialization fix is in here, we can check for concrete fields if the run against a 5.x or a 6.x node - // using a checkedbiconsumer, that provides info against which node the request runs - String responseBody = Streams.copyToString(new InputStreamReader(is, Charsets.UTF_8)); - assertThat(responseBody, not(containsString("\"watcher_state\":\"starting\""))); - assertThat(responseBody, not(containsString("\"watcher_state\":\"stopping\""))); - assertThat(responseBody, not(containsString("\"watcher_state\":\"stopped\""))); - } - }); - }); + executeAgainstRandomNode(client -> assertOK(client.performRequest("POST", "/_xpack/watcher/_start"))); + executeAgainstMasterNode(client -> assertBusy(() -> { + try (InputStream is = client.performRequest("GET", "_xpack/watcher/stats").getEntity().getContent()) { + // TODO once the serialization fix is in here, we can check for concrete fields if the run against a 5.x or a 6.x node + // using a checkedbiconsumer, that provides info against which node the request runs + String responseBody = Streams.copyToString(new InputStreamReader(is, Charsets.UTF_8)); + assertThat(responseBody, not(containsString("\"watcher_state\":\"starting\""))); + assertThat(responseBody, not(containsString("\"watcher_state\":\"stopping\""))); + assertThat(responseBody, not(containsString("\"watcher_state\":\"stopped\""))); + } + })); } public void testWatchCrudApis() throws IOException { @@ -232,6 +228,12 @@ public class WatchBackwardsCompatibilityIT extends ESRestTestCase { } } + private void executeAgainstMasterNode(CheckedConsumer consumer) throws Exception { + try (RestClient client = buildClient(restClientSettings(), new HttpHost[]{this.nodes.getMaster().publishAddress})) { + consumer.accept(client); + } + } + private void executeAgainstAllNodes(CheckedConsumer consumer) throws IOException { HttpHost[] newHosts = nodes.getNewNodes().stream().map(Node::getPublishAddress).toArray(HttpHost[]::new);