Revert "Add filter in cloud object input source for backward compatibility (#13437)" (#13450)

This reverts commit b12e5f300e.
This commit is contained in:
Kashif Faraz 2022-11-30 16:33:05 +05:30 committed by GitHub
parent 291ded22d5
commit 8ff1b2d5d4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 23 additions and 181 deletions

View File

@ -21,9 +21,7 @@ package org.apache.druid.data.input.impl;
import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Ints; import com.google.common.primitives.Ints;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.druid.data.input.AbstractInputSource; import org.apache.druid.data.input.AbstractInputSource;
import org.apache.druid.data.input.InputEntity; import org.apache.druid.data.input.InputEntity;
@ -52,13 +50,6 @@ public abstract class CloudObjectInputSource extends AbstractInputSource
private final List<URI> uris; private final List<URI> uris;
private final List<URI> prefixes; private final List<URI> prefixes;
private final List<CloudObjectLocation> objects; private final List<CloudObjectLocation> objects;
/**
* Preserved filter for backward compatibility, should be removed on next major release;
* use objectGlob instead.
*/
@Deprecated
private final String filter;
private final String objectGlob; private final String objectGlob;
public CloudObjectInputSource( public CloudObjectInputSource(
@ -72,7 +63,6 @@ public abstract class CloudObjectInputSource extends AbstractInputSource
this.uris = uris; this.uris = uris;
this.prefixes = prefixes; this.prefixes = prefixes;
this.objects = objects; this.objects = objects;
this.filter = null;
this.objectGlob = null; this.objectGlob = null;
illegalArgsChecker(); illegalArgsChecker();
@ -83,7 +73,6 @@ public abstract class CloudObjectInputSource extends AbstractInputSource
@Nullable List<URI> uris, @Nullable List<URI> uris,
@Nullable List<URI> prefixes, @Nullable List<URI> prefixes,
@Nullable List<CloudObjectLocation> objects, @Nullable List<CloudObjectLocation> objects,
@Deprecated @Nullable String filter,
@Nullable String objectGlob @Nullable String objectGlob
) )
{ {
@ -91,12 +80,8 @@ public abstract class CloudObjectInputSource extends AbstractInputSource
this.uris = uris; this.uris = uris;
this.prefixes = prefixes; this.prefixes = prefixes;
this.objects = objects; this.objects = objects;
this.filter = filter;
this.objectGlob = objectGlob; this.objectGlob = objectGlob;
Preconditions.checkArgument(
filter == null || objectGlob == null,
"Cannot use filter and objectGlob together. Try using objectGlob instead of filter."
);
illegalArgsChecker(); illegalArgsChecker();
} }
@ -122,14 +107,6 @@ public abstract class CloudObjectInputSource extends AbstractInputSource
return objects; return objects;
} }
@Nullable
@JsonProperty
@JsonInclude(JsonInclude.Include.NON_NULL)
public String getFilter()
{
return filter;
}
@Nullable @Nullable
@JsonProperty @JsonProperty
@JsonInclude(JsonInclude.Include.NON_NULL) @JsonInclude(JsonInclude.Include.NON_NULL)
@ -167,8 +144,6 @@ public abstract class CloudObjectInputSource extends AbstractInputSource
if (StringUtils.isNotBlank(objectGlob)) { if (StringUtils.isNotBlank(objectGlob)) {
PathMatcher m = FileSystems.getDefault().getPathMatcher("glob:" + getObjectGlob()); PathMatcher m = FileSystems.getDefault().getPathMatcher("glob:" + getObjectGlob());
objectStream = objectStream.filter(object -> m.matches(Paths.get(object.getPath()))); objectStream = objectStream.filter(object -> m.matches(Paths.get(object.getPath())));
} else if (StringUtils.isNotBlank(filter)) {
objectStream = objectStream.filter(object -> FilenameUtils.wildcardMatch(object.getPath(), getFilter()));
} }
return objectStream.map(object -> new InputSplit<>(Collections.singletonList(object))); return objectStream.map(object -> new InputSplit<>(Collections.singletonList(object)));
@ -180,8 +155,6 @@ public abstract class CloudObjectInputSource extends AbstractInputSource
if (StringUtils.isNotBlank(objectGlob)) { if (StringUtils.isNotBlank(objectGlob)) {
PathMatcher m = FileSystems.getDefault().getPathMatcher("glob:" + getObjectGlob()); PathMatcher m = FileSystems.getDefault().getPathMatcher("glob:" + getObjectGlob());
uriStream = uriStream.filter(uri -> m.matches(Paths.get(uri.toString()))); uriStream = uriStream.filter(uri -> m.matches(Paths.get(uri.toString())));
} else if (StringUtils.isNotBlank(filter)) {
uriStream = uriStream.filter(uri -> FilenameUtils.wildcardMatch(uri.toString(), filter));
} }
return uriStream.map(CloudObjectLocation::new).map(object -> new InputSplit<>(Collections.singletonList(object))); return uriStream.map(CloudObjectLocation::new).map(object -> new InputSplit<>(Collections.singletonList(object)));
@ -239,14 +212,13 @@ public abstract class CloudObjectInputSource extends AbstractInputSource
Objects.equals(uris, that.uris) && Objects.equals(uris, that.uris) &&
Objects.equals(prefixes, that.prefixes) && Objects.equals(prefixes, that.prefixes) &&
Objects.equals(objects, that.objects) && Objects.equals(objects, that.objects) &&
Objects.equals(filter, that.filter) &&
Objects.equals(objectGlob, that.objectGlob); Objects.equals(objectGlob, that.objectGlob);
} }
@Override @Override
public int hashCode() public int hashCode()
{ {
return Objects.hash(scheme, uris, prefixes, objects, filter, objectGlob); return Objects.hash(scheme, uris, prefixes, objects, objectGlob);
} }
private void illegalArgsChecker() throws IllegalArgumentException private void illegalArgsChecker() throws IllegalArgumentException

