From 4458113375ee24cc11d8d5a3a6ddd3577d0571be Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Mon, 25 Nov 2019 22:31:19 -0800 Subject: [PATCH] S3 input source (#8903) * add s3 input source for native batch ingestion * add docs * fixes * checkstyle * lazy splits * fixes and hella tests * fix it * re-use better iterator * use key * javadoc and checkstyle * exception * oops * refactor to use S3Coords instead of URI * remove unused code, add retrying stream to handle s3 stream * remove unused parameter * update to latest master * use list of objects instead of object * serde test * refactor and such * now with the ability to compile * fix signature and javadocs * fix conflicts yet again, fix S3 uri stuffs * more tests, enforce uri for bucket * javadoc * oops * abstract class instead of interface * null or empty * better error --- .../apache/druid/data/input/InputSource.java | 6 +- .../druid/data/input/RetryingInputEntity.java | 33 +- .../data/input/impl/CloudObjectLocation.java | 129 +++++ .../druid/data/input/impl/HttpEntity.java | 16 +- .../druid/java/util/common/StringUtils.java | 10 + .../apache/druid/utils/CollectionUtils.java | 7 + .../input/impl/CloudObjectLocationTest.java | 116 +++++ docs/development/extensions-core/s3.md | 76 +++ .../StaticAzureBlobStoreFirehoseFactory.java | 7 +- .../StaticCloudFilesFirehoseFactory.java | 2 +- .../google/GoogleCloudStorageEntity.java | 16 +- .../StaticGoogleBlobStoreFirehoseFactory.java | 5 +- .../google/GoogleDataSegmentPuller.java | 11 +- .../druid/storage/google/GoogleUtils.java | 6 - .../inputsource/hdfs/HdfsInputEntity.java | 13 +- extensions-core/s3-extensions/pom.xml | 13 +- .../apache/druid/data/input/s3/S3Entity.java | 86 ++++ .../druid/data/input/s3/S3InputSource.java | 206 ++++++++ .../input/s3/S3InputSourceDruidModule.java | 50 ++ .../firehose/s3/StaticS3FirehoseFactory.java | 29 +- .../druid/storage/s3/S3DataSegmentPuller.java | 70 +-- .../apache/druid/storage/s3/S3LoadSpec.java | 5 +- 
.../storage/s3/S3StorageDruidModule.java | 18 +- .../s3/S3TimestampVersionedDataFinder.java | 58 +-- .../org/apache/druid/storage/s3/S3Utils.java | 148 +++++- ...rg.apache.druid.initialization.DruidModule | 1 + .../data/input/s3/S3InputSourceTest.java | 449 ++++++++++++++++++ .../s3/StaticS3FirehoseFactoryTest.java | 91 +--- .../storage/s3/S3DataSegmentArchiverTest.java | 6 +- .../storage/s3/S3DataSegmentPullerTest.java | 5 +- 30 files changed, 1397 insertions(+), 291 deletions(-) create mode 100644 core/src/main/java/org/apache/druid/data/input/impl/CloudObjectLocation.java create mode 100644 core/src/test/java/org/apache/druid/data/input/impl/CloudObjectLocationTest.java create mode 100644 extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3Entity.java create mode 100644 extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java create mode 100644 extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceDruidModule.java create mode 100644 extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java diff --git a/core/src/main/java/org/apache/druid/data/input/InputSource.java b/core/src/main/java/org/apache/druid/data/input/InputSource.java index 56992a12f2a..8932c857928 100644 --- a/core/src/main/java/org/apache/druid/data/input/InputSource.java +++ b/core/src/main/java/org/apache/druid/data/input/InputSource.java @@ -75,9 +75,5 @@ public interface InputSource * @param inputFormat to parse data. It can be null if {@link #needsFormat()} = true * @param temporaryDirectory to store temp data. It will be cleaned up automatically once the task is finished. 
*/ - InputSourceReader reader( - InputRowSchema inputRowSchema, - @Nullable InputFormat inputFormat, - File temporaryDirectory - ); + InputSourceReader reader(InputRowSchema inputRowSchema, @Nullable InputFormat inputFormat, File temporaryDirectory); } diff --git a/core/src/main/java/org/apache/druid/data/input/RetryingInputEntity.java b/core/src/main/java/org/apache/druid/data/input/RetryingInputEntity.java index 6a47f0b5593..3c0aa441500 100644 --- a/core/src/main/java/org/apache/druid/data/input/RetryingInputEntity.java +++ b/core/src/main/java/org/apache/druid/data/input/RetryingInputEntity.java @@ -19,47 +19,58 @@ package org.apache.druid.data.input; -import com.google.common.base.Predicate; import org.apache.druid.data.input.impl.RetryingInputStream; import org.apache.druid.data.input.impl.prefetch.ObjectOpenFunction; import org.apache.druid.java.util.common.RetryUtils; +import org.apache.druid.utils.CompressionUtils; import java.io.IOException; import java.io.InputStream; -public interface RetryingInputEntity extends InputEntity +public abstract class RetryingInputEntity implements InputEntity { + /** + * Open a {@link RetryingInputStream} wrapper for an underlying input stream, optionally decompressing the retrying + * stream if the file extension matches a known compression, otherwise passing through the retrying stream directly. + */ @Override - default InputStream open() throws IOException + public InputStream open() throws IOException { - return new RetryingInputStream<>( + RetryingInputStream retryingInputStream = new RetryingInputStream<>( this, new RetryingInputEntityOpenFunction(), getRetryCondition(), RetryUtils.DEFAULT_MAX_TRIES ); + return CompressionUtils.decompress(retryingInputStream, getPath()); } /** - * Directly opens an {@link InputStream} on the input entity. + * Directly opens an {@link InputStream} on the input entity. 
Decompression should be handled externally, and is + * handled by the default implementation of {@link #open}, so this should return the raw stream for the object. */ - default InputStream readFromStart() throws IOException + protected InputStream readFromStart() throws IOException { return readFrom(0); } /** - * Directly opens an {@link InputStream} starting at the given offset on the input entity. + * Directly opens an {@link InputStream} starting at the given offset on the input entity. Decompression should be + * handled externally, and is handled by the default implementation of {@link #open},this should return the raw stream + * for the object. * * @param offset an offset to start reading from. A non-negative integer counting * the number of bytes from the beginning of the entity */ - InputStream readFrom(long offset) throws IOException; + protected abstract InputStream readFrom(long offset) throws IOException; - @Override - Predicate getRetryCondition(); + /** + * Get path name for this entity, used by the default implementation of {@link #open} to determine if the underlying + * stream needs decompressed, based on file extension of the path + */ + protected abstract String getPath(); - class RetryingInputEntityOpenFunction implements ObjectOpenFunction + private static class RetryingInputEntityOpenFunction implements ObjectOpenFunction { @Override public InputStream open(RetryingInputEntity object) throws IOException diff --git a/core/src/main/java/org/apache/druid/data/input/impl/CloudObjectLocation.java b/core/src/main/java/org/apache/druid/data/input/impl/CloudObjectLocation.java new file mode 100644 index 00000000000..d9810072258 --- /dev/null +++ b/core/src/main/java/org/apache/druid/data/input/impl/CloudObjectLocation.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.impl; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import org.apache.druid.java.util.common.StringUtils; + +import java.net.URI; +import java.util.Objects; + +/** + * Common type for 'bucket' and 'path' concept of cloud objects to allow code sharing between cloud specific + * implementations. {@link #bucket} and {@link #path} should NOT be URL encoded. + * + * The intention is that this is used as a common representation for storage objects as an alternative to dealing in + * {@link URI} directly, but still provide a mechanism to round-trip with a URI. + * + * In common clouds, bucket names must be dns compliant: + * https://docs.aws.amazon.com/AmazonS3/latest/dev/BucketRestrictions.html + * https://docs.microsoft.com/en-us/rest/api/storageservices/naming-and-referencing-containers--blobs--and-metadata + * https://cloud.google.com/storage/docs/naming + * + * The constructor ensures that bucket names are DNS compliant by checking that the URL encoded form of the bucket + * matches the supplied value. 
Technically it should probably confirm that the bucket is also all lower-case, but + * S3 has a legacy mode where buckets did not have to be compliant so we can't enforce that here unfortunately. + */ +public class CloudObjectLocation +{ + private final String bucket; + private final String path; + + @JsonCreator + public CloudObjectLocation(@JsonProperty("bucket") String bucket, @JsonProperty("path") String path) + { + this.bucket = Preconditions.checkNotNull(StringUtils.maybeRemoveTrailingSlash(bucket)); + this.path = Preconditions.checkNotNull(StringUtils.maybeRemoveLeadingSlash(path)); + Preconditions.checkArgument( + this.bucket.equals(StringUtils.urlEncode(this.bucket)), + "bucket must follow DNS-compliant naming conventions" + ); + } + + public CloudObjectLocation(URI uri) + { + this(uri.getHost(), uri.getPath()); + } + + /** + * Given a scheme, encode {@link #bucket} and {@link #path} into a {@link URI}. + * + * In all clouds bucket names must be dns compliant, so it does not require encoding + * There is no such restriction on object names, so they will be URL encoded when constructing the URI + */ + public URI toUri(String scheme) + { + // Encode path, except leave '/' characters unencoded + return URI.create( + StringUtils.format( + "%s://%s/%s", + scheme, + bucket, + StringUtils.replace(StringUtils.urlEncode(path), "%2F", "/") + ) + ); + } + + @JsonProperty + public String getBucket() + { + return bucket; + } + + @JsonProperty + public String getPath() + { + return path; + } + + @Override + public String toString() + { + return "CloudObjectLocation{" + + "bucket='" + bucket + '\'' + + ", path='" + path + '\'' + + '}'; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + + final CloudObjectLocation that = (CloudObjectLocation) o; + return Objects.equals(bucket, that.bucket) && + Objects.equals(path, that.path); + } + + @Override + public int 
hashCode() + { + return Objects.hash(bucket, path); + } + +} diff --git a/core/src/main/java/org/apache/druid/data/input/impl/HttpEntity.java b/core/src/main/java/org/apache/druid/data/input/impl/HttpEntity.java index 22ce64afb8c..e5f5e240826 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/HttpEntity.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/HttpEntity.java @@ -27,7 +27,6 @@ import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.metadata.PasswordProvider; -import org.apache.druid.utils.CompressionUtils; import javax.annotation.Nullable; import java.io.IOException; @@ -36,7 +35,7 @@ import java.net.URI; import java.net.URLConnection; import java.util.Base64; -public class HttpEntity implements RetryingInputEntity +public class HttpEntity extends RetryingInputEntity { private static final Logger LOG = new Logger(HttpEntity.class); @@ -64,12 +63,15 @@ public class HttpEntity implements RetryingInputEntity } @Override - public InputStream readFrom(long offset) throws IOException + protected InputStream readFrom(long offset) throws IOException { - return CompressionUtils.decompress( - openInputStream(uri, httpAuthenticationUsername, httpAuthenticationPasswordProvider, offset), - uri.toString() - ); + return openInputStream(uri, httpAuthenticationUsername, httpAuthenticationPasswordProvider, offset); + } + + @Override + protected String getPath() + { + return uri.getPath(); } @Override diff --git a/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java b/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java index d5e39e31e4d..7485802ff36 100644 --- a/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java +++ b/core/src/main/java/org/apache/druid/java/util/common/StringUtils.java @@ -237,6 +237,16 @@ public class StringUtils } } + public static String 
maybeRemoveLeadingSlash(String s) + { + return s != null && s.startsWith("/") ? s.substring(1) : s; + } + + public static String maybeRemoveTrailingSlash(String s) + { + return s != null && s.endsWith("/") ? s.substring(0, s.length() - 1) : s; + } + /** * Removes all occurrences of the given char from the given string. This method is an optimal version of * {@link String#replace(CharSequence, CharSequence) s.replace("c", "")}. diff --git a/core/src/main/java/org/apache/druid/utils/CollectionUtils.java b/core/src/main/java/org/apache/druid/utils/CollectionUtils.java index 530feb855b6..02d55d06a5d 100644 --- a/core/src/main/java/org/apache/druid/utils/CollectionUtils.java +++ b/core/src/main/java/org/apache/druid/utils/CollectionUtils.java @@ -23,11 +23,13 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Maps; import org.apache.druid.java.util.common.ISE; +import javax.annotation.Nullable; import java.util.AbstractCollection; import java.util.Collection; import java.util.Comparator; import java.util.HashMap; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Spliterator; import java.util.TreeSet; @@ -116,6 +118,11 @@ public final class CollectionUtils return result; } + public static boolean isNullOrEmpty(@Nullable List list) + { + return list == null || list.isEmpty(); + } + private CollectionUtils() { } diff --git a/core/src/test/java/org/apache/druid/data/input/impl/CloudObjectLocationTest.java b/core/src/test/java/org/apache/druid/data/input/impl/CloudObjectLocationTest.java new file mode 100644 index 00000000000..990fa0e1e08 --- /dev/null +++ b/core/src/test/java/org/apache/druid/data/input/impl/CloudObjectLocationTest.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.impl; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.net.URI; + +public class CloudObjectLocationTest +{ + private static final ObjectMapper MAPPER = new ObjectMapper(); + private static final String SCHEME = "s3"; + private static final String BUCKET_NAME = "bucket"; + + private static final CloudObjectLocation LOCATION = + new CloudObjectLocation(BUCKET_NAME, "path/to/myobject"); + + private static final CloudObjectLocation LOCATION_EXTRA_SLASHES = + new CloudObjectLocation(BUCKET_NAME + '/', "/path/to/myobject"); + + private static final CloudObjectLocation LOCATION_URLENCODE = + new CloudObjectLocation(BUCKET_NAME, "path/to/myobject?question"); + + private static final CloudObjectLocation LOCATION_NON_ASCII = + new CloudObjectLocation(BUCKET_NAME, "pÄth/tø/myøbject"); + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void testSerde() throws Exception + { + Assert.assertEquals( + LOCATION, + MAPPER.readValue(MAPPER.writeValueAsString(LOCATION), CloudObjectLocation.class) + ); + + Assert.assertEquals( + LOCATION_EXTRA_SLASHES, + MAPPER.readValue(MAPPER.writeValueAsString(LOCATION_EXTRA_SLASHES), CloudObjectLocation.class) + ); + + Assert.assertEquals( 
+ LOCATION_URLENCODE, + MAPPER.readValue(MAPPER.writeValueAsString(LOCATION_URLENCODE), CloudObjectLocation.class) + ); + + Assert.assertEquals( + LOCATION_NON_ASCII, + MAPPER.readValue(MAPPER.writeValueAsString(LOCATION_NON_ASCII), CloudObjectLocation.class) + ); + } + + @Test + public void testToUri() + { + Assert.assertEquals( + URI.create("s3://bucket/path/to/myobject"), + LOCATION.toUri(SCHEME) + ); + + Assert.assertEquals( + URI.create("s3://bucket/path/to/myobject"), + LOCATION_EXTRA_SLASHES.toUri(SCHEME) + ); + + Assert.assertEquals( + URI.create("s3://bucket/path/to/myobject%3Fquestion"), + LOCATION_URLENCODE.toUri(SCHEME) + ); + + Assert.assertEquals( + URI.create("s3://bucket/p%C3%84th/t%C3%B8/my%C3%B8bject"), + LOCATION_NON_ASCII.toUri(SCHEME) + ); + } + + @Test + public void testUriRoundTrip() + { + Assert.assertEquals(LOCATION, new CloudObjectLocation(LOCATION.toUri(SCHEME))); + Assert.assertEquals(LOCATION_EXTRA_SLASHES, new CloudObjectLocation(LOCATION_EXTRA_SLASHES.toUri(SCHEME))); + Assert.assertEquals(LOCATION_URLENCODE, new CloudObjectLocation(LOCATION_URLENCODE.toUri(SCHEME))); + Assert.assertEquals(LOCATION_NON_ASCII, new CloudObjectLocation(LOCATION_NON_ASCII.toUri(SCHEME))); + } + + @Test + public void testBucketName() + { + expectedException.expect(IllegalArgumentException.class); + CloudObjectLocation invalidBucket = new CloudObjectLocation("someBÜcket", "some/path"); + // will never get here + Assert.assertEquals(invalidBucket, new CloudObjectLocation(invalidBucket.toUri(SCHEME))); + } +} diff --git a/docs/development/extensions-core/s3.md b/docs/development/extensions-core/s3.md index d145f07d38a..fd82b37e8eb 100644 --- a/docs/development/extensions-core/s3.md +++ b/docs/development/extensions-core/s3.md @@ -98,6 +98,82 @@ You can enable [server-side encryption](https://docs.aws.amazon.com/AmazonS3/lat - kms: [Server-side encryption with AWS KMS–Managed Keys](https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingKMSEncryption.html) - 
custom: [Server-side encryption with Customer-Provided Encryption Keys](https://docs.aws.amazon.com/AmazonS3/latest/dev/ServerSideEncryptionCustomerKeys.html) + + + +## S3 batch ingestion input source + +This extension also provides an input source for Druid native batch ingestion to support reading objects directly from S3. Objects can be specified in one of three ways: as a list of S3 URI strings (`uris`), as a list of S3 location prefixes (`prefixes`), in which case the contents of each prefix are listed and every object found under it is ingested, or as a list of explicit S3 object locations given by bucket and path (`objects`). The S3 input source is splittable and can be used by [native parallel index tasks](../../ingestion/native-batch.md#parallel-task), where each worker task of `index_parallel` will read a single object. + +Sample specs: + +```json +... + "ioConfig": { + "type": "index_parallel", + "inputSource": { + "type": "s3", + "uris": ["s3://foo/bar/file.json", "s3://bar/foo/file2.json"] + }, + "inputFormat": { + "type": "json" + }, + ... + }, +... +``` + +```json +... + "ioConfig": { + "type": "index_parallel", + "inputSource": { + "type": "s3", + "prefixes": ["s3://foo/bar", "s3://bar/foo"] + }, + "inputFormat": { + "type": "json" + }, + ... + }, +... +``` + + +```json +... + "ioConfig": { + "type": "index_parallel", + "inputSource": { + "type": "s3", + "objects": [ + { "bucket": "foo", "path": "bar/file1.json"}, + { "bucket": "bar", "path": "foo/file2.json"} + ] + }, + "inputFormat": { + "type": "json" + }, + ... + }, +... 
+``` + +|property|description|default|required?| +|--------|-----------|-------|---------| +|type|This should be `s3`.|N/A|yes| +|uris|JSON array of URIs where S3 objects to be ingested are located.|N/A|`uris` or `prefixes` or `objects` must be set| +|prefixes|JSON array of URI prefixes for the locations of S3 objects to be ingested.|N/A|`uris` or `prefixes` or `objects` must be set| +|objects|JSON array of S3 Objects to be ingested.|N/A|`uris` or `prefixes` or `objects` must be set| + + +S3 Object: + +|property|description|default|required?| +|--------|-----------|-------|---------| +|bucket|Name of the S3 bucket|N/A|yes| +|path|The path where data is located.|N/A|yes| + ## StaticS3Firehose diff --git a/extensions-contrib/azure-extensions/src/main/java/org/apache/druid/firehose/azure/StaticAzureBlobStoreFirehoseFactory.java b/extensions-contrib/azure-extensions/src/main/java/org/apache/druid/firehose/azure/StaticAzureBlobStoreFirehoseFactory.java index fa4c74e1ba4..c3919e9a0fd 100644 --- a/extensions-contrib/azure-extensions/src/main/java/org/apache/druid/firehose/azure/StaticAzureBlobStoreFirehoseFactory.java +++ b/extensions-contrib/azure-extensions/src/main/java/org/apache/druid/firehose/azure/StaticAzureBlobStoreFirehoseFactory.java @@ -28,6 +28,7 @@ import org.apache.druid.data.input.FiniteFirehoseFactory; import org.apache.druid.data.input.InputSplit; import org.apache.druid.data.input.impl.StringInputRowParser; import org.apache.druid.data.input.impl.prefetch.PrefetchableTextFilesFirehoseFactory; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.storage.azure.AzureByteSource; import org.apache.druid.storage.azure.AzureStorage; import org.apache.druid.storage.azure.AzureUtils; @@ -50,7 +51,7 @@ public class StaticAzureBlobStoreFirehoseFactory extends PrefetchableTextFilesFi @JsonCreator public StaticAzureBlobStoreFirehoseFactory( - @JacksonInject("azureStorage") AzureStorage azureStorage, + @JacksonInject AzureStorage 
azureStorage, @JsonProperty("blobs") List blobs, @JsonProperty("maxCacheCapacityBytes") Long maxCacheCapacityBytes, @JsonProperty("maxFetchCapacityBytes") Long maxFetchCapacityBytes, @@ -101,9 +102,7 @@ public class StaticAzureBlobStoreFirehoseFactory extends PrefetchableTextFilesFi private static AzureByteSource makeByteSource(AzureStorage azureStorage, AzureBlob object) { final String container = object.getContainer(); - final String path = object.getPath().startsWith("/") - ? object.getPath().substring(1) - : object.getPath(); + final String path = StringUtils.maybeRemoveLeadingSlash(object.getPath()); return new AzureByteSource(azureStorage, container, path); } diff --git a/extensions-contrib/cloudfiles-extensions/src/main/java/org/apache/druid/firehose/cloudfiles/StaticCloudFilesFirehoseFactory.java b/extensions-contrib/cloudfiles-extensions/src/main/java/org/apache/druid/firehose/cloudfiles/StaticCloudFilesFirehoseFactory.java index fbab98a384e..f0de9f7e98d 100644 --- a/extensions-contrib/cloudfiles-extensions/src/main/java/org/apache/druid/firehose/cloudfiles/StaticCloudFilesFirehoseFactory.java +++ b/extensions-contrib/cloudfiles-extensions/src/main/java/org/apache/druid/firehose/cloudfiles/StaticCloudFilesFirehoseFactory.java @@ -50,7 +50,7 @@ public class StaticCloudFilesFirehoseFactory extends PrefetchableTextFilesFireho @JsonCreator public StaticCloudFilesFirehoseFactory( - @JacksonInject("objectApi") CloudFilesApi cloudFilesApi, + @JacksonInject CloudFilesApi cloudFilesApi, @JsonProperty("blobs") List blobs, @JsonProperty("maxCacheCapacityBytes") Long maxCacheCapacityBytes, @JsonProperty("maxFetchCapacityBytes") Long maxFetchCapacityBytes, diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageEntity.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageEntity.java index 6144857d302..77a1a2e2f30 100644 --- 
a/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageEntity.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/data/input/google/GoogleCloudStorageEntity.java @@ -21,17 +21,17 @@ package org.apache.druid.data.input.google; import com.google.common.base.Predicate; import org.apache.druid.data.input.RetryingInputEntity; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.storage.google.GoogleByteSource; import org.apache.druid.storage.google.GoogleStorage; import org.apache.druid.storage.google.GoogleUtils; -import org.apache.druid.utils.CompressionUtils; import javax.annotation.Nullable; import java.io.IOException; import java.io.InputStream; import java.net.URI; -public class GoogleCloudStorageEntity implements RetryingInputEntity +public class GoogleCloudStorageEntity extends RetryingInputEntity { private final GoogleStorage storage; private final URI uri; @@ -50,13 +50,19 @@ public class GoogleCloudStorageEntity implements RetryingInputEntity } @Override - public InputStream readFrom(long offset) throws IOException + protected InputStream readFrom(long offset) throws IOException { // Get data of the given object and open an input stream final String bucket = uri.getAuthority(); - final String key = GoogleUtils.extractGoogleCloudStorageObjectKey(uri); + final String key = StringUtils.maybeRemoveLeadingSlash(uri.getPath()); final GoogleByteSource byteSource = new GoogleByteSource(storage, bucket, key); - return CompressionUtils.decompress(byteSource.openStream(offset), uri.getPath()); + return byteSource.openStream(offset); + } + + @Override + protected String getPath() + { + return StringUtils.maybeRemoveLeadingSlash(uri.getPath()); } @Override diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/firehose/google/StaticGoogleBlobStoreFirehoseFactory.java 
b/extensions-core/google-extensions/src/main/java/org/apache/druid/firehose/google/StaticGoogleBlobStoreFirehoseFactory.java index 82116e884c1..22437d780e1 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/firehose/google/StaticGoogleBlobStoreFirehoseFactory.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/firehose/google/StaticGoogleBlobStoreFirehoseFactory.java @@ -27,6 +27,7 @@ import org.apache.druid.data.input.FiniteFirehoseFactory; import org.apache.druid.data.input.InputSplit; import org.apache.druid.data.input.impl.StringInputRowParser; import org.apache.druid.data.input.impl.prefetch.PrefetchableTextFilesFirehoseFactory; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.storage.google.GoogleByteSource; import org.apache.druid.storage.google.GoogleStorage; import org.apache.druid.storage.google.GoogleUtils; @@ -87,9 +88,7 @@ public class StaticGoogleBlobStoreFirehoseFactory extends PrefetchableTextFilesF private GoogleByteSource createGoogleByteSource(GoogleBlob object) { final String bucket = object.getBucket(); - final String path = object.getPath().startsWith("/") - ? 
object.getPath().substring(1) - : object.getPath(); + final String path = StringUtils.maybeRemoveLeadingSlash(object.getPath()); return new GoogleByteSource(storage, bucket, path); } diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleDataSegmentPuller.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleDataSegmentPuller.java index c1825ec7713..dbc22bc5d87 100644 --- a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleDataSegmentPuller.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleDataSegmentPuller.java @@ -22,6 +22,7 @@ package org.apache.druid.storage.google; import com.google.common.base.Predicate; import com.google.inject.Inject; import org.apache.druid.java.util.common.FileUtils; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.segment.loading.SegmentLoadingException; import org.apache.druid.segment.loading.URIDataPuller; @@ -81,20 +82,14 @@ public class GoogleDataSegmentPuller implements URIDataPuller @Override public InputStream getInputStream(URI uri) throws IOException { - String path = uri.getPath(); - if (path.startsWith("/")) { - path = path.substring(1); - } + String path = StringUtils.maybeRemoveLeadingSlash(uri.getPath()); return storage.get(uri.getHost(), path); } @Override public String getVersion(URI uri) throws IOException { - String path = uri.getPath(); - if (path.startsWith("/")) { - path = path.substring(1); - } + String path = StringUtils.maybeRemoveLeadingSlash(uri.getPath()); return storage.version(uri.getHost(), path); } diff --git a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java index 1dd12ef2388..4acd9e05069 100644 --- 
a/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java +++ b/extensions-core/google-extensions/src/main/java/org/apache/druid/storage/google/GoogleUtils.java @@ -23,7 +23,6 @@ import com.google.api.client.http.HttpResponseException; import com.google.common.base.Predicate; import java.io.IOException; -import java.net.URI; public class GoogleUtils { @@ -36,10 +35,5 @@ public class GoogleUtils return t instanceof IOException; } - public static String extractGoogleCloudStorageObjectKey(URI uri) - { - return uri.getPath().startsWith("/") ? uri.getPath().substring(1) : uri.getPath(); - } - public static final Predicate GOOGLE_RETRY = e -> isRetryable(e); } diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputEntity.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputEntity.java index 9a3786ed3dc..cd75d6c2a72 100644 --- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputEntity.java +++ b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputEntity.java @@ -22,7 +22,6 @@ package org.apache.druid.inputsource.hdfs; import com.google.common.base.Predicate; import org.apache.druid.data.input.RetryingInputEntity; import org.apache.druid.storage.hdfs.HdfsDataSegmentPuller; -import org.apache.druid.utils.CompressionUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; @@ -32,7 +31,7 @@ import java.io.IOException; import java.io.InputStream; import java.net.URI; -public class HdfsInputEntity implements RetryingInputEntity +public class HdfsInputEntity extends RetryingInputEntity { private final Configuration conf; private final Path path; @@ -50,12 +49,18 @@ public class HdfsInputEntity implements RetryingInputEntity } @Override - public InputStream readFrom(long offset) throws IOException + 
protected InputStream readFrom(long offset) throws IOException { final FileSystem fs = path.getFileSystem(conf); final FSDataInputStream inputStream = fs.open(path); inputStream.seek(offset); - return CompressionUtils.decompress(inputStream, path.getName()); + return inputStream; + } + + @Override + protected String getPath() + { + return path.getName(); } @Override diff --git a/extensions-core/s3-extensions/pom.xml b/extensions-core/s3-extensions/pom.xml index 8854400b9a1..473002ccf8a 100644 --- a/extensions-core/s3-extensions/pom.xml +++ b/extensions-core/s3-extensions/pom.xml @@ -113,8 +113,19 @@ aws-java-sdk-s3 provided - + + joda-time + joda-time + provided + + + org.apache.druid + druid-core + ${project.parent.version} + test-jar + test + org.apache.druid druid-server diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3Entity.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3Entity.java new file mode 100644 index 00000000000..00efaaa0691 --- /dev/null +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3Entity.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.data.input.s3; + +import com.amazonaws.services.s3.model.AmazonS3Exception; +import com.amazonaws.services.s3.model.GetObjectRequest; +import com.amazonaws.services.s3.model.S3Object; +import com.google.common.base.Predicate; +import org.apache.druid.data.input.RetryingInputEntity; +import org.apache.druid.data.input.impl.CloudObjectLocation; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.storage.s3.S3Utils; +import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; + +public class S3Entity extends RetryingInputEntity +{ + private final ServerSideEncryptingAmazonS3 s3Client; + private final CloudObjectLocation object; + + S3Entity(ServerSideEncryptingAmazonS3 s3Client, CloudObjectLocation coords) + { + this.s3Client = s3Client; + this.object = coords; + } + + @Override + public URI getUri() + { + return null; + } + + @Override + protected InputStream readFrom(long offset) throws IOException + { + final GetObjectRequest request = new GetObjectRequest(object.getBucket(), object.getPath()); + request.setRange(offset); + try { + final S3Object s3Object = s3Client.getObject(request); + if (s3Object == null) { + throw new ISE( + "Failed to get an s3 object for bucket[%s], key[%s], and start[%d]", + object.getBucket(), + object.getPath(), + offset + ); + } + return s3Object.getObjectContent(); + } + catch (AmazonS3Exception e) { + throw new IOException(e); + } + } + + @Override + protected String getPath() + { + return object.getPath(); + } + + @Override + public Predicate getRetryCondition() + { + return S3Utils.S3RETRY; + } +} diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java new file mode 100644 index 00000000000..0080ad8b7c6 --- /dev/null +++ 
b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSource.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.s3; + +import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import org.apache.druid.data.input.AbstractInputSource; +import org.apache.druid.data.input.InputFormat; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.InputSourceReader; +import org.apache.druid.data.input.InputSplit; +import org.apache.druid.data.input.SplitHintSpec; +import org.apache.druid.data.input.impl.CloudObjectLocation; +import org.apache.druid.data.input.impl.InputEntityIteratingReader; +import org.apache.druid.data.input.impl.SplittableInputSource; +import org.apache.druid.storage.s3.S3Utils; +import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3; +import org.apache.druid.utils.CollectionUtils; + +import javax.annotation.Nullable; 
+import java.io.File; +import java.net.URI; +import java.util.List; +import java.util.Objects; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +public class S3InputSource extends AbstractInputSource implements SplittableInputSource +{ + private static final int MAX_LISTING_LENGTH = 1024; + + private final ServerSideEncryptingAmazonS3 s3Client; + private final List uris; + private final List prefixes; + private final List objects; + + @JsonCreator + public S3InputSource( + @JacksonInject ServerSideEncryptingAmazonS3 s3Client, + @JsonProperty("uris") @Nullable List uris, + @JsonProperty("prefixes") @Nullable List prefixes, + @JsonProperty("objects") @Nullable List objects + ) + { + this.s3Client = Preconditions.checkNotNull(s3Client, "s3Client"); + this.uris = uris; + this.prefixes = prefixes; + this.objects = objects; + + if (!CollectionUtils.isNullOrEmpty(objects)) { + throwIfIllegalArgs(!CollectionUtils.isNullOrEmpty(uris) || !CollectionUtils.isNullOrEmpty(prefixes)); + } else if (!CollectionUtils.isNullOrEmpty(uris)) { + throwIfIllegalArgs(!CollectionUtils.isNullOrEmpty(prefixes)); + uris.forEach(S3Utils::checkURI); + } else if (!CollectionUtils.isNullOrEmpty(prefixes)) { + prefixes.forEach(S3Utils::checkURI); + } else { + throwIfIllegalArgs(true); + } + } + + @JsonProperty + public List getUris() + { + return uris; + } + + @JsonProperty + public List getPrefixes() + { + return prefixes; + } + + @JsonProperty + public List getObjects() + { + return objects; + } + + @Override + public Stream> createSplits( + InputFormat inputFormat, + @Nullable SplitHintSpec splitHintSpec + ) + { + if (objects != null) { + return objects.stream().map(InputSplit::new); + } + + if (uris != null) { + return uris.stream().map(CloudObjectLocation::new).map(InputSplit::new); + } + + return StreamSupport.stream(getIterableObjectsFromPrefixes().spliterator(), false) + .map(S3Utils::summaryToCloudObjectLocation) + .map(InputSplit::new); + } + + @Override + public 
int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) + { + if (objects != null) { + return objects.size(); + } + + if (uris != null) { + return uris.size(); + } + + return (int) StreamSupport.stream(getIterableObjectsFromPrefixes().spliterator(), false).count(); + } + + @Override + public SplittableInputSource withSplit(InputSplit split) + { + return new S3InputSource(s3Client, null, null, ImmutableList.of(split.get())); + } + + @Override + public boolean needsFormat() + { + return true; + } + + @Override + protected InputSourceReader formattableReader( + InputRowSchema inputRowSchema, + InputFormat inputFormat, + @Nullable File temporaryDirectory + ) + { + return new InputEntityIteratingReader( + inputRowSchema, + inputFormat, + // formattableReader() is supposed to be called in each task that actually creates segments. + // The task should already have only one split in parallel indexing, + // while there's no need to make splits using splitHintSpec in sequential indexing. 
+ createSplits(inputFormat, null).map(split -> new S3Entity(s3Client, split.get())), + temporaryDirectory + ); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + S3InputSource that = (S3InputSource) o; + return Objects.equals(uris, that.uris) && + Objects.equals(prefixes, that.prefixes) && + Objects.equals(objects, that.objects); + } + + @Override + public int hashCode() + { + return Objects.hash(uris, prefixes, objects); + } + + @Override + public String toString() + { + return "S3InputSource{" + + "uris=" + uris + + ", prefixes=" + prefixes + + ", objects=" + objects + + '}'; + } + + private Iterable getIterableObjectsFromPrefixes() + { + return () -> S3Utils.lazyFetchingObjectSummariesIterator(s3Client, prefixes.iterator(), MAX_LISTING_LENGTH); + } + + private void throwIfIllegalArgs(boolean clause) throws IllegalArgumentException + { + if (clause) { + throw new IllegalArgumentException("exactly one of either uris or prefixes or objects must be specified"); + } + } +} diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceDruidModule.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceDruidModule.java new file mode 100644 index 00000000000..241c7c16409 --- /dev/null +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/data/input/s3/S3InputSourceDruidModule.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.s3; + +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.common.collect.ImmutableList; +import com.google.inject.Binder; +import org.apache.druid.initialization.DruidModule; +import org.apache.druid.storage.s3.S3StorageDruidModule; + +import java.util.List; + +/** + * Druid module to wire up native batch support for S3 input + */ +public class S3InputSourceDruidModule implements DruidModule +{ + @Override + public List getJacksonModules() + { + return ImmutableList.of( + new SimpleModule().registerSubtypes(new NamedType(S3InputSource.class, S3StorageDruidModule.SCHEME)) + ); + } + + @Override + public void configure(Binder binder) + { + + } +} diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/firehose/s3/StaticS3FirehoseFactory.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/firehose/s3/StaticS3FirehoseFactory.java index 1c693778eec..e202876f8ba 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/firehose/s3/StaticS3FirehoseFactory.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/firehose/s3/StaticS3FirehoseFactory.java @@ -37,7 +37,6 @@ import org.apache.druid.data.input.impl.prefetch.PrefetchableTextFilesFirehoseFa import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.IOE; import org.apache.druid.java.util.common.ISE; -import 
org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.storage.s3.S3Utils; import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3; @@ -68,7 +67,7 @@ public class StaticS3FirehoseFactory extends PrefetchableTextFilesFirehoseFactor @JsonCreator public StaticS3FirehoseFactory( - @JacksonInject("s3Client") ServerSideEncryptingAmazonS3 s3Client, + @JacksonInject ServerSideEncryptingAmazonS3 s3Client, @JsonProperty("uris") List uris, @JsonProperty("prefixes") List prefixes, @JsonProperty("maxCacheCapacityBytes") Long maxCacheCapacityBytes, @@ -115,8 +114,6 @@ public class StaticS3FirehoseFactory extends PrefetchableTextFilesFirehoseFactor @Override protected Collection initObjects() throws IOException { - // Here, the returned s3 objects contain minimal information without data. - // Getting data is deferred until openObjectStream() is called for each object. if (!uris.isEmpty()) { return uris; } else { @@ -128,8 +125,7 @@ public class StaticS3FirehoseFactory extends PrefetchableTextFilesFirehoseFactor try { final Iterator objectSummaryIterator = S3Utils.objectSummaryIterator( s3Client, - bucket, - prefix, + uri, MAX_LISTING_LENGTH ); objects.addAll(Lists.newArrayList(objectSummaryIterator)); @@ -164,7 +160,7 @@ public class StaticS3FirehoseFactory extends PrefetchableTextFilesFirehoseFactor } } } - return objects.stream().map(StaticS3FirehoseFactory::toUri).collect(Collectors.toList()); + return objects.stream().map(S3Utils::summaryToUri).collect(Collectors.toList()); } } @@ -273,23 +269,4 @@ public class StaticS3FirehoseFactory extends PrefetchableTextFilesFirehoseFactor getMaxFetchRetry() ); } - - /** - * Create an {@link URI} from the given {@link S3ObjectSummary}. The result URI is composed as below. - * - *
-   * {@code s3://{BUCKET_NAME}/{OBJECT_KEY}}
-   * 
- */ - private static URI toUri(S3ObjectSummary object) - { - final String originalAuthority = object.getBucketName(); - final String originalPath = object.getKey(); - final String authority = originalAuthority.endsWith("/") ? - originalAuthority.substring(0, originalAuthority.length() - 1) : - originalAuthority; - final String path = originalPath.startsWith("/") ? originalPath.substring(1) : originalPath; - - return URI.create(StringUtils.format("s3://%s/%s", authority, path)); - } } diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentPuller.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentPuller.java index b8ec0656ed7..9c413e7ec4d 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentPuller.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3DataSegmentPuller.java @@ -28,6 +28,7 @@ import com.google.common.base.Strings; import com.google.common.io.ByteSource; import com.google.common.io.Files; import com.google.inject.Inject; +import org.apache.druid.data.input.impl.CloudObjectLocation; import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.IOE; @@ -55,13 +56,9 @@ import java.net.URI; */ public class S3DataSegmentPuller implements URIDataPuller { - public static final int DEFAULT_RETRY_COUNT = 3; - - public static final String SCHEME = S3StorageDruidModule.SCHEME; - private static final Logger log = new Logger(S3DataSegmentPuller.class); - protected static final String BUCKET = "bucket"; + static final String BUCKET = "bucket"; protected static final String KEY = "key"; protected final ServerSideEncryptingAmazonS3 s3Client; @@ -72,7 +69,7 @@ public class S3DataSegmentPuller implements URIDataPuller this.s3Client = s3Client; } - FileUtils.FileCopyResult getSegmentFiles(final S3Coords s3Coords, final File outDir) throws 
SegmentLoadingException + FileUtils.FileCopyResult getSegmentFiles(final CloudObjectLocation s3Coords, final File outDir) throws SegmentLoadingException { log.info("Pulling index at path[%s] to outDir[%s]", s3Coords, outDir); @@ -84,7 +81,7 @@ public class S3DataSegmentPuller implements URIDataPuller try { org.apache.commons.io.FileUtils.forceMkdir(outDir); - final URI uri = URI.create(StringUtils.format("s3://%s/%s", s3Coords.bucket, s3Coords.path)); + final URI uri = s3Coords.toUri(S3StorageDruidModule.SCHEME); final ByteSource byteSource = new ByteSource() { @Override @@ -103,7 +100,7 @@ public class S3DataSegmentPuller implements URIDataPuller } } }; - if (CompressionUtils.isZip(s3Coords.path)) { + if (CompressionUtils.isZip(s3Coords.getPath())) { final FileUtils.FileCopyResult result = CompressionUtils.unzip( byteSource, outDir, @@ -113,7 +110,7 @@ public class S3DataSegmentPuller implements URIDataPuller log.info("Loaded %d bytes from [%s] to [%s]", result.size(), s3Coords.toString(), outDir.getAbsolutePath()); return result; } - if (CompressionUtils.isGz(s3Coords.path)) { + if (CompressionUtils.isGz(s3Coords.getPath())) { final String fname = Files.getNameWithoutExtension(uri.getPath()); final File outFile = new File(outDir, fname); @@ -139,16 +136,6 @@ public class S3DataSegmentPuller implements URIDataPuller } } - public static URI checkURI(URI uri) - { - if (uri.getScheme().equalsIgnoreCase(SCHEME)) { - uri = URI.create("s3" + uri.toString().substring(SCHEME.length())); - } else if (!"s3".equalsIgnoreCase(uri.getScheme())) { - throw new IAE("Don't know how to load scheme for URI [%s]", uri.toString()); - } - return uri; - } - @Override public InputStream getInputStream(URI uri) throws IOException { @@ -162,8 +149,9 @@ public class S3DataSegmentPuller implements URIDataPuller private FileObject buildFileObject(final URI uri) throws AmazonServiceException { - final S3Coords coords = new S3Coords(checkURI(uri)); - final S3ObjectSummary objectSummary = 
S3Utils.getSingleObjectSummary(s3Client, coords.bucket, coords.path); + final CloudObjectLocation coords = new CloudObjectLocation(S3Utils.checkURI(uri)); + final S3ObjectSummary objectSummary = + S3Utils.getSingleObjectSummary(s3Client, coords.getBucket(), coords.getPath()); final String path = uri.getPath(); return new FileObject() @@ -289,8 +277,9 @@ public class S3DataSegmentPuller implements URIDataPuller public String getVersion(URI uri) throws IOException { try { - final S3Coords coords = new S3Coords(checkURI(uri)); - final S3ObjectSummary objectSummary = S3Utils.getSingleObjectSummary(s3Client, coords.bucket, coords.path); + final CloudObjectLocation coords = new CloudObjectLocation(S3Utils.checkURI(uri)); + final S3ObjectSummary objectSummary = + S3Utils.getSingleObjectSummary(s3Client, coords.getBucket(), coords.getPath()); return StringUtils.format("%d", objectSummary.getLastModified().getTime()); } catch (AmazonServiceException e) { @@ -303,11 +292,11 @@ public class S3DataSegmentPuller implements URIDataPuller } } - private boolean isObjectInBucket(final S3Coords coords) throws SegmentLoadingException + private boolean isObjectInBucket(final CloudObjectLocation coords) throws SegmentLoadingException { try { return S3Utils.retryS3Operation( - () -> S3Utils.isObjectInBucketIgnoringPermission(s3Client, coords.bucket, coords.path) + () -> S3Utils.isObjectInBucketIgnoringPermission(s3Client, coords.getBucket(), coords.getPath()) ); } catch (AmazonS3Exception | IOException e) { @@ -317,35 +306,4 @@ public class S3DataSegmentPuller implements URIDataPuller throw new RuntimeException(e); } } - - protected static class S3Coords - { - String bucket; - String path; - - public S3Coords(URI uri) - { - if (!"s3".equalsIgnoreCase(uri.getScheme())) { - throw new IAE("Unsupported scheme: [%s]", uri.getScheme()); - } - bucket = uri.getHost(); - String path = uri.getPath(); - if (path.startsWith("/")) { - path = path.substring(1); - } - this.path = path; - } - - public 
S3Coords(String bucket, String key) - { - this.bucket = bucket; - this.path = key; - } - - @Override - public String toString() - { - return StringUtils.format("s3://%s/%s", bucket, path); - } - } } diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3LoadSpec.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3LoadSpec.java index 19e3b69ea24..5338abfe414 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3LoadSpec.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3LoadSpec.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; import com.google.common.base.Preconditions; +import org.apache.druid.data.input.impl.CloudObjectLocation; import org.apache.druid.segment.loading.LoadSpec; import org.apache.druid.segment.loading.SegmentLoadingException; @@ -32,7 +33,7 @@ import java.io.File; /** * */ -@JsonTypeName(S3StorageDruidModule.SCHEME) +@JsonTypeName(S3StorageDruidModule.SCHEME_S3_ZIP) public class S3LoadSpec implements LoadSpec { private final String bucket; @@ -57,7 +58,7 @@ public class S3LoadSpec implements LoadSpec @Override public LoadSpecResult loadSegment(File outDir) throws SegmentLoadingException { - return new LoadSpecResult(puller.getSegmentFiles(new S3DataSegmentPuller.S3Coords(bucket, key), outDir).size()); + return new LoadSpecResult(puller.getSegmentFiles(new CloudObjectLocation(bucket, key), outDir).size()); } @JsonProperty(S3DataSegmentPuller.BUCKET) diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3StorageDruidModule.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3StorageDruidModule.java index 2daaadbe7e2..8cc6c250b54 100644 --- 
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3StorageDruidModule.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3StorageDruidModule.java @@ -54,7 +54,9 @@ import java.util.List; */ public class S3StorageDruidModule implements DruidModule { - public static final String SCHEME = "s3_zip"; + public static final String SCHEME = "s3"; + public static final String SCHEME_S3N = "s3n"; + public static final String SCHEME_S3_ZIP = "s3_zip"; private static final Logger log = new Logger(S3StorageDruidModule.class); @@ -139,27 +141,27 @@ public class S3StorageDruidModule implements DruidModule public void configure(Binder binder) { MapBinder.newMapBinder(binder, String.class, SearchableVersionedDataFinder.class) - .addBinding("s3") + .addBinding(SCHEME) .to(S3TimestampVersionedDataFinder.class) .in(LazySingleton.class); MapBinder.newMapBinder(binder, String.class, SearchableVersionedDataFinder.class) - .addBinding("s3n") + .addBinding(SCHEME_S3N) .to(S3TimestampVersionedDataFinder.class) .in(LazySingleton.class); - Binders.dataSegmentKillerBinder(binder).addBinding(SCHEME).to(S3DataSegmentKiller.class).in(LazySingleton.class); - Binders.dataSegmentMoverBinder(binder).addBinding(SCHEME).to(S3DataSegmentMover.class).in(LazySingleton.class); + Binders.dataSegmentKillerBinder(binder).addBinding(SCHEME_S3_ZIP).to(S3DataSegmentKiller.class).in(LazySingleton.class); + Binders.dataSegmentMoverBinder(binder).addBinding(SCHEME_S3_ZIP).to(S3DataSegmentMover.class).in(LazySingleton.class); Binders.dataSegmentArchiverBinder(binder) - .addBinding(SCHEME) + .addBinding(SCHEME_S3_ZIP) .to(S3DataSegmentArchiver.class) .in(LazySingleton.class); - Binders.dataSegmentPusherBinder(binder).addBinding("s3").to(S3DataSegmentPusher.class).in(LazySingleton.class); + Binders.dataSegmentPusherBinder(binder).addBinding(SCHEME).to(S3DataSegmentPusher.class).in(LazySingleton.class); JsonConfigProvider.bind(binder, "druid.storage", 
S3DataSegmentPusherConfig.class); JsonConfigProvider.bind(binder, "druid.storage", S3DataSegmentArchiverConfig.class); JsonConfigProvider.bind(binder, "druid.storage", S3StorageConfig.class); JsonConfigProvider.bind(binder, "druid.storage.sse.kms", S3SSEKmsConfig.class); JsonConfigProvider.bind(binder, "druid.storage.sse.custom", S3SSECustomConfig.class); - Binders.taskLogsBinder(binder).addBinding("s3").to(S3TaskLogs.class); + Binders.taskLogsBinder(binder).addBinding(SCHEME).to(S3TaskLogs.class); JsonConfigProvider.bind(binder, "druid.indexer.logs", S3TaskLogsConfig.class); binder.bind(S3TaskLogs.class).in(LazySingleton.class); } diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TimestampVersionedDataFinder.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TimestampVersionedDataFinder.java index 5ef94c0ced8..075d94bb464 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TimestampVersionedDataFinder.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3TimestampVersionedDataFinder.java @@ -22,7 +22,7 @@ package org.apache.druid.storage.s3; import com.amazonaws.services.s3.model.S3ObjectSummary; import com.google.inject.Inject; import org.apache.druid.data.SearchableVersionedDataFinder; -import org.apache.druid.java.util.common.RetryUtils; +import org.apache.druid.data.input.impl.CloudObjectLocation; import org.apache.druid.java.util.common.StringUtils; import javax.annotation.Nullable; @@ -57,39 +57,31 @@ public class S3TimestampVersionedDataFinder extends S3DataSegmentPuller implemen public URI getLatestVersion(final URI uri, final @Nullable Pattern pattern) { try { - return RetryUtils.retry( - () -> { - final S3Coords coords = new S3Coords(checkURI(uri)); - long mostRecent = Long.MIN_VALUE; - URI latest = null; - final Iterator objectSummaryIterator = S3Utils.objectSummaryIterator( - s3Client, - coords.bucket, - coords.path, - 
MAX_LISTING_KEYS - ); - while (objectSummaryIterator.hasNext()) { - final S3ObjectSummary objectSummary = objectSummaryIterator.next(); - String keyString = objectSummary.getKey().substring(coords.path.length()); - if (keyString.startsWith("/")) { - keyString = keyString.substring(1); - } - if (pattern != null && !pattern.matcher(keyString).matches()) { - continue; - } - final long latestModified = objectSummary.getLastModified().getTime(); - if (latestModified >= mostRecent) { - mostRecent = latestModified; - latest = new URI( - StringUtils.format("s3://%s/%s", objectSummary.getBucketName(), objectSummary.getKey()) - ); - } - } - return latest; - }, - shouldRetryPredicate(), - DEFAULT_RETRY_COUNT + final CloudObjectLocation coords = new CloudObjectLocation(S3Utils.checkURI(uri)); + long mostRecent = Long.MIN_VALUE; + URI latest = null; + final Iterator objectSummaryIterator = S3Utils.objectSummaryIterator( + s3Client, + uri, + MAX_LISTING_KEYS ); + while (objectSummaryIterator.hasNext()) { + final S3ObjectSummary objectSummary = objectSummaryIterator.next(); + final CloudObjectLocation objectLocation = S3Utils.summaryToCloudObjectLocation(objectSummary); + // remove coords path prefix from object path + String keyString = StringUtils.maybeRemoveLeadingSlash( + objectLocation.getPath().substring(coords.getPath().length()) + ); + if (pattern != null && !pattern.matcher(keyString).matches()) { + continue; + } + final long latestModified = objectSummary.getLastModified().getTime(); + if (latestModified >= mostRecent) { + mostRecent = latestModified; + latest = objectLocation.toUri(S3StorageDruidModule.SCHEME); + } + } + return latest; } catch (Exception e) { throw new RuntimeException(e); diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java index 2f4db984f5f..5aaa0f18770 100644 --- 
a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/S3Utils.java @@ -32,9 +32,14 @@ import com.amazonaws.services.s3.model.PutObjectRequest; import com.amazonaws.services.s3.model.S3ObjectSummary; import com.google.common.base.Joiner; import com.google.common.base.Predicate; +import com.google.common.collect.Iterators; +import org.apache.druid.data.input.impl.CloudObjectLocation; +import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.RE; import org.apache.druid.java.util.common.RetryUtils; import org.apache.druid.java.util.common.RetryUtils.Task; +import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; import java.io.File; @@ -48,15 +53,18 @@ import java.util.NoSuchElementException; */ public class S3Utils { + private static final String SCHEME = S3StorageDruidModule.SCHEME; private static final Joiner JOINER = Joiner.on("/").skipNulls(); private static final String MIMETYPE_JETS3T_DIRECTORY = "application/x-directory"; private static final Logger log = new Logger(S3Utils.class); + static boolean isServiceExceptionRecoverable(AmazonServiceException ex) { final boolean isIOException = ex.getCause() instanceof IOException; final boolean isTimeout = "RequestTimeout".equals(ex.getErrorCode()); - return isIOException || isTimeout; + final boolean badStatusCode = ex.getStatusCode() == 400 || ex.getStatusCode() == 403 || ex.getStatusCode() == 404; + return !badStatusCode && (isIOException || isTimeout); } public static final Predicate S3RETRY = new Predicate() @@ -80,7 +88,7 @@ public class S3Utils * Retries S3 operations that fail due to io-related exceptions. Service-level exceptions (access denied, file not * found, etc) are not retried. 
*/ - public static T retryS3Operation(Task f) throws Exception + static T retryS3Operation(Task f) throws Exception { return RetryUtils.retry(f, S3RETRY, RetryUtils.DEFAULT_MAX_TRIES); } @@ -106,36 +114,105 @@ public class S3Utils public static Iterator objectSummaryIterator( final ServerSideEncryptingAmazonS3 s3Client, - final String bucket, - final String prefix, + final URI prefix, final int numMaxKeys ) { - final ListObjectsV2Request request = new ListObjectsV2Request() - .withBucketName(bucket) - .withPrefix(prefix) - .withMaxKeys(numMaxKeys); + return lazyFetchingObjectSummariesIterator(s3Client, Iterators.singletonIterator(prefix), numMaxKeys); + } + /** + * Create an iterator over a set of s3 objects specified by a set of 'prefixes' which may be paths or individual + * objects, in order to get {@link S3ObjectSummary} for each discovered object. This iterator is computed lazily as it + * is iterated, calling {@link ServerSideEncryptingAmazonS3#listObjectsV2} for each prefix in batches of + * {@param maxListLength}, falling back to {@link ServerSideEncryptingAmazonS3#getObjectMetadata} if the list API + * returns a 403 status code as a fallback to check if the URI is a single object instead of a directory. These + * summaries are supplied to the outer iterator until drained, then if additional results for the current prefix are + * still available, it will continue fetching and repeat the process, else it will move on to the next prefix, + * continuing until all objects have been evaluated. 
+ */ + public static Iterator lazyFetchingObjectSummariesIterator( + final ServerSideEncryptingAmazonS3 s3Client, + final Iterator uris, + final int maxListingLength + ) + { return new Iterator() { + private ListObjectsV2Request request; private ListObjectsV2Result result; + private URI currentUri; + private String currentBucket; + private String currentPrefix; private Iterator objectSummaryIterator; { + prepareNextRequest(); fetchNextBatch(); } + private void prepareNextRequest() + { + currentUri = uris.next(); + currentBucket = currentUri.getAuthority(); + currentPrefix = S3Utils.extractS3Key(currentUri); + + request = new ListObjectsV2Request() + .withBucketName(currentBucket) + .withPrefix(currentPrefix) + .withMaxKeys(maxListingLength); + } + private void fetchNextBatch() { - result = s3Client.listObjectsV2(request); - objectSummaryIterator = result.getObjectSummaries().iterator(); - request.setContinuationToken(result.getContinuationToken()); + try { + result = S3Utils.retryS3Operation(() -> s3Client.listObjectsV2(request)); + objectSummaryIterator = result.getObjectSummaries().iterator(); + request.setContinuationToken(result.getContinuationToken()); + } + catch (AmazonS3Exception outerException) { + log.error(outerException, "Exception while listing on %s", currentUri); + + if (outerException.getStatusCode() == 403) { + // The "Access Denied" means users might not have a proper permission for listing on the given uri. + // Usually this is not a problem, but the uris might be the full paths to input objects instead of prefixes. + // In this case, users should be able to get objects if they have a proper permission for GetObject. + + log.warn("Access denied for %s. 
Try to get the object from the uri without listing", currentUri); + try { + final ObjectMetadata objectMetadata = + S3Utils.retryS3Operation(() -> s3Client.getObjectMetadata(currentBucket, currentPrefix)); + + if (!S3Utils.isDirectoryPlaceholder(currentPrefix, objectMetadata)) { + // it's not a directory, so just generate an object summary + S3ObjectSummary fabricated = new S3ObjectSummary(); + fabricated.setBucketName(currentBucket); + fabricated.setKey(currentPrefix); + objectSummaryIterator = Iterators.singletonIterator(fabricated); + } else { + throw new RE( + "[%s] is a directory placeholder, " + + "but failed to get the object list under the directory due to permission", + currentUri + ); + } + } + catch (Exception innerException) { + throw new RuntimeException(innerException); + } + } else { + throw new RuntimeException(outerException); + } + } + catch (Exception ex) { + throw new RuntimeException(ex); + } } @Override public boolean hasNext() { - return objectSummaryIterator.hasNext() || result.isTruncated(); + return objectSummaryIterator.hasNext() || result.isTruncated() || uris.hasNext(); } @Override @@ -151,13 +228,16 @@ public class S3Utils if (result.isTruncated()) { fetchNextBatch(); + } else if (uris.hasNext()) { + prepareNextRequest(); + fetchNextBatch(); } if (!objectSummaryIterator.hasNext()) { throw new ISE( "Failed to further iterate on bucket[%s] and prefix[%s]. The last continuationToken was [%s]", - bucket, - prefix, + currentBucket, + currentPrefix, result.getContinuationToken() ); } @@ -167,7 +247,25 @@ public class S3Utils }; } - public static String constructSegmentPath(String baseKey, String storageDir) + + /** + * Create an {@link URI} from the given {@link S3ObjectSummary}. The result URI is composed as below. + * + *
+   * {@code s3://{BUCKET_NAME}/{OBJECT_KEY}}
+   * 
+ */ + public static URI summaryToUri(S3ObjectSummary object) + { + return summaryToCloudObjectLocation(object).toUri(SCHEME); + } + + public static CloudObjectLocation summaryToCloudObjectLocation(S3ObjectSummary object) + { + return new CloudObjectLocation(object.getBucketName(), object.getKey()); + } + + static String constructSegmentPath(String baseKey, String storageDir) { return JOINER.join( baseKey.isEmpty() ? null : baseKey, @@ -184,7 +282,17 @@ public class S3Utils public static String extractS3Key(URI uri) { - return uri.getPath().startsWith("/") ? uri.getPath().substring(1) : uri.getPath(); + return StringUtils.maybeRemoveLeadingSlash(uri.getPath()); + } + + public static URI checkURI(URI uri) + { + if (uri.getScheme().equalsIgnoreCase(S3StorageDruidModule.SCHEME_S3_ZIP)) { + uri = URI.create(SCHEME + uri.toString().substring(S3StorageDruidModule.SCHEME_S3_ZIP.length())); + } else if (!SCHEME.equalsIgnoreCase(uri.getScheme())) { + throw new IAE("Invalid URI scheme [%s] must be [%s]", uri.toString(), SCHEME); + } + return uri; } // Copied from org.jets3t.service.model.StorageObject.isDirectoryPlaceholder() @@ -254,7 +362,13 @@ public class S3Utils * @param key The key under which to store the new object. * @param file The path of the file to upload to Amazon S3. 
*/ - public static void uploadFileIfPossible(ServerSideEncryptingAmazonS3 service, boolean disableAcl, String bucket, String key, File file) + static void uploadFileIfPossible( + ServerSideEncryptingAmazonS3 service, + boolean disableAcl, + String bucket, + String key, + File file + ) { final PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key, file); diff --git a/extensions-core/s3-extensions/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule b/extensions-core/s3-extensions/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule index ccd137b82bd..51968bbb97b 100644 --- a/extensions-core/s3-extensions/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule +++ b/extensions-core/s3-extensions/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule @@ -15,3 +15,4 @@ org.apache.druid.storage.s3.S3StorageDruidModule org.apache.druid.firehose.s3.S3FirehoseDruidModule +org.apache.druid.data.input.s3.S3InputSourceDruidModule diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java new file mode 100644 index 00000000000..1c1bc019912 --- /dev/null +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java @@ -0,0 +1,449 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.data.input.s3; + +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.Headers; +import com.amazonaws.services.s3.model.AmazonS3Exception; +import com.amazonaws.services.s3.model.GetObjectMetadataRequest; +import com.amazonaws.services.s3.model.GetObjectRequest; +import com.amazonaws.services.s3.model.ListObjectsV2Request; +import com.amazonaws.services.s3.model.ListObjectsV2Result; +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.S3Object; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationContext; +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.deser.std.StdDeserializer; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.fasterxml.jackson.module.guice.ObjectMapperModule; +import com.google.common.collect.ImmutableList; +import com.google.inject.Binder; +import com.google.inject.Guice; +import com.google.inject.Injector; +import com.google.inject.Provides; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.InputSourceReader; +import org.apache.druid.data.input.InputSplit; +import org.apache.druid.data.input.impl.CloudObjectLocation; +import org.apache.druid.data.input.impl.CsvInputFormat; +import 
org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.JsonInputFormat; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.initialization.DruidModule; +import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.apache.druid.java.util.common.parsers.JSONPathSpec; +import org.apache.druid.storage.s3.NoopServerSideEncryption; +import org.apache.druid.storage.s3.S3Utils; +import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3; +import org.apache.druid.testing.InitializedNullHandlingTest; +import org.apache.druid.utils.CompressionUtils; +import org.easymock.EasyMock; +import org.joda.time.DateTime; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.rules.TemporaryFolder; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.net.URI; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class S3InputSourceTest extends InitializedNullHandlingTest +{ + private static final ObjectMapper MAPPER = createS3ObjectMapper(); + private static final AmazonS3Client S3_CLIENT = EasyMock.createNiceMock(AmazonS3Client.class); + private static final ServerSideEncryptingAmazonS3 SERVICE = new ServerSideEncryptingAmazonS3( + S3_CLIENT, + new NoopServerSideEncryption() + ); + + private static final List EXPECTED_URIS = Arrays.asList( + URI.create("s3://foo/bar/file.csv"), + URI.create("s3://bar/foo/file2.csv") + ); + + private static final List EXPECTED_COMPRESSED_URIS = Arrays.asList( + URI.create("s3://foo/bar/file.csv.gz"), + URI.create("s3://bar/foo/file2.csv.gz") + ); + + private static final List EXPECTED_COORDS = + 
EXPECTED_URIS.stream().map(CloudObjectLocation::new).collect(Collectors.toList()); + + private static final List PREFIXES = Arrays.asList( + URI.create("s3://foo/bar"), + URI.create("s3://bar/foo") + ); + + private static final List EXPECTED_LOCATION = + ImmutableList.of(new CloudObjectLocation("foo", "bar/file.csv")); + + private static final DateTime NOW = DateTimes.nowUtc(); + private static final byte[] CONTENT = + StringUtils.toUtf8(StringUtils.format("%d,hello,world", NOW.getMillis())); + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void testSerdeWithUris() throws Exception + { + final S3InputSource withUris = new S3InputSource(SERVICE, EXPECTED_URIS, null, null); + final S3InputSource serdeWithUris = MAPPER.readValue(MAPPER.writeValueAsString(withUris), S3InputSource.class); + Assert.assertEquals(withUris, serdeWithUris); + } + + @Test + public void testSerdeWithPrefixes() throws Exception + { + final S3InputSource withPrefixes = new S3InputSource(SERVICE, null, PREFIXES, null); + final S3InputSource serdeWithPrefixes = + MAPPER.readValue(MAPPER.writeValueAsString(withPrefixes), S3InputSource.class); + Assert.assertEquals(withPrefixes, serdeWithPrefixes); + } + + @Test + public void testSerdeWithObjects() throws Exception + { + + final S3InputSource withPrefixes = new S3InputSource( + SERVICE, + null, + null, + EXPECTED_LOCATION + ); + final S3InputSource serdeWithPrefixes = + MAPPER.readValue(MAPPER.writeValueAsString(withPrefixes), S3InputSource.class); + Assert.assertEquals(withPrefixes, serdeWithPrefixes); + } + + @Test + public void testSerdeWithExtraEmptyLists() throws Exception + { + final S3InputSource withPrefixes = new S3InputSource( + SERVICE, + ImmutableList.of(), + ImmutableList.of(), + EXPECTED_LOCATION + ); + final S3InputSource serdeWithPrefixes = + MAPPER.readValue(MAPPER.writeValueAsString(withPrefixes), 
S3InputSource.class); + Assert.assertEquals(withPrefixes, serdeWithPrefixes); + } + + @Test + public void testSerdeWithInvalidArgs() throws Exception + { + expectedException.expect(IllegalArgumentException.class); + // constructor will explode + new S3InputSource( + SERVICE, + EXPECTED_URIS, + PREFIXES, + EXPECTED_LOCATION + ); + } + + @Test + public void testSerdeWithOtherInvalidArgs() + { + expectedException.expect(IllegalArgumentException.class); + // constructor will explode + new S3InputSource( + SERVICE, + EXPECTED_URIS, + PREFIXES, + ImmutableList.of() + ); + } + + @Test + public void testSerdeWithOtherOtherInvalidArgs() + { + expectedException.expect(IllegalArgumentException.class); + // constructor will explode + new S3InputSource( + SERVICE, + ImmutableList.of(), + PREFIXES, + EXPECTED_LOCATION + ); + } + + @Test + public void testWithUrisSplit() + { + S3InputSource inputSource = new S3InputSource(SERVICE, EXPECTED_URIS, null, null); + + Stream> splits = inputSource.createSplits( + new JsonInputFormat(JSONPathSpec.DEFAULT, null), + null + ); + + Assert.assertEquals(EXPECTED_COORDS, splits.map(InputSplit::get).collect(Collectors.toList())); + } + + @Test + public void testWithPrefixesSplit() + { + EasyMock.reset(S3_CLIENT); + addExpectedPrefixObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0))); + addExpectedPrefixObjects(PREFIXES.get(1), ImmutableList.of(EXPECTED_URIS.get(1))); + EasyMock.replay(S3_CLIENT); + + S3InputSource inputSource = new S3InputSource(SERVICE, null, PREFIXES, null); + + Stream> splits = inputSource.createSplits( + new JsonInputFormat(JSONPathSpec.DEFAULT, null), + null + ); + + Assert.assertEquals(EXPECTED_COORDS, splits.map(InputSplit::get).collect(Collectors.toList())); + } + + @Test + public void testWithPrefixesWhereOneIsUrisAndNoListPermissionSplit() + { + EasyMock.reset(S3_CLIENT); + addExpectedPrefixObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0))); + 
addExpectedNonPrefixObjectsWithNoListPermission(); + EasyMock.replay(S3_CLIENT); + + S3InputSource inputSource = new S3InputSource( + SERVICE, + null, + ImmutableList.of(PREFIXES.get(0), EXPECTED_URIS.get(1)), + null + ); + + Stream> splits = inputSource.createSplits( + new JsonInputFormat(JSONPathSpec.DEFAULT, null), + null + ); + + Assert.assertEquals(EXPECTED_COORDS, splits.map(InputSplit::get).collect(Collectors.toList())); + } + + @Test + public void testReader() throws IOException + { + EasyMock.reset(S3_CLIENT); + addExpectedPrefixObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_URIS.get(0))); + addExpectedNonPrefixObjectsWithNoListPermission(); + addExpectedGetObjectMock(EXPECTED_URIS.get(0)); + addExpectedGetObjectMock(EXPECTED_URIS.get(1)); + EasyMock.replay(S3_CLIENT); + + S3InputSource inputSource = new S3InputSource( + SERVICE, + null, + ImmutableList.of(PREFIXES.get(0), EXPECTED_URIS.get(1)), + null + ); + + InputRowSchema someSchema = new InputRowSchema( + new TimestampSpec("time", "auto", null), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim2"))), + ImmutableList.of("count") + ); + + InputSourceReader reader = inputSource.reader( + someSchema, + new CsvInputFormat(ImmutableList.of("time", "dim1", "dim2"), "|", false, null, 0), + temporaryFolder.newFolder() + ); + + CloseableIterator iterator = reader.read(); + + while (iterator.hasNext()) { + InputRow nextRow = iterator.next(); + Assert.assertEquals(NOW, nextRow.getTimestamp()); + Assert.assertEquals("hello", nextRow.getDimension("dim1").get(0)); + Assert.assertEquals("world", nextRow.getDimension("dim2").get(0)); + } + } + + @Test + public void testCompressedReader() throws IOException + { + EasyMock.reset(S3_CLIENT); + addExpectedPrefixObjects(PREFIXES.get(0), ImmutableList.of(EXPECTED_COMPRESSED_URIS.get(0))); + addExpectedNonPrefixObjectsWithNoListPermission(); + addExpectedGetCompressedObjectMock(EXPECTED_COMPRESSED_URIS.get(0)); + 
addExpectedGetCompressedObjectMock(EXPECTED_COMPRESSED_URIS.get(1)); + EasyMock.replay(S3_CLIENT); + + S3InputSource inputSource = new S3InputSource( + SERVICE, + null, + ImmutableList.of(PREFIXES.get(0), EXPECTED_COMPRESSED_URIS.get(1)), + null + ); + + InputRowSchema someSchema = new InputRowSchema( + new TimestampSpec("time", "auto", null), + new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1", "dim2"))), + ImmutableList.of("count") + ); + + InputSourceReader reader = inputSource.reader( + someSchema, + new CsvInputFormat(ImmutableList.of("time", "dim1", "dim2"), "|", false, null, 0), + temporaryFolder.newFolder() + ); + + CloseableIterator iterator = reader.read(); + + while (iterator.hasNext()) { + InputRow nextRow = iterator.next(); + Assert.assertEquals(NOW, nextRow.getTimestamp()); + Assert.assertEquals("hello", nextRow.getDimension("dim1").get(0)); + Assert.assertEquals("world", nextRow.getDimension("dim2").get(0)); + } + } + + private static void addExpectedPrefixObjects(URI prefix, List uris) + { + final String s3Bucket = prefix.getAuthority(); + final ListObjectsV2Result result = new ListObjectsV2Result(); + result.setBucketName(s3Bucket); + result.setKeyCount(1); + for (URI uri : uris) { + final String key = S3Utils.extractS3Key(uri); + final S3ObjectSummary objectSummary = new S3ObjectSummary(); + objectSummary.setBucketName(s3Bucket); + objectSummary.setKey(key); + result.getObjectSummaries().add(objectSummary); + } + EasyMock.expect(S3_CLIENT.listObjectsV2(EasyMock.anyObject(ListObjectsV2Request.class))).andReturn(result).once(); + } + + private static void addExpectedNonPrefixObjectsWithNoListPermission() + { + AmazonS3Exception boom = new AmazonS3Exception("oh dang, you can't list that bucket friend"); + boom.setStatusCode(403); + EasyMock.expect(S3_CLIENT.listObjectsV2(EasyMock.anyObject(ListObjectsV2Request.class))).andThrow(boom).once(); + + ObjectMetadata metadata = new ObjectMetadata(); + 
metadata.setContentLength(CONTENT.length); + metadata.setContentEncoding("text/csv"); + metadata.setHeader(Headers.ETAG, "some-totally-real-etag-base64-hash-i-guess"); + EasyMock.expect(S3_CLIENT.getObjectMetadata(EasyMock.anyObject(GetObjectMetadataRequest.class))) + .andReturn(metadata) + .once(); + } + + private static void addExpectedGetObjectMock(URI uri) + { + final String s3Bucket = uri.getAuthority(); + final String key = S3Utils.extractS3Key(uri); + + S3Object someObject = new S3Object(); + someObject.setBucketName(s3Bucket); + someObject.setKey(key); + someObject.setObjectContent(new ByteArrayInputStream(CONTENT)); + EasyMock.expect(S3_CLIENT.getObject(EasyMock.anyObject(GetObjectRequest.class))).andReturn(someObject).once(); + } + + private static void addExpectedGetCompressedObjectMock(URI uri) throws IOException + { + final String s3Bucket = uri.getAuthority(); + final String key = S3Utils.extractS3Key(uri); + + S3Object someObject = new S3Object(); + someObject.setBucketName(s3Bucket); + someObject.setKey(key); + ByteArrayOutputStream gzipped = new ByteArrayOutputStream(); + CompressionUtils.gzip(new ByteArrayInputStream(CONTENT), gzipped); + someObject.setObjectContent(new ByteArrayInputStream(gzipped.toByteArray())); + EasyMock.expect(S3_CLIENT.getObject(EasyMock.anyObject(GetObjectRequest.class))).andReturn(someObject).once(); + } + + public static ObjectMapper createS3ObjectMapper() + { + DruidModule baseModule = new TestS3Module(); + final Injector injector = Guice.createInjector( + new ObjectMapperModule(), + baseModule + ); + final ObjectMapper baseMapper = injector.getInstance(ObjectMapper.class); + + baseModule.getJacksonModules().forEach(baseMapper::registerModule); + return baseMapper; + } + + public static class TestS3Module implements DruidModule + { + @Override + public List getJacksonModules() + { + // Deserializer is need for AmazonS3Client even though it is injected. + // See https://github.com/FasterXML/jackson-databind/issues/962. 
+ return ImmutableList.of(new SimpleModule().addDeserializer(AmazonS3.class, new ItemDeserializer())); + } + + @Override + public void configure(Binder binder) + { + + } + + @Provides + public ServerSideEncryptingAmazonS3 getAmazonS3Client() + { + return SERVICE; + } + } + + public static class ItemDeserializer extends StdDeserializer + { + ItemDeserializer() + { + this(null); + } + + ItemDeserializer(Class vc) + { + super(vc); + } + + @Override + public AmazonS3 deserialize(JsonParser jp, DeserializationContext ctxt) + { + throw new UnsupportedOperationException(); + } + } +} diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/firehose/s3/StaticS3FirehoseFactoryTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/firehose/s3/StaticS3FirehoseFactoryTest.java index 5f89860fbee..c809bf37040 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/firehose/s3/StaticS3FirehoseFactoryTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/firehose/s3/StaticS3FirehoseFactoryTest.java @@ -19,28 +19,12 @@ package org.apache.druid.firehose.s3; -import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3Client; -import com.amazonaws.services.s3.model.ListObjectsV2Request; -import com.amazonaws.services.s3.model.ListObjectsV2Result; -import com.amazonaws.services.s3.model.S3ObjectSummary; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.databind.DeserializationContext; -import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.deser.std.StdDeserializer; -import com.fasterxml.jackson.databind.module.SimpleModule; -import com.fasterxml.jackson.module.guice.ObjectMapperModule; -import com.google.common.collect.ImmutableList; -import com.google.inject.Binder; -import com.google.inject.Guice; -import com.google.inject.Injector; -import com.google.inject.Provides; import 
org.apache.druid.data.input.FiniteFirehoseFactory; import org.apache.druid.data.input.impl.StringInputRowParser; -import org.apache.druid.initialization.DruidModule; +import org.apache.druid.data.input.s3.S3InputSourceTest; import org.apache.druid.storage.s3.NoopServerSideEncryption; -import org.apache.druid.storage.s3.S3Utils; import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3; import org.easymock.EasyMock; import org.junit.Assert; @@ -66,7 +50,7 @@ public class StaticS3FirehoseFactoryTest @Test public void testSerde() throws Exception { - final ObjectMapper mapper = createObjectMapper(new TestS3Module()); + final ObjectMapper mapper = S3InputSourceTest.createS3ObjectMapper(); final List uris = Arrays.asList( new URI("s3://foo/bar/file.gz"), @@ -101,9 +85,6 @@ public class StaticS3FirehoseFactoryTest ); uris.sort(Comparator.comparing(URI::toString)); - uris.forEach(StaticS3FirehoseFactoryTest::addExpectedObjject); - EasyMock.replay(S3_CLIENT); - final StaticS3FirehoseFactory factory = new StaticS3FirehoseFactory( SERVICE, uris, @@ -131,72 +112,4 @@ public class StaticS3FirehoseFactoryTest Assert.assertEquals(uris.get(i), subFactoryUris.get(0)); } } - - private static void addExpectedObjject(URI uri) - { - final String s3Bucket = uri.getAuthority(); - final String key = S3Utils.extractS3Key(uri); - final S3ObjectSummary objectSummary = new S3ObjectSummary(); - objectSummary.setBucketName(s3Bucket); - objectSummary.setKey(key); - final ListObjectsV2Result result = new ListObjectsV2Result(); - result.setBucketName(s3Bucket); - result.setKeyCount(1); - result.getObjectSummaries().add(objectSummary); - EasyMock.expect(SERVICE.listObjectsV2(EasyMock.anyObject(ListObjectsV2Request.class))).andReturn(result); - } - - private static ObjectMapper createObjectMapper(DruidModule baseModule) - { - final Injector injector = Guice.createInjector( - new ObjectMapperModule(), - baseModule - ); - final ObjectMapper baseMapper = injector.getInstance(ObjectMapper.class); 
- - baseModule.getJacksonModules().forEach(baseMapper::registerModule); - return baseMapper; - } - - private static class TestS3Module implements DruidModule - { - @Override - public List getJacksonModules() - { - // Deserializer is need for AmazonS3Client even though it is injected. - // See https://github.com/FasterXML/jackson-databind/issues/962. - return ImmutableList.of(new SimpleModule().addDeserializer(AmazonS3.class, new ItemDeserializer())); - } - - @Override - public void configure(Binder binder) - { - - } - - @Provides - public ServerSideEncryptingAmazonS3 getAmazonS3Client() - { - return SERVICE; - } - } - - public static class ItemDeserializer extends StdDeserializer - { - public ItemDeserializer() - { - this(null); - } - - public ItemDeserializer(Class vc) - { - super(vc); - } - - @Override - public AmazonS3 deserialize(JsonParser jp, DeserializationContext ctxt) - { - throw new UnsupportedOperationException(); - } - } } diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentArchiverTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentArchiverTest.java index 660ef35552b..f2531d4e0fc 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentArchiverTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentArchiverTest.java @@ -85,7 +85,7 @@ public class S3DataSegmentArchiverTest .version("version") .loadSpec(ImmutableMap.of( "type", - S3StorageDruidModule.SCHEME, + S3StorageDruidModule.SCHEME_S3_ZIP, S3DataSegmentPuller.BUCKET, "source_bucket", S3DataSegmentPuller.KEY, @@ -107,7 +107,7 @@ public class S3DataSegmentArchiverTest final DataSegment archivedSegment = SOURCE_SEGMENT .withLoadSpec(ImmutableMap.of( "type", - S3StorageDruidModule.SCHEME, + S3StorageDruidModule.SCHEME_S3_ZIP, S3DataSegmentPuller.BUCKET, ARCHIVER_CONFIG.getArchiveBucket(), S3DataSegmentPuller.KEY, @@ -144,7 
+144,7 @@ public class S3DataSegmentArchiverTest final DataSegment archivedSegment = SOURCE_SEGMENT .withLoadSpec(ImmutableMap.of( "type", - S3StorageDruidModule.SCHEME, + S3StorageDruidModule.SCHEME_S3_ZIP, S3DataSegmentPuller.BUCKET, ARCHIVER_CONFIG.getArchiveBucket(), S3DataSegmentPuller.KEY, diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentPullerTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentPullerTest.java index a4306d12b97..914785d1ce7 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentPullerTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3DataSegmentPullerTest.java @@ -24,6 +24,7 @@ import com.amazonaws.services.s3.model.ListObjectsV2Request; import com.amazonaws.services.s3.model.ListObjectsV2Result; import com.amazonaws.services.s3.model.S3Object; import com.amazonaws.services.s3.model.S3ObjectSummary; +import org.apache.druid.data.input.impl.CloudObjectLocation; import org.apache.druid.java.util.common.FileUtils; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.segment.loading.SegmentLoadingException; @@ -124,7 +125,7 @@ public class S3DataSegmentPullerTest EasyMock.replay(s3Client); FileUtils.FileCopyResult result = puller.getSegmentFiles( - new S3DataSegmentPuller.S3Coords( + new CloudObjectLocation( bucket, object0.getKey() ), tmpDir @@ -191,7 +192,7 @@ public class S3DataSegmentPullerTest EasyMock.replay(s3Client); FileUtils.FileCopyResult result = puller.getSegmentFiles( - new S3DataSegmentPuller.S3Coords( + new CloudObjectLocation( bucket, object0.getKey() ), tmpDir