From 1ad898bde29820a8ff41267457a4370395406ae5 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Wed, 21 Mar 2018 15:36:54 -0700 Subject: [PATCH] Use the official aws-sdk instead of jet3t (#5382) * Use the official aws-sdk instead of jet3t * fix compile and serde tests * address comments and fix test * add http version string * remove redundant dependencies, fix potential NPE, and fix test * resolve TODOs * fix build * downgrade jackson version to 2.6.7 * fix test * resolve the last TODO * support proxy and endpoint configurations * fix build * remove debugging log * downgrade hadoop version to 2.8.3 * fix tests * remove unused log * fix it test * revert KerberosAuthenticator change * change hadoop-aws scope to provided in hdfs-storage * address comments * address comments --- .travis.yml | 1 + .../io/druid/guice/JsonConfigProvider.java | 4 +- .../input/impl/InputRowParserSerdeTest.java | 8 +- .../data/input/impl/JSONParseSpecTest.java | 4 +- .../input/impl/JavaScriptParseSpecTest.java | 4 +- .../data/input/impl/RegexParseSpecTest.java | 4 +- aws-common/pom.xml | 2 +- .../druid/common/aws/AWSEndpointConfig.java | 52 ++++ .../io/druid/common/aws/AWSProxyConfig.java | 61 +++++ common/pom.xml | 16 +- .../content/development/extensions-core/s3.md | 18 +- .../kerberos/KerberosAuthenticator.java | 25 +- extensions-core/hdfs-storage/pom.xml | 11 +- .../storage/hdfs/HdfsDataSegmentFinder.java | 2 +- extensions-core/s3-extensions/pom.xml | 137 ++++++----- .../firehose/s3/StaticS3FirehoseFactory.java | 127 +++++----- .../s3/AWSSessionCredentialsAdapter.java | 70 ------ .../storage/s3/S3DataSegmentArchiver.java | 4 +- .../druid/storage/s3/S3DataSegmentFinder.java | 48 ++-- .../druid/storage/s3/S3DataSegmentKiller.java | 14 +- .../druid/storage/s3/S3DataSegmentMover.java | 70 +++--- .../druid/storage/s3/S3DataSegmentPuller.java | 73 +++--- .../druid/storage/s3/S3DataSegmentPusher.java | 86 +++---- .../storage/s3/S3StorageDruidModule.java | 58 ++++- .../java/io/druid/storage/s3/S3TaskLogs.java | 50 ++-- .../s3/S3TimestampVersionedDataFinder.java | 31 ++- .../java/io/druid/storage/s3/S3Utils.java | 223 +++++++++++------- .../s3/StaticS3FirehoseFactoryTest.java | 42 +++- .../storage/s3/S3DataSegmentArchiverTest.java | 4 +- .../storage/s3/S3DataSegmentFinderTest.java | 209 ++++++++-------- .../storage/s3/S3DataSegmentMoverTest.java | 133 ++++++++--- .../storage/s3/S3DataSegmentPullerTest.java | 101 +++++--- .../storage/s3/S3DataSegmentPusherTest.java | 40 +++- .../S3TimestampVersionedDataFinderTest.java | 89 ++++--- .../s3/TestAWSCredentialsProvider.java | 6 +- indexing-hadoop/pom.xml | 32 ++- .../indexer/DetermineHashedPartitionsJob.java | 2 +- .../indexing/common/config/TaskConfig.java | 2 +- .../indexing/common/task/HadoopTask.java | 2 - .../autoscaling/EC2AutoScalerSerdeTest.java | 6 +- .../JavaScriptWorkerSelectStrategyTest.java | 4 +- pom.xml | 93 +------- .../groupby/orderby/DefaultLimitSpecTest.java | 30 +-- .../topn/AlphaNumericTopNMetricSpecTest.java | 4 +- .../topn/DimensionTopNMetricSpecTest.java | 16 +- server/pom.xml | 4 + .../main/java/io/druid/guice/AWSModule.java | 4 + .../firehose/HttpFirehoseFactory.java | 2 +- .../server/AsyncQueryForwardingServlet.java | 5 +- .../jetty/JettyServerModule.java | 4 +- .../dimension/LookupDimensionSpecTest.java | 4 +- 51 files changed, 1131 insertions(+), 910 deletions(-) create mode 100644 aws-common/src/main/java/io/druid/common/aws/AWSEndpointConfig.java create mode 100644 aws-common/src/main/java/io/druid/common/aws/AWSProxyConfig.java delete mode 100644 extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/AWSSessionCredentialsAdapter.java diff --git a/.travis.yml b/.travis.yml index 4fb64a7bf9a..4ec30507b4f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -47,6 +47,7 @@ matrix: - sudo: false env: - NAME="other modules test" + - AWS_REGION=us-east-1 # set a aws region for unit tests install: echo "MAVEN_OPTS='-Xmx3000m'" > ~/.mavenrc && mvn install -q -ff -DskipTests -B before_script: - unset _JAVA_OPTIONS diff --git a/api/src/main/java/io/druid/guice/JsonConfigProvider.java b/api/src/main/java/io/druid/guice/JsonConfigProvider.java index c3a9cfd64d8..609567b2dd7 100644 --- a/api/src/main/java/io/druid/guice/JsonConfigProvider.java +++ b/api/src/main/java/io/druid/guice/JsonConfigProvider.java @@ -130,8 +130,8 @@ public class JsonConfigProvider implements Provider> Key> supplierKey ) { - binder.bind(supplierKey).toProvider((Provider) of(propertyBase, clazz)).in(LazySingleton.class); - binder.bind(instanceKey).toProvider(new SupplierProvider(supplierKey)); + binder.bind(supplierKey).toProvider(of(propertyBase, clazz)).in(LazySingleton.class); + binder.bind(instanceKey).toProvider(new SupplierProvider<>(supplierKey)); } @SuppressWarnings("unchecked") diff --git a/api/src/test/java/io/druid/data/input/impl/InputRowParserSerdeTest.java b/api/src/test/java/io/druid/data/input/impl/InputRowParserSerdeTest.java index e8169628344..c7cb2df6340 100644 --- a/api/src/test/java/io/druid/data/input/impl/InputRowParserSerdeTest.java +++ b/api/src/test/java/io/druid/data/input/impl/InputRowParserSerdeTest.java @@ -101,9 +101,9 @@ public class InputRowParserSerdeTest null ) ); - final MapInputRowParser parser2 = jsonMapper.readValue( + final MapInputRowParser parser2 = (MapInputRowParser) jsonMapper.readValue( jsonMapper.writeValueAsBytes(parser), - MapInputRowParser.class + InputRowParser.class ); final InputRow parsed = parser2.parseBatch( ImmutableMap.of( @@ -134,9 +134,9 @@ public class InputRowParserSerdeTest null ) ); - final MapInputRowParser parser2 = jsonMapper.readValue( + final MapInputRowParser parser2 = (MapInputRowParser) jsonMapper.readValue( jsonMapper.writeValueAsBytes(parser), - MapInputRowParser.class + InputRowParser.class ); final InputRow parsed = parser2.parseBatch( ImmutableMap.of( diff --git a/api/src/test/java/io/druid/data/input/impl/JSONParseSpecTest.java b/api/src/test/java/io/druid/data/input/impl/JSONParseSpecTest.java index de2814eda9a..c7c73802f73 100644 --- a/api/src/test/java/io/druid/data/input/impl/JSONParseSpecTest.java +++ b/api/src/test/java/io/druid/data/input/impl/JSONParseSpecTest.java @@ -91,9 +91,9 @@ public class JSONParseSpecTest feature ); - final JSONParseSpec serde = jsonMapper.readValue( + final JSONParseSpec serde = (JSONParseSpec) jsonMapper.readValue( jsonMapper.writeValueAsString(spec), - JSONParseSpec.class + ParseSpec.class ); Assert.assertEquals("timestamp", serde.getTimestampSpec().getTimestampColumn()); Assert.assertEquals("iso", serde.getTimestampSpec().getTimestampFormat()); diff --git a/api/src/test/java/io/druid/data/input/impl/JavaScriptParseSpecTest.java b/api/src/test/java/io/druid/data/input/impl/JavaScriptParseSpecTest.java index 805e019b1b0..b63caf43ff3 100644 --- a/api/src/test/java/io/druid/data/input/impl/JavaScriptParseSpecTest.java +++ b/api/src/test/java/io/druid/data/input/impl/JavaScriptParseSpecTest.java @@ -58,9 +58,9 @@ public class JavaScriptParseSpecTest "abc", JavaScriptConfig.getEnabledInstance() ); - final JavaScriptParseSpec serde = jsonMapper.readValue( + final JavaScriptParseSpec serde = (JavaScriptParseSpec) jsonMapper.readValue( jsonMapper.writeValueAsString(spec), - JavaScriptParseSpec.class + ParseSpec.class ); Assert.assertEquals("abc", serde.getTimestampSpec().getTimestampColumn()); Assert.assertEquals("iso", serde.getTimestampSpec().getTimestampFormat()); diff --git a/api/src/test/java/io/druid/data/input/impl/RegexParseSpecTest.java b/api/src/test/java/io/druid/data/input/impl/RegexParseSpecTest.java index 68930ea6269..5468ae0302f 100644 --- a/api/src/test/java/io/druid/data/input/impl/RegexParseSpecTest.java +++ b/api/src/test/java/io/druid/data/input/impl/RegexParseSpecTest.java @@ -43,9 +43,9 @@ public class RegexParseSpecTest Collections.singletonList("abc"), "abc" ); - final RegexParseSpec serde = jsonMapper.readValue( + final RegexParseSpec serde = (RegexParseSpec) jsonMapper.readValue( jsonMapper.writeValueAsString(spec), - RegexParseSpec.class + ParseSpec.class ); Assert.assertEquals("abc", serde.getTimestampSpec().getTimestampColumn()); Assert.assertEquals("iso", serde.getTimestampSpec().getTimestampFormat()); diff --git a/aws-common/pom.xml b/aws-common/pom.xml index 9ce40e3951e..c6e69b099f6 100644 --- a/aws-common/pom.xml +++ b/aws-common/pom.xml @@ -37,7 +37,7 @@ com.amazonaws - aws-java-sdk-ec2 + aws-java-sdk-bundle diff --git a/aws-common/src/main/java/io/druid/common/aws/AWSEndpointConfig.java b/aws-common/src/main/java/io/druid/common/aws/AWSEndpointConfig.java new file mode 100644 index 00000000000..773a2ab1501 --- /dev/null +++ b/aws-common/src/main/java/io/druid/common/aws/AWSEndpointConfig.java @@ -0,0 +1,52 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.common.aws; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class AWSEndpointConfig +{ + @JsonProperty + private String url; + + @JsonProperty + private String serviceName; + + @JsonProperty + private String signingRegion; + + @JsonProperty + public String getUrl() + { + return url; + } + + @JsonProperty + public String getServiceName() + { + return serviceName; + } + + @JsonProperty + public String getSigningRegion() + { + return signingRegion; + } +} diff --git a/aws-common/src/main/java/io/druid/common/aws/AWSProxyConfig.java b/aws-common/src/main/java/io/druid/common/aws/AWSProxyConfig.java new file mode 100644 index 00000000000..eda04bb3715 --- /dev/null +++ b/aws-common/src/main/java/io/druid/common/aws/AWSProxyConfig.java @@ -0,0 +1,61 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.common.aws; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class AWSProxyConfig +{ + @JsonProperty + private String host; + + @JsonProperty + private int port = -1; // AWS's default proxy port is -1 + + @JsonProperty + private String username; + + @JsonProperty + private String password; + + @JsonProperty + public String getHost() + { + return host; + } + + @JsonProperty + public int getPort() + { + return port; + } + + @JsonProperty + public String getUsername() + { + return username; + } + + @JsonProperty + public String getPassword() + { + return password; + } +} diff --git a/common/pom.xml b/common/pom.xml index 234f21af5bb..0f07a4105ab 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -157,21 +157,7 @@ com.lmax disruptor - - - net.java.dev.jets3t - jets3t - 0.9.4 - + org.antlr antlr4-runtime diff --git a/docs/content/development/extensions-core/s3.md b/docs/content/development/extensions-core/s3.md index f24a406946b..3d32c9de895 100644 --- a/docs/content/development/extensions-core/s3.md +++ b/docs/content/development/extensions-core/s3.md @@ -12,12 +12,18 @@ S3-compatible deep storage is basically either S3 or something like Google Stora ### Configuration -|Property|Possible Values|Description|Default| -|--------|---------------|-----------|-------| -|`druid.s3.accessKey`||S3 access key.|Must be set.| -|`druid.s3.secretKey`||S3 secret key.|Must be set.| -|`druid.storage.bucket`||Bucket to store in.|Must be set.| -|`druid.storage.baseKey`||Base key prefix to use, i.e. what directory.|Must be set.| +|Property|Description|Default| +|--------|-----------|-------| +|`druid.s3.accessKey`|S3 access key.|Must be set.| +|`druid.s3.secretKey`|S3 secret key.|Must be set.| +|`druid.storage.bucket`|Bucket to store in.|Must be set.| +|`druid.storage.baseKey`|Base key prefix to use, i.e. what directory.|Must be set.| +|`druid.s3.endpoint.url`|Service endpoint either with or without the protocol.|None| +|`druid.s3.endpoint.signingRegion`|Region to use for SigV4 signing of requests (e.g. us-west-1).|None| +|`druid.s3.proxy.host`|Proxy host to connect through.|None| +|`druid.s3.proxy.port`|Port on the proxy host to connect through.|None| +|`druid.s3.proxy.username`|User name to use when connecting through a proxy.|None| +|`druid.s3.proxy.password`|Password to use when connecting through a proxy.|None| ## StaticS3Firehose diff --git a/extensions-core/druid-kerberos/src/main/java/io/druid/security/kerberos/KerberosAuthenticator.java b/extensions-core/druid-kerberos/src/main/java/io/druid/security/kerberos/KerberosAuthenticator.java index fbdfb066c8e..215de9caa4f 100644 --- a/extensions-core/druid-kerberos/src/main/java/io/druid/security/kerberos/KerberosAuthenticator.java +++ b/extensions-core/druid-kerberos/src/main/java/io/druid/security/kerberos/KerberosAuthenticator.java @@ -334,13 +334,14 @@ public class KerberosAuthenticator implements Authenticator }; if (newToken && !token.isExpired() && token != AuthenticationToken.ANONYMOUS) { String signedToken = mySigner.sign(token.toString()); - tokenToAuthCookie(httpResponse, - signedToken, - getCookieDomain(), - getCookiePath(), - token.getExpires(), - !token.isExpired() && token.getExpires() > 0, - isHttps + tokenToAuthCookie( + httpResponse, + signedToken, + getCookieDomain(), + getCookiePath(), + token.getExpires(), + !token.isExpired() && token.getExpires() > 0, + isHttps ); } doFilter(filterChain, httpRequest, httpResponse); @@ -361,8 +362,14 @@ public class KerberosAuthenticator implements Authenticator } if (unauthorizedResponse) { if (!httpResponse.isCommitted()) { - tokenToAuthCookie(httpResponse, "", getCookieDomain(), - getCookiePath(), 0, false, isHttps + tokenToAuthCookie( + httpResponse, + "", + getCookieDomain(), + getCookiePath(), + 0, + false, + isHttps ); // If response code is 401. Then WWW-Authenticate Header should be // present.. reset to 403 if not found.. diff --git a/extensions-core/hdfs-storage/pom.xml b/extensions-core/hdfs-storage/pom.xml index f737da39c3e..4c1a853744f 100644 --- a/extensions-core/hdfs-storage/pom.xml +++ b/extensions-core/hdfs-storage/pom.xml @@ -145,16 +145,7 @@ org.apache.hadoop hadoop-aws ${hadoop.compile.version} - - - com.amazonaws - aws-java-sdk - - - - - com.amazonaws - aws-java-sdk-s3 + provided commons-io diff --git a/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentFinder.java b/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentFinder.java index 6fba009cf86..c960541e634 100644 --- a/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentFinder.java +++ b/extensions-core/hdfs-storage/src/main/java/io/druid/storage/hdfs/HdfsDataSegmentFinder.java @@ -22,11 +22,11 @@ package io.druid.storage.hdfs; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Sets; import com.google.inject.Inject; +import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.logger.Logger; import io.druid.segment.loading.DataSegmentFinder; import io.druid.segment.loading.SegmentLoadingException; import io.druid.timeline.DataSegment; -import io.druid.java.util.common.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; diff --git a/extensions-core/s3-extensions/pom.xml b/extensions-core/s3-extensions/pom.xml index 3d389eab4d3..487d3378254 100644 --- a/extensions-core/s3-extensions/pom.xml +++ b/extensions-core/s3-extensions/pom.xml @@ -18,80 +18,75 @@ - 4.0.0 + 4.0.0 - io.druid.extensions - druid-s3-extensions - druid-s3-extensions - druid-s3-extensions + io.druid.extensions + druid-s3-extensions + druid-s3-extensions + druid-s3-extensions - - io.druid - druid - 0.13.0-SNAPSHOT - ../../pom.xml - + + io.druid + druid + 0.13.0-SNAPSHOT + ../../pom.xml + - - - io.druid - druid-api - ${project.parent.version} - provided - - - io.druid - druid-aws-common - ${project.parent.version} - provided - - - io.druid - java-util - ${project.parent.version} - provided - - - net.java.dev.jets3t - jets3t - provided - - - commons-io - commons-io - provided - - - com.fasterxml.jackson.module - jackson-module-guice - ${jackson.version} - provided - + + + io.druid + druid-api + ${project.parent.version} + provided + + + io.druid + druid-aws-common + ${project.parent.version} + provided + + + io.druid + java-util + ${project.parent.version} + provided + + + commons-io + commons-io + provided + + + com.fasterxml.jackson.module + jackson-module-guice + ${jackson.version} + provided + - - - io.druid - druid-server - ${project.parent.version} - test - - - io.druid - druid-processing - ${project.parent.version} - test-jar - test - - - junit - junit - test - - - org.easymock - easymock - test - - + + + io.druid + druid-server + ${project.parent.version} + test + + + io.druid + druid-processing + ${project.parent.version} + test-jar + test + + + junit + junit + test + + + org.easymock + easymock + test + + diff --git a/extensions-core/s3-extensions/src/main/java/io/druid/firehose/s3/StaticS3FirehoseFactory.java b/extensions-core/s3-extensions/src/main/java/io/druid/firehose/s3/StaticS3FirehoseFactory.java index fa649dc495c..8827fc9ae31 100644 --- a/extensions-core/s3-extensions/src/main/java/io/druid/firehose/s3/StaticS3FirehoseFactory.java +++ b/extensions-core/s3-extensions/src/main/java/io/druid/firehose/s3/StaticS3FirehoseFactory.java @@ -19,28 +19,32 @@ package io.druid.firehose.s3; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.AmazonS3Exception; +import com.amazonaws.services.s3.model.GetObjectRequest; +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.S3Object; +import com.amazonaws.services.s3.model.S3ObjectSummary; import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import com.google.common.base.Predicate; +import com.google.common.collect.Lists; import io.druid.data.input.impl.prefetch.PrefetchableTextFilesFirehoseFactory; import io.druid.java.util.common.CompressionUtils; import io.druid.java.util.common.IAE; +import io.druid.java.util.common.IOE; +import io.druid.java.util.common.ISE; import io.druid.java.util.common.logger.Logger; import io.druid.storage.s3.S3Utils; -import org.jets3t.service.S3ServiceException; -import org.jets3t.service.ServiceException; -import org.jets3t.service.StorageObjectsChunk; -import org.jets3t.service.impl.rest.httpclient.RestS3Service; -import org.jets3t.service.model.S3Object; import java.io.IOException; import java.io.InputStream; import java.net.URI; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; +import java.util.Iterator; import java.util.List; import java.util.Objects; import java.util.stream.Collectors; @@ -48,18 +52,18 @@ import java.util.stream.Collectors; /** * Builds firehoses that read from a predefined list of S3 objects and then dry up. */ -public class StaticS3FirehoseFactory extends PrefetchableTextFilesFirehoseFactory +public class StaticS3FirehoseFactory extends PrefetchableTextFilesFirehoseFactory { private static final Logger log = new Logger(StaticS3FirehoseFactory.class); - private static final long MAX_LISTING_LENGTH = 1024; + private static final int MAX_LISTING_LENGTH = 1024; - private final RestS3Service s3Client; + private final AmazonS3 s3Client; private final List uris; private final List prefixes; @JsonCreator public StaticS3FirehoseFactory( - @JacksonInject("s3Client") RestS3Service s3Client, + @JacksonInject("s3Client") AmazonS3 s3Client, @JsonProperty("uris") List uris, @JsonProperty("prefixes") List prefixes, @JsonProperty("maxCacheCapacityBytes") Long maxCacheCapacityBytes, @@ -70,7 +74,7 @@ public class StaticS3FirehoseFactory extends PrefetchableTextFilesFirehoseFactor ) { super(maxCacheCapacityBytes, maxFetchCapacityBytes, prefetchTriggerBytes, fetchTimeout, maxFetchRetry); - this.s3Client = Preconditions.checkNotNull(s3Client, "null s3Client"); + this.s3Client = Preconditions.checkNotNull(s3Client, "s3Client"); this.uris = uris == null ? new ArrayList<>() : uris; this.prefixes = prefixes == null ? new ArrayList<>() : prefixes; @@ -104,7 +108,7 @@ public class StaticS3FirehoseFactory extends PrefetchableTextFilesFirehoseFactor } @Override - protected Collection initObjects() throws IOException + protected Collection initObjects() throws IOException { // Here, the returned s3 objects contain minimal information without data. // Getting data is deferred until openObjectStream() is called for each object. @@ -113,53 +117,49 @@ public class StaticS3FirehoseFactory extends PrefetchableTextFilesFirehoseFactor .map( uri -> { final String s3Bucket = uri.getAuthority(); - final S3Object s3Object = new S3Object(extractS3Key(uri)); - s3Object.setBucketName(s3Bucket); - return s3Object; + final String key = S3Utils.extractS3Key(uri); + return S3Utils.getSingleObjectSummary(s3Client, s3Bucket, key); } ) .collect(Collectors.toList()); } else { - final List objects = new ArrayList<>(); + final List objects = new ArrayList<>(); for (URI uri : prefixes) { final String bucket = uri.getAuthority(); - final String prefix = extractS3Key(uri); + final String prefix = S3Utils.extractS3Key(uri); + try { - String lastKey = null; - StorageObjectsChunk objectsChunk; - do { - objectsChunk = s3Client.listObjectsChunked( - bucket, - prefix, - null, - MAX_LISTING_LENGTH, - lastKey - ); - Arrays.stream(objectsChunk.getObjects()) - .filter(storageObject -> !storageObject.isDirectoryPlaceholder()) - .forEach(storageObject -> objects.add((S3Object) storageObject)); - lastKey = objectsChunk.getPriorLastKey(); - } while (!objectsChunk.isListingComplete()); + final Iterator objectSummaryIterator = S3Utils.objectSummaryIterator( + s3Client, + bucket, + prefix, + MAX_LISTING_LENGTH + ); + objects.addAll(Lists.newArrayList(objectSummaryIterator)); } - catch (ServiceException outerException) { + catch (AmazonS3Exception outerException) { log.error(outerException, "Exception while listing on %s", uri); - if (outerException.getResponseCode() == 403) { + if (outerException.getStatusCode() == 403) { // The "Access Denied" means users might not have a proper permission for listing on the given uri. // Usually this is not a problem, but the uris might be the full paths to input objects instead of prefixes. // In this case, users should be able to get objects if they have a proper permission for GetObject. log.warn("Access denied for %s. Try to get the object from the uri without listing", uri); try { - final S3Object s3Object = s3Client.getObject(bucket, prefix); - if (!s3Object.isDirectoryPlaceholder()) { - objects.add(s3Object); + final ObjectMetadata objectMetadata = s3Client.getObjectMetadata(bucket, prefix); + + if (!S3Utils.isDirectoryPlaceholder(prefix, objectMetadata)) { + objects.add(S3Utils.getSingleObjectSummary(s3Client, bucket, prefix)); } else { - throw new IOException(uri + " is a directory placeholder, " - + "but failed to get the object list under the directory due to permission"); + throw new IOE( + "[%s] is a directory placeholder, " + + "but failed to get the object list under the directory due to permission", + uri + ); } } - catch (S3ServiceException innerException) { + catch (AmazonS3Exception innerException) { throw new IOException(innerException); } } else { @@ -171,49 +171,46 @@ public class StaticS3FirehoseFactory extends PrefetchableTextFilesFirehoseFactor } } - private static String extractS3Key(URI uri) - { - return uri.getPath().startsWith("/") - ? uri.getPath().substring(1) - : uri.getPath(); - } - @Override - protected InputStream openObjectStream(S3Object object) throws IOException + protected InputStream openObjectStream(S3ObjectSummary object) throws IOException { try { // Get data of the given object and open an input stream - return s3Client.getObject(object.getBucketName(), object.getKey()).getDataInputStream(); + final S3Object s3Object = s3Client.getObject(object.getBucketName(), object.getKey()); + if (s3Object == null) { + throw new ISE("Failed to get an s3 object for bucket[%s] and key[%s]", object.getBucketName(), object.getKey()); + } + return s3Object.getObjectContent(); } - catch (ServiceException e) { + catch (AmazonS3Exception e) { throw new IOException(e); } } @Override - protected InputStream openObjectStream(S3Object object, long start) throws IOException + protected InputStream openObjectStream(S3ObjectSummary object, long start) throws IOException { + final GetObjectRequest request = new GetObjectRequest(object.getBucketName(), object.getKey()); + request.setRange(start); try { - final S3Object result = s3Client.getObject( - object.getBucketName(), - object.getKey(), - null, - null, - null, - null, - start, - null - ); - - return result.getDataInputStream(); + final S3Object s3Object = s3Client.getObject(request); + if (s3Object == null) { + throw new ISE( + "Failed to get an s3 object for bucket[%s], key[%s], and start[%d]", + object.getBucketName(), + object.getKey(), + start + ); + } + return s3Object.getObjectContent(); } - catch (ServiceException e) { + catch (AmazonS3Exception e) { throw new IOException(e); } } @Override - protected InputStream wrapObjectStream(S3Object object, InputStream stream) throws IOException + protected InputStream wrapObjectStream(S3ObjectSummary object, InputStream stream) throws IOException { return object.getKey().endsWith(".gz") ? CompressionUtils.gzipInputStream(stream) : stream; } diff --git a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/AWSSessionCredentialsAdapter.java b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/AWSSessionCredentialsAdapter.java deleted file mode 100644 index 7a64a81e7c9..00000000000 --- a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/AWSSessionCredentialsAdapter.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to Metamarkets Group Inc. (Metamarkets) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. Metamarkets licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package io.druid.storage.s3; - -import com.amazonaws.auth.AWSCredentialsProvider; -import org.jets3t.service.security.AWSSessionCredentials; - -public class AWSSessionCredentialsAdapter extends AWSSessionCredentials -{ - private final AWSCredentialsProvider provider; - - public AWSSessionCredentialsAdapter(AWSCredentialsProvider provider) - { - super(null, null, null); - if (provider.getCredentials() instanceof com.amazonaws.auth.AWSSessionCredentials) { - this.provider = provider; - } else { - throw new IllegalArgumentException("provider does not contain session credentials"); - } - } - - @Override - protected String getTypeName() - { - return "AWSSessionCredentialsAdapter"; - } - - @Override - public String getVersionPrefix() - { - return "AWSSessionCredentialsAdapter, version: "; - } - - @Override - public String getAccessKey() - { - return provider.getCredentials().getAWSAccessKeyId(); - } - - @Override - public String getSecretKey() - { - return provider.getCredentials().getAWSSecretKey(); - } - - @Override - public String getSessionToken() - { - com.amazonaws.auth.AWSSessionCredentials sessionCredentials = - (com.amazonaws.auth.AWSSessionCredentials) provider.getCredentials(); - return sessionCredentials.getSessionToken(); - } -} diff --git a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentArchiver.java b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentArchiver.java index d7bc1b2d491..42eef5ce819 100644 --- a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentArchiver.java +++ b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentArchiver.java @@ -19,6 +19,7 @@ package io.druid.storage.s3; +import com.amazonaws.services.s3.AmazonS3; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Objects; import com.google.common.collect.ImmutableMap; @@ -28,7 +29,6 @@ import io.druid.segment.loading.DataSegmentArchiver; import io.druid.segment.loading.LoadSpec; import io.druid.segment.loading.SegmentLoadingException; import io.druid.timeline.DataSegment; -import org.jets3t.service.impl.rest.httpclient.RestS3Service; public class S3DataSegmentArchiver extends S3DataSegmentMover implements DataSegmentArchiver @@ -40,7 +40,7 @@ public class S3DataSegmentArchiver extends S3DataSegmentMover implements DataSeg @Inject public S3DataSegmentArchiver( @Json ObjectMapper mapper, - RestS3Service s3Client, + AmazonS3 s3Client, S3DataSegmentArchiverConfig archiveConfig, S3DataSegmentPusherConfig restoreConfig ) diff --git a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentFinder.java b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentFinder.java index d6d773640e8..649554e7564 100644 --- a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentFinder.java +++ b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentFinder.java @@ -19,22 +19,24 @@ package io.druid.storage.s3; +import com.amazonaws.AmazonServiceException; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.S3Object; +import com.amazonaws.services.s3.model.S3ObjectInputStream; +import com.amazonaws.services.s3.model.S3ObjectSummary; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Throwables; import com.google.common.collect.Sets; import com.google.inject.Inject; - +import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.logger.Logger; import io.druid.segment.loading.DataSegmentFinder; import io.druid.segment.loading.SegmentLoadingException; import io.druid.timeline.DataSegment; -import org.jets3t.service.ServiceException; -import org.jets3t.service.impl.rest.httpclient.RestS3Service; -import org.jets3t.service.model.S3Object; -import org.jets3t.service.model.StorageObject; +import java.io.ByteArrayInputStream; import java.io.IOException; -import java.io.InputStream; import java.util.Iterator; import java.util.Map; import java.util.Set; @@ -43,13 +45,13 @@ public class S3DataSegmentFinder implements DataSegmentFinder { private static final Logger log = new Logger(S3DataSegmentFinder.class); - private final RestS3Service s3Client; + private final AmazonS3 s3Client; private final ObjectMapper jsonMapper; private final S3DataSegmentPusherConfig config; @Inject public S3DataSegmentFinder( - RestS3Service s3Client, + AmazonS3 s3Client, S3DataSegmentPusherConfig config, ObjectMapper jsonMapper ) @@ -65,24 +67,24 @@ public class S3DataSegmentFinder implements DataSegmentFinder final Set segments = Sets.newHashSet(); try { - Iterator objectsIterator = S3Utils.storageObjectsIterator( + final Iterator objectSummaryIterator = S3Utils.objectSummaryIterator( s3Client, config.getBucket(), workingDirPath.length() == 0 ? config.getBaseKey() : workingDirPath, - config.getMaxListingLength()); + config.getMaxListingLength() + ); - while (objectsIterator.hasNext()) { - StorageObject storageObject = objectsIterator.next(); - storageObject.closeDataInputStream(); + while (objectSummaryIterator.hasNext()) { + final S3ObjectSummary objectSummary = objectSummaryIterator.next(); - if (S3Utils.toFilename(storageObject.getKey()).equals("descriptor.json")) { - final String descriptorJson = storageObject.getKey(); + if (S3Utils.toFilename(objectSummary.getKey()).equals("descriptor.json")) { + final String descriptorJson = objectSummary.getKey(); String indexZip = S3Utils.indexZipForSegmentPath(descriptorJson); - if (S3Utils.isObjectInBucket(s3Client, config.getBucket(), indexZip)) { - S3Object indexObject = s3Client.getObject(config.getBucket(), descriptorJson); - - try (InputStream is = indexObject.getDataInputStream()) { + if (S3Utils.isObjectInBucketIgnoringPermission(s3Client, config.getBucket(), indexZip)) { + try (S3Object indexObject = s3Client.getObject(config.getBucket(), descriptorJson); + S3ObjectInputStream is = indexObject.getObjectContent()) { + final ObjectMetadata objectMetadata = indexObject.getObjectMetadata(); final DataSegment dataSegment = jsonMapper.readValue(is, DataSegment.class); log.info("Found segment [%s] located at [%s]", dataSegment.getIdentifier(), indexZip); @@ -99,8 +101,10 @@ public class S3DataSegmentFinder implements DataSegmentFinder descriptorJson, indexObject ); - S3Object newDescJsonObject = new S3Object(descriptorJson, jsonMapper.writeValueAsString(dataSegment)); - s3Client.putObject(config.getBucket(), newDescJsonObject); + final ByteArrayInputStream bais = new ByteArrayInputStream( + StringUtils.toUtf8(jsonMapper.writeValueAsString(dataSegment)) + ); + s3Client.putObject(config.getBucket(), descriptorJson, bais, objectMetadata); } } segments.add(dataSegment); @@ -114,7 +118,7 @@ public class S3DataSegmentFinder implements DataSegmentFinder } } } - catch (ServiceException e) { + catch (AmazonServiceException e) { throw new SegmentLoadingException(e, "Problem interacting with S3"); } catch (IOException e) { diff --git a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentKiller.java b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentKiller.java index b1503e42b75..4053fdd6056 100644 --- a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentKiller.java +++ b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentKiller.java @@ -19,14 +19,14 @@ package io.druid.storage.s3; +import com.amazonaws.AmazonServiceException; +import com.amazonaws.services.s3.AmazonS3; import com.google.inject.Inject; import io.druid.java.util.common.MapUtils; import io.druid.java.util.common.logger.Logger; import io.druid.segment.loading.DataSegmentKiller; import io.druid.segment.loading.SegmentLoadingException; import io.druid.timeline.DataSegment; -import org.jets3t.service.ServiceException; -import org.jets3t.service.impl.rest.httpclient.RestS3Service; import java.util.Map; @@ -36,11 +36,11 @@ public class S3DataSegmentKiller implements DataSegmentKiller { private static final Logger log = new Logger(S3DataSegmentKiller.class); - private final RestS3Service s3Client; + private final AmazonS3 s3Client; @Inject public S3DataSegmentKiller( - RestS3Service s3Client + AmazonS3 s3Client ) { this.s3Client = s3Client; @@ -55,16 +55,16 @@ public class S3DataSegmentKiller implements DataSegmentKiller String s3Path = MapUtils.getString(loadSpec, "key"); String s3DescriptorPath = S3Utils.descriptorPathForSegmentPath(s3Path); - if (s3Client.isObjectInBucket(s3Bucket, s3Path)) { + if (s3Client.doesObjectExist(s3Bucket, s3Path)) { log.info("Removing index file[s3://%s/%s] from s3!", s3Bucket, s3Path); s3Client.deleteObject(s3Bucket, s3Path); } - if (s3Client.isObjectInBucket(s3Bucket, s3DescriptorPath)) { + if (s3Client.doesObjectExist(s3Bucket, s3DescriptorPath)) { log.info("Removing descriptor file[s3://%s/%s] from s3!", s3Bucket, s3DescriptorPath); s3Client.deleteObject(s3Bucket, s3DescriptorPath); } } - catch (ServiceException e) { + catch (AmazonServiceException e) { throw new SegmentLoadingException(e, "Couldn't kill segment[%s]: [%s]", segment.getIdentifier(), e); } } diff --git a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentMover.java b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentMover.java index 1c22418e2b4..e50ea2cca70 100644 --- a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentMover.java +++ b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentMover.java @@ -19,6 +19,13 @@ package io.druid.storage.s3; +import com.amazonaws.AmazonServiceException; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.CopyObjectRequest; +import com.amazonaws.services.s3.model.ListObjectsV2Request; +import com.amazonaws.services.s3.model.ListObjectsV2Result; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.amazonaws.services.s3.model.StorageClass; import com.google.common.base.Predicate; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; @@ -34,10 +41,6 @@ import io.druid.segment.loading.DataSegmentMover; import io.druid.segment.loading.DataSegmentPusher; import io.druid.segment.loading.SegmentLoadingException; import io.druid.timeline.DataSegment; -import org.jets3t.service.ServiceException; -import org.jets3t.service.acl.gs.GSAccessControlList; -import org.jets3t.service.impl.rest.httpclient.RestS3Service; -import org.jets3t.service.model.S3Object; import java.io.IOException; import java.util.Map; @@ -46,12 +49,12 @@ public class S3DataSegmentMover implements DataSegmentMover { private static final Logger log = new Logger(S3DataSegmentMover.class); - private final RestS3Service s3Client; + private final AmazonS3 s3Client; private final S3DataSegmentPusherConfig config; @Inject public S3DataSegmentMover( - RestS3Service s3Client, + AmazonS3 s3Client, S3DataSegmentPusherConfig config ) { @@ -103,7 +106,7 @@ public class S3DataSegmentMover implements DataSegmentMover .build() ); } - catch (ServiceException e) { + catch (AmazonServiceException e) { throw new SegmentLoadingException(e, "Unable to move segment[%s]: [%s]", segment.getIdentifier(), e); } } @@ -113,7 +116,7 @@ public class S3DataSegmentMover implements DataSegmentMover final String s3Path, final String targetS3Bucket, final String targetS3Path - ) throws ServiceException, SegmentLoadingException + ) throws SegmentLoadingException { try { S3Utils.retryS3Operation( @@ -129,7 +132,7 @@ public class S3DataSegmentMover implements DataSegmentMover selfCheckingMove(s3Bucket, targetS3Bucket, s3Path, targetS3Path, copyMsg); return null; } - catch (ServiceException | IOException | SegmentLoadingException e) { + catch (AmazonServiceException | IOException | SegmentLoadingException e) { log.info(e, "Error while trying to move " + copyMsg); throw e; } @@ -137,7 +140,7 @@ public class S3DataSegmentMover implements DataSegmentMover ); } catch (Exception e) { - Throwables.propagateIfInstanceOf(e, ServiceException.class); + Throwables.propagateIfInstanceOf(e, AmazonServiceException.class); Throwables.propagateIfInstanceOf(e, SegmentLoadingException.class); throw Throwables.propagate(e); } @@ -155,40 +158,41 @@ public class S3DataSegmentMover implements DataSegmentMover String s3Path, String targetS3Path, String copyMsg - ) throws ServiceException, IOException, SegmentLoadingException + ) throws IOException, SegmentLoadingException { if (s3Bucket.equals(targetS3Bucket) && s3Path.equals(targetS3Path)) { log.info("No need to move file[s3://%s/%s] onto itself", s3Bucket, s3Path); return; } - if (s3Client.isObjectInBucket(s3Bucket, s3Path)) { - final S3Object[] list = s3Client.listObjects(s3Bucket, s3Path, ""); - if (list.length == 0) { + if (s3Client.doesObjectExist(s3Bucket, s3Path)) { + final ListObjectsV2Result listResult = s3Client.listObjectsV2( + new ListObjectsV2Request() + .withBucketName(s3Bucket) + .withPrefix(s3Path) + .withMaxKeys(1) + ); + if (listResult.getKeyCount() == 0) { // should never happen throw new ISE("Unable to list object [s3://%s/%s]", s3Bucket, s3Path); } - final S3Object s3Object = list[0]; - if (s3Object.getStorageClass() != null && - s3Object.getStorageClass().equals(S3Object.STORAGE_CLASS_GLACIER)) { - throw new ServiceException(StringUtils.format( - "Cannot move file[s3://%s/%s] of storage class glacier, skipping.", - s3Bucket, - s3Path - )); + final S3ObjectSummary objectSummary = listResult.getObjectSummaries().get(0); + if (objectSummary.getStorageClass() != null && + StorageClass.fromValue(StringUtils.toUpperCase(objectSummary.getStorageClass())).equals(StorageClass.Glacier)) { + throw new AmazonServiceException( + StringUtils.format( + "Cannot move file[s3://%s/%s] of storage class glacier, skipping.", + s3Bucket, + s3Path + ) + ); } else { log.info("Moving file %s", copyMsg); - final S3Object target = new S3Object(targetS3Path); + final CopyObjectRequest copyRequest = new CopyObjectRequest(s3Bucket, s3Path, targetS3Bucket, targetS3Path); if (!config.getDisableAcl()) { - target.setAcl(GSAccessControlList.REST_CANNED_BUCKET_OWNER_FULL_CONTROL); + copyRequest.setAccessControlList(S3Utils.grantFullControlToBucketOwner(s3Client, targetS3Bucket)); } - s3Client.copyObject( - s3Bucket, - s3Path, - targetS3Bucket, - target, - false - ); - if (!s3Client.isObjectInBucket(targetS3Bucket, targetS3Path)) { + s3Client.copyObject(copyRequest); + if (!s3Client.doesObjectExist(targetS3Bucket, targetS3Path)) { throw new IOE( "After copy was reported as successful the file doesn't exist in the target location [%s]", copyMsg @@ -199,7 +203,7 @@ public class S3DataSegmentMover implements DataSegmentMover } } else { // ensure object exists in target location - if (s3Client.isObjectInBucket(targetS3Bucket, targetS3Path)) { + if (s3Client.doesObjectExist(targetS3Bucket, targetS3Path)) { log.info( "Not moving file [s3://%s/%s], already present in target location [s3://%s/%s]", s3Bucket, s3Path, diff --git a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPuller.java b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPuller.java index 1b2765af209..55a00a76b8b 100644 --- a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPuller.java +++ b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPuller.java @@ -19,6 +19,11 @@ package io.druid.storage.s3; +import com.amazonaws.AmazonServiceException; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.AmazonS3Exception; +import com.amazonaws.services.s3.model.S3Object; +import com.amazonaws.services.s3.model.S3ObjectSummary; import com.google.common.base.Predicate; import com.google.common.base.Strings; import com.google.common.base.Throwables; @@ -33,17 +38,15 @@ import io.druid.java.util.common.MapUtils; import io.druid.java.util.common.RE; import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.UOE; +import io.druid.java.util.common.io.Closer; import io.druid.java.util.common.logger.Logger; import io.druid.segment.loading.SegmentLoadingException; import io.druid.segment.loading.URIDataPuller; import io.druid.timeline.DataSegment; -import org.jets3t.service.S3ServiceException; -import org.jets3t.service.ServiceException; -import org.jets3t.service.impl.rest.httpclient.RestS3Service; -import org.jets3t.service.model.StorageObject; import javax.tools.FileObject; import java.io.File; +import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -59,17 +62,15 @@ public class S3DataSegmentPuller implements URIDataPuller { public static final int DEFAULT_RETRY_COUNT = 3; - public static FileObject buildFileObject(final URI uri, final RestS3Service s3Client) throws ServiceException + private static FileObject buildFileObject(final URI uri, final AmazonS3 s3Client) throws AmazonServiceException { final S3Coords coords = new S3Coords(checkURI(uri)); - final StorageObject s3Obj = s3Client.getObjectDetails(coords.bucket, coords.path); + final S3ObjectSummary objectSummary = S3Utils.getSingleObjectSummary(s3Client, coords.bucket, coords.path); final String path = uri.getPath(); return new FileObject() { - final Object inputStreamOpener = new Object(); - volatile boolean streamAcquired = false; - volatile StorageObject storageObject = s3Obj; + S3Object s3Object = null; @Override public URI toUri() @@ -84,22 +85,33 @@ public class S3DataSegmentPuller implements URIDataPuller return Files.getNameWithoutExtension(path) + (Strings.isNullOrEmpty(ext) ? "" : ("." + ext)); } + /** + * Returns an input stream for a s3 object. The returned input stream is not thread-safe. + */ @Override public InputStream openInputStream() throws IOException { try { - synchronized (inputStreamOpener) { - if (streamAcquired) { - return storageObject.getDataInputStream(); - } + if (s3Object == null) { // lazily promote to full GET - storageObject = s3Client.getObject(s3Obj.getBucketName(), s3Obj.getKey()); - final InputStream stream = storageObject.getDataInputStream(); - streamAcquired = true; - return stream; + s3Object = s3Client.getObject(objectSummary.getBucketName(), objectSummary.getKey()); } + + final InputStream in = s3Object.getObjectContent(); + final Closer closer = Closer.create(); + closer.register(in); + closer.register(s3Object); + + return new FilterInputStream(in) + { + @Override + public void close() throws IOException + { + closer.close(); + } + }; } - catch (ServiceException e) { + catch (AmazonServiceException e) { throw new IOE(e, "Could not load S3 URI [%s]", uri); } } @@ -131,7 +143,7 @@ public class S3DataSegmentPuller implements URIDataPuller @Override public long getLastModified() { - return s3Obj.getLastModifiedDate().getTime(); + return objectSummary.getLastModified().getTime(); } @Override @@ -149,11 +161,11 @@ public class S3DataSegmentPuller implements URIDataPuller protected static final String BUCKET = "bucket"; protected static final String KEY = "key"; - protected final RestS3Service s3Client; + protected final AmazonS3 s3Client; @Inject public S3DataSegmentPuller( - RestS3Service s3Client + AmazonS3 s3Client ) { this.s3Client = s3Client; @@ -180,7 +192,7 @@ public class S3DataSegmentPuller implements URIDataPuller try { return buildFileObject(uri, s3Client).openInputStream(); } - catch (ServiceException e) { + catch (AmazonServiceException e) { if (e.getCause() != null) { if (S3Utils.S3RETRY.apply(e)) { throw new IOException("Recoverable exception", e); @@ -242,7 +254,7 @@ public class S3DataSegmentPuller implements URIDataPuller try { return buildFileObject(uri, s3Client).openInputStream(); } - catch (ServiceException e) { + catch (AmazonServiceException e) { throw new IOE(e, "Could not load URI [%s]", uri); } } @@ -259,8 +271,8 @@ public class S3DataSegmentPuller implements URIDataPuller if (e == null) { return false; } - if (e instanceof ServiceException) { - return S3Utils.isServiceExceptionRecoverable((ServiceException) e); + if (e instanceof AmazonServiceException) { + return S3Utils.isServiceExceptionRecoverable((AmazonServiceException) e); } if (S3Utils.S3RETRY.apply(e)) { return true; @@ -284,10 +296,11 @@ public class S3DataSegmentPuller implements URIDataPuller public String getVersion(URI uri) throws IOException { try { - final FileObject object = buildFileObject(uri, s3Client); - return StringUtils.format("%d", object.getLastModified()); + final S3Coords coords = new S3Coords(checkURI(uri)); + final S3ObjectSummary objectSummary = S3Utils.getSingleObjectSummary(s3Client, coords.bucket, coords.path); + return StringUtils.format("%d", objectSummary.getLastModified().getTime()); } - catch (ServiceException e) { + catch (AmazonServiceException e) { if (S3Utils.isServiceExceptionRecoverable(e)) { // The recoverable logic is always true for IOException, so we want to only pass IOException if it is recoverable throw new IOE(e, "Could not fetch last modified timestamp from URI [%s]", uri); @@ -301,10 +314,10 @@ public class S3DataSegmentPuller implements URIDataPuller { try { return S3Utils.retryS3Operation( - () -> S3Utils.isObjectInBucket(s3Client, coords.bucket, coords.path) + () -> S3Utils.isObjectInBucketIgnoringPermission(s3Client, coords.bucket, coords.path) ); } - catch (S3ServiceException | IOException e) { + catch (AmazonS3Exception | IOException e) { throw new SegmentLoadingException(e, "S3 fail! Key[%s]", coords); } catch (Exception e) { diff --git a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusher.java b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusher.java index 985121bef62..981d24a7ef5 100644 --- a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusher.java +++ b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3DataSegmentPusher.java @@ -19,21 +19,20 @@ package io.druid.storage.s3; +import com.amazonaws.AmazonServiceException; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.PutObjectRequest; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.inject.Inject; -import io.druid.java.util.emitter.EmittingLogger; import io.druid.java.util.common.CompressionUtils; import io.druid.java.util.common.StringUtils; +import io.druid.java.util.emitter.EmittingLogger; import io.druid.segment.SegmentUtils; import io.druid.segment.loading.DataSegmentPusher; import io.druid.timeline.DataSegment; -import org.jets3t.service.ServiceException; -import org.jets3t.service.acl.gs.GSAccessControlList; -import org.jets3t.service.impl.rest.httpclient.RestS3Service; -import org.jets3t.service.model.S3Object; import java.io.File; import java.io.IOException; @@ -46,13 +45,13 @@ public class S3DataSegmentPusher implements DataSegmentPusher { private static final EmittingLogger log = new EmittingLogger(S3DataSegmentPusher.class); - private final RestS3Service s3Client; + private final AmazonS3 s3Client; private final S3DataSegmentPusherConfig config; private final ObjectMapper jsonMapper; @Inject public S3DataSegmentPusher( - RestS3Service s3Client, + AmazonS3 s3Client, S3DataSegmentPusherConfig config, ObjectMapper jsonMapper ) @@ -97,45 +96,43 @@ public class S3DataSegmentPusher implements DataSegmentPusher final File zipOutFile = File.createTempFile("druid", "index.zip"); final long indexSize = CompressionUtils.zip(indexFilesDir, zipOutFile); + final DataSegment outSegment = inSegment.withSize(indexSize) + .withLoadSpec(makeLoadSpec(config.getBucket(), s3Path)) + .withBinaryVersion(SegmentUtils.getVersionFromDir(indexFilesDir)); + + final File descriptorFile = File.createTempFile("druid", "descriptor.json"); + // Avoid using Guava in DataSegmentPushers because they might be used with very diverse Guava versions in + // runtime, and because Guava deletes methods over time, that causes incompatibilities. + Files.write(descriptorFile.toPath(), jsonMapper.writeValueAsBytes(outSegment)); + try { return S3Utils.retryS3Operation( () -> { - S3Object toPush = new S3Object(zipOutFile); - putObject(config.getBucket(), s3Path, toPush, replaceExisting); - - final DataSegment outSegment = inSegment.withSize(indexSize) - .withLoadSpec(makeLoadSpec(config.getBucket(), toPush.getKey())) - .withBinaryVersion(SegmentUtils.getVersionFromDir(indexFilesDir)); - - File descriptorFile = File.createTempFile("druid", "descriptor.json"); - // Avoid using Guava in DataSegmentPushers because they might be used with very diverse Guava versions in - // runtime, and because Guava deletes methods over time, that causes incompatibilities. - Files.write(descriptorFile.toPath(), jsonMapper.writeValueAsBytes(outSegment)); - S3Object descriptorObject = new S3Object(descriptorFile); - - putObject( + uploadFileIfPossible(s3Client, config.getBucket(), s3Path, zipOutFile, replaceExisting); + uploadFileIfPossible( + s3Client, config.getBucket(), S3Utils.descriptorPathForSegmentPath(s3Path), - descriptorObject, + descriptorFile, replaceExisting ); - log.info("Deleting zipped index File[%s]", zipOutFile); - zipOutFile.delete(); - - log.info("Deleting descriptor file[%s]", descriptorFile); - descriptorFile.delete(); - return outSegment; } ); } - catch (ServiceException e) { + catch (AmazonServiceException e) { throw new IOException(e); } catch (Exception e) { throw Throwables.propagate(e); } + finally { + log.info("Deleting temporary cached index.zip"); + zipOutFile.delete(); + log.info("Deleting temporary cached descriptor.json"); + descriptorFile.delete(); + } } @Override @@ -163,21 +160,26 @@ public class S3DataSegmentPusher implements DataSegmentPusher ); } - private void putObject(String bucketName, String path, S3Object object, boolean replaceExisting) - throws ServiceException + private void uploadFileIfPossible( + AmazonS3 s3Client, + String bucket, + String key, + File file, + boolean replaceExisting + ) { - object.setBucketName(bucketName); - object.setKey(path); - if (!config.getDisableAcl()) { - object.setAcl(GSAccessControlList.REST_CANNED_BUCKET_OWNER_FULL_CONTROL); - } - - log.info("Pushing %s.", object); - - if (!replaceExisting && S3Utils.isObjectInBucket(s3Client, bucketName, object.getKey())) { - log.info("Skipping push because key [%s] exists && replaceExisting == false", object.getKey()); + if (!replaceExisting && S3Utils.isObjectInBucketIgnoringPermission(s3Client, bucket, key)) { + log.info("Skipping push because key [%s] exists && replaceExisting == false", key); } else { - s3Client.putObject(bucketName, object); + final PutObjectRequest indexFilePutRequest = new PutObjectRequest(bucket, key, file); + + if (!config.getDisableAcl()) { + indexFilePutRequest.setAccessControlList( + S3Utils.grantFullControlToBucketOwner(s3Client, bucket) + ); + } + log.info("Pushing [%s] to bucket[%s] and key[%s].", file, bucket, key); + s3Client.putObject(indexFilePutRequest); } } } diff --git a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3StorageDruidModule.java b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3StorageDruidModule.java index 8b9b3fdd7cc..ac7839f4363 100644 --- a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3StorageDruidModule.java +++ b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3StorageDruidModule.java @@ -19,8 +19,12 @@ package io.druid.storage.s3; +import com.amazonaws.ClientConfiguration; +import com.amazonaws.ClientConfigurationFactory; import com.amazonaws.auth.AWSCredentialsProvider; -import com.amazonaws.auth.AWSSessionCredentials; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.S3ClientOptions; import com.fasterxml.jackson.core.Version; import com.fasterxml.jackson.databind.Module; import com.google.common.collect.ImmutableList; @@ -28,13 +32,14 @@ import com.google.inject.Binder; import com.google.inject.Provides; import com.google.inject.multibindings.MapBinder; import io.druid.common.aws.AWSCredentialsConfig; +import io.druid.common.aws.AWSEndpointConfig; +import io.druid.common.aws.AWSProxyConfig; import io.druid.data.SearchableVersionedDataFinder; import io.druid.guice.Binders; import io.druid.guice.JsonConfigProvider; import io.druid.guice.LazySingleton; import io.druid.initialization.DruidModule; -import org.jets3t.service.impl.rest.httpclient.RestS3Service; -import org.jets3t.service.security.AWSCredentials; +import org.apache.commons.lang.StringUtils; import java.util.List; @@ -75,6 +80,8 @@ public class S3StorageDruidModule implements DruidModule public void configure(Binder binder) { JsonConfigProvider.bind(binder, "druid.s3", AWSCredentialsConfig.class); + JsonConfigProvider.bind(binder, "druid.s3.proxy", AWSProxyConfig.class); + JsonConfigProvider.bind(binder, "druid.s3.endpoint", AWSEndpointConfig.class); MapBinder.newMapBinder(binder, String.class, SearchableVersionedDataFinder.class) .addBinding("s3") .to(S3TimestampVersionedDataFinder.class) @@ -101,15 +108,44 @@ public class S3StorageDruidModule implements DruidModule @Provides @LazySingleton - public RestS3Service getRestS3Service(AWSCredentialsProvider provider) + public AmazonS3 getAmazonS3Client( + AWSCredentialsProvider provider, + AWSProxyConfig proxyConfig, + AWSEndpointConfig endpointConfig + ) { - if (provider.getCredentials() instanceof AWSSessionCredentials) { - return new RestS3Service(new AWSSessionCredentialsAdapter(provider)); - } else { - return new RestS3Service(new AWSCredentials( - provider.getCredentials().getAWSAccessKeyId(), - provider.getCredentials().getAWSSecretKey() - )); + // AmazonS3ClientBuilder can't be used because it makes integration tests failed + final ClientConfiguration configuration = new ClientConfigurationFactory().getConfig(); + final AmazonS3Client client = new AmazonS3Client(provider, setProxyConfig(configuration, proxyConfig)); + + if (StringUtils.isNotEmpty(endpointConfig.getUrl())) { + if (StringUtils.isNotEmpty(endpointConfig.getServiceName()) && + StringUtils.isNotEmpty(endpointConfig.getSigningRegion())) { + client.setEndpoint(endpointConfig.getUrl(), endpointConfig.getServiceName(), endpointConfig.getSigningRegion()); + } else { + client.setEndpoint(endpointConfig.getUrl()); + } } + + client.setS3ClientOptions(S3ClientOptions.builder().enableForceGlobalBucketAccess().build()); + + return client; + } + + private static ClientConfiguration setProxyConfig(ClientConfiguration conf, AWSProxyConfig proxyConfig) + { + if (StringUtils.isNotEmpty(proxyConfig.getHost())) { + conf.setProxyHost(proxyConfig.getHost()); + } + if (proxyConfig.getPort() != -1) { + conf.setProxyPort(proxyConfig.getPort()); + } + if (StringUtils.isNotEmpty(proxyConfig.getUsername())) { + conf.setProxyUsername(proxyConfig.getUsername()); + } + if (StringUtils.isNotEmpty(proxyConfig.getPassword())) { + conf.setProxyPassword(proxyConfig.getPassword()); + } + return conf; } } diff --git a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3TaskLogs.java b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3TaskLogs.java index 426221f508b..afef97a8892 100644 --- a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3TaskLogs.java +++ b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3TaskLogs.java @@ -19,6 +19,11 @@ package io.druid.storage.s3; +import com.amazonaws.AmazonServiceException; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.AmazonS3Exception; +import com.amazonaws.services.s3.model.GetObjectRequest; +import com.amazonaws.services.s3.model.ObjectMetadata; import com.google.common.base.Optional; import com.google.common.base.Throwables; import com.google.common.io.ByteSource; @@ -27,10 +32,6 @@ import io.druid.java.util.common.IOE; import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.logger.Logger; import io.druid.tasklogs.TaskLogs; -import org.jets3t.service.ServiceException; -import org.jets3t.service.StorageService; -import org.jets3t.service.impl.rest.httpclient.RestS3Service; -import org.jets3t.service.model.StorageObject; import java.io.File; import java.io.IOException; @@ -43,11 +44,11 @@ public class S3TaskLogs implements TaskLogs { private static final Logger log = new Logger(S3TaskLogs.class); - private final StorageService service; + private final AmazonS3 service; private final S3TaskLogsConfig config; @Inject - public S3TaskLogs(S3TaskLogsConfig config, RestS3Service service) + public S3TaskLogs(S3TaskLogsConfig config, AmazonS3 service) { this.config = config; this.service = service; @@ -59,9 +60,9 @@ public class S3TaskLogs implements TaskLogs final String taskKey = getTaskLogKey(taskid); try { - final StorageObject objectDetails = service.getObjectDetails(config.getS3Bucket(), taskKey, null, null, null, null); + final ObjectMetadata objectMetadata = service.getObjectMetadata(config.getS3Bucket(), taskKey); - return Optional.of( + return Optional.of( new ByteSource() { @Override @@ -69,36 +70,31 @@ public class S3TaskLogs implements TaskLogs { try { final long start; - final long end = objectDetails.getContentLength() - 1; + final long end = objectMetadata.getContentLength() - 1; - if (offset > 0 && offset < objectDetails.getContentLength()) { + if (offset > 0 && offset < objectMetadata.getContentLength()) { start = offset; - } else if (offset < 0 && (-1 * offset) < objectDetails.getContentLength()) { - start = objectDetails.getContentLength() + offset; + } else if (offset < 0 && (-1 * offset) < objectMetadata.getContentLength()) { + start = objectMetadata.getContentLength() + offset; } else { start = 0; } - return service.getObject( - config.getS3Bucket(), - taskKey, - null, - null, - new String[]{objectDetails.getETag()}, - null, - start, - end - ).getDataInputStream(); + final GetObjectRequest request = new GetObjectRequest(config.getS3Bucket(), taskKey) + .withMatchingETagConstraint(objectMetadata.getETag()) + .withRange(start, end); + + return service.getObject(request).getObjectContent(); } - catch (ServiceException e) { + catch (AmazonServiceException e) { throw new IOException(e); } } } ); } - catch (ServiceException e) { - if (404 == e.getResponseCode() + catch (AmazonS3Exception e) { + if (404 == e.getStatusCode() || "NoSuchKey".equals(e.getErrorCode()) || "NoSuchBucket".equals(e.getErrorCode())) { return Optional.absent(); @@ -117,9 +113,7 @@ public class S3TaskLogs implements TaskLogs try { S3Utils.retryS3Operation( () -> { - final StorageObject object = new StorageObject(logFile); - object.setKey(taskKey); - service.putObject(config.getS3Bucket(), object); + service.putObject(config.getS3Bucket(), taskKey, logFile); return null; } ); diff --git a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3TimestampVersionedDataFinder.java b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3TimestampVersionedDataFinder.java index 8014ec8ac88..2d4724851b7 100644 --- a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3TimestampVersionedDataFinder.java +++ b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3TimestampVersionedDataFinder.java @@ -19,16 +19,17 @@ package io.druid.storage.s3; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.S3ObjectSummary; import com.google.common.base.Throwables; import com.google.inject.Inject; import io.druid.data.SearchableVersionedDataFinder; import io.druid.java.util.common.RetryUtils; import io.druid.java.util.common.StringUtils; -import org.jets3t.service.impl.rest.httpclient.RestS3Service; -import org.jets3t.service.model.S3Object; import javax.annotation.Nullable; import java.net.URI; +import java.util.Iterator; import java.util.regex.Pattern; /** @@ -37,8 +38,10 @@ import java.util.regex.Pattern; */ public class S3TimestampVersionedDataFinder extends S3DataSegmentPuller implements SearchableVersionedDataFinder { + private static final int MAX_LISTING_KEYS = 1000; + @Inject - public S3TimestampVersionedDataFinder(RestS3Service s3Client) + public S3TimestampVersionedDataFinder(AmazonS3 s3Client) { super(s3Client); } @@ -65,23 +68,27 @@ public class S3TimestampVersionedDataFinder extends S3DataSegmentPuller implemen final S3Coords coords = new S3Coords(checkURI(uri)); long mostRecent = Long.MIN_VALUE; URI latest = null; - S3Object[] objects = s3Client.listObjects(coords.bucket, coords.path, null); - if (objects == null) { - return null; - } - for (S3Object storageObject : objects) { - storageObject.closeDataInputStream(); - String keyString = storageObject.getKey().substring(coords.path.length()); + final Iterator objectSummaryIterator = S3Utils.objectSummaryIterator( + s3Client, + coords.bucket, + coords.path, + MAX_LISTING_KEYS + ); + while (objectSummaryIterator.hasNext()) { + final S3ObjectSummary objectSummary = objectSummaryIterator.next(); + String keyString = objectSummary.getKey().substring(coords.path.length()); if (keyString.startsWith("/")) { keyString = keyString.substring(1); } if (pattern != null && !pattern.matcher(keyString).matches()) { continue; } - final long latestModified = storageObject.getLastModifiedDate().getTime(); + final long latestModified = objectSummary.getLastModified().getTime(); if (latestModified >= mostRecent) { mostRecent = latestModified; - latest = new URI(StringUtils.format("s3://%s/%s", storageObject.getBucketName(), storageObject.getKey())); + latest = new URI( + StringUtils.format("s3://%s/%s", objectSummary.getBucketName(), objectSummary.getKey()) + ); } } return latest; diff --git a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3Utils.java b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3Utils.java index 2a9372e96d0..c4fa1576106 100644 --- a/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3Utils.java +++ b/extensions-core/s3-extensions/src/main/java/io/druid/storage/s3/S3Utils.java @@ -19,19 +19,27 @@ package io.druid.storage.s3; +import com.amazonaws.AmazonServiceException; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.AccessControlList; +import com.amazonaws.services.s3.model.AmazonS3Exception; +import com.amazonaws.services.s3.model.CanonicalGrantee; +import com.amazonaws.services.s3.model.Grant; +import com.amazonaws.services.s3.model.ListObjectsV2Request; +import com.amazonaws.services.s3.model.ListObjectsV2Result; +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.Permission; +import com.amazonaws.services.s3.model.S3ObjectSummary; import com.google.common.base.Joiner; import com.google.common.base.Predicate; -import com.google.common.base.Throwables; +import io.druid.java.util.common.ISE; import io.druid.java.util.common.RetryUtils; import io.druid.java.util.common.RetryUtils.Task; -import org.jets3t.service.ServiceException; -import org.jets3t.service.StorageObjectsChunk; -import org.jets3t.service.impl.rest.httpclient.RestS3Service; -import org.jets3t.service.model.S3Object; -import org.jets3t.service.model.StorageObject; import java.io.IOException; +import java.net.URI; import java.util.Iterator; +import java.util.NoSuchElementException; /** * @@ -39,25 +47,12 @@ import java.util.Iterator; public class S3Utils { private static final Joiner JOINER = Joiner.on("/").skipNulls(); + private static final String MIMETYPE_JETS3T_DIRECTORY = "application/x-directory"; - public static void closeStreamsQuietly(S3Object s3Obj) - { - if (s3Obj == null) { - return; - } - - try { - s3Obj.closeDataInputStream(); - } - catch (IOException e) { - - } - } - - public static boolean isServiceExceptionRecoverable(ServiceException ex) + static boolean isServiceExceptionRecoverable(AmazonServiceException ex) { final boolean isIOException = ex.getCause() instanceof IOException; - final boolean isTimeout = "RequestTimeout".equals(((ServiceException) ex).getErrorCode()); + final boolean isTimeout = "RequestTimeout".equals(ex.getErrorCode()); return isIOException || isTimeout; } @@ -70,8 +65,8 @@ public class S3Utils return false; } else if (e instanceof IOException) { return true; - } else if (e instanceof ServiceException) { - return isServiceExceptionRecoverable((ServiceException) e); + } else if (e instanceof AmazonServiceException) { + return isServiceExceptionRecoverable((AmazonServiceException) e); } else { return apply(e.getCause()); } @@ -88,91 +83,81 @@ public class S3Utils return RetryUtils.retry(f, S3RETRY, maxTries); } - public static boolean isObjectInBucket(RestS3Service s3Client, String bucketName, String objectKey) - throws ServiceException + static boolean isObjectInBucketIgnoringPermission(AmazonS3 s3Client, String bucketName, String objectKey) { try { - s3Client.getObjectDetails(bucketName, objectKey); + return s3Client.doesObjectExist(bucketName, objectKey); } - catch (ServiceException e) { - if (404 == e.getResponseCode() - || "NoSuchKey".equals(e.getErrorCode()) - || "NoSuchBucket".equals(e.getErrorCode())) { - return false; - } - if ("AccessDenied".equals(e.getErrorCode())) { + catch (AmazonS3Exception e) { + if (e.getStatusCode() == 404) { // Object is inaccessible to current user, but does exist. return true; } // Something else has gone wrong throw e; } - return true; } - public static Iterator storageObjectsIterator( - final RestS3Service s3Client, + public static Iterator objectSummaryIterator( + final AmazonS3 s3Client, final String bucket, final String prefix, - final long maxListingLength + final int numMaxKeys ) { - return new Iterator() + final ListObjectsV2Request request = new ListObjectsV2Request() + .withBucketName(bucket) + .withPrefix(prefix) + .withMaxKeys(numMaxKeys); + + return new Iterator() { - private StorageObjectsChunk objectsChunk; - private int objectsChunkOffset; + private ListObjectsV2Result result; + private Iterator objectSummaryIterator; + + { + fetchNextBatch(); + } + + private void fetchNextBatch() + { + result = s3Client.listObjectsV2(request); + objectSummaryIterator = result.getObjectSummaries().iterator(); + request.setContinuationToken(result.getContinuationToken()); + } @Override public boolean hasNext() { - if (objectsChunk == null) { - objectsChunk = listObjectsChunkedAfter(""); - objectsChunkOffset = 0; - } - - if (objectsChunk.getObjects().length <= objectsChunkOffset) { - if (objectsChunk.isListingComplete()) { - return false; - } else { - objectsChunk = listObjectsChunkedAfter(objectsChunk.getPriorLastKey()); - objectsChunkOffset = 0; - } - } - - return true; - } - - private StorageObjectsChunk listObjectsChunkedAfter(final String priorLastKey) - { - try { - return retryS3Operation( - () -> s3Client.listObjectsChunked(bucket, prefix, null, maxListingLength, priorLastKey) - ); - } - catch (Exception e) { - throw Throwables.propagate(e); - } + return objectSummaryIterator.hasNext() || result.isTruncated(); } @Override - public StorageObject next() + public S3ObjectSummary next() { if (!hasNext()) { - throw new IllegalStateException(); + throw new NoSuchElementException(); } - StorageObject storageObject = objectsChunk.getObjects()[objectsChunkOffset]; - objectsChunkOffset++; - return storageObject; + if (objectSummaryIterator.hasNext()) { + return objectSummaryIterator.next(); + } + + if (result.isTruncated()) { + fetchNextBatch(); + } + + if (!objectSummaryIterator.hasNext()) { + throw new ISE( + "Failed to further iterate on bucket[%s] and prefix[%s]. The last continuationToken was [%s]", + bucket, + prefix, + result.getContinuationToken() + ); + } + + return objectSummaryIterator.next(); } - - @Override - public void remove() - { - throw new UnsupportedOperationException(); - } - - }; } @@ -184,25 +169,93 @@ public class S3Utils ) + "/index.zip"; } - public static String descriptorPathForSegmentPath(String s3Path) + static String descriptorPathForSegmentPath(String s3Path) { return s3Path.substring(0, s3Path.lastIndexOf("/")) + "/descriptor.json"; } - public static String indexZipForSegmentPath(String s3Path) + static String indexZipForSegmentPath(String s3Path) { return s3Path.substring(0, s3Path.lastIndexOf("/")) + "/index.zip"; } - public static String toFilename(String key) + static String toFilename(String key) { return toFilename(key, ""); } - public static String toFilename(String key, final String suffix) + static String toFilename(String key, final String suffix) { String filename = key.substring(key.lastIndexOf("/") + 1); // characters after last '/' filename = filename.substring(0, filename.length() - suffix.length()); // remove the suffix from the end return filename; } + + static AccessControlList grantFullControlToBucketOwner(AmazonS3 s3Client, String bucket) + { + final AccessControlList acl = s3Client.getBucketAcl(bucket); + acl.grantAllPermissions(new Grant(new CanonicalGrantee(acl.getOwner().getId()), Permission.FullControl)); + return acl; + } + + public static String extractS3Key(URI uri) + { + return uri.getPath().startsWith("/") ? uri.getPath().substring(1) : uri.getPath(); + } + + // Copied from org.jets3t.service.model.StorageObject.isDirectoryPlaceholder() + public static boolean isDirectoryPlaceholder(String key, ObjectMetadata objectMetadata) + { + // Recognize "standard" directory place-holder indications used by + // Amazon's AWS Console and Panic's Transmit. + if (key.endsWith("/") && objectMetadata.getContentLength() == 0) { + return true; + } + // Recognize s3sync.rb directory placeholders by MD5/ETag value. + if ("d66759af42f282e1ba19144df2d405d0".equals(objectMetadata.getETag())) { + return true; + } + // Recognize place-holder objects created by the Google Storage console + // or S3 Organizer Firefox extension. + if (key.endsWith("_$folder$") && objectMetadata.getContentLength() == 0) { + return true; + } + + // We don't use JetS3t APIs anymore, but the below check is still needed for backward compatibility. + + // Recognize legacy JetS3t directory place-holder objects, only gives + // accurate results if an object's metadata is populated. + if (objectMetadata.getContentLength() == 0 && MIMETYPE_JETS3T_DIRECTORY.equals(objectMetadata.getContentType())) { + return true; + } + return false; + } + + /** + * Gets a single {@link S3ObjectSummary} from s3. Since this method might return a wrong object if there are multiple + * objects that match the given key, this method should be used only when it's guaranteed that the given key is unique + * in the given bucket. + * + * @param s3Client s3 client + * @param bucket s3 bucket + * @param key unique key for the object to be retrieved + */ + public static S3ObjectSummary getSingleObjectSummary(AmazonS3 s3Client, String bucket, String key) + { + final ListObjectsV2Request request = new ListObjectsV2Request() + .withBucketName(bucket) + .withPrefix(key) + .withMaxKeys(1); + final ListObjectsV2Result result = s3Client.listObjectsV2(request); + + if (result.getKeyCount() == 0) { + throw new ISE("Cannot find object for bucket[%s] and key[%s]", bucket, key); + } + final S3ObjectSummary objectSummary = result.getObjectSummaries().get(0); + if (!objectSummary.getBucketName().equals(bucket) || !objectSummary.getKey().equals(key)) { + throw new ISE("Wrong object[%s] for bucket[%s] and key[%s]", objectSummary, bucket, key); + } + + return objectSummary; + } } diff --git a/extensions-core/s3-extensions/src/test/java/io/druid/firehose/s3/StaticS3FirehoseFactoryTest.java b/extensions-core/s3-extensions/src/test/java/io/druid/firehose/s3/StaticS3FirehoseFactoryTest.java index 3a5c94471cd..e6021012694 100644 --- a/extensions-core/s3-extensions/src/test/java/io/druid/firehose/s3/StaticS3FirehoseFactoryTest.java +++ b/extensions-core/s3-extensions/src/test/java/io/druid/firehose/s3/StaticS3FirehoseFactoryTest.java @@ -19,8 +19,13 @@ package io.druid.firehose.s3; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3Client; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.DeserializationContext; import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.deser.std.StdDeserializer; import com.fasterxml.jackson.databind.module.SimpleModule; import com.fasterxml.jackson.module.guice.ObjectMapperModule; import com.google.common.collect.ImmutableList; @@ -29,8 +34,6 @@ import com.google.inject.Guice; import com.google.inject.Injector; import com.google.inject.Provides; import io.druid.initialization.DruidModule; -import io.druid.jackson.DefaultObjectMapper; -import org.jets3t.service.impl.rest.httpclient.RestS3Service; import org.junit.Assert; import org.junit.Test; @@ -42,7 +45,7 @@ import java.util.List; */ public class StaticS3FirehoseFactoryTest { - private static final RestS3Service SERVICE = new RestS3Service(null); + private static final AmazonS3Client SERVICE = new AmazonS3Client(); @Test public void testSerde() throws Exception @@ -75,14 +78,14 @@ public class StaticS3FirehoseFactoryTest private static ObjectMapper createObjectMapper(DruidModule baseModule) { - final ObjectMapper baseMapper = new DefaultObjectMapper(); - baseModule.getJacksonModules().forEach(baseMapper::registerModule); - final Injector injector = Guice.createInjector( new ObjectMapperModule(), baseModule ); - return injector.getInstance(ObjectMapper.class); + final ObjectMapper baseMapper = injector.getInstance(ObjectMapper.class); + + baseModule.getJacksonModules().forEach(baseMapper::registerModule); + return baseMapper; } private static class TestS3Module implements DruidModule @@ -90,7 +93,9 @@ public class StaticS3FirehoseFactoryTest @Override public List getJacksonModules() { - return ImmutableList.of(new SimpleModule()); + // Deserializer is need for AmazonS3Client even though it is injected. + // See https://github.com/FasterXML/jackson-databind/issues/962. + return ImmutableList.of(new SimpleModule().addDeserializer(AmazonS3.class, new ItemDeserializer())); } @Override @@ -100,9 +105,28 @@ public class StaticS3FirehoseFactoryTest } @Provides - public RestS3Service getRestS3Service() + public AmazonS3 getAmazonS3Client() { return SERVICE; } } + + public static class ItemDeserializer extends StdDeserializer + { + public ItemDeserializer() + { + this(null); + } + + public ItemDeserializer(Class vc) + { + super(vc); + } + + @Override + public AmazonS3 deserialize(JsonParser jp, DeserializationContext ctxt) + { + throw new UnsupportedOperationException(); + } + } } diff --git a/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentArchiverTest.java b/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentArchiverTest.java index d28d5f93389..d93dfc08c55 100644 --- a/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentArchiverTest.java +++ b/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentArchiverTest.java @@ -19,6 +19,7 @@ package io.druid.storage.s3; +import com.amazonaws.services.s3.AmazonS3Client; import com.fasterxml.jackson.databind.BeanProperty; import com.fasterxml.jackson.databind.DeserializationContext; import com.fasterxml.jackson.databind.InjectableValues; @@ -30,7 +31,6 @@ import io.druid.jackson.DefaultObjectMapper; import io.druid.java.util.common.Intervals; import io.druid.timeline.DataSegment; import org.easymock.EasyMock; -import org.jets3t.service.impl.rest.httpclient.RestS3Service; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; @@ -65,7 +65,7 @@ public class S3DataSegmentArchiverTest } }; private static final S3DataSegmentPusherConfig PUSHER_CONFIG = new S3DataSegmentPusherConfig(); - private static final RestS3Service S3_SERVICE = EasyMock.createStrictMock(RestS3Service.class); + private static final AmazonS3Client S3_SERVICE = EasyMock.createStrictMock(AmazonS3Client.class); private static final S3DataSegmentPuller PULLER = new S3DataSegmentPuller(S3_SERVICE); private static final DataSegment SOURCE_SEGMENT = DataSegment .builder() diff --git a/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentFinderTest.java b/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentFinderTest.java index 12f76126bb4..5c449faf2e1 100644 --- a/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentFinderTest.java +++ b/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentFinderTest.java @@ -19,6 +19,15 @@ package io.druid.storage.s3; +import com.amazonaws.AmazonServiceException; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.AmazonS3Exception; +import com.amazonaws.services.s3.model.ListObjectsV2Request; +import com.amazonaws.services.s3.model.ListObjectsV2Result; +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.PutObjectResult; +import com.amazonaws.services.s3.model.S3Object; +import com.amazonaws.services.s3.model.S3ObjectSummary; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.jsontype.NamedType; import com.google.common.base.Predicate; @@ -31,17 +40,13 @@ import com.google.common.collect.Maps; import com.google.common.collect.Ordering; import com.google.common.collect.Sets; import io.druid.java.util.common.Intervals; +import io.druid.java.util.common.StringUtils; import io.druid.segment.TestHelper; import io.druid.segment.loading.SegmentLoadingException; import io.druid.timeline.DataSegment; import io.druid.timeline.partition.NumberedShardSpec; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; -import org.jets3t.service.ServiceException; -import org.jets3t.service.StorageObjectsChunk; -import org.jets3t.service.impl.rest.httpclient.RestS3Service; -import org.jets3t.service.model.S3Object; -import org.jets3t.service.model.StorageObject; import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; @@ -50,8 +55,12 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; import javax.annotation.Nullable; +import java.io.ByteArrayInputStream; import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; import java.io.InputStream; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; @@ -105,7 +114,7 @@ public class S3DataSegmentFinderTest @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder(); - RestS3Service mockS3Client; + MockAmazonS3Client mockS3Client; S3DataSegmentPusherConfig config; private String bucket; @@ -122,8 +131,6 @@ public class S3DataSegmentFinderTest private String indexZip4_0; private String indexZip4_1; - - @BeforeClass public static void setUpStatic() { @@ -140,7 +147,7 @@ public class S3DataSegmentFinderTest config.setBucket(bucket); config.setBaseKey(baseKey); - mockS3Client = new MockStorageService(temporaryFolder.newFolder()); + mockS3Client = new MockAmazonS3Client(temporaryFolder.newFolder()); descriptor1 = S3Utils.descriptorPathForSegmentPath(baseKey + "/interval1/v1/0/"); @@ -154,17 +161,17 @@ public class S3DataSegmentFinderTest indexZip4_0 = S3Utils.indexZipForSegmentPath(descriptor4_0); indexZip4_1 = S3Utils.indexZipForSegmentPath(descriptor4_1); - mockS3Client.putObject(bucket, new S3Object(descriptor1, mapper.writeValueAsString(SEGMENT_1))); - mockS3Client.putObject(bucket, new S3Object(descriptor2, mapper.writeValueAsString(SEGMENT_2))); - mockS3Client.putObject(bucket, new S3Object(descriptor3, mapper.writeValueAsString(SEGMENT_3))); - mockS3Client.putObject(bucket, new S3Object(descriptor4_0, mapper.writeValueAsString(SEGMENT_4_0))); - mockS3Client.putObject(bucket, new S3Object(descriptor4_1, mapper.writeValueAsString(SEGMENT_4_1))); + mockS3Client.putObject(bucket, descriptor1, mapper.writeValueAsString(SEGMENT_1)); + mockS3Client.putObject(bucket, descriptor2, mapper.writeValueAsString(SEGMENT_2)); + mockS3Client.putObject(bucket, descriptor3, mapper.writeValueAsString(SEGMENT_3)); + mockS3Client.putObject(bucket, descriptor4_0, mapper.writeValueAsString(SEGMENT_4_0)); + mockS3Client.putObject(bucket, descriptor4_1, mapper.writeValueAsString(SEGMENT_4_1)); - mockS3Client.putObject(bucket, new S3Object(indexZip1, "dummy")); - mockS3Client.putObject(bucket, new S3Object(indexZip2, "dummy")); - mockS3Client.putObject(bucket, new S3Object(indexZip3, "dummy")); - mockS3Client.putObject(bucket, new S3Object(indexZip4_0, "dummy")); - mockS3Client.putObject(bucket, new S3Object(indexZip4_1, "dummy")); + mockS3Client.putObject(bucket, indexZip1, "dummy"); + mockS3Client.putObject(bucket, indexZip2, "dummy"); + mockS3Client.putObject(bucket, indexZip3, "dummy"); + mockS3Client.putObject(bucket, indexZip4_0, "dummy"); + mockS3Client.putObject(bucket, indexZip4_1, "dummy"); } @Test @@ -210,34 +217,34 @@ public class S3DataSegmentFinderTest final String serializedSegment4_1 = mapper.writeValueAsString(updatedSegment4_1); Assert.assertNotEquals(serializedSegment1, - IOUtils.toString(mockS3Client.getObject(bucket, descriptor1).getDataInputStream())); + IOUtils.toString(mockS3Client.getObject(bucket, descriptor1).getObjectContent())); Assert.assertNotEquals(serializedSegment2, - IOUtils.toString(mockS3Client.getObject(bucket, descriptor2).getDataInputStream())); + IOUtils.toString(mockS3Client.getObject(bucket, descriptor2).getObjectContent())); Assert.assertNotEquals(serializedSegment3, - IOUtils.toString(mockS3Client.getObject(bucket, descriptor3).getDataInputStream())); + IOUtils.toString(mockS3Client.getObject(bucket, descriptor3).getObjectContent())); Assert.assertNotEquals(serializedSegment4_0, - IOUtils.toString(mockS3Client.getObject(bucket, descriptor4_0).getDataInputStream())); + IOUtils.toString(mockS3Client.getObject(bucket, descriptor4_0).getObjectContent())); Assert.assertNotEquals(serializedSegment4_1, - IOUtils.toString(mockS3Client.getObject(bucket, descriptor4_1).getDataInputStream())); + IOUtils.toString(mockS3Client.getObject(bucket, descriptor4_1).getObjectContent())); final Set segments2 = s3DataSegmentFinder.findSegments("", true); Assert.assertEquals(segments, segments2); Assert.assertEquals(serializedSegment1, - IOUtils.toString(mockS3Client.getObject(bucket, descriptor1).getDataInputStream())); + IOUtils.toString(mockS3Client.getObject(bucket, descriptor1).getObjectContent())); Assert.assertEquals(serializedSegment2, - IOUtils.toString(mockS3Client.getObject(bucket, descriptor2).getDataInputStream())); + IOUtils.toString(mockS3Client.getObject(bucket, descriptor2).getObjectContent())); Assert.assertEquals(serializedSegment3, - IOUtils.toString(mockS3Client.getObject(bucket, descriptor3).getDataInputStream())); + IOUtils.toString(mockS3Client.getObject(bucket, descriptor3).getObjectContent())); Assert.assertEquals(serializedSegment4_0, - IOUtils.toString(mockS3Client.getObject(bucket, descriptor4_0).getDataInputStream())); + IOUtils.toString(mockS3Client.getObject(bucket, descriptor4_0).getObjectContent())); Assert.assertEquals(serializedSegment4_1, - IOUtils.toString(mockS3Client.getObject(bucket, descriptor4_1).getDataInputStream())); + IOUtils.toString(mockS3Client.getObject(bucket, descriptor4_1).getObjectContent())); } @Test(expected = SegmentLoadingException.class) - public void testFindSegmentsFail() throws SegmentLoadingException, ServiceException + public void testFindSegmentsFail() throws SegmentLoadingException { mockS3Client.deleteObject(bucket, indexZip4_1); @@ -275,21 +282,8 @@ public class S3DataSegmentFinderTest final String descriptorPath = S3Utils.descriptorPathForSegmentPath(segmentPath); final String indexPath = S3Utils.indexZipForSegmentPath(segmentPath); - mockS3Client.putObject( - config.getBucket(), - new S3Object( - descriptorPath, - mapper.writeValueAsString(segmentMissingLoadSpec) - ) - ); - - mockS3Client.putObject( - config.getBucket(), - new S3Object( - indexPath, - "dummy" - ) - ); + mockS3Client.putObject(config.getBucket(), descriptorPath, mapper.writeValueAsString(segmentMissingLoadSpec)); + mockS3Client.putObject(config.getBucket(), indexPath, "dummy"); Set segments = s3DataSegmentFinder.findSegments(segmentPath, false); Assert.assertEquals(1, segments.size()); @@ -308,24 +302,34 @@ public class S3DataSegmentFinderTest return S3Utils.descriptorPathForSegmentPath(String.valueOf(segment.getLoadSpec().get("key"))); } - private static class MockStorageService extends RestS3Service + private static class MockAmazonS3Client extends AmazonS3Client { private final File baseDir; private final Map> storage = Maps.newHashMap(); - public MockStorageService(File baseDir) + public MockAmazonS3Client(File baseDir) { - super(null); + super(); this.baseDir = baseDir; } @Override - public StorageObjectsChunk listObjectsChunked( - final String bucketName, final String prefix, final String delimiter, - final long maxListingLength, final String priorLastKey - ) throws ServiceException + public boolean doesObjectExist(String bucketName, String objectName) { - List keysOrigin = Lists.newArrayList(storage.get(bucketName)); + final Set keys = storage.get(bucketName); + if (keys != null) { + return keys.contains(objectName); + } + return false; + } + + @Override + public ListObjectsV2Result listObjectsV2(ListObjectsV2Request listObjectsV2Request) + { + final String bucketName = listObjectsV2Request.getBucketName(); + final String prefix = listObjectsV2Request.getPrefix(); + + final List keysOrigin = Lists.newArrayList(storage.get(bucketName)); Predicate prefixFilter = new Predicate() { @@ -341,11 +345,11 @@ public class S3DataSegmentFinderTest ); int startOffset = 0; - if (priorLastKey != null) { - startOffset = keys.indexOf(priorLastKey) + 1; + if (listObjectsV2Request.getContinuationToken() != null) { + startOffset = keys.indexOf(listObjectsV2Request.getContinuationToken()) + 1; } - int endOffset = startOffset + (int) maxListingLength; // exclusive + int endOffset = startOffset + listObjectsV2Request.getMaxKeys(); // exclusive if (endOffset > keys.size()) { endOffset = keys.size(); } @@ -355,64 +359,73 @@ public class S3DataSegmentFinderTest newPriorLastkey = null; } - List objects = Lists.newArrayList(); + List objects = new ArrayList<>(); for (String objectKey : keys.subList(startOffset, endOffset)) { - objects.add(getObjectDetails(bucketName, objectKey)); + final S3ObjectSummary objectSummary = new S3ObjectSummary(); + objectSummary.setBucketName(bucketName); + objectSummary.setKey(objectKey); + objects.add(objectSummary); } - return new StorageObjectsChunk( - prefix, delimiter, objects.toArray(new StorageObject[]{}), null, newPriorLastkey); - } + final ListObjectsV2Result result = new ListObjectsV2Result(); + result.setBucketName(bucketName); + result.setKeyCount(objects.size()); + result.getObjectSummaries().addAll(objects); + result.setContinuationToken(newPriorLastkey); + result.setTruncated(newPriorLastkey != null); - @Override - public StorageObject getObjectDetails(String bucketName, String objectKey) throws ServiceException - { - - if (!storage.containsKey(bucketName)) { - ServiceException ex = new ServiceException(); - ex.setResponseCode(404); - ex.setErrorCode("NoSuchBucket"); - throw ex; - } - - if (!storage.get(bucketName).contains(objectKey)) { - ServiceException ex = new ServiceException(); - ex.setResponseCode(404); - ex.setErrorCode("NoSuchKey"); - throw ex; - } - - final File objectPath = new File(baseDir, objectKey); - StorageObject storageObject = new StorageObject(); - storageObject.setBucketName(bucketName); - storageObject.setKey(objectKey); - storageObject.setDataInputFile(objectPath); - - return storageObject; + return result; } @Override public S3Object getObject(String bucketName, String objectKey) { + if (!storage.containsKey(bucketName)) { + AmazonServiceException ex = new AmazonS3Exception("S3DataSegmentFinderTest"); + ex.setStatusCode(404); + ex.setErrorCode("NoSuchBucket"); + throw ex; + } + + if (!storage.get(bucketName).contains(objectKey)) { + AmazonServiceException ex = new AmazonS3Exception("S3DataSegmentFinderTest"); + ex.setStatusCode(404); + ex.setErrorCode("NoSuchKey"); + throw ex; + } + final File objectPath = new File(baseDir, objectKey); - S3Object s3Object = new S3Object(); - s3Object.setBucketName(bucketName); - s3Object.setKey(objectKey); - s3Object.setDataInputFile(objectPath); - - return s3Object; + S3Object storageObject = new S3Object(); + storageObject.setBucketName(bucketName); + storageObject.setKey(objectKey); + try { + storageObject.setObjectContent(new FileInputStream(objectPath)); + } + catch (FileNotFoundException e) { + AmazonServiceException ex = new AmazonS3Exception("S3DataSegmentFinderTest", e); + ex.setStatusCode(500); + ex.setErrorCode("InternalError"); + throw ex; + } + return storageObject; } @Override - public S3Object putObject(final String bucketName, final S3Object object) + public PutObjectResult putObject(String bucketName, String key, String data) + { + return putObject(bucketName, key, new ByteArrayInputStream(StringUtils.toUtf8(data)), null); + } + + @Override + public PutObjectResult putObject(String bucketName, String key, InputStream input, ObjectMetadata metadata) { if (!storage.containsKey(bucketName)) { - storage.put(bucketName, Sets.newHashSet()); + storage.put(bucketName, Sets.newHashSet()); } - storage.get(bucketName).add(object.getKey()); + storage.get(bucketName).add(key); - final File objectPath = new File(baseDir, object.getKey()); + final File objectPath = new File(baseDir, key); if (!objectPath.getParentFile().exists()) { objectPath.getParentFile().mkdirs(); @@ -420,7 +433,7 @@ public class S3DataSegmentFinderTest try { try ( - InputStream in = object.getDataInputStream() + InputStream in = input ) { FileUtils.copyInputStreamToFile(in, objectPath); } @@ -429,7 +442,7 @@ public class S3DataSegmentFinderTest throw Throwables.propagate(e); } - return object; + return new PutObjectResult(); } @Override diff --git a/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentMoverTest.java b/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentMoverTest.java index 95ddcdfb7dd..a848eb18082 100644 --- a/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentMoverTest.java +++ b/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentMoverTest.java @@ -19,6 +19,22 @@ package io.druid.storage.s3; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.AccessControlList; +import com.amazonaws.services.s3.model.AmazonS3Exception; +import com.amazonaws.services.s3.model.CanonicalGrantee; +import com.amazonaws.services.s3.model.CopyObjectRequest; +import com.amazonaws.services.s3.model.CopyObjectResult; +import com.amazonaws.services.s3.model.GetObjectMetadataRequest; +import com.amazonaws.services.s3.model.Grant; +import com.amazonaws.services.s3.model.ListObjectsV2Request; +import com.amazonaws.services.s3.model.ListObjectsV2Result; +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.Owner; +import com.amazonaws.services.s3.model.Permission; +import com.amazonaws.services.s3.model.PutObjectResult; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.amazonaws.services.s3.model.StorageClass; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; @@ -28,12 +44,11 @@ import io.druid.java.util.common.MapUtils; import io.druid.segment.loading.SegmentLoadingException; import io.druid.timeline.DataSegment; import io.druid.timeline.partition.NoneShardSpec; -import org.jets3t.service.impl.rest.httpclient.RestS3Service; -import org.jets3t.service.model.S3Object; -import org.jets3t.service.model.StorageObject; import org.junit.Assert; import org.junit.Test; +import java.io.File; +import java.util.HashSet; import java.util.Map; import java.util.Set; @@ -59,11 +74,17 @@ public class S3DataSegmentMoverTest @Test public void testMove() throws Exception { - MockStorageService mockS3Client = new MockStorageService(); + MockAmazonS3Client mockS3Client = new MockAmazonS3Client(); S3DataSegmentMover mover = new S3DataSegmentMover(mockS3Client, new S3DataSegmentPusherConfig()); - mockS3Client.putObject("main", new S3Object("baseKey/test/2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z/1/0/index.zip")); - mockS3Client.putObject("main", new S3Object("baseKey/test/2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z/1/0/descriptor.json")); + mockS3Client.putObject( + "main", + "baseKey/test/2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z/1/0/index.zip" + ); + mockS3Client.putObject( + "main", + "baseKey/test/2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z/1/0/descriptor.json" + ); DataSegment movedSegment = mover.move( sourceSegment, @@ -79,11 +100,17 @@ public class S3DataSegmentMoverTest @Test public void testMoveNoop() throws Exception { - MockStorageService mockS3Client = new MockStorageService(); + MockAmazonS3Client mockS3Client = new MockAmazonS3Client(); S3DataSegmentMover mover = new S3DataSegmentMover(mockS3Client, new S3DataSegmentPusherConfig()); - mockS3Client.putObject("archive", new S3Object("targetBaseKey/test/2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z/1/0/index.zip")); - mockS3Client.putObject("archive", new S3Object("targetBaseKey/test/2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z/1/0/descriptor.json")); + mockS3Client.putObject( + "archive", + "targetBaseKey/test/2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z/1/0/index.zip" + ); + mockS3Client.putObject( + "archive", + "targetBaseKey/test/2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z/1/0/descriptor.json" + ); DataSegment movedSegment = mover.move( sourceSegment, @@ -100,7 +127,7 @@ public class S3DataSegmentMoverTest @Test(expected = SegmentLoadingException.class) public void testMoveException() throws Exception { - MockStorageService mockS3Client = new MockStorageService(); + MockAmazonS3Client mockS3Client = new MockAmazonS3Client(); S3DataSegmentMover mover = new S3DataSegmentMover(mockS3Client, new S3DataSegmentPusherConfig()); mover.move( @@ -112,7 +139,7 @@ public class S3DataSegmentMoverTest @Test public void testIgnoresGoneButAlreadyMoved() throws Exception { - MockStorageService mockS3Client = new MockStorageService(); + MockAmazonS3Client mockS3Client = new MockAmazonS3Client(); S3DataSegmentMover mover = new S3DataSegmentMover(mockS3Client, new S3DataSegmentPusherConfig()); mover.move(new DataSegment( "test", @@ -135,7 +162,7 @@ public class S3DataSegmentMoverTest @Test(expected = SegmentLoadingException.class) public void testFailsToMoveMissing() throws Exception { - MockStorageService mockS3Client = new MockStorageService(); + MockAmazonS3Client mockS3Client = new MockAmazonS3Client(); S3DataSegmentMover mover = new S3DataSegmentMover(mockS3Client, new S3DataSegmentPusherConfig()); mover.move(new DataSegment( "test", @@ -155,15 +182,15 @@ public class S3DataSegmentMoverTest ), ImmutableMap.of("bucket", "DOES NOT EXIST", "baseKey", "baseKey2")); } - private static class MockStorageService extends RestS3Service + private static class MockAmazonS3Client extends AmazonS3Client { Map> storage = Maps.newHashMap(); boolean copied = false; boolean deletedOld = false; - private MockStorageService() + private MockAmazonS3Client() { - super(null); + super(); } public boolean didMove() @@ -172,37 +199,68 @@ public class S3DataSegmentMoverTest } @Override - public boolean isObjectInBucket(String bucketName, String objectKey) + public AccessControlList getBucketAcl(String bucketName) + { + final AccessControlList acl = new AccessControlList(); + acl.setOwner(new Owner("ownerId", "owner")); + acl.grantAllPermissions(new Grant(new CanonicalGrantee(acl.getOwner().getId()), Permission.FullControl)); + return acl; + } + + @Override + public ObjectMetadata getObjectMetadata(GetObjectMetadataRequest getObjectMetadataRequest) + { + return new ObjectMetadata(); + } + + @Override + public boolean doesObjectExist(String bucketName, String objectKey) { Set objects = storage.get(bucketName); return (objects != null && objects.contains(objectKey)); } @Override - public S3Object[] listObjects(String bucketName, String objectKey, String separator) + public ListObjectsV2Result listObjectsV2(ListObjectsV2Request listObjectsV2Request) { - if (isObjectInBucket(bucketName, objectKey)) { - final S3Object object = new S3Object(objectKey); - object.setStorageClass(S3Object.STORAGE_CLASS_STANDARD); - return new S3Object[]{object}; + final String bucketName = listObjectsV2Request.getBucketName(); + final String objectKey = listObjectsV2Request.getPrefix(); + if (doesObjectExist(bucketName, objectKey)) { + final S3ObjectSummary objectSummary = new S3ObjectSummary(); + objectSummary.setBucketName(bucketName); + objectSummary.setKey(objectKey); + objectSummary.setStorageClass(StorageClass.Standard.name()); + + final ListObjectsV2Result result = new ListObjectsV2Result(); + result.setBucketName(bucketName); + result.setPrefix(objectKey); + result.setKeyCount(1); + result.getObjectSummaries().add(objectSummary); + result.setTruncated(true); + return result; + } else { + return new ListObjectsV2Result(); } - return new S3Object[]{}; } @Override - public Map copyObject( - String sourceBucketName, - String sourceObjectKey, - String destinationBucketName, - StorageObject destinationObject, - boolean replaceMetadata - ) + public CopyObjectResult copyObject(CopyObjectRequest copyObjectRequest) { + final String sourceBucketName = copyObjectRequest.getSourceBucketName(); + final String sourceObjectKey = copyObjectRequest.getSourceKey(); + final String destinationBucketName = copyObjectRequest.getDestinationBucketName(); + final String destinationObjectKey = copyObjectRequest.getDestinationKey(); copied = true; - if (isObjectInBucket(sourceBucketName, sourceObjectKey)) { - this.putObject(destinationBucketName, new S3Object(destinationObject.getKey())); + if (doesObjectExist(sourceBucketName, sourceObjectKey)) { + storage.computeIfAbsent(destinationBucketName, k -> new HashSet<>()) + .add(destinationObjectKey); + return new CopyObjectResult(); + } else { + final AmazonS3Exception exception = new AmazonS3Exception("S3DataSegmentMoverTest"); + exception.setErrorCode("NoSuchKey"); + exception.setStatusCode(404); + throw exception; } - return null; } @Override @@ -212,14 +270,19 @@ public class S3DataSegmentMoverTest storage.get(bucket).remove(objectKey); } + public PutObjectResult putObject(String bucketName, String key) + { + return putObject(bucketName, key, (File) null); + } + @Override - public S3Object putObject(String bucketName, S3Object object) + public PutObjectResult putObject(String bucketName, String key, File file) { if (!storage.containsKey(bucketName)) { storage.put(bucketName, Sets.newHashSet()); } - storage.get(bucketName).add(object.getKey()); - return object; + storage.get(bucketName).add(key); + return new PutObjectResult(); } } } diff --git a/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentPullerTest.java b/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentPullerTest.java index 303bf657d6e..8bc028a64f0 100644 --- a/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentPullerTest.java +++ b/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentPullerTest.java @@ -19,9 +19,21 @@ package io.druid.storage.s3; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.AmazonS3Exception; +import com.amazonaws.services.s3.model.ListObjectsV2Request; +import com.amazonaws.services.s3.model.ListObjectsV2Result; +import com.amazonaws.services.s3.model.S3Object; +import com.amazonaws.services.s3.model.S3ObjectSummary; import io.druid.java.util.common.FileUtils; import io.druid.java.util.common.StringUtils; import io.druid.segment.loading.SegmentLoadingException; +import org.easymock.EasyMock; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; @@ -30,15 +42,6 @@ import java.io.OutputStream; import java.net.URI; import java.util.Date; import java.util.zip.GZIPOutputStream; -import org.easymock.EasyMock; -import org.jets3t.service.S3ServiceException; -import org.jets3t.service.ServiceException; -import org.jets3t.service.impl.rest.httpclient.RestS3Service; -import org.jets3t.service.model.S3Object; -import org.junit.Assert; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; /** * @@ -50,26 +53,29 @@ public class S3DataSegmentPullerTest public TemporaryFolder temporaryFolder = new TemporaryFolder(); @Test - public void testSimpleGetVersion() throws ServiceException, IOException + public void testSimpleGetVersion() throws IOException { String bucket = "bucket"; String keyPrefix = "prefix/dir/0"; - RestS3Service s3Client = EasyMock.createStrictMock(RestS3Service.class); + AmazonS3Client s3Client = EasyMock.createStrictMock(AmazonS3Client.class); - S3Object object0 = new S3Object(); + final S3ObjectSummary objectSummary = new S3ObjectSummary(); + objectSummary.setBucketName(bucket); + objectSummary.setKey(keyPrefix + "/renames-0.gz"); + objectSummary.setLastModified(new Date(0)); - object0.setBucketName(bucket); - object0.setKey(keyPrefix + "/renames-0.gz"); - object0.setLastModifiedDate(new Date(0)); + final ListObjectsV2Result result = new ListObjectsV2Result(); + result.setKeyCount(1); + result.getObjectSummaries().add(objectSummary); - EasyMock.expect(s3Client.getObjectDetails(EasyMock.eq(bucket), EasyMock.eq(object0.getKey()))) - .andReturn(object0) + EasyMock.expect(s3Client.listObjectsV2(EasyMock.anyObject(ListObjectsV2Request.class))) + .andReturn(result) .once(); S3DataSegmentPuller puller = new S3DataSegmentPuller(s3Client); EasyMock.replay(s3Client); - String version = puller.getVersion(URI.create(StringUtils.format("s3://%s/%s", bucket, object0.getKey()))); + String version = puller.getVersion(URI.create(StringUtils.format("s3://%s/%s", bucket, objectSummary.getKey()))); EasyMock.verify(s3Client); @@ -77,11 +83,11 @@ public class S3DataSegmentPullerTest } @Test - public void testGZUncompress() throws ServiceException, IOException, SegmentLoadingException + public void testGZUncompress() throws IOException, SegmentLoadingException { final String bucket = "bucket"; final String keyPrefix = "prefix/dir/0"; - final RestS3Service s3Client = EasyMock.createStrictMock(RestS3Service.class); + final AmazonS3Client s3Client = EasyMock.createStrictMock(AmazonS3Client.class); final byte[] value = bucket.getBytes("utf8"); final File tmpFile = temporaryFolder.newFile("gzTest.gz"); @@ -91,19 +97,27 @@ public class S3DataSegmentPullerTest } final S3Object object0 = new S3Object(); - object0.setBucketName(bucket); object0.setKey(keyPrefix + "/renames-0.gz"); - object0.setLastModifiedDate(new Date(0)); - object0.setDataInputStream(new FileInputStream(tmpFile)); + object0.getObjectMetadata().setLastModified(new Date(0)); + object0.setObjectContent(new FileInputStream(tmpFile)); + + final S3ObjectSummary objectSummary = new S3ObjectSummary(); + objectSummary.setBucketName(bucket); + objectSummary.setKey(keyPrefix + "/renames-0.gz"); + objectSummary.setLastModified(new Date(0)); + + final ListObjectsV2Result listObjectsResult = new ListObjectsV2Result(); + listObjectsResult.setKeyCount(1); + listObjectsResult.getObjectSummaries().add(objectSummary); final File tmpDir = temporaryFolder.newFolder("gzTestDir"); - EasyMock.expect(s3Client.getObjectDetails(EasyMock.eq(object0.getBucketName()), EasyMock.eq(object0.getKey()))) - .andReturn(null) + EasyMock.expect(s3Client.doesObjectExist(EasyMock.eq(object0.getBucketName()), EasyMock.eq(object0.getKey()))) + .andReturn(true) .once(); - EasyMock.expect(s3Client.getObjectDetails(EasyMock.eq(object0.getBucketName()), EasyMock.eq(object0.getKey()))) - .andReturn(object0) + EasyMock.expect(s3Client.listObjectsV2(EasyMock.anyObject(ListObjectsV2Request.class))) + .andReturn(listObjectsResult) .once(); EasyMock.expect(s3Client.getObject(EasyMock.eq(object0.getBucketName()), EasyMock.eq(object0.getKey()))) .andReturn(object0) @@ -126,11 +140,11 @@ public class S3DataSegmentPullerTest } @Test - public void testGZUncompressRetries() throws ServiceException, IOException, SegmentLoadingException + public void testGZUncompressRetries() throws IOException, SegmentLoadingException { final String bucket = "bucket"; final String keyPrefix = "prefix/dir/0"; - final RestS3Service s3Client = EasyMock.createStrictMock(RestS3Service.class); + final AmazonS3Client s3Client = EasyMock.createStrictMock(AmazonS3Client.class); final byte[] value = bucket.getBytes("utf8"); final File tmpFile = temporaryFolder.newFile("gzTest.gz"); @@ -143,25 +157,34 @@ public class S3DataSegmentPullerTest object0.setBucketName(bucket); object0.setKey(keyPrefix + "/renames-0.gz"); - object0.setLastModifiedDate(new Date(0)); - object0.setDataInputStream(new FileInputStream(tmpFile)); + object0.getObjectMetadata().setLastModified(new Date(0)); + object0.setObjectContent(new FileInputStream(tmpFile)); + + final S3ObjectSummary objectSummary = new S3ObjectSummary(); + objectSummary.setBucketName(bucket); + objectSummary.setKey(keyPrefix + "/renames-0.gz"); + objectSummary.setLastModified(new Date(0)); + + final ListObjectsV2Result listObjectsResult = new ListObjectsV2Result(); + listObjectsResult.setKeyCount(1); + listObjectsResult.getObjectSummaries().add(objectSummary); File tmpDir = temporaryFolder.newFolder("gzTestDir"); - S3ServiceException exception = new S3ServiceException(); + AmazonS3Exception exception = new AmazonS3Exception("S3DataSegmentPullerTest"); exception.setErrorCode("NoSuchKey"); - exception.setResponseCode(404); - EasyMock.expect(s3Client.getObjectDetails(EasyMock.eq(object0.getBucketName()), EasyMock.eq(object0.getKey()))) - .andReturn(null) + exception.setStatusCode(404); + EasyMock.expect(s3Client.doesObjectExist(EasyMock.eq(object0.getBucketName()), EasyMock.eq(object0.getKey()))) + .andReturn(true) .once(); - EasyMock.expect(s3Client.getObjectDetails(EasyMock.eq(object0.getBucketName()), EasyMock.eq(object0.getKey()))) - .andReturn(object0) + EasyMock.expect(s3Client.listObjectsV2(EasyMock.anyObject(ListObjectsV2Request.class))) + .andReturn(listObjectsResult) .once(); EasyMock.expect(s3Client.getObject(EasyMock.eq(bucket), EasyMock.eq(object0.getKey()))) .andThrow(exception) .once(); - EasyMock.expect(s3Client.getObjectDetails(EasyMock.eq(object0.getBucketName()), EasyMock.eq(object0.getKey()))) - .andReturn(object0) + EasyMock.expect(s3Client.listObjectsV2(EasyMock.anyObject(ListObjectsV2Request.class))) + .andReturn(listObjectsResult) .once(); EasyMock.expect(s3Client.getObject(EasyMock.eq(bucket), EasyMock.eq(object0.getKey()))) .andReturn(object0) diff --git a/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentPusherTest.java b/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentPusherTest.java index f26bd161039..c787ed7d900 100644 --- a/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentPusherTest.java +++ b/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/S3DataSegmentPusherTest.java @@ -19,6 +19,14 @@ package io.druid.storage.s3; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.AccessControlList; +import com.amazonaws.services.s3.model.CanonicalGrantee; +import com.amazonaws.services.s3.model.Grant; +import com.amazonaws.services.s3.model.Owner; +import com.amazonaws.services.s3.model.Permission; +import com.amazonaws.services.s3.model.PutObjectRequest; +import com.amazonaws.services.s3.model.PutObjectResult; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -31,14 +39,13 @@ import org.apache.commons.io.IOUtils; import org.easymock.Capture; import org.easymock.EasyMock; import org.easymock.IAnswer; -import org.jets3t.service.impl.rest.httpclient.RestS3Service; -import org.jets3t.service.model.S3Object; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; import java.io.File; +import java.io.FileInputStream; /** */ @@ -65,27 +72,38 @@ public class S3DataSegmentPusherTest @Test public void testPush() throws Exception { - RestS3Service s3Client = EasyMock.createStrictMock(RestS3Service.class); + AmazonS3Client s3Client = EasyMock.createStrictMock(AmazonS3Client.class); - Capture capturedS3Object = Capture.newInstance(); + final AccessControlList acl = new AccessControlList(); + acl.setOwner(new Owner("ownerId", "owner")); + acl.grantAllPermissions(new Grant(new CanonicalGrantee(acl.getOwner().getId()), Permission.FullControl)); + EasyMock.expect(s3Client.getBucketAcl(EasyMock.eq("bucket"))).andReturn(acl).once(); + + EasyMock.expect(s3Client.putObject(EasyMock.anyObject())) + .andReturn(new PutObjectResult()) + .once(); + + EasyMock.expect(s3Client.getBucketAcl(EasyMock.eq("bucket"))).andReturn(acl).once(); + + Capture capturedPutRequest = Capture.newInstance(); ValueContainer capturedS3SegmentJson = new ValueContainer<>(); - EasyMock.expect(s3Client.putObject(EasyMock.anyString(), EasyMock.capture(capturedS3Object))) + EasyMock.expect(s3Client.putObject(EasyMock.capture(capturedPutRequest))) .andAnswer( - new IAnswer() + new IAnswer() { @Override - public S3Object answer() throws Throwable + public PutObjectResult answer() throws Throwable { capturedS3SegmentJson.setValue( - IOUtils.toString(capturedS3Object.getValue().getDataInputStream(), "utf-8") + IOUtils.toString(new FileInputStream(capturedPutRequest.getValue().getFile()), "utf-8") ); - return null; + return new PutObjectResult(); } } ) - .atLeastOnce(); - EasyMock.replay(s3Client); + .once(); + EasyMock.replay(s3Client); S3DataSegmentPusherConfig config = new S3DataSegmentPusherConfig(); config.setBucket("bucket"); diff --git a/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/S3TimestampVersionedDataFinderTest.java b/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/S3TimestampVersionedDataFinderTest.java index c2bdc347334..ce19c9b10fc 100644 --- a/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/S3TimestampVersionedDataFinderTest.java +++ b/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/S3TimestampVersionedDataFinderTest.java @@ -19,11 +19,12 @@ package io.druid.storage.s3; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.ListObjectsV2Request; +import com.amazonaws.services.s3.model.ListObjectsV2Result; +import com.amazonaws.services.s3.model.S3ObjectSummary; import io.druid.java.util.common.StringUtils; import org.easymock.EasyMock; -import org.jets3t.service.S3ServiceException; -import org.jets3t.service.impl.rest.httpclient.RestS3Service; -import org.jets3t.service.model.S3Object; import org.junit.Assert; import org.junit.Test; @@ -35,25 +36,31 @@ public class S3TimestampVersionedDataFinderTest { @Test - public void testSimpleLatestVersion() throws S3ServiceException + public void testSimpleLatestVersion() { String bucket = "bucket"; String keyPrefix = "prefix/dir/0"; - RestS3Service s3Client = EasyMock.createStrictMock(RestS3Service.class); + AmazonS3Client s3Client = EasyMock.createStrictMock(AmazonS3Client.class); - S3Object object0 = new S3Object(), object1 = new S3Object(); + S3ObjectSummary object0 = new S3ObjectSummary(), object1 = new S3ObjectSummary(); object0.setBucketName(bucket); object0.setKey(keyPrefix + "/renames-0.gz"); - object0.setLastModifiedDate(new Date(0)); + object0.setLastModified(new Date(0)); object1.setBucketName(bucket); object1.setKey(keyPrefix + "/renames-1.gz"); - object1.setLastModifiedDate(new Date(1)); + object1.setLastModified(new Date(1)); - EasyMock.expect(s3Client.listObjects(EasyMock.eq(bucket), EasyMock.anyString(), EasyMock.isNull())).andReturn( - new S3Object[]{object0, object1} - ).once(); + final ListObjectsV2Result result = new ListObjectsV2Result(); + result.getObjectSummaries().add(object0); + result.getObjectSummaries().add(object1); + result.setKeyCount(2); + result.setTruncated(false); + + EasyMock.expect(s3Client.listObjectsV2(EasyMock.anyObject(ListObjectsV2Request.class))) + .andReturn(result) + .once(); S3TimestampVersionedDataFinder finder = new S3TimestampVersionedDataFinder(s3Client); Pattern pattern = Pattern.compile("renames-[0-9]*\\.gz"); @@ -71,25 +78,19 @@ public class S3TimestampVersionedDataFinderTest } @Test - public void testMissing() throws S3ServiceException + public void testMissing() { String bucket = "bucket"; String keyPrefix = "prefix/dir/0"; - RestS3Service s3Client = EasyMock.createStrictMock(RestS3Service.class); + AmazonS3Client s3Client = EasyMock.createStrictMock(AmazonS3Client.class); - S3Object object0 = new S3Object(), object1 = new S3Object(); + final ListObjectsV2Result result = new ListObjectsV2Result(); + result.setKeyCount(0); + result.setTruncated(false); - object0.setBucketName(bucket); - object0.setKey(keyPrefix + "/renames-0.gz"); - object0.setLastModifiedDate(new Date(0)); - - object1.setBucketName(bucket); - object1.setKey(keyPrefix + "/renames-1.gz"); - object1.setLastModifiedDate(new Date(1)); - - EasyMock.expect(s3Client.listObjects(EasyMock.eq(bucket), EasyMock.anyString(), EasyMock.isNull())).andReturn( - null - ).once(); + EasyMock.expect(s3Client.listObjectsV2(EasyMock.anyObject(ListObjectsV2Request.class))) + .andReturn(result) + .once(); S3TimestampVersionedDataFinder finder = new S3TimestampVersionedDataFinder(s3Client); Pattern pattern = Pattern.compile("renames-[0-9]*\\.gz"); @@ -105,21 +106,26 @@ public class S3TimestampVersionedDataFinderTest } @Test - public void testFindSelf() throws S3ServiceException + public void testFindSelf() { String bucket = "bucket"; String keyPrefix = "prefix/dir/0"; - RestS3Service s3Client = EasyMock.createStrictMock(RestS3Service.class); + AmazonS3Client s3Client = EasyMock.createStrictMock(AmazonS3Client.class); - S3Object object0 = new S3Object(); + S3ObjectSummary object0 = new S3ObjectSummary(); object0.setBucketName(bucket); object0.setKey(keyPrefix + "/renames-0.gz"); - object0.setLastModifiedDate(new Date(0)); + object0.setLastModified(new Date(0)); - EasyMock.expect(s3Client.listObjects(EasyMock.eq(bucket), EasyMock.anyString(), EasyMock.isNull())).andReturn( - new S3Object[]{object0} - ).once(); + final ListObjectsV2Result result = new ListObjectsV2Result(); + result.getObjectSummaries().add(object0); + result.setKeyCount(1); + result.setTruncated(false); + + EasyMock.expect(s3Client.listObjectsV2(EasyMock.anyObject(ListObjectsV2Request.class))) + .andReturn(result) + .once(); S3TimestampVersionedDataFinder finder = new S3TimestampVersionedDataFinder(s3Client); Pattern pattern = Pattern.compile("renames-[0-9]*\\.gz"); @@ -137,21 +143,26 @@ public class S3TimestampVersionedDataFinderTest } @Test - public void testFindExact() throws S3ServiceException + public void testFindExact() { String bucket = "bucket"; String keyPrefix = "prefix/dir/0"; - RestS3Service s3Client = EasyMock.createStrictMock(RestS3Service.class); + AmazonS3Client s3Client = EasyMock.createStrictMock(AmazonS3Client.class); - S3Object object0 = new S3Object(); + S3ObjectSummary object0 = new S3ObjectSummary(); object0.setBucketName(bucket); object0.setKey(keyPrefix + "/renames-0.gz"); - object0.setLastModifiedDate(new Date(0)); + object0.setLastModified(new Date(0)); - EasyMock.expect(s3Client.listObjects(EasyMock.eq(bucket), EasyMock.anyString(), EasyMock.isNull())).andReturn( - new S3Object[]{object0} - ).once(); + final ListObjectsV2Result result = new ListObjectsV2Result(); + result.getObjectSummaries().add(object0); + result.setKeyCount(1); + result.setTruncated(false); + + EasyMock.expect(s3Client.listObjectsV2(EasyMock.anyObject(ListObjectsV2Request.class))) + .andReturn(result) + .once(); S3TimestampVersionedDataFinder finder = new S3TimestampVersionedDataFinder(s3Client); diff --git a/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/TestAWSCredentialsProvider.java b/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/TestAWSCredentialsProvider.java index a3bf27a40b4..a7716e2a5ed 100644 --- a/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/TestAWSCredentialsProvider.java +++ b/extensions-core/s3-extensions/src/test/java/io/druid/storage/s3/TestAWSCredentialsProvider.java @@ -24,6 +24,8 @@ import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.AWSSessionCredentials; import com.google.common.io.Files; import io.druid.common.aws.AWSCredentialsConfig; +import io.druid.common.aws.AWSEndpointConfig; +import io.druid.common.aws.AWSProxyConfig; import io.druid.guice.AWSModule; import io.druid.metadata.DefaultPasswordProvider; import org.easymock.EasyMock; @@ -58,7 +60,7 @@ public class TestAWSCredentialsProvider assertEquals(credentials.getAWSSecretKey(), "secretKeySample"); // try to create - s3Module.getRestS3Service(provider); + s3Module.getAmazonS3Client(provider, new AWSProxyConfig(), new AWSEndpointConfig()); } @Rule @@ -86,6 +88,6 @@ public class TestAWSCredentialsProvider assertEquals(sessionCredentials.getSessionToken(), "sessionTokenSample"); // try to create - s3Module.getRestS3Service(provider); + s3Module.getAmazonS3Client(provider, new AWSProxyConfig(), new AWSEndpointConfig()); } } diff --git a/indexing-hadoop/pom.xml b/indexing-hadoop/pom.xml index e2bf002c3d5..3a9c7740a03 100644 --- a/indexing-hadoop/pom.xml +++ b/indexing-hadoop/pom.xml @@ -56,23 +56,6 @@ com.google.guava guava - - - net.java.dev.jets3t - jets3t - test - - - - org.apache.httpcomponents - httpclient - test - - - org.apache.httpcomponents - httpcore - test - org.apache.hadoop hadoop-client @@ -100,6 +83,21 @@ + + com.amazonaws + aws-java-sdk-bundle + test + + + org.apache.httpcomponents + httpclient + test + + + org.apache.httpcomponents + httpcore + test + junit junit diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java index 9a6c5d9ecc9..44e75805d77 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/DetermineHashedPartitionsJob.java @@ -395,7 +395,7 @@ public class DetermineHashedPartitionsJob implements Jobby public int getPartition(LongWritable interval, BytesWritable text, int numPartitions) { - if (config.get("mapred.job.tracker").equals("local") || determineIntervals) { + if ("local".equals(config.get("mapred.job.tracker")) || determineIntervals) { return 0; } else { return reducerLookup.get(interval); diff --git a/indexing-service/src/main/java/io/druid/indexing/common/config/TaskConfig.java b/indexing-service/src/main/java/io/druid/indexing/common/config/TaskConfig.java index 6a9370324d0..9152c873210 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/config/TaskConfig.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/config/TaskConfig.java @@ -31,7 +31,7 @@ import java.util.List; public class TaskConfig { public static final List DEFAULT_DEFAULT_HADOOP_COORDINATES = ImmutableList.of( - "org.apache.hadoop:hadoop-client:2.7.3" + "org.apache.hadoop:hadoop-client:2.8.3" ); private static final Period DEFAULT_DIRECTORY_LOCK_TIMEOUT = new Period("PT10M"); diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopTask.java index 199aacdd8bb..8963559e312 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopTask.java @@ -35,7 +35,6 @@ import javax.annotation.Nullable; import java.io.File; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; -import java.net.MalformedURLException; import java.net.URISyntaxException; import java.net.URL; import java.net.URLClassLoader; @@ -127,7 +126,6 @@ public abstract class HadoopTask extends AbstractTask * * @param toolbox The toolbox to pull the default coordinates from if not present in the task * @return An isolated URLClassLoader not tied by parent chain to the ApplicationClassLoader - * @throws MalformedURLException from Initialization.getClassLoaderForExtension */ protected ClassLoader buildClassLoader(final TaskToolbox toolbox) { diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/autoscaling/EC2AutoScalerSerdeTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/autoscaling/EC2AutoScalerSerdeTest.java index 5d891d62e64..18a46365edb 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/autoscaling/EC2AutoScalerSerdeTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/autoscaling/EC2AutoScalerSerdeTest.java @@ -74,12 +74,12 @@ public class EC2AutoScalerSerdeTest } ); - final EC2AutoScaler autoScaler = objectMapper.readValue(json, EC2AutoScaler.class); + final EC2AutoScaler autoScaler = (EC2AutoScaler) objectMapper.readValue(json, AutoScaler.class); verifyAutoScaler(autoScaler); - final EC2AutoScaler roundTripAutoScaler = objectMapper.readValue( + final EC2AutoScaler roundTripAutoScaler = (EC2AutoScaler) objectMapper.readValue( objectMapper.writeValueAsBytes(autoScaler), - EC2AutoScaler.class + AutoScaler.class ); verifyAutoScaler(roundTripAutoScaler); diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/setup/JavaScriptWorkerSelectStrategyTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/setup/JavaScriptWorkerSelectStrategyTest.java index 1eb7028a5da..0569d07f2b1 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/setup/JavaScriptWorkerSelectStrategyTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/setup/JavaScriptWorkerSelectStrategyTest.java @@ -86,7 +86,7 @@ public class JavaScriptWorkerSelectStrategyTest STRATEGY, mapper.readValue( mapper.writeValueAsString(STRATEGY), - JavaScriptWorkerSelectStrategy.class + WorkerSelectStrategy.class ) ); } @@ -108,7 +108,7 @@ public class JavaScriptWorkerSelectStrategyTest expectedException.expectCause(CoreMatchers.instanceOf(IllegalStateException.class)); expectedException.expectMessage("JavaScript is disabled"); - mapper.readValue(strategyString, JavaScriptWorkerSelectStrategy.class); + mapper.readValue(strategyString, WorkerSelectStrategy.class); } @Test diff --git a/pom.xml b/pom.xml index 095d58e2e2b..870fe78d046 100644 --- a/pom.xml +++ b/pom.xml @@ -68,8 +68,8 @@ 4.1.0 9.3.19.v20170502 1.19.3 - - 2.4.6 + + 2.6.7 2.5 3.10.6.Final @@ -78,12 +78,10 @@ 4.0.52.Final 1.7.12 - 2.7.3 + 2.8.3 2.0.0 1.6.6 - - 1.10.77 + 1.11.199 2.5.5 3.4.11 @@ -189,49 +187,8 @@ com.amazonaws - aws-java-sdk-ec2 - ${aws.sdk.version} - - - javax.mail - mail - - - com.fasterxml.jackson.core - jackson-databind - - - com.fasterxml.jackson.core - jackson-annotations - - - commons-codec - commons-codec - - - - - com.amazonaws - aws-java-sdk-s3 - ${aws.sdk.version} - - - javax.mail - mail - - - com.fasterxml.jackson.core - jackson-databind - - - com.fasterxml.jackson.core - jackson-annotations - - - commons-codec - commons-codec - - + aws-java-sdk-bundle + ${aws.sdk.bundle.version} com.ning @@ -612,49 +569,15 @@ aether-api 0.9.0.M2 - - net.java.dev.jets3t - jets3t - 0.9.4 - - - commons-codec - commons-codec - - - commons-logging - commons-logging - - - org.apache.httpcomponents - httpclient - - - org.apache.httpcomponents - httpcore - - - org.codehaus.jackson - jackson-core-asl - - - org.codehaus.jackson - jackson-mapper-asl - - - - - org.apache.httpcomponents httpclient - 4.5.1 + 4.5.3 org.apache.httpcomponents httpcore - 4.4.3 + 4.4.4 org.apache.hadoop diff --git a/processing/src/test/java/io/druid/query/groupby/orderby/DefaultLimitSpecTest.java b/processing/src/test/java/io/druid/query/groupby/orderby/DefaultLimitSpecTest.java index 64d354e99fd..3da0488d679 100644 --- a/processing/src/test/java/io/druid/query/groupby/orderby/DefaultLimitSpecTest.java +++ b/processing/src/test/java/io/druid/query/groupby/orderby/DefaultLimitSpecTest.java @@ -74,9 +74,9 @@ public class DefaultLimitSpecTest //defaults String json = "{\"type\": \"default\"}"; - DefaultLimitSpec spec = mapper.readValue( - mapper.writeValueAsString(mapper.readValue(json, DefaultLimitSpec.class)), - DefaultLimitSpec.class + DefaultLimitSpec spec = (DefaultLimitSpec) mapper.readValue( + mapper.writeValueAsString(mapper.readValue(json, LimitSpec.class)), + LimitSpec.class ); Assert.assertEquals( @@ -90,9 +90,9 @@ public class DefaultLimitSpecTest + " \"columns\":[{\"dimension\":\"d\",\"direction\":\"DESCENDING\", \"dimensionOrder\":\"numeric\"}],\n" + " \"limit\":10\n" + "}"; - spec = mapper.readValue( - mapper.writeValueAsString(mapper.readValue(json, DefaultLimitSpec.class)), - DefaultLimitSpec.class + spec = (DefaultLimitSpec) mapper.readValue( + mapper.writeValueAsString(mapper.readValue(json, LimitSpec.class)), + LimitSpec.class ); Assert.assertEquals( new DefaultLimitSpec(ImmutableList.of(new OrderByColumnSpec("d", OrderByColumnSpec.Direction.DESCENDING, @@ -106,9 +106,9 @@ public class DefaultLimitSpecTest + " \"limit\":10\n" + "}"; - spec = mapper.readValue( - mapper.writeValueAsString(mapper.readValue(json, DefaultLimitSpec.class)), - DefaultLimitSpec.class + spec = (DefaultLimitSpec) mapper.readValue( + mapper.writeValueAsString(mapper.readValue(json, LimitSpec.class)), + LimitSpec.class ); Assert.assertEquals( @@ -122,9 +122,9 @@ public class DefaultLimitSpecTest + " \"columns\":[{\"dimension\":\"d\"}],\n" + " \"limit\":10\n" + "}"; - spec = mapper.readValue( - mapper.writeValueAsString(mapper.readValue(json, DefaultLimitSpec.class)), - DefaultLimitSpec.class + spec = (DefaultLimitSpec) mapper.readValue( + mapper.writeValueAsString(mapper.readValue(json, LimitSpec.class)), + LimitSpec.class ); Assert.assertEquals( new DefaultLimitSpec(ImmutableList.of(new OrderByColumnSpec("d", OrderByColumnSpec.Direction.ASCENDING, @@ -137,9 +137,9 @@ public class DefaultLimitSpecTest + " \"columns\":[\"d\"],\n" + " \"limit\":10\n" + "}"; - spec = mapper.readValue( - mapper.writeValueAsString(mapper.readValue(json, DefaultLimitSpec.class)), - DefaultLimitSpec.class + spec = (DefaultLimitSpec) mapper.readValue( + mapper.writeValueAsString(mapper.readValue(json, LimitSpec.class)), + LimitSpec.class ); Assert.assertEquals( new DefaultLimitSpec(ImmutableList.of(new OrderByColumnSpec("d", OrderByColumnSpec.Direction.ASCENDING, diff --git a/processing/src/test/java/io/druid/query/topn/AlphaNumericTopNMetricSpecTest.java b/processing/src/test/java/io/druid/query/topn/AlphaNumericTopNMetricSpecTest.java index 877d431dd8b..36562c3388a 100644 --- a/processing/src/test/java/io/druid/query/topn/AlphaNumericTopNMetricSpecTest.java +++ b/processing/src/test/java/io/druid/query/topn/AlphaNumericTopNMetricSpecTest.java @@ -104,8 +104,8 @@ public class AlphaNumericTopNMetricSpecTest + " \"previousStop\": \"test\"\n" + "}"; ObjectMapper jsonMapper = TestHelper.makeJsonMapper(); - TopNMetricSpec actualMetricSpec = jsonMapper.readValue(jsonMapper.writeValueAsString(jsonMapper.readValue(jsonSpec, TopNMetricSpec.class)), AlphaNumericTopNMetricSpec.class); - TopNMetricSpec actualMetricSpec1 = jsonMapper.readValue(jsonMapper.writeValueAsString(jsonMapper.readValue(jsonSpec1, TopNMetricSpec.class)), AlphaNumericTopNMetricSpec.class); + TopNMetricSpec actualMetricSpec = jsonMapper.readValue(jsonMapper.writeValueAsString(jsonMapper.readValue(jsonSpec, TopNMetricSpec.class)), TopNMetricSpec.class); + TopNMetricSpec actualMetricSpec1 = jsonMapper.readValue(jsonMapper.writeValueAsString(jsonMapper.readValue(jsonSpec1, TopNMetricSpec.class)), TopNMetricSpec.class); Assert.assertEquals(expectedMetricSpec, actualMetricSpec); Assert.assertEquals(expectedMetricSpec1, actualMetricSpec1); } diff --git a/processing/src/test/java/io/druid/query/topn/DimensionTopNMetricSpecTest.java b/processing/src/test/java/io/druid/query/topn/DimensionTopNMetricSpecTest.java index 292618ac821..7091ef5ee93 100644 --- a/processing/src/test/java/io/druid/query/topn/DimensionTopNMetricSpecTest.java +++ b/processing/src/test/java/io/druid/query/topn/DimensionTopNMetricSpecTest.java @@ -44,8 +44,8 @@ public class DimensionTopNMetricSpecTest + " \"previousStop\": \"test\"\n" + "}"; ObjectMapper jsonMapper = TestHelper.makeJsonMapper(); - TopNMetricSpec actualMetricSpec = jsonMapper.readValue(jsonMapper.writeValueAsString(jsonMapper.readValue(jsonSpec, TopNMetricSpec.class)), DimensionTopNMetricSpec.class); - TopNMetricSpec actualMetricSpec1 = jsonMapper.readValue(jsonMapper.writeValueAsString(jsonMapper.readValue(jsonSpec1, TopNMetricSpec.class)), DimensionTopNMetricSpec.class); + TopNMetricSpec actualMetricSpec = jsonMapper.readValue(jsonMapper.writeValueAsString(jsonMapper.readValue(jsonSpec, TopNMetricSpec.class)), TopNMetricSpec.class); + TopNMetricSpec actualMetricSpec1 = jsonMapper.readValue(jsonMapper.writeValueAsString(jsonMapper.readValue(jsonSpec1, TopNMetricSpec.class)), TopNMetricSpec.class); Assert.assertEquals(expectedMetricSpec, actualMetricSpec); Assert.assertEquals(expectedMetricSpec1, actualMetricSpec1); } @@ -65,8 +65,8 @@ public class DimensionTopNMetricSpecTest + " \"previousStop\": \"test\"\n" + "}"; ObjectMapper jsonMapper = TestHelper.makeJsonMapper(); - TopNMetricSpec actualMetricSpec = jsonMapper.readValue(jsonMapper.writeValueAsString(jsonMapper.readValue(jsonSpec, TopNMetricSpec.class)), DimensionTopNMetricSpec.class); - TopNMetricSpec actualMetricSpec1 = jsonMapper.readValue(jsonMapper.writeValueAsString(jsonMapper.readValue(jsonSpec1, TopNMetricSpec.class)), DimensionTopNMetricSpec.class); + TopNMetricSpec actualMetricSpec = jsonMapper.readValue(jsonMapper.writeValueAsString(jsonMapper.readValue(jsonSpec, TopNMetricSpec.class)), TopNMetricSpec.class); + TopNMetricSpec actualMetricSpec1 = jsonMapper.readValue(jsonMapper.writeValueAsString(jsonMapper.readValue(jsonSpec1, TopNMetricSpec.class)), TopNMetricSpec.class); Assert.assertEquals(expectedMetricSpec, actualMetricSpec); Assert.assertEquals(expectedMetricSpec1, actualMetricSpec1); } @@ -86,8 +86,8 @@ public class DimensionTopNMetricSpecTest + " \"previousStop\": \"test\"\n" + "}"; ObjectMapper jsonMapper = TestHelper.makeJsonMapper(); - TopNMetricSpec actualMetricSpec = jsonMapper.readValue(jsonMapper.writeValueAsString(jsonMapper.readValue(jsonSpec, TopNMetricSpec.class)), DimensionTopNMetricSpec.class); - TopNMetricSpec actualMetricSpec1 = jsonMapper.readValue(jsonMapper.writeValueAsString(jsonMapper.readValue(jsonSpec1, TopNMetricSpec.class)), DimensionTopNMetricSpec.class); + TopNMetricSpec actualMetricSpec = jsonMapper.readValue(jsonMapper.writeValueAsString(jsonMapper.readValue(jsonSpec, TopNMetricSpec.class)), TopNMetricSpec.class); + TopNMetricSpec actualMetricSpec1 = jsonMapper.readValue(jsonMapper.writeValueAsString(jsonMapper.readValue(jsonSpec1, TopNMetricSpec.class)), TopNMetricSpec.class); Assert.assertEquals(expectedMetricSpec, actualMetricSpec); Assert.assertEquals(expectedMetricSpec1, actualMetricSpec1); } @@ -107,8 +107,8 @@ public class DimensionTopNMetricSpecTest + " \"previousStop\": \"test\"\n" + "}"; ObjectMapper jsonMapper = TestHelper.makeJsonMapper(); - TopNMetricSpec actualMetricSpec = jsonMapper.readValue(jsonMapper.writeValueAsString(jsonMapper.readValue(jsonSpec, TopNMetricSpec.class)), DimensionTopNMetricSpec.class); - TopNMetricSpec actualMetricSpec1 = jsonMapper.readValue(jsonMapper.writeValueAsString(jsonMapper.readValue(jsonSpec1, TopNMetricSpec.class)), DimensionTopNMetricSpec.class); + TopNMetricSpec actualMetricSpec = jsonMapper.readValue(jsonMapper.writeValueAsString(jsonMapper.readValue(jsonSpec, TopNMetricSpec.class)), TopNMetricSpec.class); + TopNMetricSpec actualMetricSpec1 = jsonMapper.readValue(jsonMapper.writeValueAsString(jsonMapper.readValue(jsonSpec1, TopNMetricSpec.class)), TopNMetricSpec.class); Assert.assertEquals(expectedMetricSpec, actualMetricSpec); Assert.assertEquals(expectedMetricSpec1, actualMetricSpec1); } diff --git a/server/pom.xml b/server/pom.xml index 0b9a07a3b9e..4173650b1b0 100644 --- a/server/pom.xml +++ b/server/pom.xml @@ -70,6 +70,10 @@ org.apache.zookeeper zookeeper + + org.apache.httpcomponents + httpclient + org.apache.curator curator-framework diff --git a/server/src/main/java/io/druid/guice/AWSModule.java b/server/src/main/java/io/druid/guice/AWSModule.java index 15925816a28..4127f3cb200 100644 --- a/server/src/main/java/io/druid/guice/AWSModule.java +++ b/server/src/main/java/io/druid/guice/AWSModule.java @@ -27,6 +27,8 @@ import com.google.inject.Module; import com.google.inject.Provides; import io.druid.common.aws.AWSCredentialsConfig; import io.druid.common.aws.AWSCredentialsUtils; +import io.druid.common.aws.AWSEndpointConfig; +import io.druid.common.aws.AWSProxyConfig; /** */ @@ -36,6 +38,8 @@ public class AWSModule implements Module public void configure(Binder binder) { JsonConfigProvider.bind(binder, "druid.s3", AWSCredentialsConfig.class); + JsonConfigProvider.bind(binder, "druid.s3.proxy", AWSProxyConfig.class); + JsonConfigProvider.bind(binder, "druid.s3.endpoint", AWSEndpointConfig.class); } @Provides diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/HttpFirehoseFactory.java b/server/src/main/java/io/druid/segment/realtime/firehose/HttpFirehoseFactory.java index cf3ecea6fba..aaab6f9dae5 100644 --- a/server/src/main/java/io/druid/segment/realtime/firehose/HttpFirehoseFactory.java +++ b/server/src/main/java/io/druid/segment/realtime/firehose/HttpFirehoseFactory.java @@ -23,11 +23,11 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import com.google.common.base.Predicate; +import com.google.common.net.HttpHeaders; import io.druid.data.input.impl.prefetch.PrefetchableTextFilesFirehoseFactory; import io.druid.java.util.common.CompressionUtils; import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.logger.Logger; -import org.apache.http.HttpHeaders; import java.io.IOException; import java.io.InputStream; diff --git a/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java b/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java index 6f67ce06d76..d10880fe83e 100644 --- a/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java +++ b/server/src/main/java/io/druid/server/AsyncQueryForwardingServlet.java @@ -58,6 +58,7 @@ import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response.Status; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; @@ -465,13 +466,11 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu TimeUnit.NANOSECONDS.toMillis(requestTimeNs), "success", success - && result.getResponse().getStatus() == javax.ws.rs.core.Response.Status.OK.getStatusCode() + && result.getResponse().getStatus() == Status.OK.getStatusCode() ) ) ) ); - - } catch (Exception e) { log.error(e, "Unable to log query [%s]!", query); diff --git a/server/src/main/java/io/druid/server/initialization/jetty/JettyServerModule.java b/server/src/main/java/io/druid/server/initialization/jetty/JettyServerModule.java index 94a776e457f..1b1508b5558 100644 --- a/server/src/main/java/io/druid/server/initialization/jetty/JettyServerModule.java +++ b/server/src/main/java/io/druid/server/initialization/jetty/JettyServerModule.java @@ -60,7 +60,6 @@ import io.druid.server.initialization.TLSServerConfig; import io.druid.server.metrics.DataSourceTaskIdHolder; import io.druid.server.metrics.MetricsModule; import io.druid.server.metrics.MonitorsConfig; -import org.apache.http.HttpVersion; import org.eclipse.jetty.server.ConnectionFactory; import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.HttpConfiguration; @@ -90,6 +89,7 @@ public class JettyServerModule extends JerseyServletModule private static final Logger log = new Logger(JettyServerModule.class); private static final AtomicInteger activeConnections = new AtomicInteger(); + private static final String HTTP_1_1_STRING = "HTTP/1.1"; @Override protected void configureServlets() @@ -268,7 +268,7 @@ public class JettyServerModule extends JerseyServletModule httpsConfiguration.setRequestHeaderSize(config.getMaxRequestHeaderSize()); final ServerConnector connector = new ServerConnector( server, - new SslConnectionFactory(sslContextFactory, HttpVersion.HTTP_1_1.toString()), + new SslConnectionFactory(sslContextFactory, HTTP_1_1_STRING), new HttpConnectionFactory(httpsConfiguration) ); connector.setPort(node.getTlsPort()); diff --git a/server/src/test/java/io/druid/query/dimension/LookupDimensionSpecTest.java b/server/src/test/java/io/druid/query/dimension/LookupDimensionSpecTest.java index 7e1ef423f95..02c44c29725 100644 --- a/server/src/test/java/io/druid/query/dimension/LookupDimensionSpecTest.java +++ b/server/src/test/java/io/druid/query/dimension/LookupDimensionSpecTest.java @@ -21,6 +21,7 @@ package io.druid.query.dimension; import com.fasterxml.jackson.databind.InjectableValues; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.jsontype.NamedType; import com.google.common.base.Strings; import com.google.common.collect.ImmutableMap; import io.druid.jackson.DefaultObjectMapper; @@ -70,12 +71,13 @@ public class LookupDimensionSpecTest public void testSerDesr(DimensionSpec lookupDimSpec) throws IOException { ObjectMapper mapper = new DefaultObjectMapper(); + mapper.registerSubtypes(new NamedType(LookupDimensionSpec.class, "lookup")); InjectableValues injectableValues = new InjectableValues.Std().addValue( LookupReferencesManager.class, LOOKUP_REF_MANAGER ); String serLookup = mapper.writeValueAsString(lookupDimSpec); - Assert.assertEquals(lookupDimSpec, mapper.reader(LookupDimensionSpec.class).with(injectableValues).readValue(serLookup)); + Assert.assertEquals(lookupDimSpec, mapper.reader(DimensionSpec.class).with(injectableValues).readValue(serLookup)); } private Object[] parametersForTestSerDesr()