View File

@ -72,7 +72,7 @@ public class CloudObjectInputSourceTest
public void testGetUris() public void testGetUris()
{ {
CloudObjectInputSource inputSource = Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings() CloudObjectInputSource inputSource = Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings()
.useConstructor(SCHEME, URIS, null, null, null, null) .useConstructor(SCHEME, URIS, null, null, null)
.defaultAnswer(Mockito.CALLS_REAL_METHODS) .defaultAnswer(Mockito.CALLS_REAL_METHODS)
); );
@ -86,7 +86,7 @@ public class CloudObjectInputSourceTest
public void testGetPrefixes() public void testGetPrefixes()
{ {
CloudObjectInputSource inputSource = Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings() CloudObjectInputSource inputSource = Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings()
.useConstructor(SCHEME, null, PREFIXES, null, null, null) .useConstructor(SCHEME, null, PREFIXES, null, null)
.defaultAnswer(Mockito.CALLS_REAL_METHODS) .defaultAnswer(Mockito.CALLS_REAL_METHODS)
); );
@ -100,7 +100,7 @@ public class CloudObjectInputSourceTest
public void testGetObjectGlob() public void testGetObjectGlob()
{ {
CloudObjectInputSource inputSource = Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings() CloudObjectInputSource inputSource = Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings()
.useConstructor(SCHEME, URIS, null, null, null, "**.parquet") .useConstructor(SCHEME, URIS, null, null, "**.parquet")
.defaultAnswer(Mockito.CALLS_REAL_METHODS) .defaultAnswer(Mockito.CALLS_REAL_METHODS)
); );
@ -111,12 +111,12 @@ public class CloudObjectInputSourceTest
public void testInequality() public void testInequality()
{ {
CloudObjectInputSource inputSource1 = Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings() CloudObjectInputSource inputSource1 = Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings()
.useConstructor(SCHEME, URIS, null, null, null, "**.parquet") .useConstructor(SCHEME, URIS, null, null, "**.parquet")
.defaultAnswer(Mockito.CALLS_REAL_METHODS) .defaultAnswer(Mockito.CALLS_REAL_METHODS)
); );
CloudObjectInputSource inputSource2 = Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings() CloudObjectInputSource inputSource2 = Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings()
.useConstructor(SCHEME, URIS, null, null, null, "**.csv") .useConstructor(SCHEME, URIS, null, null, "**.csv")
.defaultAnswer(Mockito.CALLS_REAL_METHODS) .defaultAnswer(Mockito.CALLS_REAL_METHODS)
); );
@ -128,32 +128,9 @@ public class CloudObjectInputSourceTest
@Test @Test
public void testWithUrisFilter() public void testWithUrisFilter()
{ {
CloudObjectInputSource inputSource = Mockito.mock(CloudObjectInputSource.class, CloudObjectInputSource inputSource = Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings()
Mockito.withSettings() .useConstructor(SCHEME, URIS2, null, null, "**.csv")
.useConstructor(SCHEME, URIS2, null, null, "*.csv", null) .defaultAnswer(Mockito.CALLS_REAL_METHODS)
.defaultAnswer(Mockito.CALLS_REAL_METHODS)
);
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
new MaxSizeSplitHintSpec(null, 1)
);
List<CloudObjectLocation> returnedLocations = splits.map(InputSplit::get).collect(Collectors.toList()).get(0);
List<URI> returnedLocationUris = returnedLocations.stream().map(object -> object.toUri(SCHEME)).collect(Collectors.toList());
Assert.assertEquals("*.csv", inputSource.getFilter());
Assert.assertEquals(URIS, returnedLocationUris);
}
@Test
public void testWithUrisObjectGlob()
{
CloudObjectInputSource inputSource = Mockito.mock(CloudObjectInputSource.class,
Mockito.withSettings()
.useConstructor(SCHEME, URIS2, null, null, null, "**.csv")
.defaultAnswer(Mockito.CALLS_REAL_METHODS)
); );
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits( Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
@ -173,7 +150,7 @@ public class CloudObjectInputSourceTest
public void testWithUris() public void testWithUris()
{ {
CloudObjectInputSource inputSource = Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings() CloudObjectInputSource inputSource = Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings()
.useConstructor(SCHEME, URIS, null, null, null, null) .useConstructor(SCHEME, URIS, null, null, null)
.defaultAnswer(Mockito.CALLS_REAL_METHODS) .defaultAnswer(Mockito.CALLS_REAL_METHODS)
); );
@ -194,7 +171,7 @@ public class CloudObjectInputSourceTest
public void testWithObjectsFilter() public void testWithObjectsFilter()
{ {
CloudObjectInputSource inputSource = Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings() CloudObjectInputSource inputSource = Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings()
.useConstructor(SCHEME, null, null, OBJECTS_BEFORE_GLOB, null, "**.csv") .useConstructor(SCHEME, null, null, OBJECTS_BEFORE_GLOB, "**.csv")
.defaultAnswer(Mockito.CALLS_REAL_METHODS) .defaultAnswer(Mockito.CALLS_REAL_METHODS)
); );
@ -215,7 +192,7 @@ public class CloudObjectInputSourceTest
public void testWithObjects() public void testWithObjects()
{ {
CloudObjectInputSource inputSource = Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings() CloudObjectInputSource inputSource = Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings()
.useConstructor(SCHEME, null, null, OBJECTS, null, null) .useConstructor(SCHEME, null, null, OBJECTS, null)
.defaultAnswer(Mockito.CALLS_REAL_METHODS) .defaultAnswer(Mockito.CALLS_REAL_METHODS)
); );

View File

@ -79,12 +79,11 @@ public class OssInputSource extends CloudObjectInputSource
@JsonProperty("uris") @Nullable List<URI> uris, @JsonProperty("uris") @Nullable List<URI> uris,
@JsonProperty("prefixes") @Nullable List<URI> prefixes, @JsonProperty("prefixes") @Nullable List<URI> prefixes,
@JsonProperty("objects") @Nullable List<CloudObjectLocation> objects, @JsonProperty("objects") @Nullable List<CloudObjectLocation> objects,
@Deprecated @JsonProperty("filter") @Nullable String filter,
@JsonProperty("objectGlob") @Nullable String objectGlob, @JsonProperty("objectGlob") @Nullable String objectGlob,
@JsonProperty("properties") @Nullable OssClientConfig inputSourceConfig @JsonProperty("properties") @Nullable OssClientConfig inputSourceConfig
) )
{ {
super(OssStorageDruidModule.SCHEME, uris, prefixes, objects, filter, objectGlob); super(OssStorageDruidModule.SCHEME, uris, prefixes, objects, objectGlob);
this.inputDataConfig = Preconditions.checkNotNull(inputDataConfig, "inputDataConfig"); this.inputDataConfig = Preconditions.checkNotNull(inputDataConfig, "inputDataConfig");
Preconditions.checkNotNull(client, "client"); Preconditions.checkNotNull(client, "client");
this.inputSourceConfig = inputSourceConfig; this.inputSourceConfig = inputSourceConfig;
@ -137,7 +136,6 @@ public class OssInputSource extends CloudObjectInputSource
null, null,
null, null,
split.get(), split.get(),
getFilter(),
getObjectGlob(), getObjectGlob(),
getOssInputSourceConfig() getOssInputSourceConfig()
); );

