* Implement Bulk Deletes for GCS Repository (#41368)
* Just like #40322 for AWS.
* We already had a bulk delete API but weren't using it from the blob container implementation; now we are using it.
* Made the bulk delete API also compliant with our interface, which only suppresses errors about non-existent blobs, by stat-ing failed deletes (I didn't use any bulk stat action here, since having to stat here should be the exception anyway and it would make error handling a lot more complex).
* Fixed the bulk delete API to limit its batch size to 100, in line with GCS recommendations.

Backport of #41368.
This commit is contained in:
parent
53702efddd
commit
7cc4b9a8b3
|
@ -26,7 +26,9 @@ import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
class GoogleCloudStorageBlobContainer extends AbstractBlobContainer {
|
class GoogleCloudStorageBlobContainer extends AbstractBlobContainer {
|
||||||
|
|
||||||
|
@ -78,7 +80,12 @@ class GoogleCloudStorageBlobContainer extends AbstractBlobContainer {
|
||||||
blobStore.deleteBlob(buildKey(blobName));
|
blobStore.deleteBlob(buildKey(blobName));
|
||||||
}
|
}
|
||||||
|
|
||||||
protected String buildKey(String blobName) {
|
@Override
|
||||||
|
public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) throws IOException {
|
||||||
|
blobStore.deleteBlobsIgnoringIfNotExists(blobNames.stream().map(this::buildKey).collect(Collectors.toList()));
|
||||||
|
}
|
||||||
|
|
||||||
|
private String buildKey(String blobName) {
|
||||||
assert blobName != null;
|
assert blobName != null;
|
||||||
return path + blobName;
|
return path + blobName;
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
|
|
||||||
package org.elasticsearch.repositories.gcs;
|
package org.elasticsearch.repositories.gcs;
|
||||||
|
|
||||||
|
import com.google.cloud.BatchResult;
|
||||||
import com.google.cloud.ReadChannel;
|
import com.google.cloud.ReadChannel;
|
||||||
import com.google.cloud.WriteChannel;
|
import com.google.cloud.WriteChannel;
|
||||||
import com.google.cloud.storage.Blob;
|
import com.google.cloud.storage.Blob;
|
||||||
|
@ -27,10 +28,9 @@ import com.google.cloud.storage.BlobInfo;
|
||||||
import com.google.cloud.storage.Bucket;
|
import com.google.cloud.storage.Bucket;
|
||||||
import com.google.cloud.storage.Storage;
|
import com.google.cloud.storage.Storage;
|
||||||
import com.google.cloud.storage.Storage.BlobListOption;
|
import com.google.cloud.storage.Storage.BlobListOption;
|
||||||
|
import com.google.cloud.storage.StorageBatch;
|
||||||
import com.google.cloud.storage.StorageException;
|
import com.google.cloud.storage.StorageException;
|
||||||
|
|
||||||
import org.apache.logging.log4j.LogManager;
|
|
||||||
import org.apache.logging.log4j.Logger;
|
|
||||||
import org.elasticsearch.common.SuppressForbidden;
|
import org.elasticsearch.common.SuppressForbidden;
|
||||||
import org.elasticsearch.common.blobstore.BlobContainer;
|
import org.elasticsearch.common.blobstore.BlobContainer;
|
||||||
import org.elasticsearch.common.blobstore.BlobMetaData;
|
import org.elasticsearch.common.blobstore.BlobMetaData;
|
||||||
|
@ -50,17 +50,18 @@ import java.nio.channels.ReadableByteChannel;
|
||||||
import java.nio.channels.WritableByteChannel;
|
import java.nio.channels.WritableByteChannel;
|
||||||
import java.nio.file.FileAlreadyExistsException;
|
import java.nio.file.FileAlreadyExistsException;
|
||||||
import java.nio.file.NoSuchFileException;
|
import java.nio.file.NoSuchFileException;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
|
import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
|
||||||
import static java.net.HttpURLConnection.HTTP_PRECON_FAILED;
|
import static java.net.HttpURLConnection.HTTP_PRECON_FAILED;
|
||||||
|
|
||||||
class GoogleCloudStorageBlobStore implements BlobStore {
|
class GoogleCloudStorageBlobStore implements BlobStore {
|
||||||
|
|
||||||
private static final Logger logger = LogManager.getLogger(GoogleCloudStorageBlobStore.class);
|
|
||||||
|
|
||||||
// The recommended maximum size of a blob that should be uploaded in a single
|
// The recommended maximum size of a blob that should be uploaded in a single
|
||||||
// request. Larger files should be uploaded over multiple requests (this is
|
// request. Larger files should be uploaded over multiple requests (this is
|
||||||
|
@ -105,7 +106,7 @@ class GoogleCloudStorageBlobStore implements BlobStore {
|
||||||
* @param bucketName name of the bucket
|
* @param bucketName name of the bucket
|
||||||
* @return true iff the bucket exists
|
* @return true iff the bucket exists
|
||||||
*/
|
*/
|
||||||
boolean doesBucketExist(String bucketName) {
|
private boolean doesBucketExist(String bucketName) {
|
||||||
try {
|
try {
|
||||||
final Bucket bucket = SocketAccess.doPrivilegedIOException(() -> client().get(bucketName));
|
final Bucket bucket = SocketAccess.doPrivilegedIOException(() -> client().get(bucketName));
|
||||||
return bucket != null;
|
return bucket != null;
|
||||||
|
@ -295,8 +296,8 @@ class GoogleCloudStorageBlobStore implements BlobStore {
|
||||||
*
|
*
|
||||||
* @param prefix prefix of the blobs to delete
|
* @param prefix prefix of the blobs to delete
|
||||||
*/
|
*/
|
||||||
void deleteBlobsByPrefix(String prefix) throws IOException {
|
private void deleteBlobsByPrefix(String prefix) throws IOException {
|
||||||
deleteBlobs(listBlobsByPrefix("", prefix).keySet());
|
deleteBlobsIgnoringIfNotExists(listBlobsByPrefix("", prefix).keySet());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -304,7 +305,7 @@ class GoogleCloudStorageBlobStore implements BlobStore {
|
||||||
*
|
*
|
||||||
* @param blobNames names of the blobs to delete
|
* @param blobNames names of the blobs to delete
|
||||||
*/
|
*/
|
||||||
void deleteBlobs(Collection<String> blobNames) throws IOException {
|
void deleteBlobsIgnoringIfNotExists(Collection<String> blobNames) throws IOException {
|
||||||
if (blobNames.isEmpty()) {
|
if (blobNames.isEmpty()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -314,17 +315,33 @@ class GoogleCloudStorageBlobStore implements BlobStore {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
final List<BlobId> blobIdsToDelete = blobNames.stream().map(blob -> BlobId.of(bucketName, blob)).collect(Collectors.toList());
|
final List<BlobId> blobIdsToDelete = blobNames.stream().map(blob -> BlobId.of(bucketName, blob)).collect(Collectors.toList());
|
||||||
final List<Boolean> deletedStatuses = SocketAccess.doPrivilegedIOException(() -> client().delete(blobIdsToDelete));
|
final List<BlobId> failedBlobs = Collections.synchronizedList(new ArrayList<>());
|
||||||
assert blobIdsToDelete.size() == deletedStatuses.size();
|
final StorageException e = SocketAccess.doPrivilegedIOException(() -> {
|
||||||
boolean failed = false;
|
final AtomicReference<StorageException> ioe = new AtomicReference<>();
|
||||||
for (int i = 0; i < blobIdsToDelete.size(); i++) {
|
final StorageBatch batch = client().batch();
|
||||||
if (deletedStatuses.get(i) == false) {
|
for (BlobId blob : blobIdsToDelete) {
|
||||||
logger.error("Failed to delete blob [{}] in bucket [{}]", blobIdsToDelete.get(i).getName(), bucketName);
|
batch.delete(blob).notify(
|
||||||
failed = true;
|
new BatchResult.Callback<Boolean, StorageException>() {
|
||||||
|
@Override
|
||||||
|
public void success(Boolean result) {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void error(StorageException exception) {
|
||||||
|
if (exception.getCode() != HTTP_NOT_FOUND) {
|
||||||
|
failedBlobs.add(blob);
|
||||||
|
if (ioe.compareAndSet(null, exception) == false) {
|
||||||
|
ioe.get().addSuppressed(exception);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
batch.submit();
|
||||||
if (failed) {
|
return ioe.get();
|
||||||
throw new IOException("Failed to delete all [" + blobIdsToDelete.size() + "] blobs");
|
});
|
||||||
|
if (e != null) {
|
||||||
|
throw new IOException("Exception when deleting blobs [" + failedBlobs + "]", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -20,6 +20,7 @@
|
||||||
package org.elasticsearch.repositories.gcs;
|
package org.elasticsearch.repositories.gcs;
|
||||||
|
|
||||||
import com.google.api.gax.paging.Page;
|
import com.google.api.gax.paging.Page;
|
||||||
|
import com.google.cloud.BatchResult;
|
||||||
import com.google.cloud.Policy;
|
import com.google.cloud.Policy;
|
||||||
import com.google.cloud.ReadChannel;
|
import com.google.cloud.ReadChannel;
|
||||||
import com.google.cloud.RestorableState;
|
import com.google.cloud.RestorableState;
|
||||||
|
@ -34,11 +35,13 @@ import com.google.cloud.storage.CopyWriter;
|
||||||
import com.google.cloud.storage.ServiceAccount;
|
import com.google.cloud.storage.ServiceAccount;
|
||||||
import com.google.cloud.storage.Storage;
|
import com.google.cloud.storage.Storage;
|
||||||
import com.google.cloud.storage.StorageBatch;
|
import com.google.cloud.storage.StorageBatch;
|
||||||
|
import com.google.cloud.storage.StorageBatchResult;
|
||||||
import com.google.cloud.storage.StorageException;
|
import com.google.cloud.storage.StorageException;
|
||||||
import com.google.cloud.storage.StorageOptions;
|
import com.google.cloud.storage.StorageOptions;
|
||||||
import com.google.cloud.storage.StorageRpcOptionUtils;
|
import com.google.cloud.storage.StorageRpcOptionUtils;
|
||||||
import com.google.cloud.storage.StorageTestUtils;
|
import com.google.cloud.storage.StorageTestUtils;
|
||||||
import org.elasticsearch.core.internal.io.IOUtils;
|
import org.elasticsearch.core.internal.io.IOUtils;
|
||||||
|
import org.mockito.stubbing.Answer;
|
||||||
|
|
||||||
import java.io.ByteArrayInputStream;
|
import java.io.ByteArrayInputStream;
|
||||||
import java.io.ByteArrayOutputStream;
|
import java.io.ByteArrayOutputStream;
|
||||||
|
@ -57,6 +60,11 @@ import java.util.concurrent.TimeUnit;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
import static org.mockito.Matchers.any;
|
||||||
|
import static org.mockito.Matchers.anyVararg;
|
||||||
|
import static org.mockito.Mockito.doAnswer;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* {@link MockStorage} mocks a {@link Storage} client by storing all the blobs
|
* {@link MockStorage} mocks a {@link Storage} client by storing all the blobs
|
||||||
* in a given concurrent map.
|
* in a given concurrent map.
|
||||||
|
@ -356,8 +364,25 @@ class MockStorage implements Storage {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
public StorageBatch batch() {
|
public StorageBatch batch() {
|
||||||
return null;
|
final Answer<?> throwOnMissingMock = invocationOnMock -> {
|
||||||
|
throw new AssertionError("Did not expect call to method [" + invocationOnMock.getMethod().getName() + ']');
|
||||||
|
};
|
||||||
|
final StorageBatch batch = mock(StorageBatch.class, throwOnMissingMock);
|
||||||
|
StorageBatchResult<Boolean> result = mock(StorageBatchResult.class, throwOnMissingMock);
|
||||||
|
doAnswer(answer -> {
|
||||||
|
BatchResult.Callback<Boolean, Exception> callback = (BatchResult.Callback<Boolean, Exception>) answer.getArguments()[0];
|
||||||
|
callback.success(true);
|
||||||
|
return null;
|
||||||
|
}).when(result).notify(any(BatchResult.Callback.class));
|
||||||
|
doAnswer(invocation -> {
|
||||||
|
final BlobId blobId = (BlobId) invocation.getArguments()[0];
|
||||||
|
delete(blobId);
|
||||||
|
return result;
|
||||||
|
}).when(batch).delete(any(BlobId.class), anyVararg());
|
||||||
|
doAnswer(invocation -> null).when(batch).submit();
|
||||||
|
return batch;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
Loading…
Reference in New Issue