Only create container in `AzureStorage` for write operations (#16558)

* Remove unused constants

* Refactor getBlockBlobLength

* Better link

* Upper-case log

* Mark defaultStorageAccount nullable

This is the case if you do not use Azure for deep-storage but ingest from Azure blobs.

* Do not always create a new container if it doesn't exist

Specifically, only create a container if uploading a blob or writing a blob stream

* Add lots of comments, group methods

* Revert "Mark defaultStorageAccount nullable"

* Add mockito for junit

* Add extra test

* Add comment

Thanks George.

* Pass blockSize as Long

* Test more branches...
Andreas Maechler, 2024-06-07 10:47:51 -06:00, committed by GitHub
parent efe9079f0a
commit e6a82e8a11
9 changed files with 428 additions and 198 deletions
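
To make the behavior change concrete before the diffs below, here is a minimal, hypothetical sketch of the read-vs-write split this patch introduces in AzureStorage: read paths resolve the container with BlobServiceClient#getBlobContainerClient and never create it, while write paths go through BlobServiceClient#createBlobContainerIfNotExists. The class and method names in the sketch are illustrative, not the actual Druid code; only the Azure SDK calls mirror what the diff uses.

import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.BlobServiceClient;

// Illustrative sketch only: not part of the patch.
class ContainerAccessSketch
{
  private final BlobServiceClient serviceClient;

  ContainerAccessSketch(final BlobServiceClient serviceClient)
  {
    this.serviceClient = serviceClient;
  }

  // Read path: the container is only resolved; if it is missing, Azure raises the error.
  long blobLength(final String containerName, final String blobName)
  {
    return serviceClient
        .getBlobContainerClient(containerName)   // never creates the container
        .getBlobClient(blobName)
        .getProperties()
        .getBlobSize();
  }

  // Write path: the container is created on demand before anything is written to it.
  BlobContainerClient writableContainer(final String containerName)
  {
    return serviceClient.createBlobContainerIfNotExists(containerName);
  }
}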


@ -190,6 +190,12 @@
<artifactId>mockito-core</artifactId> <artifactId>mockito-core</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
<build> <build>


@ -20,7 +20,6 @@
package org.apache.druid.data.input.azure; package org.apache.druid.data.input.azure;
import com.azure.storage.blob.models.BlobStorageException; import com.azure.storage.blob.models.BlobStorageException;
import com.azure.storage.blob.specialized.BlockBlobClient;
import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonIgnore;
@ -161,12 +160,7 @@ public class AzureInputSource extends CloudObjectInputSource
public long getObjectSize(CloudObjectLocation location) public long getObjectSize(CloudObjectLocation location)
{ {
try { try {
final BlockBlobClient blobWithAttributes = storage.getBlockBlobReferenceWithAttributes( return storage.getBlockBlobLength(location.getBucket(), location.getPath());
location.getBucket(),
location.getPath()
);
return blobWithAttributes.getProperties().getBlobSize();
} }
catch (BlobStorageException e) { catch (BlobStorageException e) {
throw new RuntimeException(e); throw new RuntimeException(e);


@ -20,7 +20,6 @@
package org.apache.druid.data.input.azure; package org.apache.druid.data.input.azure;
import com.azure.storage.blob.models.BlobStorageException; import com.azure.storage.blob.models.BlobStorageException;
import com.azure.storage.blob.specialized.BlockBlobClient;
import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonIgnore;
@ -180,12 +179,7 @@ public class AzureStorageAccountInputSource extends CloudObjectInputSource
try { try {
AzureStorage azureStorage = new AzureStorage(azureIngestClientFactory, location.getBucket()); AzureStorage azureStorage = new AzureStorage(azureIngestClientFactory, location.getBucket());
Pair<String, String> locationInfo = getContainerAndPathFromObjectLocation(location); Pair<String, String> locationInfo = getContainerAndPathFromObjectLocation(location);
final BlockBlobClient blobWithAttributes = azureStorage.getBlockBlobReferenceWithAttributes( return azureStorage.getBlockBlobLength(locationInfo.lhs, locationInfo.rhs);
locationInfo.lhs,
locationInfo.rhs
);
return blobWithAttributes.getProperties().getBlobSize();
} }
catch (BlobStorageException e) { catch (BlobStorageException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
@ -246,7 +240,9 @@ public class AzureStorageAccountInputSource extends CloudObjectInputSource
public static Pair<String, String> getContainerAndPathFromObjectLocation(CloudObjectLocation location) public static Pair<String, String> getContainerAndPathFromObjectLocation(CloudObjectLocation location)
{ {
String[] pathParts = location.getPath().split("/", 2); String[] pathParts = location.getPath().split("/", 2);
// If there is no path specified, use a empty path as azure will throw a exception that is more clear than a index error.
// If there is no path specified, use an empty path as Azure will throw an exception
// that is more clear than an index error.
return Pair.of(pathParts[0], pathParts.length == 2 ? pathParts[1] : ""); return Pair.of(pathParts[0], pathParts.length == 2 ? pathParts[1] : "");
} }
} }
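
As a quick, hypothetical illustration of the path-splitting behavior documented in the comment above (the container is the first path segment; with no further segments the helper falls back to an empty blob path so Azure, rather than an array index error, reports the problem):

// Stand-alone sketch, not Druid code: mirrors the split("/", 2) fallback above.
public class PathSplitSketch
{
  public static void main(String[] args)
  {
    String[] withPath = "my-container/some/dir/file.json".split("/", 2);
    System.out.println(withPath[0] + " -> " + withPath[1]);       // my-container -> some/dir/file.json

    String[] noPath = "my-container".split("/", 2);
    String blobPath = noPath.length == 2 ? noPath[1] : "";        // avoid an index error
    System.out.println(noPath[0] + " -> '" + blobPath + "'");     // my-container -> ''
  }
}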


@ -43,7 +43,6 @@ import java.util.Map;
public class AzureDataSegmentKiller implements DataSegmentKiller public class AzureDataSegmentKiller implements DataSegmentKiller
{ {
private static final Logger log = new Logger(AzureDataSegmentKiller.class); private static final Logger log = new Logger(AzureDataSegmentKiller.class);
private static final Integer MAX_MULTI_OBJECT_DELETE_SIZE = 256; // https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch?tabs=microsoft-entra-id
private final AzureDataSegmentConfig segmentConfig; private final AzureDataSegmentConfig segmentConfig;
private final AzureInputDataConfig inputDataConfig; private final AzureInputDataConfig inputDataConfig;


@ -21,7 +21,6 @@ package org.apache.druid.storage.azure;
import com.azure.core.http.rest.PagedIterable; import com.azure.core.http.rest.PagedIterable;
import com.azure.storage.blob.BlobContainerClient; import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.batch.BlobBatchClient; import com.azure.storage.blob.batch.BlobBatchClient;
import com.azure.storage.blob.batch.BlobBatchStorageException; import com.azure.storage.blob.batch.BlobBatchStorageException;
import com.azure.storage.blob.models.BlobItem; import com.azure.storage.blob.models.BlobItem;
@ -34,7 +33,6 @@ import com.azure.storage.blob.options.BlobInputStreamOptions;
import com.azure.storage.blob.options.BlockBlobOutputStreamOptions; import com.azure.storage.blob.options.BlockBlobOutputStreamOptions;
import com.azure.storage.blob.specialized.BlockBlobClient; import com.azure.storage.blob.specialized.BlockBlobClient;
import com.azure.storage.common.Utility; import com.azure.storage.common.Utility;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Streams; import com.google.common.collect.Streams;
import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.RE;
@ -52,172 +50,245 @@ import java.util.List;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
* Abstracts the Azure storage layer. Makes direct calls to Azure file system. * Abstracts the Azure storage layer, wrapping the Azure Java SDK.
* <p>
* When using a service client ({@link com.azure.storage.blob.BlobServiceClient}, methods that rely on a container to
* exist should use {@link com.azure.storage.blob.BlobServiceClient#getBlobContainerClient}.
* <p>
* If a method relies on a container to be created if it doesn't exist, call
* {@link com.azure.storage.blob.BlobServiceClient#createBlobContainerIfNotExists(String)}.
*/ */
public class AzureStorage public class AzureStorage
{ {
// Default value from Azure library // Default value from Azure library
private static final int DELTA_BACKOFF_MS = 30_000; private static final int DELTA_BACKOFF_MS = 30_000;
// https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch?tabs=microsoft-entra-id // https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch#request-body
private static final Integer MAX_MULTI_OBJECT_DELETE_SIZE = 256; private static final Integer MAX_MULTI_OBJECT_DELETE_SIZE = 256;
private static final Logger LOG = new Logger(AzureStorage.class);
private static final Logger log = new Logger(AzureStorage.class);
private final AzureClientFactory azureClientFactory; private final AzureClientFactory azureClientFactory;
private final String defaultStorageAccount; private final String defaultStorageAccount;
public AzureStorage( public AzureStorage(
AzureClientFactory azureClientFactory, final AzureClientFactory azureClientFactory,
@Nullable String defaultStorageAccount final String defaultStorageAccount
) )
{ {
this.azureClientFactory = azureClientFactory; this.azureClientFactory = azureClientFactory;
this.defaultStorageAccount = defaultStorageAccount; this.defaultStorageAccount = defaultStorageAccount;
} }
public List<String> emptyCloudBlobDirectory(final String containerName, final String virtualDirPath) /**
* See {@link AzureStorage#emptyCloudBlobDirectory(String, String, Integer)} for details.
*/
public List<String> emptyCloudBlobDirectory(final String containerName, @Nullable final String prefix)
throws BlobStorageException throws BlobStorageException
{ {
return emptyCloudBlobDirectory(containerName, virtualDirPath, null); return emptyCloudBlobDirectory(containerName, prefix, null);
} }
public List<String> emptyCloudBlobDirectory(final String containerName, final String virtualDirPath, final Integer maxAttempts) /**
throws BlobStorageException * Delete all blobs under the given prefix.
*
* @param containerName The name of the storage container.
* @param prefix (Optional) The Azure storage prefix to delete blobs for.
* If null, deletes all blobs in the storage container.
* @param maxAttempts (Optional) Number of attempts to retry in case an API call fails.
* If null, defaults to the system default (`druid.azure.maxTries`).
*
* @return The list of blobs deleted.
*/
public List<String> emptyCloudBlobDirectory(
final String containerName,
@Nullable final String prefix,
@Nullable final Integer maxAttempts
) throws BlobStorageException
{ {
List<String> deletedFiles = new ArrayList<>(); final BlobContainerClient blobContainerClient = azureClientFactory
BlobContainerClient blobContainerClient = getOrCreateBlobContainerClient(containerName, maxAttempts); .getBlobServiceClient(maxAttempts, defaultStorageAccount)
.getBlobContainerClient(containerName);
// https://learn.microsoft.com/en-us/azure/storage/blobs/storage-blobs-list The new client uses flat listing by default. // https://learn.microsoft.com/en-us/azure/storage/blobs/storage-blobs-list The new client uses flat listing by default.
PagedIterable<BlobItem> blobItems = blobContainerClient.listBlobs(new ListBlobsOptions().setPrefix(virtualDirPath), Duration.ofMillis(DELTA_BACKOFF_MS)); final PagedIterable<BlobItem> blobItems = blobContainerClient.listBlobs(
new ListBlobsOptions().setPrefix(prefix),
Duration.ofMillis(DELTA_BACKOFF_MS)
);
blobItems.iterableByPage().forEach(page -> { final List<String> deletedFiles = new ArrayList<>();
page.getElements().forEach(blob -> { blobItems.iterableByPage().forEach(
page -> page.getElements().forEach(
blob -> {
if (blobContainerClient.getBlobClient(blob.getName()).deleteIfExists()) { if (blobContainerClient.getBlobClient(blob.getName()).deleteIfExists()) {
deletedFiles.add(blob.getName()); deletedFiles.add(blob.getName());
} }
}); }
}); )
);
if (deletedFiles.isEmpty()) { if (deletedFiles.isEmpty()) {
log.warn("No files were deleted on the following Azure path: [%s]", virtualDirPath); LOG.warn("No files were deleted on the following Azure path: [%s]", prefix);
} }
return deletedFiles; return deletedFiles;
} }
public void uploadBlockBlob(final File file, final String containerName, final String blobPath, final Integer maxAttempts) /**
throws IOException, BlobStorageException * Creates and opens an output stream to write data to the block blob.
{ * <p>
BlobContainerClient blobContainerClient = getOrCreateBlobContainerClient(containerName, maxAttempts); * If the blob already exists, an exception will be thrown.
*
try (FileInputStream stream = new FileInputStream(file)) { * @param containerName The name of the storage container.
// By default this creates a Block blob, no need to use a specific Block blob client. * @param blobName The name of the blob within the container.
// We also need to urlEncode the path to handle special characters. * @param blockSize (Optional) The block size to use when writing the blob.
// Set overwrite to true to keep behavior more similar to s3Client.putObject * If null, the default block size will be used.
blobContainerClient.getBlobClient(Utility.urlEncode(blobPath)).upload(stream, file.length(), true); * @param maxAttempts (Optional) The maximum number of attempts to retry the upload in case of failure.
} * If null, the default value from the system configuration (`druid.azure.maxTries`) will be used.
} *
* @return An OutputStream for writing the blob.
*/
public OutputStream getBlockBlobOutputStream( public OutputStream getBlockBlobOutputStream(
final String containerName, final String containerName,
final String blobPath, final String blobName,
@Nullable final Integer streamWriteSizeBytes, @Nullable final Long blockSize,
Integer maxAttempts @Nullable final Integer maxAttempts
) throws BlobStorageException ) throws BlobStorageException
{ {
BlobContainerClient blobContainerClient = getOrCreateBlobContainerClient(containerName, maxAttempts); final BlockBlobClient blockBlobClient = azureClientFactory
BlockBlobClient blockBlobClient = blobContainerClient.getBlobClient(Utility.urlEncode(blobPath)).getBlockBlobClient(); .getBlobServiceClient(maxAttempts, defaultStorageAccount)
.createBlobContainerIfNotExists(containerName)
.getBlobClient(Utility.urlEncode(blobName))
.getBlockBlobClient();
// TODO based on the usage here, it might be better to overwrite the existing blob instead; that's what StorageConnector#write documents it does
if (blockBlobClient.exists()) { if (blockBlobClient.exists()) {
throw new RE("Reference already exists"); throw new RE("Reference already exists");
} }
BlockBlobOutputStreamOptions options = new BlockBlobOutputStreamOptions();
if (streamWriteSizeBytes != null) { final BlockBlobOutputStreamOptions options = new BlockBlobOutputStreamOptions();
options.setParallelTransferOptions(new ParallelTransferOptions().setBlockSizeLong(streamWriteSizeBytes.longValue())); if (blockSize != null) {
options.setParallelTransferOptions(new ParallelTransferOptions().setBlockSizeLong(blockSize));
} }
return blockBlobClient.getBlobOutputStream(options); return blockBlobClient.getBlobOutputStream(options);
} }
// There's no need to download attributes with the new azure clients, they will get fetched as needed. /**
public BlockBlobClient getBlockBlobReferenceWithAttributes(final String containerName, final String blobPath) * Gets the length of the specified block blob.
throws BlobStorageException *
* @param containerName The name of the storage container.
* @param blobName The name of the blob within the container.
*
* @return The length of the blob in bytes.
*/
public long getBlockBlobLength(final String containerName, final String blobName) throws BlobStorageException
{ {
return getOrCreateBlobContainerClient(containerName).getBlobClient(Utility.urlEncode(blobPath)).getBlockBlobClient(); return azureClientFactory
.getBlobServiceClient(null, defaultStorageAccount)
.getBlobContainerClient(containerName)
.getBlobClient(Utility.urlEncode(blobName))
.getBlockBlobClient()
.getProperties()
.getBlobSize();
} }
public long getBlockBlobLength(final String containerName, final String blobPath) /**
* See {@link AzureStorage#getBlockBlobInputStream(long, Long, String, String, Integer)} for details.
*/
public InputStream getBlockBlobInputStream(final String containerName, final String blobName)
throws BlobStorageException throws BlobStorageException
{ {
return getBlockBlobReferenceWithAttributes(containerName, blobPath).getProperties().getBlobSize(); return getBlockBlobInputStream(0L, containerName, blobName);
} }
public InputStream getBlockBlobInputStream(final String containerName, final String blobPath) /**
* See {@link AzureStorage#getBlockBlobInputStream(long, Long, String, String, Integer)} for details.
*/
public InputStream getBlockBlobInputStream(final long offset, final String containerName, final String blobName)
throws BlobStorageException throws BlobStorageException
{ {
return getBlockBlobInputStream(0L, containerName, blobPath); return getBlockBlobInputStream(offset, null, containerName, blobName);
} }
public InputStream getBlockBlobInputStream(long offset, final String containerName, final String blobPath) /**
throws BlobStorageException * See {@link AzureStorage#getBlockBlobInputStream(long, Long, String, String, Integer)} for details.
*/
public InputStream getBlockBlobInputStream(
final long offset,
@Nullable final Long length,
final String containerName,
final String blobName
) throws BlobStorageException
{ {
return getBlockBlobInputStream(offset, null, containerName, blobPath); return getBlockBlobInputStream(offset, length, containerName, blobName, null);
} }
public InputStream getBlockBlobInputStream(long offset, Long length, final String containerName, final String blobPath) /**
throws BlobStorageException * Gets an InputStream for reading the contents of the specified block blob.
*
* @param containerName The name of the storage container.
* @param blobName The name of the blob within the container.
*
* @return An InputStream for reading the blob.
*/
public InputStream getBlockBlobInputStream(
final long offset,
@Nullable final Long length,
final String containerName,
final String blobName,
@Nullable final Integer maxAttempts
) throws BlobStorageException
{ {
return getBlockBlobInputStream(offset, length, containerName, blobPath, null); return azureClientFactory
} .getBlobServiceClient(maxAttempts, defaultStorageAccount)
.getBlobContainerClient(containerName)
public InputStream getBlockBlobInputStream(long offset, Long length, final String containerName, final String blobPath, Integer maxAttempts) .getBlobClient(Utility.urlEncode(blobName))
throws BlobStorageException .openInputStream(new BlobInputStreamOptions().setRange(new BlobRange(offset, length)));
{
BlobContainerClient blobContainerClient = getOrCreateBlobContainerClient(containerName, maxAttempts);
return blobContainerClient.getBlobClient(Utility.urlEncode(blobPath)).openInputStream(new BlobInputStreamOptions().setRange(new BlobRange(offset, length)));
} }
/** /**
* Deletes multiple files from the specified container. * Deletes multiple files from the specified container.
* *
* @param containerName The name of the container from which files will be deleted. * @param containerName The name of the storage container.
* @param paths An iterable of file paths to be deleted. * @param paths An iterable of file paths to be deleted.
* @param maxAttempts (Optional) The maximum number of attempts to delete each file. * @param maxAttempts (Optional) Number of attempts to retry in case an API call fails.
* If null, the system default number of attempts will be used. * If null, defaults to the system default (`druid.azure.maxTries`).
*
* @return true if all files were successfully deleted; false otherwise. * @return true if all files were successfully deleted; false otherwise.
*/ */
public boolean batchDeleteFiles(String containerName, Iterable<String> paths, @Nullable Integer maxAttempts) public boolean batchDeleteFiles(
throws BlobBatchStorageException final String containerName,
final Iterable<String> paths,
@Nullable final Integer maxAttempts
) throws BlobBatchStorageException
{ {
BlobContainerClient blobContainerClient = getOrCreateBlobContainerClient(containerName, maxAttempts); BlobContainerClient blobContainerClient = azureClientFactory
.getBlobServiceClient(maxAttempts, defaultStorageAccount)
.getBlobContainerClient(containerName);
BlobBatchClient blobBatchClient = azureClientFactory.getBlobBatchClient(blobContainerClient); BlobBatchClient blobBatchClient = azureClientFactory.getBlobBatchClient(blobContainerClient);
List<String> blobUris = Streams.stream(paths).map(path -> blobContainerClient.getBlobContainerUrl() + "/" + path).collect(Collectors.toList()); List<String> blobUris = Streams.stream(paths)
.map(path -> blobContainerClient.getBlobContainerUrl() + "/" + path)
.collect(Collectors.toList());
boolean hadException = false; boolean hadException = false;
List<List<String>> keysChunks = Lists.partition( List<List<String>> keysChunks = Lists.partition(blobUris, MAX_MULTI_OBJECT_DELETE_SIZE);
blobUris,
MAX_MULTI_OBJECT_DELETE_SIZE
);
for (List<String> chunkOfKeys : keysChunks) { for (List<String> chunkOfKeys : keysChunks) {
try { try {
log.info( LOG.info("Removing from container [%s] the following files: [%s]", containerName, chunkOfKeys);
"Removing from container [%s] the following files: [%s]",
containerName, // We have to call forEach on the response because this is the only way azure batch will throw an exception on an operation failure.
chunkOfKeys blobBatchClient.deleteBlobs(chunkOfKeys, DeleteSnapshotsOptionType.INCLUDE).forEach(response -> LOG.debug(
); "Deleting blob with URL %s completed with status code %d%n",
// We have to call forEach on the response because this is the only way azure batch will throw an exception on a operation failure. response.getRequest().getUrl(),
blobBatchClient.deleteBlobs( response.getStatusCode()
chunkOfKeys, ));
DeleteSnapshotsOptionType.INCLUDE
).forEach(response ->
log.debug("Deleting blob with URL %s completed with status code %d%n",
response.getRequest().getUrl(), response.getStatusCode())
);
} }
catch (BlobStorageException | BlobBatchStorageException e) { catch (BlobStorageException | BlobBatchStorageException e) {
hadException = true; hadException = true;
log.noStackTrace().warn(e, LOG.noStackTrace().warn(
e,
"Unable to delete from container [%s], the following keys [%s]", "Unable to delete from container [%s], the following keys [%s]",
containerName, containerName,
chunkOfKeys chunkOfKeys
@ -225,86 +296,156 @@ public class AzureStorage
} }
catch (Exception e) { catch (Exception e) {
hadException = true; hadException = true;
log.noStackTrace().warn(e, LOG.noStackTrace().warn(
e,
"Unexpected exception occurred when deleting from container [%s], the following keys [%s]", "Unexpected exception occurred when deleting from container [%s], the following keys [%s]",
containerName, containerName,
chunkOfKeys chunkOfKeys
); );
} }
} }
return !hadException; return !hadException;
} }
public List<String> listDir(final String containerName, final String virtualDirPath, final Integer maxAttempts) /**
throws BlobStorageException * See {@link AzureStorage#getBlockBlobExists(String, String, Integer)} for details.
*/
public boolean getBlockBlobExists(final String container, final String blobName) throws BlobStorageException
{ {
List<String> files = new ArrayList<>(); return getBlockBlobExists(container, blobName, null);
BlobContainerClient blobContainerClient = getOrCreateBlobContainerClient(containerName, maxAttempts); }
PagedIterable<BlobItem> blobItems = blobContainerClient.listBlobs( /**
new ListBlobsOptions().setPrefix(virtualDirPath), * Checks if the specified block blob exists in the given Azure Blob Storage container.
Duration.ofMillis(DELTA_BACKOFF_MS) *
); * @param container The name of the Azure Blob Storage container.
* @param blobName The name of the blob within the container.
* @param maxAttempts (Optional) The maximum number of attempts to retry the existence check in case of failure.
* If null, the default value from the system configuration (`druid.azure.maxTries`) will be used.
* @return `true` if the block blob exists, `false` otherwise.
* @throws BlobStorageException If there is an error checking the existence of the blob.
*/
public boolean getBlockBlobExists(
final String container,
final String blobName,
@Nullable final Integer maxAttempts
) throws BlobStorageException
{
return azureClientFactory
.getBlobServiceClient(maxAttempts, defaultStorageAccount)
.getBlobContainerClient(container)
.getBlobClient(Utility.urlEncode(blobName))
.exists();
}
/**
* Lists the blobs in the specified storage container, optionally matching a given prefix.
*
* @param storageAccount The name of the storage account.
* @param containerName The name of the storage container.
* @param prefix (Optional) The Azure storage prefix to list blobs for.
* If null, lists all blobs in the storage container.
* @param maxResults (Optional) The maximum number of results to return per page.
* If null, defaults to the API default (5000).
* @param maxAttempts (Optional) The maximum number of attempts to retry the list operation in case of failure.
* If null, the default value from the system configuration (`druid.azure.maxTries`) will be used.
*
* @return Returns a lazy loaded list of blobs in this container.
*/
public PagedIterable<BlobItem> listBlobsWithPrefixInContainerSegmented(
final String storageAccount,
final String containerName,
@Nullable final String prefix,
@Nullable final Integer maxResults,
@Nullable final Integer maxAttempts
) throws BlobStorageException
{
final ListBlobsOptions listOptions = new ListBlobsOptions();
if (maxResults != null) {
listOptions.setMaxResultsPerPage(maxResults);
}
if (prefix != null) {
listOptions.setPrefix(prefix);
}
return azureClientFactory
.getBlobServiceClient(maxAttempts, storageAccount)
.getBlobContainerClient(containerName)
.listBlobs(listOptions, Duration.ofMillis(DELTA_BACKOFF_MS));
}
/**
* Lists the blobs in the specified storage container, optionally matching a given prefix.
*
* @param containerName The name of the storage container.
* @param prefix (Optional) The Azure storage prefix to list blobs for.
* If null, lists all blobs in the storage container.
* @param maxResults (Optional) The maximum number of results to return per page.
* If null, defaults to the API default (5000).
* @param maxAttempts (Optional) The maximum number of attempts to retry the list operation in case of failure.
* If null, the default value from the system configuration (`druid.azure.maxTries`) will be used.
*
* @return A list of blob names in this container.
*/
public List<String> listBlobs(
final String containerName,
@Nullable final String prefix,
@Nullable final Integer maxResults,
@Nullable final Integer maxAttempts
) throws BlobStorageException
{
final ListBlobsOptions listOptions = new ListBlobsOptions().setPrefix(prefix);
if (maxResults != null) {
listOptions.setMaxResultsPerPage(maxResults);
}
if (prefix != null) {
listOptions.setPrefix(prefix);
}
final PagedIterable<BlobItem> blobItems = azureClientFactory
.getBlobServiceClient(maxAttempts, defaultStorageAccount)
.getBlobContainerClient(containerName)
.listBlobs(listOptions, Duration.ofMillis(DELTA_BACKOFF_MS));
final List<String> files = new ArrayList<>();
blobItems.iterableByPage().forEach(page -> page.getElements().forEach(blob -> files.add(blob.getName()))); blobItems.iterableByPage().forEach(page -> page.getElements().forEach(blob -> files.add(blob.getName())));
return files; return files;
} }
public boolean getBlockBlobExists(String container, String blobPath) throws BlobStorageException /**
{ * Creates a new blob, or updates the content of an existing blob.
return getBlockBlobExists(container, blobPath, null); *
} * @param file The file to write to the blob.
* @param containerName The name of the storage container.
* @param blobName The blob name to write the file to.
public boolean getBlockBlobExists(String container, String blobPath, Integer maxAttempts) * @param maxAttempts (Optional) Number of attempts to retry in case an API call fails.
throws BlobStorageException * If null, defaults to the system default (`druid.azure.maxTries`).
{ */
return getOrCreateBlobContainerClient(container, maxAttempts).getBlobClient(Utility.urlEncode(blobPath)).exists(); public void uploadBlockBlob(
} final File file,
@VisibleForTesting
BlobServiceClient getBlobServiceClient(Integer maxAttempts)
{
return azureClientFactory.getBlobServiceClient(maxAttempts, defaultStorageAccount);
}
@VisibleForTesting
BlobServiceClient getBlobServiceClient(String storageAccount, Integer maxAttempts)
{
return azureClientFactory.getBlobServiceClient(maxAttempts, storageAccount);
}
// This method is used in AzureCloudBlobIterator in a method where one azureStorage instance might need to list from multiple
// storage accounts, so storageAccount is a valid parameter.
@VisibleForTesting
PagedIterable<BlobItem> listBlobsWithPrefixInContainerSegmented(
final String storageAccount,
final String containerName, final String containerName,
final String prefix, final String blobName,
int maxResults, @Nullable final Integer maxAttempts
Integer maxAttempts ) throws IOException, BlobStorageException
) throws BlobStorageException
{ {
BlobContainerClient blobContainerClient = getOrCreateBlobContainerClient(storageAccount, containerName, maxAttempts); final BlobContainerClient blobContainerClient = azureClientFactory
return blobContainerClient.listBlobs( .getBlobServiceClient(maxAttempts, defaultStorageAccount)
new ListBlobsOptions().setPrefix(prefix).setMaxResultsPerPage(maxResults), .createBlobContainerIfNotExists(containerName);
Duration.ofMillis(DELTA_BACKOFF_MS)
);
}
private BlobContainerClient getOrCreateBlobContainerClient(final String containerName) try (FileInputStream stream = new FileInputStream(file)) {
{ blobContainerClient
return getBlobServiceClient(null).createBlobContainerIfNotExists(containerName); // Creates a blob by default, no need to use a specific blob client.
} // We also need to urlEncode the path to handle special characters.
.getBlobClient(Utility.urlEncode(blobName))
private BlobContainerClient getOrCreateBlobContainerClient(final String containerName, final Integer maxRetries) // Set overwrite to true to keep behavior more similar to s3Client.putObject.
{ .upload(stream, file.length(), true);
return getBlobServiceClient(maxRetries).createBlobContainerIfNotExists(containerName);
} }
private BlobContainerClient getOrCreateBlobContainerClient(final String storageAccount, final String containerName, final Integer maxRetries)
{
return getBlobServiceClient(storageAccount, maxRetries).createBlobContainerIfNotExists(containerName);
} }
} }
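
A hedged usage sketch for the reworked AzureStorage API above: the method signatures (getBlockBlobOutputStream, getBlockBlobLength, listBlobs) are taken from the diff, while the container name, blob name, block size, and surrounding wiring are illustrative assumptions. Note that the write path creates the container if needed, whereas the length and list calls only resolve it.

import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;

// Illustrative caller, assumed to live next to org.apache.druid.storage.azure.AzureStorage.
class AzureStorageUsageSketch
{
  static void writeThenRead(final AzureStorage azureStorage) throws IOException
  {
    // Write path: container is created if absent; throws RE if the blob already exists.
    try (OutputStream out = azureStorage.getBlockBlobOutputStream(
        "druid-deep-storage",     // illustrative container name
        "segments/example-blob",  // illustrative blob name
        4L * 1024 * 1024,         // optional block size; null uses the SDK default
        null                      // optional maxAttempts; null uses druid.azure.maxTries
    )) {
      out.write("hello".getBytes(StandardCharsets.UTF_8));
    }

    // Read path: the container is only resolved, never created.
    long length = azureStorage.getBlockBlobLength("druid-deep-storage", "segments/example-blob");
    List<String> blobs = azureStorage.listBlobs("druid-deep-storage", "segments/", null, null);
    System.out.println("blob length=" + length + ", blobs under prefix=" + blobs);
  }
}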


@ -37,8 +37,6 @@ import java.util.concurrent.TimeoutException;
*/ */
public class AzureUtils public class AzureUtils
{ {
public static final String DEFAULT_AZURE_ENDPOINT_SUFFIX = "core.windows.net";
@VisibleForTesting @VisibleForTesting
static final String AZURE_STORAGE_HOST_ADDRESS = "blob.core.windows.net"; static final String AZURE_STORAGE_HOST_ADDRESS = "blob.core.windows.net";


@ -140,7 +140,7 @@ public class AzureStorageConnector extends ChunkingStorageConnector<AzureInputRa
return azureStorage.getBlockBlobOutputStream( return azureStorage.getBlockBlobOutputStream(
config.getContainer(), config.getContainer(),
objectPath(path), objectPath(path),
config.getChunkSize().getBytesInInt(), config.getChunkSize().getBytes(),
config.getMaxRetry() config.getMaxRetry()
); );
} }
@ -196,7 +196,7 @@ public class AzureStorageConnector extends ChunkingStorageConnector<AzureInputRa
final String prefixBasePath = objectPath(dirName); final String prefixBasePath = objectPath(dirName);
List<String> paths; List<String> paths;
try { try {
paths = azureStorage.listDir(config.getContainer(), prefixBasePath, config.getMaxRetry()); paths = azureStorage.listBlobs(config.getContainer(), prefixBasePath, null, config.getMaxRetry());
} }
catch (BlobStorageException e) { catch (BlobStorageException e) {
throw new IOException(e); throw new IOException(e);

View File

@ -29,15 +29,25 @@ import com.azure.storage.blob.models.BlobItem;
import com.azure.storage.blob.models.BlobItemProperties; import com.azure.storage.blob.models.BlobItemProperties;
import com.azure.storage.blob.models.BlobStorageException; import com.azure.storage.blob.models.BlobStorageException;
import com.azure.storage.blob.models.DeleteSnapshotsOptionType; import com.azure.storage.blob.models.DeleteSnapshotsOptionType;
import com.azure.storage.blob.specialized.BlockBlobAsyncClient;
import com.azure.storage.blob.specialized.BlockBlobClient;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import org.apache.druid.common.guava.SettableSupplier; import org.apache.druid.common.guava.SettableSupplier;
import org.apache.druid.java.util.common.RE;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.NullSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor; import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers; import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import javax.annotation.Nullable;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
@ -46,32 +56,112 @@ import java.nio.file.Path;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
/**
// Using Mockito for the whole test class since azure classes (e.g. BlobContainerClient) are final and can't be mocked with EasyMock * Using Mockito for the whole test class since Azure classes (e.g. BlobContainerClient) are final
* and can't be mocked with EasyMock.
*/
@ExtendWith(MockitoExtension.class)
public class AzureStorageTest public class AzureStorageTest
{ {
AzureStorage azureStorage;
BlobClient blobClient = Mockito.mock(BlobClient.class);
BlobServiceClient blobServiceClient = Mockito.mock(BlobServiceClient.class);
BlobContainerClient blobContainerClient = Mockito.mock(BlobContainerClient.class);
AzureClientFactory azureClientFactory = Mockito.mock(AzureClientFactory.class);
private final String STORAGE_ACCOUNT = "storageAccount"; private final String STORAGE_ACCOUNT = "storageAccount";
private final String CONTAINER = "container"; private final String CONTAINER = "container";
private final String BLOB_NAME = "blobName"; private final String BLOB_NAME = "blobName";
private AzureStorage azureStorage;
@Mock
private AzureClientFactory azureClientFactory;
@Mock
private BlockBlobClient blockBlobClient;
@Mock
private BlobClient blobClient;
@Mock
private BlobContainerClient blobContainerClient;
@Mock
private BlobServiceClient blobServiceClient;
@BeforeEach @BeforeEach
public void setup() throws BlobStorageException public void setup() throws BlobStorageException
{ {
azureStorage = new AzureStorage(azureClientFactory, STORAGE_ACCOUNT); azureStorage = new AzureStorage(azureClientFactory, STORAGE_ACCOUNT);
} }
@ParameterizedTest
@ValueSource(longs = {-1, BlockBlobAsyncClient.MAX_STAGE_BLOCK_BYTES_LONG + 1})
public void testGetBlockBlockOutputStream_blockSizeOutOfBoundsException(final long blockSize)
{
Mockito.doReturn(blobContainerClient).when(blobServiceClient).createBlobContainerIfNotExists(CONTAINER);
Mockito.doReturn(blobServiceClient).when(azureClientFactory).getBlobServiceClient(null, STORAGE_ACCOUNT);
Mockito.doReturn(blobClient).when(blobContainerClient).getBlobClient(BLOB_NAME);
Mockito.doReturn(blockBlobClient).when(blobClient).getBlockBlobClient();
final IllegalArgumentException exception = assertThrows(
IllegalArgumentException.class,
() -> azureStorage.getBlockBlobOutputStream(
CONTAINER,
BLOB_NAME,
blockSize,
null
)
);
assertEquals(
"The value of the parameter 'blockSize' should be between 1 and 4194304000.",
exception.getMessage()
);
}
@Test @Test
public void testListDir_retriable() throws BlobStorageException public void testGetBlockBlockOutputStream_blobAlreadyExistsException()
{
Mockito.doReturn(blobContainerClient).when(blobServiceClient).createBlobContainerIfNotExists(CONTAINER);
Mockito.doReturn(blobServiceClient).when(azureClientFactory).getBlobServiceClient(null, STORAGE_ACCOUNT);
Mockito.doReturn(blobClient).when(blobContainerClient).getBlobClient(BLOB_NAME);
Mockito.doReturn(blockBlobClient).when(blobClient).getBlockBlobClient();
Mockito.doReturn(true).when(blockBlobClient).exists();
final RE exception = assertThrows(
RE.class,
() -> azureStorage.getBlockBlobOutputStream(
CONTAINER,
BLOB_NAME,
100L,
null
)
);
assertEquals(
"Reference already exists",
exception.getMessage()
);
}
@ParameterizedTest
@NullSource
@ValueSource(longs = {1, 100})
public void testGetBlockBlockOutputStream_success(@Nullable final Long blockSize)
{
Mockito.doReturn(blobContainerClient).when(blobServiceClient).createBlobContainerIfNotExists(CONTAINER);
Mockito.doReturn(blobServiceClient).when(azureClientFactory).getBlobServiceClient(null, STORAGE_ACCOUNT);
Mockito.doReturn(blobClient).when(blobContainerClient).getBlobClient(BLOB_NAME);
Mockito.doReturn(blockBlobClient).when(blobClient).getBlockBlobClient();
assertDoesNotThrow(() -> azureStorage.getBlockBlobOutputStream(
CONTAINER,
BLOB_NAME,
blockSize,
null
));
}
@Test
public void testListBlobs_retriable() throws BlobStorageException
{ {
BlobItem blobItem = new BlobItem().setName(BLOB_NAME).setProperties(new BlobItemProperties().setContentLength(10L)); BlobItem blobItem = new BlobItem().setName(BLOB_NAME).setProperties(new BlobItemProperties().setContentLength(10L));
SettableSupplier<PagedResponse<BlobItem>> supplier = new SettableSupplier<>(); SettableSupplier<PagedResponse<BlobItem>> supplier = new SettableSupplier<>();
@ -81,16 +171,19 @@ public class AzureStorageTest
ArgumentMatchers.any(), ArgumentMatchers.any(),
ArgumentMatchers.any() ArgumentMatchers.any()
); );
Mockito.doReturn(blobContainerClient).when(blobServiceClient).createBlobContainerIfNotExists(CONTAINER); Mockito.doReturn(blobContainerClient).when(blobServiceClient).getBlobContainerClient(CONTAINER);
final Integer maxAttempts = 3; final Integer maxAttempts = 3;
Mockito.doReturn(blobServiceClient).when(azureClientFactory).getBlobServiceClient(maxAttempts, STORAGE_ACCOUNT); Mockito.doReturn(blobServiceClient).when(azureClientFactory).getBlobServiceClient(maxAttempts, STORAGE_ACCOUNT);
assertEquals(ImmutableList.of(BLOB_NAME), azureStorage.listDir(CONTAINER, "", maxAttempts)); assertEquals(
ImmutableList.of(BLOB_NAME),
azureStorage.listBlobs(CONTAINER, "", null, maxAttempts)
);
} }
@Test @Test
public void testListDir_nullMaxAttempts() throws BlobStorageException public void testListBlobs_nullMaxAttempts() throws BlobStorageException
{ {
BlobItem blobItem = new BlobItem().setName(BLOB_NAME).setProperties(new BlobItemProperties().setContentLength(10L)); BlobItem blobItem = new BlobItem().setName(BLOB_NAME).setProperties(new BlobItemProperties().setContentLength(10L));
SettableSupplier<PagedResponse<BlobItem>> supplier = new SettableSupplier<>(); SettableSupplier<PagedResponse<BlobItem>> supplier = new SettableSupplier<>();
@ -100,10 +193,13 @@ public class AzureStorageTest
ArgumentMatchers.any(), ArgumentMatchers.any(),
ArgumentMatchers.any() ArgumentMatchers.any()
); );
Mockito.doReturn(blobContainerClient).when(blobServiceClient).createBlobContainerIfNotExists(CONTAINER); Mockito.doReturn(blobContainerClient).when(blobServiceClient).getBlobContainerClient(CONTAINER);
Mockito.doReturn(blobServiceClient).when(azureClientFactory).getBlobServiceClient(null, STORAGE_ACCOUNT); Mockito.doReturn(blobServiceClient).when(azureClientFactory).getBlobServiceClient(null, STORAGE_ACCOUNT);
assertEquals(ImmutableList.of(BLOB_NAME), azureStorage.listDir(CONTAINER, "", null)); assertEquals(
ImmutableList.of(BLOB_NAME),
azureStorage.listBlobs(CONTAINER, "", null, null)
);
} }
@Test @Test
@ -118,7 +214,7 @@ public class AzureStorageTest
ArgumentMatchers.any(), ArgumentMatchers.any(),
ArgumentMatchers.any() ArgumentMatchers.any()
); );
Mockito.doReturn(blobContainerClient).when(blobServiceClient).createBlobContainerIfNotExists(CONTAINER); Mockito.doReturn(blobContainerClient).when(blobServiceClient).getBlobContainerClient(CONTAINER);
Mockito.doReturn(blobServiceClient).when(azureClientFactory).getBlobServiceClient(3, storageAccountCustom); Mockito.doReturn(blobServiceClient).when(azureClientFactory).getBlobServiceClient(3, storageAccountCustom);
azureStorage.listBlobsWithPrefixInContainerSegmented( azureStorage.listBlobsWithPrefixInContainerSegmented(
@ -143,7 +239,7 @@ public class AzureStorageTest
ArgumentCaptor<List<String>> captor = ArgumentCaptor.forClass(List.class); ArgumentCaptor<List<String>> captor = ArgumentCaptor.forClass(List.class);
Mockito.doReturn(containerUrl).when(blobContainerClient).getBlobContainerUrl(); Mockito.doReturn(containerUrl).when(blobContainerClient).getBlobContainerUrl();
Mockito.doReturn(blobContainerClient).when(blobServiceClient).createBlobContainerIfNotExists(CONTAINER); Mockito.doReturn(blobContainerClient).when(blobServiceClient).getBlobContainerClient(CONTAINER);
Mockito.doReturn(blobServiceClient).when(azureClientFactory).getBlobServiceClient(null, STORAGE_ACCOUNT); Mockito.doReturn(blobServiceClient).when(azureClientFactory).getBlobServiceClient(null, STORAGE_ACCOUNT);
Mockito.doReturn(blobBatchClient).when(azureClientFactory).getBlobBatchClient(blobContainerClient); Mockito.doReturn(blobBatchClient).when(azureClientFactory).getBlobBatchClient(blobContainerClient);
Mockito.doReturn(pagedIterable).when(blobBatchClient).deleteBlobs( Mockito.doReturn(pagedIterable).when(blobBatchClient).deleteBlobs(
@ -167,7 +263,7 @@ public class AzureStorageTest
ArgumentCaptor<List<String>> captor = ArgumentCaptor.forClass(List.class); ArgumentCaptor<List<String>> captor = ArgumentCaptor.forClass(List.class);
Mockito.doReturn(containerUrl).when(blobContainerClient).getBlobContainerUrl(); Mockito.doReturn(containerUrl).when(blobContainerClient).getBlobContainerUrl();
Mockito.doReturn(blobContainerClient).when(blobServiceClient).createBlobContainerIfNotExists(CONTAINER); Mockito.doReturn(blobContainerClient).when(blobServiceClient).getBlobContainerClient(CONTAINER);
Mockito.doReturn(blobServiceClient).when(azureClientFactory).getBlobServiceClient(null, STORAGE_ACCOUNT); Mockito.doReturn(blobServiceClient).when(azureClientFactory).getBlobServiceClient(null, STORAGE_ACCOUNT);
Mockito.doReturn(blobBatchClient).when(azureClientFactory).getBlobBatchClient(blobContainerClient); Mockito.doReturn(blobBatchClient).when(azureClientFactory).getBlobBatchClient(blobContainerClient);
Mockito.doThrow(new RuntimeException()).when(blobBatchClient).deleteBlobs( Mockito.doThrow(new RuntimeException()).when(blobBatchClient).deleteBlobs(
@ -192,7 +288,7 @@ public class AzureStorageTest
ArgumentCaptor<List<String>> captor = ArgumentCaptor.forClass(List.class); ArgumentCaptor<List<String>> captor = ArgumentCaptor.forClass(List.class);
Mockito.doReturn(containerUrl).when(blobContainerClient).getBlobContainerUrl(); Mockito.doReturn(containerUrl).when(blobContainerClient).getBlobContainerUrl();
Mockito.doReturn(blobContainerClient).when(blobServiceClient).createBlobContainerIfNotExists(CONTAINER); Mockito.doReturn(blobContainerClient).when(blobServiceClient).getBlobContainerClient(CONTAINER);
Mockito.doReturn(blobServiceClient).when(azureClientFactory).getBlobServiceClient(null, STORAGE_ACCOUNT); Mockito.doReturn(blobServiceClient).when(azureClientFactory).getBlobServiceClient(null, STORAGE_ACCOUNT);
Mockito.doReturn(blobBatchClient).when(azureClientFactory).getBlobBatchClient(blobContainerClient); Mockito.doReturn(blobBatchClient).when(azureClientFactory).getBlobBatchClient(blobContainerClient);
Mockito.doReturn(pagedIterable).when(blobBatchClient).deleteBlobs( Mockito.doReturn(pagedIterable).when(blobBatchClient).deleteBlobs(


@ -195,7 +195,7 @@ public class AzureStorageConnectorTest
public void testListDir() throws BlobStorageException, IOException public void testListDir() throws BlobStorageException, IOException
{ {
EasyMock.reset(azureStorage); EasyMock.reset(azureStorage);
EasyMock.expect(azureStorage.listDir(EasyMock.anyString(), EasyMock.anyString(), EasyMock.anyInt())) EasyMock.expect(azureStorage.listBlobs(EasyMock.anyString(), EasyMock.anyString(), EasyMock.anyInt(), EasyMock.anyInt()))
.andReturn(ImmutableList.of(PREFIX + "/x/y/z/" + TEST_FILE, PREFIX + "/p/q/r/" + TEST_FILE)); .andReturn(ImmutableList.of(PREFIX + "/x/y/z/" + TEST_FILE, PREFIX + "/p/q/r/" + TEST_FILE));
EasyMock.replay(azureStorage); EasyMock.replay(azureStorage);
List<String> ret = Lists.newArrayList(storageConnector.listDir("")); List<String> ret = Lists.newArrayList(storageConnector.listDir(""));