From 5edfa9429f40bbddfd868bcd0d51f106702199d3 Mon Sep 17 00:00:00 2001
From: George Shiqi Wu
Date: Wed, 31 Jan 2024 13:41:15 -0500
Subject: [PATCH] Batch kill in azure (#15770)

* Multi kill

* add some unit tests

* Fix param

* Fix deleteBatchFiles

* Fix unit tests

* Add tests

* Save work on batch kill

* add tests

* Fix unit tests

* Update extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentKiller.java

Co-authored-by: Suneet Saldanha

* Fix unit tests

* Update extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureStorageTest.java

Co-authored-by: Suneet Saldanha

* fix test

* fix test

* Add test

---------

Co-authored-by: Suneet Saldanha
---
 .../storage/azure/AzureDataSegmentKiller.java |  49 +++++++++
 .../druid/storage/azure/AzureStorage.java     |  64 +++++++++--
 .../azure/AzureDataSegmentKillerTest.java     | 102 ++++++++++++++++++
 .../druid/storage/azure/AzureStorageTest.java |  64 ++++++++++-
 .../output/AzureStorageConnectorTest.java     |  10 +-
 5 files changed, 275 insertions(+), 14 deletions(-)

diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentKiller.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentKiller.java
index 5429b845ef6..dd609e2f357 100644
--- a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentKiller.java
+++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentKiller.java
@@ -32,6 +32,9 @@ import org.apache.druid.timeline.DataSegment;
 
 import java.io.IOException;
 import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -40,6 +43,7 @@ import java.util.Map;
 public class AzureDataSegmentKiller implements DataSegmentKiller
 {
   private static final Logger log = new Logger(AzureDataSegmentKiller.class);
+  private static final Integer MAX_MULTI_OBJECT_DELETE_SIZE = 256; // https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch?tabs=microsoft-entra-id
 
   private final AzureDataSegmentConfig segmentConfig;
   private final AzureInputDataConfig inputDataConfig;
@@ -63,6 +67,51 @@ public class AzureDataSegmentKiller implements DataSegmentKiller
     this.azureCloudBlobIterableFactory = azureCloudBlobIterableFactory;
   }
 
+  @Override
+  public void kill(List<DataSegment> segments) throws SegmentLoadingException
+  {
+    if (segments.isEmpty()) {
+      return;
+    }
+    if (segments.size() == 1) {
+      kill(segments.get(0));
+      return;
+    }
+
+    // create a list of keys to delete
+    Map<String, List<String>> containerToKeysToDelete = new HashMap<>();
+    for (DataSegment segment : segments) {
+      Map<String, Object> loadSpec = segment.getLoadSpec();
+      final String containerName = MapUtils.getString(loadSpec, "containerName");
+      final String blobPath = MapUtils.getString(loadSpec, "blobPath");
+      List<String> keysToDelete = containerToKeysToDelete.computeIfAbsent(
+          containerName,
+          k -> new ArrayList<>()
+      );
+      keysToDelete.add(blobPath);
+    }
+
+    boolean shouldThrowException = false;
+    for (Map.Entry<String, List<String>> containerToKeys : containerToKeysToDelete.entrySet()) {
+      boolean batchSuccessful = azureStorage.batchDeleteFiles(
+          containerToKeys.getKey(),
+          containerToKeys.getValue(),
+          null
+      );
+
+      if (!batchSuccessful) {
+        shouldThrowException = true;
+      }
+    }
+
+    if (shouldThrowException) {
+      throw new SegmentLoadingException(
+          "Couldn't delete segments from Azure. See the task logs for more details."
+      );
+    }
+  }
+
+
   @Override
   public void kill(DataSegment segment) throws SegmentLoadingException
   {
diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorage.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorage.java
index 9cf5952bd81..8cf4aa0c048 100644
--- a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorage.java
+++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorage.java
@@ -22,6 +22,7 @@ package org.apache.druid.storage.azure;
 import com.azure.core.http.rest.PagedIterable;
 import com.azure.storage.blob.BlobContainerClient;
 import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.blob.batch.BlobBatchClient;
 import com.azure.storage.blob.batch.BlobBatchStorageException;
 import com.azure.storage.blob.models.BlobItem;
 import com.azure.storage.blob.models.BlobRange;
@@ -34,6 +35,7 @@ import com.azure.storage.blob.options.BlockBlobOutputStreamOptions;
 import com.azure.storage.blob.specialized.BlockBlobClient;
 import com.azure.storage.common.Utility;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Streams;
 import org.apache.druid.java.util.common.RE;
 import org.apache.druid.java.util.common.logger.Logger;
@@ -58,6 +60,10 @@ public class AzureStorage
   // Default value from Azure library
   private static final int DELTA_BACKOFF_MS = 30_000;
 
+  // https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch?tabs=microsoft-entra-id
+  private static final Integer MAX_MULTI_OBJECT_DELETE_SIZE = 256;
+
+
   private static final Logger log = new Logger(AzureStorage.class);
 
   private final AzureClientFactory azureClientFactory;
@@ -172,20 +178,60 @@ public class AzureStorage
     return blobContainerClient.getBlobClient(Utility.urlEncode(blobPath)).openInputStream(new BlobInputStreamOptions().setRange(new BlobRange(offset, length)));
   }
 
-  public void batchDeleteFiles(String containerName, Iterable<String> paths, Integer maxAttempts)
+  /**
+   * Deletes multiple files from the specified container.
+   *
+   * @param containerName The name of the container from which files will be deleted.
+   * @param paths         An iterable of file paths to be deleted.
+   * @param maxAttempts   (Optional) The maximum number of attempts to delete each file.
+   *                      If null, the system default number of attempts will be used.
+   * @return true if all files were successfully deleted; false otherwise.
+   */
+  public boolean batchDeleteFiles(String containerName, Iterable<String> paths, @Nullable Integer maxAttempts)
       throws BlobBatchStorageException
  {
     BlobContainerClient blobContainerClient = getOrCreateBlobContainerClient(containerName, maxAttempts);
+    BlobBatchClient blobBatchClient = azureClientFactory.getBlobBatchClient(blobContainerClient);
 
     List<String> blobUris = Streams.stream(paths).map(path -> blobContainerClient.getBlobContainerUrl() + "/" + path).collect(Collectors.toList());
-
-    // We have to call forEach on the response because this is the only way azure batch will throw an exception on a operation failure.
-    azureClientFactory.getBlobBatchClient(blobContainerClient).deleteBlobs(
-        blobUris,
-        DeleteSnapshotsOptionType.INCLUDE
-    ).forEach(response ->
-        log.debug("Deleting blob with URL %s completed with status code %d%n",
-            response.getRequest().getUrl(), response.getStatusCode())
+
+    boolean hadException = false;
+    List<List<String>> keysChunks = Lists.partition(
+        blobUris,
+        MAX_MULTI_OBJECT_DELETE_SIZE
     );
+    for (List<String> chunkOfKeys : keysChunks) {
+      try {
+        log.info(
+            "Removing from container [%s] the following files: [%s]",
+            containerName,
+            chunkOfKeys
+        );
+        // We have to call forEach on the response because this is the only way azure batch will throw an exception on a operation failure.
+        blobBatchClient.deleteBlobs(
+            chunkOfKeys,
+            DeleteSnapshotsOptionType.INCLUDE
+        ).forEach(response ->
+            log.debug("Deleting blob with URL %s completed with status code %d%n",
+                response.getRequest().getUrl(), response.getStatusCode())
+        );
+      }
+      catch (BlobStorageException | BlobBatchStorageException e) {
+        hadException = true;
+        log.noStackTrace().warn(e,
+            "Unable to delete from container [%s], the following keys [%s]",
+            containerName,
+            chunkOfKeys
+        );
+      }
+      catch (Exception e) {
+        hadException = true;
+        log.noStackTrace().warn(e,
+            "Unexpected exception occurred when deleting from container [%s], the following keys [%s]",
+            containerName,
+            chunkOfKeys
+        );
+      }
+    }
+    return !hadException;
   }
 
   public List<String> listDir(final String containerName, final String virtualDirPath, final Integer maxAttempts)
diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentKillerTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentKillerTest.java
index 55b5a7dc612..43844c0c7cb 100644
--- a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentKillerTest.java
+++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentKillerTest.java
@@ -22,6 +22,7 @@ package org.apache.druid.storage.azure;
 import com.azure.storage.blob.models.BlobStorageException;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.StringUtils;
@@ -29,6 +30,7 @@ import org.apache.druid.segment.loading.SegmentLoadingException;
 import org.apache.druid.storage.azure.blob.CloudBlobHolder;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.NoneShardSpec;
+import org.easymock.Capture;
 import org.easymock.EasyMock;
 import org.easymock.EasyMockSupport;
 import org.junit.Assert;
@@ -39,6 +41,7 @@ import java.io.IOException;
 import java.net.URI;
 import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 
 public class AzureDataSegmentKillerTest extends EasyMockSupport
@@ -47,6 +50,8 @@ public class AzureDataSegmentKillerTest extends EasyMockSupport
   private static final String CONTAINER = "test";
   private static final String PREFIX = "test/log";
   private static final String BLOB_PATH = "test/2015-04-12T00:00:00.000Z_2015-04-13T00:00:00.000Z/1/0/index.zip";
+  private static final String BLOB_PATH_2 = "test/2015-04-12T00:00:00.000Z_2015-04-13T00:00:00.000Z/2/0/index.zip";
+
   private static final int MAX_KEYS = 1;
 
   private static final int MAX_TRIES = 3;
 
@@ -70,6 +75,18 @@ public class AzureDataSegmentKillerTest extends EasyMockSupport
       1
   );
 
+  private static final DataSegment DATA_SEGMENT_2 = new DataSegment(
+      "test",
+      Intervals.of("2015-04-12/2015-04-13"),
+      "1",
+      ImmutableMap.of("containerName", CONTAINER_NAME, "blobPath", BLOB_PATH_2),
+      null,
+      null,
+      NoneShardSpec.instance(),
+      0,
+      1
+  );
+
   private AzureDataSegmentConfig segmentConfig;
   private AzureInputDataConfig inputDataConfig;
   private AzureAccountConfig accountConfig;
@@ -285,4 +302,89 @@ public class AzureDataSegmentKillerTest extends EasyMockSupport
     verifyAll();
   }
+
+  @Test
+  public void killBatchTest() throws SegmentLoadingException, BlobStorageException
+  {
+    Capture<List<String>> deletedFilesCapture = Capture.newInstance();
+    EasyMock.expect(azureStorage.batchDeleteFiles(
+        EasyMock.eq(CONTAINER_NAME),
+        EasyMock.capture(deletedFilesCapture),
+        EasyMock.eq(null)
+    )).andReturn(true);
+
+    replayAll();
+
+    AzureDataSegmentKiller killer = new AzureDataSegmentKiller(segmentConfig, inputDataConfig, accountConfig, azureStorage, azureCloudBlobIterableFactory);
+
+    killer.kill(ImmutableList.of(DATA_SEGMENT, DATA_SEGMENT_2));
+
+    verifyAll();
+
+    Assert.assertEquals(
+        ImmutableSet.of(BLOB_PATH, BLOB_PATH_2),
+        new HashSet<>(deletedFilesCapture.getValue())
+    );
+  }
+
+  @Test(expected = RuntimeException.class)
+  public void test_killBatch_runtimeException()
+      throws SegmentLoadingException, BlobStorageException
+  {
+
+    EasyMock.expect(azureStorage.batchDeleteFiles(CONTAINER_NAME, ImmutableList.of(BLOB_PATH, BLOB_PATH_2), null))
+        .andThrow(new RuntimeException(""));
+
+    replayAll();
+
+    AzureDataSegmentKiller killer = new AzureDataSegmentKiller(segmentConfig, inputDataConfig, accountConfig, azureStorage, azureCloudBlobIterableFactory);
+
+    killer.kill(ImmutableList.of(DATA_SEGMENT, DATA_SEGMENT_2));
+
+    verifyAll();
+  }
+
+  @Test(expected = SegmentLoadingException.class)
+  public void test_killBatch_SegmentLoadingExceptionOnError()
+      throws SegmentLoadingException, BlobStorageException
+  {
+
+    EasyMock.expect(azureStorage.batchDeleteFiles(CONTAINER_NAME, ImmutableList.of(BLOB_PATH, BLOB_PATH_2), null))
+        .andReturn(false);
+
+    replayAll();
+
+    AzureDataSegmentKiller killer = new AzureDataSegmentKiller(segmentConfig, inputDataConfig, accountConfig, azureStorage, azureCloudBlobIterableFactory);
+
+    killer.kill(ImmutableList.of(DATA_SEGMENT, DATA_SEGMENT_2));
+
+    verifyAll();
+  }
+
+  @Test
+  public void killBatch_emptyList() throws SegmentLoadingException, BlobStorageException
+  {
+
+    AzureDataSegmentKiller killer = new AzureDataSegmentKiller(segmentConfig, inputDataConfig, accountConfig, azureStorage, azureCloudBlobIterableFactory);
+    killer.kill(ImmutableList.of());
+  }
+
+  @Test
+  public void killBatch_singleSegment() throws SegmentLoadingException, BlobStorageException
+  {
+
+    List<String> deletedFiles = new ArrayList<>();
+    final String dirPath = Paths.get(BLOB_PATH).getParent().toString();
+
+    // For a single segment, fall back to regular kill(DataSegment) logic
+    EasyMock.expect(azureStorage.emptyCloudBlobDirectory(CONTAINER_NAME, dirPath)).andReturn(deletedFiles);
+
+    replayAll();
+
+    AzureDataSegmentKiller killer = new AzureDataSegmentKiller(segmentConfig, inputDataConfig, accountConfig, azureStorage, azureCloudBlobIterableFactory);
+
+    killer.kill(ImmutableList.of(DATA_SEGMENT));
+
+    verifyAll();
+  }
 }
diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureStorageTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureStorageTest.java
index 65acc9346a6..4159557c87e 100644
--- a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureStorageTest.java
+++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureStorageTest.java
@@ -37,6 +37,7 @@ import org.mockito.ArgumentCaptor;
 import org.mockito.ArgumentMatchers;
 import org.mockito.Mockito;
 
+import java.util.ArrayList;
 import java.util.List;
 
@@ -121,7 +122,7 @@ public class AzureStorageTest
   @Test
   public void testBatchDeleteFiles_emptyResponse() throws BlobStorageException
   {
-    String containerUrl = "https://implysaasdeveastussa.blob.core.windows.net/container";
+    String containerUrl = "https://storageaccount.blob.core.windows.net/container";
     BlobBatchClient blobBatchClient = Mockito.mock(BlobBatchClient.class);
 
     SettableSupplier<PagedResponse<Response<Void>>> supplier = new SettableSupplier<>();
@@ -138,8 +139,67 @@ public class AzureStorageTest
         captor.capture(), ArgumentMatchers.eq(DeleteSnapshotsOptionType.INCLUDE)
     );
 
-    azureStorage.batchDeleteFiles(CONTAINER, ImmutableList.of(BLOB_NAME), null);
+    boolean deleteSuccessful = azureStorage.batchDeleteFiles(CONTAINER, ImmutableList.of(BLOB_NAME), null);
     Assert.assertEquals(captor.getValue().get(0), containerUrl + "/" + BLOB_NAME);