View File

@ -143,7 +143,6 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
null, null,
null, null,
null, null,
null,
null null
); );
final OssInputSource serdeWithUris = MAPPER.readValue(MAPPER.writeValueAsString(withUris), OssInputSource.class); final OssInputSource serdeWithUris = MAPPER.readValue(MAPPER.writeValueAsString(withUris), OssInputSource.class);
@ -160,7 +159,6 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
PREFIXES, PREFIXES,
null, null,
null, null,
null,
null null
); );
final OssInputSource serdeWithPrefixes = final OssInputSource serdeWithPrefixes =
@ -178,7 +176,6 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
null, null,
EXPECTED_LOCATION, EXPECTED_LOCATION,
null, null,
null,
null null
); );
final OssInputSource serdeWithPrefixes = final OssInputSource serdeWithPrefixes =
@ -203,7 +200,6 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
null, null,
EXPECTED_LOCATION, EXPECTED_LOCATION,
null, null,
null,
mockConfigPropertiesWithoutKeyAndSecret mockConfigPropertiesWithoutKeyAndSecret
); );
Assert.assertNotNull(withPrefixes); Assert.assertNotNull(withPrefixes);
@ -224,7 +220,6 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
null, null,
EXPECTED_LOCATION, EXPECTED_LOCATION,
null, null,
null,
CLOUD_CONFIG_PROPERTIES CLOUD_CONFIG_PROPERTIES
); );
final OssInputSource serdeWithPrefixes = final OssInputSource serdeWithPrefixes =
@ -245,7 +240,6 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
null, null,
EXPECTED_LOCATION, EXPECTED_LOCATION,
null, null,
null,
null null
); );
final OssInputSource serdeWithPrefixes = final OssInputSource serdeWithPrefixes =
@ -264,7 +258,6 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
ImmutableList.of(), ImmutableList.of(),
EXPECTED_LOCATION, EXPECTED_LOCATION,
null, null,
null,
null null
); );
final OssInputSource serdeWithPrefixes = final OssInputSource serdeWithPrefixes =
@ -284,7 +277,6 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
PREFIXES, PREFIXES,
EXPECTED_LOCATION, EXPECTED_LOCATION,
null, null,
null,
null null
); );
} }
@ -301,7 +293,6 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
PREFIXES, PREFIXES,
ImmutableList.of(), ImmutableList.of(),
null, null,
null,
null null
); );
} }
@ -318,7 +309,6 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
PREFIXES, PREFIXES,
EXPECTED_LOCATION, EXPECTED_LOCATION,
null, null,
null,
null null
); );
} }
@ -333,7 +323,6 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
null, null,
null, null,
null, null,
null,
null null
); );
@ -360,7 +349,6 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
PREFIXES, PREFIXES,
null, null,
null, null,
null,
null null
); );
@ -388,7 +376,6 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
PREFIXES, PREFIXES,
null, null,
null, null,
null,
null null
); );
@ -419,7 +406,6 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
PREFIXES, PREFIXES,
null, null,
null, null,
null,
null null
); );
@ -449,7 +435,6 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
ImmutableList.of(PREFIXES.get(0), EXPECTED_URIS.get(1)), ImmutableList.of(PREFIXES.get(0), EXPECTED_URIS.get(1)),
null, null,
null, null,
null,
null null
); );
@ -481,7 +466,6 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
ImmutableList.of(PREFIXES.get(0), EXPECTED_URIS.get(1)), ImmutableList.of(PREFIXES.get(0), EXPECTED_URIS.get(1)),
null, null,
null, null,
null,
null null
); );
@ -526,7 +510,6 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
ImmutableList.of(PREFIXES.get(0), EXPECTED_COMPRESSED_URIS.get(1)), ImmutableList.of(PREFIXES.get(0), EXPECTED_COMPRESSED_URIS.get(1)),
null, null,
null, null,
null,
null null
); );

