mirror of https://github.com/apache/druid.git
Add filter in cloud object input source for backward compatibility (#13437)
https://github.com/apache/druid/pull/13027 PR replaces `filter` parameter with `objectGlob` in ingestion input source. However, this will cause existing ingestion jobs to fail if they are using a filter already. This PR adds old filter functionality alongside objectGlob to preserve backward compatibility.
This commit is contained in:
parent
58c896ea0b
commit
b12e5f300e
|
@ -21,7 +21,9 @@ package org.apache.druid.data.input.impl;
|
||||||
|
|
||||||
import com.fasterxml.jackson.annotation.JsonInclude;
|
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
import com.google.common.primitives.Ints;
|
import com.google.common.primitives.Ints;
|
||||||
|
import org.apache.commons.io.FilenameUtils;
|
||||||
import org.apache.commons.lang.StringUtils;
|
import org.apache.commons.lang.StringUtils;
|
||||||
import org.apache.druid.data.input.AbstractInputSource;
|
import org.apache.druid.data.input.AbstractInputSource;
|
||||||
import org.apache.druid.data.input.InputEntity;
|
import org.apache.druid.data.input.InputEntity;
|
||||||
|
@ -50,6 +52,13 @@ public abstract class CloudObjectInputSource extends AbstractInputSource
|
||||||
private final List<URI> uris;
|
private final List<URI> uris;
|
||||||
private final List<URI> prefixes;
|
private final List<URI> prefixes;
|
||||||
private final List<CloudObjectLocation> objects;
|
private final List<CloudObjectLocation> objects;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Preserved filter for backward compatibility, should be removed on next major release;
|
||||||
|
* use objectGlob instead.
|
||||||
|
*/
|
||||||
|
@Deprecated
|
||||||
|
private final String filter;
|
||||||
private final String objectGlob;
|
private final String objectGlob;
|
||||||
|
|
||||||
public CloudObjectInputSource(
|
public CloudObjectInputSource(
|
||||||
|
@ -63,6 +72,7 @@ public abstract class CloudObjectInputSource extends AbstractInputSource
|
||||||
this.uris = uris;
|
this.uris = uris;
|
||||||
this.prefixes = prefixes;
|
this.prefixes = prefixes;
|
||||||
this.objects = objects;
|
this.objects = objects;
|
||||||
|
this.filter = null;
|
||||||
this.objectGlob = null;
|
this.objectGlob = null;
|
||||||
|
|
||||||
illegalArgsChecker();
|
illegalArgsChecker();
|
||||||
|
@ -73,6 +83,7 @@ public abstract class CloudObjectInputSource extends AbstractInputSource
|
||||||
@Nullable List<URI> uris,
|
@Nullable List<URI> uris,
|
||||||
@Nullable List<URI> prefixes,
|
@Nullable List<URI> prefixes,
|
||||||
@Nullable List<CloudObjectLocation> objects,
|
@Nullable List<CloudObjectLocation> objects,
|
||||||
|
@Deprecated @Nullable String filter,
|
||||||
@Nullable String objectGlob
|
@Nullable String objectGlob
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
|
@ -80,8 +91,12 @@ public abstract class CloudObjectInputSource extends AbstractInputSource
|
||||||
this.uris = uris;
|
this.uris = uris;
|
||||||
this.prefixes = prefixes;
|
this.prefixes = prefixes;
|
||||||
this.objects = objects;
|
this.objects = objects;
|
||||||
|
this.filter = filter;
|
||||||
this.objectGlob = objectGlob;
|
this.objectGlob = objectGlob;
|
||||||
|
Preconditions.checkArgument(
|
||||||
|
filter == null || objectGlob == null,
|
||||||
|
"Cannot use filter and objectGlob together. Try using objectGlob instead of filter."
|
||||||
|
);
|
||||||
illegalArgsChecker();
|
illegalArgsChecker();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -107,6 +122,14 @@ public abstract class CloudObjectInputSource extends AbstractInputSource
|
||||||
return objects;
|
return objects;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Nullable
|
||||||
|
@JsonProperty
|
||||||
|
@JsonInclude(JsonInclude.Include.NON_NULL)
|
||||||
|
public String getFilter()
|
||||||
|
{
|
||||||
|
return filter;
|
||||||
|
}
|
||||||
|
|
||||||
@Nullable
|
@Nullable
|
||||||
@JsonProperty
|
@JsonProperty
|
||||||
@JsonInclude(JsonInclude.Include.NON_NULL)
|
@JsonInclude(JsonInclude.Include.NON_NULL)
|
||||||
|
@ -144,6 +167,8 @@ public abstract class CloudObjectInputSource extends AbstractInputSource
|
||||||
if (StringUtils.isNotBlank(objectGlob)) {
|
if (StringUtils.isNotBlank(objectGlob)) {
|
||||||
PathMatcher m = FileSystems.getDefault().getPathMatcher("glob:" + getObjectGlob());
|
PathMatcher m = FileSystems.getDefault().getPathMatcher("glob:" + getObjectGlob());
|
||||||
objectStream = objectStream.filter(object -> m.matches(Paths.get(object.getPath())));
|
objectStream = objectStream.filter(object -> m.matches(Paths.get(object.getPath())));
|
||||||
|
} else if (StringUtils.isNotBlank(filter)) {
|
||||||
|
objectStream = objectStream.filter(object -> FilenameUtils.wildcardMatch(object.getPath(), getFilter()));
|
||||||
}
|
}
|
||||||
|
|
||||||
return objectStream.map(object -> new InputSplit<>(Collections.singletonList(object)));
|
return objectStream.map(object -> new InputSplit<>(Collections.singletonList(object)));
|
||||||
|
@ -155,6 +180,8 @@ public abstract class CloudObjectInputSource extends AbstractInputSource
|
||||||
if (StringUtils.isNotBlank(objectGlob)) {
|
if (StringUtils.isNotBlank(objectGlob)) {
|
||||||
PathMatcher m = FileSystems.getDefault().getPathMatcher("glob:" + getObjectGlob());
|
PathMatcher m = FileSystems.getDefault().getPathMatcher("glob:" + getObjectGlob());
|
||||||
uriStream = uriStream.filter(uri -> m.matches(Paths.get(uri.toString())));
|
uriStream = uriStream.filter(uri -> m.matches(Paths.get(uri.toString())));
|
||||||
|
} else if (StringUtils.isNotBlank(filter)) {
|
||||||
|
uriStream = uriStream.filter(uri -> FilenameUtils.wildcardMatch(uri.toString(), filter));
|
||||||
}
|
}
|
||||||
|
|
||||||
return uriStream.map(CloudObjectLocation::new).map(object -> new InputSplit<>(Collections.singletonList(object)));
|
return uriStream.map(CloudObjectLocation::new).map(object -> new InputSplit<>(Collections.singletonList(object)));
|
||||||
|
@ -212,13 +239,14 @@ public abstract class CloudObjectInputSource extends AbstractInputSource
|
||||||
Objects.equals(uris, that.uris) &&
|
Objects.equals(uris, that.uris) &&
|
||||||
Objects.equals(prefixes, that.prefixes) &&
|
Objects.equals(prefixes, that.prefixes) &&
|
||||||
Objects.equals(objects, that.objects) &&
|
Objects.equals(objects, that.objects) &&
|
||||||
|
Objects.equals(filter, that.filter) &&
|
||||||
Objects.equals(objectGlob, that.objectGlob);
|
Objects.equals(objectGlob, that.objectGlob);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int hashCode()
|
public int hashCode()
|
||||||
{
|
{
|
||||||
return Objects.hash(scheme, uris, prefixes, objects, objectGlob);
|
return Objects.hash(scheme, uris, prefixes, objects, filter, objectGlob);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void illegalArgsChecker() throws IllegalArgumentException
|
private void illegalArgsChecker() throws IllegalArgumentException
|
||||||
|
|
|
@ -72,7 +72,7 @@ public class CloudObjectInputSourceTest
|
||||||
public void testGetUris()
|
public void testGetUris()
|
||||||
{
|
{
|
||||||
CloudObjectInputSource inputSource = Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings()
|
CloudObjectInputSource inputSource = Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings()
|
||||||
.useConstructor(SCHEME, URIS, null, null, null)
|
.useConstructor(SCHEME, URIS, null, null, null, null)
|
||||||
.defaultAnswer(Mockito.CALLS_REAL_METHODS)
|
.defaultAnswer(Mockito.CALLS_REAL_METHODS)
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -86,7 +86,7 @@ public class CloudObjectInputSourceTest
|
||||||
public void testGetPrefixes()
|
public void testGetPrefixes()
|
||||||
{
|
{
|
||||||
CloudObjectInputSource inputSource = Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings()
|
CloudObjectInputSource inputSource = Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings()
|
||||||
.useConstructor(SCHEME, null, PREFIXES, null, null)
|
.useConstructor(SCHEME, null, PREFIXES, null, null, null)
|
||||||
.defaultAnswer(Mockito.CALLS_REAL_METHODS)
|
.defaultAnswer(Mockito.CALLS_REAL_METHODS)
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -100,7 +100,7 @@ public class CloudObjectInputSourceTest
|
||||||
public void testGetObjectGlob()
|
public void testGetObjectGlob()
|
||||||
{
|
{
|
||||||
CloudObjectInputSource inputSource = Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings()
|
CloudObjectInputSource inputSource = Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings()
|
||||||
.useConstructor(SCHEME, URIS, null, null, "**.parquet")
|
.useConstructor(SCHEME, URIS, null, null, null, "**.parquet")
|
||||||
.defaultAnswer(Mockito.CALLS_REAL_METHODS)
|
.defaultAnswer(Mockito.CALLS_REAL_METHODS)
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -111,12 +111,12 @@ public class CloudObjectInputSourceTest
|
||||||
public void testInequality()
|
public void testInequality()
|
||||||
{
|
{
|
||||||
CloudObjectInputSource inputSource1 = Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings()
|
CloudObjectInputSource inputSource1 = Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings()
|
||||||
.useConstructor(SCHEME, URIS, null, null, "**.parquet")
|
.useConstructor(SCHEME, URIS, null, null, null, "**.parquet")
|
||||||
.defaultAnswer(Mockito.CALLS_REAL_METHODS)
|
.defaultAnswer(Mockito.CALLS_REAL_METHODS)
|
||||||
);
|
);
|
||||||
|
|
||||||
CloudObjectInputSource inputSource2 = Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings()
|
CloudObjectInputSource inputSource2 = Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings()
|
||||||
.useConstructor(SCHEME, URIS, null, null, "**.csv")
|
.useConstructor(SCHEME, URIS, null, null, null, "**.csv")
|
||||||
.defaultAnswer(Mockito.CALLS_REAL_METHODS)
|
.defaultAnswer(Mockito.CALLS_REAL_METHODS)
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -128,8 +128,31 @@ public class CloudObjectInputSourceTest
|
||||||
@Test
|
@Test
|
||||||
public void testWithUrisFilter()
|
public void testWithUrisFilter()
|
||||||
{
|
{
|
||||||
CloudObjectInputSource inputSource = Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings()
|
CloudObjectInputSource inputSource = Mockito.mock(CloudObjectInputSource.class,
|
||||||
.useConstructor(SCHEME, URIS2, null, null, "**.csv")
|
Mockito.withSettings()
|
||||||
|
.useConstructor(SCHEME, URIS2, null, null, "*.csv", null)
|
||||||
|
.defaultAnswer(Mockito.CALLS_REAL_METHODS)
|
||||||
|
);
|
||||||
|
|
||||||
|
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
|
||||||
|
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
|
||||||
|
new MaxSizeSplitHintSpec(null, 1)
|
||||||
|
);
|
||||||
|
|
||||||
|
List<CloudObjectLocation> returnedLocations = splits.map(InputSplit::get).collect(Collectors.toList()).get(0);
|
||||||
|
|
||||||
|
List<URI> returnedLocationUris = returnedLocations.stream().map(object -> object.toUri(SCHEME)).collect(Collectors.toList());
|
||||||
|
|
||||||
|
Assert.assertEquals("*.csv", inputSource.getFilter());
|
||||||
|
Assert.assertEquals(URIS, returnedLocationUris);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testWithUrisObjectGlob()
|
||||||
|
{
|
||||||
|
CloudObjectInputSource inputSource = Mockito.mock(CloudObjectInputSource.class,
|
||||||
|
Mockito.withSettings()
|
||||||
|
.useConstructor(SCHEME, URIS2, null, null, null, "**.csv")
|
||||||
.defaultAnswer(Mockito.CALLS_REAL_METHODS)
|
.defaultAnswer(Mockito.CALLS_REAL_METHODS)
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -150,7 +173,7 @@ public class CloudObjectInputSourceTest
|
||||||
public void testWithUris()
|
public void testWithUris()
|
||||||
{
|
{
|
||||||
CloudObjectInputSource inputSource = Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings()
|
CloudObjectInputSource inputSource = Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings()
|
||||||
.useConstructor(SCHEME, URIS, null, null, null)
|
.useConstructor(SCHEME, URIS, null, null, null, null)
|
||||||
.defaultAnswer(Mockito.CALLS_REAL_METHODS)
|
.defaultAnswer(Mockito.CALLS_REAL_METHODS)
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -171,7 +194,7 @@ public class CloudObjectInputSourceTest
|
||||||
public void testWithObjectsFilter()
|
public void testWithObjectsFilter()
|
||||||
{
|
{
|
||||||
CloudObjectInputSource inputSource = Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings()
|
CloudObjectInputSource inputSource = Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings()
|
||||||
.useConstructor(SCHEME, null, null, OBJECTS_BEFORE_GLOB, "**.csv")
|
.useConstructor(SCHEME, null, null, OBJECTS_BEFORE_GLOB, null, "**.csv")
|
||||||
.defaultAnswer(Mockito.CALLS_REAL_METHODS)
|
.defaultAnswer(Mockito.CALLS_REAL_METHODS)
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -192,7 +215,7 @@ public class CloudObjectInputSourceTest
|
||||||
public void testWithObjects()
|
public void testWithObjects()
|
||||||
{
|
{
|
||||||
CloudObjectInputSource inputSource = Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings()
|
CloudObjectInputSource inputSource = Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings()
|
||||||
.useConstructor(SCHEME, null, null, OBJECTS, null)
|
.useConstructor(SCHEME, null, null, OBJECTS, null, null)
|
||||||
.defaultAnswer(Mockito.CALLS_REAL_METHODS)
|
.defaultAnswer(Mockito.CALLS_REAL_METHODS)
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|
|
@ -79,11 +79,12 @@ public class OssInputSource extends CloudObjectInputSource
|
||||||
@JsonProperty("uris") @Nullable List<URI> uris,
|
@JsonProperty("uris") @Nullable List<URI> uris,
|
||||||
@JsonProperty("prefixes") @Nullable List<URI> prefixes,
|
@JsonProperty("prefixes") @Nullable List<URI> prefixes,
|
||||||
@JsonProperty("objects") @Nullable List<CloudObjectLocation> objects,
|
@JsonProperty("objects") @Nullable List<CloudObjectLocation> objects,
|
||||||
|
@Deprecated @JsonProperty("filter") @Nullable String filter,
|
||||||
@JsonProperty("objectGlob") @Nullable String objectGlob,
|
@JsonProperty("objectGlob") @Nullable String objectGlob,
|
||||||
@JsonProperty("properties") @Nullable OssClientConfig inputSourceConfig
|
@JsonProperty("properties") @Nullable OssClientConfig inputSourceConfig
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
super(OssStorageDruidModule.SCHEME, uris, prefixes, objects, objectGlob);
|
super(OssStorageDruidModule.SCHEME, uris, prefixes, objects, filter, objectGlob);
|
||||||
this.inputDataConfig = Preconditions.checkNotNull(inputDataConfig, "inputDataConfig");
|
this.inputDataConfig = Preconditions.checkNotNull(inputDataConfig, "inputDataConfig");
|
||||||
Preconditions.checkNotNull(client, "client");
|
Preconditions.checkNotNull(client, "client");
|
||||||
this.inputSourceConfig = inputSourceConfig;
|
this.inputSourceConfig = inputSourceConfig;
|
||||||
|
@ -136,6 +137,7 @@ public class OssInputSource extends CloudObjectInputSource
|
||||||
null,
|
null,
|
||||||
null,
|
null,
|
||||||
split.get(),
|
split.get(),
|
||||||
|
getFilter(),
|
||||||
getObjectGlob(),
|
getObjectGlob(),
|
||||||
getOssInputSourceConfig()
|
getOssInputSourceConfig()
|
||||||
);
|
);
|
||||||
|
|
|
@ -143,6 +143,7 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
|
||||||
null,
|
null,
|
||||||
null,
|
null,
|
||||||
null,
|
null,
|
||||||
|
null,
|
||||||
null
|
null
|
||||||
);
|
);
|
||||||
final OssInputSource serdeWithUris = MAPPER.readValue(MAPPER.writeValueAsString(withUris), OssInputSource.class);
|
final OssInputSource serdeWithUris = MAPPER.readValue(MAPPER.writeValueAsString(withUris), OssInputSource.class);
|
||||||
|
@ -159,6 +160,7 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
|
||||||
PREFIXES,
|
PREFIXES,
|
||||||
null,
|
null,
|
||||||
null,
|
null,
|
||||||
|
null,
|
||||||
null
|
null
|
||||||
);
|
);
|
||||||
final OssInputSource serdeWithPrefixes =
|
final OssInputSource serdeWithPrefixes =
|
||||||
|
@ -176,6 +178,7 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
|
||||||
null,
|
null,
|
||||||
EXPECTED_LOCATION,
|
EXPECTED_LOCATION,
|
||||||
null,
|
null,
|
||||||
|
null,
|
||||||
null
|
null
|
||||||
);
|
);
|
||||||
final OssInputSource serdeWithPrefixes =
|
final OssInputSource serdeWithPrefixes =
|
||||||
|
@ -200,6 +203,7 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
|
||||||
null,
|
null,
|
||||||
EXPECTED_LOCATION,
|
EXPECTED_LOCATION,
|
||||||
null,
|
null,
|
||||||
|
null,
|
||||||
mockConfigPropertiesWithoutKeyAndSecret
|
mockConfigPropertiesWithoutKeyAndSecret
|
||||||
);
|
);
|
||||||
Assert.assertNotNull(withPrefixes);
|
Assert.assertNotNull(withPrefixes);
|
||||||
|
@ -220,6 +224,7 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
|
||||||
null,
|
null,
|
||||||
EXPECTED_LOCATION,
|
EXPECTED_LOCATION,
|
||||||
null,
|
null,
|
||||||
|
null,
|
||||||
CLOUD_CONFIG_PROPERTIES
|
CLOUD_CONFIG_PROPERTIES
|
||||||
);
|
);
|
||||||
final OssInputSource serdeWithPrefixes =
|
final OssInputSource serdeWithPrefixes =
|
||||||
|
@ -240,6 +245,7 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
|
||||||
null,
|
null,
|
||||||
EXPECTED_LOCATION,
|
EXPECTED_LOCATION,
|
||||||
null,
|
null,
|
||||||
|
null,
|
||||||
null
|
null
|
||||||
);
|
);
|
||||||
final OssInputSource serdeWithPrefixes =
|
final OssInputSource serdeWithPrefixes =
|
||||||
|
@ -258,6 +264,7 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
|
||||||
ImmutableList.of(),
|
ImmutableList.of(),
|
||||||
EXPECTED_LOCATION,
|
EXPECTED_LOCATION,
|
||||||
null,
|
null,
|
||||||
|
null,
|
||||||
null
|
null
|
||||||
);
|
);
|
||||||
final OssInputSource serdeWithPrefixes =
|
final OssInputSource serdeWithPrefixes =
|
||||||
|
@ -277,6 +284,7 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
|
||||||
PREFIXES,
|
PREFIXES,
|
||||||
EXPECTED_LOCATION,
|
EXPECTED_LOCATION,
|
||||||
null,
|
null,
|
||||||
|
null,
|
||||||
null
|
null
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
@ -293,6 +301,7 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
|
||||||
PREFIXES,
|
PREFIXES,
|
||||||
ImmutableList.of(),
|
ImmutableList.of(),
|
||||||
null,
|
null,
|
||||||
|
null,
|
||||||
null
|
null
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
@ -309,6 +318,7 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
|
||||||
PREFIXES,
|
PREFIXES,
|
||||||
EXPECTED_LOCATION,
|
EXPECTED_LOCATION,
|
||||||
null,
|
null,
|
||||||
|
null,
|
||||||
null
|
null
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
@ -323,6 +333,7 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
|
||||||
null,
|
null,
|
||||||
null,
|
null,
|
||||||
null,
|
null,
|
||||||
|
null,
|
||||||
null
|
null
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -349,6 +360,7 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
|
||||||
PREFIXES,
|
PREFIXES,
|
||||||
null,
|
null,
|
||||||
null,
|
null,
|
||||||
|
null,
|
||||||
null
|
null
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -376,6 +388,7 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
|
||||||
PREFIXES,
|
PREFIXES,
|
||||||
null,
|
null,
|
||||||
null,
|
null,
|
||||||
|
null,
|
||||||
null
|
null
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -406,6 +419,7 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
|
||||||
PREFIXES,
|
PREFIXES,
|
||||||
null,
|
null,
|
||||||
null,
|
null,
|
||||||
|
null,
|
||||||
null
|
null
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -435,6 +449,7 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
|
||||||
ImmutableList.of(PREFIXES.get(0), EXPECTED_URIS.get(1)),
|
ImmutableList.of(PREFIXES.get(0), EXPECTED_URIS.get(1)),
|
||||||
null,
|
null,
|
||||||
null,
|
null,
|
||||||
|
null,
|
||||||
null
|
null
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -466,6 +481,7 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
|
||||||
ImmutableList.of(PREFIXES.get(0), EXPECTED_URIS.get(1)),
|
ImmutableList.of(PREFIXES.get(0), EXPECTED_URIS.get(1)),
|
||||||
null,
|
null,
|
||||||
null,
|
null,
|
||||||
|
null,
|
||||||
null
|
null
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -510,6 +526,7 @@ public class OssInputSourceTest extends InitializedNullHandlingTest
|
||||||
ImmutableList.of(PREFIXES.get(0), EXPECTED_COMPRESSED_URIS.get(1)),
|
ImmutableList.of(PREFIXES.get(0), EXPECTED_COMPRESSED_URIS.get(1)),
|
||||||
null,
|
null,
|
||||||
null,
|
null,
|
||||||
|
null,
|
||||||
null
|
null
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|
|
@ -74,10 +74,11 @@ public class AzureInputSource extends CloudObjectInputSource
|
||||||
@JsonProperty("uris") @Nullable List<URI> uris,
|
@JsonProperty("uris") @Nullable List<URI> uris,
|
||||||
@JsonProperty("prefixes") @Nullable List<URI> prefixes,
|
@JsonProperty("prefixes") @Nullable List<URI> prefixes,
|
||||||
@JsonProperty("objects") @Nullable List<CloudObjectLocation> objects,
|
@JsonProperty("objects") @Nullable List<CloudObjectLocation> objects,
|
||||||
|
@Deprecated @JsonProperty("filter") @Nullable String filter,
|
||||||
@JsonProperty("objectGlob") @Nullable String objectGlob
|
@JsonProperty("objectGlob") @Nullable String objectGlob
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
super(SCHEME, uris, prefixes, objects, objectGlob);
|
super(SCHEME, uris, prefixes, objects, filter, objectGlob);
|
||||||
this.storage = Preconditions.checkNotNull(storage, "AzureStorage");
|
this.storage = Preconditions.checkNotNull(storage, "AzureStorage");
|
||||||
this.entityFactory = Preconditions.checkNotNull(entityFactory, "AzureEntityFactory");
|
this.entityFactory = Preconditions.checkNotNull(entityFactory, "AzureEntityFactory");
|
||||||
this.azureCloudBlobIterableFactory = Preconditions.checkNotNull(
|
this.azureCloudBlobIterableFactory = Preconditions.checkNotNull(
|
||||||
|
@ -103,6 +104,7 @@ public class AzureInputSource extends CloudObjectInputSource
|
||||||
null,
|
null,
|
||||||
null,
|
null,
|
||||||
split.get(),
|
split.get(),
|
||||||
|
getFilter(),
|
||||||
getObjectGlob()
|
getObjectGlob()
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,6 +22,7 @@ package org.apache.druid.data.input.azure;
|
||||||
import com.google.common.collect.ImmutableList;
|
import com.google.common.collect.ImmutableList;
|
||||||
import com.google.common.collect.Iterators;
|
import com.google.common.collect.Iterators;
|
||||||
import nl.jqno.equalsverifier.EqualsVerifier;
|
import nl.jqno.equalsverifier.EqualsVerifier;
|
||||||
|
import org.apache.commons.io.FilenameUtils;
|
||||||
import org.apache.druid.data.input.InputSplit;
|
import org.apache.druid.data.input.InputSplit;
|
||||||
import org.apache.druid.data.input.MaxSizeSplitHintSpec;
|
import org.apache.druid.data.input.MaxSizeSplitHintSpec;
|
||||||
import org.apache.druid.data.input.impl.CloudObjectLocation;
|
import org.apache.druid.data.input.impl.CloudObjectLocation;
|
||||||
|
@ -111,6 +112,7 @@ public class AzureInputSourceTest extends EasyMockSupport
|
||||||
EMPTY_URIS,
|
EMPTY_URIS,
|
||||||
EMPTY_PREFIXES,
|
EMPTY_PREFIXES,
|
||||||
EMPTY_OBJECTS,
|
EMPTY_OBJECTS,
|
||||||
|
null,
|
||||||
null
|
null
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
@ -132,6 +134,7 @@ public class AzureInputSourceTest extends EasyMockSupport
|
||||||
EMPTY_URIS,
|
EMPTY_URIS,
|
||||||
EMPTY_PREFIXES,
|
EMPTY_PREFIXES,
|
||||||
objects,
|
objects,
|
||||||
|
null,
|
||||||
null
|
null
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -166,6 +169,55 @@ public class AzureInputSourceTest extends EasyMockSupport
|
||||||
EMPTY_URIS,
|
EMPTY_URIS,
|
||||||
prefixes,
|
prefixes,
|
||||||
EMPTY_OBJECTS,
|
EMPTY_OBJECTS,
|
||||||
|
null,
|
||||||
|
null
|
||||||
|
);
|
||||||
|
|
||||||
|
Stream<InputSplit<List<CloudObjectLocation>>> cloudObjectStream = azureInputSource.getPrefixesSplitStream(
|
||||||
|
new MaxSizeSplitHintSpec(null, 1)
|
||||||
|
);
|
||||||
|
|
||||||
|
List<List<CloudObjectLocation>> actualCloudLocationList = cloudObjectStream.map(InputSplit::get)
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
verifyAll();
|
||||||
|
Assert.assertEquals(expectedCloudLocations, actualCloudLocationList);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void test_getPrefixesSplitStream_withFilter_successfullyCreatesCloudLocation_returnsExpectedLocations()
|
||||||
|
{
|
||||||
|
List<URI> prefixes = ImmutableList.of(PREFIX_URI);
|
||||||
|
List<List<CloudObjectLocation>> expectedCloudLocations = ImmutableList.of(ImmutableList.of(CLOUD_OBJECT_LOCATION_1));
|
||||||
|
List<CloudBlobHolder> expectedCloudBlobs = ImmutableList.of(cloudBlobDruid1);
|
||||||
|
Iterator<CloudBlobHolder> expectedCloudBlobsIterator = expectedCloudBlobs.iterator();
|
||||||
|
String filter = "*.csv";
|
||||||
|
|
||||||
|
expectedCloudBlobsIterator = Iterators.filter(
|
||||||
|
expectedCloudBlobsIterator,
|
||||||
|
object -> FilenameUtils.wildcardMatch(object.getName(), filter)
|
||||||
|
);
|
||||||
|
|
||||||
|
EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_LISTING_LENGTH);
|
||||||
|
EasyMock.expect(azureCloudBlobIterableFactory.create(prefixes, MAX_LISTING_LENGTH)).andReturn(
|
||||||
|
azureCloudBlobIterable);
|
||||||
|
EasyMock.expect(azureCloudBlobIterable.iterator()).andReturn(expectedCloudBlobsIterator);
|
||||||
|
EasyMock.expect(azureCloudBlobToLocationConverter.createCloudObjectLocation(cloudBlobDruid1))
|
||||||
|
.andReturn(CLOUD_OBJECT_LOCATION_1);
|
||||||
|
EasyMock.expect(cloudBlobDruid1.getBlobLength()).andReturn(100L).anyTimes();
|
||||||
|
EasyMock.expect(cloudBlobDruid1.getName()).andReturn(BLOB_PATH).anyTimes();
|
||||||
|
|
||||||
|
replayAll();
|
||||||
|
|
||||||
|
azureInputSource = new AzureInputSource(
|
||||||
|
storage,
|
||||||
|
entityFactory,
|
||||||
|
azureCloudBlobIterableFactory,
|
||||||
|
azureCloudBlobToLocationConverter,
|
||||||
|
inputDataConfig,
|
||||||
|
EMPTY_URIS,
|
||||||
|
prefixes,
|
||||||
|
EMPTY_OBJECTS,
|
||||||
|
filter,
|
||||||
null
|
null
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -215,6 +267,7 @@ public class AzureInputSourceTest extends EasyMockSupport
|
||||||
EMPTY_URIS,
|
EMPTY_URIS,
|
||||||
prefixes,
|
prefixes,
|
||||||
EMPTY_OBJECTS,
|
EMPTY_OBJECTS,
|
||||||
|
null,
|
||||||
objectGlob
|
objectGlob
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -244,6 +297,7 @@ public class AzureInputSourceTest extends EasyMockSupport
|
||||||
EMPTY_URIS,
|
EMPTY_URIS,
|
||||||
prefixes,
|
prefixes,
|
||||||
EMPTY_OBJECTS,
|
EMPTY_OBJECTS,
|
||||||
|
null,
|
||||||
null
|
null
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -265,6 +319,7 @@ public class AzureInputSourceTest extends EasyMockSupport
|
||||||
EMPTY_URIS,
|
EMPTY_URIS,
|
||||||
prefixes,
|
prefixes,
|
||||||
EMPTY_OBJECTS,
|
EMPTY_OBJECTS,
|
||||||
|
null,
|
||||||
null
|
null
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|
|
@ -65,10 +65,11 @@ public class GoogleCloudStorageInputSource extends CloudObjectInputSource
|
||||||
@JsonProperty("uris") @Nullable List<URI> uris,
|
@JsonProperty("uris") @Nullable List<URI> uris,
|
||||||
@JsonProperty("prefixes") @Nullable List<URI> prefixes,
|
@JsonProperty("prefixes") @Nullable List<URI> prefixes,
|
||||||
@JsonProperty("objects") @Nullable List<CloudObjectLocation> objects,
|
@JsonProperty("objects") @Nullable List<CloudObjectLocation> objects,
|
||||||
|
@Deprecated @JsonProperty("filter") @Nullable String filter,
|
||||||
@JsonProperty("objectGlob") @Nullable String objectGlob
|
@JsonProperty("objectGlob") @Nullable String objectGlob
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
super(GoogleStorageDruidModule.SCHEME_GS, uris, prefixes, objects, objectGlob);
|
super(GoogleStorageDruidModule.SCHEME_GS, uris, prefixes, objects, filter, objectGlob);
|
||||||
this.storage = storage;
|
this.storage = storage;
|
||||||
this.inputDataConfig = inputDataConfig;
|
this.inputDataConfig = inputDataConfig;
|
||||||
}
|
}
|
||||||
|
@ -117,7 +118,7 @@ public class GoogleCloudStorageInputSource extends CloudObjectInputSource
|
||||||
@Override
|
@Override
|
||||||
public SplittableInputSource<List<CloudObjectLocation>> withSplit(InputSplit<List<CloudObjectLocation>> split)
|
public SplittableInputSource<List<CloudObjectLocation>> withSplit(InputSplit<List<CloudObjectLocation>> split)
|
||||||
{
|
{
|
||||||
return new GoogleCloudStorageInputSource(storage, inputDataConfig, null, null, split.get(), getObjectGlob());
|
return new GoogleCloudStorageInputSource(storage, inputDataConfig, null, null, split.get(), getFilter(), getObjectGlob());
|
||||||
}
|
}
|
||||||
|
|
||||||
private CloudObjectLocation byteSourceFromStorageObject(final StorageObject storageObject)
|
private CloudObjectLocation byteSourceFromStorageObject(final StorageObject storageObject)
|
||||||
|
|
|
@ -116,7 +116,7 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
|
||||||
{
|
{
|
||||||
final ObjectMapper mapper = createGoogleObjectMapper();
|
final ObjectMapper mapper = createGoogleObjectMapper();
|
||||||
final GoogleCloudStorageInputSource withUris =
|
final GoogleCloudStorageInputSource withUris =
|
||||||
new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, EXPECTED_URIS, ImmutableList.of(), null, null);
|
new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, EXPECTED_URIS, ImmutableList.of(), null, null, null);
|
||||||
final GoogleCloudStorageInputSource serdeWithUris =
|
final GoogleCloudStorageInputSource serdeWithUris =
|
||||||
mapper.readValue(mapper.writeValueAsString(withUris), GoogleCloudStorageInputSource.class);
|
mapper.readValue(mapper.writeValueAsString(withUris), GoogleCloudStorageInputSource.class);
|
||||||
Assert.assertEquals(withUris, serdeWithUris);
|
Assert.assertEquals(withUris, serdeWithUris);
|
||||||
|
@ -127,7 +127,7 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
|
||||||
{
|
{
|
||||||
final ObjectMapper mapper = createGoogleObjectMapper();
|
final ObjectMapper mapper = createGoogleObjectMapper();
|
||||||
final GoogleCloudStorageInputSource withPrefixes =
|
final GoogleCloudStorageInputSource withPrefixes =
|
||||||
new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, ImmutableList.of(), PREFIXES, null, null);
|
new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, ImmutableList.of(), PREFIXES, null, null, null);
|
||||||
final GoogleCloudStorageInputSource serdeWithPrefixes =
|
final GoogleCloudStorageInputSource serdeWithPrefixes =
|
||||||
mapper.readValue(mapper.writeValueAsString(withPrefixes), GoogleCloudStorageInputSource.class);
|
mapper.readValue(mapper.writeValueAsString(withPrefixes), GoogleCloudStorageInputSource.class);
|
||||||
Assert.assertEquals(withPrefixes, serdeWithPrefixes);
|
Assert.assertEquals(withPrefixes, serdeWithPrefixes);
|
||||||
|
@ -144,6 +144,7 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
|
||||||
null,
|
null,
|
||||||
null,
|
null,
|
||||||
ImmutableList.of(new CloudObjectLocation("foo", "bar/file.gz")),
|
ImmutableList.of(new CloudObjectLocation("foo", "bar/file.gz")),
|
||||||
|
null,
|
||||||
null
|
null
|
||||||
);
|
);
|
||||||
final GoogleCloudStorageInputSource serdeWithObjects =
|
final GoogleCloudStorageInputSource serdeWithObjects =
|
||||||
|
@ -156,7 +157,7 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
|
||||||
{
|
{
|
||||||
|
|
||||||
GoogleCloudStorageInputSource inputSource =
|
GoogleCloudStorageInputSource inputSource =
|
||||||
new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, EXPECTED_URIS, ImmutableList.of(), null, null);
|
new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, EXPECTED_URIS, ImmutableList.of(), null, null, null);
|
||||||
|
|
||||||
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
|
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
|
||||||
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
|
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
|
||||||
|
@ -174,6 +175,7 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
|
||||||
URIS_BEFORE_GLOB,
|
URIS_BEFORE_GLOB,
|
||||||
null,
|
null,
|
||||||
null,
|
null,
|
||||||
|
null,
|
||||||
"**.csv"
|
"**.csv"
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -184,6 +186,26 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
|
||||||
Assert.assertEquals(EXPECTED_OBJECTS, splits.map(InputSplit::get).collect(Collectors.toList()));
|
Assert.assertEquals(EXPECTED_OBJECTS, splits.map(InputSplit::get).collect(Collectors.toList()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testWithUrisFilter()
|
||||||
|
{
|
||||||
|
GoogleCloudStorageInputSource inputSource = new GoogleCloudStorageInputSource(
|
||||||
|
STORAGE,
|
||||||
|
INPUT_DATA_CONFIG,
|
||||||
|
URIS_BEFORE_GLOB,
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
"*.csv",
|
||||||
|
null
|
||||||
|
);
|
||||||
|
|
||||||
|
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
|
||||||
|
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
|
||||||
|
null
|
||||||
|
);
|
||||||
|
Assert.assertEquals(EXPECTED_OBJECTS, splits.map(InputSplit::get).collect(Collectors.toList()));
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testIllegalObjectsAndPrefixes()
|
public void testIllegalObjectsAndPrefixes()
|
||||||
{
|
{
|
||||||
|
@ -195,6 +217,7 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
|
||||||
null,
|
null,
|
||||||
PREFIXES,
|
PREFIXES,
|
||||||
EXPECTED_OBJECTS.get(0),
|
EXPECTED_OBJECTS.get(0),
|
||||||
|
null,
|
||||||
"**.csv"
|
"**.csv"
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
@ -210,6 +233,7 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
|
||||||
URIS_BEFORE_GLOB,
|
URIS_BEFORE_GLOB,
|
||||||
PREFIXES,
|
PREFIXES,
|
||||||
null,
|
null,
|
||||||
|
null,
|
||||||
"**.csv"
|
"**.csv"
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
@ -226,7 +250,7 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
|
||||||
EasyMock.replay(INPUT_DATA_CONFIG);
|
EasyMock.replay(INPUT_DATA_CONFIG);
|
||||||
|
|
||||||
GoogleCloudStorageInputSource inputSource =
|
GoogleCloudStorageInputSource inputSource =
|
||||||
new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, null, PREFIXES, null, null);
|
new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, null, PREFIXES, null, null, null);
|
||||||
|
|
||||||
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
|
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
|
||||||
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
|
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
|
||||||
|
@ -248,7 +272,7 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
|
||||||
EasyMock.replay(INPUT_DATA_CONFIG);
|
EasyMock.replay(INPUT_DATA_CONFIG);
|
||||||
|
|
||||||
GoogleCloudStorageInputSource inputSource =
|
GoogleCloudStorageInputSource inputSource =
|
||||||
new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, null, PREFIXES, null, null);
|
new GoogleCloudStorageInputSource(STORAGE, INPUT_DATA_CONFIG, null, PREFIXES, null, null, null);
|
||||||
|
|
||||||
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
|
Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
|
||||||
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
|
new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
|
||||||
|
@ -280,6 +304,7 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
|
||||||
null,
|
null,
|
||||||
PREFIXES,
|
PREFIXES,
|
||||||
null,
|
null,
|
||||||
|
null,
|
||||||
null
|
null
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -324,6 +349,7 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
|
||||||
null,
|
null,
|
||||||
PREFIXES,
|
PREFIXES,
|
||||||
null,
|
null,
|
||||||
|
null,
|
||||||
null
|
null
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|
|
@ -111,6 +111,7 @@ public class S3InputSource extends CloudObjectInputSource
|
||||||
@JsonProperty("uris") @Nullable List<URI> uris,
|
@JsonProperty("uris") @Nullable List<URI> uris,
|
||||||
@JsonProperty("prefixes") @Nullable List<URI> prefixes,
|
@JsonProperty("prefixes") @Nullable List<URI> prefixes,
|
||||||
@JsonProperty("objects") @Nullable List<CloudObjectLocation> objects,
|
@JsonProperty("objects") @Nullable List<CloudObjectLocation> objects,
|
||||||
|
@Deprecated @JsonProperty("filter") @Nullable String filter,
|
||||||
@JsonProperty("objectGlob") @Nullable String objectGlob,
|
@JsonProperty("objectGlob") @Nullable String objectGlob,
|
||||||
@JsonProperty("properties") @Nullable S3InputSourceConfig s3InputSourceConfig,
|
@JsonProperty("properties") @Nullable S3InputSourceConfig s3InputSourceConfig,
|
||||||
@JsonProperty("proxyConfig") @Nullable AWSProxyConfig awsProxyConfig,
|
@JsonProperty("proxyConfig") @Nullable AWSProxyConfig awsProxyConfig,
|
||||||
|
@ -118,7 +119,7 @@ public class S3InputSource extends CloudObjectInputSource
|
||||||
@JsonProperty("clientConfig") @Nullable AWSClientConfig awsClientConfig
|
@JsonProperty("clientConfig") @Nullable AWSClientConfig awsClientConfig
|
||||||
)
|
)
|
||||||
{
|
{
|
||||||
super(S3StorageDruidModule.SCHEME, uris, prefixes, objects, objectGlob);
|
super(S3StorageDruidModule.SCHEME, uris, prefixes, objects, filter, objectGlob);
|
||||||
this.inputDataConfig = Preconditions.checkNotNull(inputDataConfig, "S3DataSegmentPusherConfig");
|
this.inputDataConfig = Preconditions.checkNotNull(inputDataConfig, "S3DataSegmentPusherConfig");
|
||||||
Preconditions.checkNotNull(s3Client, "s3Client");
|
Preconditions.checkNotNull(s3Client, "s3Client");
|
||||||
this.s3InputSourceConfig = s3InputSourceConfig;
|
this.s3InputSourceConfig = s3InputSourceConfig;
|
||||||
|
@ -202,6 +203,7 @@ public class S3InputSource extends CloudObjectInputSource
|
||||||
uris,
|
uris,
|
||||||
prefixes,
|
prefixes,
|
||||||
objects,
|
objects,
|
||||||
|
null,
|
||||||
objectGlob,
|
objectGlob,
|
||||||
s3InputSourceConfig,
|
s3InputSourceConfig,
|
||||||
awsProxyConfig,
|
awsProxyConfig,
|
||||||
|
@ -234,6 +236,7 @@ public class S3InputSource extends CloudObjectInputSource
|
||||||
uris,
|
uris,
|
||||||
prefixes,
|
prefixes,
|
||||||
objects,
|
objects,
|
||||||
|
null,
|
||||||
objectGlob,
|
objectGlob,
|
||||||
s3InputSourceConfig,
|
s3InputSourceConfig,
|
||||||
awsProxyConfig,
|
awsProxyConfig,
|
||||||
|
@ -343,6 +346,7 @@ public class S3InputSource extends CloudObjectInputSource
|
||||||
null,
|
null,
|
||||||
null,
|
null,
|
||||||
split.get(),
|
split.get(),
|
||||||
|
getFilter(),
|
||||||
getObjectGlob(),
|
getObjectGlob(),
|
||||||
getS3InputSourceConfig(),
|
getS3InputSourceConfig(),
|
||||||
getAwsProxyConfig(),
|
getAwsProxyConfig(),
|
||||||
|
|
Loading…
Reference in New Issue