From add7148f3b3b002cadfd02a522a1cec7ce925887 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Fri, 20 Sep 2019 09:52:50 +0200 Subject: [PATCH] GCS deleteBlobsIgnoringIfNotExists should catch StorageException (#46832) GoogleCloudStorageBlobStore.deleteBlobsIgnoringIfNotExists() does not correctly catch StorageException thrown by batch.submit(). In the case a snapshot is deleted through BlobStoreRepository.deleteSnapshot() a storage exception is not caught (only IOException are) so the deletion is interrupted and indices cannot be cleaned up. The storage exception bubbles up to SnapshotService.deleteSnapshotFromRepository() but the listener that removes the deletion from the cluster state is not executed, leaving the deletion in the cluster state. This bug has been reported in #46772 where batch.submit() threw an exception in the test testIndicesDeletedFromRepository and following tests failed because a snapshot deletion was running. Relates #46772 --- .../gcs/GoogleCloudStorageBlobStore.java | 49 ++++++++------- ...leCloudStorageBlobStoreContainerTests.java | 61 +++++++++++++++++++ 2 files changed, 88 insertions(+), 22 deletions(-) diff --git a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java index 72baf4721bf..520b5b798a6 100644 --- a/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java +++ b/plugins/repository-gcs/src/main/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStore.java @@ -364,31 +364,36 @@ class GoogleCloudStorageBlobStore implements BlobStore { } final List blobIdsToDelete = blobNames.stream().map(blob -> BlobId.of(bucketName, blob)).collect(Collectors.toList()); final List failedBlobs = Collections.synchronizedList(new ArrayList<>()); - final StorageException e = SocketAccess.doPrivilegedIOException(() -> { - final AtomicReference ioe = new AtomicReference<>(); - final StorageBatch batch = client().batch(); - for (BlobId blob : blobIdsToDelete) { - batch.delete(blob).notify( - new BatchResult.Callback() { - @Override - public void success(Boolean result) { - } + try { + SocketAccess.doPrivilegedVoidIOException(() -> { + final AtomicReference ioe = new AtomicReference<>(); + final StorageBatch batch = client().batch(); + for (BlobId blob : blobIdsToDelete) { + batch.delete(blob).notify( + new BatchResult.Callback() { + @Override + public void success(Boolean result) { + } - @Override - public void error(StorageException exception) { - if (exception.getCode() != HTTP_NOT_FOUND) { - failedBlobs.add(blob); - if (ioe.compareAndSet(null, exception) == false) { - ioe.get().addSuppressed(exception); + @Override + public void error(StorageException exception) { + if (exception.getCode() != HTTP_NOT_FOUND) { + failedBlobs.add(blob); + if (ioe.compareAndSet(null, exception) == false) { + ioe.get().addSuppressed(exception); + } } } - } - }); - } - batch.submit(); - return ioe.get(); - }); - if (e != null) { + }); + } + batch.submit(); + + final StorageException exception = ioe.get(); + if (exception != null) { + throw exception; + } + }); + } catch (final Exception e) { throw new IOException("Exception when deleting blobs [" + failedBlobs + "]", e); } assert failedBlobs.isEmpty(); diff --git a/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreContainerTests.java b/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreContainerTests.java index 5f5eaff85a7..cc3782cabac 100644 --- a/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreContainerTests.java +++ b/plugins/repository-gcs/src/test/java/org/elasticsearch/repositories/gcs/GoogleCloudStorageBlobStoreContainerTests.java @@ -19,6 +19,13 @@ package org.elasticsearch.repositories.gcs; +import com.google.cloud.BatchResult; +import com.google.cloud.storage.BlobId; +import com.google.cloud.storage.Bucket; +import com.google.cloud.storage.Storage; +import com.google.cloud.storage.StorageBatch; +import com.google.cloud.storage.StorageBatchResult; +import com.google.cloud.storage.StorageException; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.BytesRefBuilder; import org.elasticsearch.common.blobstore.BlobContainer; @@ -30,11 +37,18 @@ import org.elasticsearch.repositories.ESBlobStoreContainerTestCase; import java.io.IOException; import java.io.InputStream; import java.util.Arrays; +import java.util.List; import java.util.Locale; import java.util.concurrent.ConcurrentHashMap; import static org.elasticsearch.repositories.ESBlobStoreTestCase.randomBytes; +import static org.hamcrest.Matchers.instanceOf; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -77,4 +91,51 @@ public class GoogleCloudStorageBlobStoreContainerTests extends ESBlobStoreContai } } + @SuppressWarnings("unchecked") + public void testDeleteBlobsIgnoringIfNotExistsThrowsIOException() throws Exception { + final List blobs = Arrays.asList("blobA", "blobB"); + + final StorageBatch batch = mock(StorageBatch.class); + if (randomBoolean()) { + StorageBatchResult result = mock(StorageBatchResult.class); + when(batch.delete(any(BlobId.class))).thenReturn(result); + doThrow(new StorageException(new IOException("Batch submit throws a storage exception"))).when(batch).submit(); + } else { + StorageBatchResult resultA = mock(StorageBatchResult.class); + doReturn(resultA).when(batch).delete(eq(BlobId.of("bucket", "blobA"))); + doAnswer(invocation -> { + StorageException storageException = new StorageException(new IOException("Batched delete throws a storage exception")); + ((BatchResult.Callback) invocation.getArguments()[0]).error(storageException); + return null; + }).when(resultA).notify(any(StorageBatchResult.Callback.class)); + + StorageBatchResult resultB = mock(StorageBatchResult.class); + doReturn(resultB).when(batch).delete(eq(BlobId.of("bucket", "blobB"))); + doAnswer(invocation -> { + if (randomBoolean()) { + StorageException storageException = new StorageException(new IOException("Batched delete throws a storage exception")); + ((BatchResult.Callback) invocation.getArguments()[0]).error(storageException); + } else { + ((BatchResult.Callback) invocation.getArguments()[0]).success(randomBoolean()); + } + return null; + }).when(resultB).notify(any(StorageBatchResult.Callback.class)); + + doNothing().when(batch).submit(); + } + + final Storage storage = mock(Storage.class); + when(storage.get("bucket")).thenReturn(mock(Bucket.class)); + when(storage.batch()).thenReturn(batch); + + final GoogleCloudStorageService storageService = mock(GoogleCloudStorageService.class); + when(storageService.client(any(String.class))).thenReturn(storage); + + try (BlobStore store = new GoogleCloudStorageBlobStore("bucket", "test", storageService)) { + final BlobContainer container = store.blobContainer(new BlobPath()); + + IOException e = expectThrows(IOException.class, () -> container.deleteBlobsIgnoringIfNotExists(blobs)); + assertThat(e.getCause(), instanceOf(StorageException.class)); + } + } }