Tests: Ensure waiting time between stopping and starting watcher (elastic/x-pack-elasticsearch#2008)
Otherwise we might run into race conditions that prevent a useful start up again. Those tests can be massively improved (no need to run against the real master node), once the watcher BWC compatible stats are in. relates elastic/x-pack-elasticsearch#2004 Original commit: elastic/x-pack-elasticsearch@52ca77809c
This commit is contained in:
parent
b4031ee96f
commit
443cfb94be
|
@ -5,6 +5,7 @@
|
|||
*/
|
||||
package org.elasticsearch.upgrades;
|
||||
|
||||
import com.google.common.base.Charsets;
|
||||
import org.apache.http.HttpHost;
|
||||
import org.apache.http.entity.ContentType;
|
||||
import org.apache.http.entity.StringEntity;
|
||||
|
@ -14,6 +15,7 @@ import org.elasticsearch.client.Response;
|
|||
import org.elasticsearch.client.RestClient;
|
||||
import org.elasticsearch.common.CheckedConsumer;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.io.Streams;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||
import org.elasticsearch.common.xcontent.XContentHelper;
|
||||
|
@ -25,10 +27,14 @@ import org.elasticsearch.xpack.watcher.condition.AlwaysCondition;
|
|||
import org.junit.Before;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Base64;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -41,8 +47,10 @@ import static org.elasticsearch.xpack.watcher.input.InputBuilders.simpleInput;
|
|||
import static org.elasticsearch.xpack.watcher.trigger.TriggerBuilders.schedule;
|
||||
import static org.elasticsearch.xpack.watcher.trigger.schedule.Schedules.interval;
|
||||
import static org.hamcrest.Matchers.anyOf;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
|
||||
public class WatchBackwardsCompatibilityIT extends ESRestTestCase {
|
||||
|
||||
|
@ -125,9 +133,34 @@ public class WatchBackwardsCompatibilityIT extends ESRestTestCase {
|
|||
}
|
||||
|
||||
public void testWatcherRestart() throws Exception {
|
||||
executeAgainstAllNodes(client -> {
|
||||
// TODO we should be able to run this against any node, once the bwc serialization issues are fixed
|
||||
executeAgainstMasterNode(client -> {
|
||||
assertOK(client.performRequest("POST", "/_xpack/watcher/_stop"));
|
||||
assertBusy(() -> {
|
||||
try (InputStream is = client.performRequest("GET", "_xpack/watcher/stats").getEntity().getContent()) {
|
||||
// TODO once the serialization fix is in here, we can check for concrete fields if the run against a 5.x or a 6.x node
|
||||
// using a checkedbiconsumer, that provides info against which node the request runs
|
||||
String responseBody = Streams.copyToString(new InputStreamReader(is, Charsets.UTF_8));
|
||||
assertThat(responseBody, not(containsString("\"watcher_state\":\"starting\"")));
|
||||
assertThat(responseBody, not(containsString("\"watcher_state\":\"started\"")));
|
||||
assertThat(responseBody, not(containsString("\"watcher_state\":\"stopping\"")));
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
// TODO we should be able to run this against any node, once the bwc serialization issues are fixed
|
||||
executeAgainstMasterNode(client -> {
|
||||
assertOK(client.performRequest("POST", "/_xpack/watcher/_start"));
|
||||
assertBusy(() -> {
|
||||
try (InputStream is = client.performRequest("GET", "_xpack/watcher/stats").getEntity().getContent()) {
|
||||
// TODO once the serialization fix is in here, we can check for concrete fields if the run against a 5.x or a 6.x node
|
||||
// using a checkedbiconsumer, that provides info against which node the request runs
|
||||
String responseBody = Streams.copyToString(new InputStreamReader(is, Charsets.UTF_8));
|
||||
assertThat(responseBody, not(containsString("\"watcher_state\":\"starting\"")));
|
||||
assertThat(responseBody, not(containsString("\"watcher_state\":\"stopping\"")));
|
||||
assertThat(responseBody, not(containsString("\"watcher_state\":\"stopped\"")));
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
|
@ -173,7 +206,23 @@ public class WatchBackwardsCompatibilityIT extends ESRestTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void executeAgainstAllNodes(CheckedConsumer<RestClient, IOException> consumer)
|
||||
private void executeAgainstRandomNode(CheckedConsumer<RestClient, Exception> consumer) throws Exception {
|
||||
List<Node> nodes = new ArrayList<>(this.nodes.values());
|
||||
nodes.sort(Comparator.comparing(Node::getId));
|
||||
Node node = randomFrom(nodes);
|
||||
|
||||
try (RestClient client = buildClient(restClientSettings(), new HttpHost[] { node.getPublishAddress() })) {
|
||||
consumer.accept(client);
|
||||
}
|
||||
}
|
||||
|
||||
private void executeAgainstMasterNode(CheckedConsumer<RestClient, Exception> consumer) throws Exception {
|
||||
try (RestClient client = buildClient(restClientSettings(), new HttpHost[] { this.nodes.getMaster().publishAddress })) {
|
||||
consumer.accept(client);
|
||||
}
|
||||
}
|
||||
|
||||
private void executeAgainstAllNodes(CheckedConsumer<RestClient, IOException> consumer)
|
||||
throws IOException {
|
||||
HttpHost[] newHosts = nodes.getNewNodes().stream().map(Node::getPublishAddress).toArray(HttpHost[]::new);
|
||||
HttpHost[] bwcHosts = nodes.getBWCNodes().stream().map(Node::getPublishAddress).toArray(HttpHost[]::new);
|
||||
|
|
Loading…
Reference in New Issue