Batch kill in azure (#15770)

* Multi kill

* add some unit tests

* Fix param

* Fix deleteBatchFiles

* Fix unit tests

* Add tests

* Save work on batch kill

* add tests

* Fix unit tests

* Update extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentKiller.java

Co-authored-by: Suneet Saldanha <suneet@apache.org>

* Fix unit tests

* Update extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureStorageTest.java

Co-authored-by: Suneet Saldanha <suneet@apache.org>

* fix test

* fix test

* Add test

---------

Co-authored-by: Suneet Saldanha <suneet@apache.org>
This commit is contained in:
George Shiqi Wu 2024-01-31 13:41:15 -05:00 committed by GitHub
parent 0089f6b905
commit 5edfa9429f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 275 additions and 14 deletions

View File

@ -32,6 +32,9 @@ import org.apache.druid.timeline.DataSegment;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
@ -40,6 +43,7 @@ import java.util.Map;
public class AzureDataSegmentKiller implements DataSegmentKiller
{
private static final Logger log = new Logger(AzureDataSegmentKiller.class);
private static final Integer MAX_MULTI_OBJECT_DELETE_SIZE = 256; // https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch?tabs=microsoft-entra-id
private final AzureDataSegmentConfig segmentConfig;
private final AzureInputDataConfig inputDataConfig;
@ -63,6 +67,51 @@ public class AzureDataSegmentKiller implements DataSegmentKiller
this.azureCloudBlobIterableFactory = azureCloudBlobIterableFactory;
}
@Override
public void kill(List<DataSegment> segments) throws SegmentLoadingException
{
if (segments.isEmpty()) {
return;
}
if (segments.size() == 1) {
kill(segments.get(0));
return;
}
// create a list of keys to delete
Map<String, List<String>> containerToKeysToDelete = new HashMap<>();
for (DataSegment segment : segments) {
Map<String, Object> loadSpec = segment.getLoadSpec();
final String containerName = MapUtils.getString(loadSpec, "containerName");
final String blobPath = MapUtils.getString(loadSpec, "blobPath");
List<String> keysToDelete = containerToKeysToDelete.computeIfAbsent(
containerName,
k -> new ArrayList<>()
);
keysToDelete.add(blobPath);
}
boolean shouldThrowException = false;
for (Map.Entry<String, List<String>> containerToKeys : containerToKeysToDelete.entrySet()) {
boolean batchSuccessful = azureStorage.batchDeleteFiles(
containerToKeys.getKey(),
containerToKeys.getValue(),
null
);
if (!batchSuccessful) {
shouldThrowException = true;
}
}
if (shouldThrowException) {
throw new SegmentLoadingException(
"Couldn't delete segments from Azure. See the task logs for more details."
);
}
}
@Override
public void kill(DataSegment segment) throws SegmentLoadingException
{

View File

@ -22,6 +22,7 @@ package org.apache.druid.storage.azure;
import com.azure.core.http.rest.PagedIterable;
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.batch.BlobBatchClient;
import com.azure.storage.blob.batch.BlobBatchStorageException;
import com.azure.storage.blob.models.BlobItem;
import com.azure.storage.blob.models.BlobRange;
@ -34,6 +35,7 @@ import com.azure.storage.blob.options.BlockBlobOutputStreamOptions;
import com.azure.storage.blob.specialized.BlockBlobClient;
import com.azure.storage.common.Utility;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Streams;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.logger.Logger;
@ -58,6 +60,10 @@ public class AzureStorage
// Default value from Azure library
private static final int DELTA_BACKOFF_MS = 30_000;
// https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch?tabs=microsoft-entra-id
private static final Integer MAX_MULTI_OBJECT_DELETE_SIZE = 256;
private static final Logger log = new Logger(AzureStorage.class);
private final AzureClientFactory azureClientFactory;
@ -172,20 +178,60 @@ public class AzureStorage
return blobContainerClient.getBlobClient(Utility.urlEncode(blobPath)).openInputStream(new BlobInputStreamOptions().setRange(new BlobRange(offset, length)));
}
public void batchDeleteFiles(String containerName, Iterable<String> paths, Integer maxAttempts)
/**
* Deletes multiple files from the specified container.
*
* @param containerName The name of the container from which files will be deleted.
* @param paths An iterable of file paths to be deleted.
* @param maxAttempts (Optional) The maximum number of attempts to delete each file.
* If null, the system default number of attempts will be used.
* @return true if all files were successfully deleted; false otherwise.
*/
public boolean batchDeleteFiles(String containerName, Iterable<String> paths, @Nullable Integer maxAttempts)
throws BlobBatchStorageException
{
BlobContainerClient blobContainerClient = getOrCreateBlobContainerClient(containerName, maxAttempts);
BlobBatchClient blobBatchClient = azureClientFactory.getBlobBatchClient(blobContainerClient);
List<String> blobUris = Streams.stream(paths).map(path -> blobContainerClient.getBlobContainerUrl() + "/" + path).collect(Collectors.toList());
// We have to call forEach on the response because this is the only way azure batch will throw an exception on a operation failure.
azureClientFactory.getBlobBatchClient(blobContainerClient).deleteBlobs(
blobUris,
DeleteSnapshotsOptionType.INCLUDE
).forEach(response ->
log.debug("Deleting blob with URL %s completed with status code %d%n",
response.getRequest().getUrl(), response.getStatusCode())
boolean hadException = false;
List<List<String>> keysChunks = Lists.partition(
blobUris,
MAX_MULTI_OBJECT_DELETE_SIZE
);
for (List<String> chunkOfKeys : keysChunks) {
try {
log.info(
"Removing from container [%s] the following files: [%s]",
containerName,
chunkOfKeys
);
// We have to call forEach on the response because this is the only way azure batch will throw an exception on a operation failure.
blobBatchClient.deleteBlobs(
chunkOfKeys,
DeleteSnapshotsOptionType.INCLUDE
).forEach(response ->
log.debug("Deleting blob with URL %s completed with status code %d%n",
response.getRequest().getUrl(), response.getStatusCode())
);
}
catch (BlobStorageException | BlobBatchStorageException e) {
hadException = true;
log.noStackTrace().warn(e,
"Unable to delete from container [%s], the following keys [%s]",
containerName,
chunkOfKeys
);
}
catch (Exception e) {
hadException = true;
log.noStackTrace().warn(e,
"Unexpected exception occurred when deleting from container [%s], the following keys [%s]",
containerName,
chunkOfKeys
);
}
}
return !hadException;
}
public List<String> listDir(final String containerName, final String virtualDirPath, final Integer maxAttempts)

View File

@ -22,6 +22,7 @@ package org.apache.druid.storage.azure;
import com.azure.storage.blob.models.BlobStorageException;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
@ -29,6 +30,7 @@ import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.storage.azure.blob.CloudBlobHolder;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.junit.Assert;
@ -39,6 +41,7 @@ import java.io.IOException;
import java.net.URI;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
public class AzureDataSegmentKillerTest extends EasyMockSupport
@ -47,6 +50,8 @@ public class AzureDataSegmentKillerTest extends EasyMockSupport
private static final String CONTAINER = "test";
private static final String PREFIX = "test/log";
private static final String BLOB_PATH = "test/2015-04-12T00:00:00.000Z_2015-04-13T00:00:00.000Z/1/0/index.zip";
private static final String BLOB_PATH_2 = "test/2015-04-12T00:00:00.000Z_2015-04-13T00:00:00.000Z/2/0/index.zip";
private static final int MAX_KEYS = 1;
private static final int MAX_TRIES = 3;
@ -70,6 +75,18 @@ public class AzureDataSegmentKillerTest extends EasyMockSupport
1
);
private static final DataSegment DATA_SEGMENT_2 = new DataSegment(
"test",
Intervals.of("2015-04-12/2015-04-13"),
"1",
ImmutableMap.of("containerName", CONTAINER_NAME, "blobPath", BLOB_PATH_2),
null,
null,
NoneShardSpec.instance(),
0,
1
);
private AzureDataSegmentConfig segmentConfig;
private AzureInputDataConfig inputDataConfig;
private AzureAccountConfig accountConfig;
@ -285,4 +302,89 @@ public class AzureDataSegmentKillerTest extends EasyMockSupport
verifyAll();
}
@Test
public void killBatchTest() throws SegmentLoadingException, BlobStorageException
{
Capture<List<String>> deletedFilesCapture = Capture.newInstance();
EasyMock.expect(azureStorage.batchDeleteFiles(
EasyMock.eq(CONTAINER_NAME),
EasyMock.capture(deletedFilesCapture),
EasyMock.eq(null)
)).andReturn(true);
replayAll();
AzureDataSegmentKiller killer = new AzureDataSegmentKiller(segmentConfig, inputDataConfig, accountConfig, azureStorage, azureCloudBlobIterableFactory);
killer.kill(ImmutableList.of(DATA_SEGMENT, DATA_SEGMENT_2));
verifyAll();
Assert.assertEquals(
ImmutableSet.of(BLOB_PATH, BLOB_PATH_2),
new HashSet<>(deletedFilesCapture.getValue())
);
}
@Test(expected = RuntimeException.class)
public void test_killBatch_runtimeException()
throws SegmentLoadingException, BlobStorageException
{
EasyMock.expect(azureStorage.batchDeleteFiles(CONTAINER_NAME, ImmutableList.of(BLOB_PATH, BLOB_PATH_2), null))
.andThrow(new RuntimeException(""));
replayAll();
AzureDataSegmentKiller killer = new AzureDataSegmentKiller(segmentConfig, inputDataConfig, accountConfig, azureStorage, azureCloudBlobIterableFactory);
killer.kill(ImmutableList.of(DATA_SEGMENT, DATA_SEGMENT_2));
verifyAll();
}
@Test(expected = SegmentLoadingException.class)
public void test_killBatch_SegmentLoadingExceptionOnError()
throws SegmentLoadingException, BlobStorageException
{
EasyMock.expect(azureStorage.batchDeleteFiles(CONTAINER_NAME, ImmutableList.of(BLOB_PATH, BLOB_PATH_2), null))
.andReturn(false);
replayAll();
AzureDataSegmentKiller killer = new AzureDataSegmentKiller(segmentConfig, inputDataConfig, accountConfig, azureStorage, azureCloudBlobIterableFactory);
killer.kill(ImmutableList.of(DATA_SEGMENT, DATA_SEGMENT_2));
verifyAll();
}
@Test
public void killBatch_emptyList() throws SegmentLoadingException, BlobStorageException
{
AzureDataSegmentKiller killer = new AzureDataSegmentKiller(segmentConfig, inputDataConfig, accountConfig, azureStorage, azureCloudBlobIterableFactory);
killer.kill(ImmutableList.of());
}
@Test
public void killBatch_singleSegment() throws SegmentLoadingException, BlobStorageException
{
List<String> deletedFiles = new ArrayList<>();
final String dirPath = Paths.get(BLOB_PATH).getParent().toString();
// For a single segment, fall back to regular kill(DataSegment) logic
EasyMock.expect(azureStorage.emptyCloudBlobDirectory(CONTAINER_NAME, dirPath)).andReturn(deletedFiles);
replayAll();
AzureDataSegmentKiller killer = new AzureDataSegmentKiller(segmentConfig, inputDataConfig, accountConfig, azureStorage, azureCloudBlobIterableFactory);
killer.kill(ImmutableList.of(DATA_SEGMENT));
verifyAll();
}
}

