Azure client upgrade to allow identity options (#15287)

* Include new dependencies

* Mostly implemented

* More azure fixes

* Tests passing

* Unit tests running

* Test running after removing storage exception

* Happy with coverage now

* Add more tests

* fix client factory

* cleanup from testing

* Remove old client

* update docs

* Exclude from spellcheck

* Add licenses

* Fix identity version

* Save work

* Add azure clients

* add licenses

* typos

* Add dependencies

* Exception is not thrown

* Fix intellij check

* Don't need to override

* specify length

* urldecode

* encode path

* Fix checks

* Revert urlencode changes

* Urlencode with azure library

* Update docs/development/extensions-core/azure.md

Co-authored-by: Abhishek Agarwal <1477457+abhishekagarwal87@users.noreply.github.com>

* PR changes

* Update docs/development/extensions-core/azure.md

Co-authored-by: 317brian <53799971+317brian@users.noreply.github.com>

* Deprecate AzureTaskLogsConfig.maxRetries

* Clean up azure retry block

* logic update to reuse clients

* fix comments

* Create container conditionally

* Fix key auth

* Remove container client logic

* Add some more testing

* Update comments

* Add a comment explaining client reuse

* Move logic to factory class

* use bom for dependency management

* fix license versions

---------

Co-authored-by: Abhishek Agarwal <1477457+abhishekagarwal87@users.noreply.github.com>
Co-authored-by: 317brian <53799971+317brian@users.noreply.github.com>
This commit is contained in:
George Shiqi Wu 2024-01-03 18:36:05 -05:00 committed by GitHub
parent b8060fc93f
commit 8e95cea8e5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
34 changed files with 1301 additions and 1015 deletions

View File

@ -33,8 +33,10 @@ To use this Apache Druid extension, [include](../../configuration/extensions.md#
|--------|---------------|-----------|-------|
|`druid.storage.type`|azure||Must be set.|
|`druid.azure.account`||Azure Storage account name.|Must be set.|
|`druid.azure.key`||Azure Storage account key.|Optional. Either set key or sharedAccessStorageToken but not both.|
|`druid.azure.sharedAccessStorageToken`||Azure Shared Storage access token|Optional. Either set key or sharedAccessStorageToken but not both.|
|`druid.azure.key`||Azure Storage account key.|Optional. Set one of key, sharedAccessStorageToken or useAzureCredentialsChain.|
|`druid.azure.sharedAccessStorageToken`||Azure Shared Storage access token|Optional. Set one of key, sharedAccessStorageToken or useAzureCredentialsChain..|
|`druid.azure.useAzureCredentialsChain`|Use [DefaultAzureCredential](https://learn.microsoft.com/en-us/java/api/overview/azure/identity-readme?view=azure-java-stable) for authentication|Optional. Set one of key, sharedAccessStorageToken or useAzureCredentialsChain.|False|
|`druid.azure.managedIdentityClientId`|If you want to use managed identity authentication in the `DefaultAzureCredential`, `useAzureCredentialsChain` must be true.||Optional.|
|`druid.azure.container`||Azure Storage container name.|Must be set.|
|`druid.azure.prefix`|A prefix string that will be prepended to the blob names for the segments published to Azure deep storage| |""|
|`druid.azure.protocol`|the protocol to use|http or https|https|

View File

@ -33,6 +33,17 @@
<relativePath>../../pom.xml</relativePath>
</parent>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-sdk-bom</artifactId>
<version>1.2.19</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.druid</groupId>
@ -40,29 +51,25 @@
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-storage</artifactId>
<version>8.6.0</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
</exclusions>
<groupId>com.azure</groupId>
<artifactId>azure-identity</artifactId>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-storage-blob</artifactId>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-storage-blob-batch</artifactId>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-storage-common</artifactId>
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
@ -129,7 +136,11 @@
<artifactId>commons-lang</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<scope>provided</scope>
</dependency>
<!-- Tests -->
<dependency>
<groupId>junit</groupId>

View File

@ -19,14 +19,14 @@
package org.apache.druid.data.input.azure;
import com.azure.storage.blob.models.BlobStorageException;
import com.azure.storage.blob.specialized.BlockBlobClient;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterators;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudBlob;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.impl.CloudObjectInputSource;
@ -42,7 +42,6 @@ import org.apache.druid.storage.azure.AzureStorage;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@ -150,7 +149,7 @@ public class AzureInputSource extends CloudObjectInputSource
blob.getBlobLength()
);
}
catch (URISyntaxException | StorageException e) {
catch (BlobStorageException e) {
throw new RuntimeException(e);
}
}
@ -161,14 +160,14 @@ public class AzureInputSource extends CloudObjectInputSource
public long getObjectSize(CloudObjectLocation location)
{
try {
final CloudBlob blobWithAttributes = storage.getBlockBlobReferenceWithAttributes(
final BlockBlobClient blobWithAttributes = storage.getBlockBlobReferenceWithAttributes(
location.getBucket(),
location.getPath()
);
return blobWithAttributes.getProperties().getLength();
return blobWithAttributes.getProperties().getBlobSize();
}
catch (URISyntaxException | StorageException e) {
catch (BlobStorageException e) {
throw new RuntimeException(e);
}
}

View File

@ -46,6 +46,12 @@ public class AzureAccountConfig
@JsonProperty
private String sharedAccessStorageToken;
@JsonProperty
private String managedIdentityClientId;
@JsonProperty
private Boolean useAzureCredentialsChain = Boolean.FALSE;
@SuppressWarnings("unused") // Used by Jackson deserialization?
public void setProtocol(String protocol)
{
@ -94,9 +100,25 @@ public class AzureAccountConfig
return sharedAccessStorageToken;
}
public Boolean getUseAzureCredentialsChain()
{
return useAzureCredentialsChain;
}
public String getManagedIdentityClientId()
{
return managedIdentityClientId;
}
@SuppressWarnings("unused") // Used by Jackson deserialization?
public void setSharedAccessStorageToken(String sharedAccessStorageToken)
{
this.sharedAccessStorageToken = sharedAccessStorageToken;
}
public void setUseAzureCredentialsChain(Boolean useAzureCredentialsChain)
{
this.useAzureCredentialsChain = useAzureCredentialsChain;
}
}

View File

@ -19,15 +19,14 @@
package org.apache.druid.storage.azure;
import com.azure.storage.blob.models.BlobStorageException;
import com.google.common.io.ByteSource;
import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.AssistedInject;
import com.microsoft.azure.storage.StorageException;
import org.apache.druid.java.util.common.logger.Logger;
import java.io.IOException;
import java.io.InputStream;
import java.net.URISyntaxException;
/**
* Used for getting an {@link InputStream} to an azure resource.
@ -62,7 +61,7 @@ public class AzureByteSource extends ByteSource
try {
return azureStorage.getBlockBlobInputStream(offset, containerName, blobPath);
}
catch (StorageException | URISyntaxException e) {
catch (BlobStorageException e) {
if (AzureUtils.AZURE_RETRY.apply(e)) {
throw new IOException("Recoverable exception", e);
}

View File

@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.storage.azure;
import com.azure.core.http.policy.ExponentialBackoffOptions;
import com.azure.core.http.policy.RetryOptions;
import com.azure.identity.DefaultAzureCredentialBuilder;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.BlobServiceClientBuilder;
import com.azure.storage.common.StorageSharedKeyCredential;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
/**
* Factory class for generating BlobServiceClient objects.
*/
public class AzureClientFactory
{
private final AzureAccountConfig config;
private final Map<Integer, BlobServiceClient> cachedBlobServiceClients;
public AzureClientFactory(AzureAccountConfig config)
{
this.config = config;
this.cachedBlobServiceClients = new HashMap<>();
}
// It's okay to store clients in a map here because all the configs for specifying azure retries are static, and there are only 2 of them.
// The 2 configs are AzureAccountConfig.maxTries and AzureOutputConfig.maxRetrr.
// We will only ever have at most 2 clients in cachedBlobServiceClients.
public BlobServiceClient getBlobServiceClient(Integer retryCount)
{
if (!cachedBlobServiceClients.containsKey(retryCount)) {
BlobServiceClientBuilder clientBuilder = getAuthenticatedBlobServiceClientBuilder()
.retryOptions(new RetryOptions(
new ExponentialBackoffOptions()
.setMaxRetries(retryCount != null ? retryCount : config.getMaxTries())
.setBaseDelay(Duration.ofMillis(1000))
.setMaxDelay(Duration.ofMillis(60000))
));
cachedBlobServiceClients.put(retryCount, clientBuilder.buildClient());
}
return cachedBlobServiceClients.get(retryCount);
}
private BlobServiceClientBuilder getAuthenticatedBlobServiceClientBuilder()
{
BlobServiceClientBuilder clientBuilder = new BlobServiceClientBuilder()
.endpoint("https://" + config.getAccount() + ".blob.core.windows.net");
if (config.getKey() != null) {
clientBuilder.credential(new StorageSharedKeyCredential(config.getAccount(), config.getKey()));
} else if (config.getSharedAccessStorageToken() != null) {
clientBuilder.sasToken(config.getSharedAccessStorageToken());
} else if (config.getUseAzureCredentialsChain()) {
// We might not use the managed identity client id in the credential chain but we can just set it here and it will no-op.
DefaultAzureCredentialBuilder defaultAzureCredentialBuilder = new DefaultAzureCredentialBuilder()
.managedIdentityClientId(config.getManagedIdentityClientId());
clientBuilder.credential(defaultAzureCredentialBuilder.build());
}
return clientBuilder;
}
}

View File

@ -19,16 +19,12 @@
package org.apache.druid.storage.azure;
import com.azure.storage.blob.models.BlobItem;
import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.AssistedInject;
import com.microsoft.azure.storage.ResultContinuation;
import com.microsoft.azure.storage.ResultSegment;
import com.microsoft.azure.storage.blob.ListBlobItem;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.storage.azure.blob.CloudBlobHolder;
import org.apache.druid.storage.azure.blob.ListBlobItemHolder;
import org.apache.druid.storage.azure.blob.ListBlobItemHolderFactory;
import java.net.URI;
import java.util.Iterator;
@ -42,36 +38,28 @@ public class AzureCloudBlobIterator implements Iterator<CloudBlobHolder>
{
private static final Logger log = new Logger(AzureCloudBlobIterator.class);
private final AzureStorage storage;
private final ListBlobItemHolderFactory blobItemDruidFactory;
private final Iterator<URI> prefixesIterator;
private final int maxListingLength;
private ResultSegment<ListBlobItem> result;
private String currentContainer;
private String currentPrefix;
private ResultContinuation continuationToken;
private CloudBlobHolder currentBlobItem;
private Iterator<ListBlobItem> blobItemIterator;
private Iterator<BlobItem> blobItemIterator;
private final AzureAccountConfig config;
@AssistedInject
AzureCloudBlobIterator(
AzureStorage storage,
ListBlobItemHolderFactory blobItemDruidFactory,
AzureAccountConfig config,
@Assisted final Iterable<URI> prefixes,
@Assisted final int maxListingLength
)
{
this.storage = storage;
this.blobItemDruidFactory = blobItemDruidFactory;
this.config = config;
this.prefixesIterator = prefixes.iterator();
this.maxListingLength = maxListingLength;
this.result = null;
this.currentContainer = null;
this.currentPrefix = null;
this.continuationToken = null;
this.currentBlobItem = null;
this.blobItemIterator = null;
@ -108,8 +96,6 @@ public class AzureCloudBlobIterator implements Iterator<CloudBlobHolder>
log.debug("currentUri: %s\ncurrentContainer: %s\ncurrentPrefix: %s",
currentUri, currentContainer, currentPrefix
);
result = null;
continuationToken = null;
}
private void fetchNextBatch()
@ -121,14 +107,13 @@ public class AzureCloudBlobIterator implements Iterator<CloudBlobHolder>
currentContainer,
currentPrefix
);
result = AzureUtils.retryAzureOperation(() -> storage.listBlobsWithPrefixInContainerSegmented(
// We don't need to iterate by page because the client handles this, it will fetch the next page when necessary.
blobItemIterator = storage.listBlobsWithPrefixInContainerSegmented(
currentContainer,
currentPrefix,
continuationToken,
maxListingLength
), config.getMaxTries());
continuationToken = result.getContinuationToken();
blobItemIterator = result.getResults().iterator();
maxListingLength,
config.getMaxTries()
).stream().iterator();
}
catch (Exception e) {
throw new RE(
@ -146,19 +131,15 @@ public class AzureCloudBlobIterator implements Iterator<CloudBlobHolder>
*/
private void advanceBlobItem()
{
while (blobItemIterator.hasNext() || continuationToken != null || prefixesIterator.hasNext()) {
while (prefixesIterator.hasNext() || blobItemIterator.hasNext()) {
while (blobItemIterator.hasNext()) {
ListBlobItemHolder blobItem = blobItemDruidFactory.create(blobItemIterator.next());
/* skip directory objects */
if (blobItem.isCloudBlob() && blobItem.getCloudBlob().getBlobLength() > 0) {
currentBlobItem = blobItem.getCloudBlob();
BlobItem blobItem = blobItemIterator.next();
if (!blobItem.isPrefix() && blobItem.getProperties().getContentLength() > 0) {
currentBlobItem = new CloudBlobHolder(blobItem, currentContainer);
return;
}
}
if (continuationToken != null) {
fetchNextBatch();
} else if (prefixesIterator.hasNext()) {
if (prefixesIterator.hasNext()) {
prepareNextRequest();
fetchNextBatch();
}

View File

@ -19,9 +19,9 @@
package org.apache.druid.storage.azure;
import com.azure.storage.blob.models.BlobStorageException;
import com.google.common.base.Predicates;
import com.google.inject.Inject;
import com.microsoft.azure.storage.StorageException;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.MapUtils;
import org.apache.druid.java.util.common.logger.Logger;
@ -30,7 +30,6 @@ import org.apache.druid.segment.loading.SegmentLoadingException;
import org.apache.druid.timeline.DataSegment;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.file.Paths;
import java.util.Map;
@ -76,13 +75,8 @@ public class AzureDataSegmentKiller implements DataSegmentKiller
try {
azureStorage.emptyCloudBlobDirectory(containerName, dirPath);
}
catch (StorageException e) {
Object extendedInfo =
e.getExtendedErrorInformation() == null ? null : e.getExtendedErrorInformation().getErrorMessage();
throw new SegmentLoadingException(e, "Couldn't kill segment[%s]: [%s]", segment.getId(), extendedInfo);
}
catch (URISyntaxException e) {
throw new SegmentLoadingException(e, "Couldn't kill segment[%s]: [%s]", segment.getId(), e.getReason());
catch (BlobStorageException e) {
throw new SegmentLoadingException(e, "Couldn't kill segment[%s]: [%s]", segment.getId(), e.getMessage());
}
}

View File

@ -19,11 +19,11 @@
package org.apache.druid.storage.azure;
import com.azure.storage.blob.models.BlobStorageException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import com.microsoft.azure.storage.StorageException;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.SegmentUtils;
@ -35,7 +35,6 @@ import org.joda.time.format.ISODateTimeFormat;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Map;
@ -142,10 +141,7 @@ public class AzureDataSegmentPusher implements DataSegmentPusher
final File outFile = zipOutFile = File.createTempFile("index", ".zip");
final long size = CompressionUtils.zip(indexFilesDir, zipOutFile);
return AzureUtils.retryAzureOperation(
() -> uploadDataSegment(segment, binaryVersion, size, outFile, azurePath),
accountConfig.getMaxTries()
);
return uploadDataSegment(segment, binaryVersion, size, outFile, azurePath);
}
catch (Exception e) {
throw new RuntimeException(e);
@ -181,9 +177,9 @@ public class AzureDataSegmentPusher implements DataSegmentPusher
final File compressedSegmentData,
final String azurePath
)
throws StorageException, IOException, URISyntaxException
throws BlobStorageException, IOException
{
azureStorage.uploadBlockBlob(compressedSegmentData, segmentConfig.getContainer(), azurePath);
azureStorage.uploadBlockBlob(compressedSegmentData, segmentConfig.getContainer(), azurePath, accountConfig.getMaxTries());
final DataSegment outSegment = segment
.withSize(size)

View File

@ -19,21 +19,24 @@
package org.apache.druid.storage.azure;
import com.azure.core.http.rest.PagedIterable;
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.batch.BlobBatchClient;
import com.azure.storage.blob.batch.BlobBatchClientBuilder;
import com.azure.storage.blob.batch.BlobBatchStorageException;
import com.azure.storage.blob.models.BlobItem;
import com.azure.storage.blob.models.BlobRange;
import com.azure.storage.blob.models.BlobStorageException;
import com.azure.storage.blob.models.DeleteSnapshotsOptionType;
import com.azure.storage.blob.models.ListBlobsOptions;
import com.azure.storage.blob.models.ParallelTransferOptions;
import com.azure.storage.blob.options.BlobInputStreamOptions;
import com.azure.storage.blob.options.BlockBlobOutputStreamOptions;
import com.azure.storage.blob.specialized.BlockBlobClient;
import com.azure.storage.common.Utility;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Supplier;
import com.microsoft.azure.storage.ResultContinuation;
import com.microsoft.azure.storage.ResultSegment;
import com.microsoft.azure.storage.RetryExponentialRetry;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.BlobDeleteBatchOperation;
import com.microsoft.azure.storage.blob.BlobListingDetails;
import com.microsoft.azure.storage.blob.BlobRequestOptions;
import com.microsoft.azure.storage.blob.CloudBlob;
import com.microsoft.azure.storage.blob.CloudBlobClient;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import com.microsoft.azure.storage.blob.CloudBlockBlob;
import com.microsoft.azure.storage.blob.DeleteSnapshotsOption;
import com.microsoft.azure.storage.blob.ListBlobItem;
import com.google.common.collect.Lists;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.logger.Logger;
@ -43,9 +46,8 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
/**
@ -53,58 +55,43 @@ import java.util.List;
*/
public class AzureStorage
{
private static final boolean USE_FLAT_BLOB_LISTING = true;
// Default value from Azure library
private static final int DELTA_BACKOFF_MS = 30_000;
private static final Logger log = new Logger(AzureStorage.class);
/**
* Some segment processing tools such as DataSegmentKiller are initialized when an ingestion job starts
* if the extension is loaded, even when the implementation of DataSegmentKiller is not used. As a result,
* if we have a CloudBlobClient instead of a supplier of it, it can cause unnecessary config validation
* against Azure storage even when it's not used at all. To perform the config validation
* only when it is actually used, we use a supplier.
*
* See OmniDataSegmentKiller for how DataSegmentKillers are initialized.
*/
private final Supplier<CloudBlobClient> cloudBlobClient;
private final AzureClientFactory azureClientFactory;
public AzureStorage(
Supplier<CloudBlobClient> cloudBlobClient
AzureClientFactory azureClientFactory
)
{
this.cloudBlobClient = cloudBlobClient;
this.azureClientFactory = azureClientFactory;
}
public List<String> emptyCloudBlobDirectory(final String containerName, final String virtualDirPath)
throws StorageException, URISyntaxException
throws BlobStorageException
{
return emptyCloudBlobDirectory(containerName, virtualDirPath, null);
}
public List<String> emptyCloudBlobDirectory(final String containerName, final String virtualDirPath, final Integer maxAttempts)
throws StorageException, URISyntaxException
throws BlobStorageException
{
List<String> deletedFiles = new ArrayList<>();
CloudBlobContainer container = getOrCreateCloudBlobContainer(containerName);
BlobContainerClient blobContainerClient = getOrCreateBlobContainerClient(containerName, maxAttempts);
Iterable<ListBlobItem> blobItems = container.listBlobs(
virtualDirPath,
USE_FLAT_BLOB_LISTING,
null,
getRequestOptionsWithRetry(maxAttempts),
null
);
// https://learn.microsoft.com/en-us/azure/storage/blobs/storage-blobs-list The new client uses flat listing by default.
PagedIterable<BlobItem> blobItems = blobContainerClient.listBlobs(new ListBlobsOptions().setPrefix(virtualDirPath), Duration.ofMillis(DELTA_BACKOFF_MS));
for (ListBlobItem blobItem : blobItems) {
CloudBlob cloudBlob = (CloudBlob) blobItem;
log.debug("Removing file[%s] from Azure.", cloudBlob.getName());
if (cloudBlob.deleteIfExists(DeleteSnapshotsOption.NONE, null, getRequestOptionsWithRetry(maxAttempts), null)) {
deletedFiles.add(cloudBlob.getName());
}
}
blobItems.iterableByPage().forEach(page -> {
page.getElements().forEach(blob -> {
if (blobContainerClient.getBlobClient(blob.getName()).deleteIfExists()) {
deletedFiles.add(blob.getName());
}
});
});
if (deletedFiles.isEmpty()) {
log.warn("No files were deleted on the following Azure path: [%s]", virtualDirPath);
@ -113,12 +100,15 @@ public class AzureStorage
return deletedFiles;
}
public void uploadBlockBlob(final File file, final String containerName, final String blobPath)
throws IOException, StorageException, URISyntaxException
public void uploadBlockBlob(final File file, final String containerName, final String blobPath, final Integer maxAttempts)
throws IOException, BlobStorageException
{
CloudBlobContainer container = getOrCreateCloudBlobContainer(containerName);
BlobContainerClient blobContainerClient = getOrCreateBlobContainerClient(containerName, maxAttempts);
try (FileInputStream stream = new FileInputStream(file)) {
container.getBlockBlobReference(blobPath).upload(stream, file.length());
// By default this creates a Block blob, no need to use a specific Block blob client.
// We also need to urlEncode the path to handle special characters.
blobContainerClient.getBlobClient(Utility.urlEncode(blobPath)).upload(stream, file.length());
}
}
@ -127,159 +117,123 @@ public class AzureStorage
final String blobPath,
@Nullable final Integer streamWriteSizeBytes,
Integer maxAttempts
) throws URISyntaxException, StorageException
) throws BlobStorageException
{
CloudBlobContainer container = getOrCreateCloudBlobContainer(containerName);
CloudBlockBlob blockBlobReference = container.getBlockBlobReference(blobPath);
BlobContainerClient blobContainerClient = getOrCreateBlobContainerClient(containerName, maxAttempts);
BlockBlobClient blockBlobClient = blobContainerClient.getBlobClient(Utility.urlEncode(blobPath)).getBlockBlobClient();
if (blockBlobReference.exists()) {
if (blockBlobClient.exists()) {
throw new RE("Reference already exists");
}
BlockBlobOutputStreamOptions options = new BlockBlobOutputStreamOptions();
if (streamWriteSizeBytes != null) {
blockBlobReference.setStreamWriteSizeInBytes(streamWriteSizeBytes);
options.setParallelTransferOptions(new ParallelTransferOptions().setBlockSizeLong(streamWriteSizeBytes.longValue()));
}
return blockBlobReference.openOutputStream(null, getRequestOptionsWithRetry(maxAttempts), null);
return blockBlobClient.getBlobOutputStream(options);
}
public CloudBlob getBlockBlobReferenceWithAttributes(final String containerName, final String blobPath)
throws URISyntaxException, StorageException
// There's no need to download attributes with the new azure clients, they will get fetched as needed.
public BlockBlobClient getBlockBlobReferenceWithAttributes(final String containerName, final String blobPath)
throws BlobStorageException
{
final CloudBlockBlob blobReference = getOrCreateCloudBlobContainer(containerName).getBlockBlobReference(blobPath);
blobReference.downloadAttributes();
return blobReference;
return getOrCreateBlobContainerClient(containerName).getBlobClient(Utility.urlEncode(blobPath)).getBlockBlobClient();
}
public long getBlockBlobLength(final String containerName, final String blobPath)
throws URISyntaxException, StorageException
throws BlobStorageException
{
return getBlockBlobReferenceWithAttributes(containerName, blobPath).getProperties().getLength();
return getBlockBlobReferenceWithAttributes(containerName, blobPath).getProperties().getBlobSize();
}
public InputStream getBlockBlobInputStream(final String containerName, final String blobPath)
throws URISyntaxException, StorageException
throws BlobStorageException
{
return getBlockBlobInputStream(0L, containerName, blobPath);
}
public InputStream getBlockBlobInputStream(long offset, final String containerName, final String blobPath)
throws URISyntaxException, StorageException
throws BlobStorageException
{
return getBlockBlobInputStream(offset, null, containerName, blobPath);
}
public InputStream getBlockBlobInputStream(long offset, Long length, final String containerName, final String blobPath)
throws URISyntaxException, StorageException
throws BlobStorageException
{
return getBlockBlobInputStream(offset, length, containerName, blobPath, null);
}
public InputStream getBlockBlobInputStream(long offset, Long length, final String containerName, final String blobPath, Integer maxAttempts)
throws URISyntaxException, StorageException
throws BlobStorageException
{
CloudBlobContainer container = getOrCreateCloudBlobContainer(containerName);
return container.getBlockBlobReference(blobPath)
.openInputStream(offset, length, null, getRequestOptionsWithRetry(maxAttempts), null);
BlobContainerClient blobContainerClient = getOrCreateBlobContainerClient(containerName, maxAttempts);
return blobContainerClient.getBlobClient(Utility.urlEncode(blobPath)).openInputStream(new BlobInputStreamOptions().setRange(new BlobRange(offset, length)));
}
public void batchDeleteFiles(String containerName, Iterable<String> paths, Integer maxAttempts)
throws URISyntaxException, StorageException
throws BlobBatchStorageException
{
CloudBlobContainer cloudBlobContainer = getOrCreateCloudBlobContainer(containerName);
BlobDeleteBatchOperation blobDeleteBatchOperation = new BlobDeleteBatchOperation();
for (String path : paths) {
CloudBlob blobReference = cloudBlobContainer.getBlockBlobReference(path);
blobDeleteBatchOperation.addSubOperation(blobReference);
}
cloudBlobClient.get().executeBatch(blobDeleteBatchOperation, getRequestOptionsWithRetry(maxAttempts), null);
}
public List<String> listDir(final String containerName, final String virtualDirPath)
throws URISyntaxException, StorageException
{
return listDir(containerName, virtualDirPath, null);
BlobBatchClient blobBatchClient = new BlobBatchClientBuilder(getOrCreateBlobContainerClient(containerName, maxAttempts)).buildClient();
blobBatchClient.deleteBlobs(Lists.newArrayList(paths), DeleteSnapshotsOptionType.ONLY);
}
public List<String> listDir(final String containerName, final String virtualDirPath, final Integer maxAttempts)
throws StorageException, URISyntaxException
throws BlobStorageException
{
List<String> files = new ArrayList<>();
CloudBlobContainer container = getOrCreateCloudBlobContainer(containerName);
BlobContainerClient blobContainerClient = getOrCreateBlobContainerClient(containerName, maxAttempts);
for (ListBlobItem blobItem :
container.listBlobs(virtualDirPath, USE_FLAT_BLOB_LISTING, null, getRequestOptionsWithRetry(maxAttempts), null)) {
CloudBlob cloudBlob = (CloudBlob) blobItem;
files.add(cloudBlob.getName());
}
PagedIterable<BlobItem> blobItems = blobContainerClient.listBlobs(
new ListBlobsOptions().setPrefix(virtualDirPath),
Duration.ofMillis(DELTA_BACKOFF_MS)
);
blobItems.iterableByPage().forEach(page -> page.getElements().forEach(blob -> files.add(blob.getName())));
return files;
}
public boolean getBlockBlobExists(String container, String blobPath) throws URISyntaxException, StorageException
public boolean getBlockBlobExists(String container, String blobPath) throws BlobStorageException
{
return getBlockBlobExists(container, blobPath, null);
}
public boolean getBlockBlobExists(String container, String blobPath, Integer maxAttempts)
throws URISyntaxException, StorageException
throws BlobStorageException
{
return getOrCreateCloudBlobContainer(container).getBlockBlobReference(blobPath)
.exists(null, getRequestOptionsWithRetry(maxAttempts), null);
}
/**
* If maxAttempts is provided, this method returns request options with retry built in.
* Retry backoff is exponential backoff, with maxAttempts set to the one provided
*/
@Nullable
private BlobRequestOptions getRequestOptionsWithRetry(Integer maxAttempts)
{
if (maxAttempts == null) {
return null;
}
BlobRequestOptions requestOptions = new BlobRequestOptions();
requestOptions.setRetryPolicyFactory(new RetryExponentialRetry(DELTA_BACKOFF_MS, maxAttempts));
return requestOptions;
return getOrCreateBlobContainerClient(container, maxAttempts).getBlobClient(Utility.urlEncode(blobPath)).exists();
}
@VisibleForTesting
CloudBlobClient getCloudBlobClient()
BlobServiceClient getBlobServiceClient(Integer maxAttempts)
{
return this.cloudBlobClient.get();
return azureClientFactory.getBlobServiceClient(maxAttempts);
}
@VisibleForTesting
ResultSegment<ListBlobItem> listBlobsWithPrefixInContainerSegmented(
PagedIterable<BlobItem> listBlobsWithPrefixInContainerSegmented(
final String containerName,
final String prefix,
ResultContinuation continuationToken,
int maxResults
) throws StorageException, URISyntaxException
int maxResults,
Integer maxAttempts
) throws BlobStorageException
{
CloudBlobContainer cloudBlobContainer = cloudBlobClient.get().getContainerReference(containerName);
return cloudBlobContainer
.listBlobsSegmented(
prefix,
/* Use flat blob listing here so that we get only blob types and not directories.*/
USE_FLAT_BLOB_LISTING,
EnumSet
.noneOf(BlobListingDetails.class),
maxResults,
continuationToken,
null,
null
);
BlobContainerClient blobContainerClient = getOrCreateBlobContainerClient(containerName, maxAttempts);
return blobContainerClient.listBlobs(
new ListBlobsOptions().setPrefix(prefix).setMaxResultsPerPage(maxResults),
Duration.ofMillis(DELTA_BACKOFF_MS)
);
}
private CloudBlobContainer getOrCreateCloudBlobContainer(final String containerName)
throws StorageException, URISyntaxException
private BlobContainerClient getOrCreateBlobContainerClient(final String containerName)
{
CloudBlobContainer cloudBlobContainer = cloudBlobClient.get().getContainerReference(containerName);
cloudBlobContainer.createIfNotExists();
return getBlobServiceClient(null).createBlobContainerIfNotExists(containerName);
}
return cloudBlobContainer;
private BlobContainerClient getOrCreateBlobContainerClient(final String containerName, final Integer maxRetries)
{
return getBlobServiceClient(maxRetries).createBlobContainerIfNotExists(containerName);
}
}

View File

@ -23,14 +23,12 @@ import com.fasterxml.jackson.core.Version;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Provides;
import com.google.inject.assistedinject.FactoryModuleBuilder;
import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.blob.CloudBlobClient;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.druid.data.input.azure.AzureEntityFactory;
import org.apache.druid.data.input.azure.AzureInputSource;
import org.apache.druid.guice.Binders;
@ -38,11 +36,7 @@ import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.storage.azure.blob.ListBlobItemHolderFactory;
import java.net.URISyntaxException;
import java.security.InvalidKeyException;
import java.util.List;
/**
@ -114,64 +108,38 @@ public class AzureStorageDruidModule implements DruidModule
.build(AzureCloudBlobIteratorFactory.class));
binder.install(new FactoryModuleBuilder()
.build(AzureCloudBlobIterableFactory.class));
binder.install(new FactoryModuleBuilder()
.build(ListBlobItemHolderFactory.class));
}
/**
* Creates a supplier that lazily initialize {@link CloudBlobClient}.
* This is to avoid immediate config validation but defer it until you actually use the client.
*/
@Provides
@LazySingleton
public Supplier<CloudBlobClient> getCloudBlobClient(final AzureAccountConfig config)
public AzureClientFactory getAzureClientFactory(final AzureAccountConfig config)
{
if ((config.getKey() != null && config.getSharedAccessStorageToken() != null)
||
(config.getKey() == null && config.getSharedAccessStorageToken() == null)) {
throw new ISE("Either set 'key' or 'sharedAccessStorageToken' in the azure config but not both."
+ " Please refer to azure documentation.");
if (StringUtils.isEmpty(config.getKey()) && StringUtils.isEmpty(config.getSharedAccessStorageToken()) && BooleanUtils.isNotTrue(config.getUseAzureCredentialsChain())) {
throw new ISE("Either set 'key' or 'sharedAccessStorageToken' or 'useAzureCredentialsChain' in the azure config."
+ " Please refer to azure documentation.");
}
return Suppliers.memoize(() -> {
try {
final CloudStorageAccount account;
if (config.getKey() != null) {
account = CloudStorageAccount.parse(
StringUtils.format(
STORAGE_CONNECTION_STRING_WITH_KEY,
config.getProtocol(),
config.getAccount(),
config.getKey()
)
);
return account.createCloudBlobClient();
} else if (config.getSharedAccessStorageToken() != null) {
account = CloudStorageAccount.parse(StringUtils.format(
STORAGE_CONNECTION_STRING_WITH_TOKEN,
config.getProtocol(),
config.getAccount(),
config.getSharedAccessStorageToken()
));
return account.createCloudBlobClient();
} else {
throw new ISE(
"None of 'key' or 'sharedAccessStorageToken' is set in the azure config."
+ " Please refer to azure extension documentation.");
}
}
catch (URISyntaxException | InvalidKeyException e) {
throw new RuntimeException(e);
}
});
/* Azure named keys and sas tokens are mutually exclusive with each other and with azure keychain auth,
but any form of auth supported by the DefaultAzureCredentialChain is not mutually exclusive, e.g. you can have
environment credentials or workload credentials or managed credentials using the same chain.
**/
if (!StringUtils.isEmpty(config.getKey()) && !StringUtils.isEmpty(config.getSharedAccessStorageToken()) ||
!StringUtils.isEmpty(config.getKey()) && BooleanUtils.isTrue(config.getUseAzureCredentialsChain()) ||
!StringUtils.isEmpty(config.getSharedAccessStorageToken()) && BooleanUtils.isTrue(config.getUseAzureCredentialsChain())
) {
throw new ISE("Set only one of 'key' or 'sharedAccessStorageToken' or 'useAzureCredentialsChain' in the azure config."
+ " Please refer to azure documentation.");
}
return new AzureClientFactory(config);
}
@Provides
@LazySingleton
public AzureStorage getAzureStorageContainer(
final Supplier<CloudBlobClient> cloudBlobClient
final AzureClientFactory azureClientFactory
)
{
return new AzureStorage(cloudBlobClient);
return new AzureStorage(azureClientFactory);
}
}

View File

@ -19,9 +19,9 @@
package org.apache.druid.storage.azure;
import com.azure.storage.blob.models.BlobStorageException;
import com.google.common.base.Optional;
import com.google.inject.Inject;
import com.microsoft.azure.storage.StorageException;
import org.apache.druid.common.utils.CurrentTimeMillisSupplier;
import org.apache.druid.java.util.common.IOE;
import org.apache.druid.java.util.common.StringUtils;
@ -31,7 +31,6 @@ import org.apache.druid.tasklogs.TaskLogs;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URISyntaxException;
import java.util.Date;
/**
@ -93,13 +92,7 @@ public class AzureTaskLogs implements TaskLogs
private void pushTaskFile(final File logFile, String taskKey)
{
try {
AzureUtils.retryAzureOperation(
() -> {
azureStorage.uploadBlockBlob(logFile, config.getContainer(), taskKey);
return null;
},
config.getMaxTries()
);
azureStorage.uploadBlockBlob(logFile, config.getContainer(), taskKey, accountConfig.getMaxTries());
}
catch (Exception e) {
throw new RuntimeException(e);
@ -153,7 +146,7 @@ public class AzureTaskLogs implements TaskLogs
throw new IOException(e);
}
}
catch (StorageException | URISyntaxException e) {
catch (BlobStorageException e) {
throw new IOE(e, "Failed to stream logs from: %s", taskKey);
}
}

View File

@ -21,7 +21,6 @@ package org.apache.druid.storage.azure;
import com.fasterxml.jackson.annotation.JsonProperty;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
/**
@ -37,19 +36,14 @@ public class AzureTaskLogsConfig
@NotNull
private String prefix = null;
@JsonProperty
@Min(1)
private int maxTries = 3;
public AzureTaskLogsConfig()
{
}
public AzureTaskLogsConfig(String container, String prefix, int maxTries)
public AzureTaskLogsConfig(String container, String prefix)
{
this.container = container;
this.prefix = prefix;
this.maxTries = maxTries;
}
public String getContainer()
@ -61,9 +55,4 @@ public class AzureTaskLogsConfig
{
return prefix;
}
public int getMaxTries()
{
return maxTries;
}
}

View File

@ -19,20 +19,18 @@
package org.apache.druid.storage.azure;
import com.azure.storage.blob.models.BlobStorageException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import com.microsoft.azure.storage.StorageException;
import org.apache.druid.data.input.impl.CloudObjectLocation;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.RetryUtils.Task;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.storage.azure.blob.CloudBlobHolder;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Iterator;
import java.util.concurrent.TimeoutException;
/**
* Utility class for miscellaneous things involving Azure.
@ -48,22 +46,25 @@ public class AzureUtils
// (from https://docs.microsoft.com/en-us/azure/hdinsight/hdinsight-hadoop-use-blob-storage)
static final String AZURE_STORAGE_HADOOP_PROTOCOL = "wasbs";
// This logic is copied from RequestRetryOptions in the azure client. We still need this logic because some classes like
// RetryingInputEntity need a predicate function to tell whether to retry, seperate from the Azure client retries.
public static final Predicate<Throwable> AZURE_RETRY = e -> {
if (e == null) {
return false;
}
for (Throwable t = e; t != null; t = t.getCause()) {
if (t instanceof URISyntaxException) {
return false;
}
if (t instanceof StorageException) {
return true;
if (t instanceof BlobStorageException) {
int statusCode = ((BlobStorageException) t).getStatusCode();
return statusCode == 429 || statusCode == 500 || statusCode == 503;
}
if (t instanceof IOException) {
return true;
}
if (t instanceof TimeoutException) {
return true;
}
}
return false;
};
@ -119,7 +120,6 @@ public class AzureUtils
String prefix,
Predicate<CloudBlobHolder> filter
)
throws Exception
{
AzureCloudBlobIterable azureCloudBlobIterable =
azureCloudBlobIterableFactory.create(ImmutableList.of(new CloudObjectLocation(
@ -131,26 +131,8 @@ public class AzureUtils
while (iterator.hasNext()) {
final CloudBlobHolder nextObject = iterator.next();
if (filter.apply(nextObject)) {
deleteBucketKeys(storage, accountConfig.getMaxTries(), nextObject.getContainerName(), nextObject.getName());
storage.emptyCloudBlobDirectory(nextObject.getContainerName(), nextObject.getName(), accountConfig.getMaxTries());
}
}
}
private static void deleteBucketKeys(
AzureStorage storage,
int maxTries,
String bucket,
String prefix
) throws Exception
{
AzureUtils.retryAzureOperation(() -> {
storage.emptyCloudBlobDirectory(bucket, prefix);
return null;
}, maxTries);
}
static <T> T retryAzureOperation(Task<T> f, int maxTries) throws Exception
{
return RetryUtils.retry(f, AZURE_RETRY, maxTries);
}
}

View File

@ -19,28 +19,27 @@
package org.apache.druid.storage.azure.blob;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudBlob;
import java.net.URISyntaxException;
import com.azure.storage.blob.models.BlobItem;
import java.util.Date;
/**
* Wrapper for {@link CloudBlob}. Used to make testing easier, since {@link CloudBlob}
* Wrapper for {@link BlobItem}. Used to make testing easier, since {@link BlobItem}
* is a final class and so is difficult to mock in unit tests.
*/
public class CloudBlobHolder
{
private final CloudBlob delegate;
private final BlobItem delegate;
private final String container;
public CloudBlobHolder(CloudBlob delegate)
public CloudBlobHolder(BlobItem delegate, String container)
{
this.delegate = delegate;
this.container = container;
}
public String getContainerName() throws URISyntaxException, StorageException
public String getContainerName()
{
return delegate.getContainer().getName();
return container;
}
public String getName()
@ -50,11 +49,11 @@ public class CloudBlobHolder
public long getBlobLength()
{
return delegate.getProperties().getLength();
return delegate.getProperties().getContentLength();
}
public Date getLastModifed()
{
return delegate.getProperties().getLastModified();
return Date.from(delegate.getProperties().getLastModified().toInstant());
}
}

View File

@ -1,71 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.storage.azure.blob;
import com.google.inject.assistedinject.Assisted;
import com.google.inject.assistedinject.AssistedInject;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudBlob;
import com.microsoft.azure.storage.blob.ListBlobItem;
import java.net.URI;
import java.net.URISyntaxException;
/**
* Wrapper class for {@link ListBlobItem} interface, which was missing some useful
* functionality for telling whether the blob was a cloudBlob or not. This class was
* added mainly to make testing easier.
*/
public class ListBlobItemHolder
{
private final ListBlobItem delegate;
@AssistedInject
public ListBlobItemHolder(@Assisted ListBlobItem delegate)
{
this.delegate = delegate;
}
public String getContainerName() throws URISyntaxException, StorageException
{
return delegate.getContainer().getName();
}
public URI getUri()
{
return delegate.getUri();
}
public CloudBlobHolder getCloudBlob()
{
return new CloudBlobHolder((CloudBlob) delegate);
}
public boolean isCloudBlob()
{
return delegate instanceof CloudBlob;
}
@Override
public String toString()
{
return delegate.toString();
}
}

View File

@ -1,30 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.storage.azure.blob;
import com.microsoft.azure.storage.blob.ListBlobItem;
/**
* Factory for creating {@link ListBlobItemHolder} objects
*/
public interface ListBlobItemHolderFactory
{
ListBlobItemHolder create(ListBlobItem blobItem);
}

View File

@ -19,9 +19,9 @@
package org.apache.druid.storage.azure.output;
import com.azure.storage.blob.models.BlobStorageException;
import com.google.common.base.Joiner;
import com.google.common.collect.Iterables;
import com.microsoft.azure.storage.StorageException;
import org.apache.druid.data.input.impl.prefetch.ObjectOpenFunction;
import org.apache.druid.storage.azure.AzureStorage;
import org.apache.druid.storage.azure.AzureUtils;
@ -31,7 +31,6 @@ import org.apache.druid.storage.remote.ChunkingStorageConnectorParameters;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@ -64,7 +63,7 @@ public class AzureStorageConnector extends ChunkingStorageConnector<AzureInputRa
try {
return buildInputParams(path, 0, azureStorage.getBlockBlobLength(config.getContainer(), objectPath(path)));
}
catch (URISyntaxException | StorageException e) {
catch (BlobStorageException e) {
throw new IOException(e);
}
}
@ -100,7 +99,7 @@ public class AzureStorageConnector extends ChunkingStorageConnector<AzureInputRa
config.getMaxRetry()
);
}
catch (URISyntaxException | StorageException e) {
catch (BlobStorageException e) {
throw new IOException(e);
}
}
@ -128,7 +127,7 @@ public class AzureStorageConnector extends ChunkingStorageConnector<AzureInputRa
try {
return azureStorage.getBlockBlobExists(config.getContainer(), objectPath(path), config.getMaxRetry());
}
catch (URISyntaxException | StorageException e) {
catch (BlobStorageException e) {
throw new IOException(e);
}
}
@ -144,7 +143,7 @@ public class AzureStorageConnector extends ChunkingStorageConnector<AzureInputRa
config.getMaxRetry()
);
}
catch (URISyntaxException | StorageException e) {
catch (BlobStorageException e) {
throw new IOException(e);
}
}
@ -159,7 +158,7 @@ public class AzureStorageConnector extends ChunkingStorageConnector<AzureInputRa
config.getMaxRetry()
);
}
catch (URISyntaxException | StorageException e) {
catch (BlobStorageException e) {
throw new IOException(e);
}
}
@ -174,7 +173,7 @@ public class AzureStorageConnector extends ChunkingStorageConnector<AzureInputRa
config.getMaxRetry()
);
}
catch (StorageException | URISyntaxException e) {
catch (BlobStorageException e) {
throw new IOException(e);
}
}
@ -185,7 +184,7 @@ public class AzureStorageConnector extends ChunkingStorageConnector<AzureInputRa
try {
azureStorage.emptyCloudBlobDirectory(config.getContainer(), objectPath(path), config.getMaxRetry());
}
catch (StorageException | URISyntaxException e) {
catch (BlobStorageException e) {
throw new IOException(e);
}
}
@ -198,7 +197,7 @@ public class AzureStorageConnector extends ChunkingStorageConnector<AzureInputRa
try {
paths = azureStorage.listDir(config.getContainer(), prefixBasePath, config.getMaxRetry());
}
catch (StorageException | URISyntaxException e) {
catch (BlobStorageException e) {
throw new IOException(e);
}

View File

@ -154,7 +154,7 @@ public class AzureInputSourceTest extends EasyMockSupport
}
@Test
public void test_createSplits_successfullyCreatesCloudLocation_returnsExpectedLocations() throws Exception
public void test_createSplits_successfullyCreatesCloudLocation_returnsExpectedLocations()
{
List<URI> prefixes = ImmutableList.of(PREFIX_URI);
List<List<CloudObjectLocation>> expectedCloudLocations = ImmutableList.of(ImmutableList.of(CLOUD_OBJECT_LOCATION_1));
@ -194,7 +194,6 @@ public class AzureInputSourceTest extends EasyMockSupport
@Test
public void test_getPrefixesSplitStream_withObjectGlob_successfullyCreatesCloudLocation_returnsExpectedLocations()
throws Exception
{
List<URI> prefixes = ImmutableList.of(PREFIX_URI);
List<List<CloudObjectLocation>> expectedCloudLocations = ImmutableList.of(ImmutableList.of(CLOUD_OBJECT_LOCATION_1));
@ -372,6 +371,7 @@ public class AzureInputSourceTest extends EasyMockSupport
EqualsVerifier.forClass(AzureInputSource.class)
.usingGetClass()
.withPrefabValues(Logger.class, new Logger(AzureStorage.class), new Logger(AzureStorage.class))
.withPrefabValues(AzureStorage.class, new AzureStorage(null), new AzureStorage(null))
.withNonnullFields("storage")
.withNonnullFields("entityFactory")
.withNonnullFields("azureCloudBlobIterableFactory")

View File

@ -19,14 +19,14 @@
package org.apache.druid.storage.azure;
import com.microsoft.azure.storage.StorageException;
import com.azure.core.http.HttpResponse;
import com.azure.storage.blob.models.BlobStorageException;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.junit.Test;
import java.io.IOException;
import java.io.InputStream;
import java.net.URISyntaxException;
public class AzureByteSourceTest extends EasyMockSupport
{
@ -34,7 +34,7 @@ public class AzureByteSourceTest extends EasyMockSupport
private static final long OFFSET = 10L;
@Test
public void test_openStream_withoutOffset_succeeds() throws IOException, URISyntaxException, StorageException
public void test_openStream_withoutOffset_succeeds() throws IOException, BlobStorageException
{
final String containerName = "container";
final String blobPath = "/path/to/file";
@ -53,7 +53,7 @@ public class AzureByteSourceTest extends EasyMockSupport
}
@Test
public void test_openStream_withOffset_succeeds() throws IOException, URISyntaxException, StorageException
public void test_openStream_withOffset_succeeds() throws IOException, BlobStorageException
{
final String containerName = "container";
final String blobPath = "/path/to/file";
@ -72,23 +72,23 @@ public class AzureByteSourceTest extends EasyMockSupport
}
@Test(expected = IOException.class)
public void openStreamWithRecoverableErrorTest() throws URISyntaxException, StorageException, IOException
public void openStreamWithRecoverableErrorTest() throws BlobStorageException, IOException
{
final String containerName = "container";
final String blobPath = "/path/to/file";
AzureStorage azureStorage = createMock(AzureStorage.class);
HttpResponse httpResponse = createMock(HttpResponse.class);
EasyMock.expect(httpResponse.getStatusCode()).andReturn(500).anyTimes();
EasyMock.replay(httpResponse);
EasyMock.expect(azureStorage.getBlockBlobInputStream(NO_OFFSET, containerName, blobPath)).andThrow(
new StorageException(
new BlobStorageException(
"",
"",
500,
null,
httpResponse,
null
)
);
replayAll();
EasyMock.replay(azureStorage);
AzureByteSource byteSource = new AzureByteSource(azureStorage, containerName, blobPath);

View File

@ -0,0 +1,141 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.storage.azure;
import com.azure.core.http.policy.AzureSasCredentialPolicy;
import com.azure.core.http.policy.BearerTokenAuthenticationPolicy;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.common.StorageSharedKeyCredential;
import com.google.common.collect.ImmutableMap;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Test;
import java.net.MalformedURLException;
import java.net.URL;
public class AzureClientFactoryTest
{
private AzureClientFactory azureClientFactory;
private static final String ACCOUNT = "account";
@Test
public void test_blobServiceClient_accountName()
{
AzureAccountConfig config = new AzureAccountConfig();
azureClientFactory = new AzureClientFactory(config);
config.setAccount(ACCOUNT);
BlobServiceClient blobServiceClient = azureClientFactory.getBlobServiceClient(null);
Assert.assertEquals(ACCOUNT, blobServiceClient.getAccountName());
}
@Test
public void test_blobServiceClientBuilder_key() throws MalformedURLException
{
AzureAccountConfig config = new AzureAccountConfig();
config.setKey("key");
config.setAccount(ACCOUNT);
azureClientFactory = new AzureClientFactory(config);
BlobServiceClient blobServiceClient = azureClientFactory.getBlobServiceClient(null);
StorageSharedKeyCredential storageSharedKeyCredential = StorageSharedKeyCredential.getSharedKeyCredentialFromPipeline(
blobServiceClient.getHttpPipeline()
);
Assert.assertNotNull(storageSharedKeyCredential);
// Azure doesn't let us look at the key in the StorageSharedKeyCredential so make sure the authorization header generated is what we expect.
Assert.assertEquals(
new StorageSharedKeyCredential(ACCOUNT, "key").generateAuthorizationHeader(new URL("http://druid.com"), "POST", ImmutableMap.of()),
storageSharedKeyCredential.generateAuthorizationHeader(new URL("http://druid.com"), "POST", ImmutableMap.of())
);
}
@Test
public void test_blobServiceClientBuilder_sasToken()
{
AzureAccountConfig config = new AzureAccountConfig();
config.setSharedAccessStorageToken("sasToken");
config.setAccount(ACCOUNT);
azureClientFactory = new AzureClientFactory(config);
BlobServiceClient blobServiceClient = azureClientFactory.getBlobServiceClient(null);
AzureSasCredentialPolicy azureSasCredentialPolicy = null;
for (int i = 0; i < blobServiceClient.getHttpPipeline().getPolicyCount(); i++) {
if (blobServiceClient.getHttpPipeline().getPolicy(i) instanceof AzureSasCredentialPolicy) {
azureSasCredentialPolicy = (AzureSasCredentialPolicy) blobServiceClient.getHttpPipeline().getPolicy(i);
}
}
Assert.assertNotNull(azureSasCredentialPolicy);
}
@Test
public void test_blobServiceClientBuilder_useDefaultCredentialChain()
{
AzureAccountConfig config = new AzureAccountConfig();
config.setUseAzureCredentialsChain(true);
config.setAccount(ACCOUNT);
azureClientFactory = new AzureClientFactory(config);
BlobServiceClient blobServiceClient = azureClientFactory.getBlobServiceClient(null);
BearerTokenAuthenticationPolicy bearerTokenAuthenticationPolicy = null;
for (int i = 0; i < blobServiceClient.getHttpPipeline().getPolicyCount(); i++) {
if (blobServiceClient.getHttpPipeline().getPolicy(i) instanceof BearerTokenAuthenticationPolicy) {
bearerTokenAuthenticationPolicy = (BearerTokenAuthenticationPolicy) blobServiceClient.getHttpPipeline().getPolicy(i);
}
}
Assert.assertNotNull(bearerTokenAuthenticationPolicy);
}
@Test
public void test_blobServiceClientBuilder_useCachedClient()
{
AzureAccountConfig config = new AzureAccountConfig();
config.setUseAzureCredentialsChain(true);
config.setAccount(ACCOUNT);
azureClientFactory = new AzureClientFactory(config);
BlobServiceClient blobServiceClient = azureClientFactory.getBlobServiceClient(null);
BlobServiceClient blobServiceClient2 = azureClientFactory.getBlobServiceClient(null);
Assert.assertEquals(blobServiceClient, blobServiceClient2);
}
@Test
public void test_blobServiceClientBuilder_useNewClientForDifferentRetryCount()
{
AzureAccountConfig config = new AzureAccountConfig();
config.setUseAzureCredentialsChain(true);
config.setAccount(ACCOUNT);
azureClientFactory = new AzureClientFactory(config);
BlobServiceClient blobServiceClient = azureClientFactory.getBlobServiceClient(null);
BlobServiceClient blobServiceClient2 = azureClientFactory.getBlobServiceClient(1);
Assert.assertNotEquals(blobServiceClient, blobServiceClient2);
}
@Test
public void test_blobServiceClientBuilder_useAzureAccountConfig_asDefaultMaxTries()
{
AzureAccountConfig config = EasyMock.createMock(AzureAccountConfig.class);
EasyMock.expect(config.getKey()).andReturn("key").times(2);
EasyMock.expect(config.getAccount()).andReturn(ACCOUNT).times(2);
EasyMock.expect(config.getMaxTries()).andReturn(3);
azureClientFactory = new AzureClientFactory(config);
EasyMock.replay(config);
azureClientFactory.getBlobServiceClient(null);
EasyMock.verify(config);
}
}

View File

@ -19,132 +19,46 @@
package org.apache.druid.storage.azure;
import com.azure.core.http.rest.PagedIterable;
import com.azure.core.http.rest.PagedResponse;
import com.azure.storage.blob.models.BlobItem;
import com.azure.storage.blob.models.BlobItemProperties;
import com.azure.storage.blob.models.BlobStorageException;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.microsoft.azure.storage.ResultContinuation;
import com.microsoft.azure.storage.ResultSegment;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.ListBlobItem;
import org.apache.druid.common.guava.SettableSupplier;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.storage.azure.blob.CloudBlobHolder;
import org.apache.druid.storage.azure.blob.ListBlobItemHolder;
import org.apache.druid.storage.azure.blob.ListBlobItemHolderFactory;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
import org.easymock.EasyMockSupport;
import org.junit.After;
import org.easymock.Mock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import org.junit.runner.RunWith;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.stream.Collectors;
@RunWith(EasyMockRunner.class)
public class AzureCloudBlobIteratorTest extends EasyMockSupport
{
private static final String AZURE = "azure";
private static final String CONTAINER1 = "container1";
private static final String PREFIX_ONLY_CLOUD_BLOBS = "prefixOnlyCloudBlobs";
private static final String PREFIX_WITH_NO_BLOBS = "prefixWithNoBlobs";
private static final String PREFIX_WITH_CLOUD_BLOBS_AND_DIRECTORIES = "prefixWithCloudBlobsAndDirectories";
private static final URI PREFIX_ONLY_CLOUD_BLOBS_URI;
private static final URI PREFIX_WITH_NO_BLOBS_URI;
private static final URI PREFIX_WITH_CLOUD_BLOBS_AND_DIRECTORIES_URI;
private static final List<URI> EMPTY_URI_PREFIXES = ImmutableList.of();
private static final List<URI> PREFIXES;
private static final int MAX_LISTING_LENGTH = 10;
private static final int MAX_TRIES = 2;
private static final StorageException RETRYABLE_EXCEPTION = new StorageException("", "", new IOException());
private static final URISyntaxException NON_RETRYABLE_EXCEPTION = new URISyntaxException("", "");
@Mock
private AzureStorage storage;
private ListBlobItemHolderFactory blobItemDruidFactory;
private AzureAccountConfig config;
private ResultSegment<ListBlobItem> resultSegmentPrefixOnlyAndFailLessThanMaxTriesCloudBlobs1;
private ResultSegment<ListBlobItem> resultSegmentPrefixOnlyAndFailLessThanMaxTriesCloudBlobs2;
private ResultSegment<ListBlobItem> resultSegmentPrefixWithNoBlobs;
private ResultSegment<ListBlobItem> resultSegmentPrefixWithCloudBlobsAndDirectories;
private ResultContinuation resultContinuationPrefixOnlyCloudBlobs = new ResultContinuation();
private ResultContinuation nullResultContinuationToken = null;
private ListBlobItem blobItemPrefixWithOnlyCloudBlobs1;
private ListBlobItemHolder cloudBlobItemPrefixWithOnlyCloudBlobs1;
private CloudBlobHolder cloudBlobDruidPrefixWithOnlyCloudBlobs1;
private ListBlobItem blobItemPrefixWithOnlyCloudBlobs2;
private ListBlobItemHolder cloudBlobItemPrefixWithOnlyCloudBlobs2;
private CloudBlobHolder cloudBlobDruidPrefixWithOnlyCloudBlobs2;
private ListBlobItem blobItemPrefixWithCloudBlobsAndDirectories1;
private ListBlobItemHolder directoryItemPrefixWithCloudBlobsAndDirectories;
private ListBlobItem blobItemPrefixWithCloudBlobsAndDirectories2;
private ListBlobItemHolder cloudBlobItemPrefixWithCloudBlobsAndDirectories;
private CloudBlobHolder cloudBlobDruidPrefixWithCloudBlobsAndDirectories;
private ListBlobItem blobItemPrefixWithCloudBlobsAndDirectories3;
private ListBlobItemHolder directoryItemPrefixWithCloudBlobsAndDirectories3;
private AzureCloudBlobIterator azureCloudBlobIterator;
static {
try {
PREFIX_ONLY_CLOUD_BLOBS_URI = new URI(AZURE + "://" + CONTAINER1 + "/" + PREFIX_ONLY_CLOUD_BLOBS);
PREFIX_WITH_NO_BLOBS_URI = new URI(AZURE + "://" + CONTAINER1 + "/" + PREFIX_WITH_NO_BLOBS);
PREFIX_WITH_CLOUD_BLOBS_AND_DIRECTORIES_URI = new URI(AZURE
+ "://"
+ CONTAINER1
+ "/"
+ PREFIX_WITH_CLOUD_BLOBS_AND_DIRECTORIES);
PREFIXES = ImmutableList.of(
PREFIX_ONLY_CLOUD_BLOBS_URI,
PREFIX_WITH_NO_BLOBS_URI,
PREFIX_WITH_CLOUD_BLOBS_AND_DIRECTORIES_URI
);
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
private final AzureAccountConfig config = new AzureAccountConfig();
private final Integer MAX_TRIES = 3;
private final Integer MAX_LISTING_LENGTH = 10;
private final String CONTAINER = "container";
@Before
public void setup()
{
storage = createMock(AzureStorage.class);
config = createMock(AzureAccountConfig.class);
blobItemDruidFactory = createMock(ListBlobItemHolderFactory.class);
resultSegmentPrefixOnlyAndFailLessThanMaxTriesCloudBlobs1 = createMock(ResultSegment.class);
resultSegmentPrefixOnlyAndFailLessThanMaxTriesCloudBlobs2 = createMock(ResultSegment.class);
resultSegmentPrefixWithNoBlobs = createMock(ResultSegment.class);
resultSegmentPrefixWithCloudBlobsAndDirectories = createMock(ResultSegment.class);
cloudBlobItemPrefixWithOnlyCloudBlobs1 = createMock(ListBlobItemHolder.class);
blobItemPrefixWithOnlyCloudBlobs1 = createMock(ListBlobItem.class);
cloudBlobItemPrefixWithOnlyCloudBlobs1 = createMock(ListBlobItemHolder.class);
cloudBlobDruidPrefixWithOnlyCloudBlobs1 = createMock(CloudBlobHolder.class);
EasyMock.expect(cloudBlobDruidPrefixWithOnlyCloudBlobs1.getBlobLength()).andReturn(10L).anyTimes();
blobItemPrefixWithOnlyCloudBlobs2 = createMock(ListBlobItem.class);
cloudBlobItemPrefixWithOnlyCloudBlobs2 = createMock(ListBlobItemHolder.class);
cloudBlobDruidPrefixWithOnlyCloudBlobs2 = createMock(CloudBlobHolder.class);
EasyMock.expect(cloudBlobDruidPrefixWithOnlyCloudBlobs2.getBlobLength()).andReturn(10L).anyTimes();
blobItemPrefixWithCloudBlobsAndDirectories1 = createMock(ListBlobItem.class);
directoryItemPrefixWithCloudBlobsAndDirectories = createMock(ListBlobItemHolder.class);
blobItemPrefixWithCloudBlobsAndDirectories2 = createMock(ListBlobItem.class);
cloudBlobItemPrefixWithCloudBlobsAndDirectories = createMock(ListBlobItemHolder.class);
cloudBlobDruidPrefixWithCloudBlobsAndDirectories = createMock(CloudBlobHolder.class);
EasyMock.expect(cloudBlobDruidPrefixWithCloudBlobsAndDirectories.getBlobLength()).andReturn(10L).anyTimes();
blobItemPrefixWithCloudBlobsAndDirectories3 = createMock(ListBlobItem.class);
directoryItemPrefixWithCloudBlobsAndDirectories3 = createMock(ListBlobItemHolder.class);
config.setMaxTries(MAX_TRIES);
}
@Test
@ -152,10 +66,9 @@ public class AzureCloudBlobIteratorTest extends EasyMockSupport
{
azureCloudBlobIterator = new AzureCloudBlobIterator(
storage,
blobItemDruidFactory,
config,
EMPTY_URI_PREFIXES,
MAX_LISTING_LENGTH
ImmutableList.of(),
1
);
boolean hasNext = azureCloudBlobIterator.hasNext();
Assert.assertFalse(hasNext);
@ -164,168 +77,83 @@ public class AzureCloudBlobIteratorTest extends EasyMockSupport
@Test
public void test_next_prefixesWithMultipleBlobsAndSomeDirectories_returnsExpectedBlobs() throws Exception
{
EasyMock.expect(config.getMaxTries()).andReturn(MAX_TRIES).atLeastOnce();
EasyMock.expect(cloudBlobItemPrefixWithOnlyCloudBlobs1.isCloudBlob()).andReturn(true);
EasyMock.expect(cloudBlobItemPrefixWithOnlyCloudBlobs1.getCloudBlob()).andReturn(
cloudBlobDruidPrefixWithOnlyCloudBlobs1).anyTimes();
EasyMock.expect(blobItemDruidFactory.create(blobItemPrefixWithOnlyCloudBlobs1)).andReturn(
cloudBlobItemPrefixWithOnlyCloudBlobs1);
EasyMock.expect(cloudBlobItemPrefixWithOnlyCloudBlobs2.isCloudBlob()).andReturn(true);
EasyMock.expect(cloudBlobItemPrefixWithOnlyCloudBlobs2.getCloudBlob()).andReturn(
cloudBlobDruidPrefixWithOnlyCloudBlobs2).anyTimes();
EasyMock.expect(blobItemDruidFactory.create(blobItemPrefixWithOnlyCloudBlobs2)).andReturn(
cloudBlobItemPrefixWithOnlyCloudBlobs2);
EasyMock.expect(directoryItemPrefixWithCloudBlobsAndDirectories.isCloudBlob()).andReturn(false);
EasyMock.expect(blobItemDruidFactory.create(blobItemPrefixWithCloudBlobsAndDirectories1)).andReturn(
directoryItemPrefixWithCloudBlobsAndDirectories);
EasyMock.expect(cloudBlobItemPrefixWithCloudBlobsAndDirectories.isCloudBlob()).andReturn(true);
EasyMock.expect(cloudBlobItemPrefixWithCloudBlobsAndDirectories.getCloudBlob()).andReturn(
cloudBlobDruidPrefixWithCloudBlobsAndDirectories).anyTimes();
EasyMock.expect(blobItemDruidFactory.create(blobItemPrefixWithCloudBlobsAndDirectories2)).andReturn(
cloudBlobItemPrefixWithCloudBlobsAndDirectories);
EasyMock.expect(directoryItemPrefixWithCloudBlobsAndDirectories3.isCloudBlob()).andReturn(false);
EasyMock.expect(blobItemDruidFactory.create(blobItemPrefixWithCloudBlobsAndDirectories3)).andReturn(
directoryItemPrefixWithCloudBlobsAndDirectories3);
ArrayList<ListBlobItem> resultBlobItemsPrefixWithOnlyCloudBlobs1 = new ArrayList<>();
resultBlobItemsPrefixWithOnlyCloudBlobs1.add(blobItemPrefixWithOnlyCloudBlobs1);
ArrayList<ListBlobItem> resultBlobItemsPrefixWithOnlyCloudBlobs2 = new ArrayList<>();
resultBlobItemsPrefixWithOnlyCloudBlobs2.add(blobItemPrefixWithOnlyCloudBlobs2);
ArrayList<ListBlobItem> resultBlobItemsPrefixWithNoBlobs = new ArrayList<>();
ArrayList<ListBlobItem> resultBlobItemsPrefixWithCloudBlobsAndDirectories = new ArrayList<>();
resultBlobItemsPrefixWithCloudBlobsAndDirectories.add(blobItemPrefixWithCloudBlobsAndDirectories1);
resultBlobItemsPrefixWithCloudBlobsAndDirectories.add(blobItemPrefixWithCloudBlobsAndDirectories2);
resultBlobItemsPrefixWithCloudBlobsAndDirectories.add(blobItemPrefixWithCloudBlobsAndDirectories3);
EasyMock.expect(resultSegmentPrefixOnlyAndFailLessThanMaxTriesCloudBlobs1.getContinuationToken())
.andReturn(resultContinuationPrefixOnlyCloudBlobs);
EasyMock.expect(resultSegmentPrefixOnlyAndFailLessThanMaxTriesCloudBlobs1.getResults())
.andReturn(resultBlobItemsPrefixWithOnlyCloudBlobs1);
EasyMock.expect(resultSegmentPrefixOnlyAndFailLessThanMaxTriesCloudBlobs2.getContinuationToken()).andReturn(nullResultContinuationToken);
EasyMock.expect(resultSegmentPrefixOnlyAndFailLessThanMaxTriesCloudBlobs2.getResults())
.andReturn(resultBlobItemsPrefixWithOnlyCloudBlobs2);
EasyMock.expect(resultSegmentPrefixWithNoBlobs.getContinuationToken()).andReturn(nullResultContinuationToken);
EasyMock.expect(resultSegmentPrefixWithNoBlobs.getResults()).andReturn(resultBlobItemsPrefixWithNoBlobs);
EasyMock.expect(resultSegmentPrefixWithCloudBlobsAndDirectories.getContinuationToken())
.andReturn(nullResultContinuationToken);
EasyMock.expect(resultSegmentPrefixWithCloudBlobsAndDirectories.getResults())
.andReturn(resultBlobItemsPrefixWithCloudBlobsAndDirectories);
EasyMock.expect(storage.listBlobsWithPrefixInContainerSegmented(
CONTAINER1,
PREFIX_ONLY_CLOUD_BLOBS,
nullResultContinuationToken,
MAX_LISTING_LENGTH
)).andThrow(RETRYABLE_EXCEPTION);
EasyMock.expect(storage.listBlobsWithPrefixInContainerSegmented(
CONTAINER1,
PREFIX_ONLY_CLOUD_BLOBS,
nullResultContinuationToken,
MAX_LISTING_LENGTH
)).andReturn(resultSegmentPrefixOnlyAndFailLessThanMaxTriesCloudBlobs1);
EasyMock.expect(storage.listBlobsWithPrefixInContainerSegmented(
CONTAINER1,
PREFIX_ONLY_CLOUD_BLOBS,
resultContinuationPrefixOnlyCloudBlobs,
MAX_LISTING_LENGTH
)).andReturn(resultSegmentPrefixOnlyAndFailLessThanMaxTriesCloudBlobs2);
EasyMock.expect(storage.listBlobsWithPrefixInContainerSegmented(
CONTAINER1,
PREFIX_WITH_NO_BLOBS,
nullResultContinuationToken,
MAX_LISTING_LENGTH
)).andReturn(resultSegmentPrefixWithNoBlobs);
EasyMock.expect(storage.listBlobsWithPrefixInContainerSegmented(
CONTAINER1,
PREFIX_WITH_CLOUD_BLOBS_AND_DIRECTORIES,
nullResultContinuationToken,
MAX_LISTING_LENGTH
)).andReturn(resultSegmentPrefixWithCloudBlobsAndDirectories);
replayAll();
azureCloudBlobIterator = new AzureCloudBlobIterator(
storage,
blobItemDruidFactory,
config,
PREFIXES,
MAX_LISTING_LENGTH
List<URI> prefixes = ImmutableList.of(
new URI(StringUtils.format("azure://%s/dir1", CONTAINER)),
new URI(StringUtils.format("azure://%s/dir2", CONTAINER))
);
List<CloudBlobHolder> expectedBlobItems = ImmutableList.of(
cloudBlobDruidPrefixWithOnlyCloudBlobs1,
cloudBlobDruidPrefixWithOnlyCloudBlobs2,
cloudBlobDruidPrefixWithCloudBlobsAndDirectories
BlobItem blobItem = new BlobItem().setName("blobName").setProperties(new BlobItemProperties().setContentLength(10L));
SettableSupplier<PagedResponse<BlobItem>> supplier = new SettableSupplier<>();
supplier.set(new TestPagedResponse<>(ImmutableList.of(blobItem)));
PagedIterable<BlobItem> pagedIterable = new PagedIterable<>(supplier);
EasyMock.expect(storage.listBlobsWithPrefixInContainerSegmented(CONTAINER, "dir1", MAX_LISTING_LENGTH, MAX_TRIES))
.andReturn(pagedIterable);
BlobItem blobPrefixItem = new BlobItem().setIsPrefix(true).setName("subdir").setProperties(new BlobItemProperties());
BlobItem blobItem2 = new BlobItem().setName("blobName2").setProperties(new BlobItemProperties().setContentLength(10L));
SettableSupplier<PagedResponse<BlobItem>> supplier2 = new SettableSupplier<>();
supplier2.set(new TestPagedResponse<>(ImmutableList.of(blobPrefixItem, blobItem2)));
PagedIterable<BlobItem> pagedIterable2 = new PagedIterable<>(supplier2);
EasyMock.expect(storage.listBlobsWithPrefixInContainerSegmented(CONTAINER, "dir2", MAX_LISTING_LENGTH, MAX_TRIES))
.andReturn(pagedIterable2);
replayAll();
azureCloudBlobIterator = new AzureCloudBlobIterator(
storage,
config,
prefixes,
MAX_LISTING_LENGTH
);
List<CloudBlobHolder> actualBlobItems = new ArrayList<>();
while (azureCloudBlobIterator.hasNext()) {
actualBlobItems.add(azureCloudBlobIterator.next());
}
Assert.assertEquals(expectedBlobItems.size(), actualBlobItems.size());
Assert.assertTrue(expectedBlobItems.containsAll(actualBlobItems));
verifyAll();
List<CloudBlobHolder> expectedBlobItems = ImmutableList.of(
new CloudBlobHolder(blobItem, CONTAINER),
new CloudBlobHolder(blobItem2, CONTAINER)
);
Assert.assertEquals(expectedBlobItems.size(), actualBlobItems.size());
Assert.assertEquals(
expectedBlobItems.stream().map(CloudBlobHolder::getName).collect(Collectors.toSet()),
actualBlobItems.stream().map(CloudBlobHolder::getName).collect(Collectors.toSet()));
}
@Test
public void test_next_emptyObjects_skipEmptyObjects() throws URISyntaxException, StorageException
public void test_next_emptyObjects_skipEmptyObjects() throws Exception
{
EasyMock.expect(config.getMaxTries()).andReturn(MAX_TRIES).atLeastOnce();
EasyMock.expect(cloudBlobItemPrefixWithOnlyCloudBlobs1.isCloudBlob()).andReturn(true);
EasyMock.expect(cloudBlobItemPrefixWithOnlyCloudBlobs1.getCloudBlob()).andReturn(
cloudBlobDruidPrefixWithOnlyCloudBlobs1).anyTimes();
EasyMock.expect(blobItemDruidFactory.create(blobItemPrefixWithOnlyCloudBlobs1)).andReturn(
cloudBlobItemPrefixWithOnlyCloudBlobs1);
ListBlobItem emptyBlobItem = createMock(ListBlobItem.class);
ListBlobItemHolder emptyBlobItemHolder = createMock(ListBlobItemHolder.class);
CloudBlobHolder emptyBlobHolder = createMock(CloudBlobHolder.class);
EasyMock.expect(emptyBlobHolder.getBlobLength()).andReturn(0L).anyTimes();
EasyMock.expect(emptyBlobItemHolder.isCloudBlob()).andReturn(true);
EasyMock.expect(emptyBlobItemHolder.getCloudBlob()).andReturn(emptyBlobHolder).anyTimes();
EasyMock.expect(blobItemDruidFactory.create(emptyBlobItem)).andReturn(emptyBlobItemHolder);
EasyMock.expect(storage.listBlobsWithPrefixInContainerSegmented(
CONTAINER1,
PREFIX_ONLY_CLOUD_BLOBS,
nullResultContinuationToken,
MAX_LISTING_LENGTH
)).andReturn(resultSegmentPrefixOnlyAndFailLessThanMaxTriesCloudBlobs1);
EasyMock.expect(resultSegmentPrefixOnlyAndFailLessThanMaxTriesCloudBlobs1.getContinuationToken())
.andReturn(nullResultContinuationToken);
ArrayList<ListBlobItem> resultBlobItemsPrefixWithOnlyCloudBlobs1 = new ArrayList<>();
resultBlobItemsPrefixWithOnlyCloudBlobs1.add(blobItemPrefixWithOnlyCloudBlobs1);
resultBlobItemsPrefixWithOnlyCloudBlobs1.add(emptyBlobItem);
EasyMock.expect(resultSegmentPrefixOnlyAndFailLessThanMaxTriesCloudBlobs1.getResults())
.andReturn(resultBlobItemsPrefixWithOnlyCloudBlobs1);
replayAll();
azureCloudBlobIterator = new AzureCloudBlobIterator(
storage,
blobItemDruidFactory,
config,
ImmutableList.of(PREFIX_ONLY_CLOUD_BLOBS_URI),
MAX_LISTING_LENGTH
List<URI> prefixes = ImmutableList.of(
new URI(StringUtils.format("azure://%s/dir1", CONTAINER))
);
List<CloudBlobHolder> expectedBlobItems = ImmutableList.of(cloudBlobDruidPrefixWithOnlyCloudBlobs1);
List<CloudBlobHolder> actualBlobItems = Lists.newArrayList(azureCloudBlobIterator);
Assert.assertEquals(expectedBlobItems.size(), actualBlobItems.size());
Assert.assertTrue(expectedBlobItems.containsAll(actualBlobItems));
BlobItem blobItem = new BlobItem().setName("blobName").setProperties(new BlobItemProperties().setContentLength(10L));
BlobItem blobItem2 = new BlobItem().setName("blobName2").setProperties(new BlobItemProperties().setContentLength(0L));
SettableSupplier<PagedResponse<BlobItem>> supplier = new SettableSupplier<>();
supplier.set(new TestPagedResponse<>(ImmutableList.of(blobItem, blobItem2)));
PagedIterable<BlobItem> pagedIterable = new PagedIterable<>(supplier);
EasyMock.expect(storage.listBlobsWithPrefixInContainerSegmented(CONTAINER, "dir1", MAX_LISTING_LENGTH, MAX_TRIES))
.andReturn(pagedIterable);
replayAll();
azureCloudBlobIterator = new AzureCloudBlobIterator(
storage,
config,
prefixes,
MAX_LISTING_LENGTH
);
List<CloudBlobHolder> actualBlobItems = new ArrayList<>();
while (azureCloudBlobIterator.hasNext()) {
actualBlobItems.add(azureCloudBlobIterator.next());
}
verifyAll();
List<CloudBlobHolder> expectedBlobItems = ImmutableList.of(
new CloudBlobHolder(blobItem, CONTAINER)
);
Assert.assertEquals(expectedBlobItems.size(), actualBlobItems.size());
Assert.assertEquals(
expectedBlobItems.stream().map(CloudBlobHolder::getName).collect(Collectors.toSet()),
actualBlobItems.stream().map(CloudBlobHolder::getName).collect(Collectors.toSet()));
}
@Test(expected = NoSuchElementException.class)
@ -333,9 +161,8 @@ public class AzureCloudBlobIteratorTest extends EasyMockSupport
{
azureCloudBlobIterator = new AzureCloudBlobIterator(
storage,
blobItemDruidFactory,
config,
EMPTY_URI_PREFIXES,
ImmutableList.of(),
MAX_LISTING_LENGTH
);
azureCloudBlobIterator.next();
@ -344,50 +171,46 @@ public class AzureCloudBlobIteratorTest extends EasyMockSupport
@Test(expected = RE.class)
public void test_fetchNextBatch_moreThanMaxTriesRetryableExceptionsThrownInStorage_throwsREException() throws Exception
{
EasyMock.expect(config.getMaxTries()).andReturn(MAX_TRIES).atLeastOnce();
List<URI> prefixes = ImmutableList.of(
new URI(StringUtils.format("azure://%s/dir1", CONTAINER))
);
EasyMock.expect(storage.listBlobsWithPrefixInContainerSegmented(
EasyMock.anyString(),
EasyMock.anyString(),
EasyMock.anyObject(),
EasyMock.anyInt(),
EasyMock.anyInt()
)).andThrow(RETRYABLE_EXCEPTION);
EasyMock.expect(storage.listBlobsWithPrefixInContainerSegmented(
EasyMock.anyString(),
EasyMock.anyString(),
EasyMock.anyObject(),
EasyMock.anyInt()
)).andThrow(RETRYABLE_EXCEPTION);
)).andThrow(new BlobStorageException("", null, null)).times(3);
replayAll();
azureCloudBlobIterator = new AzureCloudBlobIterator(
storage,
blobItemDruidFactory,
config,
PREFIXES,
prefixes,
MAX_LISTING_LENGTH
);
verifyAll();
}
@Test(expected = RE.class)
public void test_fetchNextBatch_nonRetryableExceptionThrownInStorage_throwsREException() throws Exception
{
EasyMock.expect(config.getMaxTries()).andReturn(MAX_TRIES).atLeastOnce();
List<URI> prefixes = ImmutableList.of(
new URI(StringUtils.format("azure://%s/dir1", CONTAINER))
);
EasyMock.expect(storage.listBlobsWithPrefixInContainerSegmented(
EasyMock.anyString(),
EasyMock.anyString(),
EasyMock.anyObject(),
EasyMock.anyInt(),
EasyMock.anyInt()
)).andThrow(NON_RETRYABLE_EXCEPTION);
)).andThrow(new RuntimeException(""));
replayAll();
azureCloudBlobIterator = new AzureCloudBlobIterator(
storage,
blobItemDruidFactory,
config,
PREFIXES,
prefixes,
MAX_LISTING_LENGTH
);
}
@After
public void cleanup()
{
resetAll();
verifyAll();
}
}

View File

@ -19,10 +19,9 @@
package org.apache.druid.storage.azure;
import com.azure.storage.blob.models.BlobStorageException;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.StorageExtendedErrorInformation;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
@ -38,7 +37,6 @@ import org.junit.Test;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
@ -48,18 +46,17 @@ public class AzureDataSegmentKillerTest extends EasyMockSupport
private static final String CONTAINER_NAME = "container";
private static final String CONTAINER = "test";
private static final String PREFIX = "test/log";
private static final int MAX_TRIES = 3;
private static final String BLOB_PATH = "test/2015-04-12T00:00:00.000Z_2015-04-13T00:00:00.000Z/1/0/index.zip";
private static final int MAX_KEYS = 1;
private static final int MAX_TRIES = 3;
private static final long TIME_0 = 0L;
private static final long TIME_1 = 1L;
private static final long TIME_NOW = 2L;
private static final long TIME_FUTURE = 3L;
private static final String KEY_1 = "key1";
private static final String KEY_2 = "key2";
private static final URI PREFIX_URI = URI.create(StringUtils.format("azure://%s/%s", CONTAINER, PREFIX));
private static final Exception RECOVERABLE_EXCEPTION = new StorageException("", "", null);
private static final Exception NON_RECOVERABLE_EXCEPTION = new URISyntaxException("", "");
// BlobStorageException is not recoverable since the client attempts retries on it internally
private static final Exception NON_RECOVERABLE_EXCEPTION = new BlobStorageException("", null, null);
private static final DataSegment DATA_SEGMENT = new DataSegment(
"test",
@ -73,9 +70,6 @@ public class AzureDataSegmentKillerTest extends EasyMockSupport
1
);
private static final StorageExtendedErrorInformation NULL_STORAGE_EXTENDED_ERROR_INFORMATION = null;
private static final StorageExtendedErrorInformation STORAGE_EXTENDED_ERROR_INFORMATION = new StorageExtendedErrorInformation();
private AzureDataSegmentConfig segmentConfig;
private AzureInputDataConfig inputDataConfig;
private AzureAccountConfig accountConfig;
@ -93,7 +87,7 @@ public class AzureDataSegmentKillerTest extends EasyMockSupport
}
@Test
public void killTest() throws SegmentLoadingException, URISyntaxException, StorageException
public void killTest() throws SegmentLoadingException, BlobStorageException
{
List<String> deletedFiles = new ArrayList<>();
@ -112,30 +106,29 @@ public class AzureDataSegmentKillerTest extends EasyMockSupport
@Test(expected = SegmentLoadingException.class)
public void test_kill_StorageExceptionExtendedErrorInformationNull_throwsException()
throws SegmentLoadingException, URISyntaxException, StorageException
throws SegmentLoadingException, BlobStorageException
{
common_test_kill_StorageExceptionExtendedError_throwsException(NULL_STORAGE_EXTENDED_ERROR_INFORMATION);
common_test_kill_StorageExceptionExtendedError_throwsException();
}
@Test(expected = SegmentLoadingException.class)
public void test_kill_StorageExceptionExtendedErrorInformationNotNull_throwsException()
throws SegmentLoadingException, URISyntaxException, StorageException
throws SegmentLoadingException, BlobStorageException
{
common_test_kill_StorageExceptionExtendedError_throwsException(STORAGE_EXTENDED_ERROR_INFORMATION);
common_test_kill_StorageExceptionExtendedError_throwsException();
}
@Test(expected = SegmentLoadingException.class)
public void test_kill_URISyntaxException_throwsException()
throws SegmentLoadingException, URISyntaxException, StorageException
@Test(expected = RuntimeException.class)
public void test_kill_runtimeException_throwsException()
throws SegmentLoadingException, BlobStorageException
{
String dirPath = Paths.get(BLOB_PATH).getParent().toString();
EasyMock.expect(azureStorage.emptyCloudBlobDirectory(CONTAINER_NAME, dirPath)).andThrow(
new URISyntaxException(
"",
new RuntimeException(
""
)
);
@ -182,7 +175,7 @@ public class AzureDataSegmentKillerTest extends EasyMockSupport
EasyMock.expect(segmentConfig.getContainer()).andReturn(CONTAINER).atLeastOnce();
EasyMock.expect(segmentConfig.getPrefix()).andReturn(PREFIX).atLeastOnce();
EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS);
EasyMock.expect(accountConfig.getMaxTries()).andReturn(MAX_TRIES).atLeastOnce();
EasyMock.expect(accountConfig.getMaxTries()).andReturn(MAX_TRIES).anyTimes();
CloudBlobHolder object1 = AzureTestUtils.newCloudBlobHolder(CONTAINER, KEY_1, TIME_0);
CloudBlobHolder object2 = AzureTestUtils.newCloudBlobHolder(CONTAINER, KEY_2, TIME_1);
@ -197,7 +190,9 @@ public class AzureDataSegmentKillerTest extends EasyMockSupport
AzureTestUtils.expectDeleteObjects(
azureStorage,
ImmutableList.of(object1, object2),
ImmutableMap.of());
ImmutableMap.of(),
MAX_TRIES
);
EasyMock.replay(segmentConfig, inputDataConfig, accountConfig, azureCloudBlobIterable, azureCloudBlobIterableFactory, azureStorage);
AzureDataSegmentKiller killer = new AzureDataSegmentKiller(segmentConfig, inputDataConfig, accountConfig, azureStorage, azureCloudBlobIterableFactory);
killer.killAll();
@ -205,34 +200,7 @@ public class AzureDataSegmentKillerTest extends EasyMockSupport
}
@Test
public void test_killAll_recoverableExceptionWhenListingObjects_deletesAllSegments() throws Exception
{
EasyMock.expect(segmentConfig.getContainer()).andReturn(CONTAINER).atLeastOnce();
EasyMock.expect(segmentConfig.getPrefix()).andReturn(PREFIX).atLeastOnce();
EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS);
EasyMock.expect(accountConfig.getMaxTries()).andReturn(MAX_TRIES).atLeastOnce();
CloudBlobHolder object1 = AzureTestUtils.newCloudBlobHolder(CONTAINER, KEY_1, TIME_0);
AzureCloudBlobIterable azureCloudBlobIterable = AzureTestUtils.expectListObjects(
azureCloudBlobIterableFactory,
MAX_KEYS,
PREFIX_URI,
ImmutableList.of(object1));
EasyMock.replay(object1);
AzureTestUtils.expectDeleteObjects(
azureStorage,
ImmutableList.of(object1),
ImmutableMap.of(object1, RECOVERABLE_EXCEPTION));
EasyMock.replay(segmentConfig, inputDataConfig, accountConfig, azureCloudBlobIterable, azureCloudBlobIterableFactory, azureStorage);
AzureDataSegmentKiller killer = new AzureDataSegmentKiller(segmentConfig, inputDataConfig, accountConfig, azureStorage, azureCloudBlobIterableFactory);
killer.killAll();
EasyMock.verify(segmentConfig, inputDataConfig, accountConfig, object1, azureCloudBlobIterable, azureCloudBlobIterableFactory, azureStorage);
}
@Test
public void test_killAll_nonrecoverableExceptionWhenListingObjects_deletesAllSegments() throws Exception
public void test_killAll_nonrecoverableExceptionWhenListingObjects_deletesAllSegments()
{
boolean ioExceptionThrown = false;
CloudBlobHolder object1 = null;
@ -241,7 +209,7 @@ public class AzureDataSegmentKillerTest extends EasyMockSupport
EasyMock.expect(segmentConfig.getContainer()).andReturn(CONTAINER).atLeastOnce();
EasyMock.expect(segmentConfig.getPrefix()).andReturn(PREFIX).atLeastOnce();
EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS);
EasyMock.expect(accountConfig.getMaxTries()).andReturn(MAX_TRIES).atLeastOnce();
EasyMock.expect(accountConfig.getMaxTries()).andReturn(MAX_TRIES).anyTimes();
object1 = AzureTestUtils.newCloudBlobHolder(CONTAINER, KEY_1, TIME_0);
@ -256,7 +224,8 @@ public class AzureDataSegmentKillerTest extends EasyMockSupport
AzureTestUtils.expectDeleteObjects(
azureStorage,
ImmutableList.of(),
ImmutableMap.of(object1, NON_RECOVERABLE_EXCEPTION)
ImmutableMap.of(object1, NON_RECOVERABLE_EXCEPTION),
MAX_TRIES
);
EasyMock.replay(
segmentConfig,
@ -292,17 +261,15 @@ public class AzureDataSegmentKillerTest extends EasyMockSupport
);
}
private void common_test_kill_StorageExceptionExtendedError_throwsException(StorageExtendedErrorInformation storageExtendedErrorInformation)
throws SegmentLoadingException, URISyntaxException, StorageException
private void common_test_kill_StorageExceptionExtendedError_throwsException()
throws SegmentLoadingException, BlobStorageException
{
String dirPath = Paths.get(BLOB_PATH).getParent().toString();
EasyMock.expect(azureStorage.emptyCloudBlobDirectory(CONTAINER_NAME, dirPath)).andThrow(
new StorageException(
new BlobStorageException(
"",
"",
400,
storageExtendedErrorInformation,
null,
null
)
);

View File

@ -19,7 +19,8 @@
package org.apache.druid.storage.azure;
import com.microsoft.azure.storage.StorageException;
import com.azure.core.http.HttpResponse;
import com.azure.storage.blob.models.BlobStorageException;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.segment.loading.SegmentLoadingException;
import org.easymock.EasyMock;
@ -32,7 +33,6 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URISyntaxException;
public class AzureDataSegmentPullerTest extends EasyMockSupport
{
@ -53,7 +53,7 @@ public class AzureDataSegmentPullerTest extends EasyMockSupport
@Test
public void test_getSegmentFiles_success()
throws SegmentLoadingException, URISyntaxException, StorageException, IOException
throws SegmentLoadingException, BlobStorageException, IOException
{
final String value = "bucket";
final File pulledFile = AzureTestUtils.createZipTempFile(SEGMENT_FILE_NAME, value);
@ -85,7 +85,7 @@ public class AzureDataSegmentPullerTest extends EasyMockSupport
@Test
public void test_getSegmentFiles_blobPathIsHadoop_success()
throws SegmentLoadingException, URISyntaxException, StorageException, IOException
throws SegmentLoadingException, BlobStorageException, IOException
{
final String value = "bucket";
final File pulledFile = AzureTestUtils.createZipTempFile(SEGMENT_FILE_NAME, value);
@ -117,17 +117,15 @@ public class AzureDataSegmentPullerTest extends EasyMockSupport
@Test(expected = RuntimeException.class)
public void test_getSegmentFiles_nonRecoverableErrorRaisedWhenPullingSegmentFiles_doNotDeleteOutputDirectory()
throws IOException, URISyntaxException, StorageException, SegmentLoadingException
throws IOException, BlobStorageException, SegmentLoadingException
{
final File outDir = FileUtils.createTempDir();
try {
EasyMock.expect(byteSourceFactory.create(CONTAINER_NAME, BLOB_PATH)).andReturn(new AzureByteSource(azureStorage, CONTAINER_NAME, BLOB_PATH));
EasyMock.expect(azureStorage.getBlockBlobInputStream(0L, CONTAINER_NAME, BLOB_PATH)).andThrow(
new URISyntaxException(
"error",
"error",
404
new RuntimeException(
"error"
)
);
@ -149,17 +147,21 @@ public class AzureDataSegmentPullerTest extends EasyMockSupport
@Test(expected = SegmentLoadingException.class)
public void test_getSegmentFiles_recoverableErrorRaisedWhenPullingSegmentFiles_deleteOutputDirectory()
throws IOException, URISyntaxException, StorageException, SegmentLoadingException
throws IOException, BlobStorageException, SegmentLoadingException
{
final File outDir = FileUtils.createTempDir();
try {
HttpResponse httpResponse = createMock(HttpResponse.class);
EasyMock.expect(httpResponse.getStatusCode()).andReturn(500).anyTimes();
EasyMock.replay(httpResponse);
EasyMock.expect(byteSourceFactory.create(CONTAINER_NAME, BLOB_PATH)).andReturn(new AzureByteSource(azureStorage, CONTAINER_NAME, BLOB_PATH));
EasyMock.expect(azureStorage.getBlockBlobInputStream(0L, CONTAINER_NAME, BLOB_PATH)).andThrow(
new StorageException(null, null, 0, null, null)
new BlobStorageException("", httpResponse, null)
).atLeastOnce();
replayAll();
EasyMock.replay(azureStorage);
EasyMock.replay(byteSourceFactory);
AzureDataSegmentPuller puller = new AzureDataSegmentPuller(byteSourceFactory);

View File

@ -19,9 +19,9 @@
package org.apache.druid.storage.azure;
import com.azure.storage.blob.models.BlobStorageException;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.Files;
import com.microsoft.azure.storage.StorageException;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.MapUtils;
import org.apache.druid.java.util.common.StringUtils;
@ -37,7 +37,6 @@ import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@ -69,6 +68,7 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
private static final String UNIQUE_MATCHER_PREFIX = PREFIX + "/" + UNIQUE_MATCHER_NO_PREFIX;
private static final String NON_UNIQUE_NO_PREFIX_MATCHER = "foo/20150101T000000\\.000Z_20160101T000000\\.000Z/0/0/index\\.zip";
private static final String NON_UNIQUE_WITH_PREFIX_MATCHER = PREFIX + "/" + "foo/20150101T000000\\.000Z_20160101T000000\\.000Z/0/0/index\\.zip";
private static final int MAX_TRIES = 3;
private static final DataSegment SEGMENT_TO_PUSH = new DataSegment(
"foo",
@ -92,6 +92,7 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
{
azureStorage = createMock(AzureStorage.class);
azureAccountConfig = new AzureAccountConfig();
azureAccountConfig.setMaxTries(MAX_TRIES);
azureAccountConfig.setAccount(ACCOUNT);
segmentConfigWithPrefix = new AzureDataSegmentConfig();
@ -115,7 +116,7 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
Files.write(DATA, tmp);
String azurePath = pusher.getAzurePath(SEGMENT_TO_PUSH, useUniquePath);
azureStorage.uploadBlockBlob(EasyMock.anyObject(File.class), EasyMock.eq(CONTAINER_NAME), EasyMock.eq(azurePath));
azureStorage.uploadBlockBlob(EasyMock.anyObject(File.class), EasyMock.eq(CONTAINER_NAME), EasyMock.eq(azurePath), EasyMock.eq(MAX_TRIES));
EasyMock.expectLastCall();
replayAll();
@ -148,7 +149,8 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
azureStorage.uploadBlockBlob(
EasyMock.anyObject(File.class),
EasyMock.eq(CONTAINER_NAME),
EasyMock.eq(PREFIX + "/" + azurePath)
EasyMock.eq(PREFIX + "/" + azurePath),
EasyMock.eq(MAX_TRIES)
);
EasyMock.expectLastCall();
@ -181,7 +183,8 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
azureStorage.uploadBlockBlob(
EasyMock.anyObject(File.class),
EasyMock.eq(CONTAINER_NAME),
EasyMock.matches(UNIQUE_MATCHER_NO_PREFIX)
EasyMock.matches(UNIQUE_MATCHER_NO_PREFIX),
EasyMock.eq(MAX_TRIES)
);
EasyMock.expectLastCall();
@ -214,7 +217,8 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
azureStorage.uploadBlockBlob(
EasyMock.anyObject(File.class),
EasyMock.eq(CONTAINER_NAME),
EasyMock.matches(UNIQUE_MATCHER_PREFIX)
EasyMock.matches(UNIQUE_MATCHER_PREFIX),
EasyMock.eq(MAX_TRIES)
);
EasyMock.expectLastCall();
@ -245,8 +249,8 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
final long size = DATA.length;
String azurePath = pusher.getAzurePath(SEGMENT_TO_PUSH, useUniquePath);
azureStorage.uploadBlockBlob(EasyMock.anyObject(File.class), EasyMock.eq(CONTAINER_NAME), EasyMock.eq(azurePath));
EasyMock.expectLastCall().andThrow(new URISyntaxException("", ""));
azureStorage.uploadBlockBlob(EasyMock.anyObject(File.class), EasyMock.eq(CONTAINER_NAME), EasyMock.anyString(), EasyMock.eq(MAX_TRIES));
EasyMock.expectLastCall().andThrow(new BlobStorageException("", null, null));
replayAll();
@ -277,14 +281,14 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
}
@Test
public void uploadDataSegmentTest() throws StorageException, IOException, URISyntaxException
public void uploadDataSegmentTest() throws BlobStorageException, IOException
{
AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig, segmentConfigWithPrefix);
final int binaryVersion = 9;
final File compressedSegmentData = new File("index.zip");
final String azurePath = pusher.getAzurePath(DATA_SEGMENT, false);
azureStorage.uploadBlockBlob(compressedSegmentData, CONTAINER_NAME, azurePath);
azureStorage.uploadBlockBlob(compressedSegmentData, CONTAINER_NAME, azurePath, MAX_TRIES);
EasyMock.expectLastCall();
replayAll();

View File

@ -19,6 +19,7 @@
package org.apache.druid.storage.azure;
import com.azure.storage.blob.BlobServiceClient;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
@ -28,9 +29,6 @@ import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.ProvisionException;
import com.google.inject.TypeLiteral;
import com.microsoft.azure.storage.StorageCredentials;
import com.microsoft.azure.storage.blob.CloudBlobClient;
import com.microsoft.azure.storage.blob.ListBlobItem;
import org.apache.druid.data.input.azure.AzureEntityFactory;
import org.apache.druid.data.input.impl.CloudObjectLocation;
import org.apache.druid.guice.DruidGuiceExtensions;
@ -38,8 +36,6 @@ import org.apache.druid.guice.JsonConfigurator;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.jackson.JacksonModule;
import org.apache.druid.segment.loading.OmniDataSegmentKiller;
import org.apache.druid.storage.azure.blob.ListBlobItemHolder;
import org.apache.druid.storage.azure.blob.ListBlobItemHolderFactory;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.junit.Assert;
@ -63,6 +59,7 @@ public class AzureStorageDruidModuleTest extends EasyMockSupport
private static final String AZURE_ACCOUNT_NAME;
private static final String AZURE_ACCOUNT_KEY;
private static final String AZURE_SHARED_ACCESS_TOKEN;
private static final String AZURE_MANAGED_CREDENTIAL_CLIENT_ID;
private static final String AZURE_CONTAINER;
private static final String AZURE_PREFIX;
private static final int AZURE_MAX_LISTING_LENGTH;
@ -72,8 +69,6 @@ public class AzureStorageDruidModuleTest extends EasyMockSupport
private CloudObjectLocation cloudObjectLocation1;
private CloudObjectLocation cloudObjectLocation2;
private ListBlobItem blobItem1;
private ListBlobItem blobItem2;
private Injector injector;
static {
@ -82,6 +77,7 @@ public class AzureStorageDruidModuleTest extends EasyMockSupport
AZURE_ACCOUNT_KEY = Base64.getUrlEncoder()
.encodeToString("azureKey1".getBytes(StandardCharsets.UTF_8));
AZURE_SHARED_ACCESS_TOKEN = "dummyToken";
AZURE_MANAGED_CREDENTIAL_CLIENT_ID = "clientId";
AZURE_CONTAINER = "azureContainer1";
AZURE_PREFIX = "azurePrefix1";
AZURE_MAX_LISTING_LENGTH = 10;
@ -97,8 +93,6 @@ public class AzureStorageDruidModuleTest extends EasyMockSupport
{
cloudObjectLocation1 = createMock(CloudObjectLocation.class);
cloudObjectLocation2 = createMock(CloudObjectLocation.class);
blobItem1 = createMock(ListBlobItem.class);
blobItem2 = createMock(ListBlobItem.class);
}
@Test
@ -144,55 +138,6 @@ public class AzureStorageDruidModuleTest extends EasyMockSupport
Assert.assertEquals(AZURE_MAX_LISTING_LENGTH, inputDataConfig.getMaxListingLength());
}
@Test
public void testGetBlobClientExpectedClient()
{
injector = makeInjectorWithProperties(PROPERTIES);
Supplier<CloudBlobClient> cloudBlobClient = injector.getInstance(
Key.get(new TypeLiteral<Supplier<CloudBlobClient>>(){})
);
StorageCredentials storageCredentials = cloudBlobClient.get().getCredentials();
Assert.assertEquals(AZURE_ACCOUNT_NAME, storageCredentials.getAccountName());
}
@Test
public void testGetAzureStorageContainerExpectedClient()
{
injector = makeInjectorWithProperties(PROPERTIES);
Supplier<CloudBlobClient> cloudBlobClient = injector.getInstance(
Key.get(new TypeLiteral<Supplier<CloudBlobClient>>(){})
);
StorageCredentials storageCredentials = cloudBlobClient.get().getCredentials();
Assert.assertEquals(AZURE_ACCOUNT_NAME, storageCredentials.getAccountName());
AzureStorage azureStorage = injector.getInstance(AzureStorage.class);
Assert.assertSame(cloudBlobClient.get(), azureStorage.getCloudBlobClient());
}
@Test
public void testGetAzureStorageContainerWithSASExpectedClient()
{
Properties properties = initializePropertes();
properties.setProperty("druid.azure.sharedAccessStorageToken", AZURE_SHARED_ACCESS_TOKEN);
properties.remove("druid.azure.key");
injector = makeInjectorWithProperties(properties);
Supplier<CloudBlobClient> cloudBlobClient = injector.getInstance(
Key.get(new TypeLiteral<Supplier<CloudBlobClient>>(){})
);
AzureAccountConfig azureAccountConfig = injector.getInstance(AzureAccountConfig.class);
Assert.assertEquals(AZURE_SHARED_ACCESS_TOKEN, azureAccountConfig.getSharedAccessStorageToken());
AzureStorage azureStorage = injector.getInstance(AzureStorage.class);
Assert.assertSame(cloudBlobClient.get(), azureStorage.getCloudBlobClient());
}
@Test
public void testGetAzureByteSourceFactoryCanCreateAzureByteSource()
{
@ -247,18 +192,6 @@ public class AzureStorageDruidModuleTest extends EasyMockSupport
Assert.assertNotSame(object1, object2);
}
@Test
public void testGetListBlobItemDruidFactoryCanCreateListBlobItemDruid()
{
injector = makeInjectorWithProperties(PROPERTIES);
ListBlobItemHolderFactory factory = injector.getInstance(ListBlobItemHolderFactory.class);
ListBlobItemHolder object1 = factory.create(blobItem1);
ListBlobItemHolder object2 = factory.create(blobItem2);
Assert.assertNotNull(object1);
Assert.assertNotNull(object2);
Assert.assertNotSame(object1, object2);
}
@Test
public void testSegmentKillerBoundSingleton()
{
@ -276,28 +209,51 @@ public class AzureStorageDruidModuleTest extends EasyMockSupport
}
@Test
public void testBothAccountKeyAndSAStokenSet()
public void testMultipleCredentialsSet()
{
String message = "Set only one of 'key' or 'sharedAccessStorageToken' or 'useAzureCredentialsChain' in the azure config.";
Properties properties = initializePropertes();
properties.setProperty("druid.azure.sharedAccessStorageToken", AZURE_SHARED_ACCESS_TOKEN);
expectedException.expect(ProvisionException.class);
expectedException.expectMessage("Either set 'key' or 'sharedAccessStorageToken' in the azure config but not both");
expectedException.expectMessage(message);
makeInjectorWithProperties(properties).getInstance(
Key.get(new TypeLiteral<Supplier<CloudBlobClient>>()
Key.get(new TypeLiteral<AzureClientFactory>()
{
})
);
properties = initializePropertes();
properties.setProperty("druid.azure.managedIdentityClientId", AZURE_MANAGED_CREDENTIAL_CLIENT_ID);
expectedException.expect(ProvisionException.class);
expectedException.expectMessage(message);
makeInjectorWithProperties(properties).getInstance(
Key.get(new TypeLiteral<Supplier<BlobServiceClient>>()
{
})
);
properties = initializePropertes();
properties.remove("druid.azure.key");
properties.setProperty("druid.azure.managedIdentityClientId", AZURE_MANAGED_CREDENTIAL_CLIENT_ID);
properties.setProperty("druid.azure.sharedAccessStorageToken", AZURE_SHARED_ACCESS_TOKEN);
expectedException.expect(ProvisionException.class);
expectedException.expectMessage(message);
makeInjectorWithProperties(properties).getInstance(
Key.get(new TypeLiteral<AzureClientFactory>()
{
})
);
}
@Test
public void testBothAccountKeyAndSAStokenUnset()
public void testAllCredentialsUnset()
{
Properties properties = initializePropertes();
properties.remove("druid.azure.key");
expectedException.expect(ProvisionException.class);
expectedException.expectMessage("Either set 'key' or 'sharedAccessStorageToken' in the azure config but not both");
expectedException.expectMessage("Either set 'key' or 'sharedAccessStorageToken' or 'useAzureCredentialsChain' in the azure config.");
makeInjectorWithProperties(properties).getInstance(
Key.get(new TypeLiteral<Supplier<CloudBlobClient>>()
Key.get(new TypeLiteral<AzureClientFactory>()
{
})
);

View File

@ -19,53 +19,73 @@
package org.apache.druid.storage.azure;
import com.azure.core.http.rest.PagedIterable;
import com.azure.core.http.rest.PagedResponse;
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.models.BlobItem;
import com.azure.storage.blob.models.BlobItemProperties;
import com.azure.storage.blob.models.BlobStorageException;
import com.google.common.collect.ImmutableList;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudBlobClient;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import com.microsoft.azure.storage.blob.CloudBlockBlob;
import com.microsoft.azure.storage.blob.ListBlobItem;
import org.apache.druid.common.guava.SettableSupplier;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
// Using Mockito for the whole test class since azure classes (e.g. BlobContainerClient) are final and can't be mocked with EasyMock
public class AzureStorageTest
{
AzureStorage azureStorage;
CloudBlobClient cloudBlobClient = Mockito.mock(CloudBlobClient.class);
CloudBlobContainer cloudBlobContainer = Mockito.mock(CloudBlobContainer.class);
BlobServiceClient blobServiceClient = Mockito.mock(BlobServiceClient.class);
BlobContainerClient blobContainerClient = Mockito.mock(BlobContainerClient.class);
AzureClientFactory azureClientFactory = Mockito.mock(AzureClientFactory.class);
private final String CONTAINER = "container";
private final String BLOB_NAME = "blobName";
private final Integer MAX_ATTEMPTS = 3;
@Before
public void setup() throws URISyntaxException, StorageException
public void setup() throws BlobStorageException
{
Mockito.doReturn(cloudBlobContainer).when(cloudBlobClient).getContainerReference(ArgumentMatchers.anyString());
azureStorage = new AzureStorage(() -> cloudBlobClient);
azureStorage = new AzureStorage(azureClientFactory);
}
@Test
public void testListDir() throws URISyntaxException, StorageException
public void testListDir_retriable() throws BlobStorageException
{
List<ListBlobItem> listBlobItems = ImmutableList.of(
new CloudBlockBlob(new URI("azure://dummy.com/container/blobName"))
);
Mockito.doReturn(listBlobItems).when(cloudBlobContainer).listBlobs(
ArgumentMatchers.anyString(),
ArgumentMatchers.anyBoolean(),
ArgumentMatchers.any(),
BlobItem blobItem = new BlobItem().setName(BLOB_NAME).setProperties(new BlobItemProperties().setContentLength(10L));
SettableSupplier<PagedResponse<BlobItem>> supplier = new SettableSupplier<>();
supplier.set(new TestPagedResponse<>(ImmutableList.of(blobItem)));
PagedIterable<BlobItem> pagedIterable = new PagedIterable<>(supplier);
Mockito.doReturn(pagedIterable).when(blobContainerClient).listBlobs(
ArgumentMatchers.any(),
ArgumentMatchers.any()
);
Assert.assertEquals(ImmutableList.of("blobName"), azureStorage.listDir("test", ""));
Mockito.doReturn(blobContainerClient).when(blobServiceClient).createBlobContainerIfNotExists(CONTAINER);
Mockito.doReturn(blobServiceClient).when(azureClientFactory).getBlobServiceClient(MAX_ATTEMPTS);
Assert.assertEquals(ImmutableList.of(BLOB_NAME), azureStorage.listDir(CONTAINER, "", MAX_ATTEMPTS));
}
@Test
public void testListDir_nullMaxAttempts() throws BlobStorageException
{
BlobItem blobItem = new BlobItem().setName(BLOB_NAME).setProperties(new BlobItemProperties().setContentLength(10L));
SettableSupplier<PagedResponse<BlobItem>> supplier = new SettableSupplier<>();
supplier.set(new TestPagedResponse<>(ImmutableList.of(blobItem)));
PagedIterable<BlobItem> pagedIterable = new PagedIterable<>(supplier);
Mockito.doReturn(pagedIterable).when(blobContainerClient).listBlobs(
ArgumentMatchers.any(),
ArgumentMatchers.any()
);
Mockito.doReturn(blobContainerClient).when(blobServiceClient).createBlobContainerIfNotExists(CONTAINER);
Mockito.doReturn(blobServiceClient).when(azureClientFactory).getBlobServiceClient(null);
Assert.assertEquals(ImmutableList.of(BLOB_NAME), azureStorage.listDir(CONTAINER, "", null));
}
}

View File

@ -19,10 +19,10 @@
package org.apache.druid.storage.azure;
import com.azure.storage.blob.models.BlobStorageException;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.microsoft.azure.storage.StorageException;
import org.apache.commons.io.IOUtils;
import org.apache.druid.common.utils.CurrentTimeMillisSupplier;
import org.apache.druid.java.util.common.FileUtils;
@ -41,7 +41,6 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.StringWriter;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
public class AzureTaskLogsTest extends EasyMockSupport
@ -51,8 +50,7 @@ public class AzureTaskLogsTest extends EasyMockSupport
private static final String PREFIX = "test/log";
private static final String TASK_ID = "taskid";
private static final String TASK_ID_NOT_FOUND = "taskidNotFound";
private static final int MAX_TRIES = 3;
private static final AzureTaskLogsConfig AZURE_TASK_LOGS_CONFIG = new AzureTaskLogsConfig(CONTAINER, PREFIX, MAX_TRIES);
private static final AzureTaskLogsConfig AZURE_TASK_LOGS_CONFIG = new AzureTaskLogsConfig(CONTAINER, PREFIX);
private static final int MAX_KEYS = 1;
private static final long TIME_0 = 0L;
private static final long TIME_1 = 1L;
@ -61,8 +59,9 @@ public class AzureTaskLogsTest extends EasyMockSupport
private static final String KEY_1 = "key1";
private static final String KEY_2 = "key2";
private static final URI PREFIX_URI = URI.create(StringUtils.format("azure://%s/%s", CONTAINER, PREFIX));
private static final Exception RECOVERABLE_EXCEPTION = new StorageException("", "", null);
private static final Exception NON_RECOVERABLE_EXCEPTION = new URISyntaxException("", "");
private static final int MAX_TRIES = 3;
private static final Exception NON_RECOVERABLE_EXCEPTION = new BlobStorageException("", null, null);
private AzureInputDataConfig inputDataConfig;
private AzureAccountConfig accountConfig;
@ -97,9 +96,11 @@ public class AzureTaskLogsTest extends EasyMockSupport
try {
final File logFile = new File(tmpDir, "log");
azureStorage.uploadBlockBlob(logFile, CONTAINER, PREFIX + "/" + TASK_ID + "/log");
azureStorage.uploadBlockBlob(logFile, CONTAINER, PREFIX + "/" + TASK_ID + "/log", MAX_TRIES);
EasyMock.expectLastCall();
EasyMock.expect(accountConfig.getMaxTries()).andReturn(MAX_TRIES).anyTimes();
replayAll();
azureTaskLogs.pushTaskLog(TASK_ID, logFile);
@ -119,7 +120,8 @@ public class AzureTaskLogsTest extends EasyMockSupport
try {
final File logFile = new File(tmpDir, "log");
azureStorage.uploadBlockBlob(logFile, CONTAINER, PREFIX + "/" + TASK_ID + "/log");
EasyMock.expect(accountConfig.getMaxTries()).andReturn(MAX_TRIES).anyTimes();
azureStorage.uploadBlockBlob(logFile, CONTAINER, PREFIX + "/" + TASK_ID + "/log", MAX_TRIES);
EasyMock.expectLastCall().andThrow(new IOException());
replayAll();
@ -141,7 +143,8 @@ public class AzureTaskLogsTest extends EasyMockSupport
try {
final File logFile = new File(tmpDir, "log");
azureStorage.uploadBlockBlob(logFile, CONTAINER, PREFIX + "/" + TASK_ID + "/report.json");
EasyMock.expect(accountConfig.getMaxTries()).andReturn(MAX_TRIES).anyTimes();
azureStorage.uploadBlockBlob(logFile, CONTAINER, PREFIX + "/" + TASK_ID + "/report.json", MAX_TRIES);
EasyMock.expectLastCall();
replayAll();
@ -163,7 +166,8 @@ public class AzureTaskLogsTest extends EasyMockSupport
try {
final File logFile = new File(tmpDir, "status.json");
azureStorage.uploadBlockBlob(logFile, CONTAINER, PREFIX + "/" + TASK_ID + "/status.json");
EasyMock.expect(accountConfig.getMaxTries()).andReturn(MAX_TRIES).anyTimes();
azureStorage.uploadBlockBlob(logFile, CONTAINER, PREFIX + "/" + TASK_ID + "/status.json", MAX_TRIES);
EasyMock.expectLastCall();
replayAll();
@ -185,7 +189,8 @@ public class AzureTaskLogsTest extends EasyMockSupport
try {
final File logFile = new File(tmpDir, "log");
azureStorage.uploadBlockBlob(logFile, CONTAINER, PREFIX + "/" + TASK_ID + "/report.json");
EasyMock.expect(accountConfig.getMaxTries()).andReturn(MAX_TRIES).anyTimes();
azureStorage.uploadBlockBlob(logFile, CONTAINER, PREFIX + "/" + TASK_ID + "/report.json", MAX_TRIES);
EasyMock.expectLastCall().andThrow(new IOException());
replayAll();
@ -318,7 +323,7 @@ public class AzureTaskLogsTest extends EasyMockSupport
EasyMock.expect(azureStorage.getBlockBlobExists(CONTAINER, blobPath)).andReturn(true);
EasyMock.expect(azureStorage.getBlockBlobLength(CONTAINER, blobPath)).andReturn((long) testLog.length());
EasyMock.expect(azureStorage.getBlockBlobInputStream(CONTAINER, blobPath)).andThrow(
new URISyntaxException("", ""));
new BlobStorageException("", null, null));
replayAll();
@ -333,10 +338,9 @@ public class AzureTaskLogsTest extends EasyMockSupport
@Test(expected = IOException.class)
public void test_streamTaskReports_exceptionWhenCheckingBlobExistence_throwsException() throws Exception
{
final String testLog = "hello this is a log";
final String blobPath = PREFIX + "/" + TASK_ID + "/report.json";
EasyMock.expect(azureStorage.getBlockBlobExists(CONTAINER, blobPath)).andThrow(new URISyntaxException("", ""));
EasyMock.expect(azureStorage.getBlockBlobExists(CONTAINER, blobPath)).andThrow(new BlobStorageException("", null, null));
replayAll();
@ -393,7 +397,7 @@ public class AzureTaskLogsTest extends EasyMockSupport
EasyMock.expect(azureStorage.getBlockBlobExists(CONTAINER, blobPath)).andReturn(true);
EasyMock.expect(azureStorage.getBlockBlobLength(CONTAINER, blobPath)).andReturn((long) taskStatus.length());
EasyMock.expect(azureStorage.getBlockBlobInputStream(CONTAINER, blobPath)).andThrow(
new URISyntaxException("", ""));
new BlobStorageException("", null, null));
replayAll();
@ -409,7 +413,7 @@ public class AzureTaskLogsTest extends EasyMockSupport
public void test_streamTaskStatus_exceptionWhenCheckingBlobExistence_throwsException() throws Exception
{
final String blobPath = PREFIX + "/" + TASK_ID + "/status.json";
EasyMock.expect(azureStorage.getBlockBlobExists(CONTAINER, blobPath)).andThrow(new URISyntaxException("", ""));
EasyMock.expect(azureStorage.getBlockBlobExists(CONTAINER, blobPath)).andThrow(new BlobStorageException("", null, null));
replayAll();
@ -422,8 +426,8 @@ public class AzureTaskLogsTest extends EasyMockSupport
public void test_killAll_noException_deletesAllTaskLogs() throws Exception
{
EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS);
EasyMock.expect(accountConfig.getMaxTries()).andReturn(MAX_TRIES).atLeastOnce();
EasyMock.expect(timeSupplier.getAsLong()).andReturn(TIME_NOW);
EasyMock.expect(accountConfig.getMaxTries()).andReturn(MAX_TRIES).anyTimes();
CloudBlobHolder object1 = AzureTestUtils.newCloudBlobHolder(CONTAINER, KEY_1, TIME_0);
CloudBlobHolder object2 = AzureTestUtils.newCloudBlobHolder(CONTAINER, KEY_2, TIME_1);
@ -438,47 +442,24 @@ public class AzureTaskLogsTest extends EasyMockSupport
AzureTestUtils.expectDeleteObjects(
azureStorage,
ImmutableList.of(object1, object2),
ImmutableMap.of());
ImmutableMap.of(),
MAX_TRIES
);
EasyMock.replay(inputDataConfig, accountConfig, timeSupplier, azureCloudBlobIterable, azureCloudBlobIterableFactory, azureStorage);
azureTaskLogs.killAll();
EasyMock.verify(inputDataConfig, accountConfig, timeSupplier, object1, object2, azureCloudBlobIterable, azureCloudBlobIterableFactory, azureStorage);
}
@Test
public void test_killAll_recoverableExceptionWhenDeletingObjects_deletesAllTaskLogs() throws Exception
{
EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS);
EasyMock.expect(accountConfig.getMaxTries()).andReturn(MAX_TRIES).atLeastOnce();
EasyMock.expect(timeSupplier.getAsLong()).andReturn(TIME_NOW);
CloudBlobHolder object1 = AzureTestUtils.newCloudBlobHolder(CONTAINER, KEY_1, TIME_0);
AzureCloudBlobIterable azureCloudBlobIterable = AzureTestUtils.expectListObjects(
azureCloudBlobIterableFactory,
MAX_KEYS,
PREFIX_URI,
ImmutableList.of(object1));
EasyMock.replay(object1);
AzureTestUtils.expectDeleteObjects(
azureStorage,
ImmutableList.of(object1),
ImmutableMap.of(object1, RECOVERABLE_EXCEPTION));
EasyMock.replay(inputDataConfig, accountConfig, timeSupplier, azureCloudBlobIterable, azureCloudBlobIterableFactory, azureStorage);
azureTaskLogs.killAll();
EasyMock.verify(inputDataConfig, accountConfig, timeSupplier, object1, azureCloudBlobIterable, azureCloudBlobIterableFactory, azureStorage);
}
@Test
public void test_killAll_nonrecoverableExceptionWhenListingObjects_doesntDeleteAnyTaskLogs() throws Exception
public void test_killAll_nonrecoverableExceptionWhenListingObjects_doesntDeleteAnyTaskLogs()
{
boolean ioExceptionThrown = false;
CloudBlobHolder object1 = null;
AzureCloudBlobIterable azureCloudBlobIterable = null;
try {
EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS);
EasyMock.expect(accountConfig.getMaxTries()).andReturn(MAX_TRIES).atLeastOnce();
EasyMock.expect(timeSupplier.getAsLong()).andReturn(TIME_NOW);
EasyMock.expect(accountConfig.getMaxTries()).andReturn(MAX_TRIES).anyTimes();
object1 = AzureTestUtils.newCloudBlobHolder(CONTAINER, KEY_1, TIME_0);
@ -493,7 +474,8 @@ public class AzureTaskLogsTest extends EasyMockSupport
AzureTestUtils.expectDeleteObjects(
azureStorage,
ImmutableList.of(),
ImmutableMap.of(object1, NON_RECOVERABLE_EXCEPTION)
ImmutableMap.of(object1, NON_RECOVERABLE_EXCEPTION),
MAX_TRIES
);
EasyMock.replay(
inputDataConfig,
@ -524,7 +506,7 @@ public class AzureTaskLogsTest extends EasyMockSupport
public void test_killOlderThan_noException_deletesOnlyTaskLogsOlderThan() throws Exception
{
EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS);
EasyMock.expect(accountConfig.getMaxTries()).andReturn(MAX_TRIES).atLeastOnce();
EasyMock.expect(accountConfig.getMaxTries()).andReturn(MAX_TRIES).anyTimes();
CloudBlobHolder object1 = AzureTestUtils.newCloudBlobHolder(CONTAINER, KEY_1, TIME_0);
CloudBlobHolder object2 = AzureTestUtils.newCloudBlobHolder(CONTAINER, KEY_2, TIME_FUTURE);
@ -539,45 +521,23 @@ public class AzureTaskLogsTest extends EasyMockSupport
AzureTestUtils.expectDeleteObjects(
azureStorage,
ImmutableList.of(object1),
ImmutableMap.of());
ImmutableMap.of(),
MAX_TRIES
);
EasyMock.replay(inputDataConfig, accountConfig, timeSupplier, azureCloudBlobIterable, azureCloudBlobIterableFactory, azureStorage);
azureTaskLogs.killOlderThan(TIME_NOW);
EasyMock.verify(inputDataConfig, accountConfig, timeSupplier, object1, object2, azureCloudBlobIterable, azureCloudBlobIterableFactory, azureStorage);
}
@Test
public void test_killOlderThan_recoverableExceptionWhenDeletingObjects_deletesOnlyTaskLogsOlderThan() throws Exception
{
EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS);
EasyMock.expect(accountConfig.getMaxTries()).andReturn(MAX_TRIES).atLeastOnce();
CloudBlobHolder object1 = AzureTestUtils.newCloudBlobHolder(CONTAINER, KEY_1, TIME_0);
AzureCloudBlobIterable azureCloudBlobIterable = AzureTestUtils.expectListObjects(
azureCloudBlobIterableFactory,
MAX_KEYS,
PREFIX_URI,
ImmutableList.of(object1));
EasyMock.replay(object1);
AzureTestUtils.expectDeleteObjects(
azureStorage,
ImmutableList.of(object1),
ImmutableMap.of(object1, RECOVERABLE_EXCEPTION));
EasyMock.replay(inputDataConfig, accountConfig, timeSupplier, azureCloudBlobIterable, azureCloudBlobIterableFactory, azureStorage);
azureTaskLogs.killOlderThan(TIME_NOW);
EasyMock.verify(inputDataConfig, accountConfig, timeSupplier, object1, azureCloudBlobIterable, azureCloudBlobIterableFactory, azureStorage);
}
@Test
public void test_killOlderThan_nonrecoverableExceptionWhenListingObjects_doesntDeleteAnyTaskLogs() throws Exception
public void test_killOlderThan_nonrecoverableExceptionWhenListingObjects_doesntDeleteAnyTaskLogs()
{
boolean ioExceptionThrown = false;
CloudBlobHolder object1 = null;
AzureCloudBlobIterable azureCloudBlobIterable = null;
try {
EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_KEYS);
EasyMock.expect(accountConfig.getMaxTries()).andReturn(MAX_TRIES).atLeastOnce();
EasyMock.expect(accountConfig.getMaxTries()).andReturn(MAX_TRIES).anyTimes();
object1 = AzureTestUtils.newCloudBlobHolder(CONTAINER, KEY_1, TIME_0);
@ -592,7 +552,8 @@ public class AzureTaskLogsTest extends EasyMockSupport
AzureTestUtils.expectDeleteObjects(
azureStorage,
ImmutableList.of(),
ImmutableMap.of(object1, NON_RECOVERABLE_EXCEPTION)
ImmutableMap.of(object1, NON_RECOVERABLE_EXCEPTION),
MAX_TRIES
);
EasyMock.replay(
inputDataConfig,

View File

@ -68,7 +68,9 @@ public class AzureTestUtils extends EasyMockSupport
public static void expectDeleteObjects(
AzureStorage storage,
List<CloudBlobHolder> deleteRequestsExpected,
Map<CloudBlobHolder, Exception> deleteRequestToException) throws Exception
Map<CloudBlobHolder, Exception> deleteRequestToException,
Integer maxTries
)
{
Map<CloudBlobHolder, IExpectationSetters<CloudBlobHolder>> requestToResultExpectationSetter = new HashMap<>();
@ -77,7 +79,7 @@ public class AzureTestUtils extends EasyMockSupport
Exception exception = requestsAndErrors.getValue();
IExpectationSetters<CloudBlobHolder> resultExpectationSetter = requestToResultExpectationSetter.get(deleteObject);
if (resultExpectationSetter == null) {
storage.emptyCloudBlobDirectory(deleteObject.getContainerName(), deleteObject.getName());
storage.emptyCloudBlobDirectory(deleteObject.getContainerName(), deleteObject.getName(), maxTries);
resultExpectationSetter = EasyMock.<CloudBlobHolder>expectLastCall().andThrow(exception);
requestToResultExpectationSetter.put(deleteObject, resultExpectationSetter);
} else {
@ -88,7 +90,7 @@ public class AzureTestUtils extends EasyMockSupport
for (CloudBlobHolder deleteObject : deleteRequestsExpected) {
IExpectationSetters<CloudBlobHolder> resultExpectationSetter = requestToResultExpectationSetter.get(deleteObject);
if (resultExpectationSetter == null) {
storage.emptyCloudBlobDirectory(deleteObject.getContainerName(), deleteObject.getName());
storage.emptyCloudBlobDirectory(deleteObject.getContainerName(), deleteObject.getName(), maxTries);
resultExpectationSetter = EasyMock.expectLastCall();
requestToResultExpectationSetter.put(deleteObject, resultExpectationSetter);
}
@ -99,7 +101,7 @@ public class AzureTestUtils extends EasyMockSupport
public static CloudBlobHolder newCloudBlobHolder(
String container,
String prefix,
long lastModifiedTimestamp) throws Exception
long lastModifiedTimestamp)
{
CloudBlobHolder object = EasyMock.createMock(CloudBlobHolder.class);
EasyMock.expect(object.getContainerName()).andReturn(container).anyTimes();

View File

@ -19,16 +19,24 @@
package org.apache.druid.storage.azure;
import com.microsoft.azure.storage.StorageException;
import com.azure.core.http.HttpResponse;
import com.azure.storage.blob.models.BlobStorageException;
import org.apache.druid.data.input.azure.AzureInputSource;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
import org.easymock.EasyMockSupport;
import org.easymock.Mock;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.concurrent.TimeoutException;
public class AzureUtilsTest
@RunWith(EasyMockRunner.class)
public class AzureUtilsTest extends EasyMockSupport
{
private static final String CONTAINER_NAME = "container1";
private static final String BLOB_NAME = "blob1";
@ -39,7 +47,7 @@ public class AzureUtilsTest
private static final URI URI_WITH_PATH_WITH_LEADING_SLASH;
private static final URISyntaxException URI_SYNTAX_EXCEPTION = new URISyntaxException("", "");
private static final StorageException STORAGE_EXCEPTION = new StorageException("", "", null);
private static final IOException IO_EXCEPTION = new IOException();
private static final RuntimeException RUNTIME_EXCEPTION = new RuntimeException();
private static final RuntimeException NULL_EXCEPTION_WRAPPED_IN_RUNTIME_EXCEPTION = new RuntimeException("", null);
@ -64,6 +72,9 @@ public class AzureUtilsTest
}
}
@Mock
private HttpResponse httpResponse;
@Test
public void test_extractAzureKey_pathHasLeadingSlash_returnsPathWithLeadingSlashRemoved()
{
@ -93,9 +104,64 @@ public class AzureUtilsTest
}
@Test
public void test_azureRetry_StorageException_returnsTrue()
public void test_azureRetry_StorageException_500ErrorCode_returnsTrue()
{
boolean retry = AzureUtils.AZURE_RETRY.apply(STORAGE_EXCEPTION);
EasyMock.expect(httpResponse.getStatusCode()).andReturn(500).anyTimes();
replayAll();
BlobStorageException blobStorageException = new BlobStorageException("storage exception", httpResponse, null);
boolean retry = AzureUtils.AZURE_RETRY.apply(blobStorageException);
verifyAll();
Assert.assertTrue(retry);
}
@Test
public void test_azureRetry_StorageException_429ErrorCode_returnsTrue()
{
EasyMock.expect(httpResponse.getStatusCode()).andReturn(429).anyTimes();
replayAll();
BlobStorageException blobStorageException = new BlobStorageException("storage exception", httpResponse, null);
boolean retry = AzureUtils.AZURE_RETRY.apply(blobStorageException);
verifyAll();
Assert.assertTrue(retry);
}
@Test
public void test_azureRetry_StorageException_503ErrorCode_returnsTrue()
{
EasyMock.expect(httpResponse.getStatusCode()).andReturn(503).anyTimes();
replayAll();
BlobStorageException blobStorageException = new BlobStorageException("storage exception", httpResponse, null);
boolean retry = AzureUtils.AZURE_RETRY.apply(blobStorageException);
verifyAll();
Assert.assertTrue(retry);
}
@Test
public void test_azureRetry_StorageException_400ErrorCode_returnsFalse()
{
EasyMock.expect(httpResponse.getStatusCode()).andReturn(400).anyTimes();
replayAll();
BlobStorageException blobStorageException = new BlobStorageException("storage exception", httpResponse, null);
boolean retry = AzureUtils.AZURE_RETRY.apply(blobStorageException);
verifyAll();
Assert.assertFalse(retry);
}
@Test
public void test_azureRetry_nestedIOException_returnsTrue()
{
boolean retry = AzureUtils.AZURE_RETRY.apply(new RuntimeException("runtime", new IOException("ioexception")));
Assert.assertTrue(retry);
}
@Test
public void test_azureRetry_nestedTimeoutException_returnsTrue()
{
boolean retry = AzureUtils.AZURE_RETRY.apply(new RuntimeException("runtime", new TimeoutException("timeout exception")));
Assert.assertTrue(retry);
}

View File

@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.storage.azure;
import com.azure.core.http.HttpHeaders;
import com.azure.core.http.HttpRequest;
import com.azure.core.http.rest.PagedResponse;
import com.azure.core.util.IterableStream;
import java.util.Collection;
public class TestPagedResponse<T> implements PagedResponse<T>
{
private final Collection<T> responseItems;
public TestPagedResponse(Collection<T> responseItems)
{
this.responseItems = responseItems;
}
@Override
public int getStatusCode()
{
return 0;
}
@Override
public HttpHeaders getHeaders()
{
return null;
}
@Override
public HttpRequest getRequest()
{
return null;
}
@Override
public IterableStream<T> getElements()
{
return IterableStream.of(responseItems);
}
@Override
public String getContinuationToken()
{
return null;
}
@Override
public void close()
{
}
}

View File

@ -19,9 +19,9 @@
package org.apache.druid.storage.azure.output;
import com.azure.storage.blob.models.BlobStorageException;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.microsoft.azure.storage.StorageException;
import org.apache.commons.io.IOUtils;
import org.apache.druid.storage.StorageConnector;
import org.apache.druid.storage.azure.AzureStorage;
@ -35,7 +35,6 @@ import org.junit.rules.TemporaryFolder;
import java.io.IOException;
import java.io.InputStream;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
@ -64,7 +63,7 @@ public class AzureStorageConnectorTest
@Test
public void testPathExistsSuccess() throws URISyntaxException, StorageException, IOException
public void testPathExistsSuccess() throws BlobStorageException, IOException
{
final Capture<String> bucket = Capture.newInstance();
final Capture<String> path = Capture.newInstance();
@ -79,7 +78,7 @@ public class AzureStorageConnectorTest
}
@Test
public void testPathExistsNotFound() throws URISyntaxException, StorageException, IOException
public void testPathExistsNotFound() throws BlobStorageException, IOException
{
final Capture<String> bucket = Capture.newInstance();
final Capture<String> path = Capture.newInstance();
@ -94,7 +93,7 @@ public class AzureStorageConnectorTest
}
@Test
public void testRead() throws URISyntaxException, StorageException, IOException
public void testRead() throws BlobStorageException, IOException
{
EasyMock.reset(azureStorage);
@ -122,7 +121,7 @@ public class AzureStorageConnectorTest
}
@Test
public void testReadRange() throws URISyntaxException, StorageException, IOException
public void testReadRange() throws BlobStorageException, IOException
{
String data = "test";
@ -151,7 +150,7 @@ public class AzureStorageConnectorTest
}
@Test
public void testDeleteSinglePath() throws URISyntaxException, StorageException, IOException
public void testDeleteSinglePath() throws BlobStorageException, IOException
{
EasyMock.reset(azureStorage);
Capture<String> containerCapture = EasyMock.newCapture();
@ -169,7 +168,7 @@ public class AzureStorageConnectorTest
}
@Test
public void testDeleteMultiplePaths() throws URISyntaxException, StorageException, IOException
public void testDeleteMultiplePaths() throws BlobStorageException, IOException
{
EasyMock.reset(azureStorage);
Capture<String> containerCapture = EasyMock.newCapture();
@ -189,7 +188,7 @@ public class AzureStorageConnectorTest
}
@Test
public void testListDir() throws URISyntaxException, StorageException, IOException
public void testListDir() throws BlobStorageException, IOException
{
EasyMock.reset(azureStorage);
EasyMock.expect(azureStorage.listDir(EasyMock.anyString(), EasyMock.anyString(), EasyMock.anyInt()))

View File

@ -229,8 +229,10 @@ libraries:
- com.fasterxml.jackson.core: jackson-core
- com.fasterxml.jackson.dataformat: jackson-dataformat-cbor
- com.fasterxml.jackson.dataformat: jackson-dataformat-smile
- com.fasterxml.jackson.dataformat: jackson-dataformat-xml
- com.fasterxml.jackson.datatype: jackson-datatype-guava
- com.fasterxml.jackson.datatype: jackson-datatype-joda
- com.fasterxml.jackson.datatype: jackson-datatype-jsr310
- com.fasterxml.jackson.jaxrs: jackson-jaxrs-base
- com.fasterxml.jackson.jaxrs: jackson-jaxrs-json-provider
- com.fasterxml.jackson.jaxrs: jackson-jaxrs-smile-provider
@ -1253,6 +1255,11 @@ libraries:
- io.netty: netty-transport-classes-epoll
- io.netty: netty-transport-native-epoll
- io.netty: netty-transport-native-unix-common
- io.netty: netty-codec-http2
- io.netty: netty-resolver-dns-classes-macos
- io.netty: netty-transport-classes-kqueue
- io.netty: netty-resolver-dns-native-macos
- io.netty: netty-transport-native-kqueue
notice: |
==
The Netty Project
@ -4180,14 +4187,406 @@ libraries:
---
name: Microsoft Azure Storage Client SDK
name: Microsoft Azure Blob Storage SDK
license_category: binary
module: extensions/druid-azure-extensions
license_name: MIT License
copyright: Microsoft
version: 12.25.1
libraries:
- com.azure: azure-storage-blob
---
name: Microsoft Azure Identity SDK
license_category: binary
module: extensions/druid-azure-extensions
license_name: MIT License
copyright: Microsoft
version: 1.11.1
libraries:
- com.azure: azure-identity
---
name: Microsoft Azure Batch Blob Storage SDK
license_category: binary
module: extensions/druid-azure-extensions
license_name: MIT License
copyright: Microsoft
version: 12.21.1
libraries:
- com.azure: azure-storage-blob-batch
---
name: Microsoft Azure Storage Common
license_category: binary
module: extensions/druid-azure-extensions
license_name: MIT License
copyright: Microsoft
version: 12.24.1
libraries:
- com.azure: azure-storage-common
---
name: Microsoft Azure Internal Avro
license_category: binary
module: extensions/druid-azure-extensions
license_name: MIT License
copyright: Microsoft
version: 12.10.1
libraries:
- com.azure: azure-storage-internal-avro
---
name: Microsoft Azure JSON
license_category: binary
module: extensions/druid-azure-extensions
license_name: MIT License
copyright: Microsoft
version: 1.1.0
libraries:
- com.azure: azure-json
---
name: Microsoft Azure Netty Http
license_category: binary
module: extensions/druid-azure-extensions
license_name: MIT License
copyright: Microsoft
version: 1.13.11
libraries:
- com.azure: azure-core-http-netty
---
name: Microsoft Azure Core
license_category: binary
module: extensions/druid-azure-extensions
license_name: MIT License
copyright: Microsoft
version: 1.45.1
libraries:
- com.azure: azure-core
---
name: Microsoft MSAL4J
license_category: binary
module: extensions/druid-azure-extensions
license_name: MIT License
copyright: Microsoft
version: 1.14.0
libraries:
- com.microsoft.azure: msal4j
---
name: Microsoft MSAL4J Persistence
license_category: binary
module: extensions/druid-azure-extensions
license_name: MIT License
copyright: Microsoft
version: 1.2.0
libraries:
- com.microsoft.azure: msal4j-persistence-extension
---
name: NimbusDS Content Type
license_category: binary
module: extensions/druid-azure-extensions
license_name: Apache License version 2.0
copyright: Microsoft
version: 8.6.0
version: 2.2
libraries:
- com.microsoft.azure: azure-storage
- com.nimbusds: content-type
---
name: NimbusDS Jose
license_category: binary
module: extensions/druid-azure-extensions
license_name: Apache License version 2.0
version: 9.30.2
libraries:
- com.nimbusds: nimbus-jose-jwt
---
name: NimbusDS Oauth
license_category: binary
module: extensions/druid-azure-extensions
license_name: Apache License version 2.0
version: 10.7.1
libraries:
- com.nimbusds: oauth2-oidc-sdk
---
name: Reactor Netty
license_category: binary
module: extensions/druid-azure-extensions
license_name: Apache License version 2.0
version: 1.0.39
libraries:
- io.projectreactor.netty: reactor-netty-core
- io.projectreactor.netty: reactor-netty-http
---
name: Reactor Core
license_category: binary
module: extensions/druid-azure-extensions
license_name: Apache License version 2.0
version: 3.4.34
libraries:
- io.projectreactor: reactor-core
---
name: Woodstox
license_category: binary
module: extensions/druid-azure-extensions
license_name: Apache License version 2.0
version: 6.2.4
libraries:
- com.fasterxml.woodstox: woodstox-core
---
name: Netty
license_category: binary
module: extensions/druid-azure-extensions
license_name: Apache License version 2.0
version: 2.0.61.Final
libraries:
- io.netty: netty-tcnative-boringssl-static
- io.netty: netty-tcnative-classes
notice: |
==
The Netty Project
=================
Please visit the Netty web site for more information:
* http://netty.io/
Copyright 2014 The Netty Project
The Netty Project licenses this file to you under the Apache License,
version 2.0 (the "License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at:
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
License for the specific language governing permissions and limitations
under the License.
Also, please refer to each LICENSE.<component>.txt file, which is located in
the 'license' directory of the distribution file, for the license terms of the
components that this product depends on.
-------------------------------------------------------------------------------
This product contains the extensions to Java Collections Framework which has
been derived from the works by JSR-166 EG, Doug Lea, and Jason T. Greene:
* LICENSE:
* license/LICENSE.jsr166y.txt (Public Domain)
* HOMEPAGE:
* http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/
* http://viewvc.jboss.org/cgi-bin/viewvc.cgi/jbosscache/experimental/jsr166/
This product contains a modified version of Robert Harder's Public Domain
Base64 Encoder and Decoder, which can be obtained at:
* LICENSE:
* license/LICENSE.base64.txt (Public Domain)
* HOMEPAGE:
* http://iharder.sourceforge.net/current/java/base64/
This product contains a modified portion of 'Webbit', an event based
WebSocket and HTTP server, which can be obtained at:
* LICENSE:
* license/LICENSE.webbit.txt (BSD License)
* HOMEPAGE:
* https://github.com/joewalnes/webbit
This product contains a modified portion of 'SLF4J', a simple logging
facade for Java, which can be obtained at:
* LICENSE:
* license/LICENSE.slf4j.txt (MIT License)
* HOMEPAGE:
* http://www.slf4j.org/
This product contains a modified portion of 'Apache Harmony', an open source
Java SE, which can be obtained at:
* NOTICE:
* license/NOTICE.harmony.txt
* LICENSE:
* license/LICENSE.harmony.txt (Apache License 2.0)
* HOMEPAGE:
* http://archive.apache.org/dist/harmony/
This product contains a modified portion of 'jbzip2', a Java bzip2 compression
and decompression library written by Matthew J. Francis. It can be obtained at:
* LICENSE:
* license/LICENSE.jbzip2.txt (MIT License)
* HOMEPAGE:
* https://code.google.com/p/jbzip2/
This product contains a modified portion of 'libdivsufsort', a C API library to construct
the suffix array and the Burrows-Wheeler transformed string for any input string of
a constant-size alphabet written by Yuta Mori. It can be obtained at:
* LICENSE:
* license/LICENSE.libdivsufsort.txt (MIT License)
* HOMEPAGE:
* https://github.com/y-256/libdivsufsort
This product contains a modified portion of Nitsan Wakart's 'JCTools', Java Concurrency Tools for the JVM,
which can be obtained at:
* LICENSE:
* license/LICENSE.jctools.txt (ASL2 License)
* HOMEPAGE:
* https://github.com/JCTools/JCTools
This product optionally depends on 'JZlib', a re-implementation of zlib in
pure Java, which can be obtained at:
* LICENSE:
* license/LICENSE.jzlib.txt (BSD style License)
* HOMEPAGE:
* http://www.jcraft.com/jzlib/
This product optionally depends on 'Compress-LZF', a Java library for encoding and
decoding data in LZF format, written by Tatu Saloranta. It can be obtained at:
* LICENSE:
* license/LICENSE.compress-lzf.txt (Apache License 2.0)
* HOMEPAGE:
* https://github.com/ning/compress
This product optionally depends on 'lz4', a LZ4 Java compression
and decompression library written by Adrien Grand. It can be obtained at:
* LICENSE:
* license/LICENSE.lz4.txt (Apache License 2.0)
* HOMEPAGE:
* https://github.com/jpountz/lz4-java
This product optionally depends on 'lzma-java', a LZMA Java compression
and decompression library, which can be obtained at:
* LICENSE:
* license/LICENSE.lzma-java.txt (Apache License 2.0)
* HOMEPAGE:
* https://github.com/jponge/lzma-java
This product contains a modified portion of 'jfastlz', a Java port of FastLZ compression
and decompression library written by William Kinney. It can be obtained at:
* LICENSE:
* license/LICENSE.jfastlz.txt (MIT License)
* HOMEPAGE:
* https://code.google.com/p/jfastlz/
This product contains a modified portion of and optionally depends on 'Protocol Buffers', Google's data
interchange format, which can be obtained at:
* LICENSE:
* license/LICENSE.protobuf.txt (New BSD License)
* HOMEPAGE:
* https://github.com/google/protobuf
This product optionally depends on 'Bouncy Castle Crypto APIs' to generate
a temporary self-signed X.509 certificate when the JVM does not provide the
equivalent functionality. It can be obtained at:
* LICENSE:
* license/LICENSE.bouncycastle.txt (MIT License)
* HOMEPAGE:
* http://www.bouncycastle.org/
This product optionally depends on 'Snappy', a compression library produced
by Google Inc, which can be obtained at:
* LICENSE:
* license/LICENSE.snappy.txt (New BSD License)
* HOMEPAGE:
* https://github.com/google/snappy
This product optionally depends on 'JBoss Marshalling', an alternative Java
serialization API, which can be obtained at:
* LICENSE:
* license/LICENSE.jboss-marshalling.txt (GNU LGPL 2.1)
* HOMEPAGE:
* http://www.jboss.org/jbossmarshalling
This product optionally depends on 'Caliper', Google's micro-
benchmarking framework, which can be obtained at:
* LICENSE:
* license/LICENSE.caliper.txt (Apache License 2.0)
* HOMEPAGE:
* https://github.com/google/caliper
This product optionally depends on 'Apache Commons Logging', a logging
framework, which can be obtained at:
* LICENSE:
* license/LICENSE.commons-logging.txt (Apache License 2.0)
* HOMEPAGE:
* http://commons.apache.org/logging/
This product optionally depends on 'Apache Log4J', a logging framework, which
can be obtained at:
* LICENSE:
* license/LICENSE.log4j.txt (Apache License 2.0)
* HOMEPAGE:
* http://logging.apache.org/log4j/
This product optionally depends on 'Aalto XML', an ultra-high performance
non-blocking XML processor, which can be obtained at:
* LICENSE:
* license/LICENSE.aalto-xml.txt (Apache License 2.0)
* HOMEPAGE:
* http://wiki.fasterxml.com/AaltoHome
This product contains a modified version of 'HPACK', a Java implementation of
the HTTP/2 HPACK algorithm written by Twitter. It can be obtained at:
* LICENSE:
* license/LICENSE.hpack.txt (Apache License 2.0)
* HOMEPAGE:
* https://github.com/twitter/hpack
This product contains a modified portion of 'Apache Commons Lang', a Java library
provides utilities for the java.lang API, which can be obtained at:
* LICENSE:
* license/LICENSE.commons-lang.txt (Apache License 2.0)
* HOMEPAGE:
* https://commons.apache.org/proper/commons-lang/
This product contains the Maven wrapper scripts from 'Maven Wrapper', that provides an easy way to ensure a user has everything necessary to run the Maven build.
* LICENSE:
* license/LICENSE.mvn-wrapper.txt (Apache License 2.0)
* HOMEPAGE:
* https://github.com/takari/maven-wrapper
---

View File

@ -2365,6 +2365,8 @@ isLeader
taskslots
loadstatus
sqlQueryId
useAzureCredentialsChain
DefaultAzureCredential
LAST_VALUE
markUnused
markUsed