* Add Blob Download Retries to GCS Repository. Mirrors #46589 (and kept as close to it as possible code-wise, so we can potentially DRY the two implementations up in a follow-up) but for GCS. Closes #52319
This commit is contained in:
parent
01ec922d13
commit
aeb7b777e6
|
@ -21,7 +21,6 @@ package org.elasticsearch.repositories.gcs;
|
|||
|
||||
import com.google.api.gax.paging.Page;
|
||||
import com.google.cloud.BatchResult;
|
||||
import com.google.cloud.ReadChannel;
|
||||
import com.google.cloud.storage.Blob;
|
||||
import com.google.cloud.storage.BlobId;
|
||||
import com.google.cloud.storage.BlobInfo;
|
||||
|
@ -34,7 +33,6 @@ import org.apache.logging.log4j.LogManager;
|
|||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.common.SuppressForbidden;
|
||||
import org.elasticsearch.common.blobstore.BlobContainer;
|
||||
import org.elasticsearch.common.blobstore.BlobMetaData;
|
||||
import org.elasticsearch.common.blobstore.BlobPath;
|
||||
|
@ -47,11 +45,8 @@ import org.elasticsearch.core.internal.io.Streams;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.Channels;
|
||||
import java.nio.channels.ReadableByteChannel;
|
||||
import java.nio.file.FileAlreadyExistsException;
|
||||
import java.nio.file.NoSuchFileException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
|
@ -176,32 +171,7 @@ class GoogleCloudStorageBlobStore implements BlobStore {
|
|||
* @return the InputStream used to read the blob's content
|
||||
*/
|
||||
InputStream readBlob(String blobName) throws IOException {
|
||||
final BlobId blobId = BlobId.of(bucketName, blobName);
|
||||
final ReadChannel readChannel = SocketAccess.doPrivilegedIOException(() -> client().reader(blobId));
|
||||
return Channels.newInputStream(new ReadableByteChannel() {
|
||||
@SuppressForbidden(reason = "Channel is based of a socket not a file")
|
||||
@Override
|
||||
public int read(ByteBuffer dst) throws IOException {
|
||||
try {
|
||||
return SocketAccess.doPrivilegedIOException(() -> readChannel.read(dst));
|
||||
} catch (StorageException e) {
|
||||
if (e.getCode() == HTTP_NOT_FOUND) {
|
||||
throw new NoSuchFileException("Blob [" + blobName + "] does not exist");
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isOpen() {
|
||||
return readChannel.isOpen();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
SocketAccess.doPrivilegedVoidIOException(readChannel::close);
|
||||
}
|
||||
});
|
||||
return new GoogleCloudStorageRetryingInputStream(client(), BlobId.of(bucketName, blobName));
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -0,0 +1,181 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.repositories.gcs;
|
||||
|
||||
import com.google.cloud.ReadChannel;
|
||||
import com.google.cloud.storage.BlobId;
|
||||
import com.google.cloud.storage.Storage;
|
||||
import com.google.cloud.storage.StorageException;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.elasticsearch.common.SuppressForbidden;
|
||||
import org.elasticsearch.core.internal.io.IOUtils;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.Channels;
|
||||
import java.nio.channels.ReadableByteChannel;
|
||||
import java.nio.file.NoSuchFileException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
|
||||
|
||||
/**
|
||||
* Wrapper around reads from GCS that will retry blob downloads that fail part-way through, resuming from where the failure occurred.
|
||||
* This should be handled by the SDK but it isn't today. This should be revisited in the future (e.g. before removing
|
||||
* the {@link org.elasticsearch.Version#V_7_0_0} version constant) and removed if the SDK handles retries itself in the future.
|
||||
*/
|
||||
class GoogleCloudStorageRetryingInputStream extends InputStream {
|
||||
|
||||
private static final Logger logger = LogManager.getLogger(GoogleCloudStorageRetryingInputStream.class);
|
||||
|
||||
static final int MAX_SUPPRESSED_EXCEPTIONS = 10;
|
||||
|
||||
private final Storage client;
|
||||
|
||||
private final BlobId blobId;
|
||||
|
||||
private final int maxRetries;
|
||||
|
||||
private InputStream currentStream;
|
||||
private int attempt = 1;
|
||||
private List<StorageException> failures = new ArrayList<>(MAX_SUPPRESSED_EXCEPTIONS);
|
||||
private long currentOffset;
|
||||
private boolean closed;
|
||||
|
||||
GoogleCloudStorageRetryingInputStream(Storage client, BlobId blobId) throws IOException {
|
||||
this.client = client;
|
||||
this.blobId = blobId;
|
||||
this.maxRetries = client.getOptions().getRetrySettings().getMaxAttempts() + 1;
|
||||
currentStream = openStream();
|
||||
}
|
||||
|
||||
private InputStream openStream() throws IOException {
|
||||
try {
|
||||
final ReadChannel readChannel = SocketAccess.doPrivilegedIOException(() -> client.reader(blobId));
|
||||
if (currentOffset > 0L) {
|
||||
readChannel.seek(currentOffset);
|
||||
}
|
||||
return Channels.newInputStream(new ReadableByteChannel() {
|
||||
@SuppressForbidden(reason = "Channel is based of a socket not a file")
|
||||
@Override
|
||||
public int read(ByteBuffer dst) throws IOException {
|
||||
try {
|
||||
return SocketAccess.doPrivilegedIOException(() -> readChannel.read(dst));
|
||||
} catch (StorageException e) {
|
||||
if (e.getCode() == HTTP_NOT_FOUND) {
|
||||
throw new NoSuchFileException("Blob [" + blobId.getName() + "] does not exist");
|
||||
}
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isOpen() {
|
||||
return readChannel.isOpen();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
SocketAccess.doPrivilegedVoidIOException(readChannel::close);
|
||||
}
|
||||
});
|
||||
} catch (StorageException e) {
|
||||
throw addSuppressedExceptions(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read() throws IOException {
|
||||
ensureOpen();
|
||||
while (true) {
|
||||
try {
|
||||
final int result = currentStream.read();
|
||||
currentOffset += 1;
|
||||
return result;
|
||||
} catch (StorageException e) {
|
||||
reopenStreamOrFail(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read(byte[] b, int off, int len) throws IOException {
|
||||
ensureOpen();
|
||||
while (true) {
|
||||
try {
|
||||
final int bytesRead = currentStream.read(b, off, len);
|
||||
if (bytesRead == -1) {
|
||||
return -1;
|
||||
}
|
||||
currentOffset += bytesRead;
|
||||
return bytesRead;
|
||||
} catch (StorageException e) {
|
||||
reopenStreamOrFail(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void ensureOpen() {
|
||||
if (closed) {
|
||||
assert false : "using GoogleCloudStorageRetryingInputStream after close";
|
||||
throw new IllegalStateException("using GoogleCloudStorageRetryingInputStream after close");
|
||||
}
|
||||
}
|
||||
|
||||
private void reopenStreamOrFail(StorageException e) throws IOException {
|
||||
if (attempt >= maxRetries) {
|
||||
throw addSuppressedExceptions(e);
|
||||
}
|
||||
logger.debug(new ParameterizedMessage("failed reading [{}] at offset [{}], attempt [{}] of [{}], retrying",
|
||||
blobId, currentOffset, attempt, MAX_SUPPRESSED_EXCEPTIONS), e);
|
||||
attempt += 1;
|
||||
if (failures.size() < MAX_SUPPRESSED_EXCEPTIONS) {
|
||||
failures.add(e);
|
||||
}
|
||||
IOUtils.closeWhileHandlingException(currentStream);
|
||||
currentStream = openStream();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
currentStream.close();
|
||||
closed = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long skip(long n) {
|
||||
throw new UnsupportedOperationException("GoogleCloudStorageRetryingInputStream does not support seeking");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reset() {
|
||||
throw new UnsupportedOperationException("GoogleCloudStorageRetryingInputStream does not support seeking");
|
||||
}
|
||||
|
||||
private <T extends Exception> T addSuppressedExceptions(T e) {
|
||||
for (StorageException failure : failures) {
|
||||
e.addSuppressed(failure);
|
||||
}
|
||||
return e;
|
||||
}
|
||||
}
|
|
@ -200,6 +200,35 @@ public class GoogleCloudStorageBlobContainerRetriesTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testReadLargeBlobWithRetries() throws Exception {
|
||||
final int maxRetries = randomIntBetween(2, 10);
|
||||
final CountDown countDown = new CountDown(maxRetries);
|
||||
|
||||
// SDK reads in 2 MB chunks so we use twice that to simulate 2 chunks
|
||||
final byte[] bytes = randomBytes(1 << 22);
|
||||
httpServer.createContext("/download/storage/v1/b/bucket/o/large_blob_retries", exchange -> {
|
||||
Streams.readFully(exchange.getRequestBody());
|
||||
exchange.getResponseHeaders().add("Content-Type", "application/octet-stream");
|
||||
final String[] range = exchange.getRequestHeaders().get("Range").get(0).substring("bytes=".length()).split("-");
|
||||
final int offset = Integer.parseInt(range[0]);
|
||||
final int end = Integer.parseInt(range[1]);
|
||||
final byte[] chunk = Arrays.copyOfRange(bytes, offset, Math.min(end + 1, bytes.length));
|
||||
exchange.sendResponseHeaders(RestStatus.OK.getStatus(), chunk.length);
|
||||
if (randomBoolean() && countDown.countDown() == false) {
|
||||
exchange.getResponseBody().write(chunk, 0, chunk.length - 1);
|
||||
exchange.close();
|
||||
return;
|
||||
}
|
||||
exchange.getResponseBody().write(chunk);
|
||||
exchange.close();
|
||||
});
|
||||
|
||||
final BlobContainer blobContainer = createBlobContainer(maxRetries, null);
|
||||
try (InputStream inputStream = blobContainer.readBlob("large_blob_retries")) {
|
||||
assertArrayEquals(bytes, BytesReference.toBytes(Streams.readFully(inputStream)));
|
||||
}
|
||||
}
|
||||
|
||||
public void testReadBlobWithReadTimeouts() {
|
||||
final int maxRetries = randomIntBetween(1, 3);
|
||||
final BlobContainer blobContainer = createBlobContainer(maxRetries, TimeValue.timeValueMillis(between(100, 200)));
|
||||
|
|
Loading…
Reference in New Issue