Allow partial request body reads in AWS S3 retries tests (#45847)

This commit changes the tests added in #45383 so that the fixture that 
emulates the S3 service now sometimes consumes all the request body 
before sending an error, sometimes consumes only a part of the request 
body and sometimes consumes nothing. The idea here is to beef up a bit 
the tests that write blobs, because the client's retry logic relies on 
marking and resetting the blob's input stream.

This pull request also changes the testWriteBlobWithRetries() so that it 
(rarely) tests with a large blob (up to 1 MB), which is more than the client's 
default read limit on input streams (131 KB).

Finally, it optimizes the ZeroInputStream so that it is a bit more efficient 
(now works using an internal buffer and System.arraycopy() primitives).
This commit is contained in:
Tanguy Leroux 2019-08-23 13:38:52 +02:00
parent 6e696296fe
commit aee92d573c
1 changed file with 74 additions and 30 deletions

View File

@ -51,6 +51,7 @@ import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Locale;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
@ -192,21 +193,31 @@ public class S3BlobContainerRetriesTests extends ESTestCase {
final int maxRetries = randomInt(5);
final CountDown countDown = new CountDown(maxRetries + 1);
final byte[] bytes = randomByteArrayOfLength(randomIntBetween(1, 512));
final byte[] bytes = randomByteArrayOfLength(randomIntBetween(1, frequently() ? 512 : 1 << 20)); // rarely up to 1mb
httpServer.createContext("/bucket/write_blob_max_retries", exchange -> {
final BytesReference body = Streams.readFully(exchange.getRequestBody());
if (countDown.countDown()) {
if (Objects.deepEquals(bytes, BytesReference.toBytes(body))) {
exchange.sendResponseHeaders(HttpStatus.SC_OK, -1);
} else {
exchange.sendResponseHeaders(HttpStatus.SC_BAD_REQUEST, -1);
if ("PUT".equals(exchange.getRequestMethod()) && exchange.getRequestURI().getQuery() == null) {
if (countDown.countDown()) {
final BytesReference body = Streams.readFully(exchange.getRequestBody());
if (Objects.deepEquals(bytes, BytesReference.toBytes(body))) {
exchange.sendResponseHeaders(HttpStatus.SC_OK, -1);
} else {
exchange.sendResponseHeaders(HttpStatus.SC_BAD_REQUEST, -1);
}
exchange.close();
return;
}
if (randomBoolean()) {
if (randomBoolean()) {
Streams.readFully(exchange.getRequestBody(), new byte[randomIntBetween(1, bytes.length - 1)]);
} else {
Streams.readFully(exchange.getRequestBody());
exchange.sendResponseHeaders(randomFrom(HttpStatus.SC_INTERNAL_SERVER_ERROR, HttpStatus.SC_BAD_GATEWAY,
HttpStatus.SC_SERVICE_UNAVAILABLE, HttpStatus.SC_GATEWAY_TIMEOUT), -1);
}
}
exchange.close();
return;
}
exchange.sendResponseHeaders(randomFrom(HttpStatus.SC_INTERNAL_SERVER_ERROR, HttpStatus.SC_BAD_GATEWAY,
HttpStatus.SC_SERVICE_UNAVAILABLE, HttpStatus.SC_GATEWAY_TIMEOUT), -1);
exchange.close();
});
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, true, null);
@ -217,17 +228,21 @@ public class S3BlobContainerRetriesTests extends ESTestCase {
}
public void testWriteBlobWithReadTimeouts() {
final byte[] bytes = randomByteArrayOfLength(randomIntBetween(10, 128));
final TimeValue readTimeout = TimeValue.timeValueMillis(randomIntBetween(100, 500));
final BlobContainer blobContainer = createBlobContainer(1, readTimeout, true, null);
// HTTP server does not send a response
httpServer.createContext("/bucket/write_blob_timeout", exchange -> {
if (randomBoolean()) {
Streams.readFully(exchange.getRequestBody());
if (randomBoolean()) {
Streams.readFully(exchange.getRequestBody(), new byte[randomIntBetween(1, bytes.length - 1)]);
} else {
Streams.readFully(exchange.getRequestBody());
}
}
});
final byte[] bytes = randomByteArrayOfLength(randomIntBetween(1, 128));
Exception exception = expectThrows(IOException.class, () -> {
try (InputStream stream = new InputStreamIndexInput(new ByteArrayIndexInput("desc", bytes), bytes.length)) {
blobContainer.writeBlob("write_blob_timeout", stream, bytes.length, false);
@ -249,16 +264,18 @@ public class S3BlobContainerRetriesTests extends ESTestCase {
final ByteSizeValue bufferSize = new ByteSizeValue(5, ByteSizeUnit.MB);
final BlobContainer blobContainer = createBlobContainer(null, readTimeout, true, bufferSize);
final int parts = randomIntBetween(1, 2);
final int parts = randomIntBetween(1, 5);
final long lastPartSize = randomLongBetween(10, 512);
final long blobSize = (parts * bufferSize.getBytes()) + lastPartSize;
final int maxRetries = 2; // we want all requests to fail at least once
final CountDown countDownInitiate = new CountDown(maxRetries);
final AtomicInteger countDownUploads = new AtomicInteger(maxRetries * (parts + 1));
final CountDown countDownComplete = new CountDown(maxRetries);
final int nbErrors = 2; // we want all requests to fail at least once
final CountDown countDownInitiate = new CountDown(nbErrors);
final AtomicInteger countDownUploads = new AtomicInteger(nbErrors * (parts + 1));
final CountDown countDownComplete = new CountDown(nbErrors);
httpServer.createContext("/bucket/write_large_blob", exchange -> {
final long contentLength = Long.parseLong(exchange.getRequestHeaders().getFirst("Content-Length"));
if ("POST".equals(exchange.getRequestMethod())
&& exchange.getRequestURI().getQuery().equals("uploads")) {
// initiate multipart upload request
@ -275,11 +292,14 @@ public class S3BlobContainerRetriesTests extends ESTestCase {
exchange.close();
return;
}
} else if ("PUT".equals(exchange.getRequestMethod())) {
} else if ("PUT".equals(exchange.getRequestMethod())
&& exchange.getRequestURI().getQuery().contains("uploadId=TEST")
&& exchange.getRequestURI().getQuery().contains("partNumber=")) {
// upload part request
MD5DigestCalculatingInputStream md5 = new MD5DigestCalculatingInputStream(exchange.getRequestBody());
BytesReference bytes = Streams.readFully(md5);
assertThat((long) bytes.length(), anyOf(equalTo(lastPartSize), equalTo(bufferSize.getBytes())));
assertThat(contentLength, anyOf(equalTo(lastPartSize), equalTo(bufferSize.getBytes())));
if (countDownUploads.decrementAndGet() % 2 == 0) {
exchange.getResponseHeaders().add("ETag", Base16.encodeAsString(md5.getMd5Digest()));
@ -289,10 +309,10 @@ public class S3BlobContainerRetriesTests extends ESTestCase {
}
} else if ("POST".equals(exchange.getRequestMethod())
&& exchange.getRequestURI().getQuery().equals("uploadId=TEST")) {
&& exchange.getRequestURI().getQuery().equals("uploadId=TEST")) {
// complete multipart upload request
Streams.readFully(exchange.getRequestBody());
if (countDownComplete.countDown()) {
Streams.readFully(exchange.getRequestBody());
byte[] response = ("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n" +
"<CompleteMultipartUploadResult>\n" +
" <Bucket>bucket</Bucket>\n" +
@ -308,8 +328,13 @@ public class S3BlobContainerRetriesTests extends ESTestCase {
// sends an error back or let the request time out
if (useTimeout == false) {
exchange.sendResponseHeaders(randomFrom(HttpStatus.SC_INTERNAL_SERVER_ERROR, HttpStatus.SC_BAD_GATEWAY,
HttpStatus.SC_SERVICE_UNAVAILABLE, HttpStatus.SC_GATEWAY_TIMEOUT), -1);
if (randomBoolean() && contentLength > 0) {
Streams.readFully(exchange.getRequestBody(), new byte[randomIntBetween(1, Math.toIntExact(contentLength - 1))]);
} else {
Streams.readFully(exchange.getRequestBody());
exchange.sendResponseHeaders(randomFrom(HttpStatus.SC_INTERNAL_SERVER_ERROR, HttpStatus.SC_BAD_GATEWAY,
HttpStatus.SC_SERVICE_UNAVAILABLE, HttpStatus.SC_GATEWAY_TIMEOUT), -1);
}
exchange.close();
}
});
@ -323,9 +348,6 @@ public class S3BlobContainerRetriesTests extends ESTestCase {
/**
* A resettable InputStream that only serves zeros.
*
* Ideally it should be wrapped into a BufferedInputStream but it seems that the AWS SDK is calling InputStream{@link #reset()}
* before calling InputStream{@link #mark(int)}, which is not permitted by the {@link #reset()} method contract.
**/
private static class ZeroInputStream extends InputStream {
@ -336,17 +358,32 @@ public class S3BlobContainerRetriesTests extends ESTestCase {
private ZeroInputStream(final long length) {
this.length = length;
this.reads = new AtomicLong(length);
this.reads = new AtomicLong(0);
this.mark = -1;
}
@Override
public int read() throws IOException {
ensureOpen();
if (reads.decrementAndGet() < 0) {
return (reads.incrementAndGet() <= length) ? 0 : -1;
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
ensureOpen();
if (len == 0) {
return 0;
}
final int available = available();
if (available == 0) {
return -1;
}
return 0;
final int toCopy = Math.min(len, available);
Arrays.fill(b, off, off + toCopy, (byte) 0);
reads.addAndGet(toCopy);
return toCopy;
}
@Override
@ -368,7 +405,14 @@ public class S3BlobContainerRetriesTests extends ESTestCase {
@Override
public int available() throws IOException {
ensureOpen();
return Math.toIntExact(length - reads.get());
if (reads.get() >= length) {
return 0;
}
try {
return Math.toIntExact(length - reads.get());
} catch (ArithmeticException e) {
return Integer.MAX_VALUE;
}
}
@Override