View File

@ -74,11 +74,10 @@ public class AzureInputSource extends CloudObjectInputSource
@JsonProperty("uris") @Nullable List<URI> uris, @JsonProperty("uris") @Nullable List<URI> uris,
@JsonProperty("prefixes") @Nullable List<URI> prefixes, @JsonProperty("prefixes") @Nullable List<URI> prefixes,
@JsonProperty("objects") @Nullable List<CloudObjectLocation> objects, @JsonProperty("objects") @Nullable List<CloudObjectLocation> objects,
@Deprecated @JsonProperty("filter") @Nullable String filter,
@JsonProperty("objectGlob") @Nullable String objectGlob @JsonProperty("objectGlob") @Nullable String objectGlob
) )
{ {
super(SCHEME, uris, prefixes, objects, filter, objectGlob); super(SCHEME, uris, prefixes, objects, objectGlob);
this.storage = Preconditions.checkNotNull(storage, "AzureStorage"); this.storage = Preconditions.checkNotNull(storage, "AzureStorage");
this.entityFactory = Preconditions.checkNotNull(entityFactory, "AzureEntityFactory"); this.entityFactory = Preconditions.checkNotNull(entityFactory, "AzureEntityFactory");
this.azureCloudBlobIterableFactory = Preconditions.checkNotNull( this.azureCloudBlobIterableFactory = Preconditions.checkNotNull(
@ -104,7 +103,6 @@ public class AzureInputSource extends CloudObjectInputSource
null, null,
null, null,
split.get(), split.get(),
getFilter(),
getObjectGlob() getObjectGlob()
); );
} }

