diff --git a/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java b/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java index 275b5c94052..b9a33e50278 100644 --- a/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java +++ b/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreRepositoryTests.java @@ -213,7 +213,7 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESMockAPIBasedRe if ("/token".equals(exchange.getRequestURI().getPath())) { try { // token content is unique per node (not per request) - return Streams.readFully(exchange.getRequestBody()).utf8ToString(); + return Streams.readFully(Streams.noCloseStream(exchange.getRequestBody())).utf8ToString(); } catch (IOException e) { throw new AssertionError("Unable to read token request body", e); } diff --git a/server/src/main/java/org/elasticsearch/common/io/Streams.java b/server/src/main/java/org/elasticsearch/common/io/Streams.java index 4a8f2f5de5b..222f94e65ef 100644 --- a/server/src/main/java/org/elasticsearch/common/io/Streams.java +++ b/server/src/main/java/org/elasticsearch/common/io/Streams.java @@ -25,6 +25,7 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamOutput; import java.io.BufferedReader; +import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; @@ -219,6 +220,21 @@ public abstract class Streams { } } + /** + * Wraps an {@link InputStream} such that it's {@code close} method becomes a noop + * + * @param stream {@code InputStream} to wrap + * @return wrapped {@code InputStream} + */ + public static InputStream noCloseStream(InputStream stream) { + return new FilterInputStream(stream) { + @Override + public void close() { + // noop + } + }; + } + /** * Wraps the given {@link BytesStream} in a {@link StreamOutput} that simply flushes when * close is called. diff --git a/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/FakeOAuth2HttpHandler.java b/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/FakeOAuth2HttpHandler.java index bb62f7692d1..7dcaaf16f4a 100644 --- a/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/FakeOAuth2HttpHandler.java +++ b/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/FakeOAuth2HttpHandler.java @@ -30,12 +30,20 @@ import static java.nio.charset.StandardCharsets.UTF_8; @SuppressForbidden(reason = "Uses a HttpServer to emulate a fake OAuth2 authentication service") public class FakeOAuth2HttpHandler implements HttpHandler { + private static final byte[] BUFFER = new byte[1024]; + @Override public void handle(final HttpExchange exchange) throws IOException { - byte[] response = ("{\"access_token\":\"foo\",\"token_type\":\"Bearer\",\"expires_in\":3600}").getBytes(UTF_8); - exchange.getResponseHeaders().add("Content-Type", "application/json"); - exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length); - exchange.getResponseBody().write(response); - exchange.close(); + try { + byte[] response = ("{\"access_token\":\"foo\",\"token_type\":\"Bearer\",\"expires_in\":3600}").getBytes(UTF_8); + exchange.getResponseHeaders().add("Content-Type", "application/json"); + exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length); + exchange.getResponseBody().write(response); + while (exchange.getRequestBody().read(BUFFER) >= 0) ; + } finally { + int read = exchange.getRequestBody().read(); + assert read == -1 : "Request body should have been fully read here but saw [" + read + "]"; + exchange.close(); + } } } diff --git a/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java b/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java index 9d8fa756d16..ea7cf131343 100644 --- a/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java +++ b/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java @@ -81,6 +81,8 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler { assert read == -1 : "Request body should have been empty but saw [" + read + "]"; } try { + // Request body is closed in the finally block + final InputStream wrappedRequest = Streams.noCloseStream(exchange.getRequestBody()); if (Regex.simpleMatch("GET /storage/v1/b/" + bucket + "/o*", request)) { // List Objects https://cloud.google.com/storage/docs/json_api/v1/objects/list final Map params = new HashMap<>(); @@ -159,7 +161,7 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler { // Batch https://cloud.google.com/storage/docs/json_api/v1/how-tos/batch final String uri = "/storage/v1/b/" + bucket + "/o/"; final StringBuilder batch = new StringBuilder(); - for (String line : Streams.readAllLines(new BufferedInputStream(exchange.getRequestBody()))) { + for (String line : Streams.readAllLines(new BufferedInputStream(wrappedRequest))) { if (line.length() == 0 || line.startsWith("--") || line.toLowerCase(Locale.ROOT).startsWith("content")) { batch.append(line).append('\n'); } else if (line.startsWith("DELETE")) { @@ -180,7 +182,7 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler { } else if (Regex.simpleMatch("POST /upload/storage/v1/b/" + bucket + "/*uploadType=multipart*", request)) { // Multipart upload - Optional> content = parseMultipartRequestBody(exchange.getRequestBody()); + Optional> content = parseMultipartRequestBody(wrappedRequest); if (content.isPresent()) { blobs.put(content.get().v1(), content.get().v2()); @@ -199,7 +201,7 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler { final String blobName = params.get("name"); blobs.put(blobName, BytesArray.EMPTY); - byte[] response = Streams.readFully(exchange.getRequestBody()).utf8ToString().getBytes(UTF_8); + byte[] response = Streams.readFully(wrappedRequest).utf8ToString().getBytes(UTF_8); exchange.getResponseHeaders().add("Content-Type", "application/json"); exchange.getResponseHeaders().add("Location", httpServerUrl(exchange) + "/upload/storage/v1/b/" + bucket + "/o?" + "uploadType=resumable" @@ -225,7 +227,7 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler { final int end = getContentRangeEnd(range); final ByteArrayOutputStream out = new ByteArrayOutputStream(); - long bytesRead = Streams.copy(exchange.getRequestBody(), out); + long bytesRead = Streams.copy(wrappedRequest, out); int length = Math.max(end + 1, limit != null ? limit : 0); if ((int) bytesRead > length) { throw new AssertionError("Requesting more bytes than available for blob"); @@ -250,6 +252,8 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler { exchange.sendResponseHeaders(RestStatus.INTERNAL_SERVER_ERROR.getStatus(), -1); } } finally { + int read = exchange.getRequestBody().read(); + assert read == -1 : "Request body should have been fully read here but saw [" + read + "]"; exchange.close(); } } diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESMockAPIBasedRepositoryIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESMockAPIBasedRepositoryIntegTestCase.java index 040961ed52b..03f89125ad9 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESMockAPIBasedRepositoryIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/ESMockAPIBasedRepositoryIntegTestCase.java @@ -190,16 +190,26 @@ public abstract class ESMockAPIBasedRepositoryIntegTestCase extends ESBlobStoreR @Override public void handle(final HttpExchange exchange) throws IOException { - final String requestId = requestUniqueId(exchange); - assert Strings.hasText(requestId); + try { + final String requestId = requestUniqueId(exchange); + assert Strings.hasText(requestId); - final boolean canFailRequest = canFailRequest(exchange); - final int count = requests.computeIfAbsent(requestId, req -> new AtomicInteger(0)).incrementAndGet(); - if (count >= maxErrorsPerRequest || canFailRequest == false) { - requests.remove(requestId); - delegate.handle(exchange); - } else { - handleAsError(exchange); + final boolean canFailRequest = canFailRequest(exchange); + final int count = requests.computeIfAbsent(requestId, req -> new AtomicInteger(0)).incrementAndGet(); + if (count >= maxErrorsPerRequest || canFailRequest == false) { + requests.remove(requestId); + delegate.handle(exchange); + } else { + handleAsError(exchange); + } + } finally { + try { + int read = exchange.getRequestBody().read(); + assert read == -1 : "Request body should have been fully read here but saw [" + read + "]"; + } catch (IOException e) { + // ignored, stream is assumed to have been closed by previous handler + } + exchange.close(); } }