Add Azure config options for segment prefix and max listing length (#9356)

* Add Azure config options for segment prefix and max listing length

Added configuration options to allow the user to specify the prefix
within the segment container under which segment files are stored. Also
added a configuration option to allow the user to specify the maximum
number of input files matching a given prefix to retrieve per listing request.
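For reference, a minimal sketch of how the resulting deep-storage settings might look in a Druid `runtime.properties` file. The property names come from the documentation changes below; the account, key, container, and prefix values are made-up examples:

```properties
# Hypothetical example values; only the property names are taken from this change.
druid.storage.type=azure
druid.azure.account=examplestorageaccount
druid.azure.key=<your-storage-account-key>
druid.azure.container=druid-deep-storage
# New in this change: optional prefix prepended to segment blob names.
druid.azure.prefix=prod/segments
# New in this change: cap on how many blobs are listed per request (default 1024).
druid.azure.maxListingLength=1024
```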

* Fix test failures

* Address review comments

* Add dependency explicitly to pom

* Update docs

* Address review comments

* Address review comments
Author: zachjsh, 2020-02-21 14:12:03 -08:00 (committed by GitHub)
parent 141d8dd875
commit f707064bed
31 changed files with 677 additions and 198 deletions

View File

@@ -40,7 +40,6 @@ import java.util.stream.Stream;
 public abstract class CloudObjectInputSource<T extends InputEntity> extends AbstractInputSource
     implements SplittableInputSource<CloudObjectLocation>
 {
-  protected static final int MAX_LISTING_LENGTH = 1024;
   private final List<URI> uris;
   private final List<URI> prefixes;
   private final List<CloudObjectLocation> objects;

View File

@@ -29,68 +29,15 @@ To use this Apache Druid extension, make sure to [include](../../development/ext
 [Microsoft Azure Storage](http://azure.microsoft.com/en-us/services/storage/) is another option for deep storage. This requires some additional Druid configuration.
 
-|Property|Possible Values|Description|Default|
+|Property|Description|Possible Values|Default|
 |--------|---------------|-----------|-------|
 |`druid.storage.type`|azure||Must be set.|
 |`druid.azure.account`||Azure Storage account name.|Must be set.|
 |`druid.azure.key`||Azure Storage account key.|Must be set.|
 |`druid.azure.container`||Azure Storage container name.|Must be set.|
-|`druid.azure.protocol`|http or https||https|
-|`druid.azure.maxTries`||Number of tries before cancel an Azure operation.|3|
+|`druid.azure.prefix`|A prefix string that will be prepended to the blob names for the segments published to Azure deep storage| |""|
+|`druid.azure.protocol`|the protocol to use|http or https|https|
+|`druid.azure.maxTries`|Number of tries before canceling an Azure operation.| |3|
+|`druid.azure.maxListingLength`|maximum number of input files matching a given prefix to retrieve at a time| |1024|
 
 See [Azure Services](http://azure.microsoft.com/en-us/pricing/free-trial/) for more information.
 
-## Firehose
-
-<a name="firehose"></a>
-
-#### StaticAzureBlobStoreFirehose
-
-This firehose ingests events, similar to the StaticS3Firehose, but from an Azure Blob Store.
-
-Data is newline delimited, with one JSON object per line and parsed as per the `InputRowParser` configuration.
-
-The storage account is shared with the one used for Azure deep storage functionality, but blobs can be in a different container.
-
-As with the S3 blobstore, it is assumed to be gzipped if the extension ends in .gz
-
-This firehose is _splittable_ and can be used by [native parallel index tasks](../../ingestion/native-batch.md#parallel-task).
-Since each split represents an object in this firehose, each worker task of `index_parallel` will read an object.
-
-Sample spec:
-
-```json
-"firehose" : {
-    "type" : "static-azure-blobstore",
-    "blobs": [
-        {
-          "container": "container",
-          "path": "/path/to/your/file.json"
-        },
-        {
-          "container": "anothercontainer",
-          "path": "/another/path.json"
-        }
-    ]
-}
-```
-
-This firehose provides caching and prefetching features. In IndexTask, a firehose can be read twice if intervals or
-shardSpecs are not specified, and, in this case, caching can be useful. Prefetching is preferred when direct scan of objects is slow.
-
-|property|description|default|required?|
-|--------|-----------|-------|---------|
-|type|This should be `static-azure-blobstore`.|N/A|yes|
-|blobs|JSON array of [Azure blobs](https://msdn.microsoft.com/en-us/library/azure/ee691964.aspx).|N/A|yes|
-|maxCacheCapacityBytes|Maximum size of the cache space in bytes. 0 means disabling cache. Cached files are not removed until the ingestion task completes.|1073741824|no|
-|maxFetchCapacityBytes|Maximum size of the fetch space in bytes. 0 means disabling prefetch. Prefetched files are removed immediately once they are read.|1073741824|no|
-|prefetchTriggerBytes|Threshold to trigger prefetching Azure objects.|maxFetchCapacityBytes / 2|no|
-|fetchTimeout|Timeout for fetching an Azure object.|60000|no|
-|maxFetchRetry|Maximum retry for fetching an Azure object.|3|no|
-
-Azure Blobs:
-
-|property|description|default|required?|
-|--------|-----------|-------|---------|
-|container|Name of the azure [container](https://azure.microsoft.com/en-us/documentation/articles/storage-dotnet-how-to-use-blobs/#create-a-container)|N/A|yes|
-|path|The path where data is located.|N/A|yes|

View File

@@ -50,8 +50,9 @@ Deep storage can be written to Google Cloud Storage either via this extension or
 To configure connectivity to google cloud, run druid processes with `GOOGLE_APPLICATION_CREDENTIALS=/path/to/service_account_keyfile` in the environment.
 
-|Property|Possible Values|Description|Default|
+|Property|Description|Possible Values|Default|
 |--------|---------------|-----------|-------|
 |`druid.storage.type`|google||Must be set.|
-|`druid.google.bucket`||GCS bucket name.|Must be set.|
-|`druid.google.prefix`||GCS prefix.|No-prefix|
+|`druid.google.bucket`||Google Storage bucket name.|Must be set.|
+|`druid.google.prefix`|A prefix string that will be prepended to the blob names for the segments published to Google deep storage| |""|
+|`druid.google.maxListingLength`|maximum number of input files matching a given prefix to retrieve at a time| |1024|

View File

@@ -52,7 +52,7 @@ In addition to this you need to set additional configuration, specific for [deep
 |Property|Description|Default|
 |--------|-----------|-------|
 |`druid.storage.bucket`|Bucket to store in.|Must be set.|
-|`druid.storage.baseKey`|Base key prefix to use, i.e. what directory.|Must be set.|
+|`druid.storage.baseKey`|A prefix string that will be prepended to the object names for the segments published to S3 deep storage|Must be set.|
 |`druid.storage.type`|Global deep storage provider. Must be set to `s3` to make use of this extension.|Must be set (likely `s3`).|
 |`druid.storage.archiveBucket`|S3 bucket name for archiving when running the *archive task*.|none|
 |`druid.storage.archiveBaseKey`|S3 object key prefix for archiving.|none|

View File

@@ -57,6 +57,7 @@ The supported splittable input formats for now are:
 - [`s3`](#s3-input-source) reads data from AWS S3 storage.
 - [`gs`](#google-cloud-storage-input-source) reads data from Google Cloud Storage.
+- [`azure`](#azure-input-source) reads data from Azure Blob Storage.
 - [`hdfs`](#hdfs-input-source) reads data from HDFS storage.
 - [`http`](#http-input-source) reads data from HTTP servers.
 - [`local`](#local-input-source) reads data from local storage.
@@ -66,7 +67,6 @@ Some other cloud storage types are supported with the legacy [`firehose`](#fireh
 The below `firehose` types are also splittable. Note that only text formats are supported
 with the `firehose`.
 
-- [`static-azure-blobstore`](../development/extensions-contrib/azure.md#firehose)
 - [`static-cloudfiles`](../development/extensions-contrib/cloudfiles.md#firehose)
 
 The splittable `inputSource` (and `firehose`) types are responsible for generating _splits_.
@@ -923,6 +923,83 @@ Google Cloud Storage object:
|bucket|Name of the Google Cloud Storage bucket|None|yes|
|path|The path where data is located.|None|yes|
### Azure Input Source
> You need to include the [`druid-azure-extensions`](../development/extensions-contrib/azure.md) as an extension to use the Azure input source.
The Azure input source reads objects directly from Azure Blob store. Objects can be
specified as a list of Azure Blob store URI strings. The Azure input source is splittable and can be used
by the [Parallel task](#parallel-task), where each worker task of `index_parallel` will read
a single object.
Sample specs:
```json
...
"ioConfig": {
"type": "index_parallel",
"inputSource": {
"type": "azure",
"uris": ["azure://container/prefix1/file.json", "azure://container/prefix2/file2.json"]
},
"inputFormat": {
"type": "json"
},
...
},
...
```
```json
...
"ioConfig": {
"type": "index_parallel",
"inputSource": {
"type": "azure",
"prefixes": ["azure://container/prefix1", "azure://container/prefix2"]
},
"inputFormat": {
"type": "json"
},
...
},
...
```
```json
...
"ioConfig": {
"type": "index_parallel",
"inputSource": {
"type": "azure",
"objects": [
{ "bucket": "container", "path": "prefix1/file1.json"},
{ "bucket": "container", "path": "prefix2/file2.json"}
]
},
"inputFormat": {
"type": "json"
},
...
},
...
```
|property|description|default|required?|
|--------|-----------|-------|---------|
|type|This should be `azure`.|None|yes|
|uris|JSON array of URIs where Azure Blob objects to be ingested are located. Should be in the form "azure://\<container>/\<path-to-file\>"|None|`uris` or `prefixes` or `objects` must be set|
|prefixes|JSON array of URI prefixes for the locations of Azure Blob objects to be ingested. Should be in the form "azure://\<container>/\<prefix\>"|None|`uris` or `prefixes` or `objects` must be set|
|objects|JSON array of Azure Blob objects to be ingested.|None|`uris` or `prefixes` or `objects` must be set|
Azure Blob object:
|property|description|default|required?|
|--------|-----------|-------|---------|
|bucket|Name of the Azure Blob Storage container|None|yes|
|path|The path where data is located.|None|yes|
### HDFS Input Source

> You need to include the [`druid-hdfs-storage`](../development/extensions-core/hdfs.md) as an extension to use the HDFS input source.

View File

@@ -126,6 +126,11 @@
       <artifactId>jsr305</artifactId>
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>commons-lang</groupId>
+      <artifactId>commons-lang</artifactId>
+      <scope>provided</scope>
+    </dependency>
 
     <!-- Tests -->
     <dependency>
@@ -175,7 +180,7 @@
             <limit>
               <counter>INSTRUCTION</counter>
               <value>COVEREDRATIO</value>
-              <minimum>0.84</minimum>
+              <minimum>0.85</minimum>
             </limit>
             <limit>
              <counter>LINE</counter>
@@ -185,17 +190,17 @@
             <limit>
               <counter>BRANCH</counter>
               <value>COVEREDRATIO</value>
-              <minimum>0.86</minimum>
+              <minimum>0.87</minimum>
             </limit>
             <limit>
               <counter>COMPLEXITY</counter>
               <value>COVEREDRATIO</value>
-              <minimum>0.80</minimum>
+              <minimum>0.79</minimum>
             </limit>
             <limit>
               <counter>METHOD</counter>
               <value>COVEREDRATIO</value>
-              <minimum>0.79</minimum>
+              <minimum>0.78</minimum>
             </limit>
             <limit>
               <counter>CLASS</counter>

View File

@@ -22,7 +22,6 @@ package org.apache.druid.data.input.azure;
 import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import org.apache.druid.data.input.InputSplit;
@@ -31,6 +30,7 @@ import org.apache.druid.data.input.impl.CloudObjectLocation;
 import org.apache.druid.data.input.impl.SplittableInputSource;
 import org.apache.druid.storage.azure.AzureCloudBlobHolderToCloudObjectLocationConverter;
 import org.apache.druid.storage.azure.AzureCloudBlobIterableFactory;
+import org.apache.druid.storage.azure.AzureInputDataConfig;
 import org.apache.druid.storage.azure.AzureStorage;
 import org.apache.druid.storage.azure.blob.CloudBlobHolder;
@@ -47,14 +47,13 @@ import java.util.stream.StreamSupport;
  */
 public class AzureInputSource extends CloudObjectInputSource<AzureEntity>
 {
-  @VisibleForTesting
-  static final int MAX_LISTING_LENGTH = 1024;
   public static final String SCHEME = "azure";
 
   private final AzureStorage storage;
   private final AzureEntityFactory entityFactory;
   private final AzureCloudBlobIterableFactory azureCloudBlobIterableFactory;
   private final AzureCloudBlobHolderToCloudObjectLocationConverter azureCloudBlobToLocationConverter;
+  private final AzureInputDataConfig inputDataConfig;
 
   @JsonCreator
   public AzureInputSource(
@@ -62,6 +61,7 @@ public class AzureInputSource extends CloudObjectInputSource<AzureEntity>
       @JacksonInject AzureEntityFactory entityFactory,
       @JacksonInject AzureCloudBlobIterableFactory azureCloudBlobIterableFactory,
       @JacksonInject AzureCloudBlobHolderToCloudObjectLocationConverter azureCloudBlobToLocationConverter,
+      @JacksonInject AzureInputDataConfig inputDataConfig,
       @JsonProperty("uris") @Nullable List<URI> uris,
       @JsonProperty("prefixes") @Nullable List<URI> prefixes,
       @JsonProperty("objects") @Nullable List<CloudObjectLocation> objects
@@ -74,6 +74,7 @@ public class AzureInputSource extends CloudObjectInputSource<AzureEntity>
         azureCloudBlobIterableFactory,
         "AzureCloudBlobIterableFactory"
     );
+    this.inputDataConfig = Preconditions.checkNotNull(inputDataConfig, "AzureInputDataConfig");
    this.azureCloudBlobToLocationConverter = Preconditions.checkNotNull(azureCloudBlobToLocationConverter, "AzureCloudBlobToLocationConverter");
   }
@@ -85,6 +86,7 @@ public class AzureInputSource extends CloudObjectInputSource<AzureEntity>
         entityFactory,
         azureCloudBlobIterableFactory,
         azureCloudBlobToLocationConverter,
+        inputDataConfig,
         null,
         null,
         ImmutableList.of(split.get())
@@ -117,7 +119,7 @@ public class AzureInputSource extends CloudObjectInputSource<AzureEntity>
   private Iterable<CloudBlobHolder> getIterableObjectsFromPrefixes()
   {
-    return azureCloudBlobIterableFactory.create(getPrefixes(), MAX_LISTING_LENGTH);
+    return azureCloudBlobIterableFactory.create(getPrefixes(), inputDataConfig.getMaxListingLength());
   }
 
   @Override
@@ -128,7 +130,8 @@ public class AzureInputSource extends CloudObjectInputSource<AzureEntity>
         storage,
         entityFactory,
         azureCloudBlobIterableFactory,
-        azureCloudBlobToLocationConverter
+        azureCloudBlobToLocationConverter,
+        inputDataConfig
     );
   }
@@ -148,6 +151,7 @@ public class AzureInputSource extends CloudObjectInputSource<AzureEntity>
     return storage.equals(that.storage) &&
         entityFactory.equals(that.entityFactory) &&
         azureCloudBlobIterableFactory.equals(that.azureCloudBlobIterableFactory) &&
-        azureCloudBlobToLocationConverter.equals(that.azureCloudBlobToLocationConverter);
+        azureCloudBlobToLocationConverter.equals(that.azureCloudBlobToLocationConverter) &&
+        inputDataConfig.equals(that.inputDataConfig);
   }
 }

View File

@@ -44,10 +44,6 @@ public class AzureAccountConfig
   @NotNull
   private String key;
 
-  @JsonProperty
-  @NotNull
-  private String container;
-
   @SuppressWarnings("unused") // Used by Jackson deserialization?
   public void setProtocol(String protocol)
   {
@@ -71,11 +67,6 @@ public class AzureAccountConfig
     this.key = key;
   }
 
-  public void setContainer(String container)
-  {
-    this.container = container;
-  }
-
   public String getProtocol()
   {
     return protocol;
@@ -95,9 +86,4 @@ public class AzureAccountConfig
   {
     return key;
   }
-
-  public String getContainer()
-  {
-    return container;
-  }
 }

View File

@@ -63,12 +63,12 @@ public class AzureByteSource extends ByteSource
       return azureStorage.getBlobInputStream(offset, containerName, blobPath);
     }
     catch (StorageException | URISyntaxException e) {
-      log.warn("Exception when opening stream to azure resource, containerName: %s, blobPath: %s, Error: %s",
-          containerName, blobPath, e.getMessage()
-      );
       if (AzureUtils.AZURE_RETRY.apply(e)) {
         throw new IOException("Recoverable exception", e);
       }
+      log.error("Exception when opening stream to azure resource, containerName: %s, blobPath: %s, Error: %s",
+          containerName, blobPath, e.getMessage()
+      );
       throw new RuntimeException(e);
     }
   }

View File

@@ -105,7 +105,7 @@ public class AzureCloudBlobIterator implements Iterator<CloudBlobHolder>
     URI currentUri = prefixesIterator.next();
     currentContainer = currentUri.getAuthority();
     currentPrefix = AzureUtils.extractAzureKey(currentUri);
-    log.debug("prepareNextRequest:\ncurrentUri: %s\ncurrentContainer: %s\ncurrentPrefix: %s",
+    log.debug("currentUri: %s\ncurrentContainer: %s\ncurrentPrefix: %s",
        currentUri, currentContainer, currentPrefix
     );
     result = null;
@@ -115,6 +115,12 @@ public class AzureCloudBlobIterator implements Iterator<CloudBlobHolder>
   private void fetchNextBatch()
   {
     try {
+      log.debug(
+          "fetching up to %s resources in container '%s' with prefix '%s'",
+          maxListingLength,
+          currentContainer,
+          currentPrefix
+      );
       result = AzureUtils.retryAzureOperation(() -> storage.listBlobsWithPrefixInContainerSegmented(
           currentContainer,
           currentPrefix,

View File

@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.storage.azure;
import com.fasterxml.jackson.annotation.JsonProperty;
import javax.validation.constraints.NotNull;
/**
* Stores the configuration for segments written to Azure deep storage
*/
public class AzureDataSegmentConfig
{
@JsonProperty
@NotNull
private String container;
@JsonProperty
@NotNull
private String prefix = "";
public void setContainer(String container)
{
this.container = container;
}
public void setPrefix(String prefix)
{
this.prefix = prefix;
}
public String getContainer()
{
return container;
}
public String getPrefix()
{
return prefix;
}
}

View File

@@ -36,13 +36,6 @@ public class AzureDataSegmentPuller
 {
   private static final Logger log = new Logger(AzureDataSegmentPuller.class);
 
-  // The azure storage hadoop access pattern is:
-  // wasb[s]://<containername>@<accountname>.blob.core.windows.net/<path>
-  // (from https://docs.microsoft.com/en-us/azure/hdinsight/hdinsight-hadoop-use-blob-storage)
-  static final String AZURE_STORAGE_HADOOP_PROTOCOL = "wasbs";
-  static final String AZURE_STORAGE_HOST_ADDRESS = "blob.core.windows.net";
-
   private final AzureByteSourceFactory byteSourceFactory;
 
   @Inject

View File

@@ -47,16 +47,19 @@ public class AzureDataSegmentPusher implements DataSegmentPusher
   private static final Logger log = new Logger(AzureDataSegmentPusher.class);
   static final List<String> ALLOWED_PROPERTY_PREFIXES_FOR_HADOOP = ImmutableList.of("druid.azure");
 
   private final AzureStorage azureStorage;
-  private final AzureAccountConfig config;
+  private final AzureAccountConfig accountConfig;
+  private final AzureDataSegmentConfig segmentConfig;
 
   @Inject
   public AzureDataSegmentPusher(
       AzureStorage azureStorage,
-      AzureAccountConfig config
+      AzureAccountConfig accountConfig,
+      AzureDataSegmentConfig segmentConfig
   )
   {
     this.azureStorage = azureStorage;
-    this.config = config;
+    this.accountConfig = accountConfig;
+    this.segmentConfig = segmentConfig;
   }
 
   @Deprecated
@@ -69,12 +72,15 @@ public class AzureDataSegmentPusher implements DataSegmentPusher
   @Override
   public String getPathForHadoop()
   {
+    String prefix = segmentConfig.getPrefix();
+    boolean prefixIsNullOrEmpty = org.apache.commons.lang.StringUtils.isEmpty(prefix);
     String hadoopPath = StringUtils.format(
-        "%s://%s@%s.%s/",
-        AzureDataSegmentPuller.AZURE_STORAGE_HADOOP_PROTOCOL,
-        config.getContainer(),
-        config.getAccount(),
-        AzureDataSegmentPuller.AZURE_STORAGE_HOST_ADDRESS
+        "%s://%s@%s.%s/%s",
+        AzureUtils.AZURE_STORAGE_HADOOP_PROTOCOL,
+        segmentConfig.getContainer(),
+        accountConfig.getAccount(),
+        AzureUtils.AZURE_STORAGE_HOST_ADDRESS,
+        prefixIsNullOrEmpty ? "" : StringUtils.maybeRemoveTrailingSlash(prefix) + '/'
     );
 
     log.info("Using Azure blob storage Hadoop path: %s", hadoopPath);
@@ -85,7 +91,10 @@ public class AzureDataSegmentPusher implements DataSegmentPusher
   @Override
   public String getStorageDir(DataSegment dataSegment, boolean useUniquePath)
   {
+    String prefix = segmentConfig.getPrefix();
+    boolean prefixIsNullOrEmpty = org.apache.commons.lang.StringUtils.isEmpty(prefix);
     String seg = JOINER.join(
+        prefixIsNullOrEmpty ? null : StringUtils.maybeRemoveTrailingSlash(prefix),
         dataSegment.getDataSource(),
         StringUtils.format(
             "%s_%s",
@@ -127,7 +136,7 @@ public class AzureDataSegmentPusher implements DataSegmentPusher
       return AzureUtils.retryAzureOperation(
           () -> uploadDataSegment(segment, binaryVersion, size, outFile, azurePath),
-          config.getMaxTries()
+          accountConfig.getMaxTries()
       );
     }
     catch (Exception e) {
@@ -148,7 +157,7 @@ public class AzureDataSegmentPusher implements DataSegmentPusher
         "type",
         AzureStorageDruidModule.SCHEME,
         "containerName",
-        config.getContainer(),
+        segmentConfig.getContainer(),
         "blobPath",
         uri.toString()
     );
@@ -173,7 +182,7 @@ public class AzureDataSegmentPusher implements DataSegmentPusher
     )
       throws StorageException, IOException, URISyntaxException
   {
-    azureStorage.uploadBlob(compressedSegmentData, config.getContainer(), azurePath);
+    azureStorage.uploadBlob(compressedSegmentData, segmentConfig.getContainer(), azurePath);
 
     final DataSegment outSegment = segment
         .withSize(size)

View File

@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.storage.azure;
import com.fasterxml.jackson.annotation.JsonProperty;
import javax.validation.constraints.Min;
/**
* Stores the configuration for options related to reading
* input data from Azure blob store into Druid
*/
public class AzureInputDataConfig
{
/**
* The maximum number of input files matching a given prefix to retrieve
* from Azure at a time.
*/
@JsonProperty
@Min(1)
private int maxListingLength = 1024;
public void setMaxListingLength(int maxListingLength)
{
this.maxListingLength = maxListingLength;
}
public int getMaxListingLength()
{
return maxListingLength;
}
}

View File

@@ -46,7 +46,7 @@ public class AzureStorage
 {
   private static final boolean USE_FLAT_BLOB_LISTING = true;
 
-  private final Logger log = new Logger(AzureStorage.class);
+  private static final Logger log = new Logger(AzureStorage.class);
 
   private final CloudBlobClient cloudBlobClient;

View File

@@ -87,6 +87,8 @@ public class AzureStorageDruidModule implements DruidModule
   @Override
   public void configure(Binder binder)
   {
+    JsonConfigProvider.bind(binder, "druid.azure", AzureInputDataConfig.class);
+    JsonConfigProvider.bind(binder, "druid.azure", AzureDataSegmentConfig.class);
     JsonConfigProvider.bind(binder, "druid.azure", AzureAccountConfig.class);
 
     Binders.dataSegmentPusherBinder(binder)

View File

@@ -39,24 +39,34 @@ public class AzureUtils
   @VisibleForTesting
   static final String AZURE_STORAGE_HOST_ADDRESS = "blob.core.windows.net";
 
+  // The azure storage hadoop access pattern is:
+  // wasb[s]://<containername>@<accountname>.blob.core.windows.net/<path>
+  // (from https://docs.microsoft.com/en-us/azure/hdinsight/hdinsight-hadoop-use-blob-storage)
+  static final String AZURE_STORAGE_HADOOP_PROTOCOL = "wasbs";
+
   public static final Predicate<Throwable> AZURE_RETRY = e -> {
-    if (e instanceof URISyntaxException) {
-      return false;
-    }
-    if (e instanceof StorageException) {
-      return true;
-    }
-    if (e instanceof IOException) {
-      return true;
+    if (e == null) {
+      return false;
+    }
+    for (Throwable t = e; t != null; t = t.getCause()) {
+      if (t instanceof URISyntaxException) {
+        return false;
+      }
+      if (t instanceof StorageException) {
+        return true;
+      }
+      if (t instanceof IOException) {
+        return true;
+      }
     }
     return false;
   };
 
   /**
    * extracts the path component of the supplied uri with any leading '/' characters removed.
+   *
    * @param uri the uri to extract the path for
    * @return a String representing the path component of the uri with any leading '/'
    * characters removed.
@@ -68,6 +78,7 @@ public class AzureUtils
   /**
    * extracts the blob path component of the supplied uri with any leading 'blob.core.windows.net/' string removed.
+   *
    * @param blobPath the path of the blob
    * @return a String representing the blob path component of the uri with any leading 'blob.core.windows.net/' string
    * removed characters removed.

View File

@@ -26,6 +26,8 @@ import org.apache.druid.data.input.impl.CloudObjectLocation;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.storage.azure.AzureCloudBlobHolderToCloudObjectLocationConverter;
 import org.apache.druid.storage.azure.AzureCloudBlobIterableFactory;
+import org.apache.druid.storage.azure.AzureDataSegmentConfig;
+import org.apache.druid.storage.azure.AzureInputDataConfig;
 import org.apache.druid.storage.azure.AzureStorage;
 import org.apache.druid.storage.azure.AzureStorageDruidModule;
 import org.easymock.EasyMockSupport;
@@ -64,6 +66,7 @@ public class AzureInputSourceSerdeTest extends EasyMockSupport
   private AzureEntityFactory entityFactory;
   private AzureCloudBlobIterableFactory azureCloudBlobIterableFactory;
   private AzureCloudBlobHolderToCloudObjectLocationConverter azureCloudBlobToLocationConverter;
+  private AzureInputDataConfig inputDataConfig;
 
   static {
     try {
@@ -86,20 +89,14 @@ public class AzureInputSourceSerdeTest extends EasyMockSupport
     entityFactory = createMock(AzureEntityFactory.class);
     azureCloudBlobIterableFactory = createMock(AzureCloudBlobIterableFactory.class);
     azureCloudBlobToLocationConverter = createMock(AzureCloudBlobHolderToCloudObjectLocationConverter.class);
+    inputDataConfig = createMock(AzureInputDataConfig.class);
   }
 
   @Test
   public void test_uriSerde_constructsProperAzureInputSource() throws Exception
   {
-    final InjectableValues.Std injectableValues = new InjectableValues.Std();
-    injectableValues.addValue(AzureStorage.class, azureStorage);
-    injectableValues.addValue(AzureEntityFactory.class, entityFactory);
-    injectableValues.addValue(AzureCloudBlobIterableFactory.class, azureCloudBlobIterableFactory);
-    injectableValues.addValue(
-        AzureCloudBlobHolderToCloudObjectLocationConverter.class,
-        azureCloudBlobToLocationConverter
-    );
+    final InjectableValues.Std injectableValues = initInjectableValues();
     final ObjectMapper objectMapper = new DefaultObjectMapper()
         .registerModules(new AzureStorageDruidModule().getJacksonModules());
     objectMapper.setInjectableValues(injectableValues);
@@ -117,14 +114,8 @@ public class AzureInputSourceSerdeTest extends EasyMockSupport
   @Test
   public void test_prefixSerde_constructsProperAzureInputSource() throws Exception
   {
-    final InjectableValues.Std injectableValues = new InjectableValues.Std();
-    injectableValues.addValue(AzureStorage.class, azureStorage);
-    injectableValues.addValue(AzureEntityFactory.class, entityFactory);
-    injectableValues.addValue(AzureCloudBlobIterableFactory.class, azureCloudBlobIterableFactory);
-    injectableValues.addValue(
-        AzureCloudBlobHolderToCloudObjectLocationConverter.class,
-        azureCloudBlobToLocationConverter
-    );
+    final InjectableValues.Std injectableValues = initInjectableValues();
+    injectableValues.addValue(AzureDataSegmentConfig.class, inputDataConfig);
     final ObjectMapper objectMapper = new DefaultObjectMapper()
         .registerModules(new AzureStorageDruidModule().getJacksonModules());
     objectMapper.setInjectableValues(injectableValues);
@@ -142,14 +133,7 @@ public class AzureInputSourceSerdeTest extends EasyMockSupport
   @Test
   public void test_objectSerde_constructsProperAzureInputSource() throws Exception
   {
-    final InjectableValues.Std injectableValues = new InjectableValues.Std();
-    injectableValues.addValue(AzureStorage.class, azureStorage);
-    injectableValues.addValue(AzureEntityFactory.class, entityFactory);
-    injectableValues.addValue(AzureCloudBlobIterableFactory.class, azureCloudBlobIterableFactory);
-    injectableValues.addValue(
-        AzureCloudBlobHolderToCloudObjectLocationConverter.class,
-        azureCloudBlobToLocationConverter
-    );
+    final InjectableValues.Std injectableValues = initInjectableValues();
     final ObjectMapper objectMapper = new DefaultObjectMapper()
         .registerModules(new AzureStorageDruidModule().getJacksonModules());
     objectMapper.setInjectableValues(injectableValues);
@@ -161,7 +145,20 @@ public class AzureInputSourceSerdeTest extends EasyMockSupport
         objectMapper.writeValueAsBytes(inputSource),
         AzureInputSource.class);
     verifyInputSourceWithObjects(roundTripInputSource);
+  }
+
+  private InjectableValues.Std initInjectableValues()
+  {
+    final InjectableValues.Std injectableValues = new InjectableValues.Std();
+    injectableValues.addValue(AzureStorage.class, azureStorage);
+    injectableValues.addValue(AzureEntityFactory.class, entityFactory);
+    injectableValues.addValue(AzureCloudBlobIterableFactory.class, azureCloudBlobIterableFactory);
+    injectableValues.addValue(
+        AzureCloudBlobHolderToCloudObjectLocationConverter.class,
+        azureCloudBlobToLocationConverter
+    );
+    injectableValues.addValue(AzureInputDataConfig.class, inputDataConfig);
+    return injectableValues;
   }
 
   private static void verifyInputSourceWithUris(final AzureInputSource inputSource)

View File

@@ -28,6 +28,7 @@ import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.storage.azure.AzureCloudBlobHolderToCloudObjectLocationConverter;
 import org.apache.druid.storage.azure.AzureCloudBlobIterable;
 import org.apache.druid.storage.azure.AzureCloudBlobIterableFactory;
+import org.apache.druid.storage.azure.AzureInputDataConfig;
 import org.apache.druid.storage.azure.AzureStorage;
 import org.apache.druid.storage.azure.blob.CloudBlobHolder;
 import org.easymock.EasyMock;
@@ -55,11 +56,13 @@ public class AzureInputSourceTest extends EasyMockSupport
   private static final String CONTAINER = "CONTAINER";
   private static final String BLOB_PATH = "BLOB_PATH";
   private static final CloudObjectLocation CLOUD_OBJECT_LOCATION_1 = new CloudObjectLocation(CONTAINER, BLOB_PATH);
+  private static final int MAX_LISTING_LENGTH = 10;
 
   private AzureStorage storage;
   private AzureEntityFactory entityFactory;
   private AzureCloudBlobIterableFactory azureCloudBlobIterableFactory;
   private AzureCloudBlobHolderToCloudObjectLocationConverter azureCloudBlobToLocationConverter;
+  private AzureInputDataConfig inputDataConfig;
 
   private InputSplit<CloudObjectLocation> inputSplit;
   private AzureEntity azureEntity1;
@@ -86,6 +89,7 @@ public class AzureInputSourceTest extends EasyMockSupport
     azureEntity1 = createMock(AzureEntity.class);
     azureCloudBlobIterableFactory = createMock(AzureCloudBlobIterableFactory.class);
     azureCloudBlobToLocationConverter = createMock(AzureCloudBlobHolderToCloudObjectLocationConverter.class);
+    inputDataConfig = createMock(AzureInputDataConfig.class);
     cloudBlobDruid1 = createMock(CloudBlobHolder.class);
     azureCloudBlobIterable = createMock(AzureCloudBlobIterable.class);
   }
@@ -99,6 +103,7 @@ public class AzureInputSourceTest extends EasyMockSupport
         entityFactory,
         azureCloudBlobIterableFactory,
         azureCloudBlobToLocationConverter,
+        inputDataConfig,
         EMPTY_URIS,
         EMPTY_PREFIXES,
         EMPTY_OBJECTS
@@ -118,6 +123,7 @@ public class AzureInputSourceTest extends EasyMockSupport
         entityFactory,
         azureCloudBlobIterableFactory,
         azureCloudBlobToLocationConverter,
+        inputDataConfig,
         EMPTY_URIS,
         EMPTY_PREFIXES,
         objects
@@ -135,7 +141,8 @@ public class AzureInputSourceTest extends EasyMockSupport
     List<CloudObjectLocation> expectedCloudLocations = ImmutableList.of(CLOUD_OBJECT_LOCATION_1);
     List<CloudBlobHolder> expectedCloudBlobs = ImmutableList.of(cloudBlobDruid1);
     Iterator<CloudBlobHolder> expectedCloudBlobsIterator = expectedCloudBlobs.iterator();
-    EasyMock.expect(azureCloudBlobIterableFactory.create(prefixes, AzureInputSource.MAX_LISTING_LENGTH)).andReturn(
+    EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_LISTING_LENGTH);
+    EasyMock.expect(azureCloudBlobIterableFactory.create(prefixes, MAX_LISTING_LENGTH)).andReturn(
        azureCloudBlobIterable);
     EasyMock.expect(azureCloudBlobIterable.spliterator())
        .andReturn(Spliterators.spliteratorUnknownSize(expectedCloudBlobsIterator, 0));
@@ -148,6 +155,7 @@ public class AzureInputSourceTest extends EasyMockSupport
         entityFactory,
         azureCloudBlobIterableFactory,
         azureCloudBlobToLocationConverter,
+        inputDataConfig,
         EMPTY_URIS,
         prefixes,
         EMPTY_OBJECTS
@@ -173,6 +181,7 @@ public class AzureInputSourceTest extends EasyMockSupport
         entityFactory,
         azureCloudBlobIterableFactory,
         azureCloudBlobToLocationConverter,
+        inputDataConfig,
         EMPTY_URIS,
         prefixes,
         EMPTY_OBJECTS
@@ -192,6 +201,7 @@ public class AzureInputSourceTest extends EasyMockSupport
         entityFactory,
         azureCloudBlobIterableFactory,
         azureCloudBlobToLocationConverter,
+        inputDataConfig,
         EMPTY_URIS,
         prefixes,
         EMPTY_OBJECTS
@@ -211,6 +221,7 @@ public class AzureInputSourceTest extends EasyMockSupport
        .withNonnullFields("entityFactory")
        .withNonnullFields("azureCloudBlobIterableFactory")
        .withNonnullFields("azureCloudBlobToLocationConverter")
+       .withNonnullFields("inputDataConfig")
        .verify();
   }

View File

@@ -40,7 +40,7 @@ public class AzureDataSegmentPullerTest extends EasyMockSupport
   private static final String SEGMENT_FILE_NAME = "segment";
   private static final String CONTAINER_NAME = "container";
   private static final String BLOB_PATH = "path/to/storage/index.zip";
-  private static final String BLOB_PATH_HADOOP = AzureDataSegmentPuller.AZURE_STORAGE_HOST_ADDRESS + "/path/to/storage/index.zip";
+  private static final String BLOB_PATH_HADOOP = AzureUtils.AZURE_STORAGE_HOST_ADDRESS + "/path/to/storage/index.zip";
 
   private AzureStorage azureStorage;
   private AzureByteSourceFactory byteSourceFactory;

View File

@@ -19,11 +19,9 @@
 package org.apache.druid.storage.azure;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.io.Files;
 import com.microsoft.azure.storage.StorageException;
-import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.MapUtils;
 import org.apache.druid.java.util.common.StringUtils;
@@ -51,7 +49,9 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
   @Rule
   public final TemporaryFolder tempFolder = new TemporaryFolder();
 
+  private static final String ACCOUNT = "account";
   private static final String CONTAINER_NAME = "container";
+  private static final String PREFIX = "prefix";
   private static final String BLOB_PATH = "test/2015-04-12T00:00:00.000Z_2015-04-13T00:00:00.000Z/1/0/index.zip";
   private static final DataSegment DATA_SEGMENT = new DataSegment(
       "test",
@@ -65,8 +65,10 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
       1
   );
   private static final byte[] DATA = new byte[]{0x0, 0x0, 0x0, 0x1};
-  private static final String UNIQUE_MATCHER = "foo/20150101T000000\\.000Z_20160101T000000\\.000Z/0/0/[A-Za-z0-9-]{36}/index\\.zip";
-  private static final String NON_UNIQUE_MATCHER = "foo/20150101T000000\\.000Z_20160101T000000\\.000Z/0/0/index\\.zip";
+  private static final String UNIQUE_MATCHER_NO_PREFIX = "foo/20150101T000000\\.000Z_20160101T000000\\.000Z/0/0/[A-Za-z0-9-]{36}/index\\.zip";
+  private static final String UNIQUE_MATCHER_PREFIX = PREFIX + "/" + UNIQUE_MATCHER_NO_PREFIX;
+  private static final String NON_UNIQUE_NO_PREFIX_MATCHER = "foo/20150101T000000\\.000Z_20160101T000000\\.000Z/0/0/index\\.zip";
+  private static final String NON_UNIQUE_WITH_PREFIX_MATCHER = PREFIX + "/" + "foo/20150101T000000\\.000Z_20160101T000000\\.000Z/0/0/index\\.zip";
 
   private static final DataSegment SEGMENT_TO_PUSH = new DataSegment(
       "foo",
@@ -82,24 +84,30 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
   private AzureStorage azureStorage;
   private AzureAccountConfig azureAccountConfig;
-  private ObjectMapper jsonMapper;
+  private AzureDataSegmentConfig segmentConfigWithPrefix;
+  private AzureDataSegmentConfig segmentConfigWithoutPrefix;
 
   @Before
   public void before()
   {
     azureStorage = createMock(AzureStorage.class);
     azureAccountConfig = new AzureAccountConfig();
-    azureAccountConfig.setAccount("account");
-    azureAccountConfig.setContainer("container");
-    jsonMapper = new DefaultObjectMapper();
+    azureAccountConfig.setAccount(ACCOUNT);
+    segmentConfigWithPrefix = new AzureDataSegmentConfig();
+    segmentConfigWithPrefix.setContainer(CONTAINER_NAME);
+    segmentConfigWithPrefix.setPrefix(PREFIX + "/");
+    segmentConfigWithoutPrefix = new AzureDataSegmentConfig();
+    segmentConfigWithoutPrefix.setContainer(CONTAINER_NAME);
   }
 
   @Test
-  public void test_push_nonUniquePath_succeeds() throws Exception
+  public void test_push_nonUniquePathNoPrefix_succeeds() throws Exception
   {
     boolean useUniquePath = false;
-    AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig);
+    AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig, segmentConfigWithoutPrefix
+    );
 
     // Create a mock segment on disk
     File tmp = tempFolder.newFile("version.bin");
@@ -116,7 +124,70 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
     Assert.assertTrue(
         segment.getLoadSpec().get("blobPath").toString(),
-        Pattern.compile(NON_UNIQUE_MATCHER).matcher(segment.getLoadSpec().get("blobPath").toString()).matches()
+        Pattern.compile(NON_UNIQUE_NO_PREFIX_MATCHER).matcher(segment.getLoadSpec().get("blobPath").toString()).matches()
+    );
+    Assert.assertEquals(SEGMENT_TO_PUSH.getSize(), segment.getSize());
+    verifyAll();
+  }
+
+  @Test
+  public void test_push_nonUniquePathWithPrefix_succeeds() throws Exception
+  {
+    boolean useUniquePath = false;
+    AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig, segmentConfigWithPrefix
+    );
+    // Create a mock segment on disk
+    File tmp = tempFolder.newFile("version.bin");
+    Files.write(DATA, tmp);
+    String azurePath = pusher.getAzurePath(SEGMENT_TO_PUSH, useUniquePath);
+    azureStorage.uploadBlob(EasyMock.anyObject(File.class), EasyMock.eq(CONTAINER_NAME), EasyMock.eq(azurePath));
+    EasyMock.expectLastCall();
+    replayAll();
+    DataSegment segment = pusher.push(tempFolder.getRoot(), SEGMENT_TO_PUSH, useUniquePath);
+    Assert.assertTrue(
+        segment.getLoadSpec().get("blobPath").toString(),
+        Pattern.compile(NON_UNIQUE_WITH_PREFIX_MATCHER).matcher(segment.getLoadSpec().get("blobPath").toString()).matches()
+    );
+    Assert.assertEquals(SEGMENT_TO_PUSH.getSize(), segment.getSize());
+    verifyAll();
+  }
+
+  @Test
+  public void test_push_uniquePathNoPrefix_succeeds() throws Exception
+  {
+    boolean useUniquePath = true;
+    AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig, segmentConfigWithoutPrefix);
+    // Create a mock segment on disk
+    File tmp = tempFolder.newFile("version.bin");
+    Files.write(DATA, tmp);
+    String azurePath = pusher.getAzurePath(SEGMENT_TO_PUSH, useUniquePath);
+    azureStorage.uploadBlob(
+        EasyMock.anyObject(File.class),
+        EasyMock.eq(CONTAINER_NAME),
+        EasyMock.matches(UNIQUE_MATCHER_NO_PREFIX)
+    );
+    EasyMock.expectLastCall();
+    replayAll();
+    DataSegment segment = pusher.push(tempFolder.getRoot(), SEGMENT_TO_PUSH, useUniquePath);
+    Assert.assertTrue(
+        segment.getLoadSpec().get("blobPath").toString(),
+        Pattern.compile(UNIQUE_MATCHER_NO_PREFIX).matcher(segment.getLoadSpec().get("blobPath").toString()).matches()
     );
 
     Assert.assertEquals(SEGMENT_TO_PUSH.getSize(), segment.getSize());
@@ -128,7 +199,7 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
   public void test_push_uniquePath_succeeds() throws Exception
   {
     boolean useUniquePath = true;
-    AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig);
+    AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig, segmentConfigWithPrefix);
 
     // Create a mock segment on disk
     File tmp = tempFolder.newFile("version.bin");
@@ -136,7 +207,11 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
     Files.write(DATA, tmp);
 
     String azurePath = pusher.getAzurePath(SEGMENT_TO_PUSH, useUniquePath);
-    azureStorage.uploadBlob(EasyMock.anyObject(File.class), EasyMock.eq(CONTAINER_NAME), EasyMock.matches(UNIQUE_MATCHER));
+    azureStorage.uploadBlob(
+        EasyMock.anyObject(File.class),
+        EasyMock.eq(CONTAINER_NAME),
+        EasyMock.matches(UNIQUE_MATCHER_PREFIX)
+    );
     EasyMock.expectLastCall();
 
     replayAll();
@@ -145,7 +220,7 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
     Assert.assertTrue(
         segment.getLoadSpec().get("blobPath").toString(),
-        Pattern.compile(UNIQUE_MATCHER).matcher(segment.getLoadSpec().get("blobPath").toString()).matches()
+        Pattern.compile(UNIQUE_MATCHER_PREFIX).matcher(segment.getLoadSpec().get("blobPath").toString()).matches()
     );
 
     Assert.assertEquals(SEGMENT_TO_PUSH.getSize(), segment.getSize());
@@ -157,7 +232,7 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
   public void test_push_exception_throwsException() throws Exception
   {
     boolean useUniquePath = true;
-    AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig);
+    AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig, segmentConfigWithPrefix);
 
     // Create a mock segment on disk
     File tmp = tempFolder.newFile("version.bin");
@@ -175,7 +250,7 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
     Assert.assertTrue(
         segment.getLoadSpec().get("blobPath").toString(),
-        Pattern.compile(UNIQUE_MATCHER).matcher(segment.getLoadSpec().get("blobPath").toString()).matches()
+        Pattern.compile(UNIQUE_MATCHER_NO_PREFIX).matcher(segment.getLoadSpec().get("blobPath").toString()).matches()
     );
 
     Assert.assertEquals(SEGMENT_TO_PUSH.getSize(), segment.getSize());
@@ -187,7 +262,7 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
   public void getAzurePathsTest()
   {
-    AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig);
+    AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig, segmentConfigWithPrefix);
     final String storageDir = pusher.getStorageDir(DATA_SEGMENT, false);
     final String azurePath = pusher.getAzurePath(DATA_SEGMENT, false);
@@ -200,7 +275,7 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
   @Test
   public void uploadDataSegmentTest() throws StorageException, IOException, URISyntaxException
   {
-    AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig);
+    AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig, segmentConfigWithPrefix);
     final int binaryVersion = 9;
     final File compressedSegmentData = new File("index.zip");
     final String azurePath = pusher.getAzurePath(DATA_SEGMENT, false);
@ -228,25 +303,41 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
} }
@Test @Test
public void getPathForHadoopTest() public void getPathForHadoopWithPrefixTest()
{ {
AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig); AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig, segmentConfigWithPrefix);
String hadoopPath = pusher.getPathForHadoop();
Assert.assertEquals("wasbs://container@account.blob.core.windows.net/prefix/", hadoopPath);
}
@Test
public void getPathForHadoopWithoutPrefixTest()
{
AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig, segmentConfigWithoutPrefix);
String hadoopPath = pusher.getPathForHadoop(); String hadoopPath = pusher.getPathForHadoop();
Assert.assertEquals("wasbs://container@account.blob.core.windows.net/", hadoopPath); Assert.assertEquals("wasbs://container@account.blob.core.windows.net/", hadoopPath);
} }
@Test @Test
public void test_getPathForHadoop_noArgs_succeeds() public void test_getPathForHadoop_noArgsWithoutPrefix_succeeds()
{ {
AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig); AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig, segmentConfigWithoutPrefix);
String hadoopPath = pusher.getPathForHadoop(""); String hadoopPath = pusher.getPathForHadoop("");
Assert.assertEquals("wasbs://container@account.blob.core.windows.net/", hadoopPath); Assert.assertEquals("wasbs://container@account.blob.core.windows.net/", hadoopPath);
} }
@Test
public void test_getPathForHadoop_noArgsWithPrefix_succeeds()
{
AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig, segmentConfigWithPrefix);
String hadoopPath = pusher.getPathForHadoop("");
Assert.assertEquals("wasbs://container@account.blob.core.windows.net/prefix/", hadoopPath);
}
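The two Hadoop-path tests above pin down the expected behavior: an empty segment prefix yields the bare container URL, while a configured prefix is appended with a trailing slash. A minimal sketch of path construction consistent with those assertions (illustrative only; the names and structure are assumptions, not the committed AzureDataSegmentPusher code):
// Hedged sketch: builds the wasbs:// base path the tests assert on.
private static String hadoopPathSketch(String account, String container, String prefix)
{
  String base = "wasbs://" + container + "@" + account + ".blob.core.windows.net/";
  // null or "" prefix -> ".../", "prefix" -> ".../prefix/"
  return (prefix == null || prefix.isEmpty()) ? base : base + prefix + "/";
}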
@Test @Test
public void test_getAllowedPropertyPrefixesForHadoop_returnsExpectedPropertyPrefixes() public void test_getAllowedPropertyPrefixesForHadoop_returnsExpectedPropertyPrefixes()
{ {
AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig); AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig, segmentConfigWithPrefix);
List<String> actualPropertyPrefixes = pusher.getAllowedPropertyPrefixesForHadoop(); List<String> actualPropertyPrefixes = pusher.getAllowedPropertyPrefixesForHadoop();
Assert.assertEquals(AzureDataSegmentPusher.ALLOWED_PROPERTY_PREFIXES_FOR_HADOOP, actualPropertyPrefixes); Assert.assertEquals(AzureDataSegmentPusher.ALLOWED_PROPERTY_PREFIXES_FOR_HADOOP, actualPropertyPrefixes);
} }
@ -254,7 +345,7 @@ public class AzureDataSegmentPusherTest extends EasyMockSupport
@Test @Test
public void storageDirContainsNoColonsTest() public void storageDirContainsNoColonsTest()
{ {
AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig); AzureDataSegmentPusher pusher = new AzureDataSegmentPusher(azureStorage, azureAccountConfig, segmentConfigWithPrefix);
DataSegment withColons = DATA_SEGMENT.withVersion("2018-01-05T14:54:09.295Z"); DataSegment withColons = DATA_SEGMENT.withVersion("2018-01-05T14:54:09.295Z");
String segmentPath = pusher.getStorageDir(withColons, false); String segmentPath = pusher.getStorageDir(withColons, false);
Assert.assertFalse("Path should not contain any columns", segmentPath.contains(":")); Assert.assertFalse("Path should not contain any columns", segmentPath.contains(":"));

@ -23,7 +23,6 @@ import com.google.common.collect.ImmutableList;
import com.google.inject.Binder; import com.google.inject.Binder;
import com.google.inject.Guice; import com.google.inject.Guice;
import com.google.inject.Injector; import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module; import com.google.inject.Module;
import com.microsoft.azure.storage.StorageCredentials; import com.microsoft.azure.storage.StorageCredentials;
import com.microsoft.azure.storage.blob.CloudBlobClient; import com.microsoft.azure.storage.blob.CloudBlobClient;
@ -33,6 +32,7 @@ import org.apache.druid.data.input.impl.CloudObjectLocation;
import org.apache.druid.guice.DruidGuiceExtensions; import org.apache.druid.guice.DruidGuiceExtensions;
import org.apache.druid.guice.JsonConfigurator; import org.apache.druid.guice.JsonConfigurator;
import org.apache.druid.guice.LazySingleton; import org.apache.druid.guice.LazySingleton;
import org.apache.druid.jackson.JacksonModule;
import org.apache.druid.storage.azure.blob.ListBlobItemHolder; import org.apache.druid.storage.azure.blob.ListBlobItemHolder;
import org.apache.druid.storage.azure.blob.ListBlobItemHolderFactory; import org.apache.druid.storage.azure.blob.ListBlobItemHolderFactory;
import org.easymock.EasyMock; import org.easymock.EasyMock;
@ -53,6 +53,8 @@ public class AzureStorageDruidModuleTest extends EasyMockSupport
private static final String AZURE_ACCOUNT_NAME; private static final String AZURE_ACCOUNT_NAME;
private static final String AZURE_ACCOUNT_KEY; private static final String AZURE_ACCOUNT_KEY;
private static final String AZURE_CONTAINER; private static final String AZURE_CONTAINER;
private static final String AZURE_PREFIX;
private static final int AZURE_MAX_LISTING_LENGTH;
private static final String PATH = "path"; private static final String PATH = "path";
private static final Iterable<URI> EMPTY_PREFIXES_ITERABLE = ImmutableList.of(); private static final Iterable<URI> EMPTY_PREFIXES_ITERABLE = ImmutableList.of();
private static final Properties PROPERTIES; private static final Properties PROPERTIES;
@ -71,6 +73,8 @@ public class AzureStorageDruidModuleTest extends EasyMockSupport
AZURE_ACCOUNT_KEY = Base64.getUrlEncoder() AZURE_ACCOUNT_KEY = Base64.getUrlEncoder()
.encodeToString("azureKey1".getBytes(StandardCharsets.UTF_8.toString())); .encodeToString("azureKey1".getBytes(StandardCharsets.UTF_8.toString()));
AZURE_CONTAINER = "azureContainer1"; AZURE_CONTAINER = "azureContainer1";
AZURE_PREFIX = "azurePrefix1";
AZURE_MAX_LISTING_LENGTH = 10;
PROPERTIES = initializePropertes(); PROPERTIES = initializePropertes();
} }
catch (Exception e) { catch (Exception e) {
@ -88,14 +92,38 @@ public class AzureStorageDruidModuleTest extends EasyMockSupport
} }
@Test @Test
public void test_getBlobClient_expectedClient() public void test_getAzureAccountConfig_expectedConfig()
{ {
injector = makeInjectorWithProperties(PROPERTIES); injector = makeInjectorWithProperties(PROPERTIES);
AzureAccountConfig azureAccountConfig = injector.getInstance(Key.get(AzureAccountConfig.class)); AzureAccountConfig azureAccountConfig = injector.getInstance(AzureAccountConfig.class);
Assert.assertEquals(AZURE_ACCOUNT_NAME, azureAccountConfig.getAccount()); Assert.assertEquals(AZURE_ACCOUNT_NAME, azureAccountConfig.getAccount());
Assert.assertEquals(AZURE_ACCOUNT_KEY, azureAccountConfig.getKey()); Assert.assertEquals(AZURE_ACCOUNT_KEY, azureAccountConfig.getKey());
Assert.assertEquals(AZURE_CONTAINER, azureAccountConfig.getContainer()); }
@Test
public void test_getAzureDataSegmentConfig_expectedConfig()
{
injector = makeInjectorWithProperties(PROPERTIES);
AzureDataSegmentConfig segmentConfig = injector.getInstance(AzureDataSegmentConfig.class);
Assert.assertEquals(AZURE_CONTAINER, segmentConfig.getContainer());
Assert.assertEquals(AZURE_PREFIX, segmentConfig.getPrefix());
}
@Test
public void test_getAzureInputDataConfig_expectedConfig()
{
injector = makeInjectorWithProperties(PROPERTIES);
AzureInputDataConfig inputDataConfig = injector.getInstance(AzureInputDataConfig.class);
Assert.assertEquals(AZURE_MAX_LISTING_LENGTH, inputDataConfig.getMaxListingLength());
}
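AzureInputDataConfig itself does not appear in this excerpt; judging from the test above and the Google and S3 counterparts added later in this commit, it presumably looks roughly like the sketch below (an assumption, not the committed source):
import com.fasterxml.jackson.annotation.JsonProperty;
import javax.validation.constraints.Min;

public class AzureInputDataConfig
{
  // Maximum number of input files matching a prefix to fetch per listing call;
  // 1024 is assumed here to mirror the documented druid.azure.maxListingLength default.
  @JsonProperty
  @Min(1)
  private int maxListingLength = 1024;

  public int getMaxListingLength()
  {
    return maxListingLength;
  }
}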
@Test
public void test_getBlobClient_expectedClient()
{
injector = makeInjectorWithProperties(PROPERTIES);
CloudBlobClient cloudBlobClient = injector.getInstance(CloudBlobClient.class); CloudBlobClient cloudBlobClient = injector.getInstance(CloudBlobClient.class);
StorageCredentials storageCredentials = cloudBlobClient.getCredentials(); StorageCredentials storageCredentials = cloudBlobClient.getCredentials();
@ -107,11 +135,6 @@ public class AzureStorageDruidModuleTest extends EasyMockSupport
public void test_getAzureStorageContainer_expectedClient() public void test_getAzureStorageContainer_expectedClient()
{ {
injector = makeInjectorWithProperties(PROPERTIES); injector = makeInjectorWithProperties(PROPERTIES);
AzureAccountConfig azureAccountConfig = injector.getInstance(Key.get(AzureAccountConfig.class));
Assert.assertEquals(AZURE_ACCOUNT_NAME, azureAccountConfig.getAccount());
Assert.assertEquals(AZURE_ACCOUNT_KEY, azureAccountConfig.getKey());
Assert.assertEquals(AZURE_CONTAINER, azureAccountConfig.getContainer());
CloudBlobClient cloudBlobClient = injector.getInstance(CloudBlobClient.class); CloudBlobClient cloudBlobClient = injector.getInstance(CloudBlobClient.class);
StorageCredentials storageCredentials = cloudBlobClient.getCredentials(); StorageCredentials storageCredentials = cloudBlobClient.getCredentials();
@ -204,6 +227,7 @@ public class AzureStorageDruidModuleTest extends EasyMockSupport
return Guice.createInjector( return Guice.createInjector(
ImmutableList.of( ImmutableList.of(
new DruidGuiceExtensions(), new DruidGuiceExtensions(),
new JacksonModule(),
new Module() new Module()
{ {
@Override @Override
@ -224,6 +248,8 @@ public class AzureStorageDruidModuleTest extends EasyMockSupport
props.put("druid.azure.account", AZURE_ACCOUNT_NAME); props.put("druid.azure.account", AZURE_ACCOUNT_NAME);
props.put("druid.azure.key", AZURE_ACCOUNT_KEY); props.put("druid.azure.key", AZURE_ACCOUNT_KEY);
props.put("druid.azure.container", AZURE_CONTAINER); props.put("druid.azure.container", AZURE_CONTAINER);
props.put("druid.azure.prefix", AZURE_PREFIX);
props.put("druid.azure.maxListingLength", String.valueOf(AZURE_MAX_LISTING_LENGTH));
return props; return props;
} }
} }

@ -19,11 +19,14 @@
package org.apache.druid.storage.azure; package org.apache.druid.storage.azure;
import com.microsoft.azure.storage.StorageException;
import org.apache.druid.data.input.azure.AzureInputSource; import org.apache.druid.data.input.azure.AzureInputSource;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException;
public class AzureUtilsTest public class AzureUtilsTest
{ {
@ -35,6 +38,20 @@ public class AzureUtilsTest
+ BLOB_NAME; + BLOB_NAME;
private static final URI URI_WITH_PATH_WITH_LEADING_SLASH; private static final URI URI_WITH_PATH_WITH_LEADING_SLASH;
private static final URISyntaxException URI_SYNTAX_EXCEPTION = new URISyntaxException("", "");
private static final StorageException STORAGE_EXCEPTION = new StorageException("", "", null);
private static final IOException IO_EXCEPTION = new IOException();
private static final RuntimeException RUNTIME_EXCEPTION = new RuntimeException();
private static final RuntimeException NULL_EXCEPTION_WRAPPED_IN_RUNTIME_EXCEPTION = new RuntimeException("", null);
private static final RuntimeException IO_EXCEPTION_WRAPPED_IN_RUNTIME_EXCEPTION = new RuntimeException(
"",
new IOException()
);
private static final RuntimeException RUNTIME_EXCEPTION_WRAPPED_IN_RUNTIME_EXCEPTION = new RuntimeException(
"",
new RuntimeException()
);
static { static {
try { try {
URI_WITH_PATH_WITH_LEADING_SLASH = new URI(AzureInputSource.SCHEME URI_WITH_PATH_WITH_LEADING_SLASH = new URI(AzureInputSource.SCHEME
@ -67,4 +84,60 @@ public class AzureUtilsTest
String path = AzureUtils.maybeRemoveAzurePathPrefix(BLOB_NAME); String path = AzureUtils.maybeRemoveAzurePathPrefix(BLOB_NAME);
Assert.assertEquals(BLOB_NAME, path); Assert.assertEquals(BLOB_NAME, path);
} }
@Test
public void test_azureRetry_URISyntaxException_returnsFalse()
{
boolean retry = AzureUtils.AZURE_RETRY.apply(URI_SYNTAX_EXCEPTION);
Assert.assertFalse(retry);
}
@Test
public void test_azureRetry_StorageException_returnsTrue()
{
boolean retry = AzureUtils.AZURE_RETRY.apply(STORAGE_EXCEPTION);
Assert.assertTrue(retry);
}
@Test
public void test_azureRetry_IOException_returnsTrue()
{
boolean retry = AzureUtils.AZURE_RETRY.apply(IO_EXCEPTION);
Assert.assertTrue(retry);
}
@Test
public void test_azureRetry_nullException_returnsFalse()
{
boolean retry = AzureUtils.AZURE_RETRY.apply(null);
Assert.assertFalse(retry);
}
@Test
public void test_azureRetry_RunTimeException_returnsFalse()
{
boolean retry = AzureUtils.AZURE_RETRY.apply(RUNTIME_EXCEPTION);
Assert.assertFalse(retry);
}
@Test
public void test_azureRetry_nullExceptionWrappedInRunTimeException_returnsFalse()
{
boolean retry = AzureUtils.AZURE_RETRY.apply(NULL_EXCEPTION_WRAPPED_IN_RUNTIME_EXCEPTION);
Assert.assertFalse(retry);
}
@Test
public void test_azureRetry_IOExceptionWrappedInRunTimeException_returnsTrue()
{
boolean retry = AzureUtils.AZURE_RETRY.apply(IO_EXCEPTION_WRAPPED_IN_RUNTIME_EXCEPTION);
Assert.assertTrue(retry);
}
@Test
public void test_azureRetry_RunTimeExceptionWrappedInRunTimeException_returnsFalse()
{
boolean retry = AzureUtils.AZURE_RETRY.apply(RUNTIME_EXCEPTION_WRAPPED_IN_RUNTIME_EXCEPTION);
Assert.assertFalse(retry);
}
} }
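Together these tests define the retry predicate's contract: retry on StorageException or IOException, including when wrapped in a RuntimeException, and never on URISyntaxException, null, or an unrelated RuntimeException. A hedged sketch of logic that satisfies exactly those expectations (not the actual AzureUtils.AZURE_RETRY implementation):
// Illustrative only; the committed predicate may be structured differently.
static boolean shouldRetrySketch(Throwable t)
{
  if (t == null) {
    return false;
  }
  if (t instanceof IOException || t instanceof StorageException) {
    return true;
  }
  if (t instanceof RuntimeException && t.getCause() != null) {
    // unwrap the RuntimeException and re-apply the same checks to its cause
    return shouldRetrySketch(t.getCause());
  }
  return false;
}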

@ -28,6 +28,7 @@ import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.impl.CloudObjectInputSource; import org.apache.druid.data.input.impl.CloudObjectInputSource;
import org.apache.druid.data.input.impl.CloudObjectLocation; import org.apache.druid.data.input.impl.CloudObjectLocation;
import org.apache.druid.data.input.impl.SplittableInputSource; import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.storage.google.GoogleInputDataConfig;
import org.apache.druid.storage.google.GoogleStorage; import org.apache.druid.storage.google.GoogleStorage;
import org.apache.druid.storage.google.GoogleUtils; import org.apache.druid.storage.google.GoogleUtils;
@ -40,13 +41,13 @@ import java.util.stream.StreamSupport;
public class GoogleCloudStorageInputSource extends CloudObjectInputSource<GoogleCloudStorageEntity> public class GoogleCloudStorageInputSource extends CloudObjectInputSource<GoogleCloudStorageEntity>
{ {
static final String SCHEME = "gs"; static final String SCHEME = "gs";
private static final int MAX_LISTING_LENGTH = 1024;
private final GoogleStorage storage; private final GoogleStorage storage;
private final GoogleInputDataConfig inputDataConfig;
@JsonCreator @JsonCreator
public GoogleCloudStorageInputSource( public GoogleCloudStorageInputSource(
@JacksonInject GoogleStorage storage, @JacksonInject GoogleStorage storage,
@JacksonInject GoogleInputDataConfig inputDataConfig,
@JsonProperty("uris") @Nullable List<URI> uris, @JsonProperty("uris") @Nullable List<URI> uris,
@JsonProperty("prefixes") @Nullable List<URI> prefixes, @JsonProperty("prefixes") @Nullable List<URI> prefixes,
@JsonProperty("objects") @Nullable List<CloudObjectLocation> objects @JsonProperty("objects") @Nullable List<CloudObjectLocation> objects
@ -54,6 +55,7 @@ public class GoogleCloudStorageInputSource extends CloudObjectInputSource<Google
{ {
super(SCHEME, uris, prefixes, objects); super(SCHEME, uris, prefixes, objects);
this.storage = storage; this.storage = storage;
this.inputDataConfig = inputDataConfig;
} }
@Override @Override
@ -73,7 +75,7 @@ public class GoogleCloudStorageInputSource extends CloudObjectInputSource<Google
@Override @Override
public SplittableInputSource<CloudObjectLocation> withSplit(InputSplit<CloudObjectLocation> split) public SplittableInputSource<CloudObjectLocation> withSplit(InputSplit<CloudObjectLocation> split)
{ {
return new GoogleCloudStorageInputSource(storage, null, null, ImmutableList.of(split.get())); return new GoogleCloudStorageInputSource(storage, inputDataConfig, null, null, ImmutableList.of(split.get()));
} }
private CloudObjectLocation byteSourceFromStorageObject(final StorageObject storageObject) private CloudObjectLocation byteSourceFromStorageObject(final StorageObject storageObject)
@ -84,7 +86,7 @@ public class GoogleCloudStorageInputSource extends CloudObjectInputSource<Google
private Iterable<StorageObject> storageObjectIterable() private Iterable<StorageObject> storageObjectIterable()
{ {
return () -> return () ->
GoogleUtils.lazyFetchingStorageObjectsIterator(storage, getPrefixes().iterator(), MAX_LISTING_LENGTH); GoogleUtils.lazyFetchingStorageObjectsIterator(storage, getPrefixes().iterator(), inputDataConfig.getMaxListingLength());
} }
@Override @Override

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.storage.google;
import com.fasterxml.jackson.annotation.JsonProperty;
import javax.validation.constraints.Min;
/**
* Stores the configuration for options related to reading
* input data from Google blob store into Druid
*/
public class GoogleInputDataConfig
{
/**
* The maximum number of input files matching a given prefix to retrieve
* from Google at a time.
*/
@JsonProperty
@Min(1)
private int maxListingLength = 1024;
public void setMaxListingLength(int maxListingLength)
{
this.maxListingLength = maxListingLength;
}
public int getMaxListingLength()
{
return maxListingLength;
}
}
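A short usage note: in production this config is populated from runtime properties through Jackson, but it can also be instantiated directly, for example in tests; the @Min(1) constraint rejects values below 1 when bean validation runs. A small sketch:
GoogleInputDataConfig config = new GoogleInputDataConfig();
config.setMaxListingLength(512);               // override the 1024 default
int pageSize = config.getMaxListingLength();   // 512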

@ -46,6 +46,7 @@ import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.CloseableIterator; import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.JSONPathSpec; import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.storage.google.GoogleInputDataConfig;
import org.apache.druid.storage.google.GoogleStorage; import org.apache.druid.storage.google.GoogleStorage;
import org.apache.druid.testing.InitializedNullHandlingTest; import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.druid.utils.CompressionUtils; import org.apache.druid.utils.CompressionUtils;
@ -66,8 +67,9 @@ import java.util.stream.Stream;
public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTest public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTest
{ {
private static final long EXPECTED_MAX_LISTING_LENGTH = 1024L; private static final int MAX_LISTING_LENGTH = 10;
private static final GoogleStorage STORAGE = EasyMock.createMock(GoogleStorage.class); private static final GoogleStorage STORAGE = EasyMock.createMock(GoogleStorage.class);
private static final GoogleInputDataConfig INPUT_DATA_CONFIG = EasyMock.createMock(GoogleInputDataConfig.class);
private static final List<URI> EXPECTED_URIS = Arrays.asList( private static final List<URI> EXPECTED_URIS = Arrays.asList(
URI.create("gs://foo/bar/file.csv"), URI.create("gs://foo/bar/file.csv"),
@ -99,7 +101,7 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
{ {
final ObjectMapper mapper = createGoogleObjectMapper(); final ObjectMapper mapper = createGoogleObjectMapper();
final GoogleCloudStorageInputSource withUris = final GoogleCloudStorageInputSource withUris =
new GoogleCloudStorageInputSource(STORAGE, EXPECTED_URIS, ImmutableList.of(), null); new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, EXPECTED_URIS, ImmutableList.of(), null);
final GoogleCloudStorageInputSource serdeWithUris = final GoogleCloudStorageInputSource serdeWithUris =
mapper.readValue(mapper.writeValueAsString(withUris), GoogleCloudStorageInputSource.class); mapper.readValue(mapper.writeValueAsString(withUris), GoogleCloudStorageInputSource.class);
Assert.assertEquals(withUris, serdeWithUris); Assert.assertEquals(withUris, serdeWithUris);
@ -110,7 +112,7 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
{ {
final ObjectMapper mapper = createGoogleObjectMapper(); final ObjectMapper mapper = createGoogleObjectMapper();
final GoogleCloudStorageInputSource withPrefixes = final GoogleCloudStorageInputSource withPrefixes =
new GoogleCloudStorageInputSource(STORAGE, ImmutableList.of(), PREFIXES, null); new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, ImmutableList.of(), PREFIXES, null);
final GoogleCloudStorageInputSource serdeWithPrefixes = final GoogleCloudStorageInputSource serdeWithPrefixes =
mapper.readValue(mapper.writeValueAsString(withPrefixes), GoogleCloudStorageInputSource.class); mapper.readValue(mapper.writeValueAsString(withPrefixes), GoogleCloudStorageInputSource.class);
Assert.assertEquals(withPrefixes, serdeWithPrefixes); Assert.assertEquals(withPrefixes, serdeWithPrefixes);
@ -123,6 +125,7 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
final GoogleCloudStorageInputSource withObjects = final GoogleCloudStorageInputSource withObjects =
new GoogleCloudStorageInputSource( new GoogleCloudStorageInputSource(
STORAGE, STORAGE,
INPUT_DATA_CONFIG,
null, null,
null, null,
ImmutableList.of(new CloudObjectLocation("foo", "bar/file.gz")) ImmutableList.of(new CloudObjectLocation("foo", "bar/file.gz"))
@ -137,7 +140,7 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
{ {
GoogleCloudStorageInputSource inputSource = GoogleCloudStorageInputSource inputSource =
new GoogleCloudStorageInputSource(STORAGE, EXPECTED_URIS, ImmutableList.of(), null); new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, EXPECTED_URIS, ImmutableList.of(), null);
Stream<InputSplit<CloudObjectLocation>> splits = inputSource.createSplits( Stream<InputSplit<CloudObjectLocation>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null), new JsonInputFormat(JSONPathSpec.DEFAULT, null),
@ -150,12 +153,15 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
public void testWithPrefixesSplit() throws IOException public void testWithPrefixesSplit() throws IOException
{ {
EasyMock.reset(STORAGE); EasyMock.reset(STORAGE);
EasyMock.reset(INPUT_DATA_CONFIG);
addExpectedPrefixObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0))); addExpectedPrefixObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0)));
addExpectedPrefixObjects(PREFIXES.get(1), ImmutableList.of(EXPECTED_URIS.get(1))); addExpectedPrefixObjects(PREFIXES.get(1), ImmutableList.of(EXPECTED_URIS.get(1)));
EasyMock.expect(INPUT_DATA_CONFIG.getMaxListingLength()).andReturn(MAX_LISTING_LENGTH);
EasyMock.replay(STORAGE); EasyMock.replay(STORAGE);
EasyMock.replay(INPUT_DATA_CONFIG);
GoogleCloudStorageInputSource inputSource = GoogleCloudStorageInputSource inputSource =
new GoogleCloudStorageInputSource(STORAGE, null, PREFIXES, null); new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, null, PREFIXES, null);
Stream<InputSplit<CloudObjectLocation>> splits = inputSource.createSplits( Stream<InputSplit<CloudObjectLocation>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null), new JsonInputFormat(JSONPathSpec.DEFAULT, null),
@ -169,14 +175,18 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
public void testReader() throws IOException public void testReader() throws IOException
{ {
EasyMock.reset(STORAGE); EasyMock.reset(STORAGE);
EasyMock.reset(INPUT_DATA_CONFIG);
addExpectedPrefixObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0))); addExpectedPrefixObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0)));
addExpectedGetObjectMock(EXPECTED_URIS.get(0)); addExpectedGetObjectMock(EXPECTED_URIS.get(0));
addExpectedPrefixObjects(PREFIXES.get(1), ImmutableList.of(EXPECTED_URIS.get(1))); addExpectedPrefixObjects(PREFIXES.get(1), ImmutableList.of(EXPECTED_URIS.get(1)));
addExpectedGetObjectMock(EXPECTED_URIS.get(1)); addExpectedGetObjectMock(EXPECTED_URIS.get(1));
EasyMock.expect(INPUT_DATA_CONFIG.getMaxListingLength()).andReturn(MAX_LISTING_LENGTH);
EasyMock.replay(STORAGE); EasyMock.replay(STORAGE);
EasyMock.replay(INPUT_DATA_CONFIG);
GoogleCloudStorageInputSource inputSource = new GoogleCloudStorageInputSource( GoogleCloudStorageInputSource inputSource = new GoogleCloudStorageInputSource(
STORAGE, STORAGE,
INPUT_DATA_CONFIG,
null, null,
PREFIXES, PREFIXES,
null null
@ -208,14 +218,18 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
public void testCompressedReader() throws IOException public void testCompressedReader() throws IOException
{ {
EasyMock.reset(STORAGE); EasyMock.reset(STORAGE);
EasyMock.reset(INPUT_DATA_CONFIG);
addExpectedPrefixObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_COMPRESSED_URIS.get(0))); addExpectedPrefixObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_COMPRESSED_URIS.get(0)));
addExpectedGetCompressedObjectMock(EXPECTED_COMPRESSED_URIS.get(0)); addExpectedGetCompressedObjectMock(EXPECTED_COMPRESSED_URIS.get(0));
addExpectedPrefixObjects(PREFIXES.get(1), ImmutableList.of(EXPECTED_COMPRESSED_URIS.get(1))); addExpectedPrefixObjects(PREFIXES.get(1), ImmutableList.of(EXPECTED_COMPRESSED_URIS.get(1)));
addExpectedGetCompressedObjectMock(EXPECTED_COMPRESSED_URIS.get(1)); addExpectedGetCompressedObjectMock(EXPECTED_COMPRESSED_URIS.get(1));
EasyMock.expect(INPUT_DATA_CONFIG.getMaxListingLength()).andReturn(MAX_LISTING_LENGTH);
EasyMock.replay(STORAGE); EasyMock.replay(STORAGE);
EasyMock.replay(INPUT_DATA_CONFIG);
GoogleCloudStorageInputSource inputSource = new GoogleCloudStorageInputSource( GoogleCloudStorageInputSource inputSource = new GoogleCloudStorageInputSource(
STORAGE, STORAGE,
INPUT_DATA_CONFIG,
null, null,
PREFIXES, PREFIXES,
null null
@ -250,7 +264,7 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
Storage.Objects.List listRequest = EasyMock.createMock(Storage.Objects.List.class); Storage.Objects.List listRequest = EasyMock.createMock(Storage.Objects.List.class);
EasyMock.expect(STORAGE.list(EasyMock.eq(bucket))).andReturn(listRequest).once(); EasyMock.expect(STORAGE.list(EasyMock.eq(bucket))).andReturn(listRequest).once();
EasyMock.expect(listRequest.setPageToken(EasyMock.anyString())).andReturn(listRequest).once(); EasyMock.expect(listRequest.setPageToken(EasyMock.anyString())).andReturn(listRequest).once();
EasyMock.expect(listRequest.setMaxResults(EXPECTED_MAX_LISTING_LENGTH)).andReturn(listRequest).once(); EasyMock.expect(listRequest.setMaxResults((long) MAX_LISTING_LENGTH)).andReturn(listRequest).once();
EasyMock.expect(listRequest.setPrefix(EasyMock.eq(StringUtils.maybeRemoveLeadingSlash(prefix.getPath())))) EasyMock.expect(listRequest.setPrefix(EasyMock.eq(StringUtils.maybeRemoveLeadingSlash(prefix.getPath()))))
.andReturn(listRequest) .andReturn(listRequest)
.once(); .once();

@ -29,6 +29,7 @@ import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.impl.CloudObjectInputSource; import org.apache.druid.data.input.impl.CloudObjectInputSource;
import org.apache.druid.data.input.impl.CloudObjectLocation; import org.apache.druid.data.input.impl.CloudObjectLocation;
import org.apache.druid.data.input.impl.SplittableInputSource; import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.storage.s3.S3InputDataConfig;
import org.apache.druid.storage.s3.S3StorageDruidModule; import org.apache.druid.storage.s3.S3StorageDruidModule;
import org.apache.druid.storage.s3.S3Utils; import org.apache.druid.storage.s3.S3Utils;
import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3; import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
@ -42,10 +43,12 @@ import java.util.stream.StreamSupport;
public class S3InputSource extends CloudObjectInputSource<S3Entity> public class S3InputSource extends CloudObjectInputSource<S3Entity>
{ {
private final ServerSideEncryptingAmazonS3 s3Client; private final ServerSideEncryptingAmazonS3 s3Client;
private final S3InputDataConfig inputDataConfig;
@JsonCreator @JsonCreator
public S3InputSource( public S3InputSource(
@JacksonInject ServerSideEncryptingAmazonS3 s3Client, @JacksonInject ServerSideEncryptingAmazonS3 s3Client,
@JacksonInject S3InputDataConfig inputDataConfig,
@JsonProperty("uris") @Nullable List<URI> uris, @JsonProperty("uris") @Nullable List<URI> uris,
@JsonProperty("prefixes") @Nullable List<URI> prefixes, @JsonProperty("prefixes") @Nullable List<URI> prefixes,
@JsonProperty("objects") @Nullable List<CloudObjectLocation> objects @JsonProperty("objects") @Nullable List<CloudObjectLocation> objects
@ -53,6 +56,7 @@ public class S3InputSource extends CloudObjectInputSource<S3Entity>
{ {
super(S3StorageDruidModule.SCHEME, uris, prefixes, objects); super(S3StorageDruidModule.SCHEME, uris, prefixes, objects);
this.s3Client = Preconditions.checkNotNull(s3Client, "s3Client"); this.s3Client = Preconditions.checkNotNull(s3Client, "s3Client");
this.inputDataConfig = Preconditions.checkNotNull(inputDataConfig, "S3DataSegmentPusherConfig");
} }
@Override @Override
@ -72,7 +76,7 @@ public class S3InputSource extends CloudObjectInputSource<S3Entity>
@Override @Override
public SplittableInputSource<CloudObjectLocation> withSplit(InputSplit<CloudObjectLocation> split) public SplittableInputSource<CloudObjectLocation> withSplit(InputSplit<CloudObjectLocation> split)
{ {
return new S3InputSource(s3Client, null, null, ImmutableList.of(split.get())); return new S3InputSource(s3Client, inputDataConfig, null, null, ImmutableList.of(split.get()));
} }
@Override @Override
@ -87,6 +91,6 @@ public class S3InputSource extends CloudObjectInputSource<S3Entity>
private Iterable<S3ObjectSummary> getIterableObjectsFromPrefixes() private Iterable<S3ObjectSummary> getIterableObjectsFromPrefixes()
{ {
return () -> S3Utils.objectSummaryIterator(s3Client, getPrefixes(), MAX_LISTING_LENGTH); return () -> S3Utils.objectSummaryIterator(s3Client, getPrefixes(), inputDataConfig.getMaxListingLength());
} }
} }

@ -37,8 +37,8 @@ public class S3DataSegmentPusherConfig
private boolean disableAcl = false; private boolean disableAcl = false;
@JsonProperty @JsonProperty
@Min(0) @Min(1)
private int maxListingLength = 1000; private int maxListingLength = 1024;
// use s3n by default for backward compatibility // use s3n by default for backward compatibility
@JsonProperty @JsonProperty
private boolean useS3aSchema = false; private boolean useS3aSchema = false;

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.storage.s3;
import com.fasterxml.jackson.annotation.JsonProperty;
import javax.validation.constraints.Min;
/**
* Stores the configuration for options related to reading
* input data from Amazon S3 into Druid
*/
public class S3InputDataConfig
{
/**
* The maximum number of input files matching a given prefix to retrieve
* from Amazon S3 at a time.
*/
@JsonProperty
@Min(1)
private int maxListingLength = 1024;
public void setMaxListingLength(int maxListingLength)
{
this.maxListingLength = maxListingLength;
}
public int getMaxListingLength()
{
return maxListingLength;
}
}

@ -54,6 +54,7 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.CloseableIterator; import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.JSONPathSpec; import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.storage.s3.NoopServerSideEncryption; import org.apache.druid.storage.s3.NoopServerSideEncryption;
import org.apache.druid.storage.s3.S3InputDataConfig;
import org.apache.druid.storage.s3.S3Utils; import org.apache.druid.storage.s3.S3Utils;
import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3; import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
import org.apache.druid.testing.InitializedNullHandlingTest; import org.apache.druid.testing.InitializedNullHandlingTest;
@ -86,6 +87,8 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
S3_CLIENT, S3_CLIENT,
new NoopServerSideEncryption() new NoopServerSideEncryption()
); );
private static final S3InputDataConfig INPUT_DATA_CONFIG;
private static final int MAX_LISTING_LENGTH = 10;
private static final List<URI> EXPECTED_URIS = Arrays.asList( private static final List<URI> EXPECTED_URIS = Arrays.asList(
URI.create("s3://foo/bar/file.csv"), URI.create("s3://foo/bar/file.csv"),
@ -112,6 +115,11 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
private static final byte[] CONTENT = private static final byte[] CONTENT =
StringUtils.toUtf8(StringUtils.format("%d,hello,world", NOW.getMillis())); StringUtils.toUtf8(StringUtils.format("%d,hello,world", NOW.getMillis()));
static {
INPUT_DATA_CONFIG = new S3InputDataConfig();
INPUT_DATA_CONFIG.setMaxListingLength(MAX_LISTING_LENGTH);
}
@Rule @Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder(); public TemporaryFolder temporaryFolder = new TemporaryFolder();
@ -121,7 +129,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
@Test @Test
public void testSerdeWithUris() throws Exception public void testSerdeWithUris() throws Exception
{ {
final S3InputSource withUris = new S3InputSource(SERVICE, EXPECTED_URIS, null, null); final S3InputSource withUris = new S3InputSource(SERVICE, INPUT_DATA_CONFIG, EXPECTED_URIS, null, null);
final S3InputSource serdeWithUris = MAPPER.readValue(MAPPER.writeValueAsString(withUris), S3InputSource.class); final S3InputSource serdeWithUris = MAPPER.readValue(MAPPER.writeValueAsString(withUris), S3InputSource.class);
Assert.assertEquals(withUris, serdeWithUris); Assert.assertEquals(withUris, serdeWithUris);
} }
@ -129,7 +137,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
@Test @Test
public void testSerdeWithPrefixes() throws Exception public void testSerdeWithPrefixes() throws Exception
{ {
final S3InputSource withPrefixes = new S3InputSource(SERVICE, null, PREFIXES, null); final S3InputSource withPrefixes = new S3InputSource(SERVICE, INPUT_DATA_CONFIG, null, PREFIXES, null);
final S3InputSource serdeWithPrefixes = final S3InputSource serdeWithPrefixes =
MAPPER.readValue(MAPPER.writeValueAsString(withPrefixes), S3InputSource.class); MAPPER.readValue(MAPPER.writeValueAsString(withPrefixes), S3InputSource.class);
Assert.assertEquals(withPrefixes, serdeWithPrefixes); Assert.assertEquals(withPrefixes, serdeWithPrefixes);
@ -141,6 +149,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
final S3InputSource withPrefixes = new S3InputSource( final S3InputSource withPrefixes = new S3InputSource(
SERVICE, SERVICE,
INPUT_DATA_CONFIG,
null, null,
null, null,
EXPECTED_LOCATION EXPECTED_LOCATION
@ -155,6 +164,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
{ {
final S3InputSource withPrefixes = new S3InputSource( final S3InputSource withPrefixes = new S3InputSource(
SERVICE, SERVICE,
INPUT_DATA_CONFIG,
ImmutableList.of(), ImmutableList.of(),
ImmutableList.of(), ImmutableList.of(),
EXPECTED_LOCATION EXPECTED_LOCATION
@ -171,6 +181,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
// constructor will explode // constructor will explode
new S3InputSource( new S3InputSource(
SERVICE, SERVICE,
INPUT_DATA_CONFIG,
EXPECTED_URIS, EXPECTED_URIS,
PREFIXES, PREFIXES,
EXPECTED_LOCATION EXPECTED_LOCATION
@ -184,6 +195,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
// constructor will explode // constructor will explode
new S3InputSource( new S3InputSource(
SERVICE, SERVICE,
INPUT_DATA_CONFIG,
EXPECTED_URIS, EXPECTED_URIS,
PREFIXES, PREFIXES,
ImmutableList.of() ImmutableList.of()
@ -197,6 +209,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
// constructor will explode // constructor will explode
new S3InputSource( new S3InputSource(
SERVICE, SERVICE,
INPUT_DATA_CONFIG,
ImmutableList.of(), ImmutableList.of(),
PREFIXES, PREFIXES,
EXPECTED_LOCATION EXPECTED_LOCATION
@ -206,7 +219,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
@Test @Test
public void testWithUrisSplit() public void testWithUrisSplit()
{ {
S3InputSource inputSource = new S3InputSource(SERVICE, EXPECTED_URIS, null, null); S3InputSource inputSource = new S3InputSource(SERVICE, INPUT_DATA_CONFIG, EXPECTED_URIS, null, null);
Stream<InputSplit<CloudObjectLocation>> splits = inputSource.createSplits( Stream<InputSplit<CloudObjectLocation>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null), new JsonInputFormat(JSONPathSpec.DEFAULT, null),
@ -224,7 +237,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
expectListObjects(PREFIXES.get(1), ImmutableList.of(EXPECTED_URIS.get(1))); expectListObjects(PREFIXES.get(1), ImmutableList.of(EXPECTED_URIS.get(1)));
EasyMock.replay(S3_CLIENT); EasyMock.replay(S3_CLIENT);
S3InputSource inputSource = new S3InputSource(SERVICE, null, PREFIXES, null); S3InputSource inputSource = new S3InputSource(SERVICE, INPUT_DATA_CONFIG, null, PREFIXES, null);
Stream<InputSplit<CloudObjectLocation>> splits = inputSource.createSplits( Stream<InputSplit<CloudObjectLocation>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null), new JsonInputFormat(JSONPathSpec.DEFAULT, null),
@ -245,6 +258,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
S3InputSource inputSource = new S3InputSource( S3InputSource inputSource = new S3InputSource(
SERVICE, SERVICE,
INPUT_DATA_CONFIG,
null, null,
ImmutableList.of(PREFIXES.get(0), EXPECTED_URIS.get(1)), ImmutableList.of(PREFIXES.get(0), EXPECTED_URIS.get(1)),
null null
@ -273,6 +287,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
S3InputSource inputSource = new S3InputSource( S3InputSource inputSource = new S3InputSource(
SERVICE, SERVICE,
INPUT_DATA_CONFIG,
null, null,
ImmutableList.of(PREFIXES.get(0), EXPECTED_URIS.get(1)), ImmutableList.of(PREFIXES.get(0), EXPECTED_URIS.get(1)),
null null
@ -314,6 +329,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
S3InputSource inputSource = new S3InputSource( S3InputSource inputSource = new S3InputSource(
SERVICE, SERVICE,
INPUT_DATA_CONFIG,
null, null,
ImmutableList.of(PREFIXES.get(0), EXPECTED_COMPRESSED_URIS.get(1)), ImmutableList.of(PREFIXES.get(0), EXPECTED_COMPRESSED_URIS.get(1)),
null null

@ -50,7 +50,7 @@ public class S3DataSegmentPusherConfigTest
{ {
String jsonConfig = "{\"bucket\":\"bucket1\",\"baseKey\":\"dataSource1\"}"; String jsonConfig = "{\"bucket\":\"bucket1\",\"baseKey\":\"dataSource1\"}";
String expectedJsonConfig = "{\"bucket\":\"bucket1\",\"baseKey\":\"dataSource1\"," String expectedJsonConfig = "{\"bucket\":\"bucket1\",\"baseKey\":\"dataSource1\","
+ "\"disableAcl\":false,\"maxListingLength\":1000,\"useS3aSchema\":false}"; + "\"disableAcl\":false,\"maxListingLength\":1024,\"useS3aSchema\":false}";
S3DataSegmentPusherConfig config = JSON_MAPPER.readValue(jsonConfig, S3DataSegmentPusherConfig.class); S3DataSegmentPusherConfig config = JSON_MAPPER.readValue(jsonConfig, S3DataSegmentPusherConfig.class);
Assert.assertEquals(expectedJsonConfig, JSON_MAPPER.writeValueAsString(config)); Assert.assertEquals(expectedJsonConfig, JSON_MAPPER.writeValueAsString(config));
@ -67,6 +67,6 @@ public class S3DataSegmentPusherConfigTest
Set<ConstraintViolation<S3DataSegmentPusherConfig>> violations = validator.validate(config); Set<ConstraintViolation<S3DataSegmentPusherConfig>> violations = validator.validate(config);
Assert.assertEquals(1, violations.size()); Assert.assertEquals(1, violations.size());
ConstraintViolation violation = Iterators.getOnlyElement(violations.iterator()); ConstraintViolation violation = Iterators.getOnlyElement(violations.iterator());
Assert.assertEquals("must be greater than or equal to 0", violation.getMessage()); Assert.assertEquals("must be greater than or equal to 1", violation.getMessage());
} }
} }