Resume partial download from S3 on connection drop (#46589)

Today if the connection to S3 times out or drops after starting to download an
object then the SDK does not attempt to recover or resume the download, causing
the restore of the whole shard to fail and retry. This commit allows
Elasticsearch to detect such a mid-stream failure and to resume the download
from where it failed.
This commit is contained in:
David Turner 2019-09-17 13:10:55 +01:00
parent b0f09b279f
commit 65dc888623
5 changed files with 264 additions and 36 deletions

View File

@ -21,7 +21,6 @@ package org.elasticsearch.repositories.s3;
import com.amazonaws.AmazonClientException;
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
@ -31,7 +30,6 @@ import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.UploadPartRequest;
import com.amazonaws.services.s3.model.UploadPartResult;
import org.apache.lucene.util.SetOnce;
@ -48,7 +46,6 @@ import org.elasticsearch.common.collect.Tuple;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
@ -81,18 +78,7 @@ class S3BlobContainer extends AbstractBlobContainer {
@Override
public InputStream readBlob(String blobName) throws IOException {
    // S3RetryingInputStream opens the object itself and transparently resumes the download (via a
    // ranged GetObjectRequest) if it fails part-way through. It also maps a 404 from S3 onto a
    // NoSuchFileException, which this method previously did inline, so callers see the same
    // exception types as before.
    return new S3RetryingInputStream(blobStore, buildKey(blobName));
}
/**

View File

@ -68,6 +68,10 @@ class S3BlobStore implements BlobStore {
return service.client(repositoryMetaData);
}
// Maximum number of times a blob read may be retried after a mid-stream failure, taken from the
// per-repository client settings. Package-private so S3RetryingInputStream can size its attempt
// budget (it uses getMaxRetries() + 1 total attempts).
int getMaxRetries() {
return service.settings(repositoryMetaData).maxRetries;
}
/**
 * @return the name of the S3 bucket this blob store reads from and writes to
 */
public String bucket() {
    return this.bucket;
}

View File

@ -0,0 +1,159 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories.s3;
import com.amazonaws.AmazonClientException;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.S3Object;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.Version;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.NoSuchFileException;
import java.util.ArrayList;
import java.util.List;
/**
 * Wrapper around an S3 object that will retry the {@link GetObjectRequest} if the download fails part-way through, resuming from where
 * the failure occurred. This should be handled by the SDK but it isn't today. This should be revisited in the future (e.g. before removing
 * the {@link Version#V_7_0_0} version constant) and removed when the SDK handles retries itself.
 *
 * See https://github.com/aws/aws-sdk-java/issues/856 for the related SDK issue
 */
class S3RetryingInputStream extends InputStream {

    private static final Logger logger = LogManager.getLogger(S3RetryingInputStream.class);

    // Cap on how many earlier failures are attached (as suppressed exceptions) to the exception that finally escapes.
    static final int MAX_SUPPRESSED_EXCEPTIONS = 10;

    private final S3BlobStore blobStore;
    private final String blobKey;
    private final int maxAttempts;

    private InputStream currentStream;
    private int attempt = 1;
    private final List<IOException> failures = new ArrayList<>(MAX_SUPPRESSED_EXCEPTIONS);
    private long currentOffset; // bytes successfully read so far; the resume point used when reopening the stream
    private boolean closed;

    S3RetryingInputStream(S3BlobStore blobStore, String blobKey) throws IOException {
        this.blobStore = blobStore;
        this.blobKey = blobKey;
        this.maxAttempts = blobStore.getMaxRetries() + 1; // one initial attempt plus the configured number of retries
        currentStream = openStream();
    }

    /**
     * Opens (or, when retrying, re-opens) the object's content stream, requesting a ranged download starting at
     * {@code currentOffset} if some bytes were already read.
     *
     * @throws NoSuchFileException if S3 reports a 404 for the blob; any earlier failures are attached as suppressed exceptions
     */
    private InputStream openStream() throws IOException {
        try (AmazonS3Reference clientReference = blobStore.clientReference()) {
            final GetObjectRequest getObjectRequest = new GetObjectRequest(blobStore.bucket(), blobKey);
            if (currentOffset > 0) {
                // resume mid-stream: ask S3 only for the bytes from currentOffset to the end of the object
                getObjectRequest.setRange(currentOffset);
            }
            final S3Object s3Object = SocketAccess.doPrivileged(() -> clientReference.client().getObject(getObjectRequest));
            return s3Object.getObjectContent();
        } catch (final AmazonClientException e) {
            if (e instanceof AmazonS3Exception) {
                if (404 == ((AmazonS3Exception) e).getStatusCode()) {
                    throw addSuppressedExceptions(new NoSuchFileException("Blob object [" + blobKey + "] not found: " + e.getMessage()));
                }
            }
            throw addSuppressedExceptions(e);
        }
    }

    @Override
    public int read() throws IOException {
        ensureOpen();
        while (true) {
            try {
                final int result = currentStream.read();
                if (result == -1) {
                    // End of stream: do NOT advance currentOffset, otherwise a later retry would
                    // request a range starting one byte past the end of the blob.
                    return -1;
                }
                currentOffset += 1;
                return result;
            } catch (IOException e) {
                reopenStreamOrFail(e);
            }
        }
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        ensureOpen();
        while (true) {
            try {
                final int bytesRead = currentStream.read(b, off, len);
                if (bytesRead == -1) {
                    return -1;
                }
                currentOffset += bytesRead;
                return bytesRead;
            } catch (IOException e) {
                reopenStreamOrFail(e);
            }
        }
    }

    // Guards against use-after-close; the assert trips in tests, the exception in production.
    private void ensureOpen() {
        if (closed) {
            assert false : "using S3RetryingInputStream after close";
            throw new IllegalStateException("using S3RetryingInputStream after close");
        }
    }

    /**
     * Re-opens the underlying stream at {@code currentOffset}, or rethrows {@code e} (with up to
     * {@link #MAX_SUPPRESSED_EXCEPTIONS} earlier failures suppressed) once all attempts are exhausted.
     */
    private void reopenStreamOrFail(IOException e) throws IOException {
        if (attempt >= maxAttempts) {
            throw addSuppressedExceptions(e);
        }
        logger.debug(new ParameterizedMessage("failed reading [{}/{}] at offset [{}], attempt [{}] of [{}], retrying",
            blobStore.bucket(), blobKey, currentOffset, attempt, maxAttempts), e);
        attempt += 1;
        if (failures.size() < MAX_SUPPRESSED_EXCEPTIONS) {
            failures.add(e);
        }
        IOUtils.closeWhileHandlingException(currentStream);
        currentStream = openStream();
    }

    @Override
    public void close() throws IOException {
        try {
            currentStream.close();
        } finally {
            // Mark closed even if closing the delegate throws, so ensureOpen() stays consistent.
            closed = true;
        }
    }

    @Override
    public long skip(long n) {
        throw new UnsupportedOperationException("S3RetryingInputStream does not support seeking");
    }

    @Override
    public void reset() {
        throw new UnsupportedOperationException("S3RetryingInputStream does not support seeking");
    }

    // Attaches the recorded per-attempt failures to the exception that will escape to the caller.
    private <T extends Exception> T addSuppressedExceptions(T e) {
        for (IOException failure : failures) {
            e.addSuppressed(failure);
        }
        return e;
    }
}

View File

@ -107,7 +107,7 @@ class S3Service implements Closeable {
* @param repositoryMetaData Repository Metadata
* @return S3ClientSettings
*/
private S3ClientSettings settings(RepositoryMetaData repositoryMetaData) {
S3ClientSettings settings(RepositoryMetaData repositoryMetaData) {
final String clientName = S3Repository.CLIENT_NAME.get(repositoryMetaData.settings());
final S3ClientSettings staticSettings = staticClientSettings.get(clientName);
if (staticSettings != null) {

View File

@ -21,8 +21,11 @@ package org.elasticsearch.repositories.s3;
import com.amazonaws.SdkClientException;
import com.amazonaws.services.s3.internal.MD5DigestCalculatingInputStream;
import com.amazonaws.util.Base16;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpServer;
import org.apache.http.ConnectionClosedException;
import org.apache.http.HttpStatus;
import org.apache.http.NoHttpResponseException;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.SuppressForbidden;
@ -51,12 +54,15 @@ import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;
import java.nio.file.NoSuchFileException;
import java.util.Arrays;
import java.util.Locale;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static org.elasticsearch.repositories.s3.S3ClientSettings.DISABLE_CHUNKED_ENCODING;
import static org.elasticsearch.repositories.s3.S3ClientSettings.ENDPOINT_SETTING;
@ -67,6 +73,7 @@ import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
/**
* This class tests how a {@link S3BlobContainer} and its underlying AWS S3 client are retrying requests when reading or writing blobs.
@ -130,26 +137,41 @@ public class S3BlobContainerRetriesTests extends ESTestCase {
repositoryMetaData));
}
public void testReadNonexistentBlobThrowsNoSuchFileException() {
    // Reading a missing blob must surface as a NoSuchFileException, whatever the retry count.
    final BlobContainer container = createBlobContainer(between(1, 5), null, null, null);
    final Exception e = expectThrows(NoSuchFileException.class, () -> container.readBlob("read_nonexistent_blob"));
    assertThat(e.getMessage().toLowerCase(Locale.ROOT), containsString("blob object [read_nonexistent_blob] not found"));
}
// NOTE(review): this span is rendered diff output that interleaves removed (pre-change) and added
// (post-change) lines without +/- markers, so some statements appear twice (e.g. `bytes` and
// `blobContainer` are each declared twice). It is not compilable as shown; the lines tagged
// "superseded" below appear to be the pre-change versions. TODO: confirm against the original commit.
public void testReadBlobWithRetries() throws Exception {
final int maxRetries = randomInt(5);
final CountDown countDown = new CountDown(maxRetries + 1);
final byte[] bytes = randomByteArrayOfLength(randomIntBetween(1, 512)); // superseded by the next line
final byte[] bytes = randomBlobContent();
httpServer.createContext("/bucket/read_blob_max_retries", exchange -> {
Streams.readFully(exchange.getRequestBody());
if (countDown.countDown()) {
final int rangeStart = getRangeStart(exchange);
assertThat(rangeStart, lessThan(bytes.length));
exchange.getResponseHeaders().add("Content-Type", "text/plain; charset=utf-8");
exchange.sendResponseHeaders(HttpStatus.SC_OK, bytes.length); // superseded by the ranged variant two lines down
exchange.getResponseBody().write(bytes); // superseded by the ranged variant below
exchange.sendResponseHeaders(HttpStatus.SC_OK, bytes.length - rangeStart);
exchange.getResponseBody().write(bytes, rangeStart, bytes.length - rangeStart);
exchange.close();
return;
}
if (randomBoolean()) {
exchange.sendResponseHeaders(randomFrom(HttpStatus.SC_INTERNAL_SERVER_ERROR, HttpStatus.SC_BAD_GATEWAY,
HttpStatus.SC_SERVICE_UNAVAILABLE, HttpStatus.SC_GATEWAY_TIMEOUT), -1);
} else if (randomBoolean()) {
sendIncompleteContent(exchange, bytes);
}
if (randomBoolean()) {
exchange.close();
}
});
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null); // superseded by the two lines below
final TimeValue readTimeout = TimeValue.timeValueMillis(between(100, 500));
final BlobContainer blobContainer = createBlobContainer(maxRetries, readTimeout, null, null);
try (InputStream inputStream = blobContainer.readBlob("read_blob_max_retries")) {
assertArrayEquals(bytes, BytesReference.toBytes(Streams.readFully(inputStream)));
assertThat(countDown.isCountedDown(), is(true));
@ -157,8 +179,9 @@ public class S3BlobContainerRetriesTests extends ESTestCase {
}
public void testReadBlobWithReadTimeouts() {
final TimeValue readTimeout = TimeValue.timeValueMillis(randomIntBetween(100, 500));
final BlobContainer blobContainer = createBlobContainer(1, readTimeout, null, null);
final int maxRetries = randomInt(5);
final TimeValue readTimeout = TimeValue.timeValueMillis(between(100, 200));
final BlobContainer blobContainer = createBlobContainer(maxRetries, readTimeout, null, null);
// HTTP server does not send a response
httpServer.createContext("/bucket/read_blob_unresponsive", exchange -> {});
@ -168,15 +191,8 @@ public class S3BlobContainerRetriesTests extends ESTestCase {
assertThat(exception.getCause(), instanceOf(SocketTimeoutException.class));
// HTTP server sends a partial response
final byte[] bytes = randomByteArrayOfLength(randomIntBetween(10, 128));
httpServer.createContext("/bucket/read_blob_incomplete", exchange -> {
exchange.getResponseHeaders().add("Content-Type", "text/plain; charset=utf-8");
exchange.sendResponseHeaders(HttpStatus.SC_OK, bytes.length);
exchange.getResponseBody().write(bytes, 0, randomIntBetween(1, bytes.length - 1));
if (randomBoolean()) {
exchange.getResponseBody().flush();
}
});
final byte[] bytes = randomBlobContent();
httpServer.createContext("/bucket/read_blob_incomplete", exchange -> sendIncompleteContent(exchange, bytes));
exception = expectThrows(SocketTimeoutException.class, () -> {
try (InputStream stream = blobContainer.readBlob("read_blob_incomplete")) {
@ -184,13 +200,47 @@ public class S3BlobContainerRetriesTests extends ESTestCase {
}
});
assertThat(exception.getMessage().toLowerCase(Locale.ROOT), containsString("read timed out"));
assertThat(exception.getSuppressed().length, equalTo(maxRetries));
}
public void testReadBlobWithNoHttpResponse() {
    final BlobContainer container = createBlobContainer(randomInt(5), null, null, null);

    // The server accepts the connection and then closes it without writing any response bytes.
    httpServer.createContext("/bucket/read_blob_no_response", HttpExchange::close);

    final Exception e = expectThrows(SdkClientException.class, () -> container.readBlob("read_blob_no_response"));
    assertThat(e.getMessage().toLowerCase(Locale.ROOT), containsString("the target server failed to respond"));
    assertThat(e.getCause(), instanceOf(NoHttpResponseException.class));
    // No suppressed exceptions are expected on the escaping exception here.
    assertThat(e.getSuppressed().length, equalTo(0));
}
public void testReadBlobWithPrematureConnectionClose() {
    final int maxRetries = randomInt(20);
    final BlobContainer container = createBlobContainer(maxRetries, null, null, null);

    // The server always sends a truncated body and then closes the connection.
    final byte[] blobContent = randomBlobContent();
    httpServer.createContext("/bucket/read_blob_incomplete", exchange -> {
        sendIncompleteContent(exchange, blobContent);
        exchange.close();
    });

    final Exception e = expectThrows(ConnectionClosedException.class, () -> {
        try (InputStream stream = container.readBlob("read_blob_incomplete")) {
            Streams.readFully(stream);
        }
    });
    assertThat(e.getMessage().toLowerCase(Locale.ROOT),
        containsString("premature end of content-length delimited message body"));
    // One suppressed exception per failed attempt before the last, capped at the maximum.
    assertThat(e.getSuppressed().length, equalTo(Math.min(S3RetryingInputStream.MAX_SUPPRESSED_EXCEPTIONS, maxRetries)));
}
public void testWriteBlobWithRetries() throws Exception {
final int maxRetries = randomInt(5);
final CountDown countDown = new CountDown(maxRetries + 1);
final byte[] bytes = randomByteArrayOfLength(randomIntBetween(1, frequently() ? 512 : 1 << 20)); // rarely up to 1mb
final byte[] bytes = randomBlobContent();
httpServer.createContext("/bucket/write_blob_max_retries", exchange -> {
if ("PUT".equals(exchange.getRequestMethod()) && exchange.getRequestURI().getQuery() == null) {
if (countDown.countDown()) {
@ -343,6 +393,35 @@ public class S3BlobContainerRetriesTests extends ESTestCase {
assertThat(countDownComplete.isCountedDown(), is(true));
}
// Random blob payload: mostly small (up to 512 bytes), rarely up to 1MB.
private static byte[] randomBlobContent() {
    final int length = randomIntBetween(1, frequently() ? 512 : 1 << 20);
    return randomByteArrayOfLength(length);
}
/**
 * Extracts the resume offset from the request's {@code Range} header, or 0 when the whole blob was
 * requested. Asserts the header matches the exact form sent when resuming: {@code bytes=<start>-9223372036854775806}
 * (the upper bound is Long.MAX_VALUE - 1).
 */
private static int getRangeStart(HttpExchange exchange) {
    final String rangeHeader = exchange.getRequestHeaders().getFirst("Range");
    if (rangeHeader == null) {
        return 0;
    }
    final Pattern expectedRange = Pattern.compile("^bytes=([0-9]+)-9223372036854775806$");
    final Matcher matcher = expectedRange.matcher(rangeHeader);
    assertTrue(rangeHeader + " matches expected pattern", matcher.matches());
    return Math.toIntExact(Long.parseLong(matcher.group(1)));
}
/**
 * Responds with a deliberately truncated body: advertises the full remaining length but writes
 * strictly fewer bytes, honoring any Range header's start offset.
 */
private static void sendIncompleteContent(HttpExchange exchange, byte[] bytes) throws IOException {
    final int rangeStart = getRangeStart(exchange);
    assertThat(rangeStart, lessThan(bytes.length));
    exchange.getResponseHeaders().add("Content-Type", "text/plain; charset=utf-8");
    final int remaining = bytes.length - rangeStart;
    exchange.sendResponseHeaders(HttpStatus.SC_OK, remaining);
    // Send anywhere from none up to all-but-one of the advertised bytes, so the client always
    // sees a short body.
    final int bytesToSend = randomIntBetween(0, remaining - 1);
    if (bytesToSend > 0) {
        exchange.getResponseBody().write(bytes, rangeStart, bytesToSend);
    }
    if (randomBoolean()) {
        exchange.getResponseBody().flush();
    }
}
/**
* A resettable InputStream that only serves zeros.
**/
@ -413,7 +492,7 @@ public class S3BlobContainerRetriesTests extends ESTestCase {
}
@Override
public void close() throws IOException {
public void close() {
closed.set(true);
}