+    Assert.assertTrue(deleteSuccessful);
+  }
+
+  @Test
+  public void testBatchDeleteFiles_error() throws BlobStorageException
+  {
+    String containerUrl = "https://storageaccount.blob.core.windows.net/container";
+    BlobBatchClient blobBatchClient = Mockito.mock(BlobBatchClient.class);
+
+    SettableSupplier<PagedResponse<Response<Void>>> supplier = new SettableSupplier<>();
+    supplier.set(new TestPagedResponse<>(ImmutableList.of()));
+
+    ArgumentCaptor<List<String>> captor = ArgumentCaptor.forClass(List.class);
+
+    Mockito.doReturn(containerUrl).when(blobContainerClient).getBlobContainerUrl();
+    Mockito.doReturn(blobContainerClient).when(blobServiceClient).createBlobContainerIfNotExists(CONTAINER);
+    Mockito.doReturn(blobServiceClient).when(azureClientFactory).getBlobServiceClient(null, STORAGE_ACCOUNT);
+    Mockito.doReturn(blobBatchClient).when(azureClientFactory).getBlobBatchClient(blobContainerClient);
+    Mockito.doThrow(new RuntimeException()).when(blobBatchClient).deleteBlobs(
+        captor.capture(), ArgumentMatchers.eq(DeleteSnapshotsOptionType.INCLUDE)
+    );
+
+    boolean deleteSuccessful = azureStorage.batchDeleteFiles(CONTAINER, ImmutableList.of(BLOB_NAME), null);
+    Assert.assertEquals(captor.getValue().get(0), containerUrl + "/" + BLOB_NAME);
+    Assert.assertFalse(deleteSuccessful);
+  }
+
+  @Test
+  public void testBatchDeleteFiles_emptyResponse_multipleResponses() throws BlobStorageException
+  {
+    String containerUrl = "https://storageaccount.blob.core.windows.net/container";
+    BlobBatchClient blobBatchClient = Mockito.mock(BlobBatchClient.class);
+
+    SettableSupplier<PagedResponse<Response<Void>>> supplier = new SettableSupplier<>();
+    supplier.set(new TestPagedResponse<>(ImmutableList.of()));
+    PagedIterable<Response<Void>> pagedIterable = new PagedIterable<>(supplier);
+
+    ArgumentCaptor<List<String>> captor = ArgumentCaptor.forClass(List.class);
+
+    Mockito.doReturn(containerUrl).when(blobContainerClient).getBlobContainerUrl();
+    Mockito.doReturn(blobContainerClient).when(blobServiceClient).createBlobContainerIfNotExists(CONTAINER);
+    Mockito.doReturn(blobServiceClient).when(azureClientFactory).getBlobServiceClient(null, STORAGE_ACCOUNT);
+    Mockito.doReturn(blobBatchClient).when(azureClientFactory).getBlobBatchClient(blobContainerClient);
+    Mockito.doReturn(pagedIterable).when(blobBatchClient).deleteBlobs(
+        captor.capture(), ArgumentMatchers.eq(DeleteSnapshotsOptionType.INCLUDE)
+    );
+
+
+    List<String> blobNameList = new ArrayList<>();
+    for (int i = 0; i <= 257; i++) {
+      blobNameList.add(BLOB_NAME + i);
+    }
+
+    boolean deleteSuccessful = azureStorage.batchDeleteFiles(CONTAINER, blobNameList, null);
+
+    List<List<String>> deletedValues = captor.getAllValues();
+    Assert.assertEquals(deletedValues.get(0).size(), 256);
+    Assert.assertEquals(deletedValues.get(1).size(), 2);
+    Assert.assertTrue(deleteSuccessful);
   }
 }
diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureStorageConnectorTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureStorageConnectorTest.java
index 5201a3bdd9d..17cee185538 100644
--- a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureStorageConnectorTest.java
+++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureStorageConnectorTest.java
@@ -156,11 +156,11 @@ public class AzureStorageConnectorTest
     EasyMock.reset(azureStorage);
     Capture<String> containerCapture = EasyMock.newCapture();
     Capture<Iterable<String>> pathsCapture = EasyMock.newCapture();
-    azureStorage.batchDeleteFiles(
+    EasyMock.expect(azureStorage.batchDeleteFiles(
         EasyMock.capture(containerCapture),
         EasyMock.capture(pathsCapture),
         EasyMock.anyInt()
-    );
+    )).andReturn(true);
     EasyMock.replay(azureStorage);
     storageConnector.deleteFile(TEST_FILE);
     Assert.assertEquals(CONTAINER, containerCapture.getValue());
@@ -174,7 +174,11 @@ public class AzureStorageConnectorTest
     EasyMock.reset(azureStorage);
     Capture<String> containerCapture = EasyMock.newCapture();
     Capture<Iterable<String>> pathsCapture = EasyMock.newCapture();
-    azureStorage.batchDeleteFiles(EasyMock.capture(containerCapture), EasyMock.capture(pathsCapture), EasyMock.anyInt());
+    EasyMock.expect(azureStorage.batchDeleteFiles(
+        EasyMock.capture(containerCapture),
+        EasyMock.capture(pathsCapture),
+        EasyMock.anyInt()
+    )).andReturn(true);
     EasyMock.replay(azureStorage);
     storageConnector.deleteFiles(ImmutableList.of(TEST_FILE + "_1.part", TEST_FILE + "_2.part"));
     Assert.assertEquals(CONTAINER, containerCapture.getValue());