add 'prefixes' support to google input source (#8930)

* add prefixes support to google input source, making it symmetrical-ish with s3

* docs

* more better, and tests

* unused

* formatting

* javadoc

* dependencies

* oops

* review comments

* better javadoc
Clint Wylie 2019-12-04 21:01:10 -08:00 committed by GitHub
parent 1cff73f3e0
commit 5ecdf94d83
13 changed files with 684 additions and 234 deletions

CloudObjectInputSource.java (new file)

@@ -0,0 +1,181 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.impl;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.primitives.Ints;
import org.apache.druid.data.input.AbstractInputSource;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.SplitHintSpec;
import org.apache.druid.utils.CollectionUtils;
import javax.annotation.Nullable;
import java.io.File;
import java.net.URI;
import java.util.List;
import java.util.Objects;
import java.util.stream.Stream;
public abstract class CloudObjectInputSource<T extends InputEntity> extends AbstractInputSource
implements SplittableInputSource<CloudObjectLocation>
{
private final List<URI> uris;
private final List<URI> prefixes;
private final List<CloudObjectLocation> objects;
public CloudObjectInputSource(
String scheme,
@Nullable List<URI> uris,
@Nullable List<URI> prefixes,
@Nullable List<CloudObjectLocation> objects
)
{
this.uris = uris;
this.prefixes = prefixes;
this.objects = objects;
if (!CollectionUtils.isNullOrEmpty(objects)) {
throwIfIllegalArgs(!CollectionUtils.isNullOrEmpty(uris) || !CollectionUtils.isNullOrEmpty(prefixes));
} else if (!CollectionUtils.isNullOrEmpty(uris)) {
throwIfIllegalArgs(!CollectionUtils.isNullOrEmpty(prefixes));
uris.forEach(uri -> CloudObjectLocation.validateUriScheme(scheme, uri));
} else if (!CollectionUtils.isNullOrEmpty(prefixes)) {
prefixes.forEach(uri -> CloudObjectLocation.validateUriScheme(scheme, uri));
} else {
throwIfIllegalArgs(true);
}
}
@JsonProperty
public List<URI> getUris()
{
return uris;
}
@JsonProperty
public List<URI> getPrefixes()
{
return prefixes;
}
@Nullable
@JsonProperty
public List<CloudObjectLocation> getObjects()
{
return objects;
}
/**
* Create the correct {@link InputEntity} for this input source given a split on a {@link CloudObjectLocation}. This
* is called internally by {@link #formattableReader} and operates on the output of {@link #createSplits}.
*/
protected abstract T createEntity(InputSplit<CloudObjectLocation> split);
/**
* Create a stream of {@link CloudObjectLocation} splits by listing objects that appear under {@link #prefixes} using
* this input source's backend API. This is called internally by {@link #createSplits} and {@link #estimateNumSplits}
* only if {@link #prefixes} is set; otherwise the splits are created directly from {@link #uris} or {@link #objects}.
* Calling this when {@link #prefixes} is not set is likely to either return an empty iterator or throw a null
* pointer exception.
*/
protected abstract Stream<InputSplit<CloudObjectLocation>> getPrefixesSplitStream();
@Override
public Stream<InputSplit<CloudObjectLocation>> createSplits(
InputFormat inputFormat,
@Nullable SplitHintSpec splitHintSpec
)
{
if (!CollectionUtils.isNullOrEmpty(objects)) {
return objects.stream().map(InputSplit::new);
}
if (!CollectionUtils.isNullOrEmpty(uris)) {
return uris.stream().map(CloudObjectLocation::new).map(InputSplit::new);
}
return getPrefixesSplitStream();
}
@Override
public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec)
{
if (!CollectionUtils.isNullOrEmpty(objects)) {
return objects.size();
}
if (!CollectionUtils.isNullOrEmpty(uris)) {
return uris.size();
}
return Ints.checkedCast(getPrefixesSplitStream().count());
}
@Override
public boolean needsFormat()
{
return true;
}
@Override
protected InputSourceReader formattableReader(
InputRowSchema inputRowSchema,
InputFormat inputFormat,
@Nullable File temporaryDirectory
)
{
return new InputEntityIteratingReader(
inputRowSchema,
inputFormat,
createSplits(inputFormat, null).map(this::createEntity),
temporaryDirectory
);
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
CloudObjectInputSource that = (CloudObjectInputSource) o;
return Objects.equals(uris, that.uris) &&
Objects.equals(prefixes, that.prefixes) &&
Objects.equals(objects, that.objects);
}
@Override
public int hashCode()
{
return Objects.hash(uris, prefixes, objects);
}
private void throwIfIllegalArgs(boolean clause) throws IllegalArgumentException
{
if (clause) {
throw new IllegalArgumentException("exactly one of uris, prefixes, or objects must be specified");
}
}
}
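For orientation, here is a minimal sketch of how a concrete input source plugs into this template. `FooInputSource`, `FooEntity`, and `FooClient` are hypothetical stand-ins, not part of this commit; the real Google implementation follows below.

```java
// Hypothetical subclass sketch: FooEntity (an InputEntity) and FooClient are
// illustrative names. Only the two abstract hooks plus withSplit() are needed.
public class FooInputSource extends CloudObjectInputSource<FooEntity>
{
  static final String SCHEME = "foo";
  private final FooClient client;

  public FooInputSource(
      FooClient client,
      @Nullable List<URI> uris,
      @Nullable List<URI> prefixes,
      @Nullable List<CloudObjectLocation> objects
  )
  {
    // The super constructor enforces "exactly one of uris, prefixes, or objects"
    // and validates the scheme of any uris or prefixes up front.
    super(SCHEME, uris, prefixes, objects);
    this.client = client;
  }

  @Override
  protected FooEntity createEntity(InputSplit<CloudObjectLocation> split)
  {
    return new FooEntity(client, split.get());
  }

  @Override
  protected Stream<InputSplit<CloudObjectLocation>> getPrefixesSplitStream()
  {
    // Only reached when the source was configured with prefixes.
    return getPrefixes().stream()
                        .flatMap(prefix -> client.listLocationsUnder(prefix).stream())
                        .map(InputSplit::new);
  }

  @Override
  public SplittableInputSource<CloudObjectLocation> withSplit(InputSplit<CloudObjectLocation> split)
  {
    return new FooInputSource(client, null, null, ImmutableList.of(split.get()));
  }
}
```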

CloudObjectLocation.java

@@ -22,6 +22,7 @@ package org.apache.druid.data.input.impl;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.StringUtils;
import java.net.URI;
@@ -45,6 +46,14 @@ import java.util.Objects;
*/
public class CloudObjectLocation
{
public static URI validateUriScheme(String scheme, URI uri)
{
if (!scheme.equalsIgnoreCase(uri.getScheme())) {
throw new IAE("Invalid URI scheme [%s] must be [%s]", uri.toString(), scheme);
}
return uri;
}
private final String bucket;
private final String path;
@@ -125,5 +134,4 @@ public class CloudObjectLocation
{
return Objects.hash(bucket, path);
}
}
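A quick sketch (not part of the commit) of the new helper's contract, together with the existing URI round-trip; the bucket and path values are illustrative:

```java
// validateUriScheme compares schemes case-insensitively and returns the URI
// unchanged, so it can be used inline; a mismatch throws IAE.
static CloudObjectLocation demo()
{
  URI uri = CloudObjectLocation.validateUriScheme("gs", URI.create("gs://bucket/dir/file.json"));
  CloudObjectLocation location = new CloudObjectLocation(uri); // bucket="bucket", path="dir/file.json"

  // location.toUri("gs") reconstructs gs://bucket/dir/file.json, which is how the
  // entities below report getUri().

  // This line would throw IAE("Invalid URI scheme [s3://bucket/key] must be [gs]"):
  // CloudObjectLocation.validateUriScheme("gs", URI.create("s3://bucket/key"));
  return location;
}
```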

google.md (Google Cloud Storage extension docs)

@@ -60,12 +60,57 @@ This extension also provides an input source for Druid native batch ingestion to
...
```
```json
...
"ioConfig": {
"type": "index_parallel",
"inputSource": {
"type": "google",
"prefixes": ["gs://foo/bar", "gs://bar/foo"]
},
"inputFormat": {
"type": "json"
},
...
},
...
```
```json
...
"ioConfig": {
"type": "index_parallel",
"inputSource": {
"type": "google",
"objects": [
{ "bucket": "foo", "path": "bar/file1.json"},
{ "bucket": "bar", "path": "foo/file2.json"}
]
},
"inputFormat": {
"type": "json"
},
...
},
...
```
|property|description|default|required?|
|--------|-----------|-------|---------|
|type|This should be `google`.|N/A|yes|
-|uris|JSON array of URIs where Google Cloud Storage files to be ingested are located.|N/A|yes|
+|uris|JSON array of URIs where Google Cloud Storage objects to be ingested are located.|N/A|`uris` or `prefixes` or `objects` must be set|
+|prefixes|JSON array of URI prefixes for the locations of Google Cloud Storage objects to be ingested.|N/A|`uris` or `prefixes` or `objects` must be set|
+|objects|JSON array of Google Cloud Storage objects to be ingested.|N/A|`uris` or `prefixes` or `objects` must be set|
Google Cloud Storage object:
|property|description|default|required?|
|--------|-----------|-------|---------|
|bucket|Name of the Google Cloud Storage bucket.|N/A|yes|
|path|The path where data is located.|N/A|yes|
## Firehose
<a name="firehose"></a>

pom.xml (druid-google-extensions)

@@ -124,6 +124,13 @@
<scope>provided</scope>
</dependency>
<!-- Tests -->
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-core</artifactId>
<version>${project.parent.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-processing</artifactId>
@@ -158,5 +165,11 @@
<artifactId>jackson-module-guice</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

GoogleCloudStorageEntity.java

@@ -21,6 +21,7 @@ package org.apache.druid.data.input.google;
import com.google.common.base.Predicate;
import org.apache.druid.data.input.RetryingInputEntity;
import org.apache.druid.data.input.impl.CloudObjectLocation;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.storage.google.GoogleByteSource;
import org.apache.druid.storage.google.GoogleStorage;
@@ -33,36 +34,33 @@ import java.net.URI;
public class GoogleCloudStorageEntity extends RetryingInputEntity
{
private final GoogleStorage storage;
-private final URI uri;
+private final CloudObjectLocation location;
+private final GoogleByteSource byteSource;
-GoogleCloudStorageEntity(GoogleStorage storage, URI uri)
+GoogleCloudStorageEntity(GoogleStorage storage, CloudObjectLocation location)
{
this.storage = storage;
-this.uri = uri;
+this.location = location;
+this.byteSource = new GoogleByteSource(storage, location.getBucket(), location.getPath());
}
@Nullable
@Override
public URI getUri()
{
-return uri;
+return location.toUri(GoogleCloudStorageInputSource.SCHEME);
}
@Override
protected InputStream readFrom(long offset) throws IOException
{
// Get data of the given object and open an input stream
-final String bucket = uri.getAuthority();
-final String key = StringUtils.maybeRemoveLeadingSlash(uri.getPath());
-final GoogleByteSource byteSource = new GoogleByteSource(storage, bucket, key);
return byteSource.openStream(offset);
}
@Override
protected String getPath()
{
-return StringUtils.maybeRemoveLeadingSlash(uri.getPath());
+return StringUtils.maybeRemoveLeadingSlash(byteSource.getPath());
}
@Override

GoogleCloudStorageInputSource.java

@@ -22,104 +22,78 @@ package org.apache.druid.data.input.google;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.api.services.storage.model.StorageObject;
import com.google.common.collect.ImmutableList;
import org.apache.druid.data.input.AbstractInputSource;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.SplitHintSpec;
import org.apache.druid.data.input.impl.InputEntityIteratingReader;
import org.apache.druid.data.input.impl.CloudObjectInputSource;
import org.apache.druid.data.input.impl.CloudObjectLocation;
import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.storage.google.GoogleStorage;
import org.apache.druid.storage.google.GoogleUtils;
import javax.annotation.Nullable;
import java.io.File;
import java.net.URI;
import java.util.List;
import java.util.Objects;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
-public class GoogleCloudStorageInputSource extends AbstractInputSource implements SplittableInputSource<URI>
+public class GoogleCloudStorageInputSource extends CloudObjectInputSource<GoogleCloudStorageEntity>
{
static final String SCHEME = "gs";
private final GoogleStorage storage;
-private final List<URI> uris;
@JsonCreator
public GoogleCloudStorageInputSource(
@JacksonInject GoogleStorage storage,
-@JsonProperty("uris") List<URI> uris
+@JsonProperty("uris") @Nullable List<URI> uris,
+@JsonProperty("prefixes") @Nullable List<URI> prefixes,
+@JsonProperty("objects") @Nullable List<CloudObjectLocation> objects
)
{
+super(SCHEME, uris, prefixes, objects);
this.storage = storage;
-this.uris = uris;
}
-@JsonProperty("uris")
-public List<URI> getUris()
-{
-return uris;
-}
-@Override
-public Stream<InputSplit<URI>> createSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec)
-{
-return uris.stream().map(InputSplit::new);
-}
@Override
-public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec)
+protected GoogleCloudStorageEntity createEntity(InputSplit<CloudObjectLocation> split)
{
-return uris.size();
+return new GoogleCloudStorageEntity(storage, split.get());
}
@Override
-public SplittableInputSource<URI> withSplit(InputSplit<URI> split)
+protected Stream<InputSplit<CloudObjectLocation>> getPrefixesSplitStream()
{
-return new GoogleCloudStorageInputSource(storage, ImmutableList.of(split.get()));
+return StreamSupport.stream(storageObjectIterable().spliterator(), false)
+.map(this::byteSourceFromStorageObject)
+.map(InputSplit::new);
}
@Override
-public boolean needsFormat()
+public SplittableInputSource<CloudObjectLocation> withSplit(InputSplit<CloudObjectLocation> split)
{
-return true;
+return new GoogleCloudStorageInputSource(storage, null, null, ImmutableList.of(split.get()));
}
+private CloudObjectLocation byteSourceFromStorageObject(final StorageObject storageObject)
+{
+return new CloudObjectLocation(storageObject.getBucket(), storageObject.getName());
+}
+private Iterable<StorageObject> storageObjectIterable()
+{
+return () ->
+GoogleUtils.lazyFetchingStorageObjectsIterator(storage, getPrefixes().iterator(), RetryUtils.DEFAULT_MAX_TRIES);
+}
@Override
-protected InputSourceReader formattableReader(
-InputRowSchema inputRowSchema,
-InputFormat inputFormat,
-@Nullable File temporaryDirectory
-)
+public String toString()
{
-return new InputEntityIteratingReader(
-inputRowSchema,
-inputFormat,
-createSplits(inputFormat, null).map(split -> new GoogleCloudStorageEntity(
-storage,
-split.get()
-)),
-temporaryDirectory
-);
-}
-@Override
-public boolean equals(Object o)
-{
-if (this == o) {
-return true;
-}
-if (o == null || getClass() != o.getClass()) {
-return false;
-}
-GoogleCloudStorageInputSource that = (GoogleCloudStorageInputSource) o;
-return Objects.equals(uris, that.uris);
-}
-@Override
-public int hashCode()
-{
-return Objects.hash(uris);
+return "GoogleCloudStorageInputSource{" +
+"uris=" + getUris() +
+", prefixes=" + getPrefixes() +
+", objects=" + getObjects() +
+'}';
}
}
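To make the constructor contract concrete, a sketch (not from the commit) of the three mutually exclusive configurations; `storage` would normally be injected by Jackson via `@JacksonInject`, and the bucket names are illustrative:

```java
void configurations(GoogleStorage storage)
{
  // uris: splits are the listed objects themselves, no listing call is made
  new GoogleCloudStorageInputSource(
      storage, ImmutableList.of(URI.create("gs://bucket/dir/file.json")), null, null);

  // prefixes: splits come from getPrefixesSplitStream(), i.e. a lazy bucket listing
  new GoogleCloudStorageInputSource(
      storage, null, ImmutableList.of(URI.create("gs://bucket/dir")), null);

  // objects: explicit bucket/path pairs
  new GoogleCloudStorageInputSource(
      storage, null, null, ImmutableList.of(new CloudObjectLocation("bucket", "dir/file.json")));

  // Supplying more than one (or none) of uris/prefixes/objects throws
  // IllegalArgumentException from the CloudObjectInputSource constructor.
}
```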

GoogleByteSource.java

@@ -23,6 +23,7 @@ import com.google.common.io.ByteSource;
import java.io.IOException;
import java.io.InputStream;
import java.util.Objects;
public class GoogleByteSource extends ByteSource
{
@@ -37,6 +38,16 @@ public class GoogleByteSource extends ByteSource
this.path = path;
}
public String getBucket()
{
return bucket;
}
public String getPath()
{
return path;
}
@Override
public InputStream openStream() throws IOException
{
@@ -47,4 +58,24 @@
{
return storage.get(bucket, path, start);
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
GoogleByteSource that = (GoogleByteSource) o;
return Objects.equals(bucket, that.bucket) &&
Objects.equals(path, that.path);
}
@Override
public int hashCode()
{
return Objects.hash(bucket, path);
}
}

GoogleUtils.java

@@ -20,12 +20,23 @@
package org.apache.druid.storage.google;
import com.google.api.client.http.HttpResponseException;
import com.google.api.services.storage.Storage;
import com.google.api.services.storage.model.Objects;
import com.google.api.services.storage.model.StorageObject;
import com.google.common.base.Predicate;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.StringUtils;
import java.io.IOException;
import java.net.URI;
import java.util.Iterator;
import java.util.NoSuchElementException;
public class GoogleUtils
{
+public static final Predicate<Throwable> GOOGLE_RETRY = GoogleUtils::isRetryable;
public static boolean isRetryable(Throwable t)
{
if (t instanceof HttpResponseException) {
@@ -35,5 +46,103 @@ public class GoogleUtils
return t instanceof IOException;
}
-public static final Predicate<Throwable> GOOGLE_RETRY = e -> isRetryable(e);
private static <T> T retryGoogleCloudStorageOperation(RetryUtils.Task<T> f) throws Exception
{
return RetryUtils.retry(f, GOOGLE_RETRY, RetryUtils.DEFAULT_MAX_TRIES);
}
public static Iterator<StorageObject> lazyFetchingStorageObjectsIterator(
final GoogleStorage storage,
final Iterator<URI> uris,
final long maxListingLength
)
{
return new Iterator<StorageObject>()
{
private Storage.Objects.List listRequest;
private Objects results;
private URI currentUri;
private String currentBucket;
private String currentPrefix;
private String nextPageToken;
private Iterator<StorageObject> storageObjectsIterator;
{
nextPageToken = null;
prepareNextRequest();
fetchNextBatch();
}
private void prepareNextRequest()
{
try {
currentUri = uris.next();
currentBucket = currentUri.getAuthority();
currentPrefix = StringUtils.maybeRemoveLeadingSlash(currentUri.getPath());
nextPageToken = null;
listRequest = storage.list(currentBucket)
.setPrefix(currentPrefix)
.setMaxResults(maxListingLength);
}
catch (IOException io) {
throw new RuntimeException(io);
}
}
private void fetchNextBatch()
{
try {
listRequest.setPageToken(nextPageToken);
results = GoogleUtils.retryGoogleCloudStorageOperation(() -> listRequest.execute());
storageObjectsIterator = results.getItems().iterator();
nextPageToken = results.getNextPageToken();
}
catch (Exception ex) {
throw new RuntimeException(ex);
}
}
@Override
public boolean hasNext()
{
return storageObjectsIterator.hasNext() || nextPageToken != null || uris.hasNext();
}
@Override
public StorageObject next()
{
if (!hasNext()) {
throw new NoSuchElementException();
}
while (storageObjectsIterator.hasNext()) {
final StorageObject next = storageObjectsIterator.next();
// list with prefix can return directories, but they should always end with `/`, ignore them
if (!next.getName().endsWith("/")) {
return next;
}
}
if (nextPageToken != null) {
fetchNextBatch();
} else if (uris.hasNext()) {
prepareNextRequest();
fetchNextBatch();
}
if (!storageObjectsIterator.hasNext()) {
throw new ISE(
"Failed to further iterate on bucket[%s] and prefix[%s]. The last page token was [%s]",
currentBucket,
currentPrefix,
nextPageToken
);
}
return next();
}
};
}
}
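A usage sketch (not from the commit) of the new lazy iterator; the prefixes and the 1024 page size are illustrative:

```java
static List<CloudObjectLocation> listUnderPrefixes(GoogleStorage storage)
{
  Iterator<URI> prefixes = ImmutableList.of(
      URI.create("gs://bucket/2019/11/"),
      URI.create("gs://bucket/2019/12/")
  ).iterator();

  // Pages through list() results lazily, moving to the next prefix only after the
  // current one is exhausted; the third argument is maxListingLength, passed to
  // setMaxResults() for each page.
  Iterator<StorageObject> objects =
      GoogleUtils.lazyFetchingStorageObjectsIterator(storage, prefixes, 1024);

  List<CloudObjectLocation> locations = new ArrayList<>();
  while (objects.hasNext()) {
    StorageObject object = objects.next(); // directory placeholders ("name/") are skipped
    locations.add(new CloudObjectLocation(object.getBucket(), object.getName()));
  }
  return locations;
}
```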

GoogleCloudStorageInputSourceTest.java

@@ -23,66 +23,271 @@ import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.module.guice.ObjectMapperModule;
import com.google.api.services.storage.Storage;
import com.google.api.services.storage.model.Objects;
import com.google.api.services.storage.model.StorageObject;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Provides;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.impl.CloudObjectLocation;
import org.apache.druid.data.input.impl.CsvInputFormat;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.initialization.DruidModule;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.storage.google.GoogleStorage;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.druid.utils.CompressionUtils;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-public class GoogleCloudStorageInputSourceTest
+public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTest
{
-private static final GoogleStorage STORAGE = new GoogleStorage(null);
+private static final GoogleStorage STORAGE = EasyMock.createMock(GoogleStorage.class);
private static final List<URI> EXPECTED_URIS = Arrays.asList(
URI.create("gs://foo/bar/file.csv"),
URI.create("gs://bar/foo/file2.csv")
);
private static final List<URI> EXPECTED_COMPRESSED_URIS = Arrays.asList(
URI.create("gs://foo/bar/file.csv.gz"),
URI.create("gs://bar/foo/file2.csv.gz")
);
private static final List<CloudObjectLocation> EXPECTED_OBJECTS =
EXPECTED_URIS.stream().map(CloudObjectLocation::new).collect(Collectors.toList());
private static final List<URI> PREFIXES = Arrays.asList(
URI.create("gs://foo/bar"),
URI.create("gs://bar/foo")
);
private static final List<CloudObjectLocation> EXPECTED_LOCATION =
ImmutableList.of(new CloudObjectLocation("foo", "bar/file.csv"));
private static final DateTime NOW = DateTimes.nowUtc();
private static final byte[] CONTENT =
StringUtils.toUtf8(StringUtils.format("%d,hello,world", NOW.getMillis()));
@Test
public void testSerde() throws Exception
{
final ObjectMapper mapper = createGoogleObjectMapper();
-final List<URI> uris = Arrays.asList(
-new URI("gs://foo/bar/file.gz"),
-new URI("gs://bar/foo/file2.gz")
-);
-final List<URI> prefixes = Arrays.asList(
-new URI("gs://foo/bar"),
-new URI("gs://bar/foo")
-);
-final GoogleCloudStorageInputSource withUris = new GoogleCloudStorageInputSource(STORAGE, uris);
+final GoogleCloudStorageInputSource withUris =
+new GoogleCloudStorageInputSource(STORAGE, EXPECTED_URIS, ImmutableList.of(), null);
final GoogleCloudStorageInputSource serdeWithUris =
mapper.readValue(mapper.writeValueAsString(withUris), GoogleCloudStorageInputSource.class);
Assert.assertEquals(withUris, serdeWithUris);
}
@Test
public void testSerdePrefixes() throws Exception
{
final ObjectMapper mapper = createGoogleObjectMapper();
final GoogleCloudStorageInputSource withPrefixes =
new GoogleCloudStorageInputSource(STORAGE, ImmutableList.of(), PREFIXES, null);
final GoogleCloudStorageInputSource serdeWithPrefixes =
mapper.readValue(mapper.writeValueAsString(withPrefixes), GoogleCloudStorageInputSource.class);
Assert.assertEquals(withPrefixes, serdeWithPrefixes);
}
@Test
public void testSerdeObjects() throws Exception
{
final ObjectMapper mapper = createGoogleObjectMapper();
final GoogleCloudStorageInputSource withObjects =
new GoogleCloudStorageInputSource(
STORAGE,
null,
null,
ImmutableList.of(new CloudObjectLocation("foo", "bar/file.gz"))
);
final GoogleCloudStorageInputSource serdeWithObjects =
mapper.readValue(mapper.writeValueAsString(withObjects), GoogleCloudStorageInputSource.class);
Assert.assertEquals(withObjects, serdeWithObjects);
}
@Test
public void testWithUrisSplit()
{
-final List<URI> uris = Arrays.asList(
-URI.create("gs://foo/bar/file.gz"),
-URI.create("gs://bar/foo/file2.gz")
-);
-GoogleCloudStorageInputSource inputSource = new GoogleCloudStorageInputSource(STORAGE, uris);
+GoogleCloudStorageInputSource inputSource =
+new GoogleCloudStorageInputSource(STORAGE, EXPECTED_URIS, ImmutableList.of(), null);
-Stream<InputSplit<URI>> splits = inputSource.createSplits(
+Stream<InputSplit<CloudObjectLocation>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null),
null
);
-Assert.assertEquals(uris, splits.map(InputSplit::get).collect(Collectors.toList()));
+Assert.assertEquals(EXPECTED_OBJECTS, splits.map(InputSplit::get).collect(Collectors.toList()));
}
@Test
public void testWithPrefixesSplit() throws IOException
{
EasyMock.reset(STORAGE);
addExpectedPrefixObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0)));
addExpectedPrefixObjects(PREFIXES.get(1), ImmutableList.of(EXPECTED_URIS.get(1)));
EasyMock.replay(STORAGE);
GoogleCloudStorageInputSource inputSource =
new GoogleCloudStorageInputSource(STORAGE, null, PREFIXES, null);
Stream<InputSplit<CloudObjectLocation>> splits = inputSource.createSplits(
new JsonInputFormat(JSONPathSpec.DEFAULT, null),
null
);
Assert.assertEquals(EXPECTED_OBJECTS, splits.map(InputSplit::get).collect(Collectors.toList()));
}
@Test
public void testReader() throws IOException
{
EasyMock.reset(STORAGE);
addExpectedPrefixObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0)));
addExpectedGetObjectMock(EXPECTED_URIS.get(0));
addExpectedPrefixObjects(PREFIXES.get(1), ImmutableList.of(EXPECTED_URIS.get(1)));
addExpectedGetObjectMock(EXPECTED_URIS.get(1));
EasyMock.replay(STORAGE);
GoogleCloudStorageInputSource inputSource = new GoogleCloudStorageInputSource(
STORAGE,
null,
PREFIXES,
null
);
InputRowSchema someSchema = new InputRowSchema(
new TimestampSpec("time", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim2"))),
ImmutableList.of("count")
);
InputSourceReader reader = inputSource.reader(
someSchema,
new CsvInputFormat(ImmutableList.of("time", "dim1", "dim2"), "|", false, null, 0),
null
);
CloseableIterator<InputRow> iterator = reader.read();
while (iterator.hasNext()) {
InputRow nextRow = iterator.next();
Assert.assertEquals(NOW, nextRow.getTimestamp());
Assert.assertEquals("hello", nextRow.getDimension("dim1").get(0));
Assert.assertEquals("world", nextRow.getDimension("dim2").get(0));
}
}
@Test
public void testCompressedReader() throws IOException
{
EasyMock.reset(STORAGE);
addExpectedPrefixObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_COMPRESSED_URIS.get(0)));
addExpectedGetCompressedObjectMock(EXPECTED_COMPRESSED_URIS.get(0));
addExpectedPrefixObjects(PREFIXES.get(1), ImmutableList.of(EXPECTED_COMPRESSED_URIS.get(1)));
addExpectedGetCompressedObjectMock(EXPECTED_COMPRESSED_URIS.get(1));
EasyMock.replay(STORAGE);
GoogleCloudStorageInputSource inputSource = new GoogleCloudStorageInputSource(
STORAGE,
null,
PREFIXES,
null
);
InputRowSchema someSchema = new InputRowSchema(
new TimestampSpec("time", "auto", null),
new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim2"))),
ImmutableList.of("count")
);
InputSourceReader reader = inputSource.reader(
someSchema,
new CsvInputFormat(ImmutableList.of("time", "dim1", "dim2"), "|", false, null, 0),
null
);
CloseableIterator<InputRow> iterator = reader.read();
while (iterator.hasNext()) {
InputRow nextRow = iterator.next();
Assert.assertEquals(NOW, nextRow.getTimestamp());
Assert.assertEquals("hello", nextRow.getDimension("dim1").get(0));
Assert.assertEquals("world", nextRow.getDimension("dim2").get(0));
}
}
private static void addExpectedPrefixObjects(URI prefix, List<URI> uris) throws IOException
{
final String bucket = prefix.getAuthority();
Storage.Objects.List listRequest = EasyMock.createMock(Storage.Objects.List.class);
EasyMock.expect(STORAGE.list(EasyMock.eq(bucket))).andReturn(listRequest).once();
EasyMock.expect(listRequest.setPageToken(EasyMock.anyString())).andReturn(listRequest).once();
EasyMock.expect(listRequest.setMaxResults(EasyMock.anyLong())).andReturn(listRequest).once();
EasyMock.expect(listRequest.setPrefix(EasyMock.eq(StringUtils.maybeRemoveLeadingSlash(prefix.getPath()))))
.andReturn(listRequest)
.once();
List<StorageObject> mockObjects = new ArrayList<>();
for (URI uri : uris) {
StorageObject s = new StorageObject();
s.setBucket(bucket);
s.setName(uri.getPath());
mockObjects.add(s);
}
Objects response = new Objects();
response.setItems(mockObjects);
EasyMock.expect(listRequest.execute()).andReturn(response).once();
EasyMock.replay(listRequest);
}
private static void addExpectedGetObjectMock(URI uri) throws IOException
{
CloudObjectLocation location = new CloudObjectLocation(uri);
EasyMock.expect(
STORAGE.get(EasyMock.eq(location.getBucket()), EasyMock.eq(location.getPath()), EasyMock.eq(0L))
).andReturn(new ByteArrayInputStream(CONTENT)).once();
}
private static void addExpectedGetCompressedObjectMock(URI uri) throws IOException
{
CloudObjectLocation location = new CloudObjectLocation(uri);
ByteArrayOutputStream gzipped = new ByteArrayOutputStream();
CompressionUtils.gzip(new ByteArrayInputStream(CONTENT), gzipped);
EasyMock.expect(
STORAGE.get(EasyMock.eq(location.getBucket()), EasyMock.eq(location.getPath()), EasyMock.eq(0L))
).andReturn(new ByteArrayInputStream(gzipped.toByteArray())).once();
}
public static ObjectMapper createGoogleObjectMapper()

pom.xml (druid-s3-extensions)

@@ -113,11 +113,6 @@
<artifactId>aws-java-sdk-s3</artifactId>
<scope>provided</scope>
</dependency>
-<dependency>
-<groupId>joda-time</groupId>
-<artifactId>joda-time</artifactId>
-<scope>provided</scope>
-</dependency>
<!-- Tests -->
<dependency>
<groupId>org.apache.druid</groupId>
@@ -155,6 +150,11 @@
<artifactId>easymock</artifactId>
<scope>test</scope>
</dependency>
+<dependency>
+<groupId>joda-time</groupId>
+<artifactId>joda-time</artifactId>
+<scope>test</scope>
+</dependency>
</dependencies>
</project>

S3Entity.java

@@ -26,6 +26,7 @@ import com.google.common.base.Predicate;
import org.apache.druid.data.input.RetryingInputEntity;
import org.apache.druid.data.input.impl.CloudObjectLocation;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.storage.s3.S3StorageDruidModule;
import org.apache.druid.storage.s3.S3Utils;
import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
@@ -47,7 +48,7 @@ public class S3Entity extends RetryingInputEntity
@Override
public URI getUri()
{
-return null;
+return object.toUri(S3StorageDruidModule.SCHEME);
}
@Override

S3InputSource.java

@@ -25,35 +25,25 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import org.apache.druid.data.input.AbstractInputSource;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.SplitHintSpec;
import org.apache.druid.data.input.impl.CloudObjectInputSource;
import org.apache.druid.data.input.impl.CloudObjectLocation;
import org.apache.druid.data.input.impl.InputEntityIteratingReader;
import org.apache.druid.data.input.impl.SplittableInputSource;
import org.apache.druid.storage.s3.S3StorageDruidModule;
import org.apache.druid.storage.s3.S3Utils;
import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3;
import org.apache.druid.utils.CollectionUtils;
import javax.annotation.Nullable;
import java.io.File;
import java.net.URI;
import java.util.List;
import java.util.Objects;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
-public class S3InputSource extends AbstractInputSource implements SplittableInputSource<CloudObjectLocation>
+public class S3InputSource extends CloudObjectInputSource<S3Entity>
{
private static final int MAX_LISTING_LENGTH = 1024;
private final ServerSideEncryptingAmazonS3 s3Client;
-private final List<URI> uris;
-private final List<URI> prefixes;
-private final List<CloudObjectLocation> objects;
@JsonCreator
public S3InputSource(
@@ -63,144 +53,42 @@ public class S3InputSource extends AbstractInputSource implements SplittableInpu
@JsonProperty("objects") @Nullable List<CloudObjectLocation> objects
)
{
+super(S3StorageDruidModule.SCHEME, uris, prefixes, objects);
this.s3Client = Preconditions.checkNotNull(s3Client, "s3Client");
-this.uris = uris;
-this.prefixes = prefixes;
-this.objects = objects;
-if (!CollectionUtils.isNullOrEmpty(objects)) {
-throwIfIllegalArgs(!CollectionUtils.isNullOrEmpty(uris) || !CollectionUtils.isNullOrEmpty(prefixes));
-} else if (!CollectionUtils.isNullOrEmpty(uris)) {
-throwIfIllegalArgs(!CollectionUtils.isNullOrEmpty(prefixes));
-uris.forEach(S3Utils::checkURI);
-} else if (!CollectionUtils.isNullOrEmpty(prefixes)) {
-prefixes.forEach(S3Utils::checkURI);
-} else {
-throwIfIllegalArgs(true);
-}
}
-@JsonProperty
-public List<URI> getUris()
-{
-return uris;
-}
-@JsonProperty
-public List<URI> getPrefixes()
-{
-return prefixes;
-}
-@JsonProperty
-public List<CloudObjectLocation> getObjects()
-{
-return objects;
-}
@Override
-public Stream<InputSplit<CloudObjectLocation>> createSplits(
-InputFormat inputFormat,
-@Nullable SplitHintSpec splitHintSpec
-)
+protected S3Entity createEntity(InputSplit<CloudObjectLocation> split)
{
-if (objects != null) {
-return objects.stream().map(InputSplit::new);
-}
-if (uris != null) {
-return uris.stream().map(CloudObjectLocation::new).map(InputSplit::new);
-}
+return new S3Entity(s3Client, split.get());
}
+@Override
+protected Stream<InputSplit<CloudObjectLocation>> getPrefixesSplitStream()
+{
+return StreamSupport.stream(getIterableObjectsFromPrefixes().spliterator(), false)
+.map(S3Utils::summaryToCloudObjectLocation)
+.map(InputSplit::new);
+}
-@Override
-public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec)
-{
-if (objects != null) {
-return objects.size();
-}
-if (uris != null) {
-return uris.size();
-}
-return (int) StreamSupport.stream(getIterableObjectsFromPrefixes().spliterator(), false).count();
-}
@Override
public SplittableInputSource<CloudObjectLocation> withSplit(InputSplit<CloudObjectLocation> split)
{
return new S3InputSource(s3Client, null, null, ImmutableList.of(split.get()));
}
-@Override
-public boolean needsFormat()
-{
-return true;
-}
-@Override
-protected InputSourceReader formattableReader(
-InputRowSchema inputRowSchema,
-InputFormat inputFormat,
-@Nullable File temporaryDirectory
-)
-{
-return new InputEntityIteratingReader(
-inputRowSchema,
-inputFormat,
-// formattableReader() is supposed to be called in each task that actually creates segments.
-// The task should already have only one split in parallel indexing,
-// while there's no need to make splits using splitHintSpec in sequential indexing.
-createSplits(inputFormat, null).map(split -> new S3Entity(s3Client, split.get())),
-temporaryDirectory
-);
-}
-@Override
-public boolean equals(Object o)
-{
-if (this == o) {
-return true;
-}
-if (o == null || getClass() != o.getClass()) {
-return false;
-}
-S3InputSource that = (S3InputSource) o;
-return Objects.equals(uris, that.uris) &&
-Objects.equals(prefixes, that.prefixes) &&
-Objects.equals(objects, that.objects);
-}
-@Override
-public int hashCode()
-{
-return Objects.hash(uris, prefixes, objects);
-}
@Override
public String toString()
{
return "S3InputSource{" +
-"uris=" + uris +
-", prefixes=" + prefixes +
-", objects=" + objects +
+"uris=" + getUris() +
+", prefixes=" + getPrefixes() +
+", objects=" + getObjects() +
'}';
}
private Iterable<S3ObjectSummary> getIterableObjectsFromPrefixes()
{
-return () -> S3Utils.lazyFetchingObjectSummariesIterator(s3Client, prefixes.iterator(), MAX_LISTING_LENGTH);
-}
-private void throwIfIllegalArgs(boolean clause) throws IllegalArgumentException
-{
-if (clause) {
-throw new IllegalArgumentException("exactly one of either uris or prefixes or objects must be specified");
-}
+return () -> S3Utils.lazyFetchingObjectSummariesIterator(s3Client, getPrefixes().iterator(), MAX_LISTING_LENGTH);
}
}
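With the shared base class in place, split handling is now symmetric between the S3 and Google sources. A brief sketch (not from the commit), with illustrative bucket and key names:

```java
void splitExample(ServerSideEncryptingAmazonS3 s3Client)
{
  S3InputSource source = new S3InputSource(
      s3Client, null, ImmutableList.of(URI.create("s3://bucket/dir")), null);

  // A sub-task source from withSplit() is always pinned to a single object:
  // uris and prefixes are null, objects holds exactly one CloudObjectLocation.
  SplittableInputSource<CloudObjectLocation> subSource =
      source.withSplit(new InputSplit<>(new CloudObjectLocation("bucket", "dir/file.json")));
}
```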

S3Utils.java

@@ -34,7 +34,6 @@ import com.google.common.base.Joiner;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterators;
import org.apache.druid.data.input.impl.CloudObjectLocation;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.RetryUtils;
@@ -289,10 +288,8 @@ public class S3Utils
{
if (uri.getScheme().equalsIgnoreCase(S3StorageDruidModule.SCHEME_S3_ZIP)) {
uri = URI.create(SCHEME + uri.toString().substring(S3StorageDruidModule.SCHEME_S3_ZIP.length()));
-} else if (!SCHEME.equalsIgnoreCase(uri.getScheme())) {
-throw new IAE("Invalid URI scheme [%s] must be [%s]", uri.toString(), SCHEME);
}
-return uri;
+return CloudObjectLocation.validateUriScheme(SCHEME, uri);
}
// Copied from org.jets3t.service.model.StorageObject.isDirectoryPlaceholder()