Use streaming reads for GCS (#55506)

To read from GCS repositories we're currently using the Google SDK's official BlobReadChannel,
which issues a new range request every 2MB (the default chunk size for BlobReadChannel) and
fully downloads each chunk before exposing it to the returned InputStream. This
means that the SDK issues an excessively high number of requests to download large blobs.
Increasing the chunk size is not an option either, as that would cause a correspondingly
large amount of heap memory to be consumed by the download process.

The Google SDK does not provide the right abstractions for a streaming download. This PR
uses the lower-level primitives of the SDK to implement a streaming download, similar to what
S3's SDK does.

Also closes #55505
This commit is contained in:
Yannick Welsch 2020-04-21 13:13:09 +02:00
parent 59d377462f
commit ba39c261e8
5 changed files with 150 additions and 125 deletions

View File

@ -197,7 +197,7 @@ class GoogleCloudStorageBlobStore implements BlobStore {
* @return the InputStream used to read the blob's content
*/
InputStream readBlob(String blobName) throws IOException {
return new GoogleCloudStorageRetryingInputStream(client(), BlobId.of(bucketName, blobName), 0, Long.MAX_VALUE);
return new GoogleCloudStorageRetryingInputStream(client(), BlobId.of(bucketName, blobName));
}
/**
@ -218,7 +218,8 @@ class GoogleCloudStorageBlobStore implements BlobStore {
if (length == 0) {
return new ByteArrayInputStream(new byte[0]);
} else {
return new GoogleCloudStorageRetryingInputStream(client(), BlobId.of(bucketName, blobName), position, length);
return new GoogleCloudStorageRetryingInputStream(client(), BlobId.of(bucketName, blobName), position,
Math.addExact(position, length - 1));
}
}

View File

@ -18,26 +18,31 @@
*/
package org.elasticsearch.repositories.gcs;
import com.google.cloud.ReadChannel;
import com.google.api.client.http.HttpResponse;
import com.google.api.services.storage.Storage.Objects.Get;
import com.google.cloud.BaseService;
import com.google.cloud.RetryHelper;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageException;
import com.google.cloud.storage.spi.v1.HttpStorageRpc;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.SpecialPermission;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.core.internal.io.IOUtils;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.SeekableByteChannel;
import java.lang.reflect.Field;
import java.nio.file.NoSuchFileException;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.List;
import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
import java.util.stream.Stream;
/**
* Wrapper around reads from GCS that will retry blob downloads that fail part-way through, resuming from where the failure occurred.
@ -51,13 +56,14 @@ class GoogleCloudStorageRetryingInputStream extends InputStream {
static final int MAX_SUPPRESSED_EXCEPTIONS = 10;
private final Storage client;
private final com.google.api.services.storage.Storage storage;
private final BlobId blobId;
private final long start;
private final long length;
private final long end;
private final int maxRetries;
private final int maxAttempts;
private InputStream currentStream;
private int attempt = 1;
@ -65,100 +71,127 @@ class GoogleCloudStorageRetryingInputStream extends InputStream {
private long currentOffset;
private boolean closed;
GoogleCloudStorageRetryingInputStream(Storage client, BlobId blobId, long start, long length) throws IOException {
GoogleCloudStorageRetryingInputStream(Storage client, BlobId blobId) throws IOException {
this(client, blobId, 0, Long.MAX_VALUE - 1);
}
// both start and end are inclusive bounds, following the definition in https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35
GoogleCloudStorageRetryingInputStream(Storage client, BlobId blobId, long start, long end) throws IOException {
if (start < 0L) {
throw new IllegalArgumentException("start must be non-negative");
}
if (end < start || end == Long.MAX_VALUE) {
throw new IllegalArgumentException("end must be >= start and not Long.MAX_VALUE");
}
this.client = client;
this.blobId = blobId;
this.start = start;
this.length = length;
this.maxRetries = client.getOptions().getRetrySettings().getMaxAttempts() + 1;
this.end = end;
this.maxAttempts = client.getOptions().getRetrySettings().getMaxAttempts();
SpecialPermission.check();
storage = getStorage(client);
currentStream = openStream();
}
private static final int DEFAULT_CHUNK_SIZE = 2 * 1024 * 1024;
@SuppressForbidden(reason = "need access to storage client")
private static com.google.api.services.storage.Storage getStorage(Storage client) {
return AccessController.doPrivileged((PrivilegedAction<com.google.api.services.storage.Storage>) () -> {
assert client.getOptions().getRpc() instanceof HttpStorageRpc;
assert Stream.of(client.getOptions().getRpc().getClass().getDeclaredFields()).anyMatch(f -> f.getName().equals("storage"));
try {
final Field storageField = client.getOptions().getRpc().getClass().getDeclaredField("storage");
storageField.setAccessible(true);
return (com.google.api.services.storage.Storage) storageField.get(client.getOptions().getRpc());
} catch (Exception e) {
throw new IllegalStateException("storage could not be set up", e);
}
});
}
private InputStream openStream() throws IOException {
try {
final ReadChannel readChannel = SocketAccess.doPrivilegedIOException(() -> client.reader(blobId));
final long end = start + length < 0L ? Long.MAX_VALUE : start + length; // inclusive
final SeekableByteChannel adaptedChannel = new SeekableByteChannel() {
try {
return RetryHelper.runWithRetries(() -> {
try {
return SocketAccess.doPrivilegedIOException(() -> {
final Get get = storage.objects().get(blobId.getBucket(), blobId.getName());
get.setReturnRawInputStream(true);
long position;
@SuppressForbidden(reason = "Channel is based of a socket not a file")
@Override
public int read(ByteBuffer dst) throws IOException {
final long remainingBytesToRead = end - position;
assert remainingBytesToRead >= 0L;
// The SDK uses the maximum between chunk size and dst.remaining() to determine fetch size
// We can be smarter here and only fetch what's needed when we know the length
if (remainingBytesToRead < DEFAULT_CHUNK_SIZE) {
readChannel.setChunkSize(Math.toIntExact(remainingBytesToRead));
}
if (remainingBytesToRead < dst.remaining()) {
dst.limit(dst.position() + Math.toIntExact(remainingBytesToRead));
}
try {
int read = SocketAccess.doPrivilegedIOException(() -> readChannel.read(dst));
if (read > 0) {
position += read;
if (currentOffset > 0 || start > 0 || end < Long.MAX_VALUE - 1) {
get.getRequestHeaders().setRange("bytes=" + Math.addExact(start, currentOffset) + "-" + end);
}
final HttpResponse resp = get.executeMedia();
final Long contentLength = resp.getHeaders().getContentLength();
InputStream content = resp.getContent();
if (contentLength != null) {
content = new ContentLengthValidatingInputStream(content, contentLength);
}
return content;
});
} catch (IOException e) {
throw StorageException.translate(e);
}
return read;
} catch (StorageException e) {
if (e.getCode() == HTTP_NOT_FOUND) {
throw new NoSuchFileException("Blob object [" + blobId.getName() + "] not found: " + e.getMessage());
}
throw e;
} finally {
readChannel.setChunkSize(0); // set to default again
}
}
@Override
public int write(ByteBuffer src) {
throw new UnsupportedOperationException();
}
@Override
public long position() {
return position;
}
@Override
public SeekableByteChannel position(long newPosition) throws IOException {
readChannel.seek(newPosition);
this.position = newPosition;
return this;
}
@Override
public long size() {
return length;
}
@Override
public SeekableByteChannel truncate(long size) {
throw new UnsupportedOperationException();
}
@Override
public boolean isOpen() {
return readChannel.isOpen();
}
@Override
public void close() throws IOException {
SocketAccess.doPrivilegedVoidIOException(readChannel::close);
}
};
if (currentOffset > 0 || start > 0) {
adaptedChannel.position(Math.addExact(start, currentOffset));
}, client.getOptions().getRetrySettings(), BaseService.EXCEPTION_HANDLER, client.getOptions().getClock());
} catch (RetryHelper.RetryHelperException e) {
throw StorageException.translateAndThrow(e);
}
return Channels.newInputStream(adaptedChannel);
} catch (StorageException e) {
if (e.getCode() == 404) {
throw addSuppressedExceptions(
new NoSuchFileException("Blob object [" + blobId.getName() + "] not found: " + e.getMessage()));
}
throw addSuppressedExceptions(e);
}
}
// Google's SDK ignores the Content-Length header when no bytes are sent, see NetHttpResponse.SizeValidatingInputStream
// We have to implement our own validation logic here
static final class ContentLengthValidatingInputStream extends FilterInputStream {
// Expected total number of bytes, taken from the response's Content-Length header.
private final long contentLength;
// Number of bytes consumed (read or skipped) from the wrapped stream so far.
private long read = 0L;
ContentLengthValidatingInputStream(InputStream in, long contentLength) {
super(in);
this.contentLength = contentLength;
}
// Delegates to the wrapped stream, tracking bytes consumed; on end-of-stream,
// fails if fewer than contentLength bytes were seen (i.e. a truncated download).
@Override
public int read(byte[] b, int off, int len) throws IOException {
final int n = in.read(b, off, len);
if (n == -1) {
checkContentLengthOnEOF();
} else {
read += n;
}
return n;
}
// Single-byte variant of the validated read above.
@Override
public int read() throws IOException {
final int n = in.read();
if (n == -1) {
checkContentLengthOnEOF();
} else {
read++;
}
return n;
}
// Skipped bytes still count toward the total, so EOF validation stays accurate
// after a skip. NOTE(review): mark/reset are inherited from FilterInputStream and
// would not rewind the counter — presumably unsupported on these HTTP streams; confirm.
@Override
public long skip(long len) throws IOException {
final long n = in.skip(len);
read += n;
return n;
}
// Throws if the stream ended before the advertised Content-Length was delivered,
// signalling a prematurely closed connection so the caller can retry.
private void checkContentLengthOnEOF() throws IOException {
if (read < contentLength) {
throw new IOException("Connection closed prematurely: read = " + read + ", Content-Length = " + contentLength);
}
}
}
@Override
public int read() throws IOException {
ensureOpen();
@ -167,8 +200,8 @@ class GoogleCloudStorageRetryingInputStream extends InputStream {
final int result = currentStream.read();
currentOffset += 1;
return result;
} catch (StorageException e) {
reopenStreamOrFail(e);
} catch (IOException e) {
reopenStreamOrFail(StorageException.translate(e));
}
}
}
@ -184,8 +217,8 @@ class GoogleCloudStorageRetryingInputStream extends InputStream {
}
currentOffset += bytesRead;
return bytesRead;
} catch (StorageException e) {
reopenStreamOrFail(e);
} catch (IOException e) {
reopenStreamOrFail(StorageException.translate(e));
}
}
}
@ -197,12 +230,13 @@ class GoogleCloudStorageRetryingInputStream extends InputStream {
}
}
// TODO: check that object did not change when stream is reopened (e.g. based on etag)
private void reopenStreamOrFail(StorageException e) throws IOException {
if (attempt >= maxRetries) {
if (attempt >= maxAttempts) {
throw addSuppressedExceptions(e);
}
logger.debug(new ParameterizedMessage("failed reading [{}] at offset [{}], attempt [{}] of [{}], retrying",
blobId, currentOffset, attempt, maxRetries), e);
blobId, currentOffset, attempt, maxAttempts), e);
attempt += 1;
if (failures.size() < MAX_SUPPRESSED_EXCEPTIONS) {
failures.add(e);

View File

@ -99,13 +99,6 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends AbstractBlobCon
GoogleCloudStorageBlobStoreRepositoryTests.assumeNotJava8();
}
// Google's SDK ignores Content-Length header when no bytes are sent, see SizeValidatingInputStream
// TODO: fix this in the SDK
@Override
protected int minIncompleteContentToSend() {
return 1;
}
@Override
protected String downloadStorageEndpoint(String blob) {
return "/download/storage/v1/b/bucket/o/" + blob;
@ -153,7 +146,7 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends AbstractBlobCon
.setRpcTimeoutMultiplier(options.getRetrySettings().getRpcTimeoutMultiplier())
.setMaxRpcTimeout(Duration.ofSeconds(1));
if (maxRetries != null) {
retrySettingsBuilder.setMaxAttempts(maxRetries);
retrySettingsBuilder.setMaxAttempts(maxRetries + 1);
}
return options.toBuilder()
.setHost(options.getHost())
@ -191,10 +184,9 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends AbstractBlobCon
httpServer.createContext("/download/storage/v1/b/bucket/o/large_blob_retries", exchange -> {
Streams.readFully(exchange.getRequestBody());
exchange.getResponseHeaders().add("Content-Type", "application/octet-stream");
final String[] range = exchange.getRequestHeaders().get("Range").get(0).substring("bytes=".length()).split("-");
final int offset = Integer.parseInt(range[0]);
final int end = Integer.parseInt(range[1]);
final byte[] chunk = Arrays.copyOfRange(bytes, offset, Math.min(end + 1, bytes.length));
final Tuple<Long, Long> range = getRange(exchange);
final int offset = Math.toIntExact(range.v1());
final byte[] chunk = Arrays.copyOfRange(bytes, offset, Math.toIntExact(Math.min(range.v2() + 1, bytes.length)));
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), chunk.length);
if (randomBoolean() && countDown.decrementAndGet() >= 0) {
exchange.getResponseBody().write(chunk, 0, chunk.length - 1);
@ -400,10 +392,4 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends AbstractBlobCon
}
};
}
@Override
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/55505")
public void testReadRangeBlobWithRetries() throws Exception {
}
}

View File

@ -138,12 +138,19 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler {
BytesReference blob = blobs.get(exchange.getRequestURI().getPath().replace("/download/storage/v1/b/" + bucket + "/o/", ""));
if (blob != null) {
final String range = exchange.getRequestHeaders().getFirst("Range");
Matcher matcher = RANGE_MATCHER.matcher(range);
if (matcher.find() == false) {
throw new AssertionError("Range bytes header does not match expected format: " + range);
final int offset;
final int end;
if (range == null) {
offset = 0;
end = blob.length() - 1;
} else {
Matcher matcher = RANGE_MATCHER.matcher(range);
if (matcher.find() == false) {
throw new AssertionError("Range bytes header does not match expected format: " + range);
}
offset = Integer.parseInt(matcher.group(1));
end = Integer.parseInt(matcher.group(2));
}
final int offset = Integer.parseInt(matcher.group(1));
final int end = Integer.parseInt(matcher.group(2));
BytesReference response = blob;
exchange.getResponseHeaders().add("Content-Type", "application/octet-stream");
final int bufferedLength = response.length();

View File

@ -250,7 +250,8 @@ public abstract class AbstractBlobContainerRetriesTestCase extends ESTestCase {
assertThat(exception, either(instanceOf(SocketTimeoutException.class)).or(instanceOf(ConnectionClosedException.class))
.or(instanceOf(RuntimeException.class)));
assertThat(exception.getMessage().toLowerCase(Locale.ROOT), either(containsString("read timed out")).or(
containsString("premature end of chunk coded message body: closing chunk expected")).or(containsString("Read timed out")));
containsString("premature end of chunk coded message body: closing chunk expected")).or(containsString("Read timed out"))
.or(containsString("unexpected end of file from server")));
assertThat(exception.getSuppressed().length, equalTo(maxRetries));
}
@ -278,7 +279,7 @@ public abstract class AbstractBlobContainerRetriesTestCase extends ESTestCase {
final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null);
// HTTP server sends a partial response
final byte[] bytes = randomBlobContent(minIncompleteContentToSend() + 1);
final byte[] bytes = randomBlobContent(1);
httpServer.createContext(downloadStorageEndpoint("read_blob_incomplete"), exchange -> {
sendIncompleteContent(exchange, bytes);
exchange.close();
@ -286,7 +287,7 @@ public abstract class AbstractBlobContainerRetriesTestCase extends ESTestCase {
final Exception exception = expectThrows(Exception.class, () -> {
try (InputStream stream = randomBoolean() ?
blobContainer.readBlob("read_blob_incomplete", 0, minIncompleteContentToSend() + 1):
blobContainer.readBlob("read_blob_incomplete", 0, 1):
blobContainer.readBlob("read_blob_incomplete")) {
Streams.readFully(stream);
}
@ -308,7 +309,7 @@ public abstract class AbstractBlobContainerRetriesTestCase extends ESTestCase {
private static final Pattern RANGE_PATTERN = Pattern.compile("^bytes=([0-9]+)-([0-9]+)$");
private static Tuple<Long, Long> getRange(HttpExchange exchange) {
protected static Tuple<Long, Long> getRange(HttpExchange exchange) {
final String rangeHeader = exchange.getRequestHeaders().getFirst("Range");
if (rangeHeader == null) {
return Tuple.tuple(0L, MAX_RANGE_VAL);
@ -348,7 +349,7 @@ public abstract class AbstractBlobContainerRetriesTestCase extends ESTestCase {
}
exchange.getResponseHeaders().add("Content-Type", bytesContentType());
exchange.sendResponseHeaders(HttpStatus.SC_OK, length);
int minSend = Math.min(minIncompleteContentToSend(), length - 1);
int minSend = Math.min(0, length - 1);
final int bytesToSend = randomIntBetween(minSend, length - 1);
if (bytesToSend > 0) {
exchange.getResponseBody().write(bytes, rangeStart, bytesToSend);
@ -358,10 +359,6 @@ public abstract class AbstractBlobContainerRetriesTestCase extends ESTestCase {
}
}
protected int minIncompleteContentToSend() {
return 0;
}
/**
* A resettable InputStream that only serves zeros.
**/