View File

@ -22,7 +22,6 @@ package org.apache.druid.data.input.azure;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators; import com.google.common.collect.Iterators;
import nl.jqno.equalsverifier.EqualsVerifier; import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.commons.io.FilenameUtils;
import org.apache.druid.data.input.InputSplit; import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.MaxSizeSplitHintSpec; import org.apache.druid.data.input.MaxSizeSplitHintSpec;
import org.apache.druid.data.input.impl.CloudObjectLocation; import org.apache.druid.data.input.impl.CloudObjectLocation;
@ -112,7 +111,6 @@ public class AzureInputSourceTest extends EasyMockSupport
EMPTY_URIS, EMPTY_URIS,
EMPTY_PREFIXES, EMPTY_PREFIXES,
EMPTY_OBJECTS, EMPTY_OBJECTS,
null,
null null
); );
} }
@ -134,7 +132,6 @@ public class AzureInputSourceTest extends EasyMockSupport
EMPTY_URIS, EMPTY_URIS,
EMPTY_PREFIXES, EMPTY_PREFIXES,
objects, objects,
null,
null null
); );
@ -169,55 +166,6 @@ public class AzureInputSourceTest extends EasyMockSupport
EMPTY_URIS, EMPTY_URIS,
prefixes, prefixes,
EMPTY_OBJECTS, EMPTY_OBJECTS,
null,
null
);
Stream<InputSplit<List<CloudObjectLocation>>> cloudObjectStream = azureInputSource.getPrefixesSplitStream(
new MaxSizeSplitHintSpec(null, 1)
);
List<List<CloudObjectLocation>> actualCloudLocationList = cloudObjectStream.map(InputSplit::get)
.collect(Collectors.toList());
verifyAll();
Assert.assertEquals(expectedCloudLocations, actualCloudLocationList);
}
@Test
public void test_getPrefixesSplitStream_withFilter_successfullyCreatesCloudLocation_returnsExpectedLocations()
{
List<URI> prefixes = ImmutableList.of(PREFIX_URI);
List<List<CloudObjectLocation>> expectedCloudLocations = ImmutableList.of(ImmutableList.of(CLOUD_OBJECT_LOCATION_1));
List<CloudBlobHolder> expectedCloudBlobs = ImmutableList.of(cloudBlobDruid1);
Iterator<CloudBlobHolder> expectedCloudBlobsIterator = expectedCloudBlobs.iterator();
String filter = "*.csv";
expectedCloudBlobsIterator = Iterators.filter(
expectedCloudBlobsIterator,
object -> FilenameUtils.wildcardMatch(object.getName(), filter)
);
EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_LISTING_LENGTH);
EasyMock.expect(azureCloudBlobIterableFactory.create(prefixes, MAX_LISTING_LENGTH)).andReturn(
azureCloudBlobIterable);
EasyMock.expect(azureCloudBlobIterable.iterator()).andReturn(expectedCloudBlobsIterator);
EasyMock.expect(azureCloudBlobToLocationConverter.createCloudObjectLocation(cloudBlobDruid1))
.andReturn(CLOUD_OBJECT_LOCATION_1);
EasyMock.expect(cloudBlobDruid1.getBlobLength()).andReturn(100L).anyTimes();
EasyMock.expect(cloudBlobDruid1.getName()).andReturn(BLOB_PATH).anyTimes();
replayAll();
azureInputSource = new AzureInputSource(
storage,
entityFactory,
azureCloudBlobIterableFactory,
azureCloudBlobToLocationConverter,
inputDataConfig,
EMPTY_URIS,
prefixes,
EMPTY_OBJECTS,
filter,
null null
); );
@ -267,7 +215,6 @@ public class AzureInputSourceTest extends EasyMockSupport
EMPTY_URIS, EMPTY_URIS,
prefixes, prefixes,
EMPTY_OBJECTS, EMPTY_OBJECTS,
null,
objectGlob objectGlob
); );
@ -297,7 +244,6 @@ public class AzureInputSourceTest extends EasyMockSupport
EMPTY_URIS, EMPTY_URIS,
prefixes, prefixes,
EMPTY_OBJECTS, EMPTY_OBJECTS,
null,
null null
); );
@ -319,7 +265,6 @@ public class AzureInputSourceTest extends EasyMockSupport
EMPTY_URIS, EMPTY_URIS,
prefixes, prefixes,
EMPTY_OBJECTS, EMPTY_OBJECTS,
null,
null null
); );

