mirror of https://github.com/apache/druid.git
Use standard library to correctly glob and stop at the correct folder structure when filtering cloud objects (#13027)
* Use standard library to correctly glob and stop at the correct folder structure when filtering cloud objects.

  Removed: import org.apache.commons.io.FilenameUtils;
  Added:   import java.nio.file.FileSystems;
           import java.nio.file.PathMatcher;
           import java.nio.file.Paths;

* Forgot to update CloudObjectInputSource as well.
* Fix tests.
* Removed unused exceptions.
* Reduce user mistakes by removing the protocol and the bucket from the filter.
* Add one more test.
* Add a comment on filterWithoutProtocolAndBucket.
* Fix lint issue.
* Fix another lint issue.
* Replace all mentions of filter -> objectGlob, per the conversation here: https://github.com/apache/druid/pull/13027#issuecomment-1266410707
* Fix one bad constructor.
* Fix the documentation.
* Don't do anything clever with the object path.
* Remove unused imports.
* Fix spelling error.
* Fix incorrect search and replace.
* Address Gian's comment.
* Add "filename" to .spelling.
* Fix documentation.
* Fix documentation again.

Co-authored-by: Didip Kerabat <didip@apple.com>
This commit is contained in:
parent 77478f25fb
commit 56d5c9780d
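The behavioral heart of this change is that commons-io wildcards and java.nio.file globs treat path separators differently: FilenameUtils.wildcardMatch lets '*' run across '/', while a PathMatcher glob stops '*' at directory boundaries and needs '**' to cross them. A minimal sketch of that contrast (the class name is hypothetical, and it requires the commons-io dependency that this commit removes):

import org.apache.commons.io.FilenameUtils;

import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;

public class GlobVsWildcard
{
  public static void main(String[] args)
  {
    // Old behavior: commons-io '*' matches any characters, including '/'.
    System.out.println(FilenameUtils.wildcardMatch("bar/file.json", "*.json")); // true

    // New behavior: in a glob, '*' does not cross directory boundaries...
    PathMatcher single = FileSystems.getDefault().getPathMatcher("glob:*.json");
    System.out.println(single.matches(Paths.get("bar/file.json"))); // false

    // ...so specs that used "*.json" must switch to "**.json" to keep matching
    // objects nested under a prefix.
    PathMatcher doubled = FileSystems.getDefault().getPathMatcher("glob:**.json");
    System.out.println(doubled.matches(Paths.get("bar/file.json"))); // true
  }
}

This is why every "*.ext" literal in the tests and docs below becomes "**.ext".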
@@ -22,7 +22,6 @@ package org.apache.druid.data.input.impl;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.primitives.Ints;
-import org.apache.commons.io.FilenameUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.druid.data.input.AbstractInputSource;
 import org.apache.druid.data.input.InputEntity;
@@ -36,6 +35,9 @@ import org.apache.druid.utils.CollectionUtils;
 import javax.annotation.Nullable;
 import java.io.File;
 import java.net.URI;
+import java.nio.file.FileSystems;
+import java.nio.file.PathMatcher;
+import java.nio.file.Paths;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
@@ -48,7 +50,7 @@ public abstract class CloudObjectInputSource extends AbstractInputSource
   private final List<URI> uris;
   private final List<URI> prefixes;
   private final List<CloudObjectLocation> objects;
-  private final String filter;
+  private final String objectGlob;
 
   public CloudObjectInputSource(
       String scheme,
@@ -61,7 +63,7 @@ public abstract class CloudObjectInputSource extends AbstractInputSource
     this.uris = uris;
     this.prefixes = prefixes;
     this.objects = objects;
-    this.filter = null;
+    this.objectGlob = null;
 
     illegalArgsChecker();
   }
@@ -71,14 +73,14 @@ public abstract class CloudObjectInputSource extends AbstractInputSource
       @Nullable List<URI> uris,
       @Nullable List<URI> prefixes,
       @Nullable List<CloudObjectLocation> objects,
-      @Nullable String filter
+      @Nullable String objectGlob
   )
   {
     this.scheme = scheme;
     this.uris = uris;
     this.prefixes = prefixes;
     this.objects = objects;
-    this.filter = filter;
+    this.objectGlob = objectGlob;
 
     illegalArgsChecker();
   }
@@ -108,9 +110,9 @@ public abstract class CloudObjectInputSource extends AbstractInputSource
   @Nullable
   @JsonProperty
   @JsonInclude(JsonInclude.Include.NON_NULL)
-  public String getFilter()
+  public String getObjectGlob()
   {
-    return filter;
+    return objectGlob;
   }
 
   /**
@@ -125,8 +127,8 @@ public abstract class CloudObjectInputSource extends AbstractInputSource
    * only if {@link #prefixes} is set, otherwise the splits are created directly from {@link #uris} or {@link #objects}.
    * Calling if {@link #prefixes} is not set is likely to either lead to an empty iterator or null pointer exception.
    *
-   * If {@link #filter} is set, the filter will be applied on {@link #uris} or {@link #objects}.
-   * {@link #filter} uses a glob notation, for example: "*.parquet".
+   * If {@link #objectGlob} is set, the objectGlob will be applied on {@link #uris} or {@link #objects}.
+   * {@link #objectGlob} uses a glob notation, for example: "**.parquet".
    */
   protected abstract Stream<InputSplit<List<CloudObjectLocation>>> getPrefixesSplitStream(SplitHintSpec splitHintSpec);
 
@@ -139,8 +141,9 @@ public abstract class CloudObjectInputSource extends AbstractInputSource
     if (!CollectionUtils.isNullOrEmpty(objects)) {
       Stream<CloudObjectLocation> objectStream = objects.stream();
 
-      if (StringUtils.isNotBlank(filter)) {
-        objectStream = objectStream.filter(object -> FilenameUtils.wildcardMatch(object.getPath(), filter));
+      if (StringUtils.isNotBlank(objectGlob)) {
+        PathMatcher m = FileSystems.getDefault().getPathMatcher("glob:" + getObjectGlob());
+        objectStream = objectStream.filter(object -> m.matches(Paths.get(object.getPath())));
       }
 
       return objectStream.map(object -> new InputSplit<>(Collections.singletonList(object)));
@@ -149,8 +152,9 @@ public abstract class CloudObjectInputSource extends AbstractInputSource
     if (!CollectionUtils.isNullOrEmpty(uris)) {
       Stream<URI> uriStream = uris.stream();
 
-      if (StringUtils.isNotBlank(filter)) {
-        uriStream = uriStream.filter(uri -> FilenameUtils.wildcardMatch(uri.toString(), filter));
+      if (StringUtils.isNotBlank(objectGlob)) {
+        PathMatcher m = FileSystems.getDefault().getPathMatcher("glob:" + getObjectGlob());
+        uriStream = uriStream.filter(uri -> m.matches(Paths.get(uri.toString())));
       }
 
       return uriStream.map(CloudObjectLocation::new).map(object -> new InputSplit<>(Collections.singletonList(object)));
@@ -208,13 +212,13 @@ public abstract class CloudObjectInputSource extends AbstractInputSource
            Objects.equals(uris, that.uris) &&
            Objects.equals(prefixes, that.prefixes) &&
            Objects.equals(objects, that.objects) &&
-           Objects.equals(filter, that.filter);
+           Objects.equals(objectGlob, that.objectGlob);
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(scheme, uris, prefixes, objects, filter);
+    return Objects.hash(scheme, uris, prefixes, objects, objectGlob);
   }
 
   private void illegalArgsChecker() throws IllegalArgumentException

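The createSplits changes above follow one pattern: compile the glob once per call with FileSystems.getDefault().getPathMatcher("glob:" + ...), then reuse the matcher for every object, rather than re-evaluating a wildcard expression per element. A simplified, self-contained sketch of that pattern (class and variable names here are illustrative, not from the commit):

import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class ObjectGlobFilterSketch
{
  public static void main(String[] args)
  {
    // Bucket-relative object paths, like those returned by CloudObjectLocation.getPath().
    List<String> paths = Arrays.asList(
        "bar/file.csv",
        "bar/file.parquet",
        "bar/_junk/0/file.parquet"
    );

    // Compile the glob once, outside the stream.
    PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:**.parquet");

    List<String> matched = paths.stream()
                                .filter(path -> matcher.matches(Paths.get(path)))
                                .collect(Collectors.toList());

    // "**" crosses directory boundaries, so both parquet paths match.
    System.out.println(matched); // [bar/file.parquet, bar/_junk/0/file.parquet]
  }
}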
@@ -29,6 +29,9 @@ import org.junit.rules.ExpectedException;
 import org.mockito.Mockito;
 
 import java.net.URI;
+import java.nio.file.FileSystems;
+import java.nio.file.PathMatcher;
+import java.nio.file.Paths;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -57,7 +60,7 @@ public class CloudObjectInputSourceTest
       new CloudObjectLocation(URI.create("s3://foo/bar/file.csv"))
   );
 
-  private static final List<CloudObjectLocation> OBJECTS_BEFORE_FILTER = Arrays.asList(
+  private static final List<CloudObjectLocation> OBJECTS_BEFORE_GLOB = Arrays.asList(
       new CloudObjectLocation(URI.create("s3://foo/bar/file.csv")),
       new CloudObjectLocation(URI.create("s3://bar/foo/file2.parquet"))
   );
@@ -94,31 +97,31 @@ public class CloudObjectInputSourceTest
   }
 
   @Test
-  public void testGetFilter()
+  public void testGetObjectGlob()
   {
     CloudObjectInputSource inputSource = Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings()
-        .useConstructor(SCHEME, URIS, null, null, "*.parquet")
+        .useConstructor(SCHEME, URIS, null, null, "**.parquet")
         .defaultAnswer(Mockito.CALLS_REAL_METHODS)
     );
 
-    Assert.assertEquals("*.parquet", inputSource.getFilter());
+    Assert.assertEquals("**.parquet", inputSource.getObjectGlob());
   }
 
   @Test
   public void testInequality()
   {
     CloudObjectInputSource inputSource1 = Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings()
-        .useConstructor(SCHEME, URIS, null, null, "*.parquet")
+        .useConstructor(SCHEME, URIS, null, null, "**.parquet")
         .defaultAnswer(Mockito.CALLS_REAL_METHODS)
     );
 
     CloudObjectInputSource inputSource2 = Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings()
-        .useConstructor(SCHEME, URIS, null, null, "*.csv")
+        .useConstructor(SCHEME, URIS, null, null, "**.csv")
        .defaultAnswer(Mockito.CALLS_REAL_METHODS)
     );
 
-    Assert.assertEquals("*.parquet", inputSource1.getFilter());
-    Assert.assertEquals("*.csv", inputSource2.getFilter());
+    Assert.assertEquals("**.parquet", inputSource1.getObjectGlob());
+    Assert.assertEquals("**.csv", inputSource2.getObjectGlob());
     Assert.assertFalse(inputSource2.equals(inputSource1));
   }
 
@@ -126,7 +129,7 @@ public class CloudObjectInputSourceTest
   public void testWithUrisFilter()
   {
     CloudObjectInputSource inputSource = Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings()
-        .useConstructor(SCHEME, URIS2, null, null, "*.csv")
+        .useConstructor(SCHEME, URIS2, null, null, "**.csv")
         .defaultAnswer(Mockito.CALLS_REAL_METHODS)
     );
 
@@ -139,7 +142,7 @@ public class CloudObjectInputSourceTest
 
     List<URI> returnedLocationUris = returnedLocations.stream().map(object -> object.toUri(SCHEME)).collect(Collectors.toList());
 
-    Assert.assertEquals("*.csv", inputSource.getFilter());
+    Assert.assertEquals("**.csv", inputSource.getObjectGlob());
     Assert.assertEquals(URIS, returnedLocationUris);
   }
 
@@ -160,7 +163,7 @@ public class CloudObjectInputSourceTest
 
     List<URI> returnedLocationUris = returnedLocations.stream().map(object -> object.toUri(SCHEME)).collect(Collectors.toList());
 
-    Assert.assertEquals(null, inputSource.getFilter());
+    Assert.assertEquals(null, inputSource.getObjectGlob());
     Assert.assertEquals(URIS, returnedLocationUris);
   }
 
@@ -168,7 +171,7 @@ public class CloudObjectInputSourceTest
   public void testWithObjectsFilter()
   {
     CloudObjectInputSource inputSource = Mockito.mock(CloudObjectInputSource.class, Mockito.withSettings()
-        .useConstructor(SCHEME, null, null, OBJECTS_BEFORE_FILTER, "*.csv")
+        .useConstructor(SCHEME, null, null, OBJECTS_BEFORE_GLOB, "**.csv")
         .defaultAnswer(Mockito.CALLS_REAL_METHODS)
     );
 
@@ -181,7 +184,7 @@ public class CloudObjectInputSourceTest
 
     List<URI> returnedLocationUris = returnedLocations.stream().map(object -> object.toUri(SCHEME)).collect(Collectors.toList());
 
-    Assert.assertEquals("*.csv", inputSource.getFilter());
+    Assert.assertEquals("**.csv", inputSource.getObjectGlob());
     Assert.assertEquals(URIS, returnedLocationUris);
   }
 
@@ -200,7 +203,19 @@ public class CloudObjectInputSourceTest
 
     List<CloudObjectLocation> returnedLocations = splits.map(InputSplit::get).collect(Collectors.toList()).get(0);
 
-    Assert.assertEquals(null, inputSource.getFilter());
+    Assert.assertEquals(null, inputSource.getObjectGlob());
     Assert.assertEquals(OBJECTS, returnedLocations);
   }
+
+  @Test
+  public void testGlobSubdirectories()
+  {
+    PathMatcher m = FileSystems.getDefault().getPathMatcher("glob:**.parquet");
+    Assert.assertTrue(m.matches(Paths.get("db/date=2022-08-01/001.parquet")));
+    Assert.assertTrue(m.matches(Paths.get("db/date=2022-08-01/002.parquet")));
+
+    PathMatcher m2 = FileSystems.getDefault().getPathMatcher("glob:db/date=2022-08-01/*.parquet");
+    Assert.assertTrue(m2.matches(Paths.get("db/date=2022-08-01/001.parquet")));
+    Assert.assertFalse(m2.matches(Paths.get("db/date=2022-08-01/_junk/0/001.parquet")));
+  }
 }

@@ -46,7 +46,7 @@ Sample specs:
     "type": "index_parallel",
     "inputSource": {
       "type": "s3",
-      "filter": "*.json",
+      "objectGlob": "**.json",
       "uris": ["s3://foo/bar/file.json", "s3://bar/foo/file2.json"]
     },
     "inputFormat": {
@@ -63,7 +63,7 @@ Sample specs:
     "type": "index_parallel",
     "inputSource": {
       "type": "s3",
-      "filter": "*.parquet",
+      "objectGlob": "**.parquet",
       "prefixes": ["s3://foo/bar/", "s3://bar/foo/"]
     },
     "inputFormat": {
@@ -81,7 +81,7 @@ Sample specs:
     "type": "index_parallel",
     "inputSource": {
       "type": "s3",
-      "filter": "*.json",
+      "objectGlob": "**.json",
       "objects": [
         { "bucket": "foo", "path": "bar/file1.json"},
         { "bucket": "bar", "path": "foo/file2.json"}
@@ -101,7 +101,7 @@ Sample specs:
     "type": "index_parallel",
     "inputSource": {
       "type": "s3",
-      "filter": "*.json",
+      "objectGlob": "**.json",
       "uris": ["s3://foo/bar/file.json", "s3://bar/foo/file2.json"],
       "properties": {
         "accessKeyId": "KLJ78979SDFdS2",
@@ -122,7 +122,7 @@ Sample specs:
     "type": "index_parallel",
     "inputSource": {
       "type": "s3",
-      "filter": "*.json",
+      "objectGlob": "**.json",
       "uris": ["s3://foo/bar/file.json", "s3://bar/foo/file2.json"],
       "properties": {
         "accessKeyId": "KLJ78979SDFdS2",
@@ -182,7 +182,7 @@ Sample specs:
 |uris|JSON array of URIs where S3 objects to be ingested are located.|None|`uris` or `prefixes` or `objects` must be set|
 |prefixes|JSON array of URI prefixes for the locations of S3 objects to be ingested. Empty objects starting with one of the given prefixes will be skipped.|None|`uris` or `prefixes` or `objects` must be set|
 |objects|JSON array of S3 Objects to be ingested.|None|`uris` or `prefixes` or `objects` must be set|
-|filter|A wildcard filter for files. See [here](http://commons.apache.org/proper/commons-io/apidocs/org/apache/commons/io/filefilter/WildcardFileFilter) for more information. Files matching the filter criteria are considered for ingestion. Files not matching the filter criteria are ignored.|None|no|
+|objectGlob|A glob for the object part of the S3 URI. In the URI `s3://foo/bar/file.json`, the glob is applied to `bar/file.json`.<br /><br />The glob must match the entire object part, not just the filename. For example, the glob `*.json` does not match `s3://foo/bar/file.json`, because the object part is `bar/file.json`, and the `*` does not match the slash. To match all objects ending in `.json`, use `**.json` instead.<br /><br />For more information, refer to the documentation for [`FileSystem#getPathMatcher`](https://docs.oracle.com/javase/8/docs/api/java/nio/file/FileSystem.html#getPathMatcher-java.lang.String-).|None|no|
 | endpointConfig |Config for overriding the default S3 endpoint and signing region. This would allow ingesting data from a different S3 store. Please see [s3 config](../development/extensions-core/s3.md#connecting-to-s3-configuration) for more information.|None|No (defaults will be used if not given)
 | clientConfig |S3 client properties for the overridden s3 endpoint. This is used in conjunction with `endPointConfig`. Please see [s3 config](../development/extensions-core/s3.md#connecting-to-s3-configuration) for more information.|None|No (defaults will be used if not given)
 | proxyConfig |Properties for specifying proxy information for the overridden s3 endpoint. This is used in conjunction with `clientConfig`. Please see [s3 config](../development/extensions-core/s3.md#connecting-to-s3-configuration) for more information.|None|No (defaults will be used if not given)

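The glob syntax accepted by FileSystem#getPathMatcher goes beyond '*' and '**'; brace alternation and character ranges also work in objectGlob values. A short illustrative sketch (the globs and the class name here are hypothetical examples, not taken from the commit):

import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;

public class ObjectGlobSyntaxExamples
{
  public static void main(String[] args)
  {
    // Brace alternation: match either extension anywhere under the bucket.
    PathMatcher anyOf = FileSystems.getDefault().getPathMatcher("glob:**.{csv,parquet}");
    System.out.println(anyOf.matches(Paths.get("bar/file.csv")));      // true
    System.out.println(anyOf.matches(Paths.get("bar/foo/file.txt")));  // false

    // '[0-9]' matches exactly one character in the given range.
    PathMatcher numbered = FileSystems.getDefault().getPathMatcher("glob:bar/file[0-9].json");
    System.out.println(numbered.matches(Paths.get("bar/file2.json"))); // true
    System.out.println(numbered.matches(Paths.get("bar/file.json")));  // false
  }
}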
@@ -226,7 +226,7 @@ Sample specs:
     "type": "index_parallel",
     "inputSource": {
       "type": "google",
-      "filter": "*.json",
+      "objectGlob": "**.json",
       "uris": ["gs://foo/bar/file.json", "gs://bar/foo/file2.json"]
     },
     "inputFormat": {
@@ -243,7 +243,7 @@ Sample specs:
     "type": "index_parallel",
     "inputSource": {
       "type": "google",
-      "filter": "*.parquet",
+      "objectGlob": "**.parquet",
       "prefixes": ["gs://foo/bar/", "gs://bar/foo/"]
     },
     "inputFormat": {
@@ -261,7 +261,7 @@ Sample specs:
     "type": "index_parallel",
     "inputSource": {
       "type": "google",
-      "filter": "*.json",
+      "objectGlob": "**.json",
       "objects": [
         { "bucket": "foo", "path": "bar/file1.json"},
         { "bucket": "bar", "path": "foo/file2.json"}
@@ -281,7 +281,7 @@ Sample specs:
 |uris|JSON array of URIs where Google Cloud Storage objects to be ingested are located.|None|`uris` or `prefixes` or `objects` must be set|
 |prefixes|JSON array of URI prefixes for the locations of Google Cloud Storage objects to be ingested. Empty objects starting with one of the given prefixes will be skipped.|None|`uris` or `prefixes` or `objects` must be set|
 |objects|JSON array of Google Cloud Storage objects to be ingested.|None|`uris` or `prefixes` or `objects` must be set|
-|filter|A wildcard filter for files. See [here](http://commons.apache.org/proper/commons-io/apidocs/org/apache/commons/io/filefilter/WildcardFileFilter) for more information. Files matching the filter criteria are considered for ingestion. Files not matching the filter criteria are ignored.|None|no|
+|objectGlob|A glob for the object part of the S3 URI. In the URI `s3://foo/bar/file.json`, the glob is applied to `bar/file.json`.<br /><br />The glob must match the entire object part, not just the filename. For example, the glob `*.json` does not match `s3://foo/bar/file.json`, because the object part is `bar/file.json`, and the `*` does not match the slash. To match all objects ending in `.json`, use `**.json` instead.<br /><br />For more information, refer to the documentation for [`FileSystem#getPathMatcher`](https://docs.oracle.com/javase/8/docs/api/java/nio/file/FileSystem.html#getPathMatcher-java.lang.String-).|None|no|
 
 Note that the Google Cloud Storage input source will skip all empty objects only when `prefixes` is specified.
 
@@ -307,7 +307,7 @@ Sample specs:
     "type": "index_parallel",
     "inputSource": {
       "type": "azure",
-      "filter": "*.json",
+      "objectGlob": "**.json",
       "uris": ["azure://container/prefix1/file.json", "azure://container/prefix2/file2.json"]
     },
     "inputFormat": {
@@ -324,7 +324,7 @@ Sample specs:
     "type": "index_parallel",
     "inputSource": {
       "type": "azure",
-      "filter": "*.parquet",
+      "objectGlob": "**.parquet",
       "prefixes": ["azure://container/prefix1/", "azure://container/prefix2/"]
     },
     "inputFormat": {
@@ -342,7 +342,7 @@ Sample specs:
     "type": "index_parallel",
     "inputSource": {
       "type": "azure",
-      "filter": "*.json",
+      "objectGlob": "**.json",
       "objects": [
         { "bucket": "container", "path": "prefix1/file1.json"},
         { "bucket": "container", "path": "prefix2/file2.json"}
@@ -362,7 +362,7 @@ Sample specs:
 |uris|JSON array of URIs where the Azure objects to be ingested are located, in the form `azure://<container>/<path-to-file>`|None|`uris` or `prefixes` or `objects` must be set|
 |prefixes|JSON array of URI prefixes for the locations of Azure objects to ingest, in the form `azure://<container>/<prefix>`. Empty objects starting with one of the given prefixes are skipped.|None|`uris` or `prefixes` or `objects` must be set|
 |objects|JSON array of Azure objects to ingest.|None|`uris` or `prefixes` or `objects` must be set|
-|filter|A wildcard filter for files. See [here](http://commons.apache.org/proper/commons-io/apidocs/org/apache/commons/io/filefilter/WildcardFileFilter) for more information. Files matching the filter criteria are considered for ingestion. Files not matching the filter criteria are ignored.|None|no|
+|objectGlob|A glob for the object part of the S3 URI. In the URI `s3://foo/bar/file.json`, the glob is applied to `bar/file.json`.<br /><br />The glob must match the entire object part, not just the filename. For example, the glob `*.json` does not match `s3://foo/bar/file.json`, because the object part is `bar/file.json`, and the `*` does not match the slash. To match all objects ending in `.json`, use `**.json` instead.<br /><br />For more information, refer to the documentation for [`FileSystem#getPathMatcher`](https://docs.oracle.com/javase/8/docs/api/java/nio/file/FileSystem.html#getPathMatcher-java.lang.String-).|None|no|
 
 Note that the Azure input source skips all empty objects only when `prefixes` is specified.
 
@@ -752,7 +752,7 @@ Compared to the other native batch input sources, SQL input source behaves diffe
 The Combining input source lets you read data from multiple input sources.
 It identifies the splits from delegate input sources and uses a worker task to process each split.
 Use the Combining input source only if all the delegates are splittable and can be used by the [Parallel task](./native-batch.md).
 
 Similar to other input sources, the Combining input source supports a single `inputFormat`.
 Delegate input sources that require an `inputFormat` must have the same format for input data.

@@ -28,7 +28,6 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
 import com.google.common.base.Suppliers;
 import com.google.common.collect.Iterators;
-import org.apache.commons.io.FilenameUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.druid.data.input.InputEntity;
 import org.apache.druid.data.input.InputFileAttribute;
@@ -45,6 +44,9 @@ import org.apache.druid.utils.Streams;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import java.net.URI;
+import java.nio.file.FileSystems;
+import java.nio.file.PathMatcher;
+import java.nio.file.Paths;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
@@ -77,11 +79,11 @@ public class OssInputSource extends CloudObjectInputSource
       @JsonProperty("uris") @Nullable List<URI> uris,
       @JsonProperty("prefixes") @Nullable List<URI> prefixes,
       @JsonProperty("objects") @Nullable List<CloudObjectLocation> objects,
-      @JsonProperty("filter") @Nullable String filter,
+      @JsonProperty("objectGlob") @Nullable String objectGlob,
       @JsonProperty("properties") @Nullable OssClientConfig inputSourceConfig
   )
   {
-    super(OssStorageDruidModule.SCHEME, uris, prefixes, objects, filter);
+    super(OssStorageDruidModule.SCHEME, uris, prefixes, objects, objectGlob);
     this.inputDataConfig = Preconditions.checkNotNull(inputDataConfig, "inputDataConfig");
     Preconditions.checkNotNull(client, "client");
     this.inputSourceConfig = inputSourceConfig;
@@ -134,7 +136,7 @@ public class OssInputSource extends CloudObjectInputSource
         null,
         null,
         split.get(),
-        getFilter(),
+        getObjectGlob(),
         getOssInputSourceConfig()
     );
   }
@@ -168,7 +170,7 @@ public class OssInputSource extends CloudObjectInputSource
            "uris=" + getUris() +
            ", prefixes=" + getPrefixes() +
            ", objects=" + getObjects() +
-           ", filter=" + getFilter() +
+           ", objectGlob=" + getObjectGlob() +
            ", ossInputSourceConfig=" + getOssInputSourceConfig() +
            '}';
   }
@@ -182,11 +184,13 @@ public class OssInputSource extends CloudObjectInputSource
         inputDataConfig.getMaxListingLength()
     );
 
-    // Skip files that didn't match filter.
-    if (StringUtils.isNotBlank(getFilter())) {
+    // Skip files that didn't match glob filter.
+    if (StringUtils.isNotBlank(getObjectGlob())) {
+      PathMatcher m = FileSystems.getDefault().getPathMatcher("glob:" + getObjectGlob());
+
       iterator = Iterators.filter(
           iterator,
-          object -> FilenameUtils.wildcardMatch(object.getKey(), getFilter())
+          object -> m.matches(Paths.get(object.getKey()))
       );
     }
 
@@ -24,7 +24,6 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterators;
-import org.apache.commons.io.FilenameUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.druid.data.input.InputFileAttribute;
 import org.apache.druid.data.input.InputSplit;
@@ -42,6 +41,9 @@ import org.apache.druid.utils.Streams;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import java.net.URI;
+import java.nio.file.FileSystems;
+import java.nio.file.PathMatcher;
+import java.nio.file.Paths;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
@@ -72,10 +74,10 @@ public class AzureInputSource extends CloudObjectInputSource
       @JsonProperty("uris") @Nullable List<URI> uris,
       @JsonProperty("prefixes") @Nullable List<URI> prefixes,
       @JsonProperty("objects") @Nullable List<CloudObjectLocation> objects,
-      @JsonProperty("filter") @Nullable String filter
+      @JsonProperty("objectGlob") @Nullable String objectGlob
   )
   {
-    super(SCHEME, uris, prefixes, objects, filter);
+    super(SCHEME, uris, prefixes, objects, objectGlob);
     this.storage = Preconditions.checkNotNull(storage, "AzureStorage");
     this.entityFactory = Preconditions.checkNotNull(entityFactory, "AzureEntityFactory");
     this.azureCloudBlobIterableFactory = Preconditions.checkNotNull(
@@ -101,7 +103,7 @@ public class AzureInputSource extends CloudObjectInputSource
         null,
         null,
         split.get(),
-        getFilter()
+        getObjectGlob()
     );
   }
 
@@ -131,11 +133,13 @@ public class AzureInputSource extends CloudObjectInputSource
     return () -> {
       Iterator<CloudBlobHolder> iterator = azureCloudBlobIterableFactory.create(getPrefixes(), inputDataConfig.getMaxListingLength()).iterator();
 
-      // Skip files that didn't match filter.
-      if (StringUtils.isNotBlank(getFilter())) {
+      // Skip files that didn't match glob filter.
+      if (StringUtils.isNotBlank(getObjectGlob())) {
+        PathMatcher m = FileSystems.getDefault().getPathMatcher("glob:" + getObjectGlob());
+
         iterator = Iterators.filter(
             iterator,
-            object -> FilenameUtils.wildcardMatch(object.getName(), getFilter())
+            object -> m.matches(Paths.get(object.getName()))
         );
       }
 
@@ -183,7 +187,7 @@ public class AzureInputSource extends CloudObjectInputSource
            "uris=" + getUris() +
            ", prefixes=" + getPrefixes() +
            ", objects=" + getObjects() +
-           ", filter=" + getFilter() +
+           ", objectGlob=" + getObjectGlob() +
            '}';
   }
 }

@@ -22,7 +22,6 @@ package org.apache.druid.data.input.azure;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterators;
 import nl.jqno.equalsverifier.EqualsVerifier;
-import org.apache.commons.io.FilenameUtils;
 import org.apache.druid.data.input.InputSplit;
 import org.apache.druid.data.input.MaxSizeSplitHintSpec;
 import org.apache.druid.data.input.impl.CloudObjectLocation;
@@ -42,6 +41,9 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.net.URI;
+import java.nio.file.FileSystems;
+import java.nio.file.PathMatcher;
+import java.nio.file.Paths;
 import java.util.Iterator;
 import java.util.List;
 import java.util.stream.Collectors;
@@ -178,17 +180,19 @@ public class AzureInputSourceTest extends EasyMockSupport
   }
 
   @Test
-  public void test_getPrefixesSplitStream_withFilter_successfullyCreatesCloudLocation_returnsExpectedLocations()
+  public void test_getPrefixesSplitStream_withObjectGlob_successfullyCreatesCloudLocation_returnsExpectedLocations()
   {
     List<URI> prefixes = ImmutableList.of(PREFIX_URI);
     List<List<CloudObjectLocation>> expectedCloudLocations = ImmutableList.of(ImmutableList.of(CLOUD_OBJECT_LOCATION_1));
     List<CloudBlobHolder> expectedCloudBlobs = ImmutableList.of(cloudBlobDruid1);
     Iterator<CloudBlobHolder> expectedCloudBlobsIterator = expectedCloudBlobs.iterator();
-    String filter = "*.csv";
+    String objectGlob = "**.csv";
+
+    PathMatcher m = FileSystems.getDefault().getPathMatcher("glob:" + objectGlob);
 
     expectedCloudBlobsIterator = Iterators.filter(
         expectedCloudBlobsIterator,
-        object -> FilenameUtils.wildcardMatch(object.getName(), filter)
+        object -> m.matches(Paths.get(object.getName()))
     );
 
     EasyMock.expect(inputDataConfig.getMaxListingLength()).andReturn(MAX_LISTING_LENGTH);
@@ -211,7 +215,7 @@ public class AzureInputSourceTest extends EasyMockSupport
         EMPTY_URIS,
         prefixes,
         EMPTY_OBJECTS,
-        filter
+        objectGlob
     );
 
     Stream<InputSplit<List<CloudObjectLocation>>> cloudObjectStream = azureInputSource.getPrefixesSplitStream(
@@ -265,7 +269,7 @@ public class AzureInputSourceTest extends EasyMockSupport
     );
 
     String actualToString = azureInputSource.toString();
-    Assert.assertEquals("AzureInputSource{uris=[], prefixes=[azure://container/blob], objects=[], filter=null}", actualToString);
+    Assert.assertEquals("AzureInputSource{uris=[], prefixes=[azure://container/blob], objects=[], objectGlob=null}", actualToString);
   }
 
   @Test
@@ -279,7 +283,7 @@ public class AzureInputSourceTest extends EasyMockSupport
         .withNonnullFields("azureCloudBlobIterableFactory")
         .withNonnullFields("azureCloudBlobToLocationConverter")
         .withNonnullFields("inputDataConfig")
-        .withNonnullFields("filter")
+        .withNonnullFields("objectGlob")
         .withNonnullFields("scheme")
         .verify();
   }

@@ -24,7 +24,6 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.api.services.storage.model.StorageObject;
 import com.google.common.collect.Iterators;
-import org.apache.commons.io.FilenameUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.druid.data.input.InputEntity;
 import org.apache.druid.data.input.InputFileAttribute;
@@ -44,6 +43,9 @@ import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import java.math.BigInteger;
 import java.net.URI;
+import java.nio.file.FileSystems;
+import java.nio.file.PathMatcher;
+import java.nio.file.Paths;
 import java.util.Iterator;
 import java.util.List;
 import java.util.stream.Collectors;
@@ -63,10 +65,10 @@ public class GoogleCloudStorageInputSource extends CloudObjectInputSource
       @JsonProperty("uris") @Nullable List<URI> uris,
       @JsonProperty("prefixes") @Nullable List<URI> prefixes,
       @JsonProperty("objects") @Nullable List<CloudObjectLocation> objects,
-      @JsonProperty("filter") @Nullable String filter
+      @JsonProperty("objectGlob") @Nullable String objectGlob
   )
   {
-    super(GoogleStorageDruidModule.SCHEME_GS, uris, prefixes, objects, filter);
+    super(GoogleStorageDruidModule.SCHEME_GS, uris, prefixes, objects, objectGlob);
     this.storage = storage;
     this.inputDataConfig = inputDataConfig;
   }
@@ -115,7 +117,7 @@ public class GoogleCloudStorageInputSource extends CloudObjectInputSource
   @Override
   public SplittableInputSource<List<CloudObjectLocation>> withSplit(InputSplit<List<CloudObjectLocation>> split)
   {
-    return new GoogleCloudStorageInputSource(storage, inputDataConfig, null, null, split.get(), getFilter());
+    return new GoogleCloudStorageInputSource(storage, inputDataConfig, null, null, split.get(), getObjectGlob());
   }
 
   private CloudObjectLocation byteSourceFromStorageObject(final StorageObject storageObject)
@@ -132,11 +134,13 @@ public class GoogleCloudStorageInputSource extends CloudObjectInputSource
         inputDataConfig.getMaxListingLength()
     );
 
-    // Skip files that didn't match filter.
-    if (StringUtils.isNotBlank(getFilter())) {
+    // Skip files that didn't match glob filter.
+    if (StringUtils.isNotBlank(getObjectGlob())) {
+      PathMatcher m = FileSystems.getDefault().getPathMatcher("glob:" + getObjectGlob());
+
       iterator = Iterators.filter(
           iterator,
-          object -> FilenameUtils.wildcardMatch(object.getName(), getFilter())
+          object -> m.matches(Paths.get(object.getName()))
       );
     }
 
@@ -151,7 +155,7 @@ public class GoogleCloudStorageInputSource extends CloudObjectInputSource
            "uris=" + getUris() +
            ", prefixes=" + getPrefixes() +
            ", objects=" + getObjects() +
-           ", filter=" + getFilter() +
+           ", objectGlob=" + getObjectGlob() +
            '}';
   }
 }

@@ -88,7 +88,7 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
       URI.create("gs://bar/foo/file2.csv.gz")
   );
 
-  private static final List<URI> URIS_BEFORE_FILTER = Arrays.asList(
+  private static final List<URI> URIS_BEFORE_GLOB = Arrays.asList(
       URI.create("gs://foo/bar/file.csv"),
       URI.create("gs://bar/foo/file2.csv"),
       URI.create("gs://bar/foo/file3.txt")
@@ -166,15 +166,15 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
   }
 
   @Test
-  public void testWithUrisFilter()
+  public void testWithUrisGlob()
   {
     GoogleCloudStorageInputSource inputSource = new GoogleCloudStorageInputSource(
         STORAGE,
         INPUT_DATA_CONFIG,
-        URIS_BEFORE_FILTER,
+        URIS_BEFORE_GLOB,
         null,
         null,
-        "*.csv"
+        "**.csv"
     );
 
     Stream<InputSplit<List<CloudObjectLocation>>> splits = inputSource.createSplits(
@@ -195,7 +195,7 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
         null,
         PREFIXES,
         EXPECTED_OBJECTS.get(0),
-        "*.csv"
+        "**.csv"
     );
   }
 
@@ -207,10 +207,10 @@ public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTe
         new GoogleCloudStorageInputSource(
             STORAGE,
             INPUT_DATA_CONFIG,
-            URIS_BEFORE_FILTER,
+            URIS_BEFORE_GLOB,
             PREFIXES,
             null,
-            "*.csv"
+            "**.csv"
         );
   }
 
@@ -37,7 +37,6 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
 import com.google.common.base.Suppliers;
 import com.google.common.collect.Iterators;
-import org.apache.commons.io.FilenameUtils;
 import org.apache.druid.common.aws.AWSClientConfig;
 import org.apache.druid.common.aws.AWSEndpointConfig;
 import org.apache.druid.common.aws.AWSProxyConfig;
@@ -59,6 +58,9 @@ import org.apache.druid.utils.Streams;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import java.net.URI;
+import java.nio.file.FileSystems;
+import java.nio.file.PathMatcher;
+import java.nio.file.Paths;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
@@ -94,6 +96,7 @@ public class S3InputSource extends CloudObjectInputSource
    * @param uris                User provided uris to read input data
    * @param prefixes            User provided prefixes to read input data
    * @param objects             User provided cloud objects values to read input data
+   * @param objectGlob          User provided globbing rule to filter input data path
    * @param s3InputSourceConfig User provided properties for overriding the default S3 credentials
    * @param awsProxyConfig      User provided proxy information for the overridden s3 client
    * @param awsEndpointConfig   User provided s3 endpoint and region for overriding the default S3 endpoint
@@ -108,14 +111,14 @@ public class S3InputSource extends CloudObjectInputSource
       @JsonProperty("uris") @Nullable List<URI> uris,
       @JsonProperty("prefixes") @Nullable List<URI> prefixes,
       @JsonProperty("objects") @Nullable List<CloudObjectLocation> objects,
-      @JsonProperty("filter") @Nullable String filter,
+      @JsonProperty("objectGlob") @Nullable String objectGlob,
       @JsonProperty("properties") @Nullable S3InputSourceConfig s3InputSourceConfig,
       @JsonProperty("proxyConfig") @Nullable AWSProxyConfig awsProxyConfig,
       @JsonProperty("endpointConfig") @Nullable AWSEndpointConfig awsEndpointConfig,
       @JsonProperty("clientConfig") @Nullable AWSClientConfig awsClientConfig
   )
   {
-    super(S3StorageDruidModule.SCHEME, uris, prefixes, objects, filter);
+    super(S3StorageDruidModule.SCHEME, uris, prefixes, objects, objectGlob);
     this.inputDataConfig = Preconditions.checkNotNull(inputDataConfig, "S3DataSegmentPusherConfig");
     Preconditions.checkNotNull(s3Client, "s3Client");
     this.s3InputSourceConfig = s3InputSourceConfig;
@@ -184,7 +187,7 @@ public class S3InputSource extends CloudObjectInputSource
       List<URI> uris,
       List<URI> prefixes,
       List<CloudObjectLocation> objects,
-      String filter,
+      String objectGlob,
       S3InputSourceConfig s3InputSourceConfig,
       AWSProxyConfig awsProxyConfig,
       AWSEndpointConfig awsEndpointConfig,
@@ -199,7 +202,7 @@ public class S3InputSource extends CloudObjectInputSource
         uris,
         prefixes,
         objects,
-        filter,
+        objectGlob,
         s3InputSourceConfig,
         awsProxyConfig,
         awsEndpointConfig,
@@ -215,7 +218,7 @@ public class S3InputSource extends CloudObjectInputSource
       List<URI> uris,
       List<URI> prefixes,
       List<CloudObjectLocation> objects,
-      String filter,
+      String objectGlob,
       S3InputSourceConfig s3InputSourceConfig,
       AWSProxyConfig awsProxyConfig,
       AWSEndpointConfig awsEndpointConfig,
@@ -231,7 +234,7 @@ public class S3InputSource extends CloudObjectInputSource
         uris,
         prefixes,
         objects,
-        filter,
+        objectGlob,
         s3InputSourceConfig,
         awsProxyConfig,
         awsEndpointConfig,
@@ -340,7 +343,7 @@ public class S3InputSource extends CloudObjectInputSource
         null,
         null,
         split.get(),
-        getFilter(),
+        getObjectGlob(),
         getS3InputSourceConfig(),
         getAwsProxyConfig(),
         getAwsEndpointConfig(),
@@ -380,7 +383,7 @@ public class S3InputSource extends CloudObjectInputSource
            "uris=" + getUris() +
            ", prefixes=" + getPrefixes() +
            ", objects=" + getObjects() +
-           ", filter=" + getFilter() +
+           ", objectGlob=" + getObjectGlob() +
            ", s3InputSourceConfig=" + getS3InputSourceConfig() +
            ", awsProxyConfig=" + getAwsProxyConfig() +
            ", awsEndpointConfig=" + getAwsEndpointConfig() +
@@ -399,10 +402,12 @@ public class S3InputSource extends CloudObjectInputSource
     );
 
     // Skip files that didn't match filter.
-    if (org.apache.commons.lang.StringUtils.isNotBlank(getFilter())) {
+    if (org.apache.commons.lang.StringUtils.isNotBlank(getObjectGlob())) {
+      PathMatcher m = FileSystems.getDefault().getPathMatcher("glob:" + getObjectGlob());
+
       iterator = Iterators.filter(
           iterator,
-          object -> FilenameUtils.wildcardMatch(object.getKey(), getFilter())
+          object -> m.matches(Paths.get(object.getKey()))
       );
     }
 
@@ -126,13 +126,13 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
       URI.create("s3://bar/foo/file2.csv.gz")
   );
 
-  private static final List<CloudObjectLocation> OBJECTS_BEFORE_FILTER = Arrays.asList(
+  private static final List<CloudObjectLocation> OBJECTS_BEFORE_GLOB = Arrays.asList(
       new CloudObjectLocation(URI.create("s3://foo/bar/file.csv")),
       new CloudObjectLocation(URI.create("s3://bar/foo/file2.csv")),
       new CloudObjectLocation(URI.create("s3://bar/foo/file3.txt"))
   );
 
-  private static final List<URI> URIS_BEFORE_FILTER = Arrays.asList(
+  private static final List<URI> URIS_BEFORE_GLOB = Arrays.asList(
      URI.create("s3://foo/bar/file.csv"),
      URI.create("s3://bar/foo/file2.csv"),
      URI.create("s3://bar/foo/file3.txt")
@@ -219,7 +219,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
   }
 
   @Test
-  public void testGetFilter()
+  public void testGetObjectGlob()
   {
     final S3InputSource withUris = new S3InputSource(
         SERVICE,
@@ -228,17 +228,16 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
         EXPECTED_URIS,
         null,
         null,
-        "*.parquet",
+        "**.parquet",
         null,
         null,
         null,
         null
-
     );
 
     Assert.assertEquals(
-        "*.parquet",
-        withUris.getFilter()
+        "**.parquet",
+        withUris.getObjectGlob()
     );
   }
 
@@ -671,16 +670,16 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
   }
 
   @Test
-  public void testWithUrisFilter()
+  public void testWithUrisObjectGlob()
   {
     S3InputSource inputSource = new S3InputSource(
         SERVICE,
         SERVER_SIDE_ENCRYPTING_AMAZON_S3_BUILDER,
         INPUT_DATA_CONFIG,
-        URIS_BEFORE_FILTER,
+        URIS_BEFORE_GLOB,
         null,
         null,
-        "*.csv",
+        "**.csv",
         null,
         null,
         null,
@@ -696,7 +695,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
   }
 
   @Test
-  public void testWithObjectsFilter()
+  public void testWithObjectsGlob()
   {
     S3InputSource inputSource = new S3InputSource(
         SERVICE,
@@ -704,8 +703,8 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
         INPUT_DATA_CONFIG,
         null,
         null,
-        OBJECTS_BEFORE_FILTER,
-        "*.csv",
+        OBJECTS_BEFORE_GLOB,
+        "**.csv",
         null,
         null,
         null,
@@ -721,7 +720,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
   }
 
   @Test
-  public void testWithoutObjectsFilter()
+  public void testWithoutObjectsGlob()
   {
     S3InputSource inputSource = new S3InputSource(
         SERVICE,
@@ -777,11 +776,11 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
   }
 
   @Test
-  public void testGetPrefixesSplitStreamWithFilter()
+  public void testGetPrefixesSplitStreamWithObjectGlob()
   {
     EasyMock.reset(S3_CLIENT);
-    expectListObjects(PREFIXES.get(0), ImmutableList.of(URIS_BEFORE_FILTER.get(0)), CONTENT);
-    expectListObjects(PREFIXES.get(1), ImmutableList.of(URIS_BEFORE_FILTER.get(1), URIS_BEFORE_FILTER.get(2)), CONTENT);
+    expectListObjects(PREFIXES.get(0), ImmutableList.of(URIS_BEFORE_GLOB.get(0)), CONTENT);
+    expectListObjects(PREFIXES.get(1), ImmutableList.of(URIS_BEFORE_GLOB.get(1), URIS_BEFORE_GLOB.get(2)), CONTENT);
     EasyMock.replay(S3_CLIENT);
 
     S3InputSource inputSource = new S3InputSource(
@@ -791,7 +790,7 @@ public class S3InputSourceTest extends InitializedNullHandlingTest
         null,
         PREFIXES,
         null,
-        "*.csv",
+        "**.csv",
         null,
         null,
         null,

@@ -373,6 +373,7 @@ non-nullable
 noop
 numerics
 numShards
+objectGlob
 parameterized
 parse_json
 parseable
@@ -639,7 +640,7 @@ taskId
 multiStageQuery.taskId
 multiStageQuery.payload.status
 multiStageQuery.payload.status.status
 multiStageQuery.payload.status.startTime
 multiStageQuery.payload.status.durationMs
 multiStageQuery.payload.status.pendingTasks
 multiStageQuery.payload.status.runningTasks
@@ -668,46 +669,46 @@ workerCount
 partitionCount
 startCount
 # MSQ errors and limits
 BroadcastTablesTooLarge
 CannotParseExternalData
 ColumnNameRestricted
 ColumnTypeNotSupported
 DurableStorageConfiguration
 ColumnTypeNotSupported
 InsertCannotAllocateSegment
 InsertCannotBeEmpty
 InsertCannotOrderByDescending
 InsertCannotReplaceExistingSegment
 InsertLockPreempted
 InsertTimeNull
 CURRENT_TIMESTAMP
 InsertTimeOutOfBounds
 UnknownError
 TaskStartTimeout
 OutOfMemoryError
 SegmentGenerator
 maxFrameSize
 InvalidNullByte
 QueryNotSupported
 QueryNotSupported
 RowTooLarge
 TooManyBuckets
 TooManyInputFiles
 TooManyPartitions
 TooManyColumns
 TooManyWarnings
 TooManyWorkers
 NotEnoughMemory
 WorkerFailed
 WorkerRpcFailed
 # MSQ context parameters
 maxNumTasks
 taskAssignment
 finalizeAggregations
 indexSpec
 rowsInMemory
 segmentSortOrder
 rowsPerSegment
 durableShuffleStorage
 # Aggregations
 groupByEnableMultiValueUnnesting
@@ -1383,22 +1384,22 @@ StaticS3Firehose
 prefetchTriggerBytes
 awaitSegmentAvailabilityTimeoutMillis
 - ../docs/ingestion/native-batch-firehose.md
 LocalFirehose
 baseDir
 HttpFirehose
 httpAuthenticationUsername
 DefaultPassword
 PasswordProviders
 EnvironmentVariablePasswordProvider
 ingestSegment
 maxInputSegmentBytesPerTask
 150MB
 foldCase
 sqls
 connectorConfig
 InlineFirehose
 CombiningFirehose
 httpAuthenticationPassword
 - ../docs/ingestion/native-batch-input-source.md
 accessKeyId
 secretAccessKey
@@ -1409,7 +1410,7 @@ countryName
 dataSchema's
 appendToExisting
 dropExisting
 timeChunk
 PartitionsSpec
 forceGuaranteedRollup
 reportParseExceptions
@@ -2042,6 +2043,7 @@ durationToRetain
 ec2
 equalDistribution
 extractionFn
+filename
 file.encoding
 fillCapacity
 first_location
@@ -2202,86 +2204,86 @@ IngestionSpec
 druid-compressed-bigdecimal
 doubleSum
 - ../docs/querying/sql-functions.md
 ANY_VALUE
 APPROX_COUNT_DISTINCT_DS_HLL
 APPROX_COUNT_DISTINCT_DS_THETA
 APPROX_QUANTILE_DS
 APPROX_QUANTILE_FIXED_BUCKETS
 ARRAY_CONCAT_AGG
 BIT_AND
 BIT_OR
 BIT_XOR
 BITWISE_AND
 BITWISE_COMPLEMENT
 BITWISE_CONVERT_DOUBLE_TO_LONG_BITS
 BITWISE_CONVERT_LONG_BITS_TO_DOUBLE
 BITWISE_OR
 BITWISE_SHIFT_LEFT
 BITWISE_SHIFT_RIGHT
 BITWISE_XOR
 BLOOM_FILTER
 BTRIM
 CHAR_LENGTH
 CHARACTER_LENGTH
 CURRENT_DATE
 CURRENT_TIMESTAMP
 DATE_TRUNC
 DS_CDF
 DS_GET_QUANTILE
 DS_GET_QUANTILES
 DS_HISTOGRAM
 DS_HLL
 DS_QUANTILE_SUMMARY
 DS_QUANTILES_SKETCH
 DS_RANK
 DS_THETA
 EARLIEST_BY
 _e_
 HLL_SKETCH_ESTIMATE
 HLL_SKETCH_ESTIMATE_WITH_ERROR_BOUNDS
 HLL_SKETCH_TO_STRING
 HLL_SKETCH_UNION
 LATEST_BY
 base-10
 MV_APPEND
 MV_CONCAT
 MV_CONTAINS
 MV_FILTER_NONE
 MV_FILTER_ONLY
 MV_LENGTH
 MV_OFFSET
 MV_OFFSET_OF
 MV_ORDINAL
 MV_ORDINAL_OF
 MV_OVERLAP
 MV_PREPEND
 MV_SLICE
 MV_TO_STRING
 NULLIF
 _n_th
 STDDEV_POP
 STDDEV_SAMP
 STRING_FORMAT
 STRING_TO_MV
 SUBSTR
 TDIGEST_GENERATE_SKETCH
 TDIGEST_QUANTILE
 TEXTCAT
 THETA_SKETCH_ESTIMATE
 THETA_SKETCH_ESTIMATE_WITH_ERROR_BOUNDS
 THETA_SKETCH_INTERSECT
 THETA_SKETCH_NOT
 THETA_SKETCH_UNION
 TIME_CEIL
 TIME_EXTRACT
 TIME_FLOOR
 TIME_FORMAT
 TIME_IN_INTERVAL
 TIMESTAMP_TO_MILLIS
 TIMESTAMPADD
 TIMESTAMPDIFF
 TRUNC
 VAR_POP
 VAR_SAMP
 KTable
 Aotearoa