View File

@ -37,6 +37,7 @@ import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import java.util.ArrayList;
import java.util.List;
@ -121,7 +122,7 @@ public class AzureStorageTest
@Test
public void testBatchDeleteFiles_emptyResponse() throws BlobStorageException
{
String containerUrl = "https://implysaasdeveastussa.blob.core.windows.net/container";
String containerUrl = "https://storageaccount.blob.core.windows.net/container";
BlobBatchClient blobBatchClient = Mockito.mock(BlobBatchClient.class);
SettableSupplier<PagedResponse<BlobItem>> supplier = new SettableSupplier<>();
@ -138,8 +139,67 @@ public class AzureStorageTest
captor.capture(), ArgumentMatchers.eq(DeleteSnapshotsOptionType.INCLUDE)
);
azureStorage.batchDeleteFiles(CONTAINER, ImmutableList.of(BLOB_NAME), null);
boolean deleteSuccessful = azureStorage.batchDeleteFiles(CONTAINER, ImmutableList.of(BLOB_NAME), null);
Assert.assertEquals(captor.getValue().get(0), containerUrl + "/" + BLOB_NAME);
Assert.assertTrue(deleteSuccessful);
}
@Test
public void testBatchDeleteFiles_error() throws BlobStorageException
{
String containerUrl = "https://storageaccount.blob.core.windows.net/container";
BlobBatchClient blobBatchClient = Mockito.mock(BlobBatchClient.class);
SettableSupplier<PagedResponse<BlobItem>> supplier = new SettableSupplier<>();
supplier.set(new TestPagedResponse<>(ImmutableList.of()));
ArgumentCaptor<List<String>> captor = ArgumentCaptor.forClass(List.class);
Mockito.doReturn(containerUrl).when(blobContainerClient).getBlobContainerUrl();
Mockito.doReturn(blobContainerClient).when(blobServiceClient).createBlobContainerIfNotExists(CONTAINER);
Mockito.doReturn(blobServiceClient).when(azureClientFactory).getBlobServiceClient(null, STORAGE_ACCOUNT);
Mockito.doReturn(blobBatchClient).when(azureClientFactory).getBlobBatchClient(blobContainerClient);
Mockito.doThrow(new RuntimeException()).when(blobBatchClient).deleteBlobs(
captor.capture(), ArgumentMatchers.eq(DeleteSnapshotsOptionType.INCLUDE)
);
boolean deleteSuccessful = azureStorage.batchDeleteFiles(CONTAINER, ImmutableList.of(BLOB_NAME), null);
Assert.assertEquals(captor.getValue().get(0), containerUrl + "/" + BLOB_NAME);
Assert.assertFalse(deleteSuccessful);
}
@Test
public void testBatchDeleteFiles_emptyResponse_multipleResponses() throws BlobStorageException
{
String containerUrl = "https://storageaccount.blob.core.windows.net/container";
BlobBatchClient blobBatchClient = Mockito.mock(BlobBatchClient.class);
SettableSupplier<PagedResponse<BlobItem>> supplier = new SettableSupplier<>();
supplier.set(new TestPagedResponse<>(ImmutableList.of()));
PagedIterable<BlobItem> pagedIterable = new PagedIterable<>(supplier);
ArgumentCaptor<List<String>> captor = ArgumentCaptor.forClass(List.class);
Mockito.doReturn(containerUrl).when(blobContainerClient).getBlobContainerUrl();
Mockito.doReturn(blobContainerClient).when(blobServiceClient).createBlobContainerIfNotExists(CONTAINER);
Mockito.doReturn(blobServiceClient).when(azureClientFactory).getBlobServiceClient(null, STORAGE_ACCOUNT);
Mockito.doReturn(blobBatchClient).when(azureClientFactory).getBlobBatchClient(blobContainerClient);
Mockito.doReturn(pagedIterable).when(blobBatchClient).deleteBlobs(
captor.capture(), ArgumentMatchers.eq(DeleteSnapshotsOptionType.INCLUDE)
);
List<String> blobNameList = new ArrayList<>();
for (int i = 0; i <= 257; i++) {
blobNameList.add(BLOB_NAME + i);
}
boolean deleteSuccessful = azureStorage.batchDeleteFiles(CONTAINER, blobNameList, null);
List<List<String>> deletedValues = captor.getAllValues();
Assert.assertEquals(deletedValues.get(0).size(), 256);
Assert.assertEquals(deletedValues.get(1).size(), 2);
Assert.assertTrue(deleteSuccessful);
}
}

