From 0a86f00d7e4ce5ed71e407ac4104a1ff611b21df Mon Sep 17 00:00:00 2001 From: Alexander Reelsen Date: Wed, 16 Aug 2017 16:29:52 +0200 Subject: [PATCH] Tests: Ensure responses are closed in watch backwards compat tests The HTTP client has to have it's response entities closed, otherwise it might block further requests because the underlying connection cannot be reused. Relates elastic/x-pack-elasticsearch#2004 Original commit: elastic/x-pack-elasticsearch@a24ecb976445a155e1d0f5efab85c88569a6375f --- .../WatchBackwardsCompatibilityIT.java | 45 +++++++++---------- 1 file changed, 22 insertions(+), 23 deletions(-) diff --git a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/WatchBackwardsCompatibilityIT.java b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/WatchBackwardsCompatibilityIT.java index 2ca4eb4fd5c..a32b6aa8131 100644 --- a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/WatchBackwardsCompatibilityIT.java +++ b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/WatchBackwardsCompatibilityIT.java @@ -5,7 +5,6 @@ */ package org.elasticsearch.upgrades; -import com.google.common.base.Charsets; import org.elasticsearch.Version; import org.elasticsearch.client.Response; import org.elasticsearch.client.RestClient; @@ -15,7 +14,6 @@ import org.elasticsearch.client.http.entity.StringEntity; import org.elasticsearch.client.http.util.EntityUtils; import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.XContentHelper; @@ -28,8 +26,6 @@ import org.elasticsearch.xpack.watcher.condition.AlwaysCondition; import org.junit.Before; import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; @@ -144,14 +140,13 @@ public class WatchBackwardsCompatibilityIT extends ESRestTestCase { public void testWatcherRestart() throws Exception { executeAgainstRandomNode(client -> assertOK(client.performRequest("POST", "/_xpack/watcher/_stop"))); executeAgainstMasterNode(client -> assertBusy(() -> { - try (InputStream is = client.performRequest("GET", "_xpack/watcher/stats").getEntity().getContent()) { - // TODO once the serialization fix is in here, we can check for concrete fields if the run against a 5.x or a 6.x node - // using a checkedbiconsumer, that provides info against which node the request runs - String responseBody = Streams.copyToString(new InputStreamReader(is, Charsets.UTF_8)); - assertThat(responseBody, not(containsString("\"watcher_state\":\"starting\""))); - assertThat(responseBody, not(containsString("\"watcher_state\":\"started\""))); - assertThat(responseBody, not(containsString("\"watcher_state\":\"stopping\""))); - } + Response response = client.performRequest("GET", "_xpack/watcher/stats"); + String responseBody = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8); + // TODO once the serialization fix is in here, we can check for concrete fields if the run against a 5.x or a 6.x node + // using a checkedbiconsumer, that provides info against which node the request runs + assertThat(responseBody, not(containsString("\"watcher_state\":\"starting\""))); + assertThat(responseBody, not(containsString("\"watcher_state\":\"started\""))); + assertThat(responseBody, not(containsString("\"watcher_state\":\"stopping\""))); })); // currently the triggered watches index is not checked by the upgrade API, resulting in an existing index @@ -159,20 +154,20 @@ public class WatchBackwardsCompatibilityIT extends ESRestTestCase { Map params = new HashMap<>(); params.put("error_trace", "true"); params.put("ignore", "404"); - client().performRequest("DELETE", ".triggered_watches", params); + Response deleteTriggeredWatchesIndex = client().performRequest("DELETE", ".triggered_watches", params); + EntityUtils.consume(deleteTriggeredWatchesIndex.getEntity()); executeUpgradeIfNeeded(); executeAgainstRandomNode(client -> assertOK(client.performRequest("POST", "/_xpack/watcher/_start"))); executeAgainstMasterNode(client -> assertBusy(() -> { - try (InputStream is = client.performRequest("GET", "_xpack/watcher/stats").getEntity().getContent()) { - // TODO once the serialization fix is in here, we can check for concrete fields if the run against a 5.x or a 6.x node - // using a checkedbiconsumer, that provides info against which node the request runs - String responseBody = Streams.copyToString(new InputStreamReader(is, Charsets.UTF_8)); - assertThat(responseBody, not(containsString("\"watcher_state\":\"starting\""))); - assertThat(responseBody, not(containsString("\"watcher_state\":\"stopping\""))); - assertThat(responseBody, not(containsString("\"watcher_state\":\"stopped\""))); - } + Response response = client.performRequest("GET", "_xpack/watcher/stats"); + String responseBody = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8); + // TODO once the serialization fix is in here, we can check for concrete fields if the run against a 5.x or a 6.x node + // using a checkedbiconsumer, that provides info against which node the request runs + assertThat(responseBody, not(containsString("\"watcher_state\":\"starting\""))); + assertThat(responseBody, not(containsString("\"watcher_state\":\"stopping\""))); + assertThat(responseBody, not(containsString("\"watcher_state\":\"stopped\""))); })); } @@ -217,7 +212,8 @@ public class WatchBackwardsCompatibilityIT extends ESRestTestCase { String watchIndexUpgradeRequired = ObjectPath.evaluate(response, "indices.\\.watches.action_required"); String triggeredWatchIndexUpgradeRequired = ObjectPath.evaluate(response, "indices.\\.triggered_watches.action_required"); if ("upgrade".equals(watchIndexUpgradeRequired) || "upgrade".equals(triggeredWatchIndexUpgradeRequired)) { - client.performRequest("POST", "_xpack/migration/upgrade/.watches", params); + Response upgradeResponse = client.performRequest("POST", "_xpack/migration/upgrade/.watches", params); + EntityUtils.consume(upgradeResponse.getEntity()); } } } @@ -258,8 +254,11 @@ public class WatchBackwardsCompatibilityIT extends ESRestTestCase { } } - private void assertOK(Response response) { + private void assertOK(Response response) throws IOException { assertThat(response.getStatusLine().getStatusCode(), anyOf(equalTo(200), equalTo(201))); + // consume that entity, otherwise the input stream will not be closed + // side effect is, that everything needs to be asserted ok directly, you cannot check the body! + EntityUtils.consume(response.getEntity()); } private Nodes buildNodeAndVersions() throws IOException {