Tests: Ensure responses are closed in watch backwards compat tests
The HTTP client has to have it's response entities closed, otherwise it might block further requests because the underlying connection cannot be reused. Relates elastic/x-pack-elasticsearch#2004 Original commit: elastic/x-pack-elasticsearch@a24ecb9764
This commit is contained in:
parent
8b23f133c7
commit
0a86f00d7e
|
@ -5,7 +5,6 @@
|
||||||
*/
|
*/
|
||||||
package org.elasticsearch.upgrades;
|
package org.elasticsearch.upgrades;
|
||||||
|
|
||||||
import com.google.common.base.Charsets;
|
|
||||||
import org.elasticsearch.Version;
|
import org.elasticsearch.Version;
|
||||||
import org.elasticsearch.client.Response;
|
import org.elasticsearch.client.Response;
|
||||||
import org.elasticsearch.client.RestClient;
|
import org.elasticsearch.client.RestClient;
|
||||||
|
@ -15,7 +14,6 @@ import org.elasticsearch.client.http.entity.StringEntity;
|
||||||
import org.elasticsearch.client.http.util.EntityUtils;
|
import org.elasticsearch.client.http.util.EntityUtils;
|
||||||
import org.elasticsearch.common.CheckedConsumer;
|
import org.elasticsearch.common.CheckedConsumer;
|
||||||
import org.elasticsearch.common.bytes.BytesReference;
|
import org.elasticsearch.common.bytes.BytesReference;
|
||||||
import org.elasticsearch.common.io.Streams;
|
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||||
import org.elasticsearch.common.xcontent.XContentHelper;
|
import org.elasticsearch.common.xcontent.XContentHelper;
|
||||||
|
@ -28,8 +26,6 @@ import org.elasticsearch.xpack.watcher.condition.AlwaysCondition;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
|
||||||
import java.io.InputStreamReader;
|
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
@ -144,14 +140,13 @@ public class WatchBackwardsCompatibilityIT extends ESRestTestCase {
|
||||||
public void testWatcherRestart() throws Exception {
|
public void testWatcherRestart() throws Exception {
|
||||||
executeAgainstRandomNode(client -> assertOK(client.performRequest("POST", "/_xpack/watcher/_stop")));
|
executeAgainstRandomNode(client -> assertOK(client.performRequest("POST", "/_xpack/watcher/_stop")));
|
||||||
executeAgainstMasterNode(client -> assertBusy(() -> {
|
executeAgainstMasterNode(client -> assertBusy(() -> {
|
||||||
try (InputStream is = client.performRequest("GET", "_xpack/watcher/stats").getEntity().getContent()) {
|
Response response = client.performRequest("GET", "_xpack/watcher/stats");
|
||||||
// TODO once the serialization fix is in here, we can check for concrete fields if the run against a 5.x or a 6.x node
|
String responseBody = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
|
||||||
// using a checkedbiconsumer, that provides info against which node the request runs
|
// TODO once the serialization fix is in here, we can check for concrete fields if the run against a 5.x or a 6.x node
|
||||||
String responseBody = Streams.copyToString(new InputStreamReader(is, Charsets.UTF_8));
|
// using a checkedbiconsumer, that provides info against which node the request runs
|
||||||
assertThat(responseBody, not(containsString("\"watcher_state\":\"starting\"")));
|
assertThat(responseBody, not(containsString("\"watcher_state\":\"starting\"")));
|
||||||
assertThat(responseBody, not(containsString("\"watcher_state\":\"started\"")));
|
assertThat(responseBody, not(containsString("\"watcher_state\":\"started\"")));
|
||||||
assertThat(responseBody, not(containsString("\"watcher_state\":\"stopping\"")));
|
assertThat(responseBody, not(containsString("\"watcher_state\":\"stopping\"")));
|
||||||
}
|
|
||||||
}));
|
}));
|
||||||
|
|
||||||
// currently the triggered watches index is not checked by the upgrade API, resulting in an existing index
|
// currently the triggered watches index is not checked by the upgrade API, resulting in an existing index
|
||||||
|
@ -159,20 +154,20 @@ public class WatchBackwardsCompatibilityIT extends ESRestTestCase {
|
||||||
Map<String, String> params = new HashMap<>();
|
Map<String, String> params = new HashMap<>();
|
||||||
params.put("error_trace", "true");
|
params.put("error_trace", "true");
|
||||||
params.put("ignore", "404");
|
params.put("ignore", "404");
|
||||||
client().performRequest("DELETE", ".triggered_watches", params);
|
Response deleteTriggeredWatchesIndex = client().performRequest("DELETE", ".triggered_watches", params);
|
||||||
|
EntityUtils.consume(deleteTriggeredWatchesIndex.getEntity());
|
||||||
|
|
||||||
executeUpgradeIfNeeded();
|
executeUpgradeIfNeeded();
|
||||||
|
|
||||||
executeAgainstRandomNode(client -> assertOK(client.performRequest("POST", "/_xpack/watcher/_start")));
|
executeAgainstRandomNode(client -> assertOK(client.performRequest("POST", "/_xpack/watcher/_start")));
|
||||||
executeAgainstMasterNode(client -> assertBusy(() -> {
|
executeAgainstMasterNode(client -> assertBusy(() -> {
|
||||||
try (InputStream is = client.performRequest("GET", "_xpack/watcher/stats").getEntity().getContent()) {
|
Response response = client.performRequest("GET", "_xpack/watcher/stats");
|
||||||
// TODO once the serialization fix is in here, we can check for concrete fields if the run against a 5.x or a 6.x node
|
String responseBody = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
|
||||||
// using a checkedbiconsumer, that provides info against which node the request runs
|
// TODO once the serialization fix is in here, we can check for concrete fields if the run against a 5.x or a 6.x node
|
||||||
String responseBody = Streams.copyToString(new InputStreamReader(is, Charsets.UTF_8));
|
// using a checkedbiconsumer, that provides info against which node the request runs
|
||||||
assertThat(responseBody, not(containsString("\"watcher_state\":\"starting\"")));
|
assertThat(responseBody, not(containsString("\"watcher_state\":\"starting\"")));
|
||||||
assertThat(responseBody, not(containsString("\"watcher_state\":\"stopping\"")));
|
assertThat(responseBody, not(containsString("\"watcher_state\":\"stopping\"")));
|
||||||
assertThat(responseBody, not(containsString("\"watcher_state\":\"stopped\"")));
|
assertThat(responseBody, not(containsString("\"watcher_state\":\"stopped\"")));
|
||||||
}
|
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -217,7 +212,8 @@ public class WatchBackwardsCompatibilityIT extends ESRestTestCase {
|
||||||
String watchIndexUpgradeRequired = ObjectPath.evaluate(response, "indices.\\.watches.action_required");
|
String watchIndexUpgradeRequired = ObjectPath.evaluate(response, "indices.\\.watches.action_required");
|
||||||
String triggeredWatchIndexUpgradeRequired = ObjectPath.evaluate(response, "indices.\\.triggered_watches.action_required");
|
String triggeredWatchIndexUpgradeRequired = ObjectPath.evaluate(response, "indices.\\.triggered_watches.action_required");
|
||||||
if ("upgrade".equals(watchIndexUpgradeRequired) || "upgrade".equals(triggeredWatchIndexUpgradeRequired)) {
|
if ("upgrade".equals(watchIndexUpgradeRequired) || "upgrade".equals(triggeredWatchIndexUpgradeRequired)) {
|
||||||
client.performRequest("POST", "_xpack/migration/upgrade/.watches", params);
|
Response upgradeResponse = client.performRequest("POST", "_xpack/migration/upgrade/.watches", params);
|
||||||
|
EntityUtils.consume(upgradeResponse.getEntity());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -258,8 +254,11 @@ public class WatchBackwardsCompatibilityIT extends ESRestTestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void assertOK(Response response) {
|
private void assertOK(Response response) throws IOException {
|
||||||
assertThat(response.getStatusLine().getStatusCode(), anyOf(equalTo(200), equalTo(201)));
|
assertThat(response.getStatusLine().getStatusCode(), anyOf(equalTo(200), equalTo(201)));
|
||||||
|
// consume that entity, otherwise the input stream will not be closed
|
||||||
|
// side effect is, that everything needs to be asserted ok directly, you cannot check the body!
|
||||||
|
EntityUtils.consume(response.getEntity());
|
||||||
}
|
}
|
||||||
|
|
||||||
private Nodes buildNodeAndVersions() throws IOException {
|
private Nodes buildNodeAndVersions() throws IOException {
|
||||||
|
|
Loading…
Reference in New Issue