View File

@ -156,11 +156,11 @@ public class AzureStorageConnectorTest
EasyMock.reset(azureStorage);
Capture<String> containerCapture = EasyMock.newCapture();
Capture<Iterable<String>> pathsCapture = EasyMock.newCapture();
azureStorage.batchDeleteFiles(
EasyMock.expect(azureStorage.batchDeleteFiles(
EasyMock.capture(containerCapture),
EasyMock.capture(pathsCapture),
EasyMock.anyInt()
);
)).andReturn(true);
EasyMock.replay(azureStorage);
storageConnector.deleteFile(TEST_FILE);
Assert.assertEquals(CONTAINER, containerCapture.getValue());
@ -174,7 +174,11 @@ public class AzureStorageConnectorTest
EasyMock.reset(azureStorage);
Capture<String> containerCapture = EasyMock.newCapture();
Capture<Iterable<String>> pathsCapture = EasyMock.newCapture();
azureStorage.batchDeleteFiles(EasyMock.capture(containerCapture), EasyMock.capture(pathsCapture), EasyMock.anyInt());
EasyMock.expect(azureStorage.batchDeleteFiles(
EasyMock.capture(containerCapture),
EasyMock.capture(pathsCapture),
EasyMock.anyInt()
)).andReturn(true);
EasyMock.replay(azureStorage);
storageConnector.deleteFiles(ImmutableList.of(TEST_FILE + "_1.part", TEST_FILE + "_2.part"));
Assert.assertEquals(CONTAINER, containerCapture.getValue());