View File

@ -65,11 +65,10 @@ public class GoogleCloudStorageInputSource extends CloudObjectInputSource
@JsonProperty("uris") @Nullable List<URI> uris, @JsonProperty("uris") @Nullable List<URI> uris,
@JsonProperty("prefixes") @Nullable List<URI> prefixes, @JsonProperty("prefixes") @Nullable List<URI> prefixes,
@JsonProperty("objects") @Nullable List<CloudObjectLocation> objects, @JsonProperty("objects") @Nullable List<CloudObjectLocation> objects,
@Deprecated @JsonProperty("filter") @Nullable String filter,
@JsonProperty("objectGlob") @Nullable String objectGlob @JsonProperty("objectGlob") @Nullable String objectGlob
) )
{ {
super(GoogleStorageDruidModule.SCHEME_GS, uris, prefixes, objects, filter, objectGlob); super(GoogleStorageDruidModule.SCHEME_GS, uris, prefixes, objects, objectGlob);
this.storage = storage; this.storage = storage;
this.inputDataConfig = inputDataConfig; this.inputDataConfig = inputDataConfig;
} }
@ -118,7 +117,7 @@ public class GoogleCloudStorageInputSource extends CloudObjectInputSource
@Override @Override
public SplittableInputSource<List<CloudObjectLocation>> withSplit(InputSplit<List<CloudObjectLocation>> split) public SplittableInputSource<List<CloudObjectLocation>> withSplit(InputSplit<List<CloudObjectLocation>> split)
{ {
return new GoogleCloudStorageInputSource(storage, inputDataConfig, null, null, split.get(), getFilter(), getObjectGlob()); return new GoogleCloudStorageInputSource(storage, inputDataConfig, null, null, split.get(), getObjectGlob());
} }
private CloudObjectLocation byteSourceFromStorageObject(final StorageObject storageObject) private CloudObjectLocation byteSourceFromStorageObject(final StorageObject storageObject)

