A resumable upload session can fail with an HTTP 410 Gone error and should be retried in that case. I added up to two retries, resetting the given `InputStream` via mark/reset as the retry mechanism; the AWS S3 SDK already uses the same approach, and the S3 repository implementation relies on it. Related GCS documentation: https://cloud.google.com/storage/docs/json_api/v1/status-codes#410_Gone
parent 2c70d403fc
commit 371c355bca
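At its core the change is the mark/reset retry pattern sketched below. This is a simplified illustration of the mechanism, not the repository code itself (that lives in writeBlobResumable in the first diff); UploadAttempt and StatusException are hypothetical stand-ins for the actual resumable-upload call and for the GCS StorageException.

import java.io.IOException;
import java.io.InputStream;

import static java.net.HttpURLConnection.HTTP_GONE;

class RetrySketch {

    // One resumable-upload attempt that may fail with an HTTP status code.
    interface UploadAttempt {
        void run(InputStream in) throws StatusException;
    }

    // Hypothetical exception carrying an HTTP status, standing in for StorageException.
    static class StatusException extends IOException {
        final int code;
        StatusException(int code, String message) {
            super(message);
            this.code = code;
        }
    }

    static void uploadWithRetry(InputStream in, UploadAttempt attempt) throws IOException {
        assert in.markSupported();       // the caller must hand over a resettable stream
        in.mark(Integer.MAX_VALUE);      // remember the start so all bytes can be replayed
        StatusException last = null;
        for (int retry = 0; retry < 3; ++retry) {   // one attempt plus two retries
            try {
                attempt.run(in);
                return;                  // success, nothing to retry
            } catch (StatusException e) {
                if (e.code != HTTP_GONE) {
                    throw e;             // only 410 Gone marks a broken upload session
                }
                last = e;
                in.reset();              // rewind so the next attempt re-reads the blob
            }
        }
        throw last;                      // all attempts hit 410
    }
}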
GoogleCloudStorageBlobStore.java
@@ -31,6 +31,10 @@ import com.google.cloud.storage.Storage;
 import com.google.cloud.storage.Storage.BlobListOption;
 import com.google.cloud.storage.StorageBatch;
 import com.google.cloud.storage.StorageException;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.common.SuppressForbidden;
 import org.elasticsearch.common.blobstore.BlobContainer;
 import org.elasticsearch.common.blobstore.BlobMetaData;
@@ -60,16 +64,19 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
+import static java.net.HttpURLConnection.HTTP_GONE;
 import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
 import static java.net.HttpURLConnection.HTTP_PRECON_FAILED;
 
 class GoogleCloudStorageBlobStore implements BlobStore {
 
+    private static final Logger logger = LogManager.getLogger(GoogleCloudStorageBlobStore.class);
+
     // The recommended maximum size of a blob that should be uploaded in a single
     // request. Larger files should be uploaded over multiple requests (this is
     // called "resumable upload")
     // https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload
-    private static final int LARGE_BLOB_THRESHOLD_BYTE_SIZE = 5 * 1024 * 1024;
+    public static final int LARGE_BLOB_THRESHOLD_BYTE_SIZE = 5 * 1024 * 1024;
 
     private final String bucketName;
     private final String clientName;
@@ -224,35 +231,53 @@ class GoogleCloudStorageBlobStore implements BlobStore {
      * @param failIfAlreadyExists whether to throw a FileAlreadyExistsException if the given blob already exists
      */
     private void writeBlobResumable(BlobInfo blobInfo, InputStream inputStream, boolean failIfAlreadyExists) throws IOException {
-        try {
-            final Storage.BlobWriteOption[] writeOptions = failIfAlreadyExists ?
-                new Storage.BlobWriteOption[] { Storage.BlobWriteOption.doesNotExist() } :
-                new Storage.BlobWriteOption[0];
-            final WriteChannel writeChannel = SocketAccess
-                .doPrivilegedIOException(() -> client().writer(blobInfo, writeOptions));
-            Streams.copy(inputStream, Channels.newOutputStream(new WritableByteChannel() {
-                @Override
-                public boolean isOpen() {
-                    return writeChannel.isOpen();
-                }
-
-                @Override
-                public void close() throws IOException {
-                    SocketAccess.doPrivilegedVoidIOException(writeChannel::close);
-                }
-
-                @SuppressForbidden(reason = "Channel is based of a socket not a file")
-                @Override
-                public int write(ByteBuffer src) throws IOException {
-                    return SocketAccess.doPrivilegedIOException(() -> writeChannel.write(src));
-                }
-            }));
-        } catch (final StorageException se) {
-            if (failIfAlreadyExists && se.getCode() == HTTP_PRECON_FAILED) {
-                throw new FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, se.getMessage());
-            }
-            throw se;
-        }
+        // We retry 410 GONE errors to cover the unlikely but possible scenario where a resumable upload session becomes broken and
+        // needs to be restarted from scratch. Given how unlikely a 410 error should be according to SLAs we retry only twice.
+        assert inputStream.markSupported();
+        inputStream.mark(Integer.MAX_VALUE);
+        StorageException storageException = null;
+        final Storage.BlobWriteOption[] writeOptions = failIfAlreadyExists ?
+            new Storage.BlobWriteOption[]{Storage.BlobWriteOption.doesNotExist()} : new Storage.BlobWriteOption[0];
+        for (int retry = 0; retry < 3; ++retry) {
+            try {
+                final WriteChannel writeChannel = SocketAccess
+                    .doPrivilegedIOException(() -> client().writer(blobInfo, writeOptions));
+                Streams.copy(inputStream, Channels.newOutputStream(new WritableByteChannel() {
+                    @Override
+                    public boolean isOpen() {
+                        return writeChannel.isOpen();
+                    }
+
+                    @Override
+                    public void close() throws IOException {
+                        SocketAccess.doPrivilegedVoidIOException(writeChannel::close);
+                    }
+
+                    @SuppressForbidden(reason = "Channel is based of a socket not a file")
+                    @Override
+                    public int write(ByteBuffer src) throws IOException {
+                        return SocketAccess.doPrivilegedIOException(() -> writeChannel.write(src));
+                    }
+                }));
+                return;
+            } catch (final StorageException se) {
+                final int errorCode = se.getCode();
+                if (errorCode == HTTP_GONE) {
+                    logger.warn(() -> new ParameterizedMessage("Retrying broken resumable upload session for blob {}", blobInfo), se);
+                    storageException = ExceptionsHelper.useOrSuppress(storageException, se);
+                    inputStream.reset();
+                    continue;
+                } else if (failIfAlreadyExists && errorCode == HTTP_PRECON_FAILED) {
+                    throw new FileAlreadyExistsException(blobInfo.getBlobId().getName(), null, se.getMessage());
+                }
+                if (storageException != null) {
+                    se.addSuppressed(storageException);
+                }
+                throw se;
+            }
+        }
+        assert storageException != null;
+        throw storageException;
     }
 
     /**
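One detail of the loop above is easy to miss: earlier 410s are not thrown away. Each retriable failure is folded into storageException via ExceptionsHelper.useOrSuppress, and when a non-retriable error ends the loop, the accumulated 410s are attached to it with addSuppressed before the final throw se, so the exception that escapes carries the full failure history. The sketch below shows the accumulation semantics this code appears to rely on (a hypothetical re-implementation for illustration, not the actual ExceptionsHelper source):

// Keep the first failure as the primary exception and attach later ones
// as suppressed, so no failure information is lost across retries.
static <T extends Throwable> T useOrSuppress(T first, T second) {
    if (first == null) {
        return second;
    }
    first.addSuppressed(second);
    return first;
}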
GoogleCloudStorageBlobStoreContainerTests.java
@@ -19,12 +19,21 @@
 
 package org.elasticsearch.repositories.gcs;
 
+import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.BytesRefBuilder;
+import org.elasticsearch.common.blobstore.BlobContainer;
+import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.BlobStore;
+import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.repositories.ESBlobStoreContainerTestCase;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
 import java.util.Locale;
 import java.util.concurrent.ConcurrentHashMap;
 
+import static org.elasticsearch.repositories.ESBlobStoreTestCase.randomBytes;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -37,10 +46,35 @@ public class GoogleCloudStorageBlobStoreContainerTests extends ESBlobStoreContainerTestCase {
         final String clientName = randomAlphaOfLength(randomIntBetween(1, 10)).toLowerCase(Locale.ROOT);
         final GoogleCloudStorageService storageService = mock(GoogleCloudStorageService.class);
         try {
-            when(storageService.client(any(String.class))).thenReturn(new MockStorage(bucketName, new ConcurrentHashMap<>()));
+            when(storageService.client(any(String.class))).thenReturn(new MockStorage(bucketName, new ConcurrentHashMap<>(), random()));
         } catch (final Exception e) {
             throw new RuntimeException(e);
         }
         return new GoogleCloudStorageBlobStore(bucketName, clientName, storageService);
     }
+
+    public void testWriteReadLarge() throws IOException {
+        try(BlobStore store = newBlobStore()) {
+            final BlobContainer container = store.blobContainer(new BlobPath());
+            byte[] data = randomBytes(GoogleCloudStorageBlobStore.LARGE_BLOB_THRESHOLD_BYTE_SIZE + 1);
+            writeBlob(container, "foobar", new BytesArray(data), randomBoolean());
+            if (randomBoolean()) {
+                // override file, to check if we get latest contents
+                random().nextBytes(data);
+                writeBlob(container, "foobar", new BytesArray(data), false);
+            }
+            try (InputStream stream = container.readBlob("foobar")) {
+                BytesRefBuilder target = new BytesRefBuilder();
+                while (target.length() < data.length) {
+                    byte[] buffer = new byte[scaledRandomIntBetween(1, data.length - target.length())];
+                    int offset = scaledRandomIntBetween(0, buffer.length - 1);
+                    int read = stream.read(buffer, offset, buffer.length - offset);
+                    target.append(new BytesRef(buffer, offset, read));
+                }
+                assertEquals(data.length, target.length());
+                assertArrayEquals(data, Arrays.copyOfRange(target.bytes(), 0, target.length()));
+            }
+        }
+    }
+
 }
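testWriteReadLarge deliberately writes LARGE_BLOB_THRESHOLD_BYTE_SIZE + 1 bytes so the blob takes the resumable-upload code path, which is why the threshold constant became public in the first diff, and the mock is now seeded with the test framework's random(), so any simulated 410 is reproducible from the test seed. The retry is therefore exercised probabilistically; a deterministic variant is easy to sketch (a hypothetical test helper, not part of this commit):

import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper: a Random whose nextBoolean() is always true makes
// MockStorage throw the simulated 410 on the first write of every blob,
// so the retry path always runs.
static MockStorage storageThatAlwaysFailsOnce(String bucket) {
    final Random alwaysFail = new Random() {
        @Override
        public boolean nextBoolean() {
            return true;
        }
    };
    return new MockStorage(bucket, new ConcurrentHashMap<>(), alwaysFail);
}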
GoogleCloudStorageBlobStoreRepositoryTests.java
@@ -125,7 +125,6 @@ public class GoogleCloudStorageBlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTestCase {
         final MockSecureSettings secureSettings = new MockSecureSettings();
         secureSettings.setFile(CREDENTIALS_FILE_SETTING.getConcreteSettingForNamespace("test").getKey(), serviceAccount);
         settings.setSecureSettings(secureSettings);
-
         return settings.build();
     }
 
GoogleCloudStorageBlobStoreTests.java
@@ -37,7 +37,7 @@ public class GoogleCloudStorageBlobStoreTests extends ESBlobStoreTestCase {
         final String clientName = randomAlphaOfLength(randomIntBetween(1, 10)).toLowerCase(Locale.ROOT);
         final GoogleCloudStorageService storageService = mock(GoogleCloudStorageService.class);
         try {
-            when(storageService.client(any(String.class))).thenReturn(new MockStorage(bucketName, new ConcurrentHashMap<>()));
+            when(storageService.client(any(String.class))).thenReturn(new MockStorage(bucketName, new ConcurrentHashMap<>(), random()));
         } catch (final Exception e) {
             throw new RuntimeException(e);
         }
MockStorage.java
@@ -40,6 +40,7 @@ import com.google.cloud.storage.StorageException;
 import com.google.cloud.storage.StorageOptions;
 import com.google.cloud.storage.StorageRpcOptionUtils;
 import com.google.cloud.storage.StorageTestUtils;
+import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.core.internal.io.IOUtils;
 import org.mockito.stubbing.Answer;
 
@@ -47,6 +48,7 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.net.HttpURLConnection;
 import java.net.URL;
 import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
@@ -55,6 +57,8 @@ import java.nio.channels.WritableByteChannel;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
+import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
@@ -71,10 +75,12 @@ import static org.mockito.Mockito.mock;
  */
 class MockStorage implements Storage {
 
+    private final Random random;
     private final String bucketName;
     private final ConcurrentMap<String, byte[]> blobs;
 
-    MockStorage(final String bucket, final ConcurrentMap<String, byte[]> blobs) {
+    MockStorage(final String bucket, final ConcurrentMap<String, byte[]> blobs, final Random random) {
+        this.random = random;
         this.bucketName = Objects.requireNonNull(bucket);
         this.blobs = Objects.requireNonNull(blobs);
     }
@@ -236,12 +242,16 @@ class MockStorage implements Storage {
         return null;
     }
 
+    private final Set<BlobInfo> simulated410s = ConcurrentCollections.newConcurrentSet();
+
     @Override
     public WriteChannel writer(BlobInfo blobInfo, BlobWriteOption... options) {
         if (bucketName.equals(blobInfo.getBucket())) {
             final ByteArrayOutputStream output = new ByteArrayOutputStream();
             return new WriteChannel() {
 
+                private volatile boolean failed;
+
                 final WritableByteChannel writableByteChannel = Channels.newChannel(output);
 
                 @Override
@@ -256,6 +266,11 @@ class MockStorage implements Storage {
 
                 @Override
                 public int write(ByteBuffer src) throws IOException {
+                    // Only fail a blob once on a 410 error since the error is so unlikely in practice
+                    if (simulated410s.add(blobInfo) && random.nextBoolean()) {
+                        failed = true;
+                        throw new StorageException(HttpURLConnection.HTTP_GONE, "Simulated lost resumeable upload session");
+                    }
                     return writableByteChannel.write(src);
                 }
 
@@ -267,13 +282,15 @@ class MockStorage implements Storage {
                 @Override
                 public void close() {
                     IOUtils.closeWhileHandlingException(writableByteChannel);
-                    if (Stream.of(options).anyMatch(option -> option.equals(BlobWriteOption.doesNotExist()))) {
-                        byte[] existingBytes = blobs.putIfAbsent(blobInfo.getName(), output.toByteArray());
-                        if (existingBytes != null) {
-                            throw new StorageException(412, "Blob already exists");
-                        }
-                    } else {
-                        blobs.put(blobInfo.getName(), output.toByteArray());
-                    }
+                    if (failed == false) {
+                        if (Stream.of(options).anyMatch(option -> option.equals(BlobWriteOption.doesNotExist()))) {
+                            byte[] existingBytes = blobs.putIfAbsent(blobInfo.getName(), output.toByteArray());
+                            if (existingBytes != null) {
+                                throw new StorageException(412, "Blob already exists");
+                            }
+                        } else {
+                            blobs.put(blobInfo.getName(), output.toByteArray());
+                        }
+                    }
                 }
             };
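The simulation in MockStorage is deliberately conservative: simulated410s.add(blobInfo) returns true only the first time a blob is seen, so each blob can fail at most once, and random.nextBoolean() halves even that, mirroring how rare real 410s are. The failed flag then keeps close() from persisting the bytes of a failed attempt, so only a successful retry makes the blob visible. The Set#add guard is the load-bearing idiom; a small runnable illustration (using the plain-JDK equivalent of the ConcurrentCollections helper from the diff):

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class SetAddIdiom {
    public static void main(String[] args) {
        // Plain-JDK stand-in for ConcurrentCollections.newConcurrentSet().
        Set<String> seen = ConcurrentHashMap.newKeySet();
        System.out.println(seen.add("blob-a")); // true: first sighting, may simulate the 410
        System.out.println(seen.add("blob-a")); // false: subsequent writes proceed normally
    }
}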