From e6a82e8a11f23bf5808ac5a6a7216c9c6a437c92 Mon Sep 17 00:00:00 2001 From: Andreas Maechler Date: Fri, 7 Jun 2024 10:47:51 -0600 Subject: [PATCH] Only create container in `AzureStorage` for write operations (#16558) * Remove unused constants * Refactor getBlockBlobLength * Better link * Upper-case log * Mark defaultStorageAccount nullable This is the case if you do not use Azure for deep-storage but ingest from Azure blobs. * Do not always create a new container if it doesn't exist Specifically, only create a container if uploading a blob or writing a blob stream * Add lots of comments, group methods * Revert "Mark defaultStorageAccount nullable" * Add mockito for junit * Add extra test * Add comment Thanks George. * Pass blockSize as Long * Test more branches... --- extensions-core/azure-extensions/pom.xml | 6 + .../data/input/azure/AzureInputSource.java | 8 +- .../azure/AzureStorageAccountInputSource.java | 12 +- .../storage/azure/AzureDataSegmentKiller.java | 1 - .../druid/storage/azure/AzureStorage.java | 455 ++++++++++++------ .../druid/storage/azure/AzureUtils.java | 2 - .../azure/output/AzureStorageConnector.java | 4 +- .../druid/storage/azure/AzureStorageTest.java | 136 +++++- .../output/AzureStorageConnectorTest.java | 2 +- 9 files changed, 428 insertions(+), 198 deletions(-) diff --git a/extensions-core/azure-extensions/pom.xml b/extensions-core/azure-extensions/pom.xml index 2955d88c406..88a1198b61e 100644 --- a/extensions-core/azure-extensions/pom.xml +++ b/extensions-core/azure-extensions/pom.xml @@ -190,6 +190,12 @@ mockito-core test + + org.mockito + mockito-junit-jupiter + ${mockito.version} + test + diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureInputSource.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureInputSource.java index 339ec18ec3a..07a4c23370e 100644 --- 
a/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureInputSource.java +++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureInputSource.java @@ -20,7 +20,6 @@ package org.apache.druid.data.input.azure; import com.azure.storage.blob.models.BlobStorageException; -import com.azure.storage.blob.specialized.BlockBlobClient; import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; @@ -161,12 +160,7 @@ public class AzureInputSource extends CloudObjectInputSource public long getObjectSize(CloudObjectLocation location) { try { - final BlockBlobClient blobWithAttributes = storage.getBlockBlobReferenceWithAttributes( - location.getBucket(), - location.getPath() - ); - - return blobWithAttributes.getProperties().getBlobSize(); + return storage.getBlockBlobLength(location.getBucket(), location.getPath()); } catch (BlobStorageException e) { throw new RuntimeException(e); diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureStorageAccountInputSource.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureStorageAccountInputSource.java index f4f59325306..f84f0474bfc 100644 --- a/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureStorageAccountInputSource.java +++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureStorageAccountInputSource.java @@ -20,7 +20,6 @@ package org.apache.druid.data.input.azure; import com.azure.storage.blob.models.BlobStorageException; -import com.azure.storage.blob.specialized.BlockBlobClient; import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; @@ -180,12 +179,7 @@ public class AzureStorageAccountInputSource extends 
CloudObjectInputSource try { AzureStorage azureStorage = new AzureStorage(azureIngestClientFactory, location.getBucket()); Pair locationInfo = getContainerAndPathFromObjectLocation(location); - final BlockBlobClient blobWithAttributes = azureStorage.getBlockBlobReferenceWithAttributes( - locationInfo.lhs, - locationInfo.rhs - ); - - return blobWithAttributes.getProperties().getBlobSize(); + return azureStorage.getBlockBlobLength(locationInfo.lhs, locationInfo.rhs); } catch (BlobStorageException e) { throw new RuntimeException(e); @@ -246,7 +240,9 @@ public class AzureStorageAccountInputSource extends CloudObjectInputSource public static Pair getContainerAndPathFromObjectLocation(CloudObjectLocation location) { String[] pathParts = location.getPath().split("/", 2); - // If there is no path specified, use a empty path as azure will throw a exception that is more clear than a index error. + + // If there is no path specified, use an empty path as Azure will throw an exception + // that is more clear than an index error. return Pair.of(pathParts[0], pathParts.length == 2 ? 
pathParts[1] : ""); } } diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentKiller.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentKiller.java index dd609e2f357..ce8a6cdd388 100644 --- a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentKiller.java +++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentKiller.java @@ -43,7 +43,6 @@ import java.util.Map; public class AzureDataSegmentKiller implements DataSegmentKiller { private static final Logger log = new Logger(AzureDataSegmentKiller.class); - private static final Integer MAX_MULTI_OBJECT_DELETE_SIZE = 256; // https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch?tabs=microsoft-entra-id private final AzureDataSegmentConfig segmentConfig; private final AzureInputDataConfig inputDataConfig; diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorage.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorage.java index a7419181364..011812843f2 100644 --- a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorage.java +++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorage.java @@ -21,7 +21,6 @@ package org.apache.druid.storage.azure; import com.azure.core.http.rest.PagedIterable; import com.azure.storage.blob.BlobContainerClient; -import com.azure.storage.blob.BlobServiceClient; import com.azure.storage.blob.batch.BlobBatchClient; import com.azure.storage.blob.batch.BlobBatchStorageException; import com.azure.storage.blob.models.BlobItem; @@ -34,7 +33,6 @@ import com.azure.storage.blob.options.BlobInputStreamOptions; import com.azure.storage.blob.options.BlockBlobOutputStreamOptions; import com.azure.storage.blob.specialized.BlockBlobClient; import 
com.azure.storage.common.Utility; -import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import com.google.common.collect.Streams; import org.apache.druid.java.util.common.RE; @@ -52,259 +50,402 @@ import java.util.List; import java.util.stream.Collectors; /** - * Abstracts the Azure storage layer. Makes direct calls to Azure file system. + * Abstracts the Azure storage layer, wrapping the Azure Java SDK. + *

+ * When using a service client ({@link com.azure.storage.blob.BlobServiceClient}), methods that rely on a container to + * exist should use {@link com.azure.storage.blob.BlobServiceClient#getBlobContainerClient}. + *

+ * If a method relies on a container to be created if it doesn't exist, call + * {@link com.azure.storage.blob.BlobServiceClient#createBlobContainerIfNotExists(String)}. */ public class AzureStorage { - // Default value from Azure library private static final int DELTA_BACKOFF_MS = 30_000; - // https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch?tabs=microsoft-entra-id + // https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch#request-body private static final Integer MAX_MULTI_OBJECT_DELETE_SIZE = 256; - - private static final Logger log = new Logger(AzureStorage.class); + private static final Logger LOG = new Logger(AzureStorage.class); private final AzureClientFactory azureClientFactory; private final String defaultStorageAccount; public AzureStorage( - AzureClientFactory azureClientFactory, - @Nullable String defaultStorageAccount + final AzureClientFactory azureClientFactory, + final String defaultStorageAccount ) { this.azureClientFactory = azureClientFactory; this.defaultStorageAccount = defaultStorageAccount; } - public List emptyCloudBlobDirectory(final String containerName, final String virtualDirPath) + /** + * See {@link AzureStorage#emptyCloudBlobDirectory(String, String, Integer)} for details. + */ + public List emptyCloudBlobDirectory(final String containerName, @Nullable final String prefix) throws BlobStorageException { - return emptyCloudBlobDirectory(containerName, virtualDirPath, null); + return emptyCloudBlobDirectory(containerName, prefix, null); } - public List emptyCloudBlobDirectory(final String containerName, final String virtualDirPath, final Integer maxAttempts) - throws BlobStorageException + /** + * Delete all blobs under the given prefix. + * + * @param containerName The name of the storage container. + * @param prefix (Optional) The Azure storage prefix to delete blobs for. + * If null, deletes all blobs in the storage container. 
+ * @param maxAttempts (Optional) Number of attempts to retry in case an API call fails. + * If null, defaults to the system default (`druid.azure.maxTries`). + * + * @return The list of blobs deleted. + */ + public List emptyCloudBlobDirectory( + final String containerName, + @Nullable final String prefix, + @Nullable final Integer maxAttempts + ) throws BlobStorageException { - List deletedFiles = new ArrayList<>(); - BlobContainerClient blobContainerClient = getOrCreateBlobContainerClient(containerName, maxAttempts); + final BlobContainerClient blobContainerClient = azureClientFactory + .getBlobServiceClient(maxAttempts, defaultStorageAccount) + .getBlobContainerClient(containerName); // https://learn.microsoft.com/en-us/azure/storage/blobs/storage-blobs-list The new client uses flat listing by default. - PagedIterable blobItems = blobContainerClient.listBlobs(new ListBlobsOptions().setPrefix(virtualDirPath), Duration.ofMillis(DELTA_BACKOFF_MS)); + final PagedIterable blobItems = blobContainerClient.listBlobs( + new ListBlobsOptions().setPrefix(prefix), + Duration.ofMillis(DELTA_BACKOFF_MS) + ); - blobItems.iterableByPage().forEach(page -> { - page.getElements().forEach(blob -> { - if (blobContainerClient.getBlobClient(blob.getName()).deleteIfExists()) { - deletedFiles.add(blob.getName()); - } - }); - }); + final List deletedFiles = new ArrayList<>(); + blobItems.iterableByPage().forEach( + page -> page.getElements().forEach( + blob -> { + if (blobContainerClient.getBlobClient(blob.getName()).deleteIfExists()) { + deletedFiles.add(blob.getName()); + } + } + ) + ); if (deletedFiles.isEmpty()) { - log.warn("No files were deleted on the following Azure path: [%s]", virtualDirPath); + LOG.warn("No files were deleted on the following Azure path: [%s]", prefix); } return deletedFiles; } - public void uploadBlockBlob(final File file, final String containerName, final String blobPath, final Integer maxAttempts) - throws IOException, BlobStorageException - { - 
BlobContainerClient blobContainerClient = getOrCreateBlobContainerClient(containerName, maxAttempts); - - try (FileInputStream stream = new FileInputStream(file)) { - // By default this creates a Block blob, no need to use a specific Block blob client. - // We also need to urlEncode the path to handle special characters. - // Set overwrite to true to keep behavior more similar to s3Client.putObject - blobContainerClient.getBlobClient(Utility.urlEncode(blobPath)).upload(stream, file.length(), true); - } - } - + /** + * Creates and opens an output stream to write data to the block blob. + *

+ * If the blob already exists, an exception will be thrown. + * + * @param containerName The name of the storage container. + * @param blobName The name of the blob within the container. + * @param blockSize (Optional) The block size to use when writing the blob. + * If null, the default block size will be used. + * @param maxAttempts (Optional) The maximum number of attempts to retry the upload in case of failure. + * If null, the default value from the system configuration (`druid.azure.maxTries`) will be used. + * + * @return An OutputStream for writing the blob. + */ public OutputStream getBlockBlobOutputStream( final String containerName, - final String blobPath, - @Nullable final Integer streamWriteSizeBytes, - Integer maxAttempts + final String blobName, + @Nullable final Long blockSize, + @Nullable final Integer maxAttempts ) throws BlobStorageException { - BlobContainerClient blobContainerClient = getOrCreateBlobContainerClient(containerName, maxAttempts); - BlockBlobClient blockBlobClient = blobContainerClient.getBlobClient(Utility.urlEncode(blobPath)).getBlockBlobClient(); + final BlockBlobClient blockBlobClient = azureClientFactory + .getBlobServiceClient(maxAttempts, defaultStorageAccount) + .createBlobContainerIfNotExists(containerName) + .getBlobClient(Utility.urlEncode(blobName)) + .getBlockBlobClient(); + // TODO based on the usage here, it might be better to overwrite the existing blob instead; that's what StorageConnector#write documents it does if (blockBlobClient.exists()) { throw new RE("Reference already exists"); } - BlockBlobOutputStreamOptions options = new BlockBlobOutputStreamOptions(); - if (streamWriteSizeBytes != null) { - options.setParallelTransferOptions(new ParallelTransferOptions().setBlockSizeLong(streamWriteSizeBytes.longValue())); + + final BlockBlobOutputStreamOptions options = new BlockBlobOutputStreamOptions(); + if (blockSize != null) { + options.setParallelTransferOptions(new 
ParallelTransferOptions().setBlockSizeLong(blockSize)); } + return blockBlobClient.getBlobOutputStream(options); } - // There's no need to download attributes with the new azure clients, they will get fetched as needed. - public BlockBlobClient getBlockBlobReferenceWithAttributes(final String containerName, final String blobPath) - throws BlobStorageException + /** + * Gets the length of the specified block blob. + * + * @param containerName The name of the storage container. + * @param blobName The name of the blob within the container. + * + * @return The length of the blob in bytes. + */ + public long getBlockBlobLength(final String containerName, final String blobName) throws BlobStorageException { - return getOrCreateBlobContainerClient(containerName).getBlobClient(Utility.urlEncode(blobPath)).getBlockBlobClient(); + return azureClientFactory + .getBlobServiceClient(null, defaultStorageAccount) + .getBlobContainerClient(containerName) + .getBlobClient(Utility.urlEncode(blobName)) + .getBlockBlobClient() + .getProperties() + .getBlobSize(); } - public long getBlockBlobLength(final String containerName, final String blobPath) + /** + * See {@link AzureStorage#getBlockBlobInputStream(long, Long, String, String, Integer)} for details. + */ + public InputStream getBlockBlobInputStream(final String containerName, final String blobName) throws BlobStorageException { - return getBlockBlobReferenceWithAttributes(containerName, blobPath).getProperties().getBlobSize(); + return getBlockBlobInputStream(0L, containerName, blobName); } - public InputStream getBlockBlobInputStream(final String containerName, final String blobPath) + /** + * See {@link AzureStorage#getBlockBlobInputStream(long, Long, String, String, Integer)} for details. 
+ */ + public InputStream getBlockBlobInputStream(final long offset, final String containerName, final String blobName) throws BlobStorageException { - return getBlockBlobInputStream(0L, containerName, blobPath); + return getBlockBlobInputStream(offset, null, containerName, blobName); } - public InputStream getBlockBlobInputStream(long offset, final String containerName, final String blobPath) - throws BlobStorageException + /** + * See {@link AzureStorage#getBlockBlobInputStream(long, Long, String, String, Integer)} for details. + */ + public InputStream getBlockBlobInputStream( + final long offset, + @Nullable final Long length, + final String containerName, + final String blobName + ) throws BlobStorageException { - return getBlockBlobInputStream(offset, null, containerName, blobPath); + return getBlockBlobInputStream(offset, length, containerName, blobName, null); } - public InputStream getBlockBlobInputStream(long offset, Long length, final String containerName, final String blobPath) - throws BlobStorageException + /** + * Gets an InputStream for reading the contents of the specified block blob. + * + * @param containerName The name of the storage container. + * @param blobName The name of the blob within the container. + * + * @return An InputStream for reading the blob. 
+ */ + public InputStream getBlockBlobInputStream( + final long offset, + @Nullable final Long length, + final String containerName, + final String blobName, + @Nullable final Integer maxAttempts + ) throws BlobStorageException { - return getBlockBlobInputStream(offset, length, containerName, blobPath, null); - } - - public InputStream getBlockBlobInputStream(long offset, Long length, final String containerName, final String blobPath, Integer maxAttempts) - throws BlobStorageException - { - BlobContainerClient blobContainerClient = getOrCreateBlobContainerClient(containerName, maxAttempts); - return blobContainerClient.getBlobClient(Utility.urlEncode(blobPath)).openInputStream(new BlobInputStreamOptions().setRange(new BlobRange(offset, length))); + return azureClientFactory + .getBlobServiceClient(maxAttempts, defaultStorageAccount) + .getBlobContainerClient(containerName) + .getBlobClient(Utility.urlEncode(blobName)) + .openInputStream(new BlobInputStreamOptions().setRange(new BlobRange(offset, length))); } /** * Deletes multiple files from the specified container. * - * @param containerName The name of the container from which files will be deleted. + * @param containerName The name of the storage container. * @param paths An iterable of file paths to be deleted. - * @param maxAttempts (Optional) The maximum number of attempts to delete each file. - * If null, the system default number of attempts will be used. + * @param maxAttempts (Optional) Number of attempts to retry in case an API call fails. + * If null, defaults to the system default (`druid.azure.maxTries`). + * * @return true if all files were successfully deleted; false otherwise. 
*/ - public boolean batchDeleteFiles(String containerName, Iterable paths, @Nullable Integer maxAttempts) - throws BlobBatchStorageException + public boolean batchDeleteFiles( + final String containerName, + final Iterable paths, + @Nullable final Integer maxAttempts + ) throws BlobBatchStorageException { - BlobContainerClient blobContainerClient = getOrCreateBlobContainerClient(containerName, maxAttempts); + BlobContainerClient blobContainerClient = azureClientFactory + .getBlobServiceClient(maxAttempts, defaultStorageAccount) + .getBlobContainerClient(containerName); + BlobBatchClient blobBatchClient = azureClientFactory.getBlobBatchClient(blobContainerClient); - List blobUris = Streams.stream(paths).map(path -> blobContainerClient.getBlobContainerUrl() + "/" + path).collect(Collectors.toList()); + List blobUris = Streams.stream(paths) + .map(path -> blobContainerClient.getBlobContainerUrl() + "/" + path) + .collect(Collectors.toList()); + boolean hadException = false; - List> keysChunks = Lists.partition( - blobUris, - MAX_MULTI_OBJECT_DELETE_SIZE - ); + List> keysChunks = Lists.partition(blobUris, MAX_MULTI_OBJECT_DELETE_SIZE); for (List chunkOfKeys : keysChunks) { try { - log.info( - "Removing from container [%s] the following files: [%s]", - containerName, - chunkOfKeys - ); - // We have to call forEach on the response because this is the only way azure batch will throw an exception on a operation failure. - blobBatchClient.deleteBlobs( - chunkOfKeys, - DeleteSnapshotsOptionType.INCLUDE - ).forEach(response -> - log.debug("Deleting blob with URL %s completed with status code %d%n", - response.getRequest().getUrl(), response.getStatusCode()) - ); + LOG.info("Removing from container [%s] the following files: [%s]", containerName, chunkOfKeys); + + // We have to call forEach on the response because this is the only way azure batch will throw an exception on an operation failure. 
+ blobBatchClient.deleteBlobs(chunkOfKeys, DeleteSnapshotsOptionType.INCLUDE).forEach(response -> LOG.debug( + "Deleting blob with URL %s completed with status code %d%n", + response.getRequest().getUrl(), + response.getStatusCode() + )); } catch (BlobStorageException | BlobBatchStorageException e) { hadException = true; - log.noStackTrace().warn(e, - "Unable to delete from container [%s], the following keys [%s]", - containerName, - chunkOfKeys + LOG.noStackTrace().warn( + e, + "Unable to delete from container [%s], the following keys [%s]", + containerName, + chunkOfKeys ); } catch (Exception e) { hadException = true; - log.noStackTrace().warn(e, - "Unexpected exception occurred when deleting from container [%s], the following keys [%s]", - containerName, - chunkOfKeys + LOG.noStackTrace().warn( + e, + "Unexpected exception occurred when deleting from container [%s], the following keys [%s]", + containerName, + chunkOfKeys ); } } + return !hadException; } - public List listDir(final String containerName, final String virtualDirPath, final Integer maxAttempts) - throws BlobStorageException + /** + * See {@link AzureStorage#getBlockBlobExists(String, String, Integer)} for details. + */ + public boolean getBlockBlobExists(final String container, final String blobName) throws BlobStorageException { - List files = new ArrayList<>(); - BlobContainerClient blobContainerClient = getOrCreateBlobContainerClient(containerName, maxAttempts); + return getBlockBlobExists(container, blobName, null); + } - PagedIterable blobItems = blobContainerClient.listBlobs( - new ListBlobsOptions().setPrefix(virtualDirPath), - Duration.ofMillis(DELTA_BACKOFF_MS) - ); + /** + * Checks if the specified block blob exists in the given Azure Blob Storage container. + * + * @param container The name of the Azure Blob Storage container. + * @param blobName The name of the blob within the container. 
+ * @param maxAttempts (Optional) The maximum number of attempts to retry the existence check in case of failure. + * If null, the default value from the system configuration (`druid.azure.maxTries`) will be used. + * @return `true` if the block blob exists, `false` otherwise. + * @throws BlobStorageException If there is an error checking the existence of the blob. + */ + public boolean getBlockBlobExists( + final String container, + final String blobName, + @Nullable final Integer maxAttempts + ) throws BlobStorageException + { + return azureClientFactory + .getBlobServiceClient(maxAttempts, defaultStorageAccount) + .getBlobContainerClient(container) + .getBlobClient(Utility.urlEncode(blobName)) + .exists(); + } + /** + * Lists the blobs in the specified storage container, optionally matching a given prefix. + * + * @param storageAccount The name of the storage account. + * @param containerName The name of the storage container. + * @param prefix (Optional) The Azure storage prefix to list blobs for. + * If null, lists all blobs in the storage container. + * @param maxResults (Optional) The maximum number of results to return per page. + * If null, defaults to the API default (5000). + * @param maxAttempts (Optional) The maximum number of attempts to retry the list operation in case of failure. + * If null, the default value from the system configuration (`druid.azure.maxTries`) will be used. + * + * @return Returns a lazy loaded list of blobs in this container. 
+ */ + public PagedIterable listBlobsWithPrefixInContainerSegmented( + final String storageAccount, + final String containerName, + @Nullable final String prefix, + @Nullable final Integer maxResults, + @Nullable final Integer maxAttempts + ) throws BlobStorageException + { + final ListBlobsOptions listOptions = new ListBlobsOptions(); + + if (maxResults != null) { + listOptions.setMaxResultsPerPage(maxResults); + } + + if (prefix != null) { + listOptions.setPrefix(prefix); + } + + return azureClientFactory + .getBlobServiceClient(maxAttempts, storageAccount) + .getBlobContainerClient(containerName) + .listBlobs(listOptions, Duration.ofMillis(DELTA_BACKOFF_MS)); + } + + /** + * Lists the blobs in the specified storage container, optionally matching a given prefix. + * + * @param containerName The name of the storage container. + * @param prefix (Optional) The Azure storage prefix to list blobs for. + * If null, lists all blobs in the storage container. + * @param maxResults (Optional) The maximum number of results to return per page. + * If null, defaults to the API default (5000). + * @param maxAttempts (Optional) The maximum number of attempts to retry the list operation in case of failure. + * If null, the default value from the system configuration (`druid.azure.maxTries`) will be used. + * + * @return A list of blob names in this container. 
+ */ + public List listBlobs( + final String containerName, + @Nullable final String prefix, + @Nullable final Integer maxResults, + @Nullable final Integer maxAttempts + ) throws BlobStorageException + { + final ListBlobsOptions listOptions = new ListBlobsOptions().setPrefix(prefix); + + if (maxResults != null) { + listOptions.setMaxResultsPerPage(maxResults); + } + + if (prefix != null) { + listOptions.setPrefix(prefix); + } + + final PagedIterable blobItems = azureClientFactory + .getBlobServiceClient(maxAttempts, defaultStorageAccount) + .getBlobContainerClient(containerName) + .listBlobs(listOptions, Duration.ofMillis(DELTA_BACKOFF_MS)); + + final List files = new ArrayList<>(); blobItems.iterableByPage().forEach(page -> page.getElements().forEach(blob -> files.add(blob.getName()))); return files; } - public boolean getBlockBlobExists(String container, String blobPath) throws BlobStorageException - { - return getBlockBlobExists(container, blobPath, null); - } - - - public boolean getBlockBlobExists(String container, String blobPath, Integer maxAttempts) - throws BlobStorageException - { - return getOrCreateBlobContainerClient(container, maxAttempts).getBlobClient(Utility.urlEncode(blobPath)).exists(); - } - - @VisibleForTesting - BlobServiceClient getBlobServiceClient(Integer maxAttempts) - { - return azureClientFactory.getBlobServiceClient(maxAttempts, defaultStorageAccount); - } - - @VisibleForTesting - BlobServiceClient getBlobServiceClient(String storageAccount, Integer maxAttempts) - { - return azureClientFactory.getBlobServiceClient(maxAttempts, storageAccount); - } - - // This method is used in AzureCloudBlobIterator in a method where one azureStorage instance might need to list from multiple - // storage accounts, so storageAccount is a valid parameter. - @VisibleForTesting - PagedIterable listBlobsWithPrefixInContainerSegmented( - final String storageAccount, + /** + * Creates a new blob, or updates the content of an existing blob. 
+ * + * @param file The file to write to the blob. + * @param containerName The name of the storage container. + * @param blobName The blob name to write the file to. + * @param maxAttempts (Optional) Number of attempts to retry in case an API call fails. + * If null, defaults to the system default (`druid.azure.maxTries`). + */ + public void uploadBlockBlob( + final File file, final String containerName, - final String prefix, - int maxResults, - Integer maxAttempts - ) throws BlobStorageException + final String blobName, + @Nullable final Integer maxAttempts + ) throws IOException, BlobStorageException { - BlobContainerClient blobContainerClient = getOrCreateBlobContainerClient(storageAccount, containerName, maxAttempts); - return blobContainerClient.listBlobs( - new ListBlobsOptions().setPrefix(prefix).setMaxResultsPerPage(maxResults), - Duration.ofMillis(DELTA_BACKOFF_MS) - ); - } + final BlobContainerClient blobContainerClient = azureClientFactory + .getBlobServiceClient(maxAttempts, defaultStorageAccount) + .createBlobContainerIfNotExists(containerName); - private BlobContainerClient getOrCreateBlobContainerClient(final String containerName) - { - return getBlobServiceClient(null).createBlobContainerIfNotExists(containerName); - } + try (FileInputStream stream = new FileInputStream(file)) { + blobContainerClient + // Creates a blob by default, no need to use a specific blob client. + // We also need to urlEncode the path to handle special characters. 
+ .getBlobClient(Utility.urlEncode(blobName)) - private BlobContainerClient getOrCreateBlobContainerClient(final String containerName, final Integer maxRetries) - { - return getBlobServiceClient(maxRetries).createBlobContainerIfNotExists(containerName); - } - - private BlobContainerClient getOrCreateBlobContainerClient(final String storageAccount, final String containerName, final Integer maxRetries) - { - return getBlobServiceClient(storageAccount, maxRetries).createBlobContainerIfNotExists(containerName); + // Set overwrite to true to keep behavior more similar to s3Client.putObject. + .upload(stream, file.length(), true); + } } } diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureUtils.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureUtils.java index 2f6d07d542e..2760647073a 100644 --- a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureUtils.java +++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureUtils.java @@ -37,8 +37,6 @@ import java.util.concurrent.TimeoutException; */ public class AzureUtils { - - public static final String DEFAULT_AZURE_ENDPOINT_SUFFIX = "core.windows.net"; @VisibleForTesting static final String AZURE_STORAGE_HOST_ADDRESS = "blob.core.windows.net"; diff --git a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureStorageConnector.java b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureStorageConnector.java index be7041d7a99..2d6276c5a6d 100644 --- a/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureStorageConnector.java +++ b/extensions-core/azure-extensions/src/main/java/org/apache/druid/storage/azure/output/AzureStorageConnector.java @@ -140,7 +140,7 @@ public class AzureStorageConnector extends ChunkingStorageConnector paths; try { - paths = 
azureStorage.listDir(config.getContainer(), prefixBasePath, config.getMaxRetry()); + paths = azureStorage.listBlobs(config.getContainer(), prefixBasePath, null, config.getMaxRetry()); } catch (BlobStorageException e) { throw new IOException(e); diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureStorageTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureStorageTest.java index b61e7234014..3d76719732a 100644 --- a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureStorageTest.java +++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureStorageTest.java @@ -29,15 +29,25 @@ import com.azure.storage.blob.models.BlobItem; import com.azure.storage.blob.models.BlobItemProperties; import com.azure.storage.blob.models.BlobStorageException; import com.azure.storage.blob.models.DeleteSnapshotsOptionType; +import com.azure.storage.blob.specialized.BlockBlobAsyncClient; +import com.azure.storage.blob.specialized.BlockBlobClient; import com.google.common.collect.ImmutableList; import org.apache.druid.common.guava.SettableSupplier; +import org.apache.druid.java.util.common.RE; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.NullSource; +import org.junit.jupiter.params.provider.ValueSource; import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatchers; +import org.mockito.Mock; import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; +import javax.annotation.Nullable; import java.io.File; import java.io.IOException; import java.io.InputStream; @@ -46,32 +56,112 @@ import java.nio.file.Path; import java.util.ArrayList; import java.util.List; +import static 
org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; - -// Using Mockito for the whole test class since azure classes (e.g. BlobContainerClient) are final and can't be mocked with EasyMock +/** + * Using Mockito for the whole test class since Azure classes (e.g. BlobContainerClient) are final + * and can't be mocked with EasyMock. + */ +@ExtendWith(MockitoExtension.class) public class AzureStorageTest { - AzureStorage azureStorage; - BlobClient blobClient = Mockito.mock(BlobClient.class); - BlobServiceClient blobServiceClient = Mockito.mock(BlobServiceClient.class); - BlobContainerClient blobContainerClient = Mockito.mock(BlobContainerClient.class); - AzureClientFactory azureClientFactory = Mockito.mock(AzureClientFactory.class); - private final String STORAGE_ACCOUNT = "storageAccount"; private final String CONTAINER = "container"; private final String BLOB_NAME = "blobName"; + private AzureStorage azureStorage; + + @Mock + private AzureClientFactory azureClientFactory; + @Mock + private BlockBlobClient blockBlobClient; + @Mock + private BlobClient blobClient; + @Mock + private BlobContainerClient blobContainerClient; + @Mock + private BlobServiceClient blobServiceClient; + @BeforeEach public void setup() throws BlobStorageException { azureStorage = new AzureStorage(azureClientFactory, STORAGE_ACCOUNT); } + @ParameterizedTest + @ValueSource(longs = {-1, BlockBlobAsyncClient.MAX_STAGE_BLOCK_BYTES_LONG + 1}) + public void testGetBlockBlockOutputStream_blockSizeOutOfBoundsException(final long blockSize) + { + Mockito.doReturn(blobContainerClient).when(blobServiceClient).createBlobContainerIfNotExists(CONTAINER); + Mockito.doReturn(blobServiceClient).when(azureClientFactory).getBlobServiceClient(null, STORAGE_ACCOUNT); + 
Mockito.doReturn(blobClient).when(blobContainerClient).getBlobClient(BLOB_NAME); + Mockito.doReturn(blockBlobClient).when(blobClient).getBlockBlobClient(); + + final IllegalArgumentException exception = assertThrows( + IllegalArgumentException.class, + () -> azureStorage.getBlockBlobOutputStream( + CONTAINER, + BLOB_NAME, + blockSize, + null + ) + ); + + assertEquals( + "The value of the parameter 'blockSize' should be between 1 and 4194304000.", + exception.getMessage() + ); + } + @Test - public void testListDir_retriable() throws BlobStorageException + public void testGetBlockBlockOutputStream_blobAlreadyExistsException() + { + Mockito.doReturn(blobContainerClient).when(blobServiceClient).createBlobContainerIfNotExists(CONTAINER); + Mockito.doReturn(blobServiceClient).when(azureClientFactory).getBlobServiceClient(null, STORAGE_ACCOUNT); + Mockito.doReturn(blobClient).when(blobContainerClient).getBlobClient(BLOB_NAME); + Mockito.doReturn(blockBlobClient).when(blobClient).getBlockBlobClient(); + Mockito.doReturn(true).when(blockBlobClient).exists(); + + final RE exception = assertThrows( + RE.class, + () -> azureStorage.getBlockBlobOutputStream( + CONTAINER, + BLOB_NAME, + 100L, + null + ) + ); + + assertEquals( + "Reference already exists", + exception.getMessage() + ); + } + + @ParameterizedTest + @NullSource + @ValueSource(longs = {1, 100}) + public void testGetBlockBlockOutputStream_success(@Nullable final Long blockSize) + { + Mockito.doReturn(blobContainerClient).when(blobServiceClient).createBlobContainerIfNotExists(CONTAINER); + Mockito.doReturn(blobServiceClient).when(azureClientFactory).getBlobServiceClient(null, STORAGE_ACCOUNT); + Mockito.doReturn(blobClient).when(blobContainerClient).getBlobClient(BLOB_NAME); + Mockito.doReturn(blockBlobClient).when(blobClient).getBlockBlobClient(); + + assertDoesNotThrow(() -> azureStorage.getBlockBlobOutputStream( + CONTAINER, + BLOB_NAME, + blockSize, + null + )); + } + + @Test + public void 
testListBlobs_retriable() throws BlobStorageException { BlobItem blobItem = new BlobItem().setName(BLOB_NAME).setProperties(new BlobItemProperties().setContentLength(10L)); SettableSupplier> supplier = new SettableSupplier<>(); @@ -81,16 +171,19 @@ public class AzureStorageTest ArgumentMatchers.any(), ArgumentMatchers.any() ); - Mockito.doReturn(blobContainerClient).when(blobServiceClient).createBlobContainerIfNotExists(CONTAINER); + Mockito.doReturn(blobContainerClient).when(blobServiceClient).getBlobContainerClient(CONTAINER); final Integer maxAttempts = 3; Mockito.doReturn(blobServiceClient).when(azureClientFactory).getBlobServiceClient(maxAttempts, STORAGE_ACCOUNT); - assertEquals(ImmutableList.of(BLOB_NAME), azureStorage.listDir(CONTAINER, "", maxAttempts)); + assertEquals( + ImmutableList.of(BLOB_NAME), + azureStorage.listBlobs(CONTAINER, "", null, maxAttempts) + ); } @Test - public void testListDir_nullMaxAttempts() throws BlobStorageException + public void testListBlobs_nullMaxAttempts() throws BlobStorageException { BlobItem blobItem = new BlobItem().setName(BLOB_NAME).setProperties(new BlobItemProperties().setContentLength(10L)); SettableSupplier> supplier = new SettableSupplier<>(); @@ -100,10 +193,13 @@ public class AzureStorageTest ArgumentMatchers.any(), ArgumentMatchers.any() ); - Mockito.doReturn(blobContainerClient).when(blobServiceClient).createBlobContainerIfNotExists(CONTAINER); + Mockito.doReturn(blobContainerClient).when(blobServiceClient).getBlobContainerClient(CONTAINER); Mockito.doReturn(blobServiceClient).when(azureClientFactory).getBlobServiceClient(null, STORAGE_ACCOUNT); - assertEquals(ImmutableList.of(BLOB_NAME), azureStorage.listDir(CONTAINER, "", null)); + assertEquals( + ImmutableList.of(BLOB_NAME), + azureStorage.listBlobs(CONTAINER, "", null, null) + ); } @Test @@ -118,7 +214,7 @@ public class AzureStorageTest ArgumentMatchers.any(), ArgumentMatchers.any() ); - 
Mockito.doReturn(blobContainerClient).when(blobServiceClient).createBlobContainerIfNotExists(CONTAINER); + Mockito.doReturn(blobContainerClient).when(blobServiceClient).getBlobContainerClient(CONTAINER); Mockito.doReturn(blobServiceClient).when(azureClientFactory).getBlobServiceClient(3, storageAccountCustom); azureStorage.listBlobsWithPrefixInContainerSegmented( @@ -143,7 +239,7 @@ public class AzureStorageTest ArgumentCaptor> captor = ArgumentCaptor.forClass(List.class); Mockito.doReturn(containerUrl).when(blobContainerClient).getBlobContainerUrl(); - Mockito.doReturn(blobContainerClient).when(blobServiceClient).createBlobContainerIfNotExists(CONTAINER); + Mockito.doReturn(blobContainerClient).when(blobServiceClient).getBlobContainerClient(CONTAINER); Mockito.doReturn(blobServiceClient).when(azureClientFactory).getBlobServiceClient(null, STORAGE_ACCOUNT); Mockito.doReturn(blobBatchClient).when(azureClientFactory).getBlobBatchClient(blobContainerClient); Mockito.doReturn(pagedIterable).when(blobBatchClient).deleteBlobs( @@ -167,11 +263,11 @@ public class AzureStorageTest ArgumentCaptor> captor = ArgumentCaptor.forClass(List.class); Mockito.doReturn(containerUrl).when(blobContainerClient).getBlobContainerUrl(); - Mockito.doReturn(blobContainerClient).when(blobServiceClient).createBlobContainerIfNotExists(CONTAINER); + Mockito.doReturn(blobContainerClient).when(blobServiceClient).getBlobContainerClient(CONTAINER); Mockito.doReturn(blobServiceClient).when(azureClientFactory).getBlobServiceClient(null, STORAGE_ACCOUNT); Mockito.doReturn(blobBatchClient).when(azureClientFactory).getBlobBatchClient(blobContainerClient); Mockito.doThrow(new RuntimeException()).when(blobBatchClient).deleteBlobs( - captor.capture(), ArgumentMatchers.eq(DeleteSnapshotsOptionType.INCLUDE) + captor.capture(), ArgumentMatchers.eq(DeleteSnapshotsOptionType.INCLUDE) ); boolean deleteSuccessful = azureStorage.batchDeleteFiles(CONTAINER, ImmutableList.of(BLOB_NAME), null); @@ -192,11 +288,11 @@ 
public class AzureStorageTest ArgumentCaptor> captor = ArgumentCaptor.forClass(List.class); Mockito.doReturn(containerUrl).when(blobContainerClient).getBlobContainerUrl(); - Mockito.doReturn(blobContainerClient).when(blobServiceClient).createBlobContainerIfNotExists(CONTAINER); + Mockito.doReturn(blobContainerClient).when(blobServiceClient).getBlobContainerClient(CONTAINER); Mockito.doReturn(blobServiceClient).when(azureClientFactory).getBlobServiceClient(null, STORAGE_ACCOUNT); Mockito.doReturn(blobBatchClient).when(azureClientFactory).getBlobBatchClient(blobContainerClient); Mockito.doReturn(pagedIterable).when(blobBatchClient).deleteBlobs( - captor.capture(), ArgumentMatchers.eq(DeleteSnapshotsOptionType.INCLUDE) + captor.capture(), ArgumentMatchers.eq(DeleteSnapshotsOptionType.INCLUDE) ); diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureStorageConnectorTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureStorageConnectorTest.java index 5219f2b5962..ad9c627f036 100644 --- a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureStorageConnectorTest.java +++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureStorageConnectorTest.java @@ -195,7 +195,7 @@ public class AzureStorageConnectorTest public void testListDir() throws BlobStorageException, IOException { EasyMock.reset(azureStorage); - EasyMock.expect(azureStorage.listDir(EasyMock.anyString(), EasyMock.anyString(), EasyMock.anyInt())) + EasyMock.expect(azureStorage.listBlobs(EasyMock.anyString(), EasyMock.anyString(), EasyMock.anyInt(), EasyMock.anyInt())) .andReturn(ImmutableList.of(PREFIX + "/x/y/z/" + TEST_FILE, PREFIX + "/p/q/r/" + TEST_FILE)); EasyMock.replay(azureStorage); List ret = Lists.newArrayList(storageConnector.listDir(""));