From ba39c261e8e0e657f4f59a7b7dc21ea175244493 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Tue, 21 Apr 2020 13:13:09 +0200 Subject: [PATCH] Use streaming reads for GCS (#55506) To read from GCS repositories we're currently using Google SDK's official BlobReadChannel, which issues a new request every 2MB (default chunk size for BlobReadChannel) using range requests, and fully downloads the chunk before exposing it to the returned InputStream. This means that the SDK issues an awfully high number of requests to download large blobs. Increasing the chunk size is not an option, as that will mean that an awfully high amount of heap memory will be consumed by the download process. The Google SDK does not provide the right abstractions for a streaming download. This PR uses the lower-level primitives of the SDK to implement a streaming download, similar to what S3's SDK does. Also closes #55505 --- .../gcs/GoogleCloudStorageBlobStore.java | 5 +- ...GoogleCloudStorageRetryingInputStream.java | 216 ++++++++++-------- ...CloudStorageBlobContainerRetriesTests.java | 22 +- .../gcs/GoogleCloudStorageHttpHandler.java | 17 +- .../AbstractBlobContainerRetriesTestCase.java | 15 +- 5 files changed, 150 insertions(+), 125 deletions(-) diff --git a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java index 85105f51393..5078cf8e22e 100644 --- a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java +++ b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java @@ -197,7 +197,7 @@ class GoogleCloudStorageBlobStore implements BlobStore { * @return the InputStream used to read the blob's content */ InputStream readBlob(String blobName) throws IOException { - return new GoogleCloudStorageRetryingInputStream(client(), BlobId.of(bucketName, blobName), 0, Long.MAX_VALUE); + return new GoogleCloudStorageRetryingInputStream(client(), BlobId.of(bucketName, blobName)); } /** @@ -218,7 +218,8 @@ class GoogleCloudStorageBlobStore implements BlobStore { if (length == 0) { return new ByteArrayInputStream(new byte[0]); } else { - return new GoogleCloudStorageRetryingInputStream(client(), BlobId.of(bucketName, blobName), position, length); + return new GoogleCloudStorageRetryingInputStream(client(), BlobId.of(bucketName, blobName), position, + Math.addExact(position, length - 1)); } } diff --git a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStream.java b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStream.java index c2ad0155dc6..e0a09b7903f 100644 --- a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStream.java +++ b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageRetryingInputStream.java @@ -18,26 +18,31 @@ */ package org.elasticsearch.repositories.gcs; -import com.google.cloud.ReadChannel; +import com.google.api.client.http.HttpResponse; +import com.google.api.services.storage.Storage.Objects.Get; +import com.google.cloud.BaseService; +import com.google.cloud.RetryHelper; import com.google.cloud.storage.BlobId; import com.google.cloud.storage.Storage; import com.google.cloud.storage.StorageException; +import com.google.cloud.storage.spi.v1.HttpStorageRpc; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.SpecialPermission; import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.core.internal.io.IOUtils; +import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; -import java.nio.ByteBuffer; -import java.nio.channels.Channels; -import java.nio.channels.SeekableByteChannel; +import java.lang.reflect.Field; import java.nio.file.NoSuchFileException; +import java.security.AccessController; +import java.security.PrivilegedAction; import java.util.ArrayList; import java.util.List; - -import static java.net.HttpURLConnection.HTTP_NOT_FOUND; +import java.util.stream.Stream; /** * Wrapper around reads from GCS that will retry blob downloads that fail part-way through, resuming from where the failure occurred. @@ -51,13 +56,14 @@ class GoogleCloudStorageRetryingInputStream extends InputStream { static final int MAX_SUPPRESSED_EXCEPTIONS = 10; private final Storage client; + private final com.google.api.services.storage.Storage storage; private final BlobId blobId; private final long start; - private final long length; + private final long end; - private final int maxRetries; + private final int maxAttempts; private InputStream currentStream; private int attempt = 1; @@ -65,100 +71,127 @@ class GoogleCloudStorageRetryingInputStream extends InputStream { private long currentOffset; private boolean closed; - GoogleCloudStorageRetryingInputStream(Storage client, BlobId blobId, long start, long length) throws IOException { + GoogleCloudStorageRetryingInputStream(Storage client, BlobId blobId) throws IOException { + this(client, blobId, 0, Long.MAX_VALUE - 1); + } + + // both start and end are inclusive bounds, following the definition in https://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.35 + GoogleCloudStorageRetryingInputStream(Storage client, BlobId blobId, long start, long end) throws IOException { + if (start < 0L) { + throw new IllegalArgumentException("start must be non-negative"); + } + if (end < start || end == Long.MAX_VALUE) { + throw new IllegalArgumentException("end must be >= start and not Long.MAX_VALUE"); + } this.client = client; this.blobId = blobId; this.start = start; - this.length = length; - this.maxRetries = client.getOptions().getRetrySettings().getMaxAttempts() + 1; + this.end = end; + this.maxAttempts = client.getOptions().getRetrySettings().getMaxAttempts(); + SpecialPermission.check(); + storage = getStorage(client); currentStream = openStream(); } - private static final int DEFAULT_CHUNK_SIZE = 2 * 1024 * 1024; + @SuppressForbidden(reason = "need access to storage client") + private static com.google.api.services.storage.Storage getStorage(Storage client) { + return AccessController.doPrivileged((PrivilegedAction) () -> { + assert client.getOptions().getRpc() instanceof HttpStorageRpc; + assert Stream.of(client.getOptions().getRpc().getClass().getDeclaredFields()).anyMatch(f -> f.getName().equals("storage")); + try { + final Field storageField = client.getOptions().getRpc().getClass().getDeclaredField("storage"); + storageField.setAccessible(true); + return (com.google.api.services.storage.Storage) storageField.get(client.getOptions().getRpc()); + } catch (Exception e) { + throw new IllegalStateException("storage could not be set up", e); + } + }); + } private InputStream openStream() throws IOException { try { - final ReadChannel readChannel = SocketAccess.doPrivilegedIOException(() -> client.reader(blobId)); - final long end = start + length < 0L ? Long.MAX_VALUE : start + length; // inclusive - final SeekableByteChannel adaptedChannel = new SeekableByteChannel() { + try { + return RetryHelper.runWithRetries(() -> { + try { + return SocketAccess.doPrivilegedIOException(() -> { + final Get get = storage.objects().get(blobId.getBucket(), blobId.getName()); + get.setReturnRawInputStream(true); - long position; - - @SuppressForbidden(reason = "Channel is based of a socket not a file") - @Override - public int read(ByteBuffer dst) throws IOException { - final long remainingBytesToRead = end - position; - assert remainingBytesToRead >= 0L; - // The SDK uses the maximum between chunk size and dst.remaining() to determine fetch size - // We can be smarter here and only fetch what's needed when we know the length - if (remainingBytesToRead < DEFAULT_CHUNK_SIZE) { - readChannel.setChunkSize(Math.toIntExact(remainingBytesToRead)); - } - if (remainingBytesToRead < dst.remaining()) { - dst.limit(dst.position() + Math.toIntExact(remainingBytesToRead)); - } - try { - int read = SocketAccess.doPrivilegedIOException(() -> readChannel.read(dst)); - if (read > 0) { - position += read; + if (currentOffset > 0 || start > 0 || end < Long.MAX_VALUE - 1) { + get.getRequestHeaders().setRange("bytes=" + Math.addExact(start, currentOffset) + "-" + end); + } + final HttpResponse resp = get.executeMedia(); + final Long contentLength = resp.getHeaders().getContentLength(); + InputStream content = resp.getContent(); + if (contentLength != null) { + content = new ContentLengthValidatingInputStream(content, contentLength); + } + return content; + }); + } catch (IOException e) { + throw StorageException.translate(e); } - return read; - } catch (StorageException e) { - if (e.getCode() == HTTP_NOT_FOUND) { - throw new NoSuchFileException("Blob object [" + blobId.getName() + "] not found: " + e.getMessage()); - } - throw e; - } finally { - readChannel.setChunkSize(0); // set to default again - } - } - - @Override - public int write(ByteBuffer src) { - throw new UnsupportedOperationException(); - } - - @Override - public long position() { - return position; - } - - @Override - public SeekableByteChannel position(long newPosition) throws IOException { - readChannel.seek(newPosition); - this.position = newPosition; - return this; - } - - @Override - public long size() { - return length; - } - - @Override - public SeekableByteChannel truncate(long size) { - throw new UnsupportedOperationException(); - } - - @Override - public boolean isOpen() { - return readChannel.isOpen(); - } - - @Override - public void close() throws IOException { - SocketAccess.doPrivilegedVoidIOException(readChannel::close); - } - }; - if (currentOffset > 0 || start > 0) { - adaptedChannel.position(Math.addExact(start, currentOffset)); + }, client.getOptions().getRetrySettings(), BaseService.EXCEPTION_HANDLER, client.getOptions().getClock()); + } catch (RetryHelper.RetryHelperException e) { + throw StorageException.translateAndThrow(e); } - return Channels.newInputStream(adaptedChannel); } catch (StorageException e) { + if (e.getCode() == 404) { + throw addSuppressedExceptions( + new NoSuchFileException("Blob object [" + blobId.getName() + "] not found: " + e.getMessage())); + } throw addSuppressedExceptions(e); } } + // Google's SDK ignores the Content-Length header when no bytes are sent, see NetHttpResponse.SizeValidatingInputStream + // We have to implement our own validation logic here + static final class ContentLengthValidatingInputStream extends FilterInputStream { + private final long contentLength; + + private long read = 0L; + + ContentLengthValidatingInputStream(InputStream in, long contentLength) { + super(in); + this.contentLength = contentLength; + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + final int n = in.read(b, off, len); + if (n == -1) { + checkContentLengthOnEOF(); + } else { + read += n; + } + return n; + } + + @Override + public int read() throws IOException { + final int n = in.read(); + if (n == -1) { + checkContentLengthOnEOF(); + } else { + read++; + } + return n; + } + + @Override + public long skip(long len) throws IOException { + final long n = in.skip(len); + read += n; + return n; + } + + private void checkContentLengthOnEOF() throws IOException { + if (read < contentLength) { + throw new IOException("Connection closed prematurely: read = " + read + ", Content-Length = " + contentLength); + } + } + } + @Override public int read() throws IOException { ensureOpen(); @@ -167,8 +200,8 @@ class GoogleCloudStorageRetryingInputStream extends InputStream { final int result = currentStream.read(); currentOffset += 1; return result; - } catch (StorageException e) { - reopenStreamOrFail(e); + } catch (IOException e) { + reopenStreamOrFail(StorageException.translate(e)); } } } @@ -184,8 +217,8 @@ class GoogleCloudStorageRetryingInputStream extends InputStream { } currentOffset += bytesRead; return bytesRead; - } catch (StorageException e) { - reopenStreamOrFail(e); + } catch (IOException e) { + reopenStreamOrFail(StorageException.translate(e)); } } } @@ -197,12 +230,13 @@ class GoogleCloudStorageRetryingInputStream extends InputStream { } } + // TODO: check that object did not change when stream is reopened (e.g. based on etag) private void reopenStreamOrFail(StorageException e) throws IOException { - if (attempt >= maxRetries) { + if (attempt >= maxAttempts) { throw addSuppressedExceptions(e); } logger.debug(new ParameterizedMessage("failed reading [{}] at offset [{}], attempt [{}] of [{}], retrying", - blobId, currentOffset, attempt, maxRetries), e); + blobId, currentOffset, attempt, maxAttempts), e); attempt += 1; if (failures.size() < MAX_SUPPRESSED_EXCEPTIONS) { failures.add(e); diff --git a/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java b/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java index 0224759ed49..d7fb7411e0c 100644 --- a/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java +++ b/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobContainerRetriesTests.java @@ -99,13 +99,6 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends AbstractBlobCon GoogleCloudStorageBlobStoreRepositoryTests.assumeNotJava8(); } - // Google's SDK ignores Content-Length header when no bytes are sent, see SizeValidatingInputStream - // TODO: fix this in the SDK - @Override - protected int minIncompleteContentToSend() { - return 1; - } - @Override protected String downloadStorageEndpoint(String blob) { return "/download/storage/v1/b/bucket/o/" + blob; @@ -153,7 +146,7 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends AbstractBlobCon .setRpcTimeoutMultiplier(options.getRetrySettings().getRpcTimeoutMultiplier()) .setMaxRpcTimeout(Duration.ofSeconds(1)); if (maxRetries != null) { - retrySettingsBuilder.setMaxAttempts(maxRetries); + retrySettingsBuilder.setMaxAttempts(maxRetries + 1); } return options.toBuilder() .setHost(options.getHost()) @@ -191,10 +184,9 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends AbstractBlobCon httpServer.createContext("/download/storage/v1/b/bucket/o/large_blob_retries", exchange -> { Streams.readFully(exchange.getRequestBody()); exchange.getResponseHeaders().add("Content-Type", "application/octet-stream"); - final String[] range = exchange.getRequestHeaders().get("Range").get(0).substring("bytes=".length()).split("-"); - final int offset = Integer.parseInt(range[0]); - final int end = Integer.parseInt(range[1]); - final byte[] chunk = Arrays.copyOfRange(bytes, offset, Math.min(end + 1, bytes.length)); + final Tuple range = getRange(exchange); + final int offset = Math.toIntExact(range.v1()); + final byte[] chunk = Arrays.copyOfRange(bytes, offset, Math.toIntExact(Math.min(range.v2() + 1, bytes.length))); exchange.sendResponseHeaders(RestStatus.OK.getStatus(), chunk.length); if (randomBoolean() && countDown.decrementAndGet() >= 0) { exchange.getResponseBody().write(chunk, 0, chunk.length - 1); @@ -400,10 +392,4 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends AbstractBlobCon } }; } - - @Override - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/55505") - public void testReadRangeBlobWithRetries() throws Exception { - - } } diff --git a/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java b/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java index a05a4603cb3..ed2c33360ef 100644 --- a/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java +++ b/test/fixtures/gcs-fixture/src/main/java/fixture/gcs/GoogleCloudStorageHttpHandler.java @@ -138,12 +138,19 @@ public class GoogleCloudStorageHttpHandler implements HttpHandler { BytesReference blob = blobs.get(exchange.getRequestURI().getPath().replace("/download/storage/v1/b/" + bucket + "/o/", "")); if (blob != null) { final String range = exchange.getRequestHeaders().getFirst("Range"); - Matcher matcher = RANGE_MATCHER.matcher(range); - if (matcher.find() == false) { - throw new AssertionError("Range bytes header does not match expected format: " + range); + final int offset; + final int end; + if (range == null) { + offset = 0; + end = blob.length() - 1; + } else { + Matcher matcher = RANGE_MATCHER.matcher(range); + if (matcher.find() == false) { + throw new AssertionError("Range bytes header does not match expected format: " + range); + } + offset = Integer.parseInt(matcher.group(1)); + end = Integer.parseInt(matcher.group(2)); } - final int offset = Integer.parseInt(matcher.group(1)); - final int end = Integer.parseInt(matcher.group(2)); BytesReference response = blob; exchange.getResponseHeaders().add("Content-Type", "application/octet-stream"); final int bufferedLength = response.length(); diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/AbstractBlobContainerRetriesTestCase.java b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/AbstractBlobContainerRetriesTestCase.java index e394f10dafc..13428f914c0 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/AbstractBlobContainerRetriesTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/AbstractBlobContainerRetriesTestCase.java @@ -250,7 +250,8 @@ public abstract class AbstractBlobContainerRetriesTestCase extends ESTestCase { assertThat(exception, either(instanceOf(SocketTimeoutException.class)).or(instanceOf(ConnectionClosedException.class)) .or(instanceOf(RuntimeException.class))); assertThat(exception.getMessage().toLowerCase(Locale.ROOT), either(containsString("read timed out")).or( - containsString("premature end of chunk coded message body: closing chunk expected")).or(containsString("Read timed out"))); + containsString("premature end of chunk coded message body: closing chunk expected")).or(containsString("Read timed out")) + .or(containsString("unexpected end of file from server"))); assertThat(exception.getSuppressed().length, equalTo(maxRetries)); } @@ -278,7 +279,7 @@ public abstract class AbstractBlobContainerRetriesTestCase extends ESTestCase { final BlobContainer blobContainer = createBlobContainer(maxRetries, null, null, null); // HTTP server sends a partial response - final byte[] bytes = randomBlobContent(minIncompleteContentToSend() + 1); + final byte[] bytes = randomBlobContent(1); httpServer.createContext(downloadStorageEndpoint("read_blob_incomplete"), exchange -> { sendIncompleteContent(exchange, bytes); exchange.close(); @@ -286,7 +287,7 @@ public abstract class AbstractBlobContainerRetriesTestCase extends ESTestCase { final Exception exception = expectThrows(Exception.class, () -> { try (InputStream stream = randomBoolean() ? - blobContainer.readBlob("read_blob_incomplete", 0, minIncompleteContentToSend() + 1): + blobContainer.readBlob("read_blob_incomplete", 0, 1): blobContainer.readBlob("read_blob_incomplete")) { Streams.readFully(stream); } @@ -308,7 +309,7 @@ public abstract class AbstractBlobContainerRetriesTestCase extends ESTestCase { private static final Pattern RANGE_PATTERN = Pattern.compile("^bytes=([0-9]+)-([0-9]+)$"); - private static Tuple getRange(HttpExchange exchange) { + protected static Tuple getRange(HttpExchange exchange) { final String rangeHeader = exchange.getRequestHeaders().getFirst("Range"); if (rangeHeader == null) { return Tuple.tuple(0L, MAX_RANGE_VAL); @@ -348,7 +349,7 @@ public abstract class AbstractBlobContainerRetriesTestCase extends ESTestCase { } exchange.getResponseHeaders().add("Content-Type", bytesContentType()); exchange.sendResponseHeaders(HttpStatus.SC_OK, length); - int minSend = Math.min(minIncompleteContentToSend(), length - 1); + int minSend = Math.min(0, length - 1); final int bytesToSend = randomIntBetween(minSend, length - 1); if (bytesToSend > 0) { exchange.getResponseBody().write(bytes, rangeStart, bytesToSend); @@ -358,10 +359,6 @@ public abstract class AbstractBlobContainerRetriesTestCase extends ESTestCase { } } - protected int minIncompleteContentToSend() { - return 0; - } - /** * A resettable InputStream that only serves zeros. **/