diff --git a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java index ab88cc93682..89ea3e55bfb 100644 --- a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java +++ b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobContainerRetriesTests.java @@ -51,6 +51,7 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketTimeoutException; import java.nio.charset.StandardCharsets; +import java.util.Arrays; import java.util.Locale; import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; @@ -192,21 +193,31 @@ public class S3BlobContainerRetriesTests extends ESTestCase { final int maxRetries = randomInt(5); final CountDown countDown = new CountDown(maxRetries + 1); - final byte[] bytes = randomByteArrayOfLength(randomIntBetween(1, 512)); + final byte[] bytes = randomByteArrayOfLength(randomIntBetween(1, frequently() ? 512 : 1 << 20)); // rarely up to 1mb httpServer.createContext("/bucket/write_blob_max_retries", exchange -> { - final BytesReference body = Streams.readFully(exchange.getRequestBody()); - if (countDown.countDown()) { - if (Objects.deepEquals(bytes, BytesReference.toBytes(body))) { - exchange.sendResponseHeaders(HttpStatus.SC_OK, -1); - } else { - exchange.sendResponseHeaders(HttpStatus.SC_BAD_REQUEST, -1); + if ("PUT".equals(exchange.getRequestMethod()) && exchange.getRequestURI().getQuery() == null) { + if (countDown.countDown()) { + final BytesReference body = Streams.readFully(exchange.getRequestBody()); + if (Objects.deepEquals(bytes, BytesReference.toBytes(body))) { + exchange.sendResponseHeaders(HttpStatus.SC_OK, -1); + } else { + exchange.sendResponseHeaders(HttpStatus.SC_BAD_REQUEST, -1); + } + exchange.close(); + return; + } + + if (randomBoolean()) { + if (randomBoolean()) { + Streams.readFully(exchange.getRequestBody(), new byte[randomIntBetween(1, bytes.length - 1)]); + } else { + Streams.readFully(exchange.getRequestBody()); + exchange.sendResponseHeaders(randomFrom(HttpStatus.SC_INTERNAL_SERVER_ERROR, HttpStatus.SC_BAD_GATEWAY, + HttpStatus.SC_SERVICE_UNAVAILABLE, HttpStatus.SC_GATEWAY_TIMEOUT), -1); + } } exchange.close(); - return; } - exchange.sendResponseHeaders(randomFrom(HttpStatus.SC_INTERNAL_SERVER_ERROR, HttpStatus.SC_BAD_GATEWAY, - HttpStatus.SC_SERVICE_UNAVAILABLE, HttpStatus.SC_GATEWAY_TIMEOUT), -1); - exchange.close(); }); final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, null); @@ -217,17 +228,21 @@ public class S3BlobContainerRetriesTests extends ESTestCase { } public void testWriteBlobWithReadTimeouts() { + final byte[] bytes = randomByteArrayOfLength(randomIntBetween(10, 128)); final TimeValue readTimeout = TimeValue.timeValueMillis(randomIntBetween(100, 500)); final BlobContainer blobContainer = createBlobContainer(1, readTimeout, true, null); // HTTP server does not send a response httpServer.createContext("/bucket/write_blob_timeout", exchange -> { if (randomBoolean()) { - Streams.readFully(exchange.getRequestBody()); + if (randomBoolean()) { + Streams.readFully(exchange.getRequestBody(), new byte[randomIntBetween(1, bytes.length - 1)]); + } else { + Streams.readFully(exchange.getRequestBody()); + } } }); - final byte[] bytes = randomByteArrayOfLength(randomIntBetween(1, 128)); Exception exception = expectThrows(IOException.class, () -> { try (InputStream stream = new InputStreamIndexInput(new ByteArrayIndexInput("desc", bytes), bytes.length)) { blobContainer.writeBlob("write_blob_timeout", stream, bytes.length, false); @@ -249,16 +264,18 @@ public class S3BlobContainerRetriesTests extends ESTestCase { final ByteSizeValue bufferSize = new ByteSizeValue(5, ByteSizeUnit.MB); final BlobContainer blobContainer = createBlobContainer(null, readTimeout, true, bufferSize); - final int parts = randomIntBetween(1, 2); + final int parts = randomIntBetween(1, 5); final long lastPartSize = randomLongBetween(10, 512); final long blobSize = (parts * bufferSize.getBytes()) + lastPartSize; - final int maxRetries = 2; // we want all requests to fail at least once - final CountDown countDownInitiate = new CountDown(maxRetries); - final AtomicInteger countDownUploads = new AtomicInteger(maxRetries * (parts + 1)); - final CountDown countDownComplete = new CountDown(maxRetries); + final int nbErrors = 2; // we want all requests to fail at least once + final CountDown countDownInitiate = new CountDown(nbErrors); + final AtomicInteger countDownUploads = new AtomicInteger(nbErrors * (parts + 1)); + final CountDown countDownComplete = new CountDown(nbErrors); httpServer.createContext("/bucket/write_large_blob", exchange -> { + final long contentLength = Long.parseLong(exchange.getRequestHeaders().getFirst("Content-Length")); + if ("POST".equals(exchange.getRequestMethod()) && exchange.getRequestURI().getQuery().equals("uploads")) { // initiate multipart upload request @@ -275,11 +292,14 @@ public class S3BlobContainerRetriesTests extends ESTestCase { exchange.close(); return; } - } else if ("PUT".equals(exchange.getRequestMethod())) { + } else if ("PUT".equals(exchange.getRequestMethod()) + && exchange.getRequestURI().getQuery().contains("uploadId=TEST") + && exchange.getRequestURI().getQuery().contains("partNumber=")) { // upload part request MD5DigestCalculatingInputStream md5 = new MD5DigestCalculatingInputStream(exchange.getRequestBody()); BytesReference bytes = Streams.readFully(md5); assertThat((long) bytes.length(), anyOf(equalTo(lastPartSize), equalTo(bufferSize.getBytes()))); + assertThat(contentLength, anyOf(equalTo(lastPartSize), equalTo(bufferSize.getBytes()))); if (countDownUploads.decrementAndGet() % 2 == 0) { exchange.getResponseHeaders().add("ETag", Base16.encodeAsString(md5.getMd5Digest())); @@ -289,10 +309,10 @@ public class S3BlobContainerRetriesTests extends ESTestCase { } } else if ("POST".equals(exchange.getRequestMethod()) - && exchange.getRequestURI().getQuery().equals("uploadId=TEST")) { + && exchange.getRequestURI().getQuery().equals("uploadId=TEST")) { // complete multipart upload request - Streams.readFully(exchange.getRequestBody()); if (countDownComplete.countDown()) { + Streams.readFully(exchange.getRequestBody()); byte[] response = ("\n" + "\n" + " bucket\n" + @@ -308,8 +328,13 @@ public class S3BlobContainerRetriesTests extends ESTestCase { // sends an error back or let the request time out if (useTimeout == false) { - exchange.sendResponseHeaders(randomFrom(HttpStatus.SC_INTERNAL_SERVER_ERROR, HttpStatus.SC_BAD_GATEWAY, - HttpStatus.SC_SERVICE_UNAVAILABLE, HttpStatus.SC_GATEWAY_TIMEOUT), -1); + if (randomBoolean() && contentLength > 0) { + Streams.readFully(exchange.getRequestBody(), new byte[randomIntBetween(1, Math.toIntExact(contentLength - 1))]); + } else { + Streams.readFully(exchange.getRequestBody()); + exchange.sendResponseHeaders(randomFrom(HttpStatus.SC_INTERNAL_SERVER_ERROR, HttpStatus.SC_BAD_GATEWAY, + HttpStatus.SC_SERVICE_UNAVAILABLE, HttpStatus.SC_GATEWAY_TIMEOUT), -1); + } exchange.close(); } }); @@ -323,9 +348,6 @@ public class S3BlobContainerRetriesTests extends ESTestCase { /** * A resettable InputStream that only serves zeros. - * - * Ideally it should be wrapped into a BufferedInputStream but it seems that the AWS SDK is calling InputStream{@link #reset()} - * before calling InputStream{@link #mark(int)}, which is not permitted by the {@link #reset()} method contract. **/ private static class ZeroInputStream extends InputStream { @@ -336,17 +358,32 @@ public class S3BlobContainerRetriesTests extends ESTestCase { private ZeroInputStream(final long length) { this.length = length; - this.reads = new AtomicLong(length); + this.reads = new AtomicLong(0); this.mark = -1; } @Override public int read() throws IOException { ensureOpen(); - if (reads.decrementAndGet() < 0) { + return (reads.incrementAndGet() <= length) ? 0 : -1; + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + ensureOpen(); + if (len == 0) { + return 0; + } + + final int available = available(); + if (available == 0) { return -1; } - return 0; + + final int toCopy = Math.min(len, available); + Arrays.fill(b, off, off + toCopy, (byte) 0); + reads.addAndGet(toCopy); + return toCopy; } @Override @@ -368,7 +405,14 @@ public class S3BlobContainerRetriesTests extends ESTestCase { @Override public int available() throws IOException { ensureOpen(); - return Math.toIntExact(length - reads.get()); + if (reads.get() >= length) { + return 0; + } + try { + return Math.toIntExact(length - reads.get()); + } catch (ArithmeticException e) { + return Integer.MAX_VALUE; + } } @Override