View File

@ -116,7 +116,7 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
{ {
final ObjectMapper mapper = createGoogleObjectMapper(); final ObjectMapper mapper = createGoogleObjectMapper();
final GoogleCloudStorageInputSource withUris = final GoogleCloudStorageInputSource withUris =
new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, EXPECTED_URIS, ImmutableList.of(), null, null, null); new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, EXPECTED_URIS, ImmutableList.of(), null, null);
final GoogleCloudStorageInputSource serdeWithUris = final GoogleCloudStorageInputSource serdeWithUris =
mapper.readValue(mapper.writeValueAsString(withUris), GoogleCloudStorageInputSource.class); mapper.readValue(mapper.writeValueAsString(withUris), GoogleCloudStorageInputSource.class);
Assert.assertEquals(withUris, serdeWithUris); Assert.assertEquals(withUris, serdeWithUris);
@ -127,7 +127,7 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
{ {
final ObjectMapper mapper = createGoogleObjectMapper(); final ObjectMapper mapper = createGoogleObjectMapper();
final GoogleCloudStorageInputSource withPrefixes = final GoogleCloudStorageInputSource withPrefixes =
new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, ImmutableList.of(), PREFIXES, null, null, null); new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, ImmutableList.of(), PREFIXES, null, null);
final GoogleCloudStorageInputSource serdeWithPrefixes = final GoogleCloudStorageInputSource serdeWithPrefixes =
mapper.readValue(mapper.writeValueAsString(withPrefixes), GoogleCloudStorageInputSource.class); mapper.readValue(mapper.writeValueAsString(withPrefixes), GoogleCloudStorageInputSource.class);
Assert.assertEquals(withPrefixes, serdeWithPrefixes); Assert.assertEquals(withPrefixes, serdeWithPrefixes);
@ -144,7 +144,6 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
null, null,
null, null,
ImmutableList.of(new CloudObjectLocation("foo", "bar/file.gz")), ImmutableList.of(new CloudObjectLocation("foo", "bar/file.gz")),
null,
null null
); );
final GoogleCloudStorageInputSource serdeWithObjects = final GoogleCloudStorageInputSource serdeWithObjects =
@ -157,7 +156,7 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
{ {
GoogleCloudStorageInputSource inputSource = GoogleCloudStorageInputSource inputSource =
new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, EXPECTED_URIS, ImmutableList.of(), null, null, null); new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, EXPECTED_URIS, ImmutableList.of(), null, null);
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits( Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null), new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
@ -175,7 +174,6 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
URIS_BEFORE_GLOB, URIS_BEFORE_GLOB,
null, null,
null, null,
null,
"**.csv" "**.csv"
); );
@ -186,26 +184,6 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
Assert.assertEquals(EXPECTED_OBJECTS, splits.map(InputSplit::get).collect(Collectors.toList())); Assert.assertEquals(EXPECTED_OBJECTS, splits.map(InputSplit::get).collect(Collectors.toList()));
} }
@Test
public void testWithUrisFilter()
{
GoogleCloudStorageInputSource inputSource = new GoogleCloudStorageInputSource(
STORAGE,
INPUT_DATA_CONFIG,
URIS_BEFORE_GLOB,
null,
null,
"*.csv",
null
);
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
null
);
Assert.assertEquals(EXPECTED_OBJECTS, splits.map(InputSplit::get).collect(Collectors.toList()));
}
@Test @Test
public void testIllegalObjectsAndPrefixes() public void testIllegalObjectsAndPrefixes()
{ {
@ -217,7 +195,6 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
null, null,
PREFIXES, PREFIXES,
EXPECTED_OBJECTS.get(0), EXPECTED_OBJECTS.get(0),
null,
"**.csv" "**.csv"
); );
} }
@ -233,7 +210,6 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
URIS_BEFORE_GLOB, URIS_BEFORE_GLOB,
PREFIXES, PREFIXES,
null, null,
null,
"**.csv" "**.csv"
); );
} }
@ -250,7 +226,7 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
EasyMock.replay(INPUT_DATA_CONFIG); EasyMock.replay(INPUT_DATA_CONFIG);
GoogleCloudStorageInputSource inputSource = GoogleCloudStorageInputSource inputSource =
new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, null, PREFIXES, null, null, null); new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, null, PREFIXES, null, null);
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits( Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null), new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
@ -272,7 +248,7 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
EasyMock.replay(INPUT_DATA_CONFIG); EasyMock.replay(INPUT_DATA_CONFIG);
GoogleCloudStorageInputSource inputSource = GoogleCloudStorageInputSource inputSource =
new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, null, PREFIXES, null, null, null); new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, null, PREFIXES, null, null);
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits( Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null), new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
@ -304,7 +280,6 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
null, null,
PREFIXES, PREFIXES,
null, null,
null,
null null
); );
@ -349,7 +324,6 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
null, null,
PREFIXES, PREFIXES,
null, null,
null,
null null
); );

View File

@ -111,7 +111,6 @@ public class S3InputSource extends CloudObjectInputSource
@JsonProperty("uris") @Nullable List<URI> uris, @JsonProperty("uris") @Nullable List<URI> uris,
@JsonProperty("prefixes") @Nullable List<URI> prefixes, @JsonProperty("prefixes") @Nullable List<URI> prefixes,
@JsonProperty("objects") @Nullable List<CloudObjectLocation> objects, @JsonProperty("objects") @Nullable List<CloudObjectLocation> objects,
@Deprecated @JsonProperty("filter") @Nullable String filter,
@JsonProperty("objectGlob") @Nullable String objectGlob, @JsonProperty("objectGlob") @Nullable String objectGlob,
@JsonProperty("properties") @Nullable S3InputSourceConfig s3InputSourceConfig, @JsonProperty("properties") @Nullable S3InputSourceConfig s3InputSourceConfig,
@JsonProperty("proxyConfig") @Nullable AWSProxyConfig awsProxyConfig, @JsonProperty("proxyConfig") @Nullable AWSProxyConfig awsProxyConfig,
@ -119,7 +118,7 @@ public class S3InputSource extends CloudObjectInputSource
@JsonProperty("clientConfig") @Nullable AWSClientConfig awsClientConfig @JsonProperty("clientConfig") @Nullable AWSClientConfig awsClientConfig
) )
{ {
super(S3StorageDruidModule.SCHEME, uris, prefixes, objects, filter, objectGlob); super(S3StorageDruidModule.SCHEME, uris, prefixes, objects, objectGlob);
this.inputDataConfig = Preconditions.checkNotNull(inputDataConfig, "S3DataSegmentPusherConfig"); this.inputDataConfig = Preconditions.checkNotNull(inputDataConfig, "S3DataSegmentPusherConfig");
Preconditions.checkNotNull(s3Client, "s3Client"); Preconditions.checkNotNull(s3Client, "s3Client");
this.s3InputSourceConfig = s3InputSourceConfig; this.s3InputSourceConfig = s3InputSourceConfig;
@ -203,7 +202,6 @@ public class S3InputSource extends CloudObjectInputSource
uris, uris,
prefixes, prefixes,
objects, objects,
null,
objectGlob, objectGlob,
s3InputSourceConfig, s3InputSourceConfig,
awsProxyConfig, awsProxyConfig,
@ -236,7 +234,6 @@ public class S3InputSource extends CloudObjectInputSource
uris, uris,
prefixes, prefixes,
objects, objects,
null,
objectGlob, objectGlob,
s3InputSourceConfig, s3InputSourceConfig,
awsProxyConfig, awsProxyConfig,
@ -346,7 +343,6 @@ public class S3InputSource extends CloudObjectInputSource
null, null,
null, null,
split.get(), split.get(),
getFilter(),
getObjectGlob(), getObjectGlob(),
getS3InputSourceConfig(), getS3InputSourceConfig(),
getAwsProxyConfig(), getAwsProxyConfig(),