GCS deleteBlobsIgnoringIfNotExists should catch StorageException (#46832)
GoogleCloudStorageBlobStore.deleteBlobsIgnoringIfNotExists() does not correctly catch StorageException thrown by batch.submit(). In the case a snapshot is deleted through BlobStoreRepository.deleteSnapshot() a storage exception is not caught (only IOException are) so the deletion is interrupted and indices cannot be cleaned up. The storage exception bubbles up to SnapshotService.deleteSnapshotFromRepository() but the listener that removes the deletion from the cluster state is not executed, leaving the deletion in the cluster state. This bug has been reported in #46772 where batch.submit() threw an exception in the test testIndicesDeletedFromRepository and following tests failed because a snapshot deletion was running. Relates #46772
This commit is contained in:
parent
74d1588ec5
commit
add7148f3b
|
@ -364,31 +364,36 @@ class GoogleCloudStorageBlobStore implements BlobStore {
|
|||
}
|
||||
final List<BlobId> blobIdsToDelete = blobNames.stream().map(blob -> BlobId.of(bucketName, blob)).collect(Collectors.toList());
|
||||
final List<BlobId> failedBlobs = Collections.synchronizedList(new ArrayList<>());
|
||||
final StorageException e = SocketAccess.doPrivilegedIOException(() -> {
|
||||
final AtomicReference<StorageException> ioe = new AtomicReference<>();
|
||||
final StorageBatch batch = client().batch();
|
||||
for (BlobId blob : blobIdsToDelete) {
|
||||
batch.delete(blob).notify(
|
||||
new BatchResult.Callback<Boolean, StorageException>() {
|
||||
@Override
|
||||
public void success(Boolean result) {
|
||||
}
|
||||
try {
|
||||
SocketAccess.doPrivilegedVoidIOException(() -> {
|
||||
final AtomicReference<StorageException> ioe = new AtomicReference<>();
|
||||
final StorageBatch batch = client().batch();
|
||||
for (BlobId blob : blobIdsToDelete) {
|
||||
batch.delete(blob).notify(
|
||||
new BatchResult.Callback<Boolean, StorageException>() {
|
||||
@Override
|
||||
public void success(Boolean result) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void error(StorageException exception) {
|
||||
if (exception.getCode() != HTTP_NOT_FOUND) {
|
||||
failedBlobs.add(blob);
|
||||
if (ioe.compareAndSet(null, exception) == false) {
|
||||
ioe.get().addSuppressed(exception);
|
||||
@Override
|
||||
public void error(StorageException exception) {
|
||||
if (exception.getCode() != HTTP_NOT_FOUND) {
|
||||
failedBlobs.add(blob);
|
||||
if (ioe.compareAndSet(null, exception) == false) {
|
||||
ioe.get().addSuppressed(exception);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
batch.submit();
|
||||
return ioe.get();
|
||||
});
|
||||
if (e != null) {
|
||||
});
|
||||
}
|
||||
batch.submit();
|
||||
|
||||
final StorageException exception = ioe.get();
|
||||
if (exception != null) {
|
||||
throw exception;
|
||||
}
|
||||
});
|
||||
} catch (final Exception e) {
|
||||
throw new IOException("Exception when deleting blobs [" + failedBlobs + "]", e);
|
||||
}
|
||||
assert failedBlobs.isEmpty();
|
||||
|
|
|
@ -19,6 +19,13 @@
|
|||
|
||||
package org.elasticsearch.repositories.gcs;
|
||||
|
||||
import com.google.cloud.BatchResult;
|
||||
import com.google.cloud.storage.BlobId;
|
||||
import com.google.cloud.storage.Bucket;
|
||||
import com.google.cloud.storage.Storage;
|
||||
import com.google.cloud.storage.StorageBatch;
|
||||
import com.google.cloud.storage.StorageBatchResult;
|
||||
import com.google.cloud.storage.StorageException;
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.apache.lucene.util.BytesRefBuilder;
|
||||
import org.elasticsearch.common.blobstore.BlobContainer;
|
||||
|
@ -30,11 +37,18 @@ import org.elasticsearch.repositories.ESBlobStoreContainerTestCase;
|
|||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import static org.elasticsearch.repositories.ESBlobStoreTestCase.randomBytes;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.eq;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.doNothing;
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
import static org.mockito.Mockito.doThrow;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
|
@ -77,4 +91,51 @@ public class GoogleCloudStorageBlobStoreContainerTests extends ESBlobStoreContai
|
|||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testDeleteBlobsIgnoringIfNotExistsThrowsIOException() throws Exception {
|
||||
final List<String> blobs = Arrays.asList("blobA", "blobB");
|
||||
|
||||
final StorageBatch batch = mock(StorageBatch.class);
|
||||
if (randomBoolean()) {
|
||||
StorageBatchResult<Boolean> result = mock(StorageBatchResult.class);
|
||||
when(batch.delete(any(BlobId.class))).thenReturn(result);
|
||||
doThrow(new StorageException(new IOException("Batch submit throws a storage exception"))).when(batch).submit();
|
||||
} else {
|
||||
StorageBatchResult<Boolean> resultA = mock(StorageBatchResult.class);
|
||||
doReturn(resultA).when(batch).delete(eq(BlobId.of("bucket", "blobA")));
|
||||
doAnswer(invocation -> {
|
||||
StorageException storageException = new StorageException(new IOException("Batched delete throws a storage exception"));
|
||||
((BatchResult.Callback) invocation.getArguments()[0]).error(storageException);
|
||||
return null;
|
||||
}).when(resultA).notify(any(StorageBatchResult.Callback.class));
|
||||
|
||||
StorageBatchResult<Boolean> resultB = mock(StorageBatchResult.class);
|
||||
doReturn(resultB).when(batch).delete(eq(BlobId.of("bucket", "blobB")));
|
||||
doAnswer(invocation -> {
|
||||
if (randomBoolean()) {
|
||||
StorageException storageException = new StorageException(new IOException("Batched delete throws a storage exception"));
|
||||
((BatchResult.Callback) invocation.getArguments()[0]).error(storageException);
|
||||
} else {
|
||||
((BatchResult.Callback) invocation.getArguments()[0]).success(randomBoolean());
|
||||
}
|
||||
return null;
|
||||
}).when(resultB).notify(any(StorageBatchResult.Callback.class));
|
||||
|
||||
doNothing().when(batch).submit();
|
||||
}
|
||||
|
||||
final Storage storage = mock(Storage.class);
|
||||
when(storage.get("bucket")).thenReturn(mock(Bucket.class));
|
||||
when(storage.batch()).thenReturn(batch);
|
||||
|
||||
final GoogleCloudStorageService storageService = mock(GoogleCloudStorageService.class);
|
||||
when(storageService.client(any(String.class))).thenReturn(storage);
|
||||
|
||||
try (BlobStore store = new GoogleCloudStorageBlobStore("bucket", "test", storageService)) {
|
||||
final BlobContainer container = store.blobContainer(new BlobPath());
|
||||
|
||||
IOException e = expectThrows(IOException.class, () -> container.deleteBlobsIgnoringIfNotExists(blobs));
|
||||
assertThat(e.getCause(), instanceOf(StorageException.class));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue