Reverting #12659 from 24.0 release (take 3) (#13053)

* Fix conflicts during revert

* Remove filter from web-console

* remove unused const

* fix indent

* In branch: lower coverage requirement for azure-extensions/pom.xml, add subset of CloudObjectInputSourceTest.

Co-authored-by: Abhishek Agarwal <1477457+abhishekagarwal87@users.noreply.github.com>
Co-authored-by: Vadim Ogievetsky <vadim@ogievetsky.com>
This commit is contained in:
Gian Merlino 2022-09-08 06:10:59 -07:00 committed by GitHub
parent 3974a4153e
commit 72f0fd0583
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 76 additions and 759 deletions

View File

@ -21,8 +21,6 @@ package org.apache.druid.data.input.impl;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.primitives.Ints;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.druid.data.input.AbstractInputSource;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputFormat;
@ -43,11 +41,9 @@ import java.util.stream.Stream;
public abstract class CloudObjectInputSource extends AbstractInputSource
implements SplittableInputSource<List<CloudObjectLocation>>
{
private final String scheme;
private final List<URI> uris;
private final List<URI> prefixes;
private final List<CloudObjectLocation> objects;
private final String filter;
public CloudObjectInputSource(
String scheme,
@ -56,30 +52,20 @@ public abstract class CloudObjectInputSource extends AbstractInputSource
@Nullable List<CloudObjectLocation> objects
)
{
this.scheme = scheme;
this.uris = uris;
this.prefixes = prefixes;
this.objects = objects;
this.filter = null;
illegalArgsChecker();
if (!CollectionUtils.isNullOrEmpty(objects)) {
throwIfIllegalArgs(!CollectionUtils.isNullOrEmpty(uris) || !CollectionUtils.isNullOrEmpty(prefixes));
} else if (!CollectionUtils.isNullOrEmpty(uris)) {
throwIfIllegalArgs(!CollectionUtils.isNullOrEmpty(prefixes));
uris.forEach(uri -> CloudObjectLocation.validateUriScheme(scheme, uri));
} else if (!CollectionUtils.isNullOrEmpty(prefixes)) {
prefixes.forEach(uri -> CloudObjectLocation.validateUriScheme(scheme, uri));
} else {
throwIfIllegalArgs(true);
}
public CloudObjectInputSource(
String scheme,
@Nullable List<URI> uris,
@Nullable List<URI> prefixes,
@Nullable List<CloudObjectLocation> objects,
@Nullable String filter
)
{
this.scheme = scheme;
this.uris = uris;
this.prefixes = prefixes;
this.objects = objects;
this.filter = filter;
illegalArgsChecker();
}
@JsonProperty
@ -101,13 +87,6 @@ public abstract class CloudObjectInputSource extends AbstractInputSource
return objects;
}
@Nullable
@JsonProperty
public String getFilter()
{
return filter;
}
/**
* Create the correct {@link InputEntity} for this input source given a split on a {@link CloudObjectLocation}. This
* is called internally by {@link #formattableReader} and operates on the output of {@link #createSplits}.
@ -119,9 +98,6 @@ public abstract class CloudObjectInputSource extends AbstractInputSource
* this input sources backend API. This is called internally by {@link #createSplits} and {@link #estimateNumSplits},
* only if {@link #prefixes} is set, otherwise the splits are created directly from {@link #uris} or {@link #objects}.
* Calling if {@link #prefixes} is not set is likely to either lead to an empty iterator or null pointer exception.
*
* If {@link #filter} is set, the filter will be applied on {@link #uris} or {@link #objects}.
* {@link #filter} uses a glob notation, for example: "*.parquet".
*/
protected abstract Stream<InputSplit<List<CloudObjectLocation>>> getPrefixesSplitStream(SplitHintSpec splitHintSpec);
@ -132,23 +108,12 @@ public abstract class CloudObjectInputSource extends AbstractInputSource
)
{
if (!CollectionUtils.isNullOrEmpty(objects)) {
Stream<CloudObjectLocation> objectStream = objects.stream();
if (StringUtils.isNotBlank(filter)) {
objectStream = objectStream.filter(object -> FilenameUtils.wildcardMatch(object.getPath(), filter));
return objects.stream().map(object -> new InputSplit<>(Collections.singletonList(object)));
}
return objectStream.map(object -> new InputSplit<>(Collections.singletonList(object)));
}
if (!CollectionUtils.isNullOrEmpty(uris)) {
Stream<URI> uriStream = uris.stream();
if (StringUtils.isNotBlank(filter)) {
uriStream = uriStream.filter(uri -> FilenameUtils.wildcardMatch(uri.toString(), filter));
}
return uriStream.map(CloudObjectLocation::new).map(object -> new InputSplit<>(Collections.singletonList(object)));
return uris.stream()
.map(CloudObjectLocation::new)
.map(object -> new InputSplit<>(Collections.singletonList(object)));
}
return getPrefixesSplitStream(getSplitHintSpecOrDefault(splitHintSpec));
@ -199,31 +164,15 @@ public abstract class CloudObjectInputSource extends AbstractInputSource
return false;
}
CloudObjectInputSource that = (CloudObjectInputSource) o;
return Objects.equals(scheme, that.scheme) &&
Objects.equals(uris, that.uris) &&
return Objects.equals(uris, that.uris) &&
Objects.equals(prefixes, that.prefixes) &&
Objects.equals(objects, that.objects) &&
Objects.equals(filter, that.filter);
Objects.equals(objects, that.objects);
}
@Override
public int hashCode()
{
return Objects.hash(scheme, uris, prefixes, objects, filter);
}
private void illegalArgsChecker() throws IllegalArgumentException
{
if (!CollectionUtils.isNullOrEmpty(objects)) {
throwIfIllegalArgs(!CollectionUtils.isNullOrEmpty(uris) || !CollectionUtils.isNullOrEmpty(prefixes));
} else if (!CollectionUtils.isNullOrEmpty(uris)) {
throwIfIllegalArgs(!CollectionUtils.isNullOrEmpty(prefixes));
uris.forEach(uri -> CloudObjectLocation.validateUriScheme(scheme, uri));
} else if (!CollectionUtils.isNullOrEmpty(prefixes)) {
prefixes.forEach(uri -> CloudObjectLocation.validateUriScheme(scheme, uri));
} else {
throwIfIllegalArgs(true);
}
return Objects.hash(uris, prefixes, objects);
}
private void throwIfIllegalArgs(boolean clause) throws IllegalArgumentException

View File

@ -19,6 +19,7 @@
package org.apache.druid.data.input.impl;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.MaxSizeSplitHintSpec;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
@ -57,11 +58,6 @@ public class CloudObjectInputSourceTest
new CloudObjectLocation(URI.create("s3://foo/bar/file.csv"))
);
private static final List<CloudObjectLocation> OBJECTS_BEFORE_FILTER = Arrays.asList(
new CloudObjectLocation(URI.create("s3://foo/bar/file.csv")),
new CloudObjectLocation(URI.create("s3://bar/foo/file2.parquet"))
);
@Rule
public ExpectedException expectedException = ExpectedException.none();
@ -69,7 +65,7 @@ public class CloudObjectInputSourceTest
public void testGetUris()
{
CloudObjectInputSource inputSource = Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings()
.useConstructor(SCHEME, URIS, null, null, null)
.useConstructor(SCHEME, URIS, null, null)
.defaultAnswer(Mockito.CALLS_REAL_METHODS)
);
@ -83,7 +79,7 @@ public class CloudObjectInputSourceTest
public void testGetPrefixes()
{
CloudObjectInputSource inputSource = Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings()
.useConstructor(SCHEME, null, PREFIXES, null, null)
.useConstructor(SCHEME, null, PREFIXES, null)
.defaultAnswer(Mockito.CALLS_REAL_METHODS)
);
@ -93,61 +89,11 @@ public class CloudObjectInputSourceTest
);
}
@Test
public void testGetFilter()
{
CloudObjectInputSource inputSource = Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings()
.useConstructor(SCHEME, URIS, null, null, "*.parquet")
.defaultAnswer(Mockito.CALLS_REAL_METHODS)
);
Assert.assertEquals("*.parquet", inputSource.getFilter());
}
@Test
public void testInequality()
{
CloudObjectInputSource inputSource1 = Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings()
.useConstructor(SCHEME, URIS, null, null, "*.parquet")
.defaultAnswer(Mockito.CALLS_REAL_METHODS)
);
CloudObjectInputSource inputSource2 = Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings()
.useConstructor(SCHEME, URIS, null, null, "*.csv")
.defaultAnswer(Mockito.CALLS_REAL_METHODS)
);
Assert.assertEquals("*.parquet", inputSource1.getFilter());
Assert.assertEquals("*.csv", inputSource2.getFilter());
Assert.assertFalse(inputSource2.equals(inputSource1));
}
@Test
public void testWithUrisFilter()
{
CloudObjectInputSource inputSource = Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings()
.useConstructor(SCHEME, URIS2, null, null, "*.csv")
.defaultAnswer(Mockito.CALLS_REAL_METHODS)
);
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
new MaxSizeSplitHintSpec(null, 1)
);
List<CloudObjectLocation> returnedLocations = splits.map(InputSplit::get).collect(Collectors.toList()).get(0);
List<URI> returnedLocationUris = returnedLocations.stream().map(object -> object.toUri(SCHEME)).collect(Collectors.toList());
Assert.assertEquals("*.csv", inputSource.getFilter());
Assert.assertEquals(URIS, returnedLocationUris);
}
@Test
public void testWithUris()
{
CloudObjectInputSource inputSource = Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings()
.useConstructor(SCHEME, URIS, null, null, null)
.useConstructor(SCHEME, URIS, null, null)
.defaultAnswer(Mockito.CALLS_REAL_METHODS)
);
@ -160,28 +106,6 @@ public class CloudObjectInputSourceTest
List<URI> returnedLocationUris = returnedLocations.stream().map(object -> object.toUri(SCHEME)).collect(Collectors.toList());
Assert.assertEquals(null, inputSource.getFilter());
Assert.assertEquals(URIS, returnedLocationUris);
}
@Test
public void testWithObjectsFilter()
{
CloudObjectInputSource inputSource = Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings()
.useConstructor(SCHEME, null, null, OBJECTS_BEFORE_FILTER, "*.csv")
.defaultAnswer(Mockito.CALLS_REAL_METHODS)
);
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
new MaxSizeSplitHintSpec(null, 1)
);
List<CloudObjectLocation> returnedLocations = splits.map(InputSplit::get).collect(Collectors.toList()).get(0);
List<URI> returnedLocationUris = returnedLocations.stream().map(object -> object.toUri(SCHEME)).collect(Collectors.toList());
Assert.assertEquals("*.csv", inputSource.getFilter());
Assert.assertEquals(URIS, returnedLocationUris);
}
@ -189,7 +113,7 @@ public class CloudObjectInputSourceTest
public void testWithObjects()
{
CloudObjectInputSource inputSource = Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings()
.useConstructor(SCHEME, null, null, OBJECTS, null)
.useConstructor(SCHEME, null, null, OBJECTS)
.defaultAnswer(Mockito.CALLS_REAL_METHODS)
);
@ -200,7 +124,12 @@ public class CloudObjectInputSourceTest
List<CloudObjectLocation> returnedLocations = splits.map(InputSplit::get).collect(Collectors.toList()).get(0);
Assert.assertEquals(null, inputSource.getFilter());
Assert.assertEquals(OBJECTS, returnedLocations);
}
@Test
public void testEquals()
{
EqualsVerifier.forClass(CloudObjectInputSource.class).usingGetClass().verify();
}
}

View File

@ -46,7 +46,6 @@ Sample specs:
"type": "index_parallel",
"inputSource": {
"type": "s3",
"filter": "*.json",
"uris": ["s3://foo/bar/file.json", "s3://bar/foo/file2.json"]
},
"inputFormat": {
@ -63,7 +62,6 @@ Sample specs:
"type": "index_parallel",
"inputSource": {
"type": "s3",
"filter": "*.parquet",
"prefixes": ["s3://foo/bar/", "s3://bar/foo/"]
},
"inputFormat": {
@ -81,7 +79,6 @@ Sample specs:
"type": "index_parallel",
"inputSource": {
"type": "s3",
"filter": "*.json",
"objects": [
{ "bucket": "foo", "path": "bar/file1.json"},
{ "bucket": "bar", "path": "foo/file2.json"}
@ -101,7 +98,6 @@ Sample specs:
"type": "index_parallel",
"inputSource": {
"type": "s3",
"filter": "*.json",
"uris": ["s3://foo/bar/file.json", "s3://bar/foo/file2.json"],
"properties": {
"accessKeyId": "KLJ78979SDFdS2",
@ -122,7 +118,6 @@ Sample specs:
"type": "index_parallel",
"inputSource": {
"type": "s3",
"filter": "*.json",
"uris": ["s3://foo/bar/file.json", "s3://bar/foo/file2.json"],
"properties": {
"accessKeyId": "KLJ78979SDFdS2",
@ -182,7 +177,6 @@ Sample specs:
|uris|JSON array of URIs where S3 objects to be ingested are located.|None|`uris` or `prefixes` or `objects` must be set|
|prefixes|JSON array of URI prefixes for the locations of S3 objects to be ingested. Empty objects starting with one of the given prefixes will be skipped.|None|`uris` or `prefixes` or `objects` must be set|
|objects|JSON array of S3 Objects to be ingested.|None|`uris` or `prefixes` or `objects` must be set|
|filter|A wildcard filter for files. See [here](http://commons.apache.org/proper/commons-io/apidocs/org/apache/commons/io/filefilter/WildcardFileFilter) for more information. Files matching the filter criteria are considered for ingestion. Files not matching the filter criteria are ignored.|None|no|
| endpointConfig |Config for overriding the default S3 endpoint and signing region. This would allow ingesting data from a different S3 store. Please see [s3 config](../development/extensions-core/s3.md#connecting-to-s3-configuration) for more information.|None|No (defaults will be used if not given)
| clientConfig |S3 client properties for the overridden s3 endpoint. This is used in conjunction with `endPointConfig`. Please see [s3 config](../development/extensions-core/s3.md#connecting-to-s3-configuration) for more information.|None|No (defaults will be used if not given)
| proxyConfig |Properties for specifying proxy information for the overridden s3 endpoint. This is used in conjunction with `clientConfig`. Please see [s3 config](../development/extensions-core/s3.md#connecting-to-s3-configuration) for more information.|None|No (defaults will be used if not given)
@ -226,7 +220,6 @@ Sample specs:
"type": "index_parallel",
"inputSource": {
"type": "google",
"filter": "*.json",
"uris": ["gs://foo/bar/file.json", "gs://bar/foo/file2.json"]
},
"inputFormat": {
@ -243,7 +236,6 @@ Sample specs:
"type": "index_parallel",
"inputSource": {
"type": "google",
"filter": "*.parquet",
"prefixes": ["gs://foo/bar/", "gs://bar/foo/"]
},
"inputFormat": {
@ -261,7 +253,6 @@ Sample specs:
"type": "index_parallel",
"inputSource": {
"type": "google",
"filter": "*.json",
"objects": [
{ "bucket": "foo", "path": "bar/file1.json"},
{ "bucket": "bar", "path": "foo/file2.json"}
@ -281,7 +272,6 @@ Sample specs:
|uris|JSON array of URIs where Google Cloud Storage objects to be ingested are located.|None|`uris` or `prefixes` or `objects` must be set|
|prefixes|JSON array of URI prefixes for the locations of Google Cloud Storage objects to be ingested. Empty objects starting with one of the given prefixes will be skipped.|None|`uris` or `prefixes` or `objects` must be set|
|objects|JSON array of Google Cloud Storage objects to be ingested.|None|`uris` or `prefixes` or `objects` must be set|
|filter|A wildcard filter for files. See [here](http://commons.apache.org/proper/commons-io/apidocs/org/apache/commons/io/filefilter/WildcardFileFilter) for more information. Files matching the filter criteria are considered for ingestion. Files not matching the filter criteria are ignored.|None|no|
Note that the Google Cloud Storage input source will skip all empty objects only when `prefixes` is specified.
@ -307,7 +297,6 @@ Sample specs:
"type": "index_parallel",
"inputSource": {
"type": "azure",
"filter": "*.json",
"uris": ["azure://container/prefix1/file.json", "azure://container/prefix2/file2.json"]
},
"inputFormat": {
@ -324,7 +313,6 @@ Sample specs:
"type": "index_parallel",
"inputSource": {
"type": "azure",
"filter": "*.parquet",
"prefixes": ["azure://container/prefix1/", "azure://container/prefix2/"]
},
"inputFormat": {
@ -342,7 +330,6 @@ Sample specs:
"type": "index_parallel",
"inputSource": {
"type": "azure",
"filter": "*.json",
"objects": [
{ "bucket": "container", "path": "prefix1/file1.json"},
{ "bucket": "container", "path": "prefix2/file2.json"}
@ -362,7 +349,6 @@ Sample specs:
|uris|JSON array of URIs where the Azure objects to be ingested are located, in the form "azure://\<container>/\<path-to-file\>"|None|`uris` or `prefixes` or `objects` must be set|
|prefixes|JSON array of URI prefixes for the locations of Azure objects to ingest, in the form `azure://\<container>/\<prefix\>`. Empty objects starting with one of the given prefixes are skipped.|None|`uris` or `prefixes` or `objects` must be set|
|objects|JSON array of Azure objects to ingest.|None|`uris` or `prefixes` or `objects` must be set|
|filter|A wildcard filter for files. See [here](http://commons.apache.org/proper/commons-io/apidocs/org/apache/commons/io/filefilter/WildcardFileFilter) for more information. Files matching the filter criteria are considered for ingestion. Files not matching the filter criteria are ignored.|None|no|
Note that the Azure input source skips all empty objects only when `prefixes` is specified.
@ -602,6 +588,7 @@ Sample spec:
|--------|-----------|---------|
|type|Set the value to `local`.|yes|
|filter|A wildcard filter for files. See [here](http://commons.apache.org/proper/commons-io/apidocs/org/apache/commons/io/filefilter/WildcardFileFilter) for more information. Files matching the filter criteria are considered for ingestion. Files not matching the filter criteria are ignored.|yes if `baseDir` is specified|
|type|This should be "local".|yes|
|baseDir|Directory to search recursively for files to be ingested. Empty files under the `baseDir` will be skipped.|At least one of `baseDir` or `files` should be specified|
|files|File paths to ingest. Some files can be ignored to avoid ingesting duplicate files if they are located under the specified `baseDir`. Empty files will be skipped.|At least one of `baseDir` or `files` should be specified|

View File

@ -27,9 +27,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.Iterators;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputFileAttribute;
import org.apache.druid.data.input.InputSplit;
@ -77,11 +74,10 @@ public class OssInputSource extends CloudObjectInputSource
@JsonProperty("uris") @Nullable List<URI> uris,
@JsonProperty("prefixes") @Nullable List<URI> prefixes,
@JsonProperty("objects") @Nullable List<CloudObjectLocation> objects,
@JsonProperty("filter") @Nullable String filter,
@JsonProperty("properties") @Nullable OssClientConfig inputSourceConfig
)
{
super(OssStorageDruidModule.SCHEME, uris, prefixes, objects, filter);
super(OssStorageDruidModule.SCHEME, uris, prefixes, objects);
this.inputDataConfig = Preconditions.checkNotNull(inputDataConfig, "inputDataConfig");
Preconditions.checkNotNull(client, "client");
this.inputSourceConfig = inputSourceConfig;
@ -134,7 +130,6 @@ public class OssInputSource extends CloudObjectInputSource
null,
null,
split.get(),
getFilter(),
getOssInputSourceConfig()
);
}
@ -168,29 +163,16 @@ public class OssInputSource extends CloudObjectInputSource
"uris=" + getUris() +
", prefixes=" + getPrefixes() +
", objects=" + getObjects() +
", filter=" + getFilter() +
", ossInputSourceConfig=" + getOssInputSourceConfig() +
'}';
}
private Iterable<OSSObjectSummary> getIterableObjectsFromPrefixes()
{
return () -> {
Iterator<OSSObjectSummary> iterator = OssUtils.objectSummaryIterator(
return () -> OssUtils.objectSummaryIterator(
clientSupplier.get(),
getPrefixes(),
inputDataConfig.getMaxListingLength()
);
// Skip files that didn't match filter.
if (StringUtils.isNotBlank(getFilter())) {
iterator = Iterators.filter(
iterator,
object -> FilenameUtils.wildcardMatch(object.getKey(), getFilter())
);
}
return iterator;
};
}
}

View File

@ -142,7 +142,6 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
EXPECTED_URIS,
null,
null,
null,
null
);
final OssInputSource serdeWithUris = MAPPER.readValue(MAPPER.writeValueAsString(withUris), OssInputSource.class);
@ -158,7 +157,6 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
null,
PREFIXES,
null,
null,
null
);
final OssInputSource serdeWithPrefixes =
@ -175,7 +173,6 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
null,
null,
EXPECTED_LOCATION,
null,
null
);
final OssInputSource serdeWithPrefixes =
@ -199,7 +196,6 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
null,
null,
EXPECTED_LOCATION,
null,
mockConfigPropertiesWithoutKeyAndSecret
);
Assert.assertNotNull(withPrefixes);
@ -219,7 +215,6 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
null,
null,
EXPECTED_LOCATION,
null,
CLOUD_CONFIG_PROPERTIES
);
final OssInputSource serdeWithPrefixes =
@ -239,7 +234,6 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
null,
null,
EXPECTED_LOCATION,
null,
null
);
final OssInputSource serdeWithPrefixes =
@ -257,7 +251,6 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
ImmutableList.of(),
ImmutableList.of(),
EXPECTED_LOCATION,
null,
null
);
final OssInputSource serdeWithPrefixes =
@ -276,7 +269,6 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
EXPECTED_URIS,
PREFIXES,
EXPECTED_LOCATION,
null,
null
);
}
@ -292,7 +284,6 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
EXPECTED_URIS,
PREFIXES,
ImmutableList.of(),
null,
null
);
}
@ -308,7 +299,6 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
ImmutableList.of(),
PREFIXES,
EXPECTED_LOCATION,
null,
null
);
}
@ -322,7 +312,6 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
EXPECTED_URIS,
null,
null,
null,
null
);
@ -348,7 +337,6 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
null,
PREFIXES,
null,
null,
null
);
@ -375,7 +363,6 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
null,
PREFIXES,
null,
null,
null
);
@ -405,7 +392,6 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
null,
PREFIXES,
null,
null,
null
);
@ -434,7 +420,6 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
null,
ImmutableList.of(PREFIXES.get(0), EXPECTED_URIS.get(1)),
null,
null,
null
);
@ -465,7 +450,6 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
null,
ImmutableList.of(PREFIXES.get(0), EXPECTED_URIS.get(1)),
null,
null,
null
);
@ -509,7 +493,6 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
null,
ImmutableList.of(PREFIXES.get(0), EXPECTED_COMPRESSED_URIS.get(1)),
null,
null,
null
);

