diff --git a/core/src/main/java/org/apache/druid/data/input/impl/CloudObjectInputSource.java b/core/src/main/java/org/apache/druid/data/input/impl/CloudObjectInputSource.java
index 31c67e1de47..c8b7a82bf34 100644
--- a/core/src/main/java/org/apache/druid/data/input/impl/CloudObjectInputSource.java
+++ b/core/src/main/java/org/apache/druid/data/input/impl/CloudObjectInputSource.java
@@ -40,7 +40,6 @@ import java.util.stream.Stream;
 public abstract class CloudObjectInputSource extends AbstractInputSource
     implements SplittableInputSource<CloudObjectLocation>
 {
-  protected static final int MAX_LISTING_LENGTH = 1024;
   private final List<URI> uris;
   private final List<URI> prefixes;
   private final List<CloudObjectLocation> objects;
diff --git a/docs/development/extensions-contrib/azure.md b/docs/development/extensions-contrib/azure.md
index c3f0d1639b0..ed05b424724 100644
--- a/docs/development/extensions-contrib/azure.md
+++ b/docs/development/extensions-contrib/azure.md
@@ -29,68 +29,15 @@ To use this Apache Druid extension, make sure to [include](../../development/ext
 
 [Microsoft Azure Storage](http://azure.microsoft.com/en-us/services/storage/) is another option for deep storage. This requires some additional Druid configuration.
 
-|Property|Possible Values|Description|Default|
+|Property|Description|Possible Values|Default|
 |--------|---------------|-----------|-------|
-|`druid.storage.type`|azure||Must be set.|
-|`druid.azure.account`||Azure Storage account name.|Must be set.|
-|`druid.azure.key`||Azure Storage account key.|Must be set.|
-|`druid.azure.container`||Azure Storage container name.|Must be set.|
-|`druid.azure.protocol`|http or https||https|
-|`druid.azure.maxTries`||Number of tries before cancel an Azure operation.|3|
+|`druid.storage.type`||azure|Must be set.|
+|`druid.azure.account`|Azure Storage account name.||Must be set.|
+|`druid.azure.key`|Azure Storage account key.||Must be set.|
+|`druid.azure.container`|Azure Storage container name.||Must be set.|
+|`druid.azure.prefix`|A prefix string that will be prepended to the blob names for the segments published to Azure deep storage.||""|
+|`druid.azure.protocol`|The protocol to use.|http or https|https|
+|`druid.azure.maxTries`|Number of tries before canceling an Azure operation.||3|
+|`druid.azure.maxListingLength`|Maximum number of input files matching a given prefix to retrieve at a time.||1024|
 
 See [Azure Services](http://azure.microsoft.com/en-us/pricing/free-trial/) for more information.
-
-## Firehose
-
-
-
-#### StaticAzureBlobStoreFirehose
-
-This firehose ingests events, similar to the StaticS3Firehose, but from an Azure Blob Store.
-
-Data is newline delimited, with one JSON object per line and parsed as per the `InputRowParser` configuration.
-
-The storage account is shared with the one used for Azure deep storage functionality, but blobs can be in a different container.
-
-As with the S3 blobstore, it is assumed to be gzipped if the extension ends in .gz
-
-This firehose is _splittable_ and can be used by [native parallel index tasks](../../ingestion/native-batch.md#parallel-task).
-Since each split represents an object in this firehose, each worker task of `index_parallel` will read an object.
-
-Sample spec:
-
-```json
-"firehose" : {
-    "type" : "static-azure-blobstore",
-    "blobs": [
-        {
-          "container": "container",
-          "path": "/path/to/your/file.json"
-        },
-        {
-          "container": "anothercontainer",
-          "path": "/another/path.json"
-        }
-    ]
-}
-```
-
-This firehose provides caching and prefetching features. In IndexTask, a firehose can be read twice if intervals or
-shardSpecs are not specified, and, in this case, caching can be useful. Prefetching is preferred when direct scan of objects is slow.
-
-|property|description|default|required?|
-|--------|-----------|-------|---------|
-|type|This should be `static-azure-blobstore`.|N/A|yes|
-|blobs|JSON array of [Azure blobs](https://msdn.microsoft.com/en-us/library/azure/ee691964.aspx).|N/A|yes|
-|maxCacheCapacityBytes|Maximum size of the cache space in bytes. 0 means disabling cache. Cached files are not removed until the ingestion task completes.|1073741824|no|
-|maxFetchCapacityBytes|Maximum size of the fetch space in bytes. 0 means disabling prefetch. Prefetched files are removed immediately once they are read.|1073741824|no|
-|prefetchTriggerBytes|Threshold to trigger prefetching Azure objects.|maxFetchCapacityBytes / 2|no|
-|fetchTimeout|Timeout for fetching an Azure object.|60000|no|
-|maxFetchRetry|Maximum retry for fetching an Azure object.|3|no|
-
-Azure Blobs:
-
-|property|description|default|required?|
-|--------|-----------|-------|---------|
-|container|Name of the azure [container](https://azure.microsoft.com/en-us/documentation/articles/storage-dotnet-how-to-use-blobs/#create-a-container)|N/A|yes|
-|path|The path where data is located.|N/A|yes|
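The reordered columns above pair with two config classes introduced later in this patch: `druid.azure.container` and `druid.azure.prefix` bind to `AzureDataSegmentConfig`, and `druid.azure.maxListingLength` binds to `AzureInputDataConfig`. Druid's `JsonConfigProvider` performs Jackson-based binding, so a plain `ObjectMapper` conversion approximates what happens at startup. A minimal sketch, with illustrative values, not the actual wiring:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.storage.azure.AzureDataSegmentConfig;
import org.apache.druid.storage.azure.AzureInputDataConfig;

public class AzureConfigBindingSketch
{
  public static void main(String[] args)
  {
    final ObjectMapper mapper = new ObjectMapper();

    // druid.azure.container / druid.azure.prefix -> AzureDataSegmentConfig
    AzureDataSegmentConfig segmentConfig = mapper.convertValue(
        ImmutableMap.of("container", "druid-segments", "prefix", "prod/segments"),
        AzureDataSegmentConfig.class
    );

    // druid.azure.maxListingLength -> AzureInputDataConfig (defaults to 1024)
    AzureInputDataConfig inputDataConfig = mapper.convertValue(
        ImmutableMap.of("maxListingLength", 512),
        AzureInputDataConfig.class
    );

    System.out.println(segmentConfig.getContainer() + "/" + segmentConfig.getPrefix());
    System.out.println(inputDataConfig.getMaxListingLength());
  }
}
```

Splitting the segment-level settings out this way is what allows the patch to drop the `container` property from `AzureAccountConfig` further down.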
diff --git a/docs/development/extensions-core/google.md b/docs/development/extensions-core/google.md
index 58255073598..f76b9754f6d 100644
--- a/docs/development/extensions-core/google.md
+++ b/docs/development/extensions-core/google.md
@@ -50,8 +50,9 @@ Deep storage can be written to Google Cloud Storage either via this extension or
 
 To configure connectivity to google cloud, run druid processes with `GOOGLE_APPLICATION_CREDENTIALS=/path/to/service_account_keyfile` in the environment.
 
-|Property|Possible Values|Description|Default|
+|Property|Description|Possible Values|Default|
 |--------|---------------|-----------|-------|
-|`druid.storage.type`|google||Must be set.|
-|`druid.google.bucket`||GCS bucket name.|Must be set.|
-|`druid.google.prefix`||GCS prefix.|No-prefix|
+|`druid.storage.type`||google|Must be set.|
+|`druid.google.bucket`|Google Storage bucket name.||Must be set.|
+|`druid.google.prefix`|A prefix string that will be prepended to the blob names for the segments published to Google deep storage.||""|
+|`druid.google.maxListingLength`|Maximum number of input files matching a given prefix to retrieve at a time.||1024|
diff --git a/docs/development/extensions-core/s3.md b/docs/development/extensions-core/s3.md
index 509b97a52af..bcc38014460 100644
--- a/docs/development/extensions-core/s3.md
+++ b/docs/development/extensions-core/s3.md
@@ -52,7 +52,7 @@ In addition to this you need to set additional configuration, specific for [deep
 |Property|Description|Default|
 |--------|-----------|-------|
 |`druid.storage.bucket`|Bucket to store in.|Must be set.|
-|`druid.storage.baseKey`|Base key prefix to use, i.e. what directory.|Must be set.|
+|`druid.storage.baseKey`|A prefix string that will be prepended to the object names for the segments published to S3 deep storage.|Must be set.|
 |`druid.storage.type`|Global deep storage provider. Must be set to `s3` to make use of this extension.|Must be set (likely `s3`).|
 |`druid.storage.archiveBucket`|S3 bucket name for archiving when running the *archive task*.|none|
 |`druid.storage.archiveBaseKey`|S3 object key prefix for archiving.|none|
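The new `maxListingLength` settings above cap how many keys a single listing request returns; a larger prefix is consumed as a sequence of batches fetched lazily. A self-contained sketch of that paging pattern follows; the `BlobStore` interface is hypothetical, standing in for the Azure/Google SDK listing calls used by the real iterators:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class PagedListingSketch
{
  interface BlobStore
  {
    // Returns up to maxKeys keys under the prefix, starting at offset (hypothetical API).
    List<String> listKeys(String prefix, int offset, int maxKeys);
  }

  static Iterator<String> lazyListing(BlobStore store, String prefix, int maxListingLength)
  {
    return new Iterator<String>()
    {
      private List<String> batch = new ArrayList<>();
      private int cursor = 0;       // position within the current batch
      private int offset = 0;       // position within the overall listing
      private boolean exhausted = false;

      @Override
      public boolean hasNext()
      {
        if (cursor < batch.size()) {
          return true;
        }
        if (exhausted) {
          return false;
        }
        // Ask the store for the next batch of at most maxListingLength keys.
        batch = store.listKeys(prefix, offset, maxListingLength);
        offset += batch.size();
        cursor = 0;
        // A short batch means the listing is finished.
        exhausted = batch.size() < maxListingLength;
        return !batch.isEmpty();
      }

      @Override
      public String next()
      {
        return batch.get(cursor++);
      }
    };
  }
}
```

With the documented default of 1024, a prefix holding 2,500 objects would be read in three requests.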
diff --git a/docs/ingestion/native-batch.md b/docs/ingestion/native-batch.md
index d4117d95b95..e19f0efbed5 100644
--- a/docs/ingestion/native-batch.md
+++ b/docs/ingestion/native-batch.md
@@ -57,6 +57,7 @@ The supported splittable input formats for now are:
 
 - [`s3`](#s3-input-source) reads data from AWS S3 storage.
 - [`gs`](#google-cloud-storage-input-source) reads data from Google Cloud Storage.
+- [`azure`](#azure-input-source) reads data from Azure Blob Storage.
 - [`hdfs`](#hdfs-input-source) reads data from HDFS storage.
 - [`http`](#http-input-source) reads data from HTTP servers.
 - [`local`](#local-input-source) reads data from local storage.
@@ -66,7 +67,6 @@ Some other cloud storage types are supported with the legacy [`firehose`](#fireh
 The below `firehose` types are also splittable. Note that only text formats are supported with the `firehose`.
 
-- [`static-azure-blobstore`](../development/extensions-contrib/azure.md#firehose)
 - [`static-cloudfiles`](../development/extensions-contrib/cloudfiles.md#firehose)
 
 The splittable `inputSource` (and `firehose`) types are responsible for generating _splits_.
@@ -923,6 +923,83 @@ Google Cloud Storage object:
 
 |property|description|default|required?|
 |--------|-----------|-------|---------|
 |bucket|Name of the Google Cloud Storage bucket|None|yes|
 |path|The path where data is located.|None|yes|
 
+### Azure Input Source
+
+> You need to include the [`druid-azure-extensions`](../development/extensions-contrib/azure.md) as an extension to use the Azure input source.
+
+The Azure input source supports reading objects directly from Azure Blob store. Objects can be
+specified as a list of Azure Blob store URI strings. The Azure input source is splittable and can be used
+by the [Parallel task](#parallel-task), where each worker task of `index_parallel` will read
+a single object.
+
+Sample specs:
+
+```json
+...
+    "ioConfig": {
+      "type": "index_parallel",
+      "inputSource": {
+        "type": "azure",
+        "uris": ["azure://container/prefix1/file.json", "azure://container/prefix2/file2.json"]
+      },
+      "inputFormat": {
+        "type": "json"
+      },
+      ...
+    },
+...
+```
+
+```json
+...
+    "ioConfig": {
+      "type": "index_parallel",
+      "inputSource": {
+        "type": "azure",
+        "prefixes": ["azure://container/prefix1", "azure://container/prefix2"]
+      },
+      "inputFormat": {
+        "type": "json"
+      },
+      ...
+    },
+...
+```
+
+
+```json
+...
+    "ioConfig": {
+      "type": "index_parallel",
+      "inputSource": {
+        "type": "azure",
+        "objects": [
+          { "bucket": "container", "path": "prefix1/file1.json"},
+          { "bucket": "container", "path": "prefix2/file2.json"}
+        ]
+      },
+      "inputFormat": {
+        "type": "json"
+      },
+      ...
+    },
+...
+```
+
+|property|description|default|required?|
+|--------|-----------|-------|---------|
+|type|This should be `azure`.|None|yes|
+|uris|JSON array of URIs where Azure Blob objects to be ingested are located. Should be in the form "azure://\<container>/\<path-to-file\>"|None|`uris` or `prefixes` or `objects` must be set|
+|prefixes|JSON array of URI prefixes for the locations of Azure Blob objects to be ingested. Should be in the form "azure://\<container>/\<prefix\>"|None|`uris` or `prefixes` or `objects` must be set|
+|objects|JSON array of Azure Blob objects to be ingested.|None|`uris` or `prefixes` or `objects` must be set|
+
+Azure Blob object:
+
+|property|description|default|required?|
+|--------|-----------|-------|---------|
+|bucket|Name of the Azure Blob Storage container|None|yes|
+|path|The path where data is located.|None|yes|
+
 ### HDFS Input Source
 
 > You need to include the [`druid-hdfs-storage`](../development/extensions-core/hdfs.md) as an extension to use the HDFS input source.
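The `uris` and `prefixes` rows above rely on the `azure://<container>/<path>` form. A minimal sketch of how such a URI decomposes, in the spirit of `CloudObjectLocation` and `AzureUtils.extractAzureKey`; the literal values are illustrative:

```java
import java.net.URI;

public class AzureUriSketch
{
  public static void main(String[] args)
  {
    URI uri = URI.create("azure://container/prefix1/file.json");

    // The URI authority is the container name.
    String container = uri.getAuthority();                   // "container"
    // The path, minus its leading slash(es), is the blob path.
    String blobPath = uri.getPath().replaceAll("^/+", "");   // "prefix1/file.json"

    System.out.println(container + " -> " + blobPath);
  }
}
```

Entries under `prefixes` decompose the same way, with the path treated as a key prefix to list rather than a full blob path.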
diff --git a/extensions-contrib/azure-extensions/pom.xml b/extensions-contrib/azure-extensions/pom.xml index 82fc972d646..9162ad8982d 100644 --- a/extensions-contrib/azure-extensions/pom.xml +++ b/extensions-contrib/azure-extensions/pom.xml @@ -126,6 +126,11 @@ jsr305 provided + + commons-lang + commons-lang + provided + @@ -175,7 +180,7 @@ INSTRUCTION COVEREDRATIO - 0.84 + 0.85 LINE @@ -185,17 +190,17 @@ BRANCH COVEREDRATIO - 0.86 + 0.87 COMPLEXITY COVEREDRATIO - 0.80 + 0.79 METHOD COVEREDRATIO - 0.79 + 0.78 CLASS diff --git a/extensions-contrib/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureInputSource.java b/extensions-contrib/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureInputSource.java index 7c5ad9d51f1..6bf94f4b82a 100644 --- a/extensions-contrib/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureInputSource.java +++ b/extensions-contrib/azure-extensions/src/main/java/org/apache/druid/data/input/azure/AzureInputSource.java @@ -22,7 +22,6 @@ package org.apache.druid.data.input.azure; import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import org.apache.druid.data.input.InputSplit; @@ -31,6 +30,7 @@ import org.apache.druid.data.input.impl.CloudObjectLocation; import org.apache.druid.data.input.impl.SplittableInputSource; import org.apache.druid.storage.azure.AzureCloudBlobHolderToCloudObjectLocationConverter; import org.apache.druid.storage.azure.AzureCloudBlobIterableFactory; +import org.apache.druid.storage.azure.AzureInputDataConfig; import org.apache.druid.storage.azure.AzureStorage; import org.apache.druid.storage.azure.blob.CloudBlobHolder; @@ -47,14 +47,13 @@ import java.util.stream.StreamSupport; */ public class AzureInputSource extends CloudObjectInputSource { - @VisibleForTesting - static final int MAX_LISTING_LENGTH = 1024; public static final String SCHEME = "azure"; private final AzureStorage storage; private final AzureEntityFactory entityFactory; private final AzureCloudBlobIterableFactory azureCloudBlobIterableFactory; private final AzureCloudBlobHolderToCloudObjectLocationConverter azureCloudBlobToLocationConverter; + private final AzureInputDataConfig inputDataConfig; @JsonCreator public AzureInputSource( @@ -62,6 +61,7 @@ public class AzureInputSource extends CloudObjectInputSource @JacksonInject AzureEntityFactory entityFactory, @JacksonInject AzureCloudBlobIterableFactory azureCloudBlobIterableFactory, @JacksonInject AzureCloudBlobHolderToCloudObjectLocationConverter azureCloudBlobToLocationConverter, + @JacksonInject AzureInputDataConfig inputDataConfig, @JsonProperty("uris") @Nullable List uris, @JsonProperty("prefixes") @Nullable List prefixes, @JsonProperty("objects") @Nullable List objects @@ -74,6 +74,7 @@ public class AzureInputSource extends CloudObjectInputSource azureCloudBlobIterableFactory, "AzureCloudBlobIterableFactory" ); + this.inputDataConfig = Preconditions.checkNotNull(inputDataConfig, "AzureInputDataConfig"); this.azureCloudBlobToLocationConverter = Preconditions.checkNotNull(azureCloudBlobToLocationConverter, "AzureCloudBlobToLocationConverter"); } @@ -85,6 +86,7 @@ public class AzureInputSource extends CloudObjectInputSource entityFactory, azureCloudBlobIterableFactory, azureCloudBlobToLocationConverter, + inputDataConfig, 
null, null, ImmutableList.of(split.get()) @@ -117,7 +119,7 @@ public class AzureInputSource extends CloudObjectInputSource private Iterable getIterableObjectsFromPrefixes() { - return azureCloudBlobIterableFactory.create(getPrefixes(), MAX_LISTING_LENGTH); + return azureCloudBlobIterableFactory.create(getPrefixes(), inputDataConfig.getMaxListingLength()); } @Override @@ -128,7 +130,8 @@ public class AzureInputSource extends CloudObjectInputSource storage, entityFactory, azureCloudBlobIterableFactory, - azureCloudBlobToLocationConverter + azureCloudBlobToLocationConverter, + inputDataConfig ); } @@ -148,6 +151,7 @@ public class AzureInputSource extends CloudObjectInputSource return storage.equals(that.storage) && entityFactory.equals(that.entityFactory) && azureCloudBlobIterableFactory.equals(that.azureCloudBlobIterableFactory) && - azureCloudBlobToLocationConverter.equals(that.azureCloudBlobToLocationConverter); + azureCloudBlobToLocationConverter.equals(that.azureCloudBlobToLocationConverter) && + inputDataConfig.equals(that.inputDataConfig); } } diff --git a/extensions-contrib/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureAccountConfig.java b/extensions-contrib/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureAccountConfig.java index 3021f4287bd..a6b08885261 100644 --- a/extensions-contrib/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureAccountConfig.java +++ b/extensions-contrib/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureAccountConfig.java @@ -44,10 +44,6 @@ public class AzureAccountConfig @NotNull private String key; - @JsonProperty - @NotNull - private String container; - @SuppressWarnings("unused") // Used by Jackson deserialization? public void setProtocol(String protocol) { @@ -71,11 +67,6 @@ public class AzureAccountConfig this.key = key; } - public void setContainer(String container) - { - this.container = container; - } - public String getProtocol() { return protocol; @@ -95,9 +86,4 @@ public class AzureAccountConfig { return key; } - - public String getContainer() - { - return container; - } } diff --git a/extensions-contrib/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureByteSource.java b/extensions-contrib/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureByteSource.java index 69578589666..a8461de0e88 100644 --- a/extensions-contrib/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureByteSource.java +++ b/extensions-contrib/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureByteSource.java @@ -63,12 +63,12 @@ public class AzureByteSource extends ByteSource return azureStorage.getBlobInputStream(offset, containerName, blobPath); } catch (StorageException | URISyntaxException e) { - log.warn("Exception when opening stream to azure resource, containerName: %s, blobPath: %s, Error: %s", - containerName, blobPath, e.getMessage() - ); if (AzureUtils.AZURE_RETRY.apply(e)) { throw new IOException("Recoverable exception", e); } + log.error("Exception when opening stream to azure resource, containerName: %s, blobPath: %s, Error: %s", + containerName, blobPath, e.getMessage() + ); throw new RuntimeException(e); } } diff --git a/extensions-contrib/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureCloudBlobIterator.java b/extensions-contrib/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureCloudBlobIterator.java index 4dc0de1b30b..72d6509f5ec 100644 --- 
a/extensions-contrib/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureCloudBlobIterator.java +++ b/extensions-contrib/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureCloudBlobIterator.java @@ -105,7 +105,7 @@ public class AzureCloudBlobIterator implements Iterator URI currentUri = prefixesIterator.next(); currentContainer = currentUri.getAuthority(); currentPrefix = AzureUtils.extractAzureKey(currentUri); - log.debug("prepareNextRequest:\ncurrentUri: %s\ncurrentContainer: %s\ncurrentPrefix: %s", + log.debug("currentUri: %s\ncurrentContainer: %s\ncurrentPrefix: %s", currentUri, currentContainer, currentPrefix ); result = null; @@ -115,6 +115,12 @@ public class AzureCloudBlobIterator implements Iterator private void fetchNextBatch() { try { + log.debug( + "fetching up to %s resources in container '%s' with prefix '%s'", + maxListingLength, + currentContainer, + currentPrefix + ); result = AzureUtils.retryAzureOperation(() -> storage.listBlobsWithPrefixInContainerSegmented( currentContainer, currentPrefix, diff --git a/extensions-contrib/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentConfig.java b/extensions-contrib/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentConfig.java new file mode 100644 index 00000000000..e84658caf8a --- /dev/null +++ b/extensions-contrib/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentConfig.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.storage.azure; + +import com.fasterxml.jackson.annotation.JsonProperty; + +import javax.validation.constraints.NotNull; + +/** + * Stores the configuration for segments written to Azure deep storage + */ +public class AzureDataSegmentConfig +{ + @JsonProperty + @NotNull + private String container; + + @JsonProperty + @NotNull + private String prefix = ""; + + public void setContainer(String container) + { + this.container = container; + } + + public void setPrefix(String prefix) + { + this.prefix = prefix; + } + + public String getContainer() + { + return container; + } + + public String getPrefix() + { + return prefix; + } +} diff --git a/extensions-contrib/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentPuller.java b/extensions-contrib/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentPuller.java index f3f677b7975..727c28a9608 100644 --- a/extensions-contrib/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentPuller.java +++ b/extensions-contrib/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentPuller.java @@ -36,13 +36,6 @@ public class AzureDataSegmentPuller { private static final Logger log = new Logger(AzureDataSegmentPuller.class); - // The azure storage hadoop access pattern is: - // wasb[s]://@.blob.core.windows.net/ - // (from https://docs.microsoft.com/en-us/azure/hdinsight/hdinsight-hadoop-use-blob-storage) - static final String AZURE_STORAGE_HADOOP_PROTOCOL = "wasbs"; - - static final String AZURE_STORAGE_HOST_ADDRESS = "blob.core.windows.net"; - private final AzureByteSourceFactory byteSourceFactory; @Inject diff --git a/extensions-contrib/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentPusher.java b/extensions-contrib/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentPusher.java index c5217f531a8..14ea31426ce 100644 --- a/extensions-contrib/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentPusher.java +++ b/extensions-contrib/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureDataSegmentPusher.java @@ -47,16 +47,19 @@ public class AzureDataSegmentPusher implements DataSegmentPusher private static final Logger log = new Logger(AzureDataSegmentPusher.class); static final List ALLOWED_PROPERTY_PREFIXES_FOR_HADOOP = ImmutableList.of("druid.azure"); private final AzureStorage azureStorage; - private final AzureAccountConfig config; + private final AzureAccountConfig accountConfig; + private final AzureDataSegmentConfig segmentConfig; @Inject public AzureDataSegmentPusher( AzureStorage azureStorage, - AzureAccountConfig config + AzureAccountConfig accountConfig, + AzureDataSegmentConfig segmentConfig ) { this.azureStorage = azureStorage; - this.config = config; + this.accountConfig = accountConfig; + this.segmentConfig = segmentConfig; } @Deprecated @@ -69,12 +72,15 @@ public class AzureDataSegmentPusher implements DataSegmentPusher @Override public String getPathForHadoop() { + String prefix = segmentConfig.getPrefix(); + boolean prefixIsNullOrEmpty = org.apache.commons.lang.StringUtils.isEmpty(prefix); String hadoopPath = StringUtils.format( - "%s://%s@%s.%s/", - AzureDataSegmentPuller.AZURE_STORAGE_HADOOP_PROTOCOL, - config.getContainer(), - config.getAccount(), - AzureDataSegmentPuller.AZURE_STORAGE_HOST_ADDRESS + "%s://%s@%s.%s/%s", + AzureUtils.AZURE_STORAGE_HADOOP_PROTOCOL, + segmentConfig.getContainer(), + 
accountConfig.getAccount(), + AzureUtils.AZURE_STORAGE_HOST_ADDRESS, + prefixIsNullOrEmpty ? "" : StringUtils.maybeRemoveTrailingSlash(prefix) + '/' ); log.info("Using Azure blob storage Hadoop path: %s", hadoopPath); @@ -85,7 +91,10 @@ public class AzureDataSegmentPusher implements DataSegmentPusher @Override public String getStorageDir(DataSegment dataSegment, boolean useUniquePath) { + String prefix = segmentConfig.getPrefix(); + boolean prefixIsNullOrEmpty = org.apache.commons.lang.StringUtils.isEmpty(prefix); String seg = JOINER.join( + prefixIsNullOrEmpty ? null : StringUtils.maybeRemoveTrailingSlash(prefix), dataSegment.getDataSource(), StringUtils.format( "%s_%s", @@ -127,7 +136,7 @@ public class AzureDataSegmentPusher implements DataSegmentPusher return AzureUtils.retryAzureOperation( () -> uploadDataSegment(segment, binaryVersion, size, outFile, azurePath), - config.getMaxTries() + accountConfig.getMaxTries() ); } catch (Exception e) { @@ -148,7 +157,7 @@ public class AzureDataSegmentPusher implements DataSegmentPusher "type", AzureStorageDruidModule.SCHEME, "containerName", - config.getContainer(), + segmentConfig.getContainer(), "blobPath", uri.toString() ); @@ -173,7 +182,7 @@ public class AzureDataSegmentPusher implements DataSegmentPusher ) throws StorageException, IOException, URISyntaxException { - azureStorage.uploadBlob(compressedSegmentData, config.getContainer(), azurePath); + azureStorage.uploadBlob(compressedSegmentData, segmentConfig.getContainer(), azurePath); final DataSegment outSegment = segment .withSize(size) diff --git a/extensions-contrib/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureInputDataConfig.java b/extensions-contrib/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureInputDataConfig.java new file mode 100644 index 00000000000..b78b6e6835a --- /dev/null +++ b/extensions-contrib/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureInputDataConfig.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.storage.azure; + +import com.fasterxml.jackson.annotation.JsonProperty; + +import javax.validation.constraints.Min; + +/** + * Stores the configuration for options related to reading + * input data from Azure blob store into Druid + */ +public class AzureInputDataConfig +{ + /** + * The maximum number of input files matching a given prefix to retrieve + * from Azure at a time. 
+ */ + @JsonProperty + @Min(1) + private int maxListingLength = 1024; + + public void setMaxListingLength(int maxListingLength) + { + this.maxListingLength = maxListingLength; + } + + public int getMaxListingLength() + { + return maxListingLength; + } +} diff --git a/extensions-contrib/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorage.java b/extensions-contrib/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorage.java index 40bf4a12081..29318906eb9 100644 --- a/extensions-contrib/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorage.java +++ b/extensions-contrib/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorage.java @@ -46,7 +46,7 @@ public class AzureStorage { private static final boolean USE_FLAT_BLOB_LISTING = true; - private final Logger log = new Logger(AzureStorage.class); + private static final Logger log = new Logger(AzureStorage.class); private final CloudBlobClient cloudBlobClient; diff --git a/extensions-contrib/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorageDruidModule.java b/extensions-contrib/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorageDruidModule.java index 1c6e84bab73..4ae7bc1cd9f 100644 --- a/extensions-contrib/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorageDruidModule.java +++ b/extensions-contrib/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureStorageDruidModule.java @@ -87,6 +87,8 @@ public class AzureStorageDruidModule implements DruidModule @Override public void configure(Binder binder) { + JsonConfigProvider.bind(binder, "druid.azure", AzureInputDataConfig.class); + JsonConfigProvider.bind(binder, "druid.azure", AzureDataSegmentConfig.class); JsonConfigProvider.bind(binder, "druid.azure", AzureAccountConfig.class); Binders.dataSegmentPusherBinder(binder) diff --git a/extensions-contrib/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureUtils.java b/extensions-contrib/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureUtils.java index d53d2d638d1..2a79ec87e1b 100644 --- a/extensions-contrib/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureUtils.java +++ b/extensions-contrib/azure-extensions/src/main/java/org/apache/druid/storage/azure/AzureUtils.java @@ -39,24 +39,34 @@ public class AzureUtils @VisibleForTesting static final String AZURE_STORAGE_HOST_ADDRESS = "blob.core.windows.net"; + // The azure storage hadoop access pattern is: + // wasb[s]://@.blob.core.windows.net/ + // (from https://docs.microsoft.com/en-us/azure/hdinsight/hdinsight-hadoop-use-blob-storage) + static final String AZURE_STORAGE_HADOOP_PROTOCOL = "wasbs"; + public static final Predicate AZURE_RETRY = e -> { - if (e instanceof URISyntaxException) { + if (e == null) { return false; } + for (Throwable t = e; t != null; t = t.getCause()) { + if (t instanceof URISyntaxException) { + return false; + } - if (e instanceof StorageException) { - return true; + if (t instanceof StorageException) { + return true; + } + + if (t instanceof IOException) { + return true; + } } - - if (e instanceof IOException) { - return true; - } - return false; }; /** * extracts the path component of the supplied uri with any leading '/' characters removed. + * * @param uri the uri to extract the path for * @return a String representing the path component of the uri with any leading '/' * characters removed. 
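The rewritten `AZURE_RETRY` predicate above walks the exception's cause chain rather than inspecting only the top-level exception, so a `StorageException` or `IOException` wrapped in a `RuntimeException` is still classified as retryable, while `URISyntaxException` (and anything unrecognized, including a null input) is not. A minimal, self-contained sketch of that classification; the `StorageException` stand-in below is a local class, not the Azure SDK type:

```java
import java.io.IOException;
import java.net.URISyntaxException;

public class RetryPredicateSketch
{
  // Stand-in for com.microsoft.azure.storage.StorageException.
  static class StorageException extends Exception {}

  static boolean shouldRetry(Throwable e)
  {
    // Walk the cause chain; a null input never enters the loop.
    for (Throwable t = e; t != null; t = t.getCause()) {
      if (t instanceof URISyntaxException) {
        return false;
      }
      if (t instanceof StorageException || t instanceof IOException) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args)
  {
    System.out.println(shouldRetry(new RuntimeException(new IOException())));  // true
    System.out.println(shouldRetry(new URISyntaxException("", "")));           // false
    System.out.println(shouldRetry(new RuntimeException()));                   // false
  }
}
```

Checking `URISyntaxException` before the retryable types at each level preserves the original short-circuit: a URI problem is treated as permanent even when it wraps an otherwise retryable cause, matching the cases exercised in `AzureUtilsTest` later in this patch.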
@@ -68,6 +78,7 @@ public class AzureUtils
 {
   /**
    * extracts the blob path component of the supplied uri with any leading 'blob.core.windows.net/' string removed.
+   *
    * @param blobPath the path of the blob
    * @return a String representing the blob path component of the uri with any leading 'blob.core.windows.net/' string
    *         removed.
diff --git a/extensions-contrib/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureInputSourceSerdeTest.java b/extensions-contrib/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureInputSourceSerdeTest.java
index 5ca65598832..03f83c5bcde 100644
--- a/extensions-contrib/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureInputSourceSerdeTest.java
+++ b/extensions-contrib/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureInputSourceSerdeTest.java
@@ -26,6 +26,7 @@ import org.apache.druid.data.input.impl.CloudObjectLocation;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.storage.azure.AzureCloudBlobHolderToCloudObjectLocationConverter;
 import org.apache.druid.storage.azure.AzureCloudBlobIterableFactory;
+import org.apache.druid.storage.azure.AzureInputDataConfig;
 import org.apache.druid.storage.azure.AzureStorage;
 import org.apache.druid.storage.azure.AzureStorageDruidModule;
 import org.easymock.EasyMockSupport;
@@ -64,6 +66,7 @@ public class AzureInputSourceSerdeTest extends EasyMockSupport
   private AzureEntityFactory entityFactory;
   private AzureCloudBlobIterableFactory azureCloudBlobIterableFactory;
   private AzureCloudBlobHolderToCloudObjectLocationConverter azureCloudBlobToLocationConverter;
+  private AzureInputDataConfig inputDataConfig;
 
   static {
     try {
@@ -86,20 +89,14 @@ public class AzureInputSourceSerdeTest extends EasyMockSupport
     entityFactory = createMock(AzureEntityFactory.class);
     azureCloudBlobIterableFactory = createMock(AzureCloudBlobIterableFactory.class);
     azureCloudBlobToLocationConverter = createMock(AzureCloudBlobHolderToCloudObjectLocationConverter.class);
+    inputDataConfig = createMock(AzureInputDataConfig.class);
   }
 
   @Test
   public void test_uriSerde_constructsProperAzureInputSource() throws Exception
   {
-    final InjectableValues.Std injectableValues = new InjectableValues.Std();
-    injectableValues.addValue(AzureStorage.class, azureStorage);
-    injectableValues.addValue(AzureEntityFactory.class, entityFactory);
-    injectableValues.addValue(AzureCloudBlobIterableFactory.class, azureCloudBlobIterableFactory);
-    injectableValues.addValue(
-        AzureCloudBlobHolderToCloudObjectLocationConverter.class,
-        azureCloudBlobToLocationConverter
-    );
+    final InjectableValues.Std injectableValues = initInjectableValues();
     final ObjectMapper objectMapper = new DefaultObjectMapper()
         .registerModules(new AzureStorageDruidModule().getJacksonModules());
     objectMapper.setInjectableValues(injectableValues);
@@ -117,14 +114,7 @@ public class AzureInputSourceSerdeTest extends EasyMockSupport
   @Test
   public void test_prefixSerde_constructsProperAzureInputSource() throws Exception
   {
-    final InjectableValues.Std injectableValues = new InjectableValues.Std();
-    injectableValues.addValue(AzureStorage.class, azureStorage);
-    injectableValues.addValue(AzureEntityFactory.class, entityFactory);
-    injectableValues.addValue(AzureCloudBlobIterableFactory.class, azureCloudBlobIterableFactory);
-    injectableValues.addValue(
-        AzureCloudBlobHolderToCloudObjectLocationConverter.class,
-        azureCloudBlobToLocationConverter
-    );
+    final InjectableValues.Std injectableValues = initInjectableValues();
     final ObjectMapper objectMapper = new DefaultObjectMapper()
         .registerModules(new AzureStorageDruidModule().getJacksonModules());
     objectMapper.setInjectableValues(injectableValues);
@@ -142,14 +133,7 @@ public class AzureInputSourceSerdeTest extends EasyMockSupport
   @Test
   public void test_objectSerde_constructsProperAzureInputSource() throws Exception
   {
-    final InjectableValues.Std injectableValues = new InjectableValues.Std();
-    injectableValues.addValue(AzureStorage.class, azureStorage);
-    injectableValues.addValue(AzureEntityFactory.class, entityFactory);
-    injectableValues.addValue(AzureCloudBlobIterableFactory.class, azureCloudBlobIterableFactory);
-    injectableValues.addValue(
-        AzureCloudBlobHolderToCloudObjectLocationConverter.class,
-        azureCloudBlobToLocationConverter
-    );
+    final InjectableValues.Std injectableValues = initInjectableValues();
     final ObjectMapper objectMapper = new DefaultObjectMapper()
         .registerModules(new AzureStorageDruidModule().getJacksonModules());
     objectMapper.setInjectableValues(injectableValues);
@@ -161,7 +145,20 @@ public class AzureInputSourceSerdeTest extends EasyMockSupport
         objectMapper.writeValueAsBytes(inputSource),
         AzureInputSource.class);
     verifyInputSourceWithObjects(roundTripInputSource);
+  }
 
+  private InjectableValues.Std initInjectableValues()
+  {
+    final InjectableValues.Std injectableValues = new InjectableValues.Std();
+    injectableValues.addValue(AzureStorage.class, azureStorage);
+    injectableValues.addValue(AzureEntityFactory.class, entityFactory);
+    injectableValues.addValue(AzureCloudBlobIterableFactory.class, azureCloudBlobIterableFactory);
+    injectableValues.addValue(
+        AzureCloudBlobHolderToCloudObjectLocationConverter.class,
+        azureCloudBlobToLocationConverter
+    );
+    injectableValues.addValue(AzureInputDataConfig.class, inputDataConfig);
+    return injectableValues;
   }
 
   private static void verifyInputSourceWithUris(final AzureInputSource inputSource)
diff --git a/extensions-contrib/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureInputSourceTest.java b/extensions-contrib/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureInputSourceTest.java
index 3fb952e287c..59e7288b557 100644
--- a/extensions-contrib/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureInputSourceTest.java
+++ b/extensions-contrib/azure-extensions/src/test/java/org/apache/druid/data/input/azure/AzureInputSourceTest.java
@@ -28,6 +28,7 @@ import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.storage.azure.AzureCloudBlobHolderToCloudObjectLocationConverter;
 import org.apache.druid.storage.azure.AzureCloudBlobIterable;
 import org.apache.druid.storage.azure.AzureCloudBlobIterableFactory;
+import org.apache.druid.storage.azure.AzureInputDataConfig;
 import org.apache.druid.storage.azure.AzureStorage;
 import org.apache.druid.storage.azure.blob.CloudBlobHolder;
 import org.easymock.EasyMock;
@@ -55,11 +56,13 @@ public class AzureInputSourceTest extends EasyMockSupport
   private static final String CONTAINER = "CONTAINER";
   private static final String BLOB_PATH = "BLOB_PATH";
   private static final CloudObjectLocation CLOUD_OBJECT_LOCATION_1 = new CloudObjectLocation(CONTAINER, BLOB_PATH);
+  private static final int MAX_LISTING_LENGTH = 10;
 
   private AzureStorage storage;
   private AzureEntityFactory entityFactory;
   private AzureCloudBlobIterableFactory
azureCloudBlobIterableFactory; private AzureCloudBlobHolderToCloudObjectLocationConverter azureCloudBlobToLocationConverter; + private AzureInputDataConfig inputDataConfig; private InputSplit inputSplit; private AzureEntity azureEntity1; @@ -86,6 +89,7 @@ public class AzureInputSourceTest extends EasyMockSupport azureEntity1 = createMock(AzureEntity.class); azureCloudBlobIterableFactory = createMock(AzureCloudBlobIterableFactory.class); azureCloudBlobToLocationConverter = createMock(AzureCloudBlobHolderToCloudObjectLocationConverter.class); + inputDataConfig = createMock(AzureInputDataConfig.class); cloudBlobDruid1 = createMock(CloudBlobHolder.class); azureCloudBlobIterable = createMock(AzureCloudBlobIterable.class); } @@ -99,6 +103,7 @@ public class AzureInputSourceTest extends EasyMockSupport entityFactory, azureCloudBlobIterableFactory, azureCloudBlobToLocationConverter, + inputDataConfig, EMPTY_URIS, EMPTY_PREFIXES, EMPTY_OBJECTS @@ -118,6 +123,7 @@ public class AzureInputSourceTest extends EasyMockSupport entityFactory, azureCloudBlobIterableFactory, azureCloudBlobToLocationConverter, + inputDataConfig, EMPTY_URIS, EMPTY_PREFIXES, objects @@ -135,7 +141,8 @@ public class AzureInputSourceTest extends EasyMockSupport List expectedCloudLocations = ImmutableList.of(CLOUD_OBJECT_LOCATION_1); List expectedCloudBlobs = ImmutableList.of(cloudBlobDruid1); Iterator expectedCloudBlobsIterator = expectedCloudBlobs.iterator(); - EasyMock.expect(azureCloudBlobIterableFactory.create(prefixes, AzureInputSource.MAX_LISTING_LENGTH)).andReturn( + EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_LISTING_LENGTH); + EasyMock.expect(azureCloudBlobIterableFactory.create(prefixes, MAX_LISTING_LENGTH)).andReturn( azureCloudBlobIterable); EasyMock.expect(azureCloudBlobIterable.spliterator()) .andReturn(Spliterators.spliteratorUnknownSize(expectedCloudBlobsIterator, 0)); @@ -148,6 +155,7 @@ public class AzureInputSourceTest extends EasyMockSupport entityFactory, azureCloudBlobIterableFactory, azureCloudBlobToLocationConverter, + inputDataConfig, EMPTY_URIS, prefixes, EMPTY_OBJECTS @@ -173,6 +181,7 @@ public class AzureInputSourceTest extends EasyMockSupport entityFactory, azureCloudBlobIterableFactory, azureCloudBlobToLocationConverter, + inputDataConfig, EMPTY_URIS, prefixes, EMPTY_OBJECTS @@ -192,6 +201,7 @@ public class AzureInputSourceTest extends EasyMockSupport entityFactory, azureCloudBlobIterableFactory, azureCloudBlobToLocationConverter, + inputDataConfig, EMPTY_URIS, prefixes, EMPTY_OBJECTS @@ -211,6 +221,7 @@ public class AzureInputSourceTest extends EasyMockSupport .withNonnullFields("entityFactory") .withNonnullFields("azureCloudBlobIterableFactory") .withNonnullFields("azureCloudBlobToLocationConverter") + .withNonnullFields("inputDataConfig") .verify(); } diff --git a/extensions-contrib/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentPullerTest.java b/extensions-contrib/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentPullerTest.java index c8b8eeabafc..fc984349628 100644 --- a/extensions-contrib/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentPullerTest.java +++ b/extensions-contrib/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentPullerTest.java @@ -40,7 +40,7 @@ public class AzureDataSegmentPullerTest extends EasyMockSupport private static final String SEGMENT_FILE_NAME = "segment"; private static final String CONTAINER_NAME = "container"; private static final 
String BLOB_PATH = "path/to/storage/index.zip"; - private static final String BLOB_PATH_HADOOP = AzureDataSegmentPuller.AZURE_STORAGE_HOST_ADDRESS + "/path/to/storage/index.zip"; + private static final String BLOB_PATH_HADOOP = AzureUtils.AZURE_STORAGE_HOST_ADDRESS + "/path/to/storage/index.zip"; private AzureStorage azureStorage; private AzureByteSourceFactory byteSourceFactory; diff --git a/extensions-contrib/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentPusherTest.java b/extensions-contrib/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentPusherTest.java index 7da2339ab63..3c09f8ed804 100644 --- a/extensions-contrib/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentPusherTest.java +++ b/extensions-contrib/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureDataSegmentPusherTest.java @@ -19,11 +19,9 @@ package org.apache.druid.storage.azure; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableMap; import com.google.common.io.Files; import com.microsoft.azure.storage.StorageException; -import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.MapUtils; import org.apache.druid.java.util.common.StringUtils; @@ -51,7 +49,9 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport @Rule public final TemporaryFolder tempFolder = new TemporaryFolder(); + private static final String ACCOUNT = "account"; private static final String CONTAINER_NAME = "container"; + private static final String PREFIX = "prefix"; private static final String BLOB_PATH = "test/2015-04-12T00:00:00.000Z_2015-04-13T00:00:00.000Z/1/0/index.zip"; private static final DataSegment DATA_SEGMENT = new DataSegment( "test", @@ -65,8 +65,10 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport 1 ); private static final byte[] DATA = new byte[]{0x0, 0x0, 0x0, 0x1}; - private static final String UNIQUE_MATCHER = "foo/20150101T000000\\.000Z_20160101T000000\\.000Z/0/0/[A-Za-z0-9-]{36}/index\\.zip"; - private static final String NON_UNIQUE_MATCHER = "foo/20150101T000000\\.000Z_20160101T000000\\.000Z/0/0/index\\.zip"; + private static final String UNIQUE_MATCHER_NO_PREFIX = "foo/20150101T000000\\.000Z_20160101T000000\\.000Z/0/0/[A-Za-z0-9-]{36}/index\\.zip"; + private static final String UNIQUE_MATCHER_PREFIX = PREFIX + "/" + UNIQUE_MATCHER_NO_PREFIX; + private static final String NON_UNIQUE_NO_PREFIX_MATCHER = "foo/20150101T000000\\.000Z_20160101T000000\\.000Z/0/0/index\\.zip"; + private static final String NON_UNIQUE_WITH_PREFIX_MATCHER = PREFIX + "/" + "foo/20150101T000000\\.000Z_20160101T000000\\.000Z/0/0/index\\.zip"; private static final DataSegment SEGMENT_TO_PUSH = new DataSegment( "foo", @@ -82,24 +84,30 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport private AzureStorage azureStorage; private AzureAccountConfig azureAccountConfig; - private ObjectMapper jsonMapper; + private AzureDataSegmentConfig segmentConfigWithPrefix; + private AzureDataSegmentConfig segmentConfigWithoutPrefix; @Before public void before() { azureStorage = createMock(AzureStorage.class); azureAccountConfig = new AzureAccountConfig(); - azureAccountConfig.setAccount("account"); - azureAccountConfig.setContainer("container"); + azureAccountConfig.setAccount(ACCOUNT); - jsonMapper = new DefaultObjectMapper(); + segmentConfigWithPrefix = new AzureDataSegmentConfig(); + 
segmentConfigWithPrefix.setContainer(CONTAINER_NAME); + segmentConfigWithPrefix.setPrefix(PREFIX + "/"); + + segmentConfigWithoutPrefix = new AzureDataSegmentConfig(); + segmentConfigWithoutPrefix.setContainer(CONTAINER_NAME); } @Test - public void test_push_nonUniquePath_succeeds() throws Exception + public void test_push_nonUniquePathNoPrefix_succeeds() throws Exception { boolean useUniquePath = false; - AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig); + AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig, segmentConfigWithoutPrefix + ); // Create a mock segment on disk File tmp = tempFolder.newFile("version.bin"); @@ -116,7 +124,70 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport Assert.assertTrue( segment.getLoadSpec().get("blobPath").toString(), - Pattern.compile(NON_UNIQUE_MATCHER).matcher(segment.getLoadSpec().get("blobPath").toString()).matches() + Pattern.compile(NON_UNIQUE_NO_PREFIX_MATCHER).matcher(segment.getLoadSpec().get("blobPath").toString()).matches() + ); + + Assert.assertEquals(SEGMENT_TO_PUSH.getSize(), segment.getSize()); + + verifyAll(); + } + + @Test + public void test_push_nonUniquePathWithPrefix_succeeds() throws Exception + { + boolean useUniquePath = false; + AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig, segmentConfigWithPrefix + ); + + // Create a mock segment on disk + File tmp = tempFolder.newFile("version.bin"); + + Files.write(DATA, tmp); + + String azurePath = pusher.getAzurePath(SEGMENT_TO_PUSH, useUniquePath); + azureStorage.uploadBlob(EasyMock.anyObject(File.class), EasyMock.eq(CONTAINER_NAME), EasyMock.eq(azurePath)); + EasyMock.expectLastCall(); + + replayAll(); + + DataSegment segment = pusher.push(tempFolder.getRoot(), SEGMENT_TO_PUSH, useUniquePath); + + Assert.assertTrue( + segment.getLoadSpec().get("blobPath").toString(), + Pattern.compile(NON_UNIQUE_WITH_PREFIX_MATCHER).matcher(segment.getLoadSpec().get("blobPath").toString()).matches() + ); + + Assert.assertEquals(SEGMENT_TO_PUSH.getSize(), segment.getSize()); + + verifyAll(); + } + + @Test + public void test_push_uniquePathNoPrefix_succeeds() throws Exception + { + boolean useUniquePath = true; + AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig, segmentConfigWithoutPrefix); + + // Create a mock segment on disk + File tmp = tempFolder.newFile("version.bin"); + + Files.write(DATA, tmp); + + String azurePath = pusher.getAzurePath(SEGMENT_TO_PUSH, useUniquePath); + azureStorage.uploadBlob( + EasyMock.anyObject(File.class), + EasyMock.eq(CONTAINER_NAME), + EasyMock.matches(UNIQUE_MATCHER_NO_PREFIX) + ); + EasyMock.expectLastCall(); + + replayAll(); + + DataSegment segment = pusher.push(tempFolder.getRoot(), SEGMENT_TO_PUSH, useUniquePath); + + Assert.assertTrue( + segment.getLoadSpec().get("blobPath").toString(), + Pattern.compile(UNIQUE_MATCHER_NO_PREFIX).matcher(segment.getLoadSpec().get("blobPath").toString()).matches() ); Assert.assertEquals(SEGMENT_TO_PUSH.getSize(), segment.getSize()); @@ -128,7 +199,7 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport public void test_push_uniquePath_succeeds() throws Exception { boolean useUniquePath = true; - AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig); + AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig, segmentConfigWithPrefix); // Create a mock segment on disk 
File tmp = tempFolder.newFile("version.bin"); @@ -136,7 +207,11 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport Files.write(DATA, tmp); String azurePath = pusher.getAzurePath(SEGMENT_TO_PUSH, useUniquePath); - azureStorage.uploadBlob(EasyMock.anyObject(File.class), EasyMock.eq(CONTAINER_NAME), EasyMock.matches(UNIQUE_MATCHER)); + azureStorage.uploadBlob( + EasyMock.anyObject(File.class), + EasyMock.eq(CONTAINER_NAME), + EasyMock.matches(UNIQUE_MATCHER_PREFIX) + ); EasyMock.expectLastCall(); replayAll(); @@ -145,7 +220,7 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport Assert.assertTrue( segment.getLoadSpec().get("blobPath").toString(), - Pattern.compile(UNIQUE_MATCHER).matcher(segment.getLoadSpec().get("blobPath").toString()).matches() + Pattern.compile(UNIQUE_MATCHER_PREFIX).matcher(segment.getLoadSpec().get("blobPath").toString()).matches() ); Assert.assertEquals(SEGMENT_TO_PUSH.getSize(), segment.getSize()); @@ -157,7 +232,7 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport public void test_push_exception_throwsException() throws Exception { boolean useUniquePath = true; - AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig); + AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig, segmentConfigWithPrefix); // Create a mock segment on disk File tmp = tempFolder.newFile("version.bin"); @@ -175,7 +250,7 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport Assert.assertTrue( segment.getLoadSpec().get("blobPath").toString(), - Pattern.compile(UNIQUE_MATCHER).matcher(segment.getLoadSpec().get("blobPath").toString()).matches() + Pattern.compile(UNIQUE_MATCHER_NO_PREFIX).matcher(segment.getLoadSpec().get("blobPath").toString()).matches() ); Assert.assertEquals(SEGMENT_TO_PUSH.getSize(), segment.getSize()); @@ -187,7 +262,7 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport public void getAzurePathsTest() { - AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig); + AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig, segmentConfigWithPrefix); final String storageDir = pusher.getStorageDir(DATA_SEGMENT, false); final String azurePath = pusher.getAzurePath(DATA_SEGMENT, false); @@ -200,7 +275,7 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport @Test public void uploadDataSegmentTest() throws StorageException, IOException, URISyntaxException { - AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig); + AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig, segmentConfigWithPrefix); final int binaryVersion = 9; final File compressedSegmentData = new File("index.zip"); final String azurePath = pusher.getAzurePath(DATA_SEGMENT, false); @@ -228,25 +303,41 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport } @Test - public void getPathForHadoopTest() + public void getPathForHadoopWithPrefixTest() { - AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig); + AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig, segmentConfigWithPrefix); + String hadoopPath = pusher.getPathForHadoop(); + Assert.assertEquals("wasbs://container@account.blob.core.windows.net/prefix/", hadoopPath); + } + + @Test + public void getPathForHadoopWithoutPrefixTest() + { + AzureDataSegmentPusher pusher = new 
AzureDataSegmentPusher(azureStorage, azureAccountConfig, segmentConfigWithoutPrefix);
     String hadoopPath = pusher.getPathForHadoop();
     Assert.assertEquals("wasbs://container@account.blob.core.windows.net/", hadoopPath);
   }
 
   @Test
-  public void test_getPathForHadoop_noArgs_succeeds()
+  public void test_getPathForHadoop_noArgsWithoutPrefix_succeeds()
   {
-    AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig);
+    AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig, segmentConfigWithoutPrefix);
     String hadoopPath = pusher.getPathForHadoop("");
     Assert.assertEquals("wasbs://container@account.blob.core.windows.net/", hadoopPath);
   }
 
+  @Test
+  public void test_getPathForHadoop_noArgsWithPrefix_succeeds()
+  {
+    AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig, segmentConfigWithPrefix);
+    String hadoopPath = pusher.getPathForHadoop("");
+    Assert.assertEquals("wasbs://container@account.blob.core.windows.net/prefix/", hadoopPath);
+  }
+
   @Test
   public void test_getAllowedPropertyPrefixesForHadoop_returnsExpectedPropertyPrefixes()
   {
-    AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig);
+    AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig, segmentConfigWithPrefix);
     List<String> actualPropertyPrefixes = pusher.getAllowedPropertyPrefixesForHadoop();
     Assert.assertEquals(AzureDataSegmentPusher.ALLOWED_PROPERTY_PREFIXES_FOR_HADOOP, actualPropertyPrefixes);
   }
@@ -254,7 +345,7 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
   @Test
   public void storageDirContainsNoColonsTest()
   {
-    AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig);
+    AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig, segmentConfigWithPrefix);
     DataSegment withColons = DATA_SEGMENT.withVersion("2018-01-05T14:54:09.295Z");
     String segmentPath = pusher.getStorageDir(withColons, false);
     Assert.assertFalse("Path should not contain any colons", segmentPath.contains(":"));
diff --git a/extensions-contrib/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureStorageDruidModuleTest.java b/extensions-contrib/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureStorageDruidModuleTest.java
index 4c5b16a3b30..f8dd7923081 100644
--- a/extensions-contrib/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureStorageDruidModuleTest.java
+++ b/extensions-contrib/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureStorageDruidModuleTest.java
@@ -23,7 +23,6 @@
 import com.google.common.collect.ImmutableList;
 import com.google.inject.Binder;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
-import com.google.inject.Key;
 import com.google.inject.Module;
 import com.microsoft.azure.storage.StorageCredentials;
 import com.microsoft.azure.storage.blob.CloudBlobClient;
@@ -33,6 +32,7 @@
 import org.apache.druid.data.input.impl.CloudObjectLocation;
 import org.apache.druid.guice.DruidGuiceExtensions;
 import org.apache.druid.guice.JsonConfigurator;
 import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.jackson.JacksonModule;
 import org.apache.druid.storage.azure.blob.ListBlobItemHolder;
 import org.apache.druid.storage.azure.blob.ListBlobItemHolderFactory;
 import org.easymock.EasyMock;
@@ -53,6 +53,8 @@ public class AzureStorageDruidModuleTest extends EasyMockSupport
   private static final String AZURE_ACCOUNT_NAME;
   private static final String AZURE_ACCOUNT_KEY;
   private static final String AZURE_CONTAINER;
+  private static final String AZURE_PREFIX;
+  private static final int AZURE_MAX_LISTING_LENGTH;
   private static final String PATH = "path";
   private static final Iterable<URI> EMPTY_PREFIXES_ITERABLE = ImmutableList.of();
   private static final Properties PROPERTIES;
@@ -71,6 +73,8 @@ public class AzureStorageDruidModuleTest extends EasyMockSupport
       AZURE_ACCOUNT_KEY = Base64.getUrlEncoder()
                                 .encodeToString("azureKey1".getBytes(StandardCharsets.UTF_8.toString()));
       AZURE_CONTAINER = "azureContainer1";
+      AZURE_PREFIX = "azurePrefix1";
+      AZURE_MAX_LISTING_LENGTH = 10;
       PROPERTIES = initializePropertes();
     }
     catch (Exception e) {
@@ -88,14 +92,38 @@ public class AzureStorageDruidModuleTest extends EasyMockSupport
   }
 
   @Test
-  public void test_getBlobClient_expectedClient()
+  public void test_getAzureAccountConfig_expectedConfig()
   {
     injector = makeInjectorWithProperties(PROPERTIES);
-    AzureAccountConfig azureAccountConfig = injector.getInstance(Key.get(AzureAccountConfig.class));
+    AzureAccountConfig azureAccountConfig = injector.getInstance(AzureAccountConfig.class);
 
     Assert.assertEquals(AZURE_ACCOUNT_NAME, azureAccountConfig.getAccount());
     Assert.assertEquals(AZURE_ACCOUNT_KEY, azureAccountConfig.getKey());
-    Assert.assertEquals(AZURE_CONTAINER, azureAccountConfig.getContainer());
+  }
+
+  @Test
+  public void test_getAzureDataSegmentConfig_expectedConfig()
+  {
+    injector = makeInjectorWithProperties(PROPERTIES);
+    AzureDataSegmentConfig segmentConfig = injector.getInstance(AzureDataSegmentConfig.class);
+
+    Assert.assertEquals(AZURE_CONTAINER, segmentConfig.getContainer());
+    Assert.assertEquals(AZURE_PREFIX, segmentConfig.getPrefix());
+  }
+
+  @Test
+  public void test_getAzureInputDataConfig_expectedConfig()
+  {
+    injector = makeInjectorWithProperties(PROPERTIES);
+    AzureInputDataConfig inputDataConfig = injector.getInstance(AzureInputDataConfig.class);
+
+    Assert.assertEquals(AZURE_MAX_LISTING_LENGTH, inputDataConfig.getMaxListingLength());
+  }
+
+  @Test
+  public void test_getBlobClient_expectedClient()
+  {
+    injector = makeInjectorWithProperties(PROPERTIES);
 
     CloudBlobClient cloudBlobClient = injector.getInstance(CloudBlobClient.class);
     StorageCredentials storageCredentials = cloudBlobClient.getCredentials();
@@ -107,11 +135,6 @@ public class AzureStorageDruidModuleTest extends EasyMockSupport
   public void test_getAzureStorageContainer_expectedClient()
   {
     injector = makeInjectorWithProperties(PROPERTIES);
-    AzureAccountConfig azureAccountConfig = injector.getInstance(Key.get(AzureAccountConfig.class));
-
-    Assert.assertEquals(AZURE_ACCOUNT_NAME, azureAccountConfig.getAccount());
-    Assert.assertEquals(AZURE_ACCOUNT_KEY, azureAccountConfig.getKey());
-    Assert.assertEquals(AZURE_CONTAINER, azureAccountConfig.getContainer());
 
     CloudBlobClient cloudBlobClient = injector.getInstance(CloudBlobClient.class);
     StorageCredentials storageCredentials = cloudBlobClient.getCredentials();
@@ -204,6 +227,7 @@ public class AzureStorageDruidModuleTest extends EasyMockSupport
     return Guice.createInjector(
         ImmutableList.of(
             new DruidGuiceExtensions(),
+            new JacksonModule(),
             new Module()
             {
               @Override
@@ -224,6 +248,8 @@ public class AzureStorageDruidModuleTest extends EasyMockSupport
     props.put("druid.azure.account", AZURE_ACCOUNT_NAME);
     props.put("druid.azure.key", AZURE_ACCOUNT_KEY);
     props.put("druid.azure.container", AZURE_CONTAINER);
+    props.put("druid.azure.prefix", AZURE_PREFIX);
+    props.put("druid.azure.maxListingLength", String.valueOf(AZURE_MAX_LISTING_LENGTH));
     return props;
   }
 }
diff --git a/extensions-contrib/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureUtilsTest.java b/extensions-contrib/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureUtilsTest.java
index 92258eaeb5f..f75370703ce 100644
--- a/extensions-contrib/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureUtilsTest.java
+++ b/extensions-contrib/azure-extensions/src/test/java/org/apache/druid/storage/azure/AzureUtilsTest.java
@@ -19,11 +19,14 @@
 
 package org.apache.druid.storage.azure;
 
+import com.microsoft.azure.storage.StorageException;
 import org.apache.druid.data.input.azure.AzureInputSource;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.net.URI;
+import java.net.URISyntaxException;
 
 public class AzureUtilsTest
 {
@@ -35,6 +38,20 @@ public class AzureUtilsTest
       + BLOB_NAME;
   private static final URI URI_WITH_PATH_WITH_LEADING_SLASH;
 
+  private static final URISyntaxException URI_SYNTAX_EXCEPTION = new URISyntaxException("", "");
+  private static final StorageException STORAGE_EXCEPTION = new StorageException("", "", null);
+  private static final IOException IO_EXCEPTION = new IOException();
+  private static final RuntimeException RUNTIME_EXCEPTION = new RuntimeException();
+  private static final RuntimeException NULL_EXCEPTION_WRAPPED_IN_RUNTIME_EXCEPTION = new RuntimeException("", null);
+  private static final RuntimeException IO_EXCEPTION_WRAPPED_IN_RUNTIME_EXCEPTION = new RuntimeException(
+      "",
+      new IOException()
+  );
+  private static final RuntimeException RUNTIME_EXCEPTION_WRAPPED_IN_RUNTIME_EXCEPTION = new RuntimeException(
+      "",
+      new RuntimeException()
+  );
+
   static {
     try {
       URI_WITH_PATH_WITH_LEADING_SLASH = new URI(AzureInputSource.SCHEME
@@ -67,4 +84,60 @@ public class AzureUtilsTest
     String path = AzureUtils.maybeRemoveAzurePathPrefix(BLOB_NAME);
     Assert.assertEquals(BLOB_NAME, path);
   }
+
+  @Test
+  public void test_azureRetry_URISyntaxException_returnsFalse()
+  {
+    boolean retry = AzureUtils.AZURE_RETRY.apply(URI_SYNTAX_EXCEPTION);
+    Assert.assertFalse(retry);
+  }
+
+  @Test
+  public void test_azureRetry_StorageException_returnsTrue()
+  {
+    boolean retry = AzureUtils.AZURE_RETRY.apply(STORAGE_EXCEPTION);
+    Assert.assertTrue(retry);
+  }
+
+  @Test
+  public void test_azureRetry_IOException_returnsTrue()
+  {
+    boolean retry = AzureUtils.AZURE_RETRY.apply(IO_EXCEPTION);
+    Assert.assertTrue(retry);
+  }
+
+  @Test
+  public void test_azureRetry_nullException_returnsFalse()
+  {
+    boolean retry = AzureUtils.AZURE_RETRY.apply(null);
+    Assert.assertFalse(retry);
+  }
+
+  @Test
+  public void test_azureRetry_RunTimeException_returnsFalse()
+  {
+    boolean retry = AzureUtils.AZURE_RETRY.apply(RUNTIME_EXCEPTION);
+    Assert.assertFalse(retry);
+  }
+
+  @Test
+  public void test_azureRetry_nullExceptionWrappedInRunTimeException_returnsFalse()
+  {
+    boolean retry = AzureUtils.AZURE_RETRY.apply(NULL_EXCEPTION_WRAPPED_IN_RUNTIME_EXCEPTION);
+    Assert.assertFalse(retry);
+  }
+
+  @Test
+  public void test_azureRetry_IOExceptionWrappedInRunTimeException_returnsTrue()
+  {
+    boolean retry = AzureUtils.AZURE_RETRY.apply(IO_EXCEPTION_WRAPPED_IN_RUNTIME_EXCEPTION);
+    Assert.assertTrue(retry);
+  }
+
+  @Test
+  public void test_azureRetry_RunTimeExceptionWrappedInRunTimeException_returnsFalse()
+  {
+    boolean retry = AzureUtils.AZURE_RETRY.apply(RUNTIME_EXCEPTION_WRAPPED_IN_RUNTIME_EXCEPTON);
+    Assert.assertFalse(retry);
+  }
 }
diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java
index 62d04cfa933..8b1e026473d 100644
--- a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java
+++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java
@@ -28,6 +28,7 @@
 import org.apache.druid.data.input.InputSplit;
 import org.apache.druid.data.input.impl.CloudObjectInputSource;
 import org.apache.druid.data.input.impl.CloudObjectLocation;
 import org.apache.druid.data.input.impl.SplittableInputSource;
+import org.apache.druid.storage.google.GoogleInputDataConfig;
 import org.apache.druid.storage.google.GoogleStorage;
 import org.apache.druid.storage.google.GoogleUtils;
@@ -40,13 +41,13 @@ public class GoogleCloudStorageInputSource extends CloudObjectInputSource
 {
   static final String SCHEME = "gs";
-  private static final int MAX_LISTING_LENGTH = 1024;
-
   private final GoogleStorage storage;
+  private final GoogleInputDataConfig inputDataConfig;
 
   @JsonCreator
   public GoogleCloudStorageInputSource(
       @JacksonInject GoogleStorage storage,
+      @JacksonInject GoogleInputDataConfig inputDataConfig,
       @JsonProperty("uris") @Nullable List<URI> uris,
       @JsonProperty("prefixes") @Nullable List<URI> prefixes,
       @JsonProperty("objects") @Nullable List<CloudObjectLocation> objects
@@ -54,6 +55,7 @@ public class GoogleCloudStorageInputSource extends CloudObjectInputSource
   {
     super(SCHEME, uris, prefixes, objects);
     this.storage = storage;
+    this.inputDataConfig = inputDataConfig;
   }
 
   @Override
   public SplittableInputSource<CloudObjectLocation> withSplit(InputSplit<CloudObjectLocation> split)
   {
-    return new GoogleCloudStorageInputSource(storage, null, null, ImmutableList.of(split.get()));
+    return new GoogleCloudStorageInputSource(storage, inputDataConfig, null, null, ImmutableList.of(split.get()));
   }
 
@@ -84,7 +86,7 @@ public class GoogleCloudStorageInputSource extends CloudObjectInputSource
   private Iterable<StorageObject> storageObjectIterable()
   {
     return () ->
-        GoogleUtils.lazyFetchingStorageObjectsIterator(storage, getPrefixes().iterator(), MAX_LISTING_LENGTH);
+        GoogleUtils.lazyFetchingStorageObjectsIterator(storage, getPrefixes().iterator(), inputDataConfig.getMaxListingLength());
   }
 
   @Override
diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleInputDataConfig.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleInputDataConfig.java
new file mode 100644
index 00000000000..5c3674e742a
--- /dev/null
+++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleInputDataConfig.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.google;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.validation.constraints.Min;
+
+/**
+ * Stores the configuration for options related to reading
+ * input data from Google blob store into Druid
+ */
+public class GoogleInputDataConfig
+{
+  /**
+   * The maximum number of input files matching a given prefix to retrieve
+   * from Google at a time.
+   */
+  @JsonProperty
+  @Min(1)
+  private int maxListingLength = 1024;
+
+  public void setMaxListingLength(int maxListingLength)
+  {
+    this.maxListingLength = maxListingLength;
+  }
+
+  public int getMaxListingLength()
+  {
+    return maxListingLength;
+  }
+}
diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java
index 17047be0bf7..62b3b628143 100644
--- a/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java
+++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java
@@ -46,6 +46,7 @@
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
 import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.druid.storage.google.GoogleInputDataConfig;
 import org.apache.druid.storage.google.GoogleStorage;
 import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.apache.druid.utils.CompressionUtils;
@@ -66,8 +67,9 @@ import java.util.stream.Stream;
 
 public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTest
 {
-  private static final long EXPECTED_MAX_LISTING_LENGTH = 1024L;
+  private static final int MAX_LISTING_LENGTH = 10;
   private static final GoogleStorage STORAGE = EasyMock.createMock(GoogleStorage.class);
+  private static final GoogleInputDataConfig INPUT_DATA_CONFIG = EasyMock.createMock(GoogleInputDataConfig.class);
 
   private static final List<URI> EXPECTED_URIS = Arrays.asList(
       URI.create("gs://foo/bar/file.csv"),
@@ -99,7 +101,7 @@ public class GoogleCloudStorageInputSourceTe
   {
     final ObjectMapper mapper = createGoogleObjectMapper();
     final GoogleCloudStorageInputSource withUris =
-        new GoogleCloudStorageInputSource(STORAGE, EXPECTED_URIS, ImmutableList.of(), null);
+        new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, EXPECTED_URIS, ImmutableList.of(), null);
     final GoogleCloudStorageInputSource serdeWithUris = mapper.readValue(mapper.writeValueAsString(withUris), GoogleCloudStorageInputSource.class);
     Assert.assertEquals(withUris, serdeWithUris);
@@ -110,7 +112,7 @@ public class GoogleCloudStorageInputSourceTe
   {
     final ObjectMapper mapper = createGoogleObjectMapper();
     final GoogleCloudStorageInputSource withPrefixes =
-        new GoogleCloudStorageInputSource(STORAGE, ImmutableList.of(), PREFIXES, null);
+        new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, ImmutableList.of(), PREFIXES, null);
     final GoogleCloudStorageInputSource serdeWithPrefixes =
         mapper.readValue(mapper.writeValueAsString(withPrefixes), GoogleCloudStorageInputSource.class);
     Assert.assertEquals(withPrefixes, serdeWithPrefixes);
@@ -123,6 +125,7 @@ public class GoogleCloudStorageInputSourceTe
     final GoogleCloudStorageInputSource withObjects =
         new GoogleCloudStorageInputSource(
             STORAGE,
+            INPUT_DATA_CONFIG,
             null,
             null,
             ImmutableList.of(new CloudObjectLocation("foo", "bar/file.gz"))
@@ -137,7 +140,7 @@ public class GoogleCloudStorageInputSourceTe
   {
     GoogleCloudStorageInputSource inputSource =
-        new GoogleCloudStorageInputSource(STORAGE, EXPECTED_URIS, ImmutableList.of(), null);
+        new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, EXPECTED_URIS, ImmutableList.of(), null);
 
     Stream<InputSplit<CloudObjectLocation>> splits = inputSource.createSplits(
         new JsonInputFormat(JSONPathSpec.DEFAULT, null),
@@ -150,12 +153,15 @@ public class GoogleCloudStorageInputSourceTe
   public void testWithPrefixesSplit() throws IOException
   {
     EasyMock.reset(STORAGE);
+    EasyMock.reset(INPUT_DATA_CONFIG);
     addExpectedPrefixObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0)));
     addExpectedPrefixObjects(PREFIXES.get(1), ImmutableList.of(EXPECTED_URIS.get(1)));
+    EasyMock.expect(INPUT_DATA_CONFIG.getMaxListingLength()).andReturn(MAX_LISTING_LENGTH);
     EasyMock.replay(STORAGE);
+    EasyMock.replay(INPUT_DATA_CONFIG);
 
     GoogleCloudStorageInputSource inputSource =
-        new GoogleCloudStorageInputSource(STORAGE, null, PREFIXES, null);
+        new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, null, PREFIXES, null);
 
     Stream<InputSplit<CloudObjectLocation>> splits = inputSource.createSplits(
         new JsonInputFormat(JSONPathSpec.DEFAULT, null),
@@ -169,14 +175,18 @@ public class GoogleCloudStorageInputSourceTe
   public void testReader() throws IOException
   {
     EasyMock.reset(STORAGE);
+    EasyMock.reset(INPUT_DATA_CONFIG);
     addExpectedPrefixObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0)));
     addExpectedGetObjectMock(EXPECTED_URIS.get(0));
     addExpectedPrefixObjects(PREFIXES.get(1), ImmutableList.of(EXPECTED_URIS.get(1)));
     addExpectedGetObjectMock(EXPECTED_URIS.get(1));
+    EasyMock.expect(INPUT_DATA_CONFIG.getMaxListingLength()).andReturn(MAX_LISTING_LENGTH);
     EasyMock.replay(STORAGE);
+    EasyMock.replay(INPUT_DATA_CONFIG);
 
     GoogleCloudStorageInputSource inputSource = new GoogleCloudStorageInputSource(
         STORAGE,
+        INPUT_DATA_CONFIG,
         null,
         PREFIXES,
         null
@@ -208,14 +218,18 @@ public class GoogleCloudStorageInputSourceTe
   public void testCompressedReader() throws IOException
   {
     EasyMock.reset(STORAGE);
+    EasyMock.reset(INPUT_DATA_CONFIG);
     addExpectedPrefixObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_COMPRESSED_URIS.get(0)));
     addExpectedGetCompressedObjectMock(EXPECTED_COMPRESSED_URIS.get(0));
     addExpectedPrefixObjects(PREFIXES.get(1), ImmutableList.of(EXPECTED_COMPRESSED_URIS.get(1)));
     addExpectedGetCompressedObjectMock(EXPECTED_COMPRESSED_URIS.get(1));
+    EasyMock.expect(INPUT_DATA_CONFIG.getMaxListingLength()).andReturn(MAX_LISTING_LENGTH);
     EasyMock.replay(STORAGE);
+    EasyMock.replay(INPUT_DATA_CONFIG);
 
     GoogleCloudStorageInputSource inputSource = new GoogleCloudStorageInputSource(
         STORAGE,
+        INPUT_DATA_CONFIG,
         null,
         PREFIXES,
         null
@@ -250,7 +264,7 @@ public class GoogleCloudStorageInputSourceTe
     Storage.Objects.List listRequest = EasyMock.createMock(Storage.Objects.List.class);
     EasyMock.expect(STORAGE.list(EasyMock.eq(bucket))).andReturn(listRequest).once();
     EasyMock.expect(listRequest.setPageToken(EasyMock.anyString())).andReturn(listRequest).once();
-    EasyMock.expect(listRequest.setMaxResults(EXPECTED_MAX_LISTING_LENGTH)).andReturn(listRequest).once();
+    EasyMock.expect(listRequest.setMaxResults((long) MAX_LISTING_LENGTH)).andReturn(listRequest).once();
     EasyMock.expect(listRequest.setPrefix(EasyMock.eq(StringUtils.maybeRemoveLeadingSlash(prefix.getPath()))))
             .andReturn(listRequest)
             .once();
diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java
index 4fa8fc5cdc9..829d56cdc71 100644
--- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java
+++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java
@@ -29,6 +29,7 @@
 import org.apache.druid.data.input.InputSplit;
 import org.apache.druid.data.input.impl.CloudObjectInputSource;
 import org.apache.druid.data.input.impl.CloudObjectLocation;
 import org.apache.druid.data.input.impl.SplittableInputSource;
+import org.apache.druid.storage.s3.S3InputDataConfig;
 import org.apache.druid.storage.s3.S3StorageDruidModule;
 import org.apache.druid.storage.s3.S3Utils;
 import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
@@ -42,10 +43,12 @@ public class S3InputSource extends CloudObjectInputSource
 {
   private final ServerSideEncryptingAmazonS3 s3Client;
+  private final S3InputDataConfig inputDataConfig;
 
   @JsonCreator
   public S3InputSource(
       @JacksonInject ServerSideEncryptingAmazonS3 s3Client,
+      @JacksonInject S3InputDataConfig inputDataConfig,
       @JsonProperty("uris") @Nullable List<URI> uris,
       @JsonProperty("prefixes") @Nullable List<URI> prefixes,
       @JsonProperty("objects") @Nullable List<CloudObjectLocation> objects
@@ -53,6 +56,7 @@ public class S3InputSource extends CloudObjectInputSource
   {
     super(S3StorageDruidModule.SCHEME, uris, prefixes, objects);
     this.s3Client = Preconditions.checkNotNull(s3Client, "s3Client");
+    this.inputDataConfig = Preconditions.checkNotNull(inputDataConfig, "inputDataConfig");
   }
 
   @Override
@@ -72,7 +76,7 @@ public class S3InputSource extends CloudObjectInputSource
   @Override
   public SplittableInputSource<CloudObjectLocation> withSplit(InputSplit<CloudObjectLocation> split)
   {
-    return new S3InputSource(s3Client, null, null, ImmutableList.of(split.get()));
+    return new S3InputSource(s3Client, inputDataConfig, null, null, ImmutableList.of(split.get()));
   }
 
   @Override
@@ -87,6 +91,6 @@ public class S3InputSource extends CloudObjectInputSource
   private Iterable<S3ObjectSummary> getIterableObjectsFromPrefixes()
   {
-    return () -> S3Utils.objectSummaryIterator(s3Client, getPrefixes(), MAX_LISTING_LENGTH);
+    return () -> S3Utils.objectSummaryIterator(s3Client, getPrefixes(), inputDataConfig.getMaxListingLength());
   }
 }
diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentPusherConfig.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentPusherConfig.java
index 687fea664a4..7c6deb55505 100644
--- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentPusherConfig.java
+++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentPusherConfig.java
@@ -37,8 +37,8 @@ public class S3DataSegmentPusherConfig
   private boolean disableAcl = false;
 
   @JsonProperty
-  @Min(0)
-  private int maxListingLength = 1000;
+  @Min(1)
+  private int maxListingLength = 1024;
 
   // use s3n by default for backward compatibility
   @JsonProperty
   private boolean useS3aSchema = false;
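`GoogleInputDataConfig` above, the Azure config, and `S3InputDataConfig` below all feed the same pattern: prefix listings are fetched lazily, at most `maxListingLength` object names per remote call. A sketch of that paging idea follows — the `StorageClient.listPage` API is hypothetical, standing in for the real GCS/S3 listing calls, and this is not Druid's `lazyFetchingStorageObjectsIterator` or `objectSummaryIterator`:

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

public class PagedListingSketch
{
  /** One page of object names plus the token for the next page (null when done). */
  static class Page
  {
    final List<String> objectNames;
    final String nextPageToken;

    Page(List<String> objectNames, String nextPageToken)
    {
      this.objectNames = objectNames;
      this.nextPageToken = nextPageToken;
    }
  }

  /** Hypothetical storage client: returns at most maxResults names per call. */
  interface StorageClient
  {
    Page listPage(String prefix, String pageToken, int maxResults);
  }

  /**
   * Lazily iterate every object under a prefix, requesting at most
   * maxListingLength names per remote call -- the knob the new
   * maxListingLength settings control.
   */
  static Iterator<String> lazyListingIterator(
      final StorageClient client,
      final String prefix,
      final int maxListingLength
  )
  {
    return new Iterator<String>()
    {
      private Iterator<String> current = Collections.emptyIterator();
      private String pageToken = null;
      private boolean exhausted = false;

      @Override
      public boolean hasNext()
      {
        // Fetch pages until one yields names or the listing is exhausted.
        while (!current.hasNext() && !exhausted) {
          Page page = client.listPage(prefix, pageToken, maxListingLength);
          current = page.objectNames.iterator();
          pageToken = page.nextPageToken;
          exhausted = (pageToken == null);
        }
        return current.hasNext();
      }

      @Override
      public String next()
      {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        return current.next();
      }
    };
  }
}
```

A larger `maxListingLength` means fewer round trips per prefix; a smaller value bounds the size of each response, which is why the tests drive it down to 10 to force multiple pages through the mocks.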
diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3InputDataConfig.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3InputDataConfig.java
new file mode 100644
index 00000000000..ed97c476d52
--- /dev/null
+++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3InputDataConfig.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.storage.s3;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.validation.constraints.Min;
+
+/**
+ * Stores the configuration for options related to reading
+ * input data from Amazon S3 into Druid
+ */
+public class S3InputDataConfig
+{
+  /**
+   * The maximum number of input files matching a given prefix to retrieve
+   * from Amazon S3 at a time.
+   */
+  @JsonProperty
+  @Min(1)
+  private int maxListingLength = 1024;
+
+  public void setMaxListingLength(int maxListingLength)
+  {
+    this.maxListingLength = maxListingLength;
+  }
+
+  public int getMaxListingLength()
+  {
+    return maxListingLength;
+  }
+}
diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java
index 4cfca195d59..acedc0c6296 100644
--- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java
+++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java
@@ -54,6 +54,7 @@
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
 import org.apache.druid.java.util.common.parsers.JSONPathSpec;
 import org.apache.druid.storage.s3.NoopServerSideEncryption;
+import org.apache.druid.storage.s3.S3InputDataConfig;
 import org.apache.druid.storage.s3.S3Utils;
 import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
 import org.apache.druid.testing.InitializedNullHandlingTest;
@@ -86,6 +87,8 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
       S3_CLIENT,
       new NoopServerSideEncryption()
   );
+  private static final S3InputDataConfig INPUT_DATA_CONFIG;
+  private static final int MAX_LISTING_LENGTH = 10;
 
   private static final List<URI> EXPECTED_URIS = Arrays.asList(
       URI.create("s3://foo/bar/file.csv"),
@@ -112,6 +115,11 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
   private static final byte[] CONTENT =
       StringUtils.toUtf8(StringUtils.format("%d,hello,world", NOW.getMillis()));
 
+  static {
+    INPUT_DATA_CONFIG = new S3InputDataConfig();
+    INPUT_DATA_CONFIG.setMaxListingLength(MAX_LISTING_LENGTH);
+  }
+
   @Rule
   public TemporaryFolder temporaryFolder = new TemporaryFolder();
 
@@ -121,7 +129,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
   @Test
   public void testSerdeWithUris() throws Exception
   {
-    final S3InputSource withUris = new S3InputSource(SERVICE, EXPECTED_URIS, null, null);
+    final S3InputSource withUris = new S3InputSource(SERVICE, INPUT_DATA_CONFIG, EXPECTED_URIS, null, null);
     final S3InputSource serdeWithUris = MAPPER.readValue(MAPPER.writeValueAsString(withUris), S3InputSource.class);
     Assert.assertEquals(withUris, serdeWithUris);
   }
@@ -129,7 +137,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
   @Test
   public void testSerdeWithPrefixes() throws Exception
   {
-    final S3InputSource withPrefixes = new S3InputSource(SERVICE, null, PREFIXES, null);
+    final S3InputSource withPrefixes = new S3InputSource(SERVICE, INPUT_DATA_CONFIG, null, PREFIXES, null);
     final S3InputSource serdeWithPrefixes =
         MAPPER.readValue(MAPPER.writeValueAsString(withPrefixes), S3InputSource.class);
     Assert.assertEquals(withPrefixes, serdeWithPrefixes);
@@ -141,6 +149,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
     final S3InputSource withPrefixes = new S3InputSource(
         SERVICE,
+        INPUT_DATA_CONFIG,
         null,
         null,
         EXPECTED_LOCATION
@@ -155,6 +164,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
   {
     final S3InputSource withPrefixes = new S3InputSource(
         SERVICE,
+        INPUT_DATA_CONFIG,
         ImmutableList.of(),
         ImmutableList.of(),
         EXPECTED_LOCATION
@@ -171,6 +181,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
     // constructor will explode
     new S3InputSource(
         SERVICE,
+        INPUT_DATA_CONFIG,
        EXPECTED_URIS,
         PREFIXES,
         EXPECTED_LOCATION
@@ -184,6 +195,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
     // constructor will explode
     new S3InputSource(
         SERVICE,
+        INPUT_DATA_CONFIG,
         EXPECTED_URIS,
         PREFIXES,
         ImmutableList.of()
@@ -197,6 +209,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
     // constructor will explode
     new S3InputSource(
         SERVICE,
+        INPUT_DATA_CONFIG,
         ImmutableList.of(),
         PREFIXES,
         EXPECTED_LOCATION
@@ -206,7 +219,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
   @Test
   public void testWithUrisSplit()
   {
-    S3InputSource inputSource = new S3InputSource(SERVICE, EXPECTED_URIS, null, null);
+    S3InputSource inputSource = new S3InputSource(SERVICE, INPUT_DATA_CONFIG, EXPECTED_URIS, null, null);
 
     Stream<InputSplit<CloudObjectLocation>> splits = inputSource.createSplits(
         new JsonInputFormat(JSONPathSpec.DEFAULT, null),
@@ -224,7 +237,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
     expectListObjects(PREFIXES.get(1), ImmutableList.of(EXPECTED_URIS.get(1)));
     EasyMock.replay(S3_CLIENT);
 
-    S3InputSource inputSource = new S3InputSource(SERVICE, null, PREFIXES, null);
+    S3InputSource inputSource = new S3InputSource(SERVICE, INPUT_DATA_CONFIG, null, PREFIXES, null);
 
     Stream<InputSplit<CloudObjectLocation>> splits = inputSource.createSplits(
         new JsonInputFormat(JSONPathSpec.DEFAULT, null),
@@ -245,6 +258,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
     S3InputSource inputSource = new S3InputSource(
         SERVICE,
+        INPUT_DATA_CONFIG,
         null,
         ImmutableList.of(PREFIXES.get(0), EXPECTED_URIS.get(1)),
         null
@@ -273,6 +287,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
     S3InputSource inputSource = new S3InputSource(
         SERVICE,
+        INPUT_DATA_CONFIG,
         null,
         ImmutableList.of(PREFIXES.get(0), EXPECTED_URIS.get(1)),
         null
@@ -314,6 +329,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
     S3InputSource inputSource = new S3InputSource(
         SERVICE,
+        INPUT_DATA_CONFIG,
         null,
         ImmutableList.of(PREFIXES.get(0), EXPECTED_COMPRESSED_URIS.get(1)),
         null
diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentPusherConfigTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentPusherConfigTest.java
index af898811aed..c35733618a7 100644
--- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentPusherConfigTest.java
+++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentPusherConfigTest.java
@@ -50,7 +50,7 @@ public class S3DataSegmentPusherConfigTest
   {
     String jsonConfig = "{\"bucket\":\"bucket1\",\"baseKey\":\"dataSource1\"}";
     String expectedJsonConfig = "{\"bucket\":\"bucket1\",\"baseKey\":\"dataSource1\","
-                                + "\"disableAcl\":false,\"maxListingLength\":1000,\"useS3aSchema\":false}";
+                                + "\"disableAcl\":false,\"maxListingLength\":1024,\"useS3aSchema\":false}";
 
     S3DataSegmentPusherConfig config = JSON_MAPPER.readValue(jsonConfig, S3DataSegmentPusherConfig.class);
     Assert.assertEquals(expectedJsonConfig, JSON_MAPPER.writeValueAsString(config));
@@ -67,6 +67,6 @@ public class S3DataSegmentPusherConfigTest
     Set<ConstraintViolation<S3DataSegmentPusherConfig>> violations = validator.validate(config);
     Assert.assertEquals(1, violations.size());
     ConstraintViolation<S3DataSegmentPusherConfig> violation = Iterators.getOnlyElement(violations.iterator());
-    Assert.assertEquals("must be greater than or equal to 0", violation.getMessage());
+    Assert.assertEquals("must be greater than or equal to 1", violation.getMessage());
   }
 }
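Consistent with the updated `S3DataSegmentPusherConfigTest` above, the tightened `@Min(1)` bound can be exercised directly with the Bean Validation API. A minimal sketch, assuming a validation implementation such as Hibernate Validator is on the classpath and using a hypothetical `InputDataConfig` stand-in rather than the real Druid classes:

```java
import java.util.Set;

import javax.validation.ConstraintViolation;
import javax.validation.Validation;
import javax.validation.Validator;
import javax.validation.constraints.Min;

public class MaxListingLengthValidationSketch
{
  // Hypothetical stand-in mirroring S3InputDataConfig / GoogleInputDataConfig.
  static class InputDataConfig
  {
    @Min(1)
    private int maxListingLength = 1024; // new default; the S3 pusher previously used 1000

    void setMaxListingLength(int maxListingLength)
    {
      this.maxListingLength = maxListingLength;
    }
  }

  public static void main(String[] args)
  {
    // Requires a Bean Validation implementation (e.g. Hibernate Validator) at runtime.
    Validator validator = Validation.buildDefaultValidatorFactory().getValidator();

    InputDataConfig config = new InputDataConfig();
    config.setMaxListingLength(0); // legal under the old @Min(0), rejected under @Min(1)

    Set<ConstraintViolation<InputDataConfig>> violations = validator.validate(config);
    // Expected output: "must be greater than or equal to 1"
    violations.forEach(v -> System.out.println(v.getMessage()));
  }
}
```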