diff --git a/core/src/main/java/org/apache/druid/data/input/impl/CloudObjectInputSource.java b/core/src/main/java/org/apache/druid/data/input/impl/CloudObjectInputSource.java new file mode 100644 index 00000000000..c8b7a82bf34 --- /dev/null +++ b/core/src/main/java/org/apache/druid/data/input/impl/CloudObjectInputSource.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.data.input.impl; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.primitives.Ints; +import org.apache.druid.data.input.AbstractInputSource; +import org.apache.druid.data.input.InputEntity; +import org.apache.druid.data.input.InputFormat; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.InputSourceReader; +import org.apache.druid.data.input.InputSplit; +import org.apache.druid.data.input.SplitHintSpec; +import org.apache.druid.utils.CollectionUtils; + +import javax.annotation.Nullable; +import java.io.File; +import java.net.URI; +import java.util.List; +import java.util.Objects; +import java.util.stream.Stream; + +public abstract class CloudObjectInputSource extends AbstractInputSource + implements SplittableInputSource +{ + private final List uris; + private final List prefixes; + private final List objects; + + public CloudObjectInputSource( + String scheme, + @Nullable List uris, + @Nullable List prefixes, + @Nullable List objects + ) + { + this.uris = uris; + this.prefixes = prefixes; + this.objects = objects; + + if (!CollectionUtils.isNullOrEmpty(objects)) { + throwIfIllegalArgs(!CollectionUtils.isNullOrEmpty(uris) || !CollectionUtils.isNullOrEmpty(prefixes)); + } else if (!CollectionUtils.isNullOrEmpty(uris)) { + throwIfIllegalArgs(!CollectionUtils.isNullOrEmpty(prefixes)); + uris.forEach(uri -> CloudObjectLocation.validateUriScheme(scheme, uri)); + } else if (!CollectionUtils.isNullOrEmpty(prefixes)) { + prefixes.forEach(uri -> CloudObjectLocation.validateUriScheme(scheme, uri)); + } else { + throwIfIllegalArgs(true); + } + } + + @JsonProperty + public List getUris() + { + return uris; + } + + @JsonProperty + public List getPrefixes() + { + return prefixes; + } + + @Nullable + @JsonProperty + public List getObjects() + { + return objects; + } + + /** + * Create the correct {@link InputEntity} for this input source given a split on a {@link 
CloudObjectLocation}. This + * is called internally by {@link #formattableReader} and operates on the output of {@link #createSplits}. + */ + protected abstract T createEntity(InputSplit split); + + /** + * Create a stream of {@link CloudObjectLocation} splits by listing objects that appear under {@link #prefixes} using + * this input source's backend API. This is called internally by {@link #createSplits} and {@link #estimateNumSplits}, + * only if {@link #prefixes} is set, otherwise the splits are created directly from {@link #uris} or {@link #objects}. + * Calling this method when {@link #prefixes} is not set is likely to lead to either an empty stream or a null pointer exception. + */ + protected abstract Stream> getPrefixesSplitStream(); + + @Override + public Stream> createSplits( + InputFormat inputFormat, + @Nullable SplitHintSpec splitHintSpec + ) + { + if (!CollectionUtils.isNullOrEmpty(objects)) { + return objects.stream().map(InputSplit::new); + } + if (!CollectionUtils.isNullOrEmpty(uris)) { + return uris.stream().map(CloudObjectLocation::new).map(InputSplit::new); + } + + return getPrefixesSplitStream(); + } + + @Override + public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) + { + if (!CollectionUtils.isNullOrEmpty(objects)) { + return objects.size(); + } + + if (!CollectionUtils.isNullOrEmpty(uris)) { + return uris.size(); + } + + return Ints.checkedCast(getPrefixesSplitStream().count()); + } + + @Override + public boolean needsFormat() + { + return true; + } + + @Override + protected InputSourceReader formattableReader( + InputRowSchema inputRowSchema, + InputFormat inputFormat, + @Nullable File temporaryDirectory + ) + { + return new InputEntityIteratingReader( + inputRowSchema, + inputFormat, + createSplits(inputFormat, null).map(this::createEntity), + temporaryDirectory + ); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + 
return false; + } + CloudObjectInputSource that = (CloudObjectInputSource) o; + return Objects.equals(uris, that.uris) && + Objects.equals(prefixes, that.prefixes) && + Objects.equals(objects, that.objects); + } + + @Override + public int hashCode() + { + return Objects.hash(uris, prefixes, objects); + } + + private void throwIfIllegalArgs(boolean clause) throws IllegalArgumentException + { + if (clause) { + throw new IllegalArgumentException("exactly one of either uris or prefixes or objects must be specified"); + } + } +} diff --git a/core/src/main/java/org/apache/druid/data/input/impl/CloudObjectLocation.java b/core/src/main/java/org/apache/druid/data/input/impl/CloudObjectLocation.java index d9810072258..e79b04edaf4 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/CloudObjectLocation.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/CloudObjectLocation.java @@ -22,6 +22,7 @@ package org.apache.druid.data.input.impl; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; +import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.StringUtils; import java.net.URI; @@ -45,6 +46,14 @@ import java.util.Objects; */ public class CloudObjectLocation { + public static URI validateUriScheme(String scheme, URI uri) + { + if (!scheme.equalsIgnoreCase(uri.getScheme())) { + throw new IAE("Invalid URI scheme [%s] must be [%s]", uri.toString(), scheme); + } + return uri; + } + private final String bucket; private final String path; @@ -125,5 +134,4 @@ public class CloudObjectLocation { return Objects.hash(bucket, path); } - } diff --git a/docs/development/extensions-core/google.md b/docs/development/extensions-core/google.md index 6dafa6ad046..9e5c63abf72 100644 --- a/docs/development/extensions-core/google.md +++ b/docs/development/extensions-core/google.md @@ -60,12 +60,57 @@ This extension also provides an input source for 
Druid native batch ingestion to ... ``` +```json +... + "ioConfig": { + "type": "index_parallel", + "inputSource": { + "type": "google", + "prefixes": ["gs://foo/bar", "gs://bar/foo"] + }, + "inputFormat": { + "type": "json" + }, + ... + }, +... +``` + + +```json +... + "ioConfig": { + "type": "index_parallel", + "inputSource": { + "type": "google", + "objects": [ + { "bucket": "foo", "path": "bar/file1.json"}, + { "bucket": "bar", "path": "foo/file2.json"} + ] + }, + "inputFormat": { + "type": "json" + }, + ... + }, +... +``` + |property|description|default|required?| |--------|-----------|-------|---------| |type|This should be `google`.|N/A|yes| -|uris|JSON array of URIs where Google Cloud Storage files to be ingested are located.|N/A|yes| +|uris|JSON array of URIs where Google Cloud Storage objects to be ingested are located.|N/A|exactly one of `uris`, `prefixes`, or `objects` must be set| +|prefixes|JSON array of URI prefixes for the locations of Google Cloud Storage objects to be ingested.|N/A|exactly one of `uris`, `prefixes`, or `objects` must be set| +|objects|JSON array of Google Cloud Storage objects to be ingested.|N/A|exactly one of `uris`, `prefixes`, or `objects` must be set| +Google Cloud Storage object: + +|property|description|default|required?| +|--------|-----------|-------|---------| +|bucket|Name of the Google Cloud Storage bucket.|N/A|yes| +|path|The path of the object within the bucket where the data is located.|N/A|yes| + ## Firehose diff --git a/extensions-core/google-extensions/pom.xml b/extensions-core/google-extensions/pom.xml index d98910b2718..68e42cd9f24 100644 --- a/extensions-core/google-extensions/pom.xml +++ b/extensions-core/google-extensions/pom.xml @@ -124,6 +124,13 @@ provided + + org.apache.druid + druid-core + ${project.parent.version} + test + test-jar + org.apache.druid druid-processing @@ -158,5 +165,11 @@ jackson-module-guice test + + joda-time + joda-time + test + + diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageEntity.java 
b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageEntity.java index 77a1a2e2f30..4c1b53abc2b 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageEntity.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageEntity.java @@ -21,6 +21,7 @@ package org.apache.druid.data.input.google; import com.google.common.base.Predicate; import org.apache.druid.data.input.RetryingInputEntity; +import org.apache.druid.data.input.impl.CloudObjectLocation; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.storage.google.GoogleByteSource; import org.apache.druid.storage.google.GoogleStorage; @@ -33,36 +34,33 @@ import java.net.URI; public class GoogleCloudStorageEntity extends RetryingInputEntity { - private final GoogleStorage storage; - private final URI uri; + private final CloudObjectLocation location; + private final GoogleByteSource byteSource; - GoogleCloudStorageEntity(GoogleStorage storage, URI uri) + GoogleCloudStorageEntity(GoogleStorage storage, CloudObjectLocation location) { - this.storage = storage; - this.uri = uri; + this.location = location; + this.byteSource = new GoogleByteSource(storage, location.getBucket(), location.getPath()); } @Nullable @Override public URI getUri() { - return uri; + return location.toUri(GoogleCloudStorageInputSource.SCHEME); } @Override protected InputStream readFrom(long offset) throws IOException { // Get data of the given object and open an input stream - final String bucket = uri.getAuthority(); - final String key = StringUtils.maybeRemoveLeadingSlash(uri.getPath()); - final GoogleByteSource byteSource = new GoogleByteSource(storage, bucket, key); return byteSource.openStream(offset); } @Override protected String getPath() { - return StringUtils.maybeRemoveLeadingSlash(uri.getPath()); + return 
StringUtils.maybeRemoveLeadingSlash(byteSource.getPath()); } @Override diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java index 5d5711ff72d..b5b8bcd4a62 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSource.java @@ -22,104 +22,78 @@ package org.apache.druid.data.input.google; import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.api.services.storage.model.StorageObject; import com.google.common.collect.ImmutableList; -import org.apache.druid.data.input.AbstractInputSource; -import org.apache.druid.data.input.InputFormat; -import org.apache.druid.data.input.InputRowSchema; -import org.apache.druid.data.input.InputSourceReader; import org.apache.druid.data.input.InputSplit; -import org.apache.druid.data.input.SplitHintSpec; -import org.apache.druid.data.input.impl.InputEntityIteratingReader; +import org.apache.druid.data.input.impl.CloudObjectInputSource; +import org.apache.druid.data.input.impl.CloudObjectLocation; import org.apache.druid.data.input.impl.SplittableInputSource; +import org.apache.druid.java.util.common.RetryUtils; import org.apache.druid.storage.google.GoogleStorage; +import org.apache.druid.storage.google.GoogleUtils; import javax.annotation.Nullable; -import java.io.File; import java.net.URI; import java.util.List; -import java.util.Objects; import java.util.stream.Stream; +import java.util.stream.StreamSupport; -public class GoogleCloudStorageInputSource extends AbstractInputSource implements SplittableInputSource +public class 
GoogleCloudStorageInputSource extends CloudObjectInputSource { + static final String SCHEME = "gs"; + private final GoogleStorage storage; - private final List uris; @JsonCreator public GoogleCloudStorageInputSource( @JacksonInject GoogleStorage storage, - @JsonProperty("uris") List uris + @JsonProperty("uris") @Nullable List uris, + @JsonProperty("prefixes") @Nullable List prefixes, + @JsonProperty("objects") @Nullable List objects ) { + super(SCHEME, uris, prefixes, objects); this.storage = storage; - this.uris = uris; - } - - @JsonProperty("uris") - public List getUris() - { - return uris; - } - - - @Override - public Stream> createSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) - { - return uris.stream().map(InputSplit::new); } @Override - public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) + protected GoogleCloudStorageEntity createEntity(InputSplit split) { - return uris.size(); + return new GoogleCloudStorageEntity(storage, split.get()); } @Override - public SplittableInputSource withSplit(InputSplit split) + protected Stream> getPrefixesSplitStream() { - return new GoogleCloudStorageInputSource(storage, ImmutableList.of(split.get())); + return StreamSupport.stream(storageObjectIterable().spliterator(), false) + .map(this::byteSourceFromStorageObject) + .map(InputSplit::new); } @Override - public boolean needsFormat() + public SplittableInputSource withSplit(InputSplit split) { - return true; + return new GoogleCloudStorageInputSource(storage, null, null, ImmutableList.of(split.get())); + } + + private CloudObjectLocation byteSourceFromStorageObject(final StorageObject storageObject) + { + return new CloudObjectLocation(storageObject.getBucket(), storageObject.getName()); + } + + private Iterable storageObjectIterable() + { + return () -> + GoogleUtils.lazyFetchingStorageObjectsIterator(storage, getPrefixes().iterator(), RetryUtils.DEFAULT_MAX_TRIES); } @Override - protected InputSourceReader 
formattableReader( - InputRowSchema inputRowSchema, - InputFormat inputFormat, - @Nullable File temporaryDirectory - ) + public String toString() { - return new InputEntityIteratingReader( - inputRowSchema, - inputFormat, - createSplits(inputFormat, null).map(split -> new GoogleCloudStorageEntity( - storage, - split.get() - )), - temporaryDirectory - ); - } - - @Override - public boolean equals(Object o) - { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - GoogleCloudStorageInputSource that = (GoogleCloudStorageInputSource) o; - return Objects.equals(uris, that.uris); - } - - @Override - public int hashCode() - { - return Objects.hash(uris); + return "GoogleCloudStorageInputSource{" + + "uris=" + getUris() + + ", prefixes=" + getPrefixes() + + ", objects=" + getObjects() + + '}'; } } diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleByteSource.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleByteSource.java index 6066eb2c13e..977353f9f20 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleByteSource.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleByteSource.java @@ -23,6 +23,7 @@ import com.google.common.io.ByteSource; import java.io.IOException; import java.io.InputStream; +import java.util.Objects; public class GoogleByteSource extends ByteSource { @@ -37,6 +38,16 @@ public class GoogleByteSource extends ByteSource this.path = path; } + public String getBucket() + { + return bucket; + } + + public String getPath() + { + return path; + } + @Override public InputStream openStream() throws IOException { @@ -47,4 +58,24 @@ public class GoogleByteSource extends ByteSource { return storage.get(bucket, path, start); } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || 
getClass() != o.getClass()) { + return false; + } + GoogleByteSource that = (GoogleByteSource) o; + return Objects.equals(bucket, that.bucket) && + Objects.equals(path, that.path); + } + + @Override + public int hashCode() + { + return Objects.hash(bucket, path); + } } diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java index 4acd9e05069..9fbb23f040c 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java @@ -20,12 +20,23 @@ package org.apache.druid.storage.google; import com.google.api.client.http.HttpResponseException; +import com.google.api.services.storage.Storage; +import com.google.api.services.storage.model.Objects; +import com.google.api.services.storage.model.StorageObject; import com.google.common.base.Predicate; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.RetryUtils; +import org.apache.druid.java.util.common.StringUtils; import java.io.IOException; +import java.net.URI; +import java.util.Iterator; +import java.util.NoSuchElementException; public class GoogleUtils { + public static final Predicate GOOGLE_RETRY = GoogleUtils::isRetryable; + public static boolean isRetryable(Throwable t) { if (t instanceof HttpResponseException) { @@ -35,5 +46,103 @@ public class GoogleUtils return t instanceof IOException; } - public static final Predicate GOOGLE_RETRY = e -> isRetryable(e); + private static T retryGoogleCloudStorageOperation(RetryUtils.Task f) throws Exception + { + return RetryUtils.retry(f, GOOGLE_RETRY, RetryUtils.DEFAULT_MAX_TRIES); + } + + + public static Iterator lazyFetchingStorageObjectsIterator( + final GoogleStorage storage, + final Iterator uris, + final long maxListingLength + ) + { + return 
new Iterator() + { + private Storage.Objects.List listRequest; + private Objects results; + private URI currentUri; + private String currentBucket; + private String currentPrefix; + private String nextPageToken; + private Iterator storageObjectsIterator; + + { + nextPageToken = null; + prepareNextRequest(); + fetchNextBatch(); + } + + private void prepareNextRequest() + { + try { + currentUri = uris.next(); + currentBucket = currentUri.getAuthority(); + currentPrefix = StringUtils.maybeRemoveLeadingSlash(currentUri.getPath()); + nextPageToken = null; + listRequest = storage.list(currentBucket) + .setPrefix(currentPrefix) + .setMaxResults(maxListingLength); + + } + catch (IOException io) { + throw new RuntimeException(io); + } + } + + private void fetchNextBatch() + { + try { + listRequest.setPageToken(nextPageToken); + results = GoogleUtils.retryGoogleCloudStorageOperation(() -> listRequest.execute()); + storageObjectsIterator = results.getItems().iterator(); + nextPageToken = results.getNextPageToken(); + } + catch (Exception ex) { + throw new RuntimeException(ex); + } + } + + @Override + public boolean hasNext() + { + return storageObjectsIterator.hasNext() || nextPageToken != null || uris.hasNext(); + } + + @Override + public StorageObject next() + { + if (!hasNext()) { + throw new NoSuchElementException(); + } + + while (storageObjectsIterator.hasNext()) { + final StorageObject next = storageObjectsIterator.next(); + // list with prefix can return directories, but they should always end with `/`, ignore them + if (!next.getName().endsWith("/")) { + return next; + } + } + + if (nextPageToken != null) { + fetchNextBatch(); + } else if (uris.hasNext()) { + prepareNextRequest(); + fetchNextBatch(); + } + + if (!storageObjectsIterator.hasNext()) { + throw new ISE( + "Failed to further iterate on bucket[%s] and prefix[%s]. 
The last page token was [%s]", + currentBucket, + currentPrefix, + nextPageToken + ); + } + + return next(); + } + }; + } } diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java index a3b77afd890..2146c5f4157 100644 --- a/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java +++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java @@ -23,66 +23,271 @@ import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.module.SimpleModule; import com.fasterxml.jackson.module.guice.ObjectMapperModule; +import com.google.api.services.storage.Storage; +import com.google.api.services.storage.model.Objects; +import com.google.api.services.storage.model.StorageObject; import com.google.common.collect.ImmutableList; import com.google.inject.Binder; import com.google.inject.Guice; import com.google.inject.Injector; import com.google.inject.Provides; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.InputSourceReader; import org.apache.druid.data.input.InputSplit; +import org.apache.druid.data.input.impl.CloudObjectLocation; +import org.apache.druid.data.input.impl.CsvInputFormat; +import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.JsonInputFormat; +import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.initialization.DruidModule; import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.StringUtils; +import 
org.apache.druid.java.util.common.parsers.CloseableIterator; import org.apache.druid.java.util.common.parsers.JSONPathSpec; import org.apache.druid.storage.google.GoogleStorage; +import org.apache.druid.testing.InitializedNullHandlingTest; +import org.apache.druid.utils.CompressionUtils; +import org.easymock.EasyMock; +import org.joda.time.DateTime; import org.junit.Assert; import org.junit.Test; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.net.URI; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; import java.util.stream.Stream; -public class GoogleCloudStorageInputSourceTest +public class GoogleCloudStorageInputSourceTest extends InitializedNullHandlingTest { - private static final GoogleStorage STORAGE = new GoogleStorage(null); + private static final GoogleStorage STORAGE = EasyMock.createMock(GoogleStorage.class); + + private static final List EXPECTED_URIS = Arrays.asList( + URI.create("gs://foo/bar/file.csv"), + URI.create("gs://bar/foo/file2.csv") + ); + + private static final List EXPECTED_COMPRESSED_URIS = Arrays.asList( + URI.create("gs://foo/bar/file.csv.gz"), + URI.create("gs://bar/foo/file2.csv.gz") + ); + + private static final List EXPECTED_OBJECTS = + EXPECTED_URIS.stream().map(CloudObjectLocation::new).collect(Collectors.toList()); + + private static final List PREFIXES = Arrays.asList( + URI.create("gs://foo/bar"), + URI.create("gs://bar/foo") + ); + + private static final List EXPECTED_LOCATION = + ImmutableList.of(new CloudObjectLocation("foo", "bar/file.csv")); + + private static final DateTime NOW = DateTimes.nowUtc(); + private static final byte[] CONTENT = + StringUtils.toUtf8(StringUtils.format("%d,hello,world", NOW.getMillis())); @Test public void testSerde() throws Exception { final ObjectMapper mapper = createGoogleObjectMapper(); - - final List uris = Arrays.asList( - new URI("gs://foo/bar/file.gz"), - 
new URI("gs://bar/foo/file2.gz") - ); - - final List prefixes = Arrays.asList( - new URI("gs://foo/bar"), - new URI("gs://bar/foo") - ); - - final GoogleCloudStorageInputSource withUris = new GoogleCloudStorageInputSource(STORAGE, uris); + final GoogleCloudStorageInputSource withUris = + new GoogleCloudStorageInputSource(STORAGE, EXPECTED_URIS, ImmutableList.of(), null); final GoogleCloudStorageInputSource serdeWithUris = mapper.readValue(mapper.writeValueAsString(withUris), GoogleCloudStorageInputSource.class); Assert.assertEquals(withUris, serdeWithUris); } + @Test + public void testSerdePrefixes() throws Exception + { + final ObjectMapper mapper = createGoogleObjectMapper(); + final GoogleCloudStorageInputSource withPrefixes = + new GoogleCloudStorageInputSource(STORAGE, ImmutableList.of(), PREFIXES, null); + final GoogleCloudStorageInputSource serdeWithPrefixes = + mapper.readValue(mapper.writeValueAsString(withPrefixes), GoogleCloudStorageInputSource.class); + Assert.assertEquals(withPrefixes, serdeWithPrefixes); + } + + @Test + public void testSerdeObjects() throws Exception + { + final ObjectMapper mapper = createGoogleObjectMapper(); + final GoogleCloudStorageInputSource withObjects = + new GoogleCloudStorageInputSource( + STORAGE, + null, + null, + ImmutableList.of(new CloudObjectLocation("foo", "bar/file.gz")) + ); + final GoogleCloudStorageInputSource serdeWithObjects = + mapper.readValue(mapper.writeValueAsString(withObjects), GoogleCloudStorageInputSource.class); + Assert.assertEquals(withObjects, serdeWithObjects); + } + @Test public void testWithUrisSplit() { - final List uris = Arrays.asList( - URI.create("gs://foo/bar/file.gz"), - URI.create("gs://bar/foo/file2.gz") - ); - GoogleCloudStorageInputSource inputSource = new GoogleCloudStorageInputSource(STORAGE, uris); + GoogleCloudStorageInputSource inputSource = + new GoogleCloudStorageInputSource(STORAGE, EXPECTED_URIS, ImmutableList.of(), null); - Stream> splits = inputSource.createSplits( + 
Stream> splits = inputSource.createSplits( new JsonInputFormat(JSONPathSpec.DEFAULT, null), null ); - Assert.assertEquals(uris, splits.map(InputSplit::get).collect(Collectors.toList())); + Assert.assertEquals(EXPECTED_OBJECTS, splits.map(InputSplit::get).collect(Collectors.toList())); + } + + @Test + public void testWithPrefixesSplit() throws IOException + { + EasyMock.reset(STORAGE); + addExpectedPrefixObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0))); + addExpectedPrefixObjects(PREFIXES.get(1), ImmutableList.of(EXPECTED_URIS.get(1))); + EasyMock.replay(STORAGE); + + GoogleCloudStorageInputSource inputSource = + new GoogleCloudStorageInputSource(STORAGE, null, PREFIXES, null); + + Stream> splits = inputSource.createSplits( + new JsonInputFormat(JSONPathSpec.DEFAULT, null), + null + ); + + Assert.assertEquals(EXPECTED_OBJECTS, splits.map(InputSplit::get).collect(Collectors.toList())); + } + + @Test + public void testReader() throws IOException + { + EasyMock.reset(STORAGE); + addExpectedPrefixObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0))); + addExpectedGetObjectMock(EXPECTED_URIS.get(0)); + addExpectedPrefixObjects(PREFIXES.get(1), ImmutableList.of(EXPECTED_URIS.get(1))); + addExpectedGetObjectMock(EXPECTED_URIS.get(1)); + EasyMock.replay(STORAGE); + + GoogleCloudStorageInputSource inputSource = new GoogleCloudStorageInputSource( + STORAGE, + null, + PREFIXES, + null + ); + + InputRowSchema someSchema = new InputRowSchema( + new TimestampSpec("time", "auto", null), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim2"))), + ImmutableList.of("count") + ); + + InputSourceReader reader = inputSource.reader( + someSchema, + new CsvInputFormat(ImmutableList.of("time", "dim1", "dim2"), "|", false, null, 0), + null + ); + + CloseableIterator iterator = reader.read(); + + while (iterator.hasNext()) { + InputRow nextRow = iterator.next(); + Assert.assertEquals(NOW, nextRow.getTimestamp()); + 
Assert.assertEquals("hello", nextRow.getDimension("dim1").get(0)); + Assert.assertEquals("world", nextRow.getDimension("dim2").get(0)); + } + } + + @Test + public void testCompressedReader() throws IOException + { + EasyMock.reset(STORAGE); + addExpectedPrefixObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_COMPRESSED_URIS.get(0))); + addExpectedGetCompressedObjectMock(EXPECTED_COMPRESSED_URIS.get(0)); + addExpectedPrefixObjects(PREFIXES.get(1), ImmutableList.of(EXPECTED_COMPRESSED_URIS.get(1))); + addExpectedGetCompressedObjectMock(EXPECTED_COMPRESSED_URIS.get(1)); + EasyMock.replay(STORAGE); + + GoogleCloudStorageInputSource inputSource = new GoogleCloudStorageInputSource( + STORAGE, + null, + PREFIXES, + null + ); + + InputRowSchema someSchema = new InputRowSchema( + new TimestampSpec("time", "auto", null), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim2"))), + ImmutableList.of("count") + ); + + InputSourceReader reader = inputSource.reader( + someSchema, + new CsvInputFormat(ImmutableList.of("time", "dim1", "dim2"), "|", false, null, 0), + null + ); + + CloseableIterator iterator = reader.read(); + + while (iterator.hasNext()) { + InputRow nextRow = iterator.next(); + Assert.assertEquals(NOW, nextRow.getTimestamp()); + Assert.assertEquals("hello", nextRow.getDimension("dim1").get(0)); + Assert.assertEquals("world", nextRow.getDimension("dim2").get(0)); + } + } + + private static void addExpectedPrefixObjects(URI prefix, List uris) throws IOException + { + final String bucket = prefix.getAuthority(); + + Storage.Objects.List listRequest = EasyMock.createMock(Storage.Objects.List.class); + EasyMock.expect(STORAGE.list(EasyMock.eq(bucket))).andReturn(listRequest).once(); + EasyMock.expect(listRequest.setPageToken(EasyMock.anyString())).andReturn(listRequest).once(); + EasyMock.expect(listRequest.setMaxResults(EasyMock.anyLong())).andReturn(listRequest).once(); + 
EasyMock.expect(listRequest.setPrefix(EasyMock.eq(StringUtils.maybeRemoveLeadingSlash(prefix.getPath())))) + .andReturn(listRequest) + .once(); + + List mockObjects = new ArrayList<>(); + for (URI uri : uris) { + StorageObject s = new StorageObject(); + s.setBucket(bucket); + s.setName(uri.getPath()); + mockObjects.add(s); + } + Objects response = new Objects(); + response.setItems(mockObjects); + EasyMock.expect(listRequest.execute()).andReturn(response).once(); + EasyMock.expect(response.getItems()).andReturn(mockObjects).once(); + + EasyMock.replay(listRequest); + } + + private static void addExpectedGetObjectMock(URI uri) throws IOException + { + CloudObjectLocation location = new CloudObjectLocation(uri); + + EasyMock.expect( + STORAGE.get(EasyMock.eq(location.getBucket()), EasyMock.eq(location.getPath()), EasyMock.eq(0L)) + ).andReturn(new ByteArrayInputStream(CONTENT)).once(); + } + + private static void addExpectedGetCompressedObjectMock(URI uri) throws IOException + { + CloudObjectLocation location = new CloudObjectLocation(uri); + + ByteArrayOutputStream gzipped = new ByteArrayOutputStream(); + CompressionUtils.gzip(new ByteArrayInputStream(CONTENT), gzipped); + + EasyMock.expect( + STORAGE.get(EasyMock.eq(location.getBucket()), EasyMock.eq(location.getPath()), EasyMock.eq(0L)) + ).andReturn(new ByteArrayInputStream(gzipped.toByteArray())).once(); } public static ObjectMapper createGoogleObjectMapper() diff --git a/extensions-core/s3-extensions/pom.xml b/extensions-core/s3-extensions/pom.xml index 473002ccf8a..3a70b4640b9 100644 --- a/extensions-core/s3-extensions/pom.xml +++ b/extensions-core/s3-extensions/pom.xml @@ -113,11 +113,6 @@ aws-java-sdk-s3 provided - - joda-time - joda-time - provided - org.apache.druid @@ -155,6 +150,11 @@ easymock test + + joda-time + joda-time + test + diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3Entity.java 
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3Entity.java index 00efaaa0691..9c05cc31d34 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3Entity.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3Entity.java @@ -26,6 +26,7 @@ import com.google.common.base.Predicate; import org.apache.druid.data.input.RetryingInputEntity; import org.apache.druid.data.input.impl.CloudObjectLocation; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.storage.s3.S3StorageDruidModule; import org.apache.druid.storage.s3.S3Utils; import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3; @@ -47,7 +48,7 @@ public class S3Entity extends RetryingInputEntity @Override public URI getUri() { - return null; + return object.toUri(S3StorageDruidModule.SCHEME); } @Override diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java index 0080ad8b7c6..9642ebec108 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java @@ -25,35 +25,25 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; -import org.apache.druid.data.input.AbstractInputSource; -import org.apache.druid.data.input.InputFormat; -import org.apache.druid.data.input.InputRowSchema; -import org.apache.druid.data.input.InputSourceReader; import org.apache.druid.data.input.InputSplit; -import org.apache.druid.data.input.SplitHintSpec; +import org.apache.druid.data.input.impl.CloudObjectInputSource; import org.apache.druid.data.input.impl.CloudObjectLocation; -import 
org.apache.druid.data.input.impl.InputEntityIteratingReader; import org.apache.druid.data.input.impl.SplittableInputSource; +import org.apache.druid.storage.s3.S3StorageDruidModule; import org.apache.druid.storage.s3.S3Utils; import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3; -import org.apache.druid.utils.CollectionUtils; import javax.annotation.Nullable; -import java.io.File; import java.net.URI; import java.util.List; -import java.util.Objects; import java.util.stream.Stream; import java.util.stream.StreamSupport; -public class S3InputSource extends AbstractInputSource implements SplittableInputSource +public class S3InputSource extends CloudObjectInputSource { private static final int MAX_LISTING_LENGTH = 1024; private final ServerSideEncryptingAmazonS3 s3Client; - private final List uris; - private final List prefixes; - private final List objects; @JsonCreator public S3InputSource( @@ -63,144 +53,42 @@ public class S3InputSource extends AbstractInputSource implements SplittableInpu @JsonProperty("objects") @Nullable List objects ) { + super(S3StorageDruidModule.SCHEME, uris, prefixes, objects); this.s3Client = Preconditions.checkNotNull(s3Client, "s3Client"); - this.uris = uris; - this.prefixes = prefixes; - this.objects = objects; - - if (!CollectionUtils.isNullOrEmpty(objects)) { - throwIfIllegalArgs(!CollectionUtils.isNullOrEmpty(uris) || !CollectionUtils.isNullOrEmpty(prefixes)); - } else if (!CollectionUtils.isNullOrEmpty(uris)) { - throwIfIllegalArgs(!CollectionUtils.isNullOrEmpty(prefixes)); - uris.forEach(S3Utils::checkURI); - } else if (!CollectionUtils.isNullOrEmpty(prefixes)) { - prefixes.forEach(S3Utils::checkURI); - } else { - throwIfIllegalArgs(true); - } - } - - @JsonProperty - public List getUris() - { - return uris; - } - - @JsonProperty - public List getPrefixes() - { - return prefixes; - } - - @JsonProperty - public List getObjects() - { - return objects; } @Override - public Stream> createSplits( - InputFormat inputFormat, - 
@Nullable SplitHintSpec splitHintSpec - ) + protected S3Entity createEntity(InputSplit split) { - if (objects != null) { - return objects.stream().map(InputSplit::new); - } - - if (uris != null) { - return uris.stream().map(CloudObjectLocation::new).map(InputSplit::new); - } + return new S3Entity(s3Client, split.get()); + } + @Override + protected Stream> getPrefixesSplitStream() + { return StreamSupport.stream(getIterableObjectsFromPrefixes().spliterator(), false) .map(S3Utils::summaryToCloudObjectLocation) .map(InputSplit::new); } - @Override - public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) - { - if (objects != null) { - return objects.size(); - } - - if (uris != null) { - return uris.size(); - } - - return (int) StreamSupport.stream(getIterableObjectsFromPrefixes().spliterator(), false).count(); - } - @Override public SplittableInputSource withSplit(InputSplit split) { return new S3InputSource(s3Client, null, null, ImmutableList.of(split.get())); } - @Override - public boolean needsFormat() - { - return true; - } - - @Override - protected InputSourceReader formattableReader( - InputRowSchema inputRowSchema, - InputFormat inputFormat, - @Nullable File temporaryDirectory - ) - { - return new InputEntityIteratingReader( - inputRowSchema, - inputFormat, - // formattableReader() is supposed to be called in each task that actually creates segments. - // The task should already have only one split in parallel indexing, - // while there's no need to make splits using splitHintSpec in sequential indexing. 
- createSplits(inputFormat, null).map(split -> new S3Entity(s3Client, split.get())), - temporaryDirectory - ); - } - - @Override - public boolean equals(Object o) - { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - S3InputSource that = (S3InputSource) o; - return Objects.equals(uris, that.uris) && - Objects.equals(prefixes, that.prefixes) && - Objects.equals(objects, that.objects); - } - - @Override - public int hashCode() - { - return Objects.hash(uris, prefixes, objects); - } - @Override public String toString() { return "S3InputSource{" + - "uris=" + uris + - ", prefixes=" + prefixes + - ", objects=" + objects + + "uris=" + getUris() + + ", prefixes=" + getPrefixes() + + ", objects=" + getObjects() + '}'; } private Iterable getIterableObjectsFromPrefixes() { - return () -> S3Utils.lazyFetchingObjectSummariesIterator(s3Client, prefixes.iterator(), MAX_LISTING_LENGTH); - } - - private void throwIfIllegalArgs(boolean clause) throws IllegalArgumentException - { - if (clause) { - throw new IllegalArgumentException("exactly one of either uris or prefixes or objects must be specified"); - } + return () -> S3Utils.lazyFetchingObjectSummariesIterator(s3Client, getPrefixes().iterator(), MAX_LISTING_LENGTH); } } diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java index 5aaa0f18770..ae736770918 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java @@ -34,7 +34,6 @@ import com.google.common.base.Joiner; import com.google.common.base.Predicate; import com.google.common.collect.Iterators; import org.apache.druid.data.input.impl.CloudObjectLocation; -import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; import 
org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.RetryUtils; @@ -289,10 +288,8 @@ public class S3Utils { if (uri.getScheme().equalsIgnoreCase(S3StorageDruidModule.SCHEME_S3_ZIP)) { uri = URI.create(SCHEME + uri.toString().substring(S3StorageDruidModule.SCHEME_S3_ZIP.length())); - } else if (!SCHEME.equalsIgnoreCase(uri.getScheme())) { - throw new IAE("Invalid URI scheme [%s] must be [%s]", uri.toString(), SCHEME); } - return uri; + return CloudObjectLocation.validateUriScheme(SCHEME, uri); } // Copied from org.jets3t.service.model.StorageObject.isDirectoryPlaceholder()