View File

@ -178,32 +178,32 @@
<limit>
<counter>INSTRUCTION</counter>
<value>COVEREDRATIO</value>
<minimum>0.86</minimum>
<minimum>0.70</minimum>
</limit>
<limit>
<counter>LINE</counter>
<value>COVEREDRATIO</value>
<minimum>0.85</minimum>
<minimum>0.70</minimum>
</limit>
<limit>
<counter>BRANCH</counter>
<value>COVEREDRATIO</value>
<minimum>0.88</minimum>
<minimum>0.70</minimum>
</limit>
<limit>
<counter>COMPLEXITY</counter>
<value>COVEREDRATIO</value>
<minimum>0.80</minimum>
<minimum>0.70</minimum>
</limit>
<limit>
<counter>METHOD</counter>
<value>COVEREDRATIO</value>
<minimum>0.78</minimum>
<minimum>0.70</minimum>
</limit>
<limit>
<counter>CLASS</counter>
<value>COVEREDRATIO</value>
<minimum>0.90</minimum>
<minimum>0.70</minimum>
</limit>
</limits>
</rule>

View File

@ -23,9 +23,6 @@ import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterators;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.druid.data.input.InputFileAttribute;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.SplitHintSpec;
@ -71,11 +68,10 @@ public class AzureInputSource extends CloudObjectInputSource
@JacksonInject AzureInputDataConfig inputDataConfig,
@JsonProperty("uris") @Nullable List<URI> uris,
@JsonProperty("prefixes") @Nullable List<URI> prefixes,
@JsonProperty("objects") @Nullable List<CloudObjectLocation> objects,
@JsonProperty("filter") @Nullable String filter
@JsonProperty("objects") @Nullable List<CloudObjectLocation> objects
)
{
super(SCHEME, uris, prefixes, objects, filter);
super(SCHEME, uris, prefixes, objects);
this.storage = Preconditions.checkNotNull(storage, "AzureStorage");
this.entityFactory = Preconditions.checkNotNull(entityFactory, "AzureEntityFactory");
this.azureCloudBlobIterableFactory = Preconditions.checkNotNull(
@ -100,8 +96,7 @@ public class AzureInputSource extends CloudObjectInputSource
inputDataConfig,
null,
null,
split.get(),
getFilter()
split.get()
);
}
@ -118,7 +113,6 @@ public class AzureInputSource extends CloudObjectInputSource
getIterableObjectsFromPrefixes().iterator(),
blobHolder -> new InputFileAttribute(blobHolder.getBlobLength())
);
return Streams.sequentialStreamFrom(splitIterator)
.map(objects -> objects.stream()
.map(azureCloudBlobToLocationConverter::createCloudObjectLocation)
@ -128,19 +122,7 @@ public class AzureInputSource extends CloudObjectInputSource
private Iterable<CloudBlobHolder> getIterableObjectsFromPrefixes()
{
return () -> {
Iterator<CloudBlobHolder> iterator = azureCloudBlobIterableFactory.create(getPrefixes(), inputDataConfig.getMaxListingLength()).iterator();
// Skip files that didn't match filter.
if (StringUtils.isNotBlank(getFilter())) {
iterator = Iterators.filter(
iterator,
object -> FilenameUtils.wildcardMatch(object.getName(), getFilter())
);
}
return iterator;
};
return azureCloudBlobIterableFactory.create(getPrefixes(), inputDataConfig.getMaxListingLength());
}
@Override
@ -183,7 +165,6 @@ public class AzureInputSource extends CloudObjectInputSource
"uris=" + getUris() +
", prefixes=" + getPrefixes() +
", objects=" + getObjects() +
", filter=" + getFilter() +
'}';
}
}

View File

@ -20,9 +20,7 @@
package org.apache.druid.data.input.azure;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.commons.io.FilenameUtils;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.MaxSizeSplitHintSpec;
import org.apache.druid.data.input.impl.CloudObjectLocation;
@ -56,7 +54,7 @@ public class AzureInputSourceTest extends EasyMockSupport
private final List<URI> EMPTY_PREFIXES = ImmutableList.of();
private final List<CloudObjectLocation> EMPTY_OBJECTS = ImmutableList.of();
private static final String CONTAINER = "CONTAINER";
private static final String BLOB_PATH = "BLOB_PATH.csv";
private static final String BLOB_PATH = "BLOB_PATH";
private static final CloudObjectLocation CLOUD_OBJECT_LOCATION_1 = new CloudObjectLocation(CONTAINER, BLOB_PATH);
private static final int MAX_LISTING_LENGTH = 10;
@ -108,8 +106,7 @@ public class AzureInputSourceTest extends EasyMockSupport
inputDataConfig,
EMPTY_URIS,
EMPTY_PREFIXES,
EMPTY_OBJECTS,
null
EMPTY_OBJECTS
);
}
@ -129,8 +126,7 @@ public class AzureInputSourceTest extends EasyMockSupport
inputDataConfig,
EMPTY_URIS,
EMPTY_PREFIXES,
objects,
null
objects
);
Assert.assertEquals(1, inputSplit.get().size());
@ -163,55 +159,7 @@ public class AzureInputSourceTest extends EasyMockSupport
inputDataConfig,
EMPTY_URIS,
prefixes,
EMPTY_OBJECTS,
null
);
Stream<InputSplit<List<CloudObjectLocation>>> cloudObjectStream = azureInputSource.getPrefixesSplitStream(
new MaxSizeSplitHintSpec(null, 1)
);
List<List<CloudObjectLocation>> actualCloudLocationList = cloudObjectStream.map(InputSplit::get)
.collect(Collectors.toList());
verifyAll();
Assert.assertEquals(expectedCloudLocations, actualCloudLocationList);
}
@Test
public void test_getPrefixesSplitStream_withFilter_successfullyCreatesCloudLocation_returnsExpectedLocations()
{
List<URI> prefixes = ImmutableList.of(PREFIX_URI);
List<List<CloudObjectLocation>> expectedCloudLocations = ImmutableList.of(ImmutableList.of(CLOUD_OBJECT_LOCATION_1));
List<CloudBlobHolder> expectedCloudBlobs = ImmutableList.of(cloudBlobDruid1);
Iterator<CloudBlobHolder> expectedCloudBlobsIterator = expectedCloudBlobs.iterator();
String filter = "*.csv";
expectedCloudBlobsIterator = Iterators.filter(
expectedCloudBlobsIterator,
object -> FilenameUtils.wildcardMatch(object.getName(), filter)
);
EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_LISTING_LENGTH);
EasyMock.expect(azureCloudBlobIterableFactory.create(prefixes, MAX_LISTING_LENGTH)).andReturn(
azureCloudBlobIterable);
EasyMock.expect(azureCloudBlobIterable.iterator()).andReturn(expectedCloudBlobsIterator);
EasyMock.expect(azureCloudBlobToLocationConverter.createCloudObjectLocation(cloudBlobDruid1))
.andReturn(CLOUD_OBJECT_LOCATION_1);
EasyMock.expect(cloudBlobDruid1.getBlobLength()).andReturn(100L).anyTimes();
EasyMock.expect(cloudBlobDruid1.getName()).andReturn(BLOB_PATH).anyTimes();
replayAll();
azureInputSource = new AzureInputSource(
storage,
entityFactory,
azureCloudBlobIterableFactory,
azureCloudBlobToLocationConverter,
inputDataConfig,
EMPTY_URIS,
prefixes,
EMPTY_OBJECTS,
filter
EMPTY_OBJECTS
);
Stream<InputSplit<List<CloudObjectLocation>>> cloudObjectStream = azureInputSource.getPrefixesSplitStream(
@ -239,8 +187,7 @@ public class AzureInputSourceTest extends EasyMockSupport
inputDataConfig,
EMPTY_URIS,
prefixes,
EMPTY_OBJECTS,
null
EMPTY_OBJECTS
);
SplittableInputSource<List<CloudObjectLocation>> newInputSource = azureInputSource.withSplit(inputSplit);
@ -260,12 +207,11 @@ public class AzureInputSourceTest extends EasyMockSupport
inputDataConfig,
EMPTY_URIS,
prefixes,
EMPTY_OBJECTS,
null
EMPTY_OBJECTS
);
String actualToString = azureInputSource.toString();
Assert.assertEquals("AzureInputSource{uris=[], prefixes=[azure://container/blob], objects=[], filter=null}", actualToString);
Assert.assertEquals("AzureInputSource{uris=[], prefixes=[azure://container/blob], objects=[]}", actualToString);
}
@Test
@ -279,8 +225,6 @@ public class AzureInputSourceTest extends EasyMockSupport
.withNonnullFields("azureCloudBlobIterableFactory")
.withNonnullFields("azureCloudBlobToLocationConverter")
.withNonnullFields("inputDataConfig")
.withNonnullFields("filter")
.withNonnullFields("scheme")
.verify();
}

View File

@ -63,12 +63,6 @@
<artifactId>commons-io</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>2.6</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>

View File

@ -23,9 +23,6 @@ import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.api.services.storage.model.StorageObject;
import com.google.common.collect.Iterators;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputFileAttribute;
import org.apache.druid.data.input.InputSplit;
@ -62,11 +59,10 @@ public class GoogleCloudStorageInputSource extends CloudObjectInputSource
@JacksonInject GoogleInputDataConfig inputDataConfig,
@JsonProperty("uris") @Nullable List<URI> uris,
@JsonProperty("prefixes") @Nullable List<URI> prefixes,
@JsonProperty("objects") @Nullable List<CloudObjectLocation> objects,
@JsonProperty("filter") @Nullable String filter
@JsonProperty("objects") @Nullable List<CloudObjectLocation> objects
)
{
super(GoogleStorageDruidModule.SCHEME_GS, uris, prefixes, objects, filter);
super(GoogleStorageDruidModule.SCHEME_GS, uris, prefixes, objects);
this.storage = storage;
this.inputDataConfig = inputDataConfig;
}
@ -115,7 +111,7 @@ public class GoogleCloudStorageInputSource extends CloudObjectInputSource
@Override
public SplittableInputSource<List<CloudObjectLocation>> withSplit(InputSplit<List<CloudObjectLocation>> split)
{
return new GoogleCloudStorageInputSource(storage, inputDataConfig, null, null, split.get(), getFilter());
return new GoogleCloudStorageInputSource(storage, inputDataConfig, null, null, split.get());
}
private CloudObjectLocation byteSourceFromStorageObject(final StorageObject storageObject)
@ -125,23 +121,12 @@ public class GoogleCloudStorageInputSource extends CloudObjectInputSource
private Iterable<StorageObject> storageObjectIterable()
{
return () -> {
Iterator<StorageObject> iterator = GoogleUtils.lazyFetchingStorageObjectsIterator(
return () ->
GoogleUtils.lazyFetchingStorageObjectsIterator(
storage,
getPrefixes().iterator(),
inputDataConfig.getMaxListingLength()
);
// Skip files that didn't match filter.
if (StringUtils.isNotBlank(getFilter())) {
iterator = Iterators.filter(
iterator,
object -> FilenameUtils.wildcardMatch(object.getName(), getFilter())
);
}
return iterator;
};
}
@Override
@ -151,7 +136,6 @@ public class GoogleCloudStorageInputSource extends CloudObjectInputSource
"uris=" + getUris() +
", prefixes=" + getPrefixes() +
", objects=" + getObjects() +
", filter=" + getFilter() +
'}';
}
}

View File

@ -56,9 +56,7 @@ import org.apache.druid.utils.CompressionUtils;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@ -88,12 +86,6 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
URI.create("gs://bar/foo/file2.csv.gz")
);
private static final List<URI> URIS_BEFORE_FILTER = Arrays.asList(
URI.create("gs://foo/bar/file.csv"),
URI.create("gs://bar/foo/file2.csv"),
URI.create("gs://bar/foo/file3.txt")
);
private static final List<List<CloudObjectLocation>> EXPECTED_OBJECTS =
EXPECTED_URIS.stream()
.map(uri -> Collections.singletonList(new CloudObjectLocation(uri)))
@ -104,19 +96,19 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
URI.create("gs://bar/foo")
);
private static final List<CloudObjectLocation> EXPECTED_LOCATION =
ImmutableList.of(new CloudObjectLocation("foo", "bar/file.csv"));
private static final DateTime NOW = DateTimes.nowUtc();
private static final byte[] CONTENT =
StringUtils.toUtf8(StringUtils.format("%d,hello,world", NOW.getMillis()));
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Test
public void testSerde() throws Exception
{
final ObjectMapper mapper = createGoogleObjectMapper();
final GoogleCloudStorageInputSource withUris =
new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, EXPECTED_URIS, ImmutableList.of(), null, null);
new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, EXPECTED_URIS, ImmutableList.of(), null);
final GoogleCloudStorageInputSource serdeWithUris =
mapper.readValue(mapper.writeValueAsString(withUris), GoogleCloudStorageInputSource.class);
Assert.assertEquals(withUris, serdeWithUris);
@ -127,7 +119,7 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
{
final ObjectMapper mapper = createGoogleObjectMapper();
final GoogleCloudStorageInputSource withPrefixes =
new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, ImmutableList.of(), PREFIXES, null, null);
new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, ImmutableList.of(), PREFIXES, null);
final GoogleCloudStorageInputSource serdeWithPrefixes =
mapper.readValue(mapper.writeValueAsString(withPrefixes), GoogleCloudStorageInputSource.class);
Assert.assertEquals(withPrefixes, serdeWithPrefixes);
@ -143,8 +135,7 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
INPUT_DATA_CONFIG,
null,
null,
ImmutableList.of(new CloudObjectLocation("foo", "bar/file.gz")),
null
ImmutableList.of(new CloudObjectLocation("foo", "bar/file.gz"))
);
final GoogleCloudStorageInputSource serdeWithObjects =
mapper.readValue(mapper.writeValueAsString(withObjects), GoogleCloudStorageInputSource.class);
@ -156,7 +147,7 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
{
GoogleCloudStorageInputSource inputSource =
new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, EXPECTED_URIS, ImmutableList.of(), null, null);
new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, EXPECTED_URIS, ImmutableList.of(), null);
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
@ -165,55 +156,6 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
Assert.assertEquals(EXPECTED_OBJECTS, splits.map(InputSplit::get).collect(Collectors.toList()));
}
@Test
public void testWithUrisFilter()
{
GoogleCloudStorageInputSource inputSource = new GoogleCloudStorageInputSource(
STORAGE,
INPUT_DATA_CONFIG,
URIS_BEFORE_FILTER,
null,
null,
"*.csv"
);
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
null
);
Assert.assertEquals(EXPECTED_OBJECTS, splits.map(InputSplit::get).collect(Collectors.toList()));
}
@Test
public void testIllegalObjectsAndPrefixes()
{
expectedException.expect(IllegalArgumentException.class);
// constructor will explode
new GoogleCloudStorageInputSource(
STORAGE,
INPUT_DATA_CONFIG,
null,
PREFIXES,
EXPECTED_OBJECTS.get(0),
"*.csv"
);
}
@Test
public void testIllegalUrisAndPrefixes()
{
expectedException.expect(IllegalArgumentException.class);
// constructor will explode
new GoogleCloudStorageInputSource(
STORAGE,
INPUT_DATA_CONFIG,
URIS_BEFORE_FILTER,
PREFIXES,
null,
"*.csv"
);
}
@Test
public void testWithPrefixesSplit() throws IOException
{
@ -226,7 +168,7 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
EasyMock.replay(INPUT_DATA_CONFIG);
GoogleCloudStorageInputSource inputSource =
new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, null, PREFIXES, null, null);
new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, null, PREFIXES, null);
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
@ -248,7 +190,7 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
EasyMock.replay(INPUT_DATA_CONFIG);
GoogleCloudStorageInputSource inputSource =
new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, null, PREFIXES, null, null);
new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, null, PREFIXES, null);
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
@ -279,7 +221,6 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
INPUT_DATA_CONFIG,
null,
PREFIXES,
null,
null
);
@ -323,7 +264,6 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
INPUT_DATA_CONFIG,
null,
PREFIXES,
null,
null
);

View File

@ -35,8 +35,6 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.Iterators;
import org.apache.commons.io.FilenameUtils;
import org.apache.druid.common.aws.AWSClientConfig;
import org.apache.druid.common.aws.AWSEndpointConfig;
import org.apache.druid.common.aws.AWSProxyConfig;
@ -107,7 +105,6 @@ public class S3InputSource extends CloudObjectInputSource
@JsonProperty("uris") @Nullable List<URI> uris,
@JsonProperty("prefixes") @Nullable List<URI> prefixes,
@JsonProperty("objects") @Nullable List<CloudObjectLocation> objects,
@JsonProperty("filter") @Nullable String filter,
@JsonProperty("properties") @Nullable S3InputSourceConfig s3InputSourceConfig,
@JsonProperty("proxyConfig") @Nullable AWSProxyConfig awsProxyConfig,
@JsonProperty("endpointConfig") @Nullable AWSEndpointConfig awsEndpointConfig,
@ -115,7 +112,7 @@ public class S3InputSource extends CloudObjectInputSource
@JacksonInject AWSCredentialsProvider awsCredentialsProvider
)
{
super(S3StorageDruidModule.SCHEME, uris, prefixes, objects, filter);
super(S3StorageDruidModule.SCHEME, uris, prefixes, objects);
this.inputDataConfig = Preconditions.checkNotNull(inputDataConfig, "S3DataSegmentPusherConfig");
Preconditions.checkNotNull(s3Client, "s3Client");
this.s3InputSourceConfig = s3InputSourceConfig;
@ -185,7 +182,6 @@ public class S3InputSource extends CloudObjectInputSource
List<URI> uris,
List<URI> prefixes,
List<CloudObjectLocation> objects,
String filter,
S3InputSourceConfig s3InputSourceConfig,
AWSProxyConfig awsProxyConfig,
AWSEndpointConfig awsEndpointConfig,
@ -198,7 +194,6 @@ public class S3InputSource extends CloudObjectInputSource
uris,
prefixes,
objects,
filter,
s3InputSourceConfig,
awsProxyConfig,
awsEndpointConfig,
@ -215,7 +210,6 @@ public class S3InputSource extends CloudObjectInputSource
List<URI> uris,
List<URI> prefixes,
List<CloudObjectLocation> objects,
String filter,
S3InputSourceConfig s3InputSourceConfig,
AWSProxyConfig awsProxyConfig,
AWSEndpointConfig awsEndpointConfig,
@ -230,7 +224,6 @@ public class S3InputSource extends CloudObjectInputSource
uris,
prefixes,
objects,
filter,
s3InputSourceConfig,
awsProxyConfig,
awsEndpointConfig,
@ -335,7 +328,6 @@ public class S3InputSource extends CloudObjectInputSource
null,
null,
split.get(),
getFilter(),
getS3InputSourceConfig(),
getAwsProxyConfig(),
getAwsEndpointConfig(),
@ -376,7 +368,6 @@ public class S3InputSource extends CloudObjectInputSource
"uris=" + getUris() +
", prefixes=" + getPrefixes() +
", objects=" + getObjects() +
", filter=" + getFilter() +
", s3InputSourceConfig=" + getS3InputSourceConfig() +
", awsProxyConfig=" + getAwsProxyConfig() +
", awsEndpointConfig=" + getAwsEndpointConfig() +
@ -386,23 +377,10 @@ public class S3InputSource extends CloudObjectInputSource
private Iterable<S3ObjectSummary> getIterableObjectsFromPrefixes()
{
return () -> {
Iterator<S3ObjectSummary> iterator = S3Utils.objectSummaryIterator(
s3ClientSupplier.get(),
return () -> S3Utils.objectSummaryIterator(s3ClientSupplier.get(),
getPrefixes(),
inputDataConfig.getMaxListingLength(),
maxRetries
);
// Skip files that didn't match filter.
if (org.apache.commons.lang.StringUtils.isNotBlank(getFilter())) {
iterator = Iterators.filter(
iterator,
object -> FilenameUtils.wildcardMatch(object.getKey(), getFilter())
);
}
return iterator;
};
}
}

View File

@ -111,11 +111,6 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
private static final S3InputDataConfig INPUT_DATA_CONFIG;
private static final int MAX_LISTING_LENGTH = 10;
private static final List<CloudObjectLocation> EXPECTED_OBJECTS = Arrays.asList(
new CloudObjectLocation(URI.create("s3://foo/bar/file.csv")),
new CloudObjectLocation(URI.create("s3://bar/foo/file2.csv"))
);
private static final List<URI> EXPECTED_URIS = Arrays.asList(
URI.create("s3://foo/bar/file.csv"),
URI.create("s3://bar/foo/file2.csv")
@ -126,18 +121,6 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
URI.create("s3://bar/foo/file2.csv.gz")
);
private static final List<CloudObjectLocation> OBJECTS_BEFORE_FILTER = Arrays.asList(
new CloudObjectLocation(URI.create("s3://foo/bar/file.csv")),
new CloudObjectLocation(URI.create("s3://bar/foo/file2.csv")),
new CloudObjectLocation(URI.create("s3://bar/foo/file3.txt"))
);
private static final List<URI> URIS_BEFORE_FILTER = Arrays.asList(
URI.create("s3://foo/bar/file.csv"),
URI.create("s3://bar/foo/file2.csv"),
URI.create("s3://bar/foo/file3.txt")
);
private static final List<List<CloudObjectLocation>> EXPECTED_COORDS =
EXPECTED_URIS.stream()
.map(uri -> Collections.singletonList(new CloudObjectLocation(uri)))
@ -172,76 +155,6 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Test
public void testGetUris()
{
final S3InputSource withUris = new S3InputSource(
SERVICE,
SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
INPUT_DATA_CONFIG,
EXPECTED_URIS,
null,
null,
null,
null,
null,
null,
null
);
Assert.assertEquals(
EXPECTED_URIS,
withUris.getUris()
);
}
@Test
public void testGetPrefixes()
{
final S3InputSource withPrefixes = new S3InputSource(
SERVICE,
SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
INPUT_DATA_CONFIG,
null,
PREFIXES,
null,
null,
null,
null,
null,
null
);
Assert.assertEquals(
PREFIXES,
withPrefixes.getPrefixes()
);
}
@Test
public void testGetFilter()
{
final S3InputSource withUris = new S3InputSource(
SERVICE,
SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
INPUT_DATA_CONFIG,
EXPECTED_URIS,
null,
null,
"*.parquet",
null,
null,
null,
null
);
Assert.assertEquals(
"*.parquet",
withUris.getFilter()
);
}
@Test
public void testSerdeWithUris() throws Exception
{
@ -255,7 +168,6 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
null,
null,
null,
null,
null
);
final S3InputSource serdeWithUris = MAPPER.readValue(MAPPER.writeValueAsString(withUris), S3InputSource.class);
@ -275,7 +187,6 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
null,
null,
null,
null,
null
);
final S3InputSource serdeWithPrefixes =
@ -296,7 +207,6 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
null,
null,
null,
null,
null
);
final S3InputSource serdeWithPrefixes =
@ -321,7 +231,6 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
null,
null,
EXPECTED_LOCATION,
null,
CLOUD_CONFIG_PROPERTIES,
null,
null,
@ -386,7 +295,6 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
null,
null,
EXPECTED_LOCATION,
null,
mockConfigPropertiesWithoutKeyAndSecret,
mockAwsProxyConfig,
mockAwsEndpointConfig,
@ -421,7 +329,6 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
null,
null,
EXPECTED_LOCATION,
null,
mockConfigPropertiesWithoutKeyAndSecret,
PROXY_CONFIG,
ENDPOINT_CONFIG,
@ -447,7 +354,6 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
null,
null,
EXPECTED_LOCATION,
null,
CLOUD_CONFIG_PROPERTIES,
null,
null,
@ -475,7 +381,6 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
null,
null,
null,
null,
null
);
final S3InputSource serdeWithPrefixes =
@ -497,7 +402,6 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
null,
null,
null,
null,
null
);
final S3InputSource serdeWithPrefixes =
@ -505,86 +409,6 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
Assert.assertEquals(withPrefixes, serdeWithPrefixes);
}
@Test
public void testWithNullJsonProps()
{
expectedException.expect(IllegalArgumentException.class);
// constructor will explode
new S3InputSource(
SERVICE,
SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
INPUT_DATA_CONFIG,
null,
null,
null,
null,
null,
null,
null,
null
);
}
@Test
public void testIllegalObjectsAndUris()
{
expectedException.expect(IllegalArgumentException.class);
// constructor will explode
new S3InputSource(
SERVICE,
SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
INPUT_DATA_CONFIG,
EXPECTED_URIS,
null,
EXPECTED_OBJECTS,
null,
null,
null,
null,
null
);
}
@Test
public void testIllegalObjectsAndPrefixes()
{
expectedException.expect(IllegalArgumentException.class);
// constructor will explode
new S3InputSource(
SERVICE,
SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
INPUT_DATA_CONFIG,
null,
PREFIXES,
EXPECTED_OBJECTS,
null,
null,
null,
null,
null
);
}
@Test
public void testIllegalUrisAndPrefixes()
{
expectedException.expect(IllegalArgumentException.class);
// constructor will explode
new S3InputSource(
SERVICE,
SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
INPUT_DATA_CONFIG,
EXPECTED_URIS,
PREFIXES,
null,
null,
null,
null,
null,
null
);
}
@Test
public void testSerdeWithInvalidArgs()
{
@ -600,7 +424,6 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
null,
null,
null,
null,
null
);
}
@ -620,7 +443,6 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
null,
null,
null,
null,
null
);
}
@ -640,7 +462,6 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
null,
null,
null,
null,
null
);
}
@ -658,82 +479,6 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
null,
null,
null,
null,
null
);
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
null
);
Assert.assertEquals(EXPECTED_COORDS, splits.map(InputSplit::get).collect(Collectors.toList()));
}
@Test
public void testWithUrisFilter()
{
S3InputSource inputSource = new S3InputSource(
SERVICE,
SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
INPUT_DATA_CONFIG,
URIS_BEFORE_FILTER,
null,
null,
"*.csv",
null,
null,
null,
null
);
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
null
);
Assert.assertEquals(EXPECTED_COORDS, splits.map(InputSplit::get).collect(Collectors.toList()));
}
@Test
public void testWithObjectsFilter()
{
S3InputSource inputSource = new S3InputSource(
SERVICE,
SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
INPUT_DATA_CONFIG,
null,
null,
OBJECTS_BEFORE_FILTER,
"*.csv",
null,
null,
null,
null
);
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
null
);
Assert.assertEquals(EXPECTED_COORDS, splits.map(InputSplit::get).collect(Collectors.toList()));
}
@Test
public void testWithoutObjectsFilter()
{
S3InputSource inputSource = new S3InputSource(
SERVICE,
SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
INPUT_DATA_CONFIG,
null,
null,
EXPECTED_OBJECTS,
null,
null,
null,
null,
null
);
@ -763,38 +508,6 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
null,
null,
null,
null,
null
);
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null),
new MaxSizeSplitHintSpec(null, 1)
);
Assert.assertEquals(EXPECTED_COORDS, splits.map(InputSplit::get).collect(Collectors.toList()));
EasyMock.verify(S3_CLIENT);
}
@Test
public void testGetPrefixesSplitStreamWithFilter()
{
EasyMock.reset(S3_CLIENT);
expectListObjects(PREFIXES.get(0), ImmutableList.of(URIS_BEFORE_FILTER.get(0)), CONTENT);
expectListObjects(PREFIXES.get(1), ImmutableList.of(URIS_BEFORE_FILTER.get(1), URIS_BEFORE_FILTER.get(2)), CONTENT);
EasyMock.replay(S3_CLIENT);
S3InputSource inputSource = new S3InputSource(
SERVICE,
SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
INPUT_DATA_CONFIG,
null,
PREFIXES,
null,
"*.csv",
null,
null,
null,
null
);
@ -825,7 +538,6 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
null,
null,
null,
null,
null
);
@ -859,7 +571,6 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
null,
null,
null,
null,
null
);
@ -892,7 +603,6 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
null,
null,
null,
null,
null
);
@ -927,7 +637,6 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
null,
null,
null,
null,
null
);
@ -974,7 +683,6 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
null,
null,
null,
null,
3 // only have three retries since they are slow
);
@ -1021,7 +729,6 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
null,
null,
null,
null,
null
);

