From 443cfb94be405f9785a72ca690818556a0ac8b85 Mon Sep 17 00:00:00 2001 From: Alexander Reelsen Date: Tue, 18 Jul 2017 10:19:33 +0200 Subject: [PATCH] Tests: Ensure waiting time between stopping and starting watcher (elastic/x-pack-elasticsearch#2008) Otherwise we might run into race conditions that prevent a useful start up again. Those tests can be massively improved (no need to run against the real master node), once the watcher BWC compatible stats are in. relates elastic/x-pack-elasticsearch#2004 Original commit: elastic/x-pack-elasticsearch@52ca77809cb3638ff5717b620b497dbbcdb5019b --- .../WatchBackwardsCompatibilityIT.java | 53 ++++++++++++++++++- 1 file changed, 51 insertions(+), 2 deletions(-) diff --git a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/WatchBackwardsCompatibilityIT.java b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/WatchBackwardsCompatibilityIT.java index 3c7e2b607aa..0ec7049a432 100644 --- a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/WatchBackwardsCompatibilityIT.java +++ b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/WatchBackwardsCompatibilityIT.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.upgrades; +import com.google.common.base.Charsets; import org.apache.http.HttpHost; import org.apache.http.entity.ContentType; import org.apache.http.entity.StringEntity; @@ -14,6 +15,7 @@ import org.elasticsearch.client.Response; import org.elasticsearch.client.RestClient; import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.XContentHelper; @@ -25,10 +27,14 @@ import org.elasticsearch.xpack.watcher.condition.AlwaysCondition; import org.junit.Before; import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Arrays; import java.util.Base64; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -41,8 +47,10 @@ import static org.elasticsearch.xpack.watcher.input.InputBuilders.simpleInput; import static org.elasticsearch.xpack.watcher.trigger.TriggerBuilders.schedule; import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.interval; import static org.hamcrest.Matchers.anyOf; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.not; public class WatchBackwardsCompatibilityIT extends ESRestTestCase { @@ -125,9 +133,34 @@ public class WatchBackwardsCompatibilityIT extends ESRestTestCase { } public void testWatcherRestart() throws Exception { - executeAgainstAllNodes(client -> { + // TODO we should be able to run this against any node, once the bwc serialization issues are fixed + executeAgainstMasterNode(client -> { assertOK(client.performRequest("POST", "/_xpack/watcher/_stop")); + assertBusy(() -> { + try (InputStream is = client.performRequest("GET", "_xpack/watcher/stats").getEntity().getContent()) { + // TODO once the serialization fix is in here, we can check for concrete fields if the run against a 5.x or a 6.x node + // using a checkedbiconsumer, that provides info against which node the request runs + String responseBody = Streams.copyToString(new InputStreamReader(is, Charsets.UTF_8)); + assertThat(responseBody, not(containsString("\"watcher_state\":\"starting\""))); + assertThat(responseBody, not(containsString("\"watcher_state\":\"started\""))); + assertThat(responseBody, not(containsString("\"watcher_state\":\"stopping\""))); + } + }); + }); + + // TODO we should be able to run this against any node, once the bwc serialization issues are fixed + executeAgainstMasterNode(client -> { assertOK(client.performRequest("POST", "/_xpack/watcher/_start")); + assertBusy(() -> { + try (InputStream is = client.performRequest("GET", "_xpack/watcher/stats").getEntity().getContent()) { + // TODO once the serialization fix is in here, we can check for concrete fields if the run against a 5.x or a 6.x node + // using a checkedbiconsumer, that provides info against which node the request runs + String responseBody = Streams.copyToString(new InputStreamReader(is, Charsets.UTF_8)); + assertThat(responseBody, not(containsString("\"watcher_state\":\"starting\""))); + assertThat(responseBody, not(containsString("\"watcher_state\":\"stopping\""))); + assertThat(responseBody, not(containsString("\"watcher_state\":\"stopped\""))); + } + }); }); } @@ -173,7 +206,23 @@ public class WatchBackwardsCompatibilityIT extends ESRestTestCase { } } - public void executeAgainstAllNodes(CheckedConsumer consumer) + private void executeAgainstRandomNode(CheckedConsumer consumer) throws Exception { + List nodes = new ArrayList<>(this.nodes.values()); + nodes.sort(Comparator.comparing(Node::getId)); + Node node = randomFrom(nodes); + + try (RestClient client = buildClient(restClientSettings(), new HttpHost[] { node.getPublishAddress() })) { + consumer.accept(client); + } + } + + private void executeAgainstMasterNode(CheckedConsumer consumer) throws Exception { + try (RestClient client = buildClient(restClientSettings(), new HttpHost[] { this.nodes.getMaster().publishAddress })) { + consumer.accept(client); + } + } + + private void executeAgainstAllNodes(CheckedConsumer consumer) throws IOException { HttpHost[] newHosts = nodes.getNewNodes().stream().map(Node::getPublishAddress).toArray(HttpHost[]::new); HttpHost[] bwcHosts = nodes.getBWCNodes().stream().map(Node::getPublishAddress).toArray(HttpHost[]::new);