Able to filter Cloud objects with glob notation. (#12659)

In a heterogeneous environment, you sometimes have no control over the input folder: upstream systems can write whatever folders and files they want. In that situation, S3InputSource.java is effectively unusable.

Many users, myself included, have worked around this by using Airflow to fetch the full list of Parquet files and pass it to Druid. But doing so explodes the JSON spec: we hit a case where a single JSON spec was 16 MB, and that is simply too much for the Overlord.

This patch allows users to pass `{"filter": "*.parquet"}` and lets Druid perform the filtering of the input files.

I am using glob notation to stay consistent with the LocalFirehose syntax.
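
For clarity on the matching semantics: the patch delegates to commons-io's `FilenameUtils.wildcardMatch`, applied to the full URI string for `uris` and to the object key/path for `objects` and prefix listings. A minimal standalone sketch of that behavior:

```java
import org.apache.commons.io.FilenameUtils;

public class GlobFilterDemo
{
  public static void main(String[] args)
  {
    String filter = "*.parquet";

    // uris are matched against the full URI string (uri.toString())
    System.out.println(FilenameUtils.wildcardMatch("s3://bar/foo/file2.parquet", filter)); // true

    // objects and prefix listings are matched against the object key/path
    System.out.println(FilenameUtils.wildcardMatch("foo/bar/file.csv", filter));           // false
  }
}
```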
Didip Kerabat, 2022-06-23 23:10:08 -07:00, committed by GitHub
commit 6ddb828c7a (parent 1fc2f6e4b0)
12 changed files with 802 additions and 61 deletions

CloudObjectInputSource.java

@ -21,6 +21,8 @@ package org.apache.druid.data.input.impl;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.primitives.Ints;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.druid.data.input.AbstractInputSource;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputFormat;
@ -41,9 +43,11 @@ import java.util.stream.Stream;
public abstract class CloudObjectInputSource extends AbstractInputSource
implements SplittableInputSource<List<CloudObjectLocation>>
{
private final String scheme;
private final List<URI> uris;
private final List<URI> prefixes;
private final List<CloudObjectLocation> objects;
private final String filter;
public CloudObjectInputSource(
String scheme,
@ -52,20 +56,30 @@ public abstract class CloudObjectInputSource extends AbstractInputSource
@Nullable List<CloudObjectLocation> objects
)
{
this.scheme = scheme;
this.uris = uris;
this.prefixes = prefixes;
this.objects = objects;
this.filter = null;
if (!CollectionUtils.isNullOrEmpty(objects)) {
throwIfIllegalArgs(!CollectionUtils.isNullOrEmpty(uris) || !CollectionUtils.isNullOrEmpty(prefixes));
} else if (!CollectionUtils.isNullOrEmpty(uris)) {
throwIfIllegalArgs(!CollectionUtils.isNullOrEmpty(prefixes));
uris.forEach(uri -> CloudObjectLocation.validateUriScheme(scheme, uri));
} else if (!CollectionUtils.isNullOrEmpty(prefixes)) {
prefixes.forEach(uri -> CloudObjectLocation.validateUriScheme(scheme, uri));
} else {
throwIfIllegalArgs(true);
}
illegalArgsChecker();
}
public CloudObjectInputSource(
String scheme,
@Nullable List<URI> uris,
@Nullable List<URI> prefixes,
@Nullable List<CloudObjectLocation> objects,
@Nullable String filter
)
{
this.scheme = scheme;
this.uris = uris;
this.prefixes = prefixes;
this.objects = objects;
this.filter = filter;
illegalArgsChecker();
}
@JsonProperty
@ -87,6 +101,13 @@ public abstract class CloudObjectInputSource extends AbstractInputSource
return objects;
}
@Nullable
@JsonProperty
public String getFilter()
{
return filter;
}
/**
* Create the correct {@link InputEntity} for this input source given a split on a {@link CloudObjectLocation}. This
* is called internally by {@link #formattableReader} and operates on the output of {@link #createSplits}.
@ -98,6 +119,9 @@ public abstract class CloudObjectInputSource extends AbstractInputSource
* this input source's backend API. This is called internally by {@link #createSplits} and {@link #estimateNumSplits},
* only if {@link #prefixes} is set, otherwise the splits are created directly from {@link #uris} or {@link #objects}.
* Calling this when {@link #prefixes} is not set is likely to lead to either an empty iterator or a null pointer exception.
*
* If {@link #filter} is set, the filter will be applied to {@link #uris} or {@link #objects}.
* {@link #filter} uses glob notation, for example "*.parquet".
*/
protected abstract Stream<InputSplit<List<CloudObjectLocation>>> getPrefixesSplitStream(SplitHintSpec splitHintSpec);
@ -108,12 +132,23 @@ public abstract class CloudObjectInputSource extends AbstractInputSource
)
{
if (!CollectionUtils.isNullOrEmpty(objects)) {
return objects.stream().map(object -> new InputSplit<>(Collections.singletonList(object)));
Stream<CloudObjectLocation> objectStream = objects.stream();
if (StringUtils.isNotBlank(filter)) {
objectStream = objectStream.filter(object -> FilenameUtils.wildcardMatch(object.getPath(), filter));
}
return objectStream.map(object -> new InputSplit<>(Collections.singletonList(object)));
}
if (!CollectionUtils.isNullOrEmpty(uris)) {
return uris.stream()
.map(CloudObjectLocation::new)
.map(object -> new InputSplit<>(Collections.singletonList(object)));
Stream<URI> uriStream = uris.stream();
if (StringUtils.isNotBlank(filter)) {
uriStream = uriStream.filter(uri -> FilenameUtils.wildcardMatch(uri.toString(), filter));
}
return uriStream.map(CloudObjectLocation::new).map(object -> new InputSplit<>(Collections.singletonList(object)));
}
return getPrefixesSplitStream(getSplitHintSpecOrDefault(splitHintSpec));
@ -164,15 +199,31 @@ public abstract class CloudObjectInputSource extends AbstractInputSource
return false;
}
CloudObjectInputSource that = (CloudObjectInputSource) o;
return Objects.equals(uris, that.uris) &&
return Objects.equals(scheme, that.scheme) &&
Objects.equals(uris, that.uris) &&
Objects.equals(prefixes, that.prefixes) &&
Objects.equals(objects, that.objects);
Objects.equals(objects, that.objects) &&
Objects.equals(filter, that.filter);
}
@Override
public int hashCode()
{
return Objects.hash(uris, prefixes, objects);
return Objects.hash(scheme, uris, prefixes, objects, filter);
}
private void illegalArgsChecker() throws IllegalArgumentException
{
if (!CollectionUtils.isNullOrEmpty(objects)) {
throwIfIllegalArgs(!CollectionUtils.isNullOrEmpty(uris) || !CollectionUtils.isNullOrEmpty(prefixes));
} else if (!CollectionUtils.isNullOrEmpty(uris)) {
throwIfIllegalArgs(!CollectionUtils.isNullOrEmpty(prefixes));
uris.forEach(uri -> CloudObjectLocation.validateUriScheme(scheme, uri));
} else if (!CollectionUtils.isNullOrEmpty(prefixes)) {
prefixes.forEach(uri -> CloudObjectLocation.validateUriScheme(scheme, uri));
} else {
throwIfIllegalArgs(true);
}
}
private void throwIfIllegalArgs(boolean clause) throws IllegalArgumentException
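
The diff view truncates at the signature above; judging from how `illegalArgsChecker` calls it, the body is presumably just a guard that throws. A minimal sketch (the exact message text is an assumption):

```java
private void throwIfIllegalArgs(boolean clause) throws IllegalArgumentException
{
  // Assumed body: the diff view cuts off after the signature.
  if (clause) {
    throw new IllegalArgumentException("exactly one of uris, prefixes or objects must be specified");
  }
}
```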

CloudObjectInputSourceTest.java (new file)

@ -0,0 +1,206 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.impl;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.MaxSizeSplitHintSpec;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.Mockito;
import java.net.URI;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class CloudObjectInputSourceTest
{
private static final String SCHEME = "s3";
private static final List<URI> URIS = Collections.singletonList(
URI.create("s3://foo/bar/file.csv")
);
private static final List<URI> URIS2 = Arrays.asList(
URI.create("s3://foo/bar/file.csv"),
URI.create("s3://bar/foo/file2.parquet")
);
private static final List<URI> PREFIXES = Arrays.asList(
URI.create("s3://foo/bar/"),
URI.create("s3://bar/foo/")
);
private static final List<CloudObjectLocation> OBJECTS = Collections.singletonList(
new CloudObjectLocation(URI.create("s3://foo/bar/file.csv"))
);
private static final List<CloudObjectLocation> OBJECTS_BEFORE_FILTER = Arrays.asList(
new CloudObjectLocation(URI.create("s3://foo/bar/file.csv")),
new CloudObjectLocation(URI.create("s3://bar/foo/file2.parquet"))
);
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Test
public void testGetUris()
{
CloudObjectInputSource inputSource = Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings()
.useConstructor(SCHEME, URIS, null, null, null)
.defaultAnswer(Mockito.CALLS_REAL_METHODS)
);
Assert.assertEquals(
URIS,
inputSource.getUris()
);
}
@Test
public void testGetPrefixes()
{
CloudObjectInputSource inputSource = Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings()
.useConstructor(SCHEME, null, PREFIXES, null, null)
.defaultAnswer(Mockito.CALLS_REAL_METHODS)
);
Assert.assertEquals(
PREFIXES,
inputSource.getPrefixes()
);
}
@Test
public void testGetFilter()
{
CloudObjectInputSource inputSource = Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings()
.useConstructor(SCHEME, URIS, null, null, "*.parquet")
.defaultAnswer(Mockito.CALLS_REAL_METHODS)
);
Assert.assertEquals("*.parquet", inputSource.getFilter());
}
@Test
public void testInequality()
{
CloudObjectInputSource inputSource1 = Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings()
.useConstructor(SCHEME, URIS, null, null, "*.parquet")
.defaultAnswer(Mockito.CALLS_REAL_METHODS)
);
CloudObjectInputSource inputSource2 = Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings()
.useConstructor(SCHEME, URIS, null, null, "*.csv")
.defaultAnswer(Mockito.CALLS_REAL_METHODS)
);
Assert.assertEquals("*.parquet", inputSource1.getFilter());
Assert.assertEquals("*.csv", inputSource2.getFilter());
Assert.assertFalse(inputSource2.equals(inputSource1));
}
@Test
public void testWithUrisFilter()
{
CloudObjectInputSource inputSource = Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings()
.useConstructor(SCHEME, URIS2, null, null, "*.csv")
.defaultAnswer(Mockito.CALLS_REAL_METHODS)
);
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
new MaxSizeSplitHintSpec(null, 1)
);
List<CloudObjectLocation> returnedLocations = splits.map(InputSplit::get).collect(Collectors.toList()).get(0);
List<URI> returnedLocationUris = returnedLocations.stream().map(object -> object.toUri(SCHEME)).collect(Collectors.toList());
Assert.assertEquals("*.csv", inputSource.getFilter());
Assert.assertEquals(URIS, returnedLocationUris);
}
@Test
public void testWithUris()
{
CloudObjectInputSource inputSource = Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings()
.useConstructor(SCHEME, URIS, null, null, null)
.defaultAnswer(Mockito.CALLS_REAL_METHODS)
);
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
new MaxSizeSplitHintSpec(null, 1)
);
List<CloudObjectLocation> returnedLocations = splits.map(InputSplit::get).collect(Collectors.toList()).get(0);
List<URI> returnedLocationUris = returnedLocations.stream().map(object -> object.toUri(SCHEME)).collect(Collectors.toList());
Assert.assertEquals(null, inputSource.getFilter());
Assert.assertEquals(URIS, returnedLocationUris);
}
@Test
public void testWithObjectsFilter()
{
CloudObjectInputSource inputSource = Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings()
.useConstructor(SCHEME, null, null, OBJECTS_BEFORE_FILTER, "*.csv")
.defaultAnswer(Mockito.CALLS_REAL_METHODS)
);
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
new MaxSizeSplitHintSpec(null, 1)
);
List<CloudObjectLocation> returnedLocations = splits.map(InputSplit::get).collect(Collectors.toList()).get(0);
List<URI> returnedLocationUris = returnedLocations.stream().map(object -> object.toUri(SCHEME)).collect(Collectors.toList());
Assert.assertEquals("*.csv", inputSource.getFilter());
Assert.assertEquals(URIS, returnedLocationUris);
}
@Test
public void testWithObjects()
{
CloudObjectInputSource inputSource = Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings()
.useConstructor(SCHEME, null, null, OBJECTS, null)
.defaultAnswer(Mockito.CALLS_REAL_METHODS)
);
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
new MaxSizeSplitHintSpec(null, 1)
);
List<CloudObjectLocation> returnedLocations = splits.map(InputSplit::get).collect(Collectors.toList()).get(0);
Assert.assertEquals(null, inputSource.getFilter());
Assert.assertEquals(OBJECTS, returnedLocations);
}
}

docs: native batch input sources

@ -29,7 +29,7 @@ For general information on native batch indexing and parallel task indexing, see
## S3 input source
> You need to include the [`druid-s3-extensions`](../development/extensions-core/s3.md) as an extension to use the S3 input source.
The S3 input source reads objects directly from S3. You can specify either:
- a list of S3 URI strings
@ -46,6 +46,7 @@ Sample specs:
"type": "index_parallel",
"inputSource": {
"type": "s3",
"filter": "*.json",
"uris": ["s3://foo/bar/file.json", "s3://bar/foo/file2.json"]
},
"inputFormat": {
@ -62,6 +63,7 @@ Sample specs:
"type": "index_parallel",
"inputSource": {
"type": "s3",
"filter": "*.parquet",
"prefixes": ["s3://foo/bar/", "s3://bar/foo/"]
},
"inputFormat": {
@ -79,6 +81,7 @@ Sample specs:
"type": "index_parallel",
"inputSource": {
"type": "s3",
"filter": "*.json",
"objects": [
{ "bucket": "foo", "path": "bar/file1.json"},
{ "bucket": "bar", "path": "foo/file2.json"}
@ -98,6 +101,7 @@ Sample specs:
"type": "index_parallel",
"inputSource": {
"type": "s3",
"filter": "*.json",
"uris": ["s3://foo/bar/file.json", "s3://bar/foo/file2.json"],
"properties": {
"accessKeyId": "KLJ78979SDFdS2",
@ -118,6 +122,7 @@ Sample specs:
"type": "index_parallel",
"inputSource": {
"type": "s3",
"filter": "*.json",
"uris": ["s3://foo/bar/file.json", "s3://bar/foo/file2.json"],
"properties": {
"accessKeyId": "KLJ78979SDFdS2",
@ -139,6 +144,7 @@ Sample specs:
|uris|JSON array of URIs where S3 objects to be ingested are located.|None|`uris` or `prefixes` or `objects` must be set|
|prefixes|JSON array of URI prefixes for the locations of S3 objects to be ingested. Empty objects starting with one of the given prefixes will be skipped.|None|`uris` or `prefixes` or `objects` must be set|
|objects|JSON array of S3 Objects to be ingested.|None|`uris` or `prefixes` or `objects` must be set|
|filter|A wildcard filter for files. See [here](http://commons.apache.org/proper/commons-io/apidocs/org/apache/commons/io/filefilter/WildcardFileFilter) for more information. Files matching the filter criteria are considered for ingestion. Files not matching the filter criteria are ignored.|None|no|
|properties|Properties Object for overriding the default S3 configuration. See below for more information.|None|No (defaults will be used if not given)
Note that the S3 input source will skip all empty objects only when `prefixes` is specified.
@ -179,6 +185,7 @@ Sample specs:
"type": "index_parallel",
"inputSource": {
"type": "google",
"filter": "*.json",
"uris": ["gs://foo/bar/file.json", "gs://bar/foo/file2.json"]
},
"inputFormat": {
@ -195,6 +202,7 @@ Sample specs:
"type": "index_parallel",
"inputSource": {
"type": "google",
"filter": "*.parquet",
"prefixes": ["gs://foo/bar/", "gs://bar/foo/"]
},
"inputFormat": {
@ -212,6 +220,7 @@ Sample specs:
"type": "index_parallel",
"inputSource": {
"type": "google",
"filter": "*.json",
"objects": [
{ "bucket": "foo", "path": "bar/file1.json"},
{ "bucket": "bar", "path": "foo/file2.json"}
@ -231,6 +240,7 @@ Sample specs:
|uris|JSON array of URIs where Google Cloud Storage objects to be ingested are located.|None|`uris` or `prefixes` or `objects` must be set|
|prefixes|JSON array of URI prefixes for the locations of Google Cloud Storage objects to be ingested. Empty objects starting with one of the given prefixes will be skipped.|None|`uris` or `prefixes` or `objects` must be set|
|objects|JSON array of Google Cloud Storage objects to be ingested.|None|`uris` or `prefixes` or `objects` must be set|
|filter|A wildcard filter for files. See [here](http://commons.apache.org/proper/commons-io/apidocs/org/apache/commons/io/filefilter/WildcardFileFilter) for more information. Files matching the filter criteria are considered for ingestion. Files not matching the filter criteria are ignored.|None|no|
Note that the Google Cloud Storage input source will skip all empty objects only when `prefixes` is specified.
@ -256,6 +266,7 @@ Sample specs:
"type": "index_parallel",
"inputSource": {
"type": "azure",
"filter": "*.json",
"uris": ["azure://container/prefix1/file.json", "azure://container/prefix2/file2.json"]
},
"inputFormat": {
@ -272,6 +283,7 @@ Sample specs:
"type": "index_parallel",
"inputSource": {
"type": "azure",
"filter": "*.parquet",
"prefixes": ["azure://container/prefix1/", "azure://container/prefix2/"]
},
"inputFormat": {
@ -289,6 +301,7 @@ Sample specs:
"type": "index_parallel",
"inputSource": {
"type": "azure",
"filter": "*.json",
"objects": [
{ "bucket": "container", "path": "prefix1/file1.json"},
{ "bucket": "container", "path": "prefix2/file2.json"}
@ -308,6 +321,7 @@ Sample specs:
|uris|JSON array of URIs where the Azure objects to be ingested are located, in the form "azure://\<container>/\<path-to-file\>"|None|`uris` or `prefixes` or `objects` must be set|
|prefixes|JSON array of URI prefixes for the locations of Azure objects to ingest, in the form "azure://\<container>/\<prefix\>". Empty objects starting with one of the given prefixes are skipped.|None|`uris` or `prefixes` or `objects` must be set|
|objects|JSON array of Azure objects to ingest.|None|`uris` or `prefixes` or `objects` must be set|
|filter|A wildcard filter for files. See [here](http://commons.apache.org/proper/commons-io/apidocs/org/apache/commons/io/filefilter/WildcardFileFilter) for more information. Files matching the filter criteria are considered for ingestion. Files not matching the filter criteria are ignored.|None|no|
Note that the Azure input source skips all empty objects only when `prefixes` is specified.
@ -546,7 +560,7 @@ Sample spec:
|property|description|required?|
|--------|-----------|---------|
|type|This should be "local".|yes|
|filter|A wildcard filter for files. See [here](http://commons.apache.org/proper/commons-io/apidocs/org/apache/commons/io/filefilter/WildcardFileFilter) for more information.|yes if `baseDir` is specified|
|filter|A wildcard filter for files. See [here](http://commons.apache.org/proper/commons-io/apidocs/org/apache/commons/io/filefilter/WildcardFileFilter) for more information. Files matching the filter criteria are considered for ingestion. Files not matching the filter criteria are ignored.|yes if `baseDir` is specified|
|baseDir|Directory to search recursively for files to be ingested. Empty files under the `baseDir` will be skipped.|At least one of `baseDir` or `files` should be specified|
|files|File paths to ingest. Some files can be ignored to avoid ingesting duplicate files if they are located under the specified `baseDir`. Empty files will be skipped.|At least one of `baseDir` or `files` should be specified|
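
The wildcard semantics referenced in the tables above are those of commons-io. As a standalone illustration of the linked `WildcardFileFilter` outside of Druid (the `data` directory is hypothetical):

```java
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.TrueFileFilter;
import org.apache.commons.io.filefilter.WildcardFileFilter;

import java.io.File;
import java.util.Collection;

public class WildcardDemo
{
  public static void main(String[] args)
  {
    // Recursively collect files under ./data matching *.csv, mirroring how
    // the local input source combines `baseDir` with `filter`.
    Collection<File> files = FileUtils.listFiles(
        new File("data"),                 // hypothetical baseDir
        new WildcardFileFilter("*.csv"),  // same glob syntax as `filter`
        TrueFileFilter.INSTANCE           // recurse into all subdirectories
    );
    files.forEach(f -> System.out.println(f.getPath()));
  }
}
```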

OssInputSource.java

@ -27,6 +27,9 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.Iterators;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputFileAttribute;
import org.apache.druid.data.input.InputSplit;
@ -74,10 +77,11 @@ public class OssInputSource extends CloudObjectInputSource
@JsonProperty("uris") @Nullable List<URI> uris,
@JsonProperty("prefixes") @Nullable List<URI> prefixes,
@JsonProperty("objects") @Nullable List<CloudObjectLocation> objects,
@JsonProperty("filter") @Nullable String filter,
@JsonProperty("properties") @Nullable OssClientConfig inputSourceConfig
)
{
super(OssStorageDruidModule.SCHEME, uris, prefixes, objects);
super(OssStorageDruidModule.SCHEME, uris, prefixes, objects, filter);
this.inputDataConfig = Preconditions.checkNotNull(inputDataConfig, "inputDataConfig");
Preconditions.checkNotNull(client, "client");
this.inputSourceConfig = inputSourceConfig;
@ -130,6 +134,7 @@ public class OssInputSource extends CloudObjectInputSource
null,
null,
split.get(),
getFilter(),
getOssInputSourceConfig()
);
}
@ -163,16 +168,29 @@ public class OssInputSource extends CloudObjectInputSource
"uris=" + getUris() +
", prefixes=" + getPrefixes() +
", objects=" + getObjects() +
", filter=" + getFilter() +
", ossInputSourceConfig=" + getOssInputSourceConfig() +
'}';
}
private Iterable<OSSObjectSummary> getIterableObjectsFromPrefixes()
{
return () -> OssUtils.objectSummaryIterator(
clientSupplier.get(),
getPrefixes(),
inputDataConfig.getMaxListingLength()
);
return () -> {
Iterator<OSSObjectSummary> iterator = OssUtils.objectSummaryIterator(
clientSupplier.get(),
getPrefixes(),
inputDataConfig.getMaxListingLength()
);
// Skip files that don't match the filter.
if (StringUtils.isNotBlank(getFilter())) {
iterator = Iterators.filter(
iterator,
object -> FilenameUtils.wildcardMatch(object.getKey(), getFilter())
);
}
return iterator;
};
}
}
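
The same listing-then-filter pattern recurs in the Azure, Google, and S3 sources below. Distilled, it is just Guava's `Iterators.filter` over the backend's listing iterator; a generic sketch (the helper name is hypothetical):

```java
import com.google.common.collect.Iterators;
import org.apache.commons.io.FilenameUtils;

import java.util.Iterator;

final class FilteredListing
{
  // Wraps a backend listing iterator; keys not matching the glob are skipped lazily.
  static Iterator<String> filterKeys(Iterator<String> keys, String filter)
  {
    if (filter == null || filter.trim().isEmpty()) {
      return keys; // no filter configured: pass the listing through unchanged
    }
    return Iterators.filter(keys, key -> FilenameUtils.wildcardMatch(key, filter));
  }
}
```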

OssInputSourceTest.java

@ -142,6 +142,7 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
EXPECTED_URIS,
null,
null,
null,
null
);
final OssInputSource serdeWithUris = MAPPER.readValue(MAPPER.writeValueAsString(withUris), OssInputSource.class);
@ -157,6 +158,7 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
null,
PREFIXES,
null,
null,
null
);
final OssInputSource serdeWithPrefixes =
@ -173,6 +175,7 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
null,
null,
EXPECTED_LOCATION,
null,
null
);
final OssInputSource serdeWithPrefixes =
@ -196,6 +199,7 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
null,
null,
EXPECTED_LOCATION,
null,
mockConfigPropertiesWithoutKeyAndSecret
);
Assert.assertNotNull(withPrefixes);
@ -215,6 +219,7 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
null,
null,
EXPECTED_LOCATION,
null,
CLOUD_CONFIG_PROPERTIES
);
final OssInputSource serdeWithPrefixes =
@ -234,6 +239,7 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
null,
null,
EXPECTED_LOCATION,
null,
null
);
final OssInputSource serdeWithPrefixes =
@ -251,6 +257,7 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
ImmutableList.of(),
ImmutableList.of(),
EXPECTED_LOCATION,
null,
null
);
final OssInputSource serdeWithPrefixes =
@ -269,6 +276,7 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
EXPECTED_URIS,
PREFIXES,
EXPECTED_LOCATION,
null,
null
);
}
@ -284,6 +292,7 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
EXPECTED_URIS,
PREFIXES,
ImmutableList.of(),
null,
null
);
}
@ -299,6 +308,7 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
ImmutableList.of(),
PREFIXES,
EXPECTED_LOCATION,
null,
null
);
}
@ -312,6 +322,7 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
EXPECTED_URIS,
null,
null,
null,
null
);
@ -337,6 +348,7 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
null,
PREFIXES,
null,
null,
null
);
@ -363,6 +375,7 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
null,
PREFIXES,
null,
null,
null
);
@ -392,6 +405,7 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
null,
PREFIXES,
null,
null,
null
);
@ -420,6 +434,7 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
null,
ImmutableList.of(PREFIXES.get(0), EXPECTED_URIS.get(1)),
null,
null,
null
);
@ -450,6 +465,7 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
null,
ImmutableList.of(PREFIXES.get(0), EXPECTED_URIS.get(1)),
null,
null,
null
);
@ -493,6 +509,7 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
null,
ImmutableList.of(PREFIXES.get(0), EXPECTED_COMPRESSED_URIS.get(1)),
null,
null,
null
);

AzureInputSource.java

@ -23,6 +23,9 @@ import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterators;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.druid.data.input.InputFileAttribute;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.SplitHintSpec;
@ -68,10 +71,11 @@ public class AzureInputSource extends CloudObjectInputSource
@JacksonInject AzureInputDataConfig inputDataConfig,
@JsonProperty("uris") @Nullable List<URI> uris,
@JsonProperty("prefixes") @Nullable List<URI> prefixes,
@JsonProperty("objects") @Nullable List<CloudObjectLocation> objects
@JsonProperty("objects") @Nullable List<CloudObjectLocation> objects,
@JsonProperty("filter") @Nullable String filter
)
{
super(SCHEME, uris, prefixes, objects);
super(SCHEME, uris, prefixes, objects, filter);
this.storage = Preconditions.checkNotNull(storage, "AzureStorage");
this.entityFactory = Preconditions.checkNotNull(entityFactory, "AzureEntityFactory");
this.azureCloudBlobIterableFactory = Preconditions.checkNotNull(
@ -96,7 +100,8 @@ public class AzureInputSource extends CloudObjectInputSource
inputDataConfig,
null,
null,
split.get()
split.get(),
getFilter()
);
}
@ -113,6 +118,7 @@ public class AzureInputSource extends CloudObjectInputSource
getIterableObjectsFromPrefixes().iterator(),
blobHolder -> new InputFileAttribute(blobHolder.getBlobLength())
);
return Streams.sequentialStreamFrom(splitIterator)
.map(objects -> objects.stream()
.map(azureCloudBlobToLocationConverter::createCloudObjectLocation)
@ -122,7 +128,19 @@ public class AzureInputSource extends CloudObjectInputSource
private Iterable<CloudBlobHolder> getIterableObjectsFromPrefixes()
{
return azureCloudBlobIterableFactory.create(getPrefixes(), inputDataConfig.getMaxListingLength());
return () -> {
Iterator<CloudBlobHolder> iterator = azureCloudBlobIterableFactory.create(getPrefixes(), inputDataConfig.getMaxListingLength()).iterator();
// Skip files that don't match the filter.
if (StringUtils.isNotBlank(getFilter())) {
iterator = Iterators.filter(
iterator,
object -> FilenameUtils.wildcardMatch(object.getName(), getFilter())
);
}
return iterator;
};
}
@Override
@ -165,6 +183,7 @@ public class AzureInputSource extends CloudObjectInputSource
"uris=" + getUris() +
", prefixes=" + getPrefixes() +
", objects=" + getObjects() +
", filter=" + getFilter() +
'}';
}
}

AzureInputSourceTest.java

@ -20,7 +20,9 @@
package org.apache.druid.data.input.azure;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.commons.io.FilenameUtils;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.MaxSizeSplitHintSpec;
import org.apache.druid.data.input.impl.CloudObjectLocation;
@ -54,7 +56,7 @@ public class AzureInputSourceTest extends EasyMockSupport
private final List<URI> EMPTY_PREFIXES = ImmutableList.of();
private final List<CloudObjectLocation> EMPTY_OBJECTS = ImmutableList.of();
private static final String CONTAINER = "CONTAINER";
private static final String BLOB_PATH = "BLOB_PATH";
private static final String BLOB_PATH = "BLOB_PATH.csv";
private static final CloudObjectLocation CLOUD_OBJECT_LOCATION_1 = new CloudObjectLocation(CONTAINER, BLOB_PATH);
private static final int MAX_LISTING_LENGTH = 10;
@ -106,7 +108,8 @@ public class AzureInputSourceTest extends EasyMockSupport
inputDataConfig,
EMPTY_URIS,
EMPTY_PREFIXES,
EMPTY_OBJECTS
EMPTY_OBJECTS,
null
);
}
@ -126,7 +129,8 @@ public class AzureInputSourceTest extends EasyMockSupport
inputDataConfig,
EMPTY_URIS,
EMPTY_PREFIXES,
objects
objects,
null
);
Assert.assertEquals(1, inputSplit.get().size());
@ -159,7 +163,55 @@ public class AzureInputSourceTest extends EasyMockSupport
inputDataConfig,
EMPTY_URIS,
prefixes,
EMPTY_OBJECTS
EMPTY_OBJECTS,
null
);
Stream<InputSplit<List<CloudObjectLocation>>> cloudObjectStream = azureInputSource.getPrefixesSplitStream(
new MaxSizeSplitHintSpec(null, 1)
);
List<List<CloudObjectLocation>> actualCloudLocationList = cloudObjectStream.map(InputSplit::get)
.collect(Collectors.toList());
verifyAll();
Assert.assertEquals(expectedCloudLocations, actualCloudLocationList);
}
@Test
public void test_getPrefixesSplitStream_withFilter_successfullyCreatesCloudLocation_returnsExpectedLocations()
{
List<URI> prefixes = ImmutableList.of(PREFIX_URI);
List<List<CloudObjectLocation>> expectedCloudLocations = ImmutableList.of(ImmutableList.of(CLOUD_OBJECT_LOCATION_1));
List<CloudBlobHolder> expectedCloudBlobs = ImmutableList.of(cloudBlobDruid1);
Iterator<CloudBlobHolder> expectedCloudBlobsIterator = expectedCloudBlobs.iterator();
String filter = "*.csv";
expectedCloudBlobsIterator = Iterators.filter(
expectedCloudBlobsIterator,
object -> FilenameUtils.wildcardMatch(object.getName(), filter)
);
EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_LISTING_LENGTH);
EasyMock.expect(azureCloudBlobIterableFactory.create(prefixes, MAX_LISTING_LENGTH)).andReturn(
azureCloudBlobIterable);
EasyMock.expect(azureCloudBlobIterable.iterator()).andReturn(expectedCloudBlobsIterator);
EasyMock.expect(azureCloudBlobToLocationConverter.createCloudObjectLocation(cloudBlobDruid1))
.andReturn(CLOUD_OBJECT_LOCATION_1);
EasyMock.expect(cloudBlobDruid1.getBlobLength()).andReturn(100L).anyTimes();
EasyMock.expect(cloudBlobDruid1.getName()).andReturn(BLOB_PATH).anyTimes();
replayAll();
azureInputSource = new AzureInputSource(
storage,
entityFactory,
azureCloudBlobIterableFactory,
azureCloudBlobToLocationConverter,
inputDataConfig,
EMPTY_URIS,
prefixes,
EMPTY_OBJECTS,
filter
);
Stream<InputSplit<List<CloudObjectLocation>>> cloudObjectStream = azureInputSource.getPrefixesSplitStream(
@ -187,7 +239,8 @@ public class AzureInputSourceTest extends EasyMockSupport
inputDataConfig,
EMPTY_URIS,
prefixes,
EMPTY_OBJECTS
EMPTY_OBJECTS,
null
);
SplittableInputSource<List<CloudObjectLocation>> newInputSource = azureInputSource.withSplit(inputSplit);
@ -207,11 +260,12 @@ public class AzureInputSourceTest extends EasyMockSupport
inputDataConfig,
EMPTY_URIS,
prefixes,
EMPTY_OBJECTS
EMPTY_OBJECTS,
null
);
String actualToString = azureInputSource.toString();
Assert.assertEquals("AzureInputSource{uris=[], prefixes=[azure://container/blob], objects=[]}", actualToString);
Assert.assertEquals("AzureInputSource{uris=[], prefixes=[azure://container/blob], objects=[], filter=null}", actualToString);
}
@Test
@ -225,6 +279,8 @@ public class AzureInputSourceTest extends EasyMockSupport
.withNonnullFields("azureCloudBlobIterableFactory")
.withNonnullFields("azureCloudBlobToLocationConverter")
.withNonnullFields("inputDataConfig")
.withNonnullFields("filter")
.withNonnullFields("scheme")
.verify();
}

pom.xml (google-extensions)

@ -63,6 +63,12 @@
<artifactId>commons-io</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>2.6</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>

GoogleCloudStorageInputSource.java

@ -23,6 +23,9 @@ import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.api.services.storage.model.StorageObject;
import com.google.common.collect.Iterators;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputFileAttribute;
import org.apache.druid.data.input.InputSplit;
@ -59,10 +62,11 @@ public class GoogleCloudStorageInputSource extends CloudObjectInputSource
@JacksonInject GoogleInputDataConfig inputDataConfig,
@JsonProperty("uris") @Nullable List<URI> uris,
@JsonProperty("prefixes") @Nullable List<URI> prefixes,
@JsonProperty("objects") @Nullable List<CloudObjectLocation> objects
@JsonProperty("objects") @Nullable List<CloudObjectLocation> objects,
@JsonProperty("filter") @Nullable String filter
)
{
super(GoogleStorageDruidModule.SCHEME_GS, uris, prefixes, objects);
super(GoogleStorageDruidModule.SCHEME_GS, uris, prefixes, objects, filter);
this.storage = storage;
this.inputDataConfig = inputDataConfig;
}
@ -111,7 +115,7 @@ public class GoogleCloudStorageInputSource extends CloudObjectInputSource
@Override
public SplittableInputSource<List<CloudObjectLocation>> withSplit(InputSplit<List<CloudObjectLocation>> split)
{
return new GoogleCloudStorageInputSource(storage, inputDataConfig, null, null, split.get());
return new GoogleCloudStorageInputSource(storage, inputDataConfig, null, null, split.get(), getFilter());
}
private CloudObjectLocation byteSourceFromStorageObject(final StorageObject storageObject)
@ -121,12 +125,23 @@ public class GoogleCloudStorageInputSource extends CloudObjectInputSource
private Iterable<StorageObject> storageObjectIterable()
{
return () ->
GoogleUtils.lazyFetchingStorageObjectsIterator(
storage,
getPrefixes().iterator(),
inputDataConfig.getMaxListingLength()
return () -> {
Iterator<StorageObject> iterator = GoogleUtils.lazyFetchingStorageObjectsIterator(
storage,
getPrefixes().iterator(),
inputDataConfig.getMaxListingLength()
);
// Skip files that don't match the filter.
if (StringUtils.isNotBlank(getFilter())) {
iterator = Iterators.filter(
iterator,
object -> FilenameUtils.wildcardMatch(object.getName(), getFilter())
);
}
return iterator;
};
}
@Override
@ -136,6 +151,7 @@ public class GoogleCloudStorageInputSource extends CloudObjectInputSource
"uris=" + getUris() +
", prefixes=" + getPrefixes() +
", objects=" + getObjects() +
", filter=" + getFilter() +
'}';
}
}

GoogleCloudStorageInputSourceTest.java

@ -56,7 +56,9 @@ import org.apache.druid.utils.CompressionUtils;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@ -86,6 +88,12 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
URI.create("gs://bar/foo/file2.csv.gz")
);
private static final List<URI> URIS_BEFORE_FILTER = Arrays.asList(
URI.create("gs://foo/bar/file.csv"),
URI.create("gs://bar/foo/file2.csv"),
URI.create("gs://bar/foo/file3.txt")
);
private static final List<List<CloudObjectLocation>> EXPECTED_OBJECTS =
EXPECTED_URIS.stream()
.map(uri -> Collections.singletonList(new CloudObjectLocation(uri)))
@ -96,19 +104,19 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
URI.create("gs://bar/foo")
);
private static final List<CloudObjectLocation> EXPECTED_LOCATION =
ImmutableList.of(new CloudObjectLocation("foo", "bar/file.csv"));
private static final DateTime NOW = DateTimes.nowUtc();
private static final byte[] CONTENT =
StringUtils.toUtf8(StringUtils.format("%d,hello,world", NOW.getMillis()));
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Test
public void testSerde() throws Exception
{
final ObjectMapper mapper = createGoogleObjectMapper();
final GoogleCloudStorageInputSource withUris =
new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, EXPECTED_URIS, ImmutableList.of(), null);
new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, EXPECTED_URIS, ImmutableList.of(), null, null);
final GoogleCloudStorageInputSource serdeWithUris =
mapper.readValue(mapper.writeValueAsString(withUris), GoogleCloudStorageInputSource.class);
Assert.assertEquals(withUris, serdeWithUris);
@ -119,7 +127,7 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
{
final ObjectMapper mapper = createGoogleObjectMapper();
final GoogleCloudStorageInputSource withPrefixes =
new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, ImmutableList.of(), PREFIXES, null);
new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, ImmutableList.of(), PREFIXES, null, null);
final GoogleCloudStorageInputSource serdeWithPrefixes =
mapper.readValue(mapper.writeValueAsString(withPrefixes), GoogleCloudStorageInputSource.class);
Assert.assertEquals(withPrefixes, serdeWithPrefixes);
@ -135,7 +143,8 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
INPUT_DATA_CONFIG,
null,
null,
ImmutableList.of(new CloudObjectLocation("foo", "bar/file.gz"))
ImmutableList.of(new CloudObjectLocation("foo", "bar/file.gz")),
null
);
final GoogleCloudStorageInputSource serdeWithObjects =
mapper.readValue(mapper.writeValueAsString(withObjects), GoogleCloudStorageInputSource.class);
@ -147,7 +156,7 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
{
GoogleCloudStorageInputSource inputSource =
new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, EXPECTED_URIS, ImmutableList.of(), null);
new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, EXPECTED_URIS, ImmutableList.of(), null, null);
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
@ -156,6 +165,55 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
Assert.assertEquals(EXPECTED_OBJECTS, splits.map(InputSplit::get).collect(Collectors.toList()));
}
@Test
public void testWithUrisFilter()
{
GoogleCloudStorageInputSource inputSource = new GoogleCloudStorageInputSource(
STORAGE,
INPUT_DATA_CONFIG,
URIS_BEFORE_FILTER,
null,
null,
"*.csv"
);
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
null
);
Assert.assertEquals(EXPECTED_OBJECTS, splits.map(InputSplit::get).collect(Collectors.toList()));
}
@Test
public void testIllegalObjectsAndPrefixes()
{
expectedException.expect(IllegalArgumentException.class);
// constructor will explode
new GoogleCloudStorageInputSource(
STORAGE,
INPUT_DATA_CONFIG,
null,
PREFIXES,
EXPECTED_OBJECTS.get(0),
"*.csv"
);
}
@Test
public void testIllegalUrisAndPrefixes()
{
expectedException.expect(IllegalArgumentException.class);
// constructor will explode
new GoogleCloudStorageInputSource(
STORAGE,
INPUT_DATA_CONFIG,
URIS_BEFORE_FILTER,
PREFIXES,
null,
"*.csv"
);
}
@Test
public void testWithPrefixesSplit() throws IOException
{
@ -168,7 +226,7 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
EasyMock.replay(INPUT_DATA_CONFIG);
GoogleCloudStorageInputSource inputSource =
new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, null, PREFIXES, null);
new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, null, PREFIXES, null, null);
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
@ -190,7 +248,7 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
EasyMock.replay(INPUT_DATA_CONFIG);
GoogleCloudStorageInputSource inputSource =
new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, null, PREFIXES, null);
new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, null, PREFIXES, null, null);
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
@ -221,6 +279,7 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
INPUT_DATA_CONFIG,
null,
PREFIXES,
null,
null
);
@ -264,6 +323,7 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
INPUT_DATA_CONFIG,
null,
PREFIXES,
null,
null
);

S3InputSource.java

@ -33,6 +33,8 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.Iterators;
import org.apache.commons.io.FilenameUtils;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputFileAttribute;
import org.apache.druid.data.input.InputSplit;
@ -94,11 +96,12 @@ public class S3InputSource extends CloudObjectInputSource
@JsonProperty("uris") @Nullable List<URI> uris,
@JsonProperty("prefixes") @Nullable List<URI> prefixes,
@JsonProperty("objects") @Nullable List<CloudObjectLocation> objects,
@JsonProperty("filter") @Nullable String filter,
@JsonProperty("properties") @Nullable S3InputSourceConfig s3InputSourceConfig,
@JacksonInject AWSCredentialsProvider awsCredentialsProvider
)
{
super(S3StorageDruidModule.SCHEME, uris, prefixes, objects);
super(S3StorageDruidModule.SCHEME, uris, prefixes, objects, filter);
this.inputDataConfig = Preconditions.checkNotNull(inputDataConfig, "S3DataSegmentPusherConfig");
Preconditions.checkNotNull(s3Client, "s3Client");
this.s3InputSourceConfig = s3InputSourceConfig;
@ -138,10 +141,11 @@ public class S3InputSource extends CloudObjectInputSource
List<URI> uris,
List<URI> prefixes,
List<CloudObjectLocation> objects,
String filter,
S3InputSourceConfig s3InputSourceConfig
)
{
this(s3Client, s3ClientBuilder, inputDataConfig, uris, prefixes, objects, s3InputSourceConfig, null);
this(s3Client, s3ClientBuilder, inputDataConfig, uris, prefixes, objects, filter, s3InputSourceConfig, null);
}
@VisibleForTesting
@ -152,11 +156,12 @@ public class S3InputSource extends CloudObjectInputSource
List<URI> uris,
List<URI> prefixes,
List<CloudObjectLocation> objects,
String filter,
S3InputSourceConfig s3InputSourceConfig,
int maxRetries
)
{
this(s3Client, s3ClientBuilder, inputDataConfig, uris, prefixes, objects, s3InputSourceConfig, null);
this(s3Client, s3ClientBuilder, inputDataConfig, uris, prefixes, objects, filter, s3InputSourceConfig, null);
this.maxRetries = maxRetries;
}
@ -233,6 +238,7 @@ public class S3InputSource extends CloudObjectInputSource
null,
null,
split.get(),
getFilter(),
getS3InputSourceConfig(),
awsCredentialsProvider
);
@ -267,16 +273,30 @@ public class S3InputSource extends CloudObjectInputSource
"uris=" + getUris() +
", prefixes=" + getPrefixes() +
", objects=" + getObjects() +
", filter=" + getFilter() +
", s3InputSourceConfig=" + getS3InputSourceConfig() +
'}';
}
private Iterable<S3ObjectSummary> getIterableObjectsFromPrefixes()
{
return () -> S3Utils.objectSummaryIterator(s3ClientSupplier.get(),
getPrefixes(),
inputDataConfig.getMaxListingLength(),
maxRetries
);
return () -> {
Iterator<S3ObjectSummary> iterator = S3Utils.objectSummaryIterator(
s3ClientSupplier.get(),
getPrefixes(),
inputDataConfig.getMaxListingLength(),
maxRetries
);
// Skip files that don't match the filter.
if (org.apache.commons.lang.StringUtils.isNotBlank(getFilter())) {
iterator = Iterators.filter(
iterator,
object -> FilenameUtils.wildcardMatch(object.getKey(), getFilter())
);
}
return iterator;
};
}
}

S3InputSourceTest.java

@ -106,6 +106,11 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
private static final S3InputDataConfig INPUT_DATA_CONFIG;
private static final int MAX_LISTING_LENGTH = 10;
private static final List<CloudObjectLocation> EXPECTED_OBJECTS = Arrays.asList(
new CloudObjectLocation(URI.create("s3://foo/bar/file.csv")),
new CloudObjectLocation(URI.create("s3://bar/foo/file2.csv"))
);
private static final List<URI> EXPECTED_URIS = Arrays.asList(
URI.create("s3://foo/bar/file.csv"),
URI.create("s3://bar/foo/file2.csv")
@ -116,6 +121,18 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
URI.create("s3://bar/foo/file2.csv.gz")
);
private static final List<CloudObjectLocation> OBJECTS_BEFORE_FILTER = Arrays.asList(
new CloudObjectLocation(URI.create("s3://foo/bar/file.csv")),
new CloudObjectLocation(URI.create("s3://bar/foo/file2.csv")),
new CloudObjectLocation(URI.create("s3://bar/foo/file3.txt"))
);
private static final List<URI> URIS_BEFORE_FILTER = Arrays.asList(
URI.create("s3://foo/bar/file.csv"),
URI.create("s3://bar/foo/file2.csv"),
URI.create("s3://bar/foo/file3.txt")
);
private static final List<List<CloudObjectLocation>> EXPECTED_COORDS =
EXPECTED_URIS.stream()
.map(uri -> Collections.singletonList(new CloudObjectLocation(uri)))
@ -147,6 +164,66 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Test
public void testGetUris()
{
final S3InputSource withUris = new S3InputSource(
SERVICE,
SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
INPUT_DATA_CONFIG,
EXPECTED_URIS,
null,
null,
null,
null
);
Assert.assertEquals(
EXPECTED_URIS,
withUris.getUris()
);
}
@Test
public void testGetPrefixes()
{
final S3InputSource withPrefixes = new S3InputSource(
SERVICE,
SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
INPUT_DATA_CONFIG,
null,
PREFIXES,
null,
null,
null
);
Assert.assertEquals(
PREFIXES,
withPrefixes.getPrefixes()
);
}
@Test
public void testGetFilter()
{
final S3InputSource withUris = new S3InputSource(
SERVICE,
SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
INPUT_DATA_CONFIG,
EXPECTED_URIS,
null,
null,
"*.parquet",
null
);
Assert.assertEquals(
"*.parquet",
withUris.getFilter()
);
}
@Test
public void testSerdeWithUris() throws Exception
{
@ -157,6 +234,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
EXPECTED_URIS,
null,
null,
null,
null
);
final S3InputSource serdeWithUris = MAPPER.readValue(MAPPER.writeValueAsString(withUris), S3InputSource.class);
@ -173,6 +251,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
null,
PREFIXES,
null,
null,
null
);
final S3InputSource serdeWithPrefixes =
@ -190,6 +269,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
null,
null,
EXPECTED_LOCATION,
null,
null
);
final S3InputSource serdeWithPrefixes =
@ -213,6 +293,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
null,
null,
EXPECTED_LOCATION,
null,
CLOUD_CONFIG_PROPERTIES
);
final S3InputSource serdeWithPrefixes =
@ -243,6 +324,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
null,
null,
EXPECTED_LOCATION,
null,
mockConfigPropertiesWithoutKeyAndSecret
);
Assert.assertNotNull(withPrefixes);
@ -265,6 +347,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
null,
null,
EXPECTED_LOCATION,
null,
CLOUD_CONFIG_PROPERTIES
);
final S3InputSource serdeWithPrefixes =
@ -286,6 +369,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
null,
null,
EXPECTED_LOCATION,
null,
null
);
@ -305,6 +389,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
ImmutableList.of(),
ImmutableList.of(),
EXPECTED_LOCATION,
null,
null
);
final S3InputSource serdeWithPrefixes =
@ -312,6 +397,74 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
Assert.assertEquals(withPrefixes, serdeWithPrefixes);
}
@Test
public void testWithNullJsonProps()
{
expectedException.expect(IllegalArgumentException.class);
// constructor will explode
new S3InputSource(
SERVICE,
SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
INPUT_DATA_CONFIG,
null,
null,
null,
null,
null
);
}
@Test
public void testIllegalObjectsAndUris()
{
expectedException.expect(IllegalArgumentException.class);
// constructor will explode
new S3InputSource(
SERVICE,
SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
INPUT_DATA_CONFIG,
EXPECTED_URIS,
null,
EXPECTED_OBJECTS,
null,
null
);
}
@Test
public void testIllegalObjectsAndPrefixes()
{
expectedException.expect(IllegalArgumentException.class);
// constructor will explode
new S3InputSource(
SERVICE,
SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
INPUT_DATA_CONFIG,
null,
PREFIXES,
EXPECTED_OBJECTS,
null,
null
);
}
@Test
public void testIllegalUrisAndPrefixes()
{
expectedException.expect(IllegalArgumentException.class);
// constructor will explode
new S3InputSource(
SERVICE,
SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
INPUT_DATA_CONFIG,
EXPECTED_URIS,
PREFIXES,
null,
null,
null
);
}
@Test
public void testSerdeWithInvalidArgs()
{
@ -324,6 +477,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
EXPECTED_URIS,
PREFIXES,
EXPECTED_LOCATION,
null,
null
);
}
@ -340,6 +494,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
EXPECTED_URIS,
PREFIXES,
ImmutableList.of(),
null,
null
);
}
@ -356,6 +511,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
ImmutableList.of(),
PREFIXES,
EXPECTED_LOCATION,
null,
null
);
}
@ -370,6 +526,73 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
EXPECTED_URIS,
null,
null,
null,
null
);
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
null
);
Assert.assertEquals(EXPECTED_COORDS, splits.map(InputSplit::get).collect(Collectors.toList()));
}
@Test
public void testWithUrisFilter()
{
S3InputSource inputSource = new S3InputSource(
SERVICE,
SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
INPUT_DATA_CONFIG,
URIS_BEFORE_FILTER,
null,
null,
"*.csv",
null
);
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
null
);
Assert.assertEquals(EXPECTED_COORDS, splits.map(InputSplit::get).collect(Collectors.toList()));
}
@Test
public void testWithObjectsFilter()
{
S3InputSource inputSource = new S3InputSource(
SERVICE,
SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
INPUT_DATA_CONFIG,
null,
null,
OBJECTS_BEFORE_FILTER,
"*.csv",
null
);
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
null
);
Assert.assertEquals(EXPECTED_COORDS, splits.map(InputSplit::get).collect(Collectors.toList()));
}
@Test
public void testWithoutObjectsFilter()
{
S3InputSource inputSource = new S3InputSource(
SERVICE,
SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
INPUT_DATA_CONFIG,
null,
null,
EXPECTED_OBJECTS,
null,
null
);
@ -396,6 +619,35 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
null,
PREFIXES,
null,
null,
null
);
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
new MaxSizeSplitHintSpec(null, 1)
);
Assert.assertEquals(EXPECTED_COORDS, splits.map(InputSplit::get).collect(Collectors.toList()));
EasyMock.verify(S3_CLIENT);
}
@Test
public void testGetPrefixesSplitStreamWithFilter()
{
EasyMock.reset(S3_CLIENT);
expectListObjects(PREFIXES.get(0), ImmutableList.of(URIS_BEFORE_FILTER.get(0)), CONTENT);
expectListObjects(PREFIXES.get(1), ImmutableList.of(URIS_BEFORE_FILTER.get(1), URIS_BEFORE_FILTER.get(2)), CONTENT);
EasyMock.replay(S3_CLIENT);
S3InputSource inputSource = new S3InputSource(
SERVICE,
SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
INPUT_DATA_CONFIG,
null,
PREFIXES,
null,
"*.csv",
null
);
@ -423,6 +675,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
null,
PREFIXES,
null,
null,
null
);
@ -453,6 +706,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
null,
PREFIXES,
null,
null,
null
);
@ -482,6 +736,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
null,
ImmutableList.of(PREFIXES.get(0), EXPECTED_URIS.get(1)),
null,
null,
null
);
@ -513,6 +768,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
null,
ImmutableList.of(PREFIXES.get(0), EXPECTED_URIS.get(1)),
null,
null,
null
);
@ -556,6 +812,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
ImmutableList.of(PREFIXES.get(0)),
null,
null,
null,
3 // only have three retries since they are slow
);
@ -599,6 +856,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
null,
ImmutableList.of(PREFIXES.get(0), EXPECTED_COMPRESSED_URIS.get(1)),
null,
null,
null
);