View File

@ -454,24 +454,6 @@ export function getIoConfigFormFields(ingestionComboType: IngestionComboType): F
),
};
const inputSourceFilter: Field<IoConfig> = {
name: 'inputSource.filter',
label: 'File filter',
type: 'string',
suggestions: FILTER_SUGGESTIONS,
placeholder: '*',
info: (
<p>
A wildcard filter for files. See{' '}
<ExternalLink href="https://commons.apache.org/proper/commons-io/apidocs/org/apache/commons/io/filefilter/WildcardFileFilter.html">
here
</ExternalLink>{' '}
for format information. Files matching the filter criteria are considered for ingestion.
Files not matching the filter criteria are ignored.
</p>
),
};
switch (ingestionComboType) {
case 'index_parallel:http':
return [
@ -651,7 +633,6 @@ export function getIoConfigFormFields(ingestionComboType: IngestionComboType): F
</>
),
},
inputSourceFilter,
{
name: 'inputSource.properties.accessKeyId.type',
label: 'Access key ID type',
@ -813,7 +794,6 @@ export function getIoConfigFormFields(ingestionComboType: IngestionComboType): F
</>
),
},
inputSourceFilter,
];
case 'index_parallel:google':
@ -872,7 +852,6 @@ export function getIoConfigFormFields(ingestionComboType: IngestionComboType): F
</>
),
},
inputSourceFilter,
];
case 'index_parallel:hdfs':

View File

@ -433,26 +433,6 @@ export const INPUT_SOURCE_FIELDS: Field<InputSource>[] = [
),
},
// Cloud common
{
name: 'filter',
label: 'File filter',
type: 'string',
suggestions: FILTER_SUGGESTIONS,
placeholder: '*',
defined: typeIs('s3', 'azure', 'google'),
info: (
<p>
A wildcard filter for files. See{' '}
<ExternalLink href="https://commons.apache.org/proper/commons-io/apidocs/org/apache/commons/io/filefilter/WildcardFileFilter.html">
here
</ExternalLink>{' '}
for format information. Files matching the filter criteria are considered for ingestion.
Files not matching the filter criteria are ignored.
</p>
),
},
// S3 auth extra
{
name: 'properties.accessKeyId.type',