diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureClientFactory.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureClientFactory.java index 3625eaa813a..d72fcd8395f 100644 --- a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureClientFactory.java +++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureClientFactory.java @@ -22,8 +22,11 @@ package org.apache.druid.storage.azure; import com.azure.core.http.policy.ExponentialBackoffOptions; import com.azure.core.http.policy.RetryOptions; import com.azure.identity.DefaultAzureCredentialBuilder; +import com.azure.storage.blob.BlobContainerClient; import com.azure.storage.blob.BlobServiceClient; import com.azure.storage.blob.BlobServiceClientBuilder; +import com.azure.storage.blob.batch.BlobBatchClient; +import com.azure.storage.blob.batch.BlobBatchClientBuilder; import com.azure.storage.common.StorageSharedKeyCredential; import java.time.Duration; @@ -64,6 +67,13 @@ public class AzureClientFactory return cachedBlobServiceClients.get(retryCount); } + + // Mainly here to make testing easier. + public BlobBatchClient getBlobBatchClient(BlobContainerClient blobContainerClient) + { + return new BlobBatchClientBuilder(blobContainerClient).buildClient(); + } + private BlobServiceClientBuilder getAuthenticatedBlobServiceClientBuilder() { BlobServiceClientBuilder clientBuilder = new BlobServiceClientBuilder() diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorage.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorage.java index aa7bef718cf..b929d255c1c 100644 --- a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorage.java +++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorage.java @@ -22,8 +22,6 @@ package org.apache.druid.storage.azure; import com.azure.core.http.rest.PagedIterable; import com.azure.storage.blob.BlobContainerClient; import com.azure.storage.blob.BlobServiceClient; -import com.azure.storage.blob.batch.BlobBatchClient; -import com.azure.storage.blob.batch.BlobBatchClientBuilder; import com.azure.storage.blob.batch.BlobBatchStorageException; import com.azure.storage.blob.models.BlobItem; import com.azure.storage.blob.models.BlobRange; @@ -36,7 +34,7 @@ import com.azure.storage.blob.options.BlockBlobOutputStreamOptions; import com.azure.storage.blob.specialized.BlockBlobClient; import com.azure.storage.common.Utility; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Lists; +import com.google.common.collect.Streams; import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.logger.Logger; @@ -49,6 +47,7 @@ import java.io.OutputStream; import java.time.Duration; import java.util.ArrayList; import java.util.List; +import java.util.stream.Collectors; /** * Abstracts the Azure storage layer. Makes direct calls to Azure file system. @@ -173,9 +172,17 @@ public class AzureStorage public void batchDeleteFiles(String containerName, Iterable paths, Integer maxAttempts) throws BlobBatchStorageException { + BlobContainerClient blobContainerClient = getOrCreateBlobContainerClient(containerName, maxAttempts); + List blobUris = Streams.stream(paths).map(path -> blobContainerClient.getBlobContainerUrl() + "/" + path).collect(Collectors.toList()); - BlobBatchClient blobBatchClient = new BlobBatchClientBuilder(getOrCreateBlobContainerClient(containerName, maxAttempts)).buildClient(); - blobBatchClient.deleteBlobs(Lists.newArrayList(paths), DeleteSnapshotsOptionType.ONLY); + // We have to call forEach on the response because this is the only way azure batch will throw an exception on a operation failure. + azureClientFactory.getBlobBatchClient(blobContainerClient).deleteBlobs( + blobUris, + DeleteSnapshotsOptionType.INCLUDE + ).forEach(response -> + log.debug("Deleting blob with URL %s completed with status code %d%n", + response.getRequest().getUrl(), response.getStatusCode()) + ); } public List listDir(final String containerName, final String virtualDirPath, final Integer maxAttempts) diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureStorageConnector.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureStorageConnector.java index cd93c80ba14..be7041d7a99 100644 --- a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureStorageConnector.java +++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureStorageConnector.java @@ -19,6 +19,7 @@ package org.apache.druid.storage.azure.output; +import com.azure.storage.blob.batch.BlobBatchStorageException; import com.azure.storage.blob.models.BlobStorageException; import com.google.common.base.Joiner; import com.google.common.collect.Iterables; @@ -158,7 +159,7 @@ public class AzureStorageConnector extends ChunkingStorageConnector> supplier = new SettableSupplier<>(); + supplier.set(new TestPagedResponse<>(ImmutableList.of())); + PagedIterable pagedIterable = new PagedIterable<>(supplier); + + ArgumentCaptor> captor = ArgumentCaptor.forClass(List.class); + + Mockito.doReturn(containerUrl).when(blobContainerClient).getBlobContainerUrl(); + Mockito.doReturn(blobContainerClient).when(blobServiceClient).createBlobContainerIfNotExists(CONTAINER); + Mockito.doReturn(blobServiceClient).when(azureClientFactory).getBlobServiceClient(null); + Mockito.doReturn(blobBatchClient).when(azureClientFactory).getBlobBatchClient(blobContainerClient); + Mockito.doReturn(pagedIterable).when(blobBatchClient).deleteBlobs( + captor.capture(), ArgumentMatchers.eq(DeleteSnapshotsOptionType.INCLUDE) + ); + + azureStorage.batchDeleteFiles(CONTAINER, ImmutableList.of(BLOB_NAME), null); + Assert.assertEquals(captor.getValue().get(0), containerUrl + "/" + BLOB_NAME); + } } diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureStorageConnectorTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureStorageConnectorTest.java index 1bdbe0a2ece..5201a3bdd9d 100644 --- a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureStorageConnectorTest.java +++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureStorageConnectorTest.java @@ -19,6 +19,7 @@ package org.apache.druid.storage.azure.output; +import com.azure.core.http.HttpResponse; import com.azure.storage.blob.models.BlobStorageException; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; @@ -198,4 +199,30 @@ public class AzureStorageConnectorTest Assert.assertEquals(ImmutableList.of("x/y/z/" + TEST_FILE, "p/q/r/" + TEST_FILE), ret); EasyMock.reset(azureStorage); } + + @Test + public void test_deleteFile_blobStorageException() + { + EasyMock.reset(azureStorage); + HttpResponse mockHttpResponse = EasyMock.createMock(HttpResponse.class); + azureStorage.batchDeleteFiles(EasyMock.anyString(), EasyMock.anyObject(), EasyMock.anyInt()); + EasyMock.expectLastCall().andThrow(new BlobStorageException("error", mockHttpResponse, null)); + EasyMock.replay(azureStorage); + Assert.assertThrows(IOException.class, () -> storageConnector.deleteFile("file")); + EasyMock.verify(azureStorage); + EasyMock.reset(azureStorage); + } + + @Test + public void test_deleteFiles_blobStorageException() + { + EasyMock.reset(azureStorage); + HttpResponse mockHttpResponse = EasyMock.createMock(HttpResponse.class); + azureStorage.batchDeleteFiles(EasyMock.anyString(), EasyMock.anyObject(), EasyMock.anyInt()); + EasyMock.expectLastCall().andThrow(new BlobStorageException("error", mockHttpResponse, null)); + EasyMock.replay(azureStorage); + Assert.assertThrows(IOException.class, () -> storageConnector.deleteFiles(ImmutableList.of())); + EasyMock.verify(azureStorage); + EasyMock.reset(azureStorage); + } }