From aee92d573cb6a7a32a402a1f55e7d4a5da968625 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Fri, 23 Aug 2019 13:38:52 +0200 Subject: [PATCH] Allow partial request body reads in AWS S3 retries tests (#45847) This commit changes the tests added in #45383 so that the fixture that emulates the S3 service now sometimes consumes all the request body before sending an error, sometimes consumes only a part of the request body and sometimes consumes nothing. The idea here is to beef up a bit the tests that writes blob because the client's retry logic relies on marking and resetting the blob's input stream. This pull request also changes the testWriteBlobWithRetries() so that it (rarely) tests with a large blob (up to 1mb), which is more than the client's default read limit on input streams (131Kb). Finally, it optimizes the ZeroInputStream so that it is a bit more effective (now works using an internal buffer and System.arraycopy() primitives). --- .../s3/S3BlobContainerRetriesTests.java | 104 +++++++++++++----- 1 file changed, 74 insertions(+), 30 deletions(-) diff --git a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java index ab88cc93682..89ea3e55bfb 100644 --- a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java +++ b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java @@ -51,6 +51,7 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketTimeoutException; import java.nio.charset.StandardCharsets; +import java.util.Arrays; import java.util.Locale; import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; @@ -192,21 +193,31 @@ public class S3BlobContainerRetriesTests extends ESTestCase { final int maxRetries = randomInt(5); final CountDown countDown = new CountDown(maxRetries + 1); - final byte[] bytes = randomByteArrayOfLength(randomIntBetween(1, 512)); + final byte[] bytes = randomByteArrayOfLength(randomIntBetween(1, frequently() ? 512 : 1 << 20)); // rarely up to 1mb httpServer.createContext("/bucket/write_blob_max_retries", exchange -> { - final BytesReference body = Streams.readFully(exchange.getRequestBody()); - if (countDown.countDown()) { - if (Objects.deepEquals(bytes, BytesReference.toBytes(body))) { - exchange.sendResponseHeaders(HttpStatus.SC_OK, -1); - } else { - exchange.sendResponseHeaders(HttpStatus.SC_BAD_REQUEST, -1); + if ("PUT".equals(exchange.getRequestMethod()) && exchange.getRequestURI().getQuery() == null) { + if (countDown.countDown()) { + final BytesReference body = Streams.readFully(exchange.getRequestBody()); + if (Objects.deepEquals(bytes, BytesReference.toBytes(body))) { + exchange.sendResponseHeaders(HttpStatus.SC_OK, -1); + } else { + exchange.sendResponseHeaders(HttpStatus.SC_BAD_REQUEST, -1); + } + exchange.close(); + return; + } + + if (randomBoolean()) { + if (randomBoolean()) { + Streams.readFully(exchange.getRequestBody(), new byte[randomIntBetween(1, bytes.length - 1)]); + } else { + Streams.readFully(exchange.getRequestBody()); + exchange.sendResponseHeaders(randomFrom(HttpStatus.SC_INTERNAL_SERVER_ERROR, HttpStatus.SC_BAD_GATEWAY, + HttpStatus.SC_SERVICE_UNAVAILABLE, HttpStatus.SC_GATEWAY_TIMEOUT), -1); + } } exchange.close(); - return; } - exchange.sendResponseHeaders(randomFrom(HttpStatus.SC_INTERNAL_SERVER_ERROR, HttpStatus.SC_BAD_GATEWAY, - HttpStatus.SC_SERVICE_UNAVAILABLE, HttpStatus.SC_GATEWAY_TIMEOUT), -1); - exchange.close(); }); final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, null); @@ -217,17 +228,21 @@ public class S3BlobContainerRetriesTests extends ESTestCase { } public void testWriteBlobWithReadTimeouts() { + final byte[] bytes = randomByteArrayOfLength(randomIntBetween(10, 128)); final TimeValue readTimeout = TimeValue.timeValueMillis(randomIntBetween(100, 500)); final BlobContainer blobContainer = createBlobContainer(1, readTimeout, true, null); // HTTP server does not send a response httpServer.createContext("/bucket/write_blob_timeout", exchange -> { if (randomBoolean()) { - Streams.readFully(exchange.getRequestBody()); + if (randomBoolean()) { + Streams.readFully(exchange.getRequestBody(), new byte[randomIntBetween(1, bytes.length - 1)]); + } else { + Streams.readFully(exchange.getRequestBody()); + } } }); - final byte[] bytes = randomByteArrayOfLength(randomIntBetween(1, 128)); Exception exception = expectThrows(IOException.class, () -> { try (InputStream stream = new InputStreamIndexInput(new ByteArrayIndexInput("desc", bytes), bytes.length)) { blobContainer.writeBlob("write_blob_timeout", stream, bytes.length, false); @@ -249,16 +264,18 @@ public class S3BlobContainerRetriesTests extends ESTestCase { final ByteSizeValue bufferSize = new ByteSizeValue(5, ByteSizeUnit.MB); final BlobContainer blobContainer = createBlobContainer(null, readTimeout, true, bufferSize); - final int parts = randomIntBetween(1, 2); + final int parts = randomIntBetween(1, 5); final long lastPartSize = randomLongBetween(10, 512); final long blobSize = (parts * bufferSize.getBytes()) + lastPartSize; - final int maxRetries = 2; // we want all requests to fail at least once - final CountDown countDownInitiate = new CountDown(maxRetries); - final AtomicInteger countDownUploads = new AtomicInteger(maxRetries * (parts + 1)); - final CountDown countDownComplete = new CountDown(maxRetries); + final int nbErrors = 2; // we want all requests to fail at least once + final CountDown countDownInitiate = new CountDown(nbErrors); + final AtomicInteger countDownUploads = new AtomicInteger(nbErrors * (parts + 1)); + final CountDown countDownComplete = new CountDown(nbErrors); httpServer.createContext("/bucket/write_large_blob", exchange -> { + final long contentLength = Long.parseLong(exchange.getRequestHeaders().getFirst("Content-Length")); + if ("POST".equals(exchange.getRequestMethod()) && exchange.getRequestURI().getQuery().equals("uploads")) { // initiate multipart upload request @@ -275,11 +292,14 @@ public class S3BlobContainerRetriesTests extends ESTestCase { exchange.close(); return; } - } else if ("PUT".equals(exchange.getRequestMethod())) { + } else if ("PUT".equals(exchange.getRequestMethod()) + && exchange.getRequestURI().getQuery().contains("uploadId=TEST") + && exchange.getRequestURI().getQuery().contains("partNumber=")) { // upload part request MD5DigestCalculatingInputStream md5 = new MD5DigestCalculatingInputStream(exchange.getRequestBody()); BytesReference bytes = Streams.readFully(md5); assertThat((long) bytes.length(), anyOf(equalTo(lastPartSize), equalTo(bufferSize.getBytes()))); + assertThat(contentLength, anyOf(equalTo(lastPartSize), equalTo(bufferSize.getBytes()))); if (countDownUploads.decrementAndGet() % 2 == 0) { exchange.getResponseHeaders().add("ETag", Base16.encodeAsString(md5.getMd5Digest())); @@ -289,10 +309,10 @@ public class S3BlobContainerRetriesTests extends ESTestCase { } } else if ("POST".equals(exchange.getRequestMethod()) - && exchange.getRequestURI().getQuery().equals("uploadId=TEST")) { + && exchange.getRequestURI().getQuery().equals("uploadId=TEST")) { // complete multipart upload request - Streams.readFully(exchange.getRequestBody()); if (countDownComplete.countDown()) { + Streams.readFully(exchange.getRequestBody()); byte[] response = ("\n" + "\n" + " bucket\n" + @@ -308,8 +328,13 @@ public class S3BlobContainerRetriesTests extends ESTestCase { // sends an error back or let the request time out if (useTimeout == false) { - exchange.sendResponseHeaders(randomFrom(HttpStatus.SC_INTERNAL_SERVER_ERROR, HttpStatus.SC_BAD_GATEWAY, - HttpStatus.SC_SERVICE_UNAVAILABLE, HttpStatus.SC_GATEWAY_TIMEOUT), -1); + if (randomBoolean() && contentLength > 0) { + Streams.readFully(exchange.getRequestBody(), new byte[randomIntBetween(1, Math.toIntExact(contentLength - 1))]); + } else { + Streams.readFully(exchange.getRequestBody()); + exchange.sendResponseHeaders(randomFrom(HttpStatus.SC_INTERNAL_SERVER_ERROR, HttpStatus.SC_BAD_GATEWAY, + HttpStatus.SC_SERVICE_UNAVAILABLE, HttpStatus.SC_GATEWAY_TIMEOUT), -1); + } exchange.close(); } }); @@ -323,9 +348,6 @@ public class S3BlobContainerRetriesTests extends ESTestCase { /** * A resettable InputStream that only serves zeros. - * - * Ideally it should be wrapped into a BufferedInputStream but it seems that the AWS SDK is calling InputStream{@link #reset()} - * before calling InputStream{@link #mark(int)}, which is not permitted by the {@link #reset()} method contract. **/ private static class ZeroInputStream extends InputStream { @@ -336,17 +358,32 @@ public class S3BlobContainerRetriesTests extends ESTestCase { private ZeroInputStream(final long length) { this.length = length; - this.reads = new AtomicLong(length); + this.reads = new AtomicLong(0); this.mark = -1; } @Override public int read() throws IOException { ensureOpen(); - if (reads.decrementAndGet() < 0) { + return (reads.incrementAndGet() <= length) ? 0 : -1; + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + ensureOpen(); + if (len == 0) { + return 0; + } + + final int available = available(); + if (available == 0) { return -1; } - return 0; + + final int toCopy = Math.min(len, available); + Arrays.fill(b, off, off + toCopy, (byte) 0); + reads.addAndGet(toCopy); + return toCopy; } @Override @@ -368,7 +405,14 @@ public class S3BlobContainerRetriesTests extends ESTestCase { @Override public int available() throws IOException { ensureOpen(); - return Math.toIntExact(length - reads.get()); + if (reads.get() >= length) { + return 0; + } + try { + return Math.toIntExact(length - reads.get()); + } catch (ArithmeticException e) { + return Integer.MAX_VALUE; + } } @Override