mirror of https://github.com/apache/druid.git
Fix AzureStorage.batchDeleteFiles (#15730)
* Fix param * Fix deleteBatchFiles * Fix unit tests * Add tests
This commit is contained in:
parent
2eba20d724
commit
ef0232290c
|
@ -22,8 +22,11 @@ package org.apache.druid.storage.azure;
|
||||||
import com.azure.core.http.policy.ExponentialBackoffOptions;
|
import com.azure.core.http.policy.ExponentialBackoffOptions;
|
||||||
import com.azure.core.http.policy.RetryOptions;
|
import com.azure.core.http.policy.RetryOptions;
|
||||||
import com.azure.identity.DefaultAzureCredentialBuilder;
|
import com.azure.identity.DefaultAzureCredentialBuilder;
|
||||||
|
import com.azure.storage.blob.BlobContainerClient;
|
||||||
import com.azure.storage.blob.BlobServiceClient;
|
import com.azure.storage.blob.BlobServiceClient;
|
||||||
import com.azure.storage.blob.BlobServiceClientBuilder;
|
import com.azure.storage.blob.BlobServiceClientBuilder;
|
||||||
|
import com.azure.storage.blob.batch.BlobBatchClient;
|
||||||
|
import com.azure.storage.blob.batch.BlobBatchClientBuilder;
|
||||||
import com.azure.storage.common.StorageSharedKeyCredential;
|
import com.azure.storage.common.StorageSharedKeyCredential;
|
||||||
|
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
|
@ -64,6 +67,13 @@ public class AzureClientFactory
|
||||||
return cachedBlobServiceClients.get(retryCount);
|
return cachedBlobServiceClients.get(retryCount);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// Mainly here to make testing easier.
|
||||||
|
public BlobBatchClient getBlobBatchClient(BlobContainerClient blobContainerClient)
|
||||||
|
{
|
||||||
|
return new BlobBatchClientBuilder(blobContainerClient).buildClient();
|
||||||
|
}
|
||||||
|
|
||||||
private BlobServiceClientBuilder getAuthenticatedBlobServiceClientBuilder()
|
private BlobServiceClientBuilder getAuthenticatedBlobServiceClientBuilder()
|
||||||
{
|
{
|
||||||
BlobServiceClientBuilder clientBuilder = new BlobServiceClientBuilder()
|
BlobServiceClientBuilder clientBuilder = new BlobServiceClientBuilder()
|
||||||
|
|
|
@ -22,8 +22,6 @@ package org.apache.druid.storage.azure;
|
||||||
import com.azure.core.http.rest.PagedIterable;
|
import com.azure.core.http.rest.PagedIterable;
|
||||||
import com.azure.storage.blob.BlobContainerClient;
|
import com.azure.storage.blob.BlobContainerClient;
|
||||||
import com.azure.storage.blob.BlobServiceClient;
|
import com.azure.storage.blob.BlobServiceClient;
|
||||||
import com.azure.storage.blob.batch.BlobBatchClient;
|
|
||||||
import com.azure.storage.blob.batch.BlobBatchClientBuilder;
|
|
||||||
import com.azure.storage.blob.batch.BlobBatchStorageException;
|
import com.azure.storage.blob.batch.BlobBatchStorageException;
|
||||||
import com.azure.storage.blob.models.BlobItem;
|
import com.azure.storage.blob.models.BlobItem;
|
||||||
import com.azure.storage.blob.models.BlobRange;
|
import com.azure.storage.blob.models.BlobRange;
|
||||||
|
@ -36,7 +34,7 @@ import com.azure.storage.blob.options.BlockBlobOutputStreamOptions;
|
||||||
import com.azure.storage.blob.specialized.BlockBlobClient;
|
import com.azure.storage.blob.specialized.BlockBlobClient;
|
||||||
import com.azure.storage.common.Utility;
|
import com.azure.storage.common.Utility;
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Streams;
|
||||||
import org.apache.druid.java.util.common.RE;
|
import org.apache.druid.java.util.common.RE;
|
||||||
import org.apache.druid.java.util.common.logger.Logger;
|
import org.apache.druid.java.util.common.logger.Logger;
|
||||||
|
|
||||||
|
@ -49,6 +47,7 @@ import java.io.OutputStream;
|
||||||
import java.time.Duration;
|
import java.time.Duration;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Abstracts the Azure storage layer. Makes direct calls to Azure file system.
|
* Abstracts the Azure storage layer. Makes direct calls to Azure file system.
|
||||||
|
@ -173,9 +172,17 @@ public class AzureStorage
|
||||||
public void batchDeleteFiles(String containerName, Iterable<String> paths, Integer maxAttempts)
|
public void batchDeleteFiles(String containerName, Iterable<String> paths, Integer maxAttempts)
|
||||||
throws BlobBatchStorageException
|
throws BlobBatchStorageException
|
||||||
{
|
{
|
||||||
|
BlobContainerClient blobContainerClient = getOrCreateBlobContainerClient(containerName, maxAttempts);
|
||||||
|
List<String> blobUris = Streams.stream(paths).map(path -> blobContainerClient.getBlobContainerUrl() + "/" + path).collect(Collectors.toList());
|
||||||
|
|
||||||
BlobBatchClient blobBatchClient = new BlobBatchClientBuilder(getOrCreateBlobContainerClient(containerName, maxAttempts)).buildClient();
|
// We have to call forEach on the response because this is the only way azure batch will throw an exception on a operation failure.
|
||||||
blobBatchClient.deleteBlobs(Lists.newArrayList(paths), DeleteSnapshotsOptionType.ONLY);
|
azureClientFactory.getBlobBatchClient(blobContainerClient).deleteBlobs(
|
||||||
|
blobUris,
|
||||||
|
DeleteSnapshotsOptionType.INCLUDE
|
||||||
|
).forEach(response ->
|
||||||
|
log.debug("Deleting blob with URL %s completed with status code %d%n",
|
||||||
|
response.getRequest().getUrl(), response.getStatusCode())
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
public List<String> listDir(final String containerName, final String virtualDirPath, final Integer maxAttempts)
|
public List<String> listDir(final String containerName, final String virtualDirPath, final Integer maxAttempts)
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
|
|
||||||
package org.apache.druid.storage.azure.output;
|
package org.apache.druid.storage.azure.output;
|
||||||
|
|
||||||
|
import com.azure.storage.blob.batch.BlobBatchStorageException;
|
||||||
import com.azure.storage.blob.models.BlobStorageException;
|
import com.azure.storage.blob.models.BlobStorageException;
|
||||||
import com.google.common.base.Joiner;
|
import com.google.common.base.Joiner;
|
||||||
import com.google.common.collect.Iterables;
|
import com.google.common.collect.Iterables;
|
||||||
|
@ -158,7 +159,7 @@ public class AzureStorageConnector extends ChunkingStorageConnector<AzureInputRa
|
||||||
config.getMaxRetry()
|
config.getMaxRetry()
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
catch (BlobStorageException e) {
|
catch (BlobStorageException | BlobBatchStorageException e) {
|
||||||
throw new IOException(e);
|
throw new IOException(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -173,7 +174,7 @@ public class AzureStorageConnector extends ChunkingStorageConnector<AzureInputRa
|
||||||
config.getMaxRetry()
|
config.getMaxRetry()
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
catch (BlobStorageException e) {
|
catch (BlobStorageException | BlobBatchStorageException e) {
|
||||||
throw new IOException(e);
|
throw new IOException(e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,17 +23,22 @@ import com.azure.core.http.rest.PagedIterable;
|
||||||
import com.azure.core.http.rest.PagedResponse;
|
import com.azure.core.http.rest.PagedResponse;
|
||||||
import com.azure.storage.blob.BlobContainerClient;
|
import com.azure.storage.blob.BlobContainerClient;
|
||||||
import com.azure.storage.blob.BlobServiceClient;
|
import com.azure.storage.blob.BlobServiceClient;
|
||||||
|
import com.azure.storage.blob.batch.BlobBatchClient;
|
||||||
import com.azure.storage.blob.models.BlobItem;
|
import com.azure.storage.blob.models.BlobItem;
|
||||||
import com.azure.storage.blob.models.BlobItemProperties;
|
import com.azure.storage.blob.models.BlobItemProperties;
|
||||||
import com.azure.storage.blob.models.BlobStorageException;
|
import com.azure.storage.blob.models.BlobStorageException;
|
||||||
|
import com.azure.storage.blob.models.DeleteSnapshotsOptionType;
|
||||||
import com.google.common.collect.ImmutableList;
|
import com.google.common.collect.ImmutableList;
|
||||||
import org.apache.druid.common.guava.SettableSupplier;
|
import org.apache.druid.common.guava.SettableSupplier;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.mockito.ArgumentCaptor;
|
||||||
import org.mockito.ArgumentMatchers;
|
import org.mockito.ArgumentMatchers;
|
||||||
import org.mockito.Mockito;
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
|
||||||
// Using Mockito for the whole test class since azure classes (e.g. BlobContainerClient) are final and can't be mocked with EasyMock
|
// Using Mockito for the whole test class since azure classes (e.g. BlobContainerClient) are final and can't be mocked with EasyMock
|
||||||
public class AzureStorageTest
|
public class AzureStorageTest
|
||||||
|
@ -87,5 +92,29 @@ public class AzureStorageTest
|
||||||
|
|
||||||
Assert.assertEquals(ImmutableList.of(BLOB_NAME), azureStorage.listDir(CONTAINER, "", null));
|
Assert.assertEquals(ImmutableList.of(BLOB_NAME), azureStorage.listDir(CONTAINER, "", null));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBatchDeleteFiles_emptyResponse() throws BlobStorageException
|
||||||
|
{
|
||||||
|
String containerUrl = "https://implysaasdeveastussa.blob.core.windows.net/container";
|
||||||
|
BlobBatchClient blobBatchClient = Mockito.mock(BlobBatchClient.class);
|
||||||
|
|
||||||
|
SettableSupplier<PagedResponse<BlobItem>> supplier = new SettableSupplier<>();
|
||||||
|
supplier.set(new TestPagedResponse<>(ImmutableList.of()));
|
||||||
|
PagedIterable<BlobItem> pagedIterable = new PagedIterable<>(supplier);
|
||||||
|
|
||||||
|
ArgumentCaptor<List<String>> captor = ArgumentCaptor.forClass(List.class);
|
||||||
|
|
||||||
|
Mockito.doReturn(containerUrl).when(blobContainerClient).getBlobContainerUrl();
|
||||||
|
Mockito.doReturn(blobContainerClient).when(blobServiceClient).createBlobContainerIfNotExists(CONTAINER);
|
||||||
|
Mockito.doReturn(blobServiceClient).when(azureClientFactory).getBlobServiceClient(null);
|
||||||
|
Mockito.doReturn(blobBatchClient).when(azureClientFactory).getBlobBatchClient(blobContainerClient);
|
||||||
|
Mockito.doReturn(pagedIterable).when(blobBatchClient).deleteBlobs(
|
||||||
|
captor.capture(), ArgumentMatchers.eq(DeleteSnapshotsOptionType.INCLUDE)
|
||||||
|
);
|
||||||
|
|
||||||
|
azureStorage.batchDeleteFiles(CONTAINER, ImmutableList.of(BLOB_NAME), null);
|
||||||
|
Assert.assertEquals(captor.getValue().get(0), containerUrl + "/" + BLOB_NAME);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
|
|
||||||
package org.apache.druid.storage.azure.output;
|
package org.apache.druid.storage.azure.output;
|
||||||
|
|
||||||
|
import com.azure.core.http.HttpResponse;
|
||||||
import com.azure.storage.blob.models.BlobStorageException;
|
import com.azure.storage.blob.models.BlobStorageException;
|
||||||
import com.google.common.collect.ImmutableList;
|
import com.google.common.collect.ImmutableList;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
|
@ -198,4 +199,30 @@ public class AzureStorageConnectorTest
|
||||||
Assert.assertEquals(ImmutableList.of("x/y/z/" + TEST_FILE, "p/q/r/" + TEST_FILE), ret);
|
Assert.assertEquals(ImmutableList.of("x/y/z/" + TEST_FILE, "p/q/r/" + TEST_FILE), ret);
|
||||||
EasyMock.reset(azureStorage);
|
EasyMock.reset(azureStorage);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void test_deleteFile_blobStorageException()
|
||||||
|
{
|
||||||
|
EasyMock.reset(azureStorage);
|
||||||
|
HttpResponse mockHttpResponse = EasyMock.createMock(HttpResponse.class);
|
||||||
|
azureStorage.batchDeleteFiles(EasyMock.anyString(), EasyMock.anyObject(), EasyMock.anyInt());
|
||||||
|
EasyMock.expectLastCall().andThrow(new BlobStorageException("error", mockHttpResponse, null));
|
||||||
|
EasyMock.replay(azureStorage);
|
||||||
|
Assert.assertThrows(IOException.class, () -> storageConnector.deleteFile("file"));
|
||||||
|
EasyMock.verify(azureStorage);
|
||||||
|
EasyMock.reset(azureStorage);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void test_deleteFiles_blobStorageException()
|
||||||
|
{
|
||||||
|
EasyMock.reset(azureStorage);
|
||||||
|
HttpResponse mockHttpResponse = EasyMock.createMock(HttpResponse.class);
|
||||||
|
azureStorage.batchDeleteFiles(EasyMock.anyString(), EasyMock.anyObject(), EasyMock.anyInt());
|
||||||
|
EasyMock.expectLastCall().andThrow(new BlobStorageException("error", mockHttpResponse, null));
|
||||||
|
EasyMock.replay(azureStorage);
|
||||||
|
Assert.assertThrows(IOException.class, () -> storageConnector.deleteFiles(ImmutableList.of()));
|
||||||
|
EasyMock.verify(azureStorage);
|
||||||
|
EasyMock.reset(azureStorage);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue