Use the official aws-sdk instead of jets3t (#5382)

* Use the official aws-sdk instead of jets3t

* fix compile and serde tests

* address comments and fix test

* add http version string

* remove redundant dependencies, fix potential NPE, and fix test

* resolve TODOs

* fix build

* downgrade jackson version to 2.6.7

* fix test

* resolve the last TODO

* support proxy and endpoint configurations

* fix build

* remove debugging log

* downgrade hadoop version to 2.8.3

* fix tests

* remove unused log

* fix it test

* revert KerberosAuthenticator change

* change hadoop-aws scope to provided in hdfs-storage

* address comments

* address comments
Jihoon Son 2018-03-21 15:36:54 -07:00 committed by GitHub
parent 885b975c95
commit 1ad898bde2
51 changed files with 1131 additions and 910 deletions

View File

@ -47,6 +47,7 @@ matrix:
- sudo: false
env:
- NAME="other modules test"
- AWS_REGION=us-east-1 # set an AWS region for unit tests
install: echo "MAVEN_OPTS='-Xmx3000m'" > ~/.mavenrc && mvn install -q -ff -DskipTests -B
before_script:
- unset _JAVA_OPTIONS

View File

@ -130,8 +130,8 @@ public class JsonConfigProvider<T> implements Provider<Supplier<T>>
Key<Supplier<T>> supplierKey
)
{
binder.bind(supplierKey).toProvider((Provider) of(propertyBase, clazz)).in(LazySingleton.class);
binder.bind(instanceKey).toProvider(new SupplierProvider<T>(supplierKey));
binder.bind(supplierKey).toProvider(of(propertyBase, clazz)).in(LazySingleton.class);
binder.bind(instanceKey).toProvider(new SupplierProvider<>(supplierKey));
}
@SuppressWarnings("unchecked")

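Note: the `JsonConfigProvider.bind` entry point touched by this generics cleanup is the same one the S3 module uses later in this commit to register the new proxy and endpoint configs (see S3StorageDruidModule below):

```java
JsonConfigProvider.bind(binder, "druid.s3.proxy", AWSProxyConfig.class);
JsonConfigProvider.bind(binder, "druid.s3.endpoint", AWSEndpointConfig.class);
```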
View File

@ -101,9 +101,9 @@ public class InputRowParserSerdeTest
null
)
);
final MapInputRowParser parser2 = jsonMapper.readValue(
final MapInputRowParser parser2 = (MapInputRowParser) jsonMapper.readValue(
jsonMapper.writeValueAsBytes(parser),
MapInputRowParser.class
InputRowParser.class
);
final InputRow parsed = parser2.parseBatch(
ImmutableMap.<String, Object>of(
@ -134,9 +134,9 @@ public class InputRowParserSerdeTest
null
)
);
final MapInputRowParser parser2 = jsonMapper.readValue(
final MapInputRowParser parser2 = (MapInputRowParser) jsonMapper.readValue(
jsonMapper.writeValueAsBytes(parser),
MapInputRowParser.class
InputRowParser.class
);
final InputRow parsed = parser2.parseBatch(
ImmutableMap.<String, Object>of(

View File

@ -91,9 +91,9 @@ public class JSONParseSpecTest
feature
);
final JSONParseSpec serde = jsonMapper.readValue(
final JSONParseSpec serde = (JSONParseSpec) jsonMapper.readValue(
jsonMapper.writeValueAsString(spec),
JSONParseSpec.class
ParseSpec.class
);
Assert.assertEquals("timestamp", serde.getTimestampSpec().getTimestampColumn());
Assert.assertEquals("iso", serde.getTimestampSpec().getTimestampFormat());

View File

@ -58,9 +58,9 @@ public class JavaScriptParseSpecTest
"abc",
JavaScriptConfig.getEnabledInstance()
);
final JavaScriptParseSpec serde = jsonMapper.readValue(
final JavaScriptParseSpec serde = (JavaScriptParseSpec) jsonMapper.readValue(
jsonMapper.writeValueAsString(spec),
JavaScriptParseSpec.class
ParseSpec.class
);
Assert.assertEquals("abc", serde.getTimestampSpec().getTimestampColumn());
Assert.assertEquals("iso", serde.getTimestampSpec().getTimestampFormat());

View File

@ -43,9 +43,9 @@ public class RegexParseSpecTest
Collections.singletonList("abc"),
"abc"
);
final RegexParseSpec serde = jsonMapper.readValue(
final RegexParseSpec serde = (RegexParseSpec) jsonMapper.readValue(
jsonMapper.writeValueAsString(spec),
RegexParseSpec.class
ParseSpec.class
);
Assert.assertEquals("abc", serde.getTimestampSpec().getTimestampColumn());
Assert.assertEquals("iso", serde.getTimestampSpec().getTimestampFormat());

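Note on the serde test changes above: serializing the concrete parser/spec but deserializing it back through the interface type (`InputRowParser`, `ParseSpec`) exercises Jackson's polymorphic type resolution, which is the path production deserialization takes; the cast then asserts the resolved subtype. A minimal sketch of the pattern, reusing the tests' `jsonMapper` and `parser` and assuming the interfaces carry Jackson type-info annotations:

```java
final InputRowParser<?> roundTripped = jsonMapper.readValue(
    jsonMapper.writeValueAsBytes(parser),
    InputRowParser.class
);
Assert.assertTrue(roundTripped instanceof MapInputRowParser);
```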
View File

@ -37,7 +37,7 @@
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-ec2</artifactId>
<artifactId>aws-java-sdk-bundle</artifactId>
</dependency>
<!-- Tests -->

View File

@ -0,0 +1,52 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.common.aws;
import com.fasterxml.jackson.annotation.JsonProperty;
public class AWSEndpointConfig
{
@JsonProperty
private String url;
@JsonProperty
private String serviceName;
@JsonProperty
private String signingRegion;
@JsonProperty
public String getUrl()
{
return url;
}
@JsonProperty
public String getServiceName()
{
return serviceName;
}
@JsonProperty
public String getSigningRegion()
{
return signingRegion;
}
}

View File

@ -0,0 +1,61 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.common.aws;
import com.fasterxml.jackson.annotation.JsonProperty;
public class AWSProxyConfig
{
@JsonProperty
private String host;
@JsonProperty
private int port = -1; // AWS's default proxy port is -1
@JsonProperty
private String username;
@JsonProperty
private String password;
@JsonProperty
public String getHost()
{
return host;
}
@JsonProperty
public int getPort()
{
return port;
}
@JsonProperty
public String getUsername()
{
return username;
}
@JsonProperty
public String getPassword()
{
return password;
}
}

View File

@ -157,21 +157,7 @@
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
</dependency>
<!--
Druid doesn't ACTUALLY depend on jets3t in its core, but quite a few of the extensions do. This leads to a nasty ClassLoader problem
There's a bug in
https://github.com/apache/httpclient/blob/4.5.2/httpclient/src/main/java-deprecated/org/apache/http/impl/client/AbstractHttpClient.java#L332
Where httpclient does not care what your context classloader is when looking for the connection manager factories. See https://issues.apache.org/jira/browse/HTTPCLIENT-1727
A few extensions depend on jets3t, so we include it explicitly here to make sure it can load up its
org.jets3t.service.utils.RestUtils$ConnManagerFactory
properly
Future releases which include HTTPCLIENT-1727 should probably set the context loader whenever jets3t calls are used
-->
<dependency>
<groupId>net.java.dev.jets3t</groupId>
<artifactId>jets3t</artifactId>
<version>0.9.4</version>
</dependency>
<dependency>
<groupId>org.antlr</groupId>
<artifactId>antlr4-runtime</artifactId>

View File

@ -12,12 +12,18 @@ S3-compatible deep storage is basically either S3 or something like Google Stora
### Configuration
|Property|Possible Values|Description|Default|
|--------|---------------|-----------|-------|
|`druid.s3.accessKey`||S3 access key.|Must be set.|
|`druid.s3.secretKey`||S3 secret key.|Must be set.|
|`druid.storage.bucket`||Bucket to store in.|Must be set.|
|`druid.storage.baseKey`||Base key prefix to use, i.e. what directory.|Must be set.|
|Property|Description|Default|
|--------|-----------|-------|
|`druid.s3.accessKey`|S3 access key.|Must be set.|
|`druid.s3.secretKey`|S3 secret key.|Must be set.|
|`druid.storage.bucket`|Bucket to store in.|Must be set.|
|`druid.storage.baseKey`|Base key prefix to use, i.e. what directory.|Must be set.|
|`druid.s3.endpoint.url`|Service endpoint either with or without the protocol.|None|
|`druid.s3.endpoint.signingRegion`|Region to use for SigV4 signing of requests (e.g. us-west-1).|None|
|`druid.s3.proxy.host`|Proxy host to connect through.|None|
|`druid.s3.proxy.port`|Port on the proxy host to connect through.|None|
|`druid.s3.proxy.username`|User name to use when connecting through a proxy.|None|
|`druid.s3.proxy.password`|Password to use when connecting through a proxy.|None|
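
For example, a `runtime.properties` sketch wiring these together (all values are illustrative and not part of this patch):

```properties
druid.s3.accessKey=<access key>
druid.s3.secretKey=<secret key>
druid.storage.bucket=my-druid-bucket
druid.storage.baseKey=druid/segments
druid.s3.endpoint.url=s3.us-west-1.amazonaws.com
druid.s3.endpoint.signingRegion=us-west-1
druid.s3.proxy.host=proxy.example.com
druid.s3.proxy.port=8888
```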
## StaticS3Firehose

View File

@ -334,13 +334,14 @@ public class KerberosAuthenticator implements Authenticator
};
if (newToken && !token.isExpired() && token != AuthenticationToken.ANONYMOUS) {
String signedToken = mySigner.sign(token.toString());
tokenToAuthCookie(httpResponse,
signedToken,
getCookieDomain(),
getCookiePath(),
token.getExpires(),
!token.isExpired() && token.getExpires() > 0,
isHttps
tokenToAuthCookie(
httpResponse,
signedToken,
getCookieDomain(),
getCookiePath(),
token.getExpires(),
!token.isExpired() && token.getExpires() > 0,
isHttps
);
}
doFilter(filterChain, httpRequest, httpResponse);
@ -361,8 +362,14 @@ public class KerberosAuthenticator implements Authenticator
}
if (unauthorizedResponse) {
if (!httpResponse.isCommitted()) {
tokenToAuthCookie(httpResponse, "", getCookieDomain(),
getCookiePath(), 0, false, isHttps
tokenToAuthCookie(
httpResponse,
"",
getCookieDomain(),
getCookiePath(),
0,
false,
isHttps
);
// If response code is 401. Then WWW-Authenticate Header should be
// present.. reset to 403 if not found..

View File

@ -145,16 +145,7 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
<version>${hadoop.compile.version}</version>
<exclusions>
<exclusion>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-s3</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>commons-io</groupId>

View File

@ -22,11 +22,11 @@ package io.druid.storage.hdfs;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.logger.Logger;
import io.druid.segment.loading.DataSegmentFinder;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.timeline.DataSegment;
import io.druid.java.util.common.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;

View File

@ -18,80 +18,75 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<modelVersion>4.0.0</modelVersion>
<groupId>io.druid.extensions</groupId>
<artifactId>druid-s3-extensions</artifactId>
<name>druid-s3-extensions</name>
<description>druid-s3-extensions</description>
<groupId>io.druid.extensions</groupId>
<artifactId>druid-s3-extensions</artifactId>
<name>druid-s3-extensions</name>
<description>druid-s3-extensions</description>
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.13.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<parent>
<groupId>io.druid</groupId>
<artifactId>druid</artifactId>
<version>0.13.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<dependencies>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-api</artifactId>
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-aws-common</artifactId>
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>java-util</artifactId>
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>net.java.dev.jets3t</groupId>
<artifactId>jets3t</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-guice</artifactId>
<version>${jackson.version}</version>
<scope>provided</scope>
</dependency>
<dependencies>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-api</artifactId>
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-aws-common</artifactId>
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>java-util</artifactId>
<version>${project.parent.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-guice</artifactId>
<version>${jackson.version}</version>
<scope>provided</scope>
</dependency>
<!-- Tests -->
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-server</artifactId>
<version>${project.parent.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-processing</artifactId>
<version>${project.parent.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<!-- Tests -->
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-server</artifactId>
<version>${project.parent.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.druid</groupId>
<artifactId>druid-processing</artifactId>
<version>${project.parent.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.easymock</groupId>
<artifactId>easymock</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -19,28 +19,32 @@
package io.druid.firehose.s3;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.Lists;
import io.druid.data.input.impl.prefetch.PrefetchableTextFilesFirehoseFactory;
import io.druid.java.util.common.CompressionUtils;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.IOE;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.logger.Logger;
import io.druid.storage.s3.S3Utils;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.ServiceException;
import org.jets3t.service.StorageObjectsChunk;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Object;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
@ -48,18 +52,18 @@ import java.util.stream.Collectors;
/**
* Builds firehoses that read from a predefined list of S3 objects and then dry up.
*/
public class StaticS3FirehoseFactory extends PrefetchableTextFilesFirehoseFactory<S3Object>
public class StaticS3FirehoseFactory extends PrefetchableTextFilesFirehoseFactory<S3ObjectSummary>
{
private static final Logger log = new Logger(StaticS3FirehoseFactory.class);
private static final long MAX_LISTING_LENGTH = 1024;
private static final int MAX_LISTING_LENGTH = 1024;
private final RestS3Service s3Client;
private final AmazonS3 s3Client;
private final List<URI> uris;
private final List<URI> prefixes;
@JsonCreator
public StaticS3FirehoseFactory(
@JacksonInject("s3Client") RestS3Service s3Client,
@JacksonInject("s3Client") AmazonS3 s3Client,
@JsonProperty("uris") List<URI> uris,
@JsonProperty("prefixes") List<URI> prefixes,
@JsonProperty("maxCacheCapacityBytes") Long maxCacheCapacityBytes,
@ -70,7 +74,7 @@ public class StaticS3FirehoseFactory extends PrefetchableTextFilesFirehoseFactor
)
{
super(maxCacheCapacityBytes, maxFetchCapacityBytes, prefetchTriggerBytes, fetchTimeout, maxFetchRetry);
this.s3Client = Preconditions.checkNotNull(s3Client, "null s3Client");
this.s3Client = Preconditions.checkNotNull(s3Client, "s3Client");
this.uris = uris == null ? new ArrayList<>() : uris;
this.prefixes = prefixes == null ? new ArrayList<>() : prefixes;
@ -104,7 +108,7 @@ public class StaticS3FirehoseFactory extends PrefetchableTextFilesFirehoseFactor
}
@Override
protected Collection<S3Object> initObjects() throws IOException
protected Collection<S3ObjectSummary> initObjects() throws IOException
{
// Here, the returned s3 objects contain minimal information without data.
// Getting data is deferred until openObjectStream() is called for each object.
@ -113,53 +117,49 @@ public class StaticS3FirehoseFactory extends PrefetchableTextFilesFirehoseFactor
.map(
uri -> {
final String s3Bucket = uri.getAuthority();
final S3Object s3Object = new S3Object(extractS3Key(uri));
s3Object.setBucketName(s3Bucket);
return s3Object;
final String key = S3Utils.extractS3Key(uri);
return S3Utils.getSingleObjectSummary(s3Client, s3Bucket, key);
}
)
.collect(Collectors.toList());
} else {
final List<S3Object> objects = new ArrayList<>();
final List<S3ObjectSummary> objects = new ArrayList<>();
for (URI uri : prefixes) {
final String bucket = uri.getAuthority();
final String prefix = extractS3Key(uri);
final String prefix = S3Utils.extractS3Key(uri);
try {
String lastKey = null;
StorageObjectsChunk objectsChunk;
do {
objectsChunk = s3Client.listObjectsChunked(
bucket,
prefix,
null,
MAX_LISTING_LENGTH,
lastKey
);
Arrays.stream(objectsChunk.getObjects())
.filter(storageObject -> !storageObject.isDirectoryPlaceholder())
.forEach(storageObject -> objects.add((S3Object) storageObject));
lastKey = objectsChunk.getPriorLastKey();
} while (!objectsChunk.isListingComplete());
final Iterator<S3ObjectSummary> objectSummaryIterator = S3Utils.objectSummaryIterator(
s3Client,
bucket,
prefix,
MAX_LISTING_LENGTH
);
objects.addAll(Lists.newArrayList(objectSummaryIterator));
}
catch (ServiceException outerException) {
catch (AmazonS3Exception outerException) {
log.error(outerException, "Exception while listing on %s", uri);
if (outerException.getResponseCode() == 403) {
if (outerException.getStatusCode() == 403) {
// "Access Denied" means the user might lack permission to list objects under the given uri.
// Usually this is not a problem, but the uris might be full paths to input objects instead of prefixes.
// In that case, the objects can still be read directly if the user has GetObject permission.
log.warn("Access denied for %s. Trying to get the object from the uri without listing", uri);
try {
final S3Object s3Object = s3Client.getObject(bucket, prefix);
if (!s3Object.isDirectoryPlaceholder()) {
objects.add(s3Object);
final ObjectMetadata objectMetadata = s3Client.getObjectMetadata(bucket, prefix);
if (!S3Utils.isDirectoryPlaceholder(prefix, objectMetadata)) {
objects.add(S3Utils.getSingleObjectSummary(s3Client, bucket, prefix));
} else {
throw new IOException(uri + " is a directory placeholder, "
+ "but failed to get the object list under the directory due to permission");
throw new IOE(
"[%s] is a directory placeholder, "
+ "but failed to get the object list under the directory due to a permission error",
uri
);
}
}
catch (S3ServiceException innerException) {
catch (AmazonS3Exception innerException) {
throw new IOException(innerException);
}
} else {
@ -171,49 +171,46 @@ public class StaticS3FirehoseFactory extends PrefetchableTextFilesFirehoseFactor
}
}
private static String extractS3Key(URI uri)
{
return uri.getPath().startsWith("/")
? uri.getPath().substring(1)
: uri.getPath();
}
@Override
protected InputStream openObjectStream(S3Object object) throws IOException
protected InputStream openObjectStream(S3ObjectSummary object) throws IOException
{
try {
// Get data of the given object and open an input stream
return s3Client.getObject(object.getBucketName(), object.getKey()).getDataInputStream();
final S3Object s3Object = s3Client.getObject(object.getBucketName(), object.getKey());
if (s3Object == null) {
throw new ISE("Failed to get an s3 object for bucket[%s] and key[%s]", object.getBucketName(), object.getKey());
}
return s3Object.getObjectContent();
}
catch (ServiceException e) {
catch (AmazonS3Exception e) {
throw new IOException(e);
}
}
@Override
protected InputStream openObjectStream(S3Object object, long start) throws IOException
protected InputStream openObjectStream(S3ObjectSummary object, long start) throws IOException
{
final GetObjectRequest request = new GetObjectRequest(object.getBucketName(), object.getKey());
request.setRange(start);
try {
final S3Object result = s3Client.getObject(
object.getBucketName(),
object.getKey(),
null,
null,
null,
null,
start,
null
);
return result.getDataInputStream();
final S3Object s3Object = s3Client.getObject(request);
if (s3Object == null) {
throw new ISE(
"Failed to get an s3 object for bucket[%s], key[%s], and start[%d]",
object.getBucketName(),
object.getKey(),
start
);
}
return s3Object.getObjectContent();
}
catch (ServiceException e) {
catch (AmazonS3Exception e) {
throw new IOException(e);
}
}
@Override
protected InputStream wrapObjectStream(S3Object object, InputStream stream) throws IOException
protected InputStream wrapObjectStream(S3ObjectSummary object, InputStream stream) throws IOException
{
return object.getKey().endsWith(".gz") ? CompressionUtils.gzipInputStream(stream) : stream;
}

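Note: `S3Utils.objectSummaryIterator`, used throughout this class, is not shown in this excerpt. A hedged sketch of what it is assumed to do, namely lazily page through ListObjectsV2 results and hand back one `S3ObjectSummary` at a time:

```java
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import java.util.Iterator;
import java.util.NoSuchElementException;

static Iterator<S3ObjectSummary> objectSummaryIterator(
    final AmazonS3 s3, final String bucket, final String prefix, final int maxKeys)
{
  return new Iterator<S3ObjectSummary>()
  {
    private ListObjectsV2Result result = s3.listObjectsV2(
        new ListObjectsV2Request().withBucketName(bucket).withPrefix(prefix).withMaxKeys(maxKeys)
    );
    private Iterator<S3ObjectSummary> delegate = result.getObjectSummaries().iterator();

    @Override
    public boolean hasNext()
    {
      if (delegate.hasNext()) {
        return true;
      }
      if (!result.isTruncated()) {
        return false;
      }
      // Fetch the next page using the continuation token from the previous result.
      result = s3.listObjectsV2(
          new ListObjectsV2Request()
              .withBucketName(bucket)
              .withPrefix(prefix)
              .withMaxKeys(maxKeys)
              .withContinuationToken(result.getNextContinuationToken())
      );
      delegate = result.getObjectSummaries().iterator();
      return delegate.hasNext();
    }

    @Override
    public S3ObjectSummary next()
    {
      if (!hasNext()) {
        throw new NoSuchElementException();
      }
      return delegate.next();
    }
  };
}
```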
View File

@ -1,70 +0,0 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.storage.s3;
import com.amazonaws.auth.AWSCredentialsProvider;
import org.jets3t.service.security.AWSSessionCredentials;
public class AWSSessionCredentialsAdapter extends AWSSessionCredentials
{
private final AWSCredentialsProvider provider;
public AWSSessionCredentialsAdapter(AWSCredentialsProvider provider)
{
super(null, null, null);
if (provider.getCredentials() instanceof com.amazonaws.auth.AWSSessionCredentials) {
this.provider = provider;
} else {
throw new IllegalArgumentException("provider does not contain session credentials");
}
}
@Override
protected String getTypeName()
{
return "AWSSessionCredentialsAdapter";
}
@Override
public String getVersionPrefix()
{
return "AWSSessionCredentialsAdapter, version: ";
}
@Override
public String getAccessKey()
{
return provider.getCredentials().getAWSAccessKeyId();
}
@Override
public String getSecretKey()
{
return provider.getCredentials().getAWSSecretKey();
}
@Override
public String getSessionToken()
{
com.amazonaws.auth.AWSSessionCredentials sessionCredentials =
(com.amazonaws.auth.AWSSessionCredentials) provider.getCredentials();
return sessionCredentials.getSessionToken();
}
}

View File

@ -19,6 +19,7 @@
package io.druid.storage.s3;
import com.amazonaws.services.s3.AmazonS3;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Objects;
import com.google.common.collect.ImmutableMap;
@ -28,7 +29,6 @@ import io.druid.segment.loading.DataSegmentArchiver;
import io.druid.segment.loading.LoadSpec;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.timeline.DataSegment;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
public class S3DataSegmentArchiver extends S3DataSegmentMover implements DataSegmentArchiver
@ -40,7 +40,7 @@ public class S3DataSegmentArchiver extends S3DataSegmentMover implements DataSeg
@Inject
public S3DataSegmentArchiver(
@Json ObjectMapper mapper,
RestS3Service s3Client,
AmazonS3 s3Client,
S3DataSegmentArchiverConfig archiveConfig,
S3DataSegmentPusherConfig restoreConfig
)

View File

@ -19,22 +19,24 @@
package io.druid.storage.s3;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectInputStream;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.logger.Logger;
import io.druid.segment.loading.DataSegmentFinder;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.timeline.DataSegment;
import org.jets3t.service.ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Object;
import org.jets3t.service.model.StorageObject;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
@ -43,13 +45,13 @@ public class S3DataSegmentFinder implements DataSegmentFinder
{
private static final Logger log = new Logger(S3DataSegmentFinder.class);
private final RestS3Service s3Client;
private final AmazonS3 s3Client;
private final ObjectMapper jsonMapper;
private final S3DataSegmentPusherConfig config;
@Inject
public S3DataSegmentFinder(
RestS3Service s3Client,
AmazonS3 s3Client,
S3DataSegmentPusherConfig config,
ObjectMapper jsonMapper
)
@ -65,24 +67,24 @@ public class S3DataSegmentFinder implements DataSegmentFinder
final Set<DataSegment> segments = Sets.newHashSet();
try {
Iterator<StorageObject> objectsIterator = S3Utils.storageObjectsIterator(
final Iterator<S3ObjectSummary> objectSummaryIterator = S3Utils.objectSummaryIterator(
s3Client,
config.getBucket(),
workingDirPath.length() == 0 ? config.getBaseKey() : workingDirPath,
config.getMaxListingLength());
config.getMaxListingLength()
);
while (objectsIterator.hasNext()) {
StorageObject storageObject = objectsIterator.next();
storageObject.closeDataInputStream();
while (objectSummaryIterator.hasNext()) {
final S3ObjectSummary objectSummary = objectSummaryIterator.next();
if (S3Utils.toFilename(storageObject.getKey()).equals("descriptor.json")) {
final String descriptorJson = storageObject.getKey();
if (S3Utils.toFilename(objectSummary.getKey()).equals("descriptor.json")) {
final String descriptorJson = objectSummary.getKey();
String indexZip = S3Utils.indexZipForSegmentPath(descriptorJson);
if (S3Utils.isObjectInBucket(s3Client, config.getBucket(), indexZip)) {
S3Object indexObject = s3Client.getObject(config.getBucket(), descriptorJson);
try (InputStream is = indexObject.getDataInputStream()) {
if (S3Utils.isObjectInBucketIgnoringPermission(s3Client, config.getBucket(), indexZip)) {
try (S3Object indexObject = s3Client.getObject(config.getBucket(), descriptorJson);
S3ObjectInputStream is = indexObject.getObjectContent()) {
final ObjectMetadata objectMetadata = indexObject.getObjectMetadata();
final DataSegment dataSegment = jsonMapper.readValue(is, DataSegment.class);
log.info("Found segment [%s] located at [%s]", dataSegment.getIdentifier(), indexZip);
@ -99,8 +101,10 @@ public class S3DataSegmentFinder implements DataSegmentFinder
descriptorJson,
indexObject
);
S3Object newDescJsonObject = new S3Object(descriptorJson, jsonMapper.writeValueAsString(dataSegment));
s3Client.putObject(config.getBucket(), newDescJsonObject);
final ByteArrayInputStream bais = new ByteArrayInputStream(
StringUtils.toUtf8(jsonMapper.writeValueAsString(dataSegment))
);
s3Client.putObject(config.getBucket(), descriptorJson, bais, objectMetadata);
}
}
segments.add(dataSegment);
@ -114,7 +118,7 @@ public class S3DataSegmentFinder implements DataSegmentFinder
}
}
}
catch (ServiceException e) {
catch (AmazonServiceException e) {
throw new SegmentLoadingException(e, "Problem interacting with S3");
}
catch (IOException e) {

View File

@ -19,14 +19,14 @@
package io.druid.storage.s3;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.s3.AmazonS3;
import com.google.inject.Inject;
import io.druid.java.util.common.MapUtils;
import io.druid.java.util.common.logger.Logger;
import io.druid.segment.loading.DataSegmentKiller;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.timeline.DataSegment;
import org.jets3t.service.ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import java.util.Map;
@ -36,11 +36,11 @@ public class S3DataSegmentKiller implements DataSegmentKiller
{
private static final Logger log = new Logger(S3DataSegmentKiller.class);
private final RestS3Service s3Client;
private final AmazonS3 s3Client;
@Inject
public S3DataSegmentKiller(
RestS3Service s3Client
AmazonS3 s3Client
)
{
this.s3Client = s3Client;
@ -55,16 +55,16 @@ public class S3DataSegmentKiller implements DataSegmentKiller
String s3Path = MapUtils.getString(loadSpec, "key");
String s3DescriptorPath = S3Utils.descriptorPathForSegmentPath(s3Path);
if (s3Client.isObjectInBucket(s3Bucket, s3Path)) {
if (s3Client.doesObjectExist(s3Bucket, s3Path)) {
log.info("Removing index file[s3://%s/%s] from s3!", s3Bucket, s3Path);
s3Client.deleteObject(s3Bucket, s3Path);
}
if (s3Client.isObjectInBucket(s3Bucket, s3DescriptorPath)) {
if (s3Client.doesObjectExist(s3Bucket, s3DescriptorPath)) {
log.info("Removing descriptor file[s3://%s/%s] from s3!", s3Bucket, s3DescriptorPath);
s3Client.deleteObject(s3Bucket, s3DescriptorPath);
}
}
catch (ServiceException e) {
catch (AmazonServiceException e) {
throw new SegmentLoadingException(e, "Couldn't kill segment[%s]: [%s]", segment.getIdentifier(), e);
}
}

View File

@ -19,6 +19,13 @@
package io.druid.storage.s3;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.CopyObjectRequest;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.services.s3.model.StorageClass;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
@ -34,10 +41,6 @@ import io.druid.segment.loading.DataSegmentMover;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.timeline.DataSegment;
import org.jets3t.service.ServiceException;
import org.jets3t.service.acl.gs.GSAccessControlList;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Object;
import java.io.IOException;
import java.util.Map;
@ -46,12 +49,12 @@ public class S3DataSegmentMover implements DataSegmentMover
{
private static final Logger log = new Logger(S3DataSegmentMover.class);
private final RestS3Service s3Client;
private final AmazonS3 s3Client;
private final S3DataSegmentPusherConfig config;
@Inject
public S3DataSegmentMover(
RestS3Service s3Client,
AmazonS3 s3Client,
S3DataSegmentPusherConfig config
)
{
@ -103,7 +106,7 @@ public class S3DataSegmentMover implements DataSegmentMover
.build()
);
}
catch (ServiceException e) {
catch (AmazonServiceException e) {
throw new SegmentLoadingException(e, "Unable to move segment[%s]: [%s]", segment.getIdentifier(), e);
}
}
@ -113,7 +116,7 @@ public class S3DataSegmentMover implements DataSegmentMover
final String s3Path,
final String targetS3Bucket,
final String targetS3Path
) throws ServiceException, SegmentLoadingException
) throws SegmentLoadingException
{
try {
S3Utils.retryS3Operation(
@ -129,7 +132,7 @@ public class S3DataSegmentMover implements DataSegmentMover
selfCheckingMove(s3Bucket, targetS3Bucket, s3Path, targetS3Path, copyMsg);
return null;
}
catch (ServiceException | IOException | SegmentLoadingException e) {
catch (AmazonServiceException | IOException | SegmentLoadingException e) {
log.info(e, "Error while trying to move " + copyMsg);
throw e;
}
@ -137,7 +140,7 @@ public class S3DataSegmentMover implements DataSegmentMover
);
}
catch (Exception e) {
Throwables.propagateIfInstanceOf(e, ServiceException.class);
Throwables.propagateIfInstanceOf(e, AmazonServiceException.class);
Throwables.propagateIfInstanceOf(e, SegmentLoadingException.class);
throw Throwables.propagate(e);
}
@ -155,40 +158,41 @@ public class S3DataSegmentMover implements DataSegmentMover
String s3Path,
String targetS3Path,
String copyMsg
) throws ServiceException, IOException, SegmentLoadingException
) throws IOException, SegmentLoadingException
{
if (s3Bucket.equals(targetS3Bucket) && s3Path.equals(targetS3Path)) {
log.info("No need to move file[s3://%s/%s] onto itself", s3Bucket, s3Path);
return;
}
if (s3Client.isObjectInBucket(s3Bucket, s3Path)) {
final S3Object[] list = s3Client.listObjects(s3Bucket, s3Path, "");
if (list.length == 0) {
if (s3Client.doesObjectExist(s3Bucket, s3Path)) {
final ListObjectsV2Result listResult = s3Client.listObjectsV2(
new ListObjectsV2Request()
.withBucketName(s3Bucket)
.withPrefix(s3Path)
.withMaxKeys(1)
);
if (listResult.getKeyCount() == 0) {
// should never happen
throw new ISE("Unable to list object [s3://%s/%s]", s3Bucket, s3Path);
}
final S3Object s3Object = list[0];
if (s3Object.getStorageClass() != null &&
s3Object.getStorageClass().equals(S3Object.STORAGE_CLASS_GLACIER)) {
throw new ServiceException(StringUtils.format(
"Cannot move file[s3://%s/%s] of storage class glacier, skipping.",
s3Bucket,
s3Path
));
final S3ObjectSummary objectSummary = listResult.getObjectSummaries().get(0);
if (objectSummary.getStorageClass() != null &&
StorageClass.fromValue(StringUtils.toUpperCase(objectSummary.getStorageClass())).equals(StorageClass.Glacier)) {
throw new AmazonServiceException(
StringUtils.format(
"Cannot move file[s3://%s/%s] of storage class glacier, skipping.",
s3Bucket,
s3Path
)
);
} else {
log.info("Moving file %s", copyMsg);
final S3Object target = new S3Object(targetS3Path);
final CopyObjectRequest copyRequest = new CopyObjectRequest(s3Bucket, s3Path, targetS3Bucket, targetS3Path);
if (!config.getDisableAcl()) {
target.setAcl(GSAccessControlList.REST_CANNED_BUCKET_OWNER_FULL_CONTROL);
copyRequest.setAccessControlList(S3Utils.grantFullControlToBucketOwner(s3Client, targetS3Bucket));
}
s3Client.copyObject(
s3Bucket,
s3Path,
targetS3Bucket,
target,
false
);
if (!s3Client.isObjectInBucket(targetS3Bucket, targetS3Path)) {
s3Client.copyObject(copyRequest);
if (!s3Client.doesObjectExist(targetS3Bucket, targetS3Path)) {
throw new IOE(
"After copy was reported as successful the file doesn't exist in the target location [%s]",
copyMsg
@ -199,7 +203,7 @@ public class S3DataSegmentMover implements DataSegmentMover
}
} else {
// ensure object exists in target location
if (s3Client.isObjectInBucket(targetS3Bucket, targetS3Path)) {
if (s3Client.doesObjectExist(targetS3Bucket, targetS3Path)) {
log.info(
"Not moving file [s3://%s/%s], already present in target location [s3://%s/%s]",
s3Bucket, s3Path,

View File

@ -19,6 +19,11 @@
package io.druid.storage.s3;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.google.common.base.Predicate;
import com.google.common.base.Strings;
import com.google.common.base.Throwables;
@ -33,17 +38,15 @@ import io.druid.java.util.common.MapUtils;
import io.druid.java.util.common.RE;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.UOE;
import io.druid.java.util.common.io.Closer;
import io.druid.java.util.common.logger.Logger;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.segment.loading.URIDataPuller;
import io.druid.timeline.DataSegment;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.StorageObject;
import javax.tools.FileObject;
import java.io.File;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@ -59,17 +62,15 @@ public class S3DataSegmentPuller implements URIDataPuller
{
public static final int DEFAULT_RETRY_COUNT = 3;
public static FileObject buildFileObject(final URI uri, final RestS3Service s3Client) throws ServiceException
private static FileObject buildFileObject(final URI uri, final AmazonS3 s3Client) throws AmazonServiceException
{
final S3Coords coords = new S3Coords(checkURI(uri));
final StorageObject s3Obj = s3Client.getObjectDetails(coords.bucket, coords.path);
final S3ObjectSummary objectSummary = S3Utils.getSingleObjectSummary(s3Client, coords.bucket, coords.path);
final String path = uri.getPath();
return new FileObject()
{
final Object inputStreamOpener = new Object();
volatile boolean streamAcquired = false;
volatile StorageObject storageObject = s3Obj;
S3Object s3Object = null;
@Override
public URI toUri()
@ -84,22 +85,33 @@ public class S3DataSegmentPuller implements URIDataPuller
return Files.getNameWithoutExtension(path) + (Strings.isNullOrEmpty(ext) ? "" : ("." + ext));
}
/**
* Returns an input stream for an S3 object. The returned input stream is not thread-safe.
*/
@Override
public InputStream openInputStream() throws IOException
{
try {
synchronized (inputStreamOpener) {
if (streamAcquired) {
return storageObject.getDataInputStream();
}
if (s3Object == null) {
// lazily promote to full GET
storageObject = s3Client.getObject(s3Obj.getBucketName(), s3Obj.getKey());
final InputStream stream = storageObject.getDataInputStream();
streamAcquired = true;
return stream;
s3Object = s3Client.getObject(objectSummary.getBucketName(), objectSummary.getKey());
}
final InputStream in = s3Object.getObjectContent();
final Closer closer = Closer.create();
closer.register(in);
closer.register(s3Object);
return new FilterInputStream(in)
{
@Override
public void close() throws IOException
{
closer.close();
}
};
}
catch (ServiceException e) {
catch (AmazonServiceException e) {
throw new IOE(e, "Could not load S3 URI [%s]", uri);
}
}
@ -131,7 +143,7 @@ public class S3DataSegmentPuller implements URIDataPuller
@Override
public long getLastModified()
{
return s3Obj.getLastModifiedDate().getTime();
return objectSummary.getLastModified().getTime();
}
@Override
@ -149,11 +161,11 @@ public class S3DataSegmentPuller implements URIDataPuller
protected static final String BUCKET = "bucket";
protected static final String KEY = "key";
protected final RestS3Service s3Client;
protected final AmazonS3 s3Client;
@Inject
public S3DataSegmentPuller(
RestS3Service s3Client
AmazonS3 s3Client
)
{
this.s3Client = s3Client;
@ -180,7 +192,7 @@ public class S3DataSegmentPuller implements URIDataPuller
try {
return buildFileObject(uri, s3Client).openInputStream();
}
catch (ServiceException e) {
catch (AmazonServiceException e) {
if (e.getCause() != null) {
if (S3Utils.S3RETRY.apply(e)) {
throw new IOException("Recoverable exception", e);
@ -242,7 +254,7 @@ public class S3DataSegmentPuller implements URIDataPuller
try {
return buildFileObject(uri, s3Client).openInputStream();
}
catch (ServiceException e) {
catch (AmazonServiceException e) {
throw new IOE(e, "Could not load URI [%s]", uri);
}
}
@ -259,8 +271,8 @@ public class S3DataSegmentPuller implements URIDataPuller
if (e == null) {
return false;
}
if (e instanceof ServiceException) {
return S3Utils.isServiceExceptionRecoverable((ServiceException) e);
if (e instanceof AmazonServiceException) {
return S3Utils.isServiceExceptionRecoverable((AmazonServiceException) e);
}
if (S3Utils.S3RETRY.apply(e)) {
return true;
@ -284,10 +296,11 @@ public class S3DataSegmentPuller implements URIDataPuller
public String getVersion(URI uri) throws IOException
{
try {
final FileObject object = buildFileObject(uri, s3Client);
return StringUtils.format("%d", object.getLastModified());
final S3Coords coords = new S3Coords(checkURI(uri));
final S3ObjectSummary objectSummary = S3Utils.getSingleObjectSummary(s3Client, coords.bucket, coords.path);
return StringUtils.format("%d", objectSummary.getLastModified().getTime());
}
catch (ServiceException e) {
catch (AmazonServiceException e) {
if (S3Utils.isServiceExceptionRecoverable(e)) {
// The recoverable logic is always true for IOException, so we want to only pass IOException if it is recoverable
throw new IOE(e, "Could not fetch last modified timestamp from URI [%s]", uri);
@ -301,10 +314,10 @@ public class S3DataSegmentPuller implements URIDataPuller
{
try {
return S3Utils.retryS3Operation(
() -> S3Utils.isObjectInBucket(s3Client, coords.bucket, coords.path)
() -> S3Utils.isObjectInBucketIgnoringPermission(s3Client, coords.bucket, coords.path)
);
}
catch (S3ServiceException | IOException e) {
catch (AmazonS3Exception | IOException e) {
throw new SegmentLoadingException(e, "S3 fail! Key[%s]", coords);
}
catch (Exception e) {

View File

@ -19,21 +19,20 @@
package io.druid.storage.s3;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.java.util.common.CompressionUtils;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.segment.SegmentUtils;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.timeline.DataSegment;
import org.jets3t.service.ServiceException;
import org.jets3t.service.acl.gs.GSAccessControlList;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Object;
import java.io.File;
import java.io.IOException;
@ -46,13 +45,13 @@ public class S3DataSegmentPusher implements DataSegmentPusher
{
private static final EmittingLogger log = new EmittingLogger(S3DataSegmentPusher.class);
private final RestS3Service s3Client;
private final AmazonS3 s3Client;
private final S3DataSegmentPusherConfig config;
private final ObjectMapper jsonMapper;
@Inject
public S3DataSegmentPusher(
RestS3Service s3Client,
AmazonS3 s3Client,
S3DataSegmentPusherConfig config,
ObjectMapper jsonMapper
)
@ -97,45 +96,43 @@ public class S3DataSegmentPusher implements DataSegmentPusher
final File zipOutFile = File.createTempFile("druid", "index.zip");
final long indexSize = CompressionUtils.zip(indexFilesDir, zipOutFile);
final DataSegment outSegment = inSegment.withSize(indexSize)
.withLoadSpec(makeLoadSpec(config.getBucket(), s3Path))
.withBinaryVersion(SegmentUtils.getVersionFromDir(indexFilesDir));
final File descriptorFile = File.createTempFile("druid", "descriptor.json");
// Avoid using Guava in DataSegmentPushers because they might be used with very diverse Guava versions in
// runtime, and because Guava deletes methods over time, that causes incompatibilities.
Files.write(descriptorFile.toPath(), jsonMapper.writeValueAsBytes(outSegment));
try {
return S3Utils.retryS3Operation(
() -> {
S3Object toPush = new S3Object(zipOutFile);
putObject(config.getBucket(), s3Path, toPush, replaceExisting);
final DataSegment outSegment = inSegment.withSize(indexSize)
.withLoadSpec(makeLoadSpec(config.getBucket(), toPush.getKey()))
.withBinaryVersion(SegmentUtils.getVersionFromDir(indexFilesDir));
File descriptorFile = File.createTempFile("druid", "descriptor.json");
// Avoid using Guava in DataSegmentPushers because they might be used with very diverse Guava versions in
// runtime, and because Guava deletes methods over time, that causes incompatibilities.
Files.write(descriptorFile.toPath(), jsonMapper.writeValueAsBytes(outSegment));
S3Object descriptorObject = new S3Object(descriptorFile);
putObject(
uploadFileIfPossible(s3Client, config.getBucket(), s3Path, zipOutFile, replaceExisting);
uploadFileIfPossible(
s3Client,
config.getBucket(),
S3Utils.descriptorPathForSegmentPath(s3Path),
descriptorObject,
descriptorFile,
replaceExisting
);
log.info("Deleting zipped index File[%s]", zipOutFile);
zipOutFile.delete();
log.info("Deleting descriptor file[%s]", descriptorFile);
descriptorFile.delete();
return outSegment;
}
);
}
catch (ServiceException e) {
catch (AmazonServiceException e) {
throw new IOException(e);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
finally {
log.info("Deleting temporary cached index.zip");
zipOutFile.delete();
log.info("Deleting temporary cached descriptor.json");
descriptorFile.delete();
}
}
@Override
@ -163,21 +160,26 @@ public class S3DataSegmentPusher implements DataSegmentPusher
);
}
private void putObject(String bucketName, String path, S3Object object, boolean replaceExisting)
throws ServiceException
private void uploadFileIfPossible(
AmazonS3 s3Client,
String bucket,
String key,
File file,
boolean replaceExisting
)
{
object.setBucketName(bucketName);
object.setKey(path);
if (!config.getDisableAcl()) {
object.setAcl(GSAccessControlList.REST_CANNED_BUCKET_OWNER_FULL_CONTROL);
}
log.info("Pushing %s.", object);
if (!replaceExisting && S3Utils.isObjectInBucket(s3Client, bucketName, object.getKey())) {
log.info("Skipping push because key [%s] exists && replaceExisting == false", object.getKey());
if (!replaceExisting && S3Utils.isObjectInBucketIgnoringPermission(s3Client, bucket, key)) {
log.info("Skipping push because key [%s] exists && replaceExisting == false", key);
} else {
s3Client.putObject(bucketName, object);
final PutObjectRequest indexFilePutRequest = new PutObjectRequest(bucket, key, file);
if (!config.getDisableAcl()) {
indexFilePutRequest.setAccessControlList(
S3Utils.grantFullControlToBucketOwner(s3Client, bucket)
);
}
log.info("Pushing [%s] to bucket[%s] and key[%s].", file, bucket, key);
s3Client.putObject(indexFilePutRequest);
}
}
}

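Note: `S3Utils.grantFullControlToBucketOwner`, called above when ACLs are enabled, is outside this excerpt. A hedged sketch of what it is assumed to do, consistent with the `Grant`/`CanonicalGrantee`/`Permission` imports this commit adds to S3Utils below:

```java
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.AccessControlList;
import com.amazonaws.services.s3.model.CanonicalGrantee;
import com.amazonaws.services.s3.model.Grant;
import com.amazonaws.services.s3.model.Permission;

// Read the bucket's current ACL and add a FullControl grant for the bucket owner.
static AccessControlList grantFullControlToBucketOwner(AmazonS3 s3Client, String bucket)
{
  final AccessControlList acl = s3Client.getBucketAcl(bucket);
  acl.grantAllPermissions(new Grant(new CanonicalGrantee(acl.getOwner().getId()), Permission.FullControl));
  return acl;
}
```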
View File

@ -19,8 +19,12 @@
package io.druid.storage.s3;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.ClientConfigurationFactory;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSSessionCredentials;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.S3ClientOptions;
import com.fasterxml.jackson.core.Version;
import com.fasterxml.jackson.databind.Module;
import com.google.common.collect.ImmutableList;
@ -28,13 +32,14 @@ import com.google.inject.Binder;
import com.google.inject.Provides;
import com.google.inject.multibindings.MapBinder;
import io.druid.common.aws.AWSCredentialsConfig;
import io.druid.common.aws.AWSEndpointConfig;
import io.druid.common.aws.AWSProxyConfig;
import io.druid.data.SearchableVersionedDataFinder;
import io.druid.guice.Binders;
import io.druid.guice.JsonConfigProvider;
import io.druid.guice.LazySingleton;
import io.druid.initialization.DruidModule;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.security.AWSCredentials;
import org.apache.commons.lang.StringUtils;
import java.util.List;
@ -75,6 +80,8 @@ public class S3StorageDruidModule implements DruidModule
public void configure(Binder binder)
{
JsonConfigProvider.bind(binder, "druid.s3", AWSCredentialsConfig.class);
JsonConfigProvider.bind(binder, "druid.s3.proxy", AWSProxyConfig.class);
JsonConfigProvider.bind(binder, "druid.s3.endpoint", AWSEndpointConfig.class);
MapBinder.newMapBinder(binder, String.class, SearchableVersionedDataFinder.class)
.addBinding("s3")
.to(S3TimestampVersionedDataFinder.class)
@ -101,15 +108,44 @@ public class S3StorageDruidModule implements DruidModule
@Provides
@LazySingleton
public RestS3Service getRestS3Service(AWSCredentialsProvider provider)
public AmazonS3 getAmazonS3Client(
AWSCredentialsProvider provider,
AWSProxyConfig proxyConfig,
AWSEndpointConfig endpointConfig
)
{
if (provider.getCredentials() instanceof AWSSessionCredentials) {
return new RestS3Service(new AWSSessionCredentialsAdapter(provider));
} else {
return new RestS3Service(new AWSCredentials(
provider.getCredentials().getAWSAccessKeyId(),
provider.getCredentials().getAWSSecretKey()
));
// AmazonS3ClientBuilder can't be used because it makes the integration tests fail
final ClientConfiguration configuration = new ClientConfigurationFactory().getConfig();
final AmazonS3Client client = new AmazonS3Client(provider, setProxyConfig(configuration, proxyConfig));
if (StringUtils.isNotEmpty(endpointConfig.getUrl())) {
if (StringUtils.isNotEmpty(endpointConfig.getServiceName()) &&
StringUtils.isNotEmpty(endpointConfig.getSigningRegion())) {
client.setEndpoint(endpointConfig.getUrl(), endpointConfig.getServiceName(), endpointConfig.getSigningRegion());
} else {
client.setEndpoint(endpointConfig.getUrl());
}
}
client.setS3ClientOptions(S3ClientOptions.builder().enableForceGlobalBucketAccess().build());
return client;
}
private static ClientConfiguration setProxyConfig(ClientConfiguration conf, AWSProxyConfig proxyConfig)
{
if (StringUtils.isNotEmpty(proxyConfig.getHost())) {
conf.setProxyHost(proxyConfig.getHost());
}
if (proxyConfig.getPort() != -1) {
conf.setProxyPort(proxyConfig.getPort());
}
if (StringUtils.isNotEmpty(proxyConfig.getUsername())) {
conf.setProxyUsername(proxyConfig.getUsername());
}
if (StringUtils.isNotEmpty(proxyConfig.getPassword())) {
conf.setProxyPassword(proxyConfig.getPassword());
}
return conf;
}
}

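For readers unfamiliar with the pre-builder SDK API used here, a stand-alone sketch of the same construction path (the endpoint and proxy values are hypothetical, not part of the patch):

```java
import com.amazonaws.ClientConfiguration;
import com.amazonaws.ClientConfigurationFactory;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.S3ClientOptions;

static AmazonS3 buildClient()
{
  final ClientConfiguration conf = new ClientConfigurationFactory().getConfig();
  conf.setProxyHost("proxy.example.com"); // hypothetical proxy settings
  conf.setProxyPort(8888);

  final AmazonS3Client client = new AmazonS3Client(new DefaultAWSCredentialsProviderChain(), conf);
  // With a service name and signing region, requests are SigV4-signed for that region.
  client.setEndpoint("s3.us-west-1.amazonaws.com", "s3", "us-west-1");
  client.setS3ClientOptions(S3ClientOptions.builder().enableForceGlobalBucketAccess().build());
  return client;
}
```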
View File

@ -19,6 +19,11 @@
package io.druid.storage.s3;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.google.common.io.ByteSource;
@ -27,10 +32,6 @@ import io.druid.java.util.common.IOE;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.logger.Logger;
import io.druid.tasklogs.TaskLogs;
import org.jets3t.service.ServiceException;
import org.jets3t.service.StorageService;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.StorageObject;
import java.io.File;
import java.io.IOException;
@ -43,11 +44,11 @@ public class S3TaskLogs implements TaskLogs
{
private static final Logger log = new Logger(S3TaskLogs.class);
private final StorageService service;
private final AmazonS3 service;
private final S3TaskLogsConfig config;
@Inject
public S3TaskLogs(S3TaskLogsConfig config, RestS3Service service)
public S3TaskLogs(S3TaskLogsConfig config, AmazonS3 service)
{
this.config = config;
this.service = service;
@ -59,9 +60,9 @@ public class S3TaskLogs implements TaskLogs
final String taskKey = getTaskLogKey(taskid);
try {
final StorageObject objectDetails = service.getObjectDetails(config.getS3Bucket(), taskKey, null, null, null, null);
final ObjectMetadata objectMetadata = service.getObjectMetadata(config.getS3Bucket(), taskKey);
return Optional.<ByteSource>of(
return Optional.of(
new ByteSource()
{
@Override
@ -69,36 +70,31 @@ public class S3TaskLogs implements TaskLogs
{
try {
final long start;
final long end = objectDetails.getContentLength() - 1;
final long end = objectMetadata.getContentLength() - 1;
if (offset > 0 && offset < objectDetails.getContentLength()) {
if (offset > 0 && offset < objectMetadata.getContentLength()) {
start = offset;
} else if (offset < 0 && (-1 * offset) < objectDetails.getContentLength()) {
start = objectDetails.getContentLength() + offset;
} else if (offset < 0 && (-1 * offset) < objectMetadata.getContentLength()) {
start = objectMetadata.getContentLength() + offset;
} else {
start = 0;
}
return service.getObject(
config.getS3Bucket(),
taskKey,
null,
null,
new String[]{objectDetails.getETag()},
null,
start,
end
).getDataInputStream();
final GetObjectRequest request = new GetObjectRequest(config.getS3Bucket(), taskKey)
.withMatchingETagConstraint(objectMetadata.getETag())
.withRange(start, end);
return service.getObject(request).getObjectContent();
}
catch (ServiceException e) {
catch (AmazonServiceException e) {
throw new IOException(e);
}
}
}
);
}
catch (ServiceException e) {
if (404 == e.getResponseCode()
catch (AmazonS3Exception e) {
if (404 == e.getStatusCode()
|| "NoSuchKey".equals(e.getErrorCode())
|| "NoSuchBucket".equals(e.getErrorCode())) {
return Optional.absent();
@ -117,9 +113,7 @@ public class S3TaskLogs implements TaskLogs
try {
S3Utils.retryS3Operation(
() -> {
final StorageObject object = new StorageObject(logFile);
object.setKey(taskKey);
service.putObject(config.getS3Bucket(), object);
service.putObject(config.getS3Bucket(), taskKey, logFile);
return null;
}
);

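The offset arithmetic above deserves a worked example: for a 100-byte log, offset 20 reads from byte 20, offset -10 reads the last 10 bytes (start 90), and an out-of-range offset such as 500 falls back to the beginning. Extracted as a hypothetical helper mirroring the branches in the diff:

```java
// Hypothetical extraction of the offset logic above; not part of the patch.
static long resolveStart(long offset, long contentLength)
{
  if (offset > 0 && offset < contentLength) {
    return offset;                 // skip the first `offset` bytes
  } else if (offset < 0 && -1 * offset < contentLength) {
    return contentLength + offset; // tail read: stream the last |offset| bytes
  } else {
    return 0;                      // out of range: stream the whole object
  }
}
```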
View File

@ -19,16 +19,17 @@
package io.druid.storage.s3;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.google.common.base.Throwables;
import com.google.inject.Inject;
import io.druid.data.SearchableVersionedDataFinder;
import io.druid.java.util.common.RetryUtils;
import io.druid.java.util.common.StringUtils;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Object;
import javax.annotation.Nullable;
import java.net.URI;
import java.util.Iterator;
import java.util.regex.Pattern;
/**
@ -37,8 +38,10 @@ import java.util.regex.Pattern;
*/
public class S3TimestampVersionedDataFinder extends S3DataSegmentPuller implements SearchableVersionedDataFinder<URI>
{
private static final int MAX_LISTING_KEYS = 1000;
@Inject
public S3TimestampVersionedDataFinder(RestS3Service s3Client)
public S3TimestampVersionedDataFinder(AmazonS3 s3Client)
{
super(s3Client);
}
@ -65,23 +68,27 @@ public class S3TimestampVersionedDataFinder extends S3DataSegmentPuller implemen
final S3Coords coords = new S3Coords(checkURI(uri));
long mostRecent = Long.MIN_VALUE;
URI latest = null;
S3Object[] objects = s3Client.listObjects(coords.bucket, coords.path, null);
if (objects == null) {
return null;
}
for (S3Object storageObject : objects) {
storageObject.closeDataInputStream();
String keyString = storageObject.getKey().substring(coords.path.length());
final Iterator<S3ObjectSummary> objectSummaryIterator = S3Utils.objectSummaryIterator(
s3Client,
coords.bucket,
coords.path,
MAX_LISTING_KEYS
);
while (objectSummaryIterator.hasNext()) {
final S3ObjectSummary objectSummary = objectSummaryIterator.next();
String keyString = objectSummary.getKey().substring(coords.path.length());
if (keyString.startsWith("/")) {
keyString = keyString.substring(1);
}
if (pattern != null && !pattern.matcher(keyString).matches()) {
continue;
}
final long latestModified = storageObject.getLastModifiedDate().getTime();
final long latestModified = objectSummary.getLastModified().getTime();
if (latestModified >= mostRecent) {
mostRecent = latestModified;
latest = new URI(StringUtils.format("s3://%s/%s", storageObject.getBucketName(), storageObject.getKey()));
latest = new URI(
StringUtils.format("s3://%s/%s", objectSummary.getBucketName(), objectSummary.getKey())
);
}
}
return latest;
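
For orientation, a hedged sketch of how the finder above is typically invoked; the bucket and key layout are made up, and getLatestVersion comes from SearchableVersionedDataFinder:

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import io.druid.storage.s3.S3TimestampVersionedDataFinder;
import java.net.URI;
import java.util.regex.Pattern;

public class FinderUsageSketch
{
  public static void main(String[] args)
  {
    final AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();
    final S3TimestampVersionedDataFinder finder = new S3TimestampVersionedDataFinder(s3);
    // Returns the most recently modified key under the prefix that matches the pattern.
    final URI latest = finder.getLatestVersion(
        URI.create("s3://my-bucket/lookups/"), // hypothetical prefix
        Pattern.compile("renames-[0-9]*\\.gz")
    );
    System.out.println(latest);
  }
}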

View File

@ -19,19 +19,27 @@
package io.druid.storage.s3;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.AccessControlList;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.CanonicalGrantee;
import com.amazonaws.services.s3.model.Grant;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.Permission;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.google.common.base.Joiner;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.RetryUtils;
import io.druid.java.util.common.RetryUtils.Task;
import org.jets3t.service.ServiceException;
import org.jets3t.service.StorageObjectsChunk;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Object;
import org.jets3t.service.model.StorageObject;
import java.io.IOException;
import java.net.URI;
import java.util.Iterator;
import java.util.NoSuchElementException;
/**
*
@ -39,25 +47,12 @@ import java.util.Iterator;
public class S3Utils
{
private static final Joiner JOINER = Joiner.on("/").skipNulls();
private static final String MIMETYPE_JETS3T_DIRECTORY = "application/x-directory";
public static void closeStreamsQuietly(S3Object s3Obj)
{
if (s3Obj == null) {
return;
}
try {
s3Obj.closeDataInputStream();
}
catch (IOException e) {
}
}
public static boolean isServiceExceptionRecoverable(ServiceException ex)
static boolean isServiceExceptionRecoverable(AmazonServiceException ex)
{
final boolean isIOException = ex.getCause() instanceof IOException;
final boolean isTimeout = "RequestTimeout".equals(((ServiceException) ex).getErrorCode());
final boolean isTimeout = "RequestTimeout".equals(ex.getErrorCode());
return isIOException || isTimeout;
}
@ -70,8 +65,8 @@ public class S3Utils
return false;
} else if (e instanceof IOException) {
return true;
} else if (e instanceof ServiceException) {
return isServiceExceptionRecoverable((ServiceException) e);
} else if (e instanceof AmazonServiceException) {
return isServiceExceptionRecoverable((AmazonServiceException) e);
} else {
return apply(e.getCause());
}
@ -88,91 +83,81 @@ public class S3Utils
return RetryUtils.retry(f, S3RETRY, maxTries);
}
public static boolean isObjectInBucket(RestS3Service s3Client, String bucketName, String objectKey)
throws ServiceException
static boolean isObjectInBucketIgnoringPermission(AmazonS3 s3Client, String bucketName, String objectKey)
{
try {
s3Client.getObjectDetails(bucketName, objectKey);
return s3Client.doesObjectExist(bucketName, objectKey);
}
catch (ServiceException e) {
if (404 == e.getResponseCode()
|| "NoSuchKey".equals(e.getErrorCode())
|| "NoSuchBucket".equals(e.getErrorCode())) {
return false;
}
if ("AccessDenied".equals(e.getErrorCode())) {
catch (AmazonS3Exception e) {
if (e.getStatusCode() == 404) {
// Object is inaccessible to current user, but does exist.
return true;
}
// Something else has gone wrong
throw e;
}
return true;
}
public static Iterator<StorageObject> storageObjectsIterator(
final RestS3Service s3Client,
public static Iterator<S3ObjectSummary> objectSummaryIterator(
final AmazonS3 s3Client,
final String bucket,
final String prefix,
final long maxListingLength
final int numMaxKeys
)
{
return new Iterator<StorageObject>()
final ListObjectsV2Request request = new ListObjectsV2Request()
.withBucketName(bucket)
.withPrefix(prefix)
.withMaxKeys(numMaxKeys);
return new Iterator<S3ObjectSummary>()
{
private StorageObjectsChunk objectsChunk;
private int objectsChunkOffset;
private ListObjectsV2Result result;
private Iterator<S3ObjectSummary> objectSummaryIterator;
{
fetchNextBatch();
}
private void fetchNextBatch()
{
result = s3Client.listObjectsV2(request);
objectSummaryIterator = result.getObjectSummaries().iterator();
request.setContinuationToken(result.getNextContinuationToken());
}
@Override
public boolean hasNext()
{
if (objectsChunk == null) {
objectsChunk = listObjectsChunkedAfter("");
objectsChunkOffset = 0;
}
if (objectsChunk.getObjects().length <= objectsChunkOffset) {
if (objectsChunk.isListingComplete()) {
return false;
} else {
objectsChunk = listObjectsChunkedAfter(objectsChunk.getPriorLastKey());
objectsChunkOffset = 0;
}
}
return true;
}
private StorageObjectsChunk listObjectsChunkedAfter(final String priorLastKey)
{
try {
return retryS3Operation(
() -> s3Client.listObjectsChunked(bucket, prefix, null, maxListingLength, priorLastKey)
);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
return objectSummaryIterator.hasNext() || result.isTruncated();
}
@Override
public StorageObject next()
public S3ObjectSummary next()
{
if (!hasNext()) {
throw new IllegalStateException();
throw new NoSuchElementException();
}
StorageObject storageObject = objectsChunk.getObjects()[objectsChunkOffset];
objectsChunkOffset++;
return storageObject;
if (objectSummaryIterator.hasNext()) {
return objectSummaryIterator.next();
}
if (result.isTruncated()) {
fetchNextBatch();
}
if (!objectSummaryIterator.hasNext()) {
throw new ISE(
"Failed to further iterate on bucket[%s] and prefix[%s]. The last continuationToken was [%s]",
bucket,
prefix,
result.getContinuationToken()
);
}
return objectSummaryIterator.next();
}
@Override
public void remove()
{
throw new UnsupportedOperationException();
}
};
}
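
The iterator above wraps the SDK's ListObjectsV2 pagination. A standalone sketch of that loop, assuming a configured client and hypothetical bucket and prefix names; note that the token for the next page comes from getNextContinuationToken(), while isTruncated() signals whether another page exists:

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
import com.amazonaws.services.s3.model.S3ObjectSummary;

public class ListAllKeysSketch
{
  static void printAllKeys(AmazonS3 s3)
  {
    final ListObjectsV2Request request = new ListObjectsV2Request()
        .withBucketName("my-bucket")  // hypothetical
        .withPrefix("some/prefix/")   // hypothetical
        .withMaxKeys(1000);
    ListObjectsV2Result result;
    do {
      result = s3.listObjectsV2(request);
      for (S3ObjectSummary summary : result.getObjectSummaries()) {
        System.out.println(summary.getKey());
      }
      // Resume the listing from where the previous page stopped.
      request.setContinuationToken(result.getNextContinuationToken());
    } while (result.isTruncated());
  }
}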
@ -184,25 +169,93 @@ public class S3Utils
) + "/index.zip";
}
public static String descriptorPathForSegmentPath(String s3Path)
static String descriptorPathForSegmentPath(String s3Path)
{
return s3Path.substring(0, s3Path.lastIndexOf("/")) + "/descriptor.json";
}
public static String indexZipForSegmentPath(String s3Path)
static String indexZipForSegmentPath(String s3Path)
{
return s3Path.substring(0, s3Path.lastIndexOf("/")) + "/index.zip";
}
public static String toFilename(String key)
static String toFilename(String key)
{
return toFilename(key, "");
}
public static String toFilename(String key, final String suffix)
static String toFilename(String key, final String suffix)
{
String filename = key.substring(key.lastIndexOf("/") + 1); // characters after last '/'
filename = filename.substring(0, filename.length() - suffix.length()); // remove the suffix from the end
return filename;
}
static AccessControlList grantFullControlToBucketOwner(AmazonS3 s3Client, String bucket)
{
final AccessControlList acl = s3Client.getBucketAcl(bucket);
acl.grantAllPermissions(new Grant(new CanonicalGrantee(acl.getOwner().getId()), Permission.FullControl));
return acl;
}
public static String extractS3Key(URI uri)
{
return uri.getPath().startsWith("/") ? uri.getPath().substring(1) : uri.getPath();
}
// Copied from org.jets3t.service.model.StorageObject.isDirectoryPlaceholder()
public static boolean isDirectoryPlaceholder(String key, ObjectMetadata objectMetadata)
{
// Recognize "standard" directory place-holder indications used by
// Amazon's AWS Console and Panic's Transmit.
if (key.endsWith("/") && objectMetadata.getContentLength() == 0) {
return true;
}
// Recognize s3sync.rb directory placeholders by MD5/ETag value.
if ("d66759af42f282e1ba19144df2d405d0".equals(objectMetadata.getETag())) {
return true;
}
// Recognize place-holder objects created by the Google Storage console
// or S3 Organizer Firefox extension.
if (key.endsWith("_$folder$") && objectMetadata.getContentLength() == 0) {
return true;
}
// We don't use JetS3t APIs anymore, but the below check is still needed for backward compatibility.
// Recognize legacy JetS3t directory place-holder objects, only gives
// accurate results if an object's metadata is populated.
if (objectMetadata.getContentLength() == 0 && MIMETYPE_JETS3T_DIRECTORY.equals(objectMetadata.getContentType())) {
return true;
}
return false;
}
/**
* Gets a single {@link S3ObjectSummary} from S3. Since this method might return the wrong object if there are multiple
* objects whose keys start with the given key, it should be used only when the given key is guaranteed to be unique
* in the given bucket.
*
* @param s3Client s3 client
* @param bucket s3 bucket
* @param key unique key for the object to be retrieved
*/
public static S3ObjectSummary getSingleObjectSummary(AmazonS3 s3Client, String bucket, String key)
{
final ListObjectsV2Request request = new ListObjectsV2Request()
.withBucketName(bucket)
.withPrefix(key)
.withMaxKeys(1);
final ListObjectsV2Result result = s3Client.listObjectsV2(request);
if (result.getKeyCount() == 0) {
throw new ISE("Cannot find object for bucket[%s] and key[%s]", bucket, key);
}
final S3ObjectSummary objectSummary = result.getObjectSummaries().get(0);
if (!objectSummary.getBucketName().equals(bucket) || !objectSummary.getKey().equals(key)) {
throw new ISE("Wrong object[%s] for bucket[%s] and key[%s]", objectSummary, bucket, key);
}
return objectSummary;
}
}
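
A hedged sketch of how grantFullControlToBucketOwner is meant to be used when uploading: the returned ACL is attached to the put request. The bucket, key, and file are illustrative, and the caller is assumed to live in the same package since the helper is package-private:

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.AccessControlList;
import com.amazonaws.services.s3.model.PutObjectRequest;
import java.io.File;

public class PushWithAclSketch
{
  static void push(AmazonS3 s3, File segmentZip)
  {
    final AccessControlList acl = S3Utils.grantFullControlToBucketOwner(s3, "my-bucket");
    // The bucket owner keeps full control of the uploaded object.
    final PutObjectRequest request = new PutObjectRequest("my-bucket", "segments/index.zip", segmentZip)
        .withAccessControlList(acl);
    s3.putObject(request);
  }
}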

View File

@ -19,8 +19,13 @@
package io.druid.firehose.s3;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.module.guice.ObjectMapperModule;
import com.google.common.collect.ImmutableList;
@ -29,8 +34,6 @@ import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Provides;
import io.druid.initialization.DruidModule;
import io.druid.jackson.DefaultObjectMapper;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.junit.Assert;
import org.junit.Test;
@ -42,7 +45,7 @@ import java.util.List;
*/
public class StaticS3FirehoseFactoryTest
{
private static final RestS3Service SERVICE = new RestS3Service(null);
private static final AmazonS3Client SERVICE = new AmazonS3Client();
@Test
public void testSerde() throws Exception
@ -75,14 +78,14 @@ public class StaticS3FirehoseFactoryTest
private static ObjectMapper createObjectMapper(DruidModule baseModule)
{
final ObjectMapper baseMapper = new DefaultObjectMapper();
baseModule.getJacksonModules().forEach(baseMapper::registerModule);
final Injector injector = Guice.createInjector(
new ObjectMapperModule(),
baseModule
);
return injector.getInstance(ObjectMapper.class);
final ObjectMapper baseMapper = injector.getInstance(ObjectMapper.class);
baseModule.getJacksonModules().forEach(baseMapper::registerModule);
return baseMapper;
}
private static class TestS3Module implements DruidModule
@ -90,7 +93,9 @@ public class StaticS3FirehoseFactoryTest
@Override
public List<? extends Module> getJacksonModules()
{
return ImmutableList.of(new SimpleModule());
// A deserializer is needed for AmazonS3Client even though it is injected.
// See https://github.com/FasterXML/jackson-databind/issues/962.
return ImmutableList.of(new SimpleModule().addDeserializer(AmazonS3.class, new ItemDeserializer()));
}
@Override
@ -100,9 +105,28 @@ public class StaticS3FirehoseFactoryTest
}
@Provides
public RestS3Service getRestS3Service()
public AmazonS3 getAmazonS3Client()
{
return SERVICE;
}
}
public static class ItemDeserializer extends StdDeserializer<AmazonS3>
{
public ItemDeserializer()
{
this(null);
}
public ItemDeserializer(Class<?> vc)
{
super(vc);
}
@Override
public AmazonS3 deserialize(JsonParser jp, DeserializationContext ctxt)
{
throw new UnsupportedOperationException();
}
}
}
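
The throwing deserializer above works around jackson-databind#962: Jackson requires a deserializer to be registered even for a value that is only ever injected. A minimal sketch of the same pattern without Guice, assuming an existing client instance:

import com.amazonaws.services.s3.AmazonS3;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;

public class InjectedClientSketch
{
  static ObjectMapper mapperWithInjectedClient(AmazonS3 s3)
  {
    final ObjectMapper mapper = new ObjectMapper();
    // Placeholder deserializer; it is never invoked because the value is injected.
    mapper.registerModule(
        new SimpleModule().addDeserializer(AmazonS3.class, new StaticS3FirehoseFactoryTest.ItemDeserializer())
    );
    // Beans declaring an @JacksonInject AmazonS3 member receive this instance during deserialization.
    mapper.setInjectableValues(new InjectableValues.Std().addValue(AmazonS3.class, s3));
    return mapper;
  }
}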

View File

@ -19,6 +19,7 @@
package io.druid.storage.s3;
import com.amazonaws.services.s3.AmazonS3Client;
import com.fasterxml.jackson.databind.BeanProperty;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.InjectableValues;
@ -30,7 +31,6 @@ import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.Intervals;
import io.druid.timeline.DataSegment;
import org.easymock.EasyMock;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
@ -65,7 +65,7 @@ public class S3DataSegmentArchiverTest
}
};
private static final S3DataSegmentPusherConfig PUSHER_CONFIG = new S3DataSegmentPusherConfig();
private static final RestS3Service S3_SERVICE = EasyMock.createStrictMock(RestS3Service.class);
private static final AmazonS3Client S3_SERVICE = EasyMock.createStrictMock(AmazonS3Client.class);
private static final S3DataSegmentPuller PULLER = new S3DataSegmentPuller(S3_SERVICE);
private static final DataSegment SOURCE_SEGMENT = DataSegment
.builder()

View File

@ -19,6 +19,15 @@
package io.druid.storage.s3;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectResult;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.base.Predicate;
@ -31,17 +40,13 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import io.druid.java.util.common.Intervals;
import io.druid.java.util.common.StringUtils;
import io.druid.segment.TestHelper;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NumberedShardSpec;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.jets3t.service.ServiceException;
import org.jets3t.service.StorageObjectsChunk;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Object;
import org.jets3t.service.model.StorageObject;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
@ -50,8 +55,12 @@ import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import javax.annotation.Nullable;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -105,7 +114,7 @@ public class S3DataSegmentFinderTest
@Rule
public final TemporaryFolder temporaryFolder = new TemporaryFolder();
RestS3Service mockS3Client;
MockAmazonS3Client mockS3Client;
S3DataSegmentPusherConfig config;
private String bucket;
@ -122,8 +131,6 @@ public class S3DataSegmentFinderTest
private String indexZip4_0;
private String indexZip4_1;
@BeforeClass
public static void setUpStatic()
{
@ -140,7 +147,7 @@ public class S3DataSegmentFinderTest
config.setBucket(bucket);
config.setBaseKey(baseKey);
mockS3Client = new MockStorageService(temporaryFolder.newFolder());
mockS3Client = new MockAmazonS3Client(temporaryFolder.newFolder());
descriptor1 = S3Utils.descriptorPathForSegmentPath(baseKey + "/interval1/v1/0/");
@ -154,17 +161,17 @@ public class S3DataSegmentFinderTest
indexZip4_0 = S3Utils.indexZipForSegmentPath(descriptor4_0);
indexZip4_1 = S3Utils.indexZipForSegmentPath(descriptor4_1);
mockS3Client.putObject(bucket, new S3Object(descriptor1, mapper.writeValueAsString(SEGMENT_1)));
mockS3Client.putObject(bucket, new S3Object(descriptor2, mapper.writeValueAsString(SEGMENT_2)));
mockS3Client.putObject(bucket, new S3Object(descriptor3, mapper.writeValueAsString(SEGMENT_3)));
mockS3Client.putObject(bucket, new S3Object(descriptor4_0, mapper.writeValueAsString(SEGMENT_4_0)));
mockS3Client.putObject(bucket, new S3Object(descriptor4_1, mapper.writeValueAsString(SEGMENT_4_1)));
mockS3Client.putObject(bucket, descriptor1, mapper.writeValueAsString(SEGMENT_1));
mockS3Client.putObject(bucket, descriptor2, mapper.writeValueAsString(SEGMENT_2));
mockS3Client.putObject(bucket, descriptor3, mapper.writeValueAsString(SEGMENT_3));
mockS3Client.putObject(bucket, descriptor4_0, mapper.writeValueAsString(SEGMENT_4_0));
mockS3Client.putObject(bucket, descriptor4_1, mapper.writeValueAsString(SEGMENT_4_1));
mockS3Client.putObject(bucket, new S3Object(indexZip1, "dummy"));
mockS3Client.putObject(bucket, new S3Object(indexZip2, "dummy"));
mockS3Client.putObject(bucket, new S3Object(indexZip3, "dummy"));
mockS3Client.putObject(bucket, new S3Object(indexZip4_0, "dummy"));
mockS3Client.putObject(bucket, new S3Object(indexZip4_1, "dummy"));
mockS3Client.putObject(bucket, indexZip1, "dummy");
mockS3Client.putObject(bucket, indexZip2, "dummy");
mockS3Client.putObject(bucket, indexZip3, "dummy");
mockS3Client.putObject(bucket, indexZip4_0, "dummy");
mockS3Client.putObject(bucket, indexZip4_1, "dummy");
}
@Test
@ -210,34 +217,34 @@ public class S3DataSegmentFinderTest
final String serializedSegment4_1 = mapper.writeValueAsString(updatedSegment4_1);
Assert.assertNotEquals(serializedSegment1,
IOUtils.toString(mockS3Client.getObject(bucket, descriptor1).getDataInputStream()));
IOUtils.toString(mockS3Client.getObject(bucket, descriptor1).getObjectContent()));
Assert.assertNotEquals(serializedSegment2,
IOUtils.toString(mockS3Client.getObject(bucket, descriptor2).getDataInputStream()));
IOUtils.toString(mockS3Client.getObject(bucket, descriptor2).getObjectContent()));
Assert.assertNotEquals(serializedSegment3,
IOUtils.toString(mockS3Client.getObject(bucket, descriptor3).getDataInputStream()));
IOUtils.toString(mockS3Client.getObject(bucket, descriptor3).getObjectContent()));
Assert.assertNotEquals(serializedSegment4_0,
IOUtils.toString(mockS3Client.getObject(bucket, descriptor4_0).getDataInputStream()));
IOUtils.toString(mockS3Client.getObject(bucket, descriptor4_0).getObjectContent()));
Assert.assertNotEquals(serializedSegment4_1,
IOUtils.toString(mockS3Client.getObject(bucket, descriptor4_1).getDataInputStream()));
IOUtils.toString(mockS3Client.getObject(bucket, descriptor4_1).getObjectContent()));
final Set<DataSegment> segments2 = s3DataSegmentFinder.findSegments("", true);
Assert.assertEquals(segments, segments2);
Assert.assertEquals(serializedSegment1,
IOUtils.toString(mockS3Client.getObject(bucket, descriptor1).getDataInputStream()));
IOUtils.toString(mockS3Client.getObject(bucket, descriptor1).getObjectContent()));
Assert.assertEquals(serializedSegment2,
IOUtils.toString(mockS3Client.getObject(bucket, descriptor2).getDataInputStream()));
IOUtils.toString(mockS3Client.getObject(bucket, descriptor2).getObjectContent()));
Assert.assertEquals(serializedSegment3,
IOUtils.toString(mockS3Client.getObject(bucket, descriptor3).getDataInputStream()));
IOUtils.toString(mockS3Client.getObject(bucket, descriptor3).getObjectContent()));
Assert.assertEquals(serializedSegment4_0,
IOUtils.toString(mockS3Client.getObject(bucket, descriptor4_0).getDataInputStream()));
IOUtils.toString(mockS3Client.getObject(bucket, descriptor4_0).getObjectContent()));
Assert.assertEquals(serializedSegment4_1,
IOUtils.toString(mockS3Client.getObject(bucket, descriptor4_1).getDataInputStream()));
IOUtils.toString(mockS3Client.getObject(bucket, descriptor4_1).getObjectContent()));
}
@Test(expected = SegmentLoadingException.class)
public void testFindSegmentsFail() throws SegmentLoadingException, ServiceException
public void testFindSegmentsFail() throws SegmentLoadingException
{
mockS3Client.deleteObject(bucket, indexZip4_1);
@ -275,21 +282,8 @@ public class S3DataSegmentFinderTest
final String descriptorPath = S3Utils.descriptorPathForSegmentPath(segmentPath);
final String indexPath = S3Utils.indexZipForSegmentPath(segmentPath);
mockS3Client.putObject(
config.getBucket(),
new S3Object(
descriptorPath,
mapper.writeValueAsString(segmentMissingLoadSpec)
)
);
mockS3Client.putObject(
config.getBucket(),
new S3Object(
indexPath,
"dummy"
)
);
mockS3Client.putObject(config.getBucket(), descriptorPath, mapper.writeValueAsString(segmentMissingLoadSpec));
mockS3Client.putObject(config.getBucket(), indexPath, "dummy");
Set<DataSegment> segments = s3DataSegmentFinder.findSegments(segmentPath, false);
Assert.assertEquals(1, segments.size());
@ -308,24 +302,34 @@ public class S3DataSegmentFinderTest
return S3Utils.descriptorPathForSegmentPath(String.valueOf(segment.getLoadSpec().get("key")));
}
private static class MockStorageService extends RestS3Service
private static class MockAmazonS3Client extends AmazonS3Client
{
private final File baseDir;
private final Map<String, Set<String>> storage = Maps.newHashMap();
public MockStorageService(File baseDir)
public MockAmazonS3Client(File baseDir)
{
super(null);
super();
this.baseDir = baseDir;
}
@Override
public StorageObjectsChunk listObjectsChunked(
final String bucketName, final String prefix, final String delimiter,
final long maxListingLength, final String priorLastKey
) throws ServiceException
public boolean doesObjectExist(String bucketName, String objectName)
{
List<String> keysOrigin = Lists.newArrayList(storage.get(bucketName));
final Set<String> keys = storage.get(bucketName);
if (keys != null) {
return keys.contains(objectName);
}
return false;
}
@Override
public ListObjectsV2Result listObjectsV2(ListObjectsV2Request listObjectsV2Request)
{
final String bucketName = listObjectsV2Request.getBucketName();
final String prefix = listObjectsV2Request.getPrefix();
final List<String> keysOrigin = Lists.newArrayList(storage.get(bucketName));
Predicate<String> prefixFilter = new Predicate<String>()
{
@ -341,11 +345,11 @@ public class S3DataSegmentFinderTest
);
int startOffset = 0;
if (priorLastKey != null) {
startOffset = keys.indexOf(priorLastKey) + 1;
if (listObjectsV2Request.getContinuationToken() != null) {
startOffset = keys.indexOf(listObjectsV2Request.getContinuationToken()) + 1;
}
int endOffset = startOffset + (int) maxListingLength; // exclusive
int endOffset = startOffset + listObjectsV2Request.getMaxKeys(); // exclusive
if (endOffset > keys.size()) {
endOffset = keys.size();
}
@ -355,64 +359,73 @@ public class S3DataSegmentFinderTest
newPriorLastkey = null;
}
List<StorageObject> objects = Lists.newArrayList();
List<S3ObjectSummary> objects = new ArrayList<>();
for (String objectKey : keys.subList(startOffset, endOffset)) {
objects.add(getObjectDetails(bucketName, objectKey));
final S3ObjectSummary objectSummary = new S3ObjectSummary();
objectSummary.setBucketName(bucketName);
objectSummary.setKey(objectKey);
objects.add(objectSummary);
}
return new StorageObjectsChunk(
prefix, delimiter, objects.toArray(new StorageObject[]{}), null, newPriorLastkey);
}
final ListObjectsV2Result result = new ListObjectsV2Result();
result.setBucketName(bucketName);
result.setKeyCount(objects.size());
result.getObjectSummaries().addAll(objects);
result.setNextContinuationToken(newPriorLastkey);
result.setTruncated(newPriorLastkey != null);
@Override
public StorageObject getObjectDetails(String bucketName, String objectKey) throws ServiceException
{
if (!storage.containsKey(bucketName)) {
ServiceException ex = new ServiceException();
ex.setResponseCode(404);
ex.setErrorCode("NoSuchBucket");
throw ex;
}
if (!storage.get(bucketName).contains(objectKey)) {
ServiceException ex = new ServiceException();
ex.setResponseCode(404);
ex.setErrorCode("NoSuchKey");
throw ex;
}
final File objectPath = new File(baseDir, objectKey);
StorageObject storageObject = new StorageObject();
storageObject.setBucketName(bucketName);
storageObject.setKey(objectKey);
storageObject.setDataInputFile(objectPath);
return storageObject;
return result;
}
@Override
public S3Object getObject(String bucketName, String objectKey)
{
if (!storage.containsKey(bucketName)) {
AmazonServiceException ex = new AmazonS3Exception("S3DataSegmentFinderTest");
ex.setStatusCode(404);
ex.setErrorCode("NoSuchBucket");
throw ex;
}
if (!storage.get(bucketName).contains(objectKey)) {
AmazonServiceException ex = new AmazonS3Exception("S3DataSegmentFinderTest");
ex.setStatusCode(404);
ex.setErrorCode("NoSuchKey");
throw ex;
}
final File objectPath = new File(baseDir, objectKey);
S3Object s3Object = new S3Object();
s3Object.setBucketName(bucketName);
s3Object.setKey(objectKey);
s3Object.setDataInputFile(objectPath);
return s3Object;
S3Object storageObject = new S3Object();
storageObject.setBucketName(bucketName);
storageObject.setKey(objectKey);
try {
storageObject.setObjectContent(new FileInputStream(objectPath));
}
catch (FileNotFoundException e) {
AmazonServiceException ex = new AmazonS3Exception("S3DataSegmentFinderTest", e);
ex.setStatusCode(500);
ex.setErrorCode("InternalError");
throw ex;
}
return storageObject;
}
@Override
public S3Object putObject(final String bucketName, final S3Object object)
public PutObjectResult putObject(String bucketName, String key, String data)
{
return putObject(bucketName, key, new ByteArrayInputStream(StringUtils.toUtf8(data)), null);
}
@Override
public PutObjectResult putObject(String bucketName, String key, InputStream input, ObjectMetadata metadata)
{
if (!storage.containsKey(bucketName)) {
storage.put(bucketName, Sets.<String>newHashSet());
storage.put(bucketName, Sets.newHashSet());
}
storage.get(bucketName).add(object.getKey());
storage.get(bucketName).add(key);
final File objectPath = new File(baseDir, object.getKey());
final File objectPath = new File(baseDir, key);
if (!objectPath.getParentFile().exists()) {
objectPath.getParentFile().mkdirs();
@ -420,7 +433,7 @@ public class S3DataSegmentFinderTest
try {
try (
InputStream in = object.getDataInputStream()
InputStream in = input
) {
FileUtils.copyInputStreamToFile(in, objectPath);
}
@ -429,7 +442,7 @@ public class S3DataSegmentFinderTest
throw Throwables.propagate(e);
}
return object;
return new PutObjectResult();
}
@Override

View File

@ -19,6 +19,22 @@
package io.druid.storage.s3;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.AccessControlList;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.CanonicalGrantee;
import com.amazonaws.services.s3.model.CopyObjectRequest;
import com.amazonaws.services.s3.model.CopyObjectResult;
import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
import com.amazonaws.services.s3.model.Grant;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.Owner;
import com.amazonaws.services.s3.model.Permission;
import com.amazonaws.services.s3.model.PutObjectResult;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.amazonaws.services.s3.model.StorageClass;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
@ -28,12 +44,11 @@ import io.druid.java.util.common.MapUtils;
import io.druid.segment.loading.SegmentLoadingException;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Object;
import org.jets3t.service.model.StorageObject;
import org.junit.Assert;
import org.junit.Test;
import java.io.File;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@ -59,11 +74,17 @@ public class S3DataSegmentMoverTest
@Test
public void testMove() throws Exception
{
MockStorageService mockS3Client = new MockStorageService();
MockAmazonS3Client mockS3Client = new MockAmazonS3Client();
S3DataSegmentMover mover = new S3DataSegmentMover(mockS3Client, new S3DataSegmentPusherConfig());
mockS3Client.putObject("main", new S3Object("baseKey/test/2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z/1/0/index.zip"));
mockS3Client.putObject("main", new S3Object("baseKey/test/2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z/1/0/descriptor.json"));
mockS3Client.putObject(
"main",
"baseKey/test/2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z/1/0/index.zip"
);
mockS3Client.putObject(
"main",
"baseKey/test/2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z/1/0/descriptor.json"
);
DataSegment movedSegment = mover.move(
sourceSegment,
@ -79,11 +100,17 @@ public class S3DataSegmentMoverTest
@Test
public void testMoveNoop() throws Exception
{
MockStorageService mockS3Client = new MockStorageService();
MockAmazonS3Client mockS3Client = new MockAmazonS3Client();
S3DataSegmentMover mover = new S3DataSegmentMover(mockS3Client, new S3DataSegmentPusherConfig());
mockS3Client.putObject("archive", new S3Object("targetBaseKey/test/2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z/1/0/index.zip"));
mockS3Client.putObject("archive", new S3Object("targetBaseKey/test/2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z/1/0/descriptor.json"));
mockS3Client.putObject(
"archive",
"targetBaseKey/test/2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z/1/0/index.zip"
);
mockS3Client.putObject(
"archive",
"targetBaseKey/test/2013-01-01T00:00:00.000Z_2013-01-02T00:00:00.000Z/1/0/descriptor.json"
);
DataSegment movedSegment = mover.move(
sourceSegment,
@ -100,7 +127,7 @@ public class S3DataSegmentMoverTest
@Test(expected = SegmentLoadingException.class)
public void testMoveException() throws Exception
{
MockStorageService mockS3Client = new MockStorageService();
MockAmazonS3Client mockS3Client = new MockAmazonS3Client();
S3DataSegmentMover mover = new S3DataSegmentMover(mockS3Client, new S3DataSegmentPusherConfig());
mover.move(
@ -112,7 +139,7 @@ public class S3DataSegmentMoverTest
@Test
public void testIgnoresGoneButAlreadyMoved() throws Exception
{
MockStorageService mockS3Client = new MockStorageService();
MockAmazonS3Client mockS3Client = new MockAmazonS3Client();
S3DataSegmentMover mover = new S3DataSegmentMover(mockS3Client, new S3DataSegmentPusherConfig());
mover.move(new DataSegment(
"test",
@ -135,7 +162,7 @@ public class S3DataSegmentMoverTest
@Test(expected = SegmentLoadingException.class)
public void testFailsToMoveMissing() throws Exception
{
MockStorageService mockS3Client = new MockStorageService();
MockAmazonS3Client mockS3Client = new MockAmazonS3Client();
S3DataSegmentMover mover = new S3DataSegmentMover(mockS3Client, new S3DataSegmentPusherConfig());
mover.move(new DataSegment(
"test",
@ -155,15 +182,15 @@ public class S3DataSegmentMoverTest
), ImmutableMap.<String, Object>of("bucket", "DOES NOT EXIST", "baseKey", "baseKey2"));
}
private static class MockStorageService extends RestS3Service
private static class MockAmazonS3Client extends AmazonS3Client
{
Map<String, Set<String>> storage = Maps.newHashMap();
boolean copied = false;
boolean deletedOld = false;
private MockStorageService()
private MockAmazonS3Client()
{
super(null);
super();
}
public boolean didMove()
@ -172,37 +199,68 @@ public class S3DataSegmentMoverTest
}
@Override
public boolean isObjectInBucket(String bucketName, String objectKey)
public AccessControlList getBucketAcl(String bucketName)
{
final AccessControlList acl = new AccessControlList();
acl.setOwner(new Owner("ownerId", "owner"));
acl.grantAllPermissions(new Grant(new CanonicalGrantee(acl.getOwner().getId()), Permission.FullControl));
return acl;
}
@Override
public ObjectMetadata getObjectMetadata(GetObjectMetadataRequest getObjectMetadataRequest)
{
return new ObjectMetadata();
}
@Override
public boolean doesObjectExist(String bucketName, String objectKey)
{
Set<String> objects = storage.get(bucketName);
return (objects != null && objects.contains(objectKey));
}
@Override
public S3Object[] listObjects(String bucketName, String objectKey, String separator)
public ListObjectsV2Result listObjectsV2(ListObjectsV2Request listObjectsV2Request)
{
if (isObjectInBucket(bucketName, objectKey)) {
final S3Object object = new S3Object(objectKey);
object.setStorageClass(S3Object.STORAGE_CLASS_STANDARD);
return new S3Object[]{object};
final String bucketName = listObjectsV2Request.getBucketName();
final String objectKey = listObjectsV2Request.getPrefix();
if (doesObjectExist(bucketName, objectKey)) {
final S3ObjectSummary objectSummary = new S3ObjectSummary();
objectSummary.setBucketName(bucketName);
objectSummary.setKey(objectKey);
objectSummary.setStorageClass(StorageClass.Standard.name());
final ListObjectsV2Result result = new ListObjectsV2Result();
result.setBucketName(bucketName);
result.setPrefix(objectKey);
result.setKeyCount(1);
result.getObjectSummaries().add(objectSummary);
result.setTruncated(true);
return result;
} else {
return new ListObjectsV2Result();
}
return new S3Object[]{};
}
@Override
public Map<String, Object> copyObject(
String sourceBucketName,
String sourceObjectKey,
String destinationBucketName,
StorageObject destinationObject,
boolean replaceMetadata
)
public CopyObjectResult copyObject(CopyObjectRequest copyObjectRequest)
{
final String sourceBucketName = copyObjectRequest.getSourceBucketName();
final String sourceObjectKey = copyObjectRequest.getSourceKey();
final String destinationBucketName = copyObjectRequest.getDestinationBucketName();
final String destinationObjectKey = copyObjectRequest.getDestinationKey();
copied = true;
if (isObjectInBucket(sourceBucketName, sourceObjectKey)) {
this.putObject(destinationBucketName, new S3Object(destinationObject.getKey()));
if (doesObjectExist(sourceBucketName, sourceObjectKey)) {
storage.computeIfAbsent(destinationBucketName, k -> new HashSet<>())
.add(destinationObjectKey);
return new CopyObjectResult();
} else {
final AmazonS3Exception exception = new AmazonS3Exception("S3DataSegmentMoverTest");
exception.setErrorCode("NoSuchKey");
exception.setStatusCode(404);
throw exception;
}
return null;
}
@Override
@ -212,14 +270,19 @@ public class S3DataSegmentMoverTest
storage.get(bucket).remove(objectKey);
}
public PutObjectResult putObject(String bucketName, String key)
{
return putObject(bucketName, key, (File) null);
}
@Override
public S3Object putObject(String bucketName, S3Object object)
public PutObjectResult putObject(String bucketName, String key, File file)
{
if (!storage.containsKey(bucketName)) {
storage.put(bucketName, Sets.newHashSet());
}
storage.get(bucketName).add(object.getKey());
return object;
storage.get(bucketName).add(key);
return new PutObjectResult();
}
}
}
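
The mock above mirrors the SDK's server-side copy. For reference, a hedged sketch of the calls a move reduces to; the bucket and key names are illustrative:

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.CopyObjectRequest;

public class MoveSketch
{
  static void move(AmazonS3 s3)
  {
    // Server-side copy; the object bytes never pass through the client.
    final CopyObjectRequest copy = new CopyObjectRequest(
        "main", "baseKey/segment/index.zip",          // source (illustrative)
        "archive", "targetBaseKey/segment/index.zip"  // destination (illustrative)
    );
    s3.copyObject(copy);
    // Deleting the source completes the "move".
    s3.deleteObject("main", "baseKey/segment/index.zip");
  }
}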

View File

@ -19,9 +19,21 @@
package io.druid.storage.s3;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import io.druid.java.util.common.FileUtils;
import io.druid.java.util.common.StringUtils;
import io.druid.segment.loading.SegmentLoadingException;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
@ -30,15 +42,6 @@ import java.io.OutputStream;
import java.net.URI;
import java.util.Date;
import java.util.zip.GZIPOutputStream;
import org.easymock.EasyMock;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Object;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
/**
*
@ -50,26 +53,29 @@ public class S3DataSegmentPullerTest
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@Test
public void testSimpleGetVersion() throws ServiceException, IOException
public void testSimpleGetVersion() throws IOException
{
String bucket = "bucket";
String keyPrefix = "prefix/dir/0";
RestS3Service s3Client = EasyMock.createStrictMock(RestS3Service.class);
AmazonS3Client s3Client = EasyMock.createStrictMock(AmazonS3Client.class);
S3Object object0 = new S3Object();
final S3ObjectSummary objectSummary = new S3ObjectSummary();
objectSummary.setBucketName(bucket);
objectSummary.setKey(keyPrefix + "/renames-0.gz");
objectSummary.setLastModified(new Date(0));
object0.setBucketName(bucket);
object0.setKey(keyPrefix + "/renames-0.gz");
object0.setLastModifiedDate(new Date(0));
final ListObjectsV2Result result = new ListObjectsV2Result();
result.setKeyCount(1);
result.getObjectSummaries().add(objectSummary);
EasyMock.expect(s3Client.getObjectDetails(EasyMock.eq(bucket), EasyMock.eq(object0.getKey())))
.andReturn(object0)
EasyMock.expect(s3Client.listObjectsV2(EasyMock.anyObject(ListObjectsV2Request.class)))
.andReturn(result)
.once();
S3DataSegmentPuller puller = new S3DataSegmentPuller(s3Client);
EasyMock.replay(s3Client);
String version = puller.getVersion(URI.create(StringUtils.format("s3://%s/%s", bucket, object0.getKey())));
String version = puller.getVersion(URI.create(StringUtils.format("s3://%s/%s", bucket, objectSummary.getKey())));
EasyMock.verify(s3Client);
@ -77,11 +83,11 @@ public class S3DataSegmentPullerTest
}
@Test
public void testGZUncompress() throws ServiceException, IOException, SegmentLoadingException
public void testGZUncompress() throws IOException, SegmentLoadingException
{
final String bucket = "bucket";
final String keyPrefix = "prefix/dir/0";
final RestS3Service s3Client = EasyMock.createStrictMock(RestS3Service.class);
final AmazonS3Client s3Client = EasyMock.createStrictMock(AmazonS3Client.class);
final byte[] value = bucket.getBytes("utf8");
final File tmpFile = temporaryFolder.newFile("gzTest.gz");
@ -91,19 +97,27 @@ public class S3DataSegmentPullerTest
}
final S3Object object0 = new S3Object();
object0.setBucketName(bucket);
object0.setKey(keyPrefix + "/renames-0.gz");
object0.setLastModifiedDate(new Date(0));
object0.setDataInputStream(new FileInputStream(tmpFile));
object0.getObjectMetadata().setLastModified(new Date(0));
object0.setObjectContent(new FileInputStream(tmpFile));
final S3ObjectSummary objectSummary = new S3ObjectSummary();
objectSummary.setBucketName(bucket);
objectSummary.setKey(keyPrefix + "/renames-0.gz");
objectSummary.setLastModified(new Date(0));
final ListObjectsV2Result listObjectsResult = new ListObjectsV2Result();
listObjectsResult.setKeyCount(1);
listObjectsResult.getObjectSummaries().add(objectSummary);
final File tmpDir = temporaryFolder.newFolder("gzTestDir");
EasyMock.expect(s3Client.getObjectDetails(EasyMock.eq(object0.getBucketName()), EasyMock.eq(object0.getKey())))
.andReturn(null)
EasyMock.expect(s3Client.doesObjectExist(EasyMock.eq(object0.getBucketName()), EasyMock.eq(object0.getKey())))
.andReturn(true)
.once();
EasyMock.expect(s3Client.getObjectDetails(EasyMock.eq(object0.getBucketName()), EasyMock.eq(object0.getKey())))
.andReturn(object0)
EasyMock.expect(s3Client.listObjectsV2(EasyMock.anyObject(ListObjectsV2Request.class)))
.andReturn(listObjectsResult)
.once();
EasyMock.expect(s3Client.getObject(EasyMock.eq(object0.getBucketName()), EasyMock.eq(object0.getKey())))
.andReturn(object0)
@ -126,11 +140,11 @@ public class S3DataSegmentPullerTest
}
@Test
public void testGZUncompressRetries() throws ServiceException, IOException, SegmentLoadingException
public void testGZUncompressRetries() throws IOException, SegmentLoadingException
{
final String bucket = "bucket";
final String keyPrefix = "prefix/dir/0";
final RestS3Service s3Client = EasyMock.createStrictMock(RestS3Service.class);
final AmazonS3Client s3Client = EasyMock.createStrictMock(AmazonS3Client.class);
final byte[] value = bucket.getBytes("utf8");
final File tmpFile = temporaryFolder.newFile("gzTest.gz");
@ -143,25 +157,34 @@ public class S3DataSegmentPullerTest
object0.setBucketName(bucket);
object0.setKey(keyPrefix + "/renames-0.gz");
object0.setLastModifiedDate(new Date(0));
object0.setDataInputStream(new FileInputStream(tmpFile));
object0.getObjectMetadata().setLastModified(new Date(0));
object0.setObjectContent(new FileInputStream(tmpFile));
final S3ObjectSummary objectSummary = new S3ObjectSummary();
objectSummary.setBucketName(bucket);
objectSummary.setKey(keyPrefix + "/renames-0.gz");
objectSummary.setLastModified(new Date(0));
final ListObjectsV2Result listObjectsResult = new ListObjectsV2Result();
listObjectsResult.setKeyCount(1);
listObjectsResult.getObjectSummaries().add(objectSummary);
File tmpDir = temporaryFolder.newFolder("gzTestDir");
S3ServiceException exception = new S3ServiceException();
AmazonS3Exception exception = new AmazonS3Exception("S3DataSegmentPullerTest");
exception.setErrorCode("NoSuchKey");
exception.setResponseCode(404);
EasyMock.expect(s3Client.getObjectDetails(EasyMock.eq(object0.getBucketName()), EasyMock.eq(object0.getKey())))
.andReturn(null)
exception.setStatusCode(404);
EasyMock.expect(s3Client.doesObjectExist(EasyMock.eq(object0.getBucketName()), EasyMock.eq(object0.getKey())))
.andReturn(true)
.once();
EasyMock.expect(s3Client.getObjectDetails(EasyMock.eq(object0.getBucketName()), EasyMock.eq(object0.getKey())))
.andReturn(object0)
EasyMock.expect(s3Client.listObjectsV2(EasyMock.anyObject(ListObjectsV2Request.class)))
.andReturn(listObjectsResult)
.once();
EasyMock.expect(s3Client.getObject(EasyMock.eq(bucket), EasyMock.eq(object0.getKey())))
.andThrow(exception)
.once();
EasyMock.expect(s3Client.getObjectDetails(EasyMock.eq(object0.getBucketName()), EasyMock.eq(object0.getKey())))
.andReturn(object0)
EasyMock.expect(s3Client.listObjectsV2(EasyMock.anyObject(ListObjectsV2Request.class)))
.andReturn(listObjectsResult)
.once();
EasyMock.expect(s3Client.getObject(EasyMock.eq(bucket), EasyMock.eq(object0.getKey())))
.andReturn(object0)

View File

@ -19,6 +19,14 @@
package io.druid.storage.s3;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.AccessControlList;
import com.amazonaws.services.s3.model.CanonicalGrantee;
import com.amazonaws.services.s3.model.Grant;
import com.amazonaws.services.s3.model.Owner;
import com.amazonaws.services.s3.model.Permission;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.PutObjectResult;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@ -31,14 +39,13 @@ import org.apache.commons.io.IOUtils;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.easymock.IAnswer;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Object;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.FileInputStream;
/**
*/
@ -65,27 +72,38 @@ public class S3DataSegmentPusherTest
@Test
public void testPush() throws Exception
{
RestS3Service s3Client = EasyMock.createStrictMock(RestS3Service.class);
AmazonS3Client s3Client = EasyMock.createStrictMock(AmazonS3Client.class);
Capture<S3Object> capturedS3Object = Capture.newInstance();
final AccessControlList acl = new AccessControlList();
acl.setOwner(new Owner("ownerId", "owner"));
acl.grantAllPermissions(new Grant(new CanonicalGrantee(acl.getOwner().getId()), Permission.FullControl));
EasyMock.expect(s3Client.getBucketAcl(EasyMock.eq("bucket"))).andReturn(acl).once();
EasyMock.expect(s3Client.putObject(EasyMock.anyObject()))
.andReturn(new PutObjectResult())
.once();
EasyMock.expect(s3Client.getBucketAcl(EasyMock.eq("bucket"))).andReturn(acl).once();
Capture<PutObjectRequest> capturedPutRequest = Capture.newInstance();
ValueContainer<String> capturedS3SegmentJson = new ValueContainer<>();
EasyMock.expect(s3Client.putObject(EasyMock.anyString(), EasyMock.capture(capturedS3Object)))
EasyMock.expect(s3Client.putObject(EasyMock.capture(capturedPutRequest)))
.andAnswer(
new IAnswer<S3Object>()
new IAnswer<PutObjectResult>()
{
@Override
public S3Object answer() throws Throwable
public PutObjectResult answer() throws Throwable
{
capturedS3SegmentJson.setValue(
IOUtils.toString(capturedS3Object.getValue().getDataInputStream(), "utf-8")
IOUtils.toString(new FileInputStream(capturedPutRequest.getValue().getFile()), "utf-8")
);
return null;
return new PutObjectResult();
}
}
)
.atLeastOnce();
EasyMock.replay(s3Client);
.once();
EasyMock.replay(s3Client);
S3DataSegmentPusherConfig config = new S3DataSegmentPusherConfig();
config.setBucket("bucket");

View File

@ -19,11 +19,12 @@
package io.druid.storage.s3;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import io.druid.java.util.common.StringUtils;
import org.easymock.EasyMock;
import org.jets3t.service.S3ServiceException;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Object;
import org.junit.Assert;
import org.junit.Test;
@ -35,25 +36,31 @@ public class S3TimestampVersionedDataFinderTest
{
@Test
public void testSimpleLatestVersion() throws S3ServiceException
public void testSimpleLatestVersion()
{
String bucket = "bucket";
String keyPrefix = "prefix/dir/0";
RestS3Service s3Client = EasyMock.createStrictMock(RestS3Service.class);
AmazonS3Client s3Client = EasyMock.createStrictMock(AmazonS3Client.class);
S3Object object0 = new S3Object(), object1 = new S3Object();
S3ObjectSummary object0 = new S3ObjectSummary(), object1 = new S3ObjectSummary();
object0.setBucketName(bucket);
object0.setKey(keyPrefix + "/renames-0.gz");
object0.setLastModifiedDate(new Date(0));
object0.setLastModified(new Date(0));
object1.setBucketName(bucket);
object1.setKey(keyPrefix + "/renames-1.gz");
object1.setLastModifiedDate(new Date(1));
object1.setLastModified(new Date(1));
EasyMock.expect(s3Client.listObjects(EasyMock.eq(bucket), EasyMock.anyString(), EasyMock.<String>isNull())).andReturn(
new S3Object[]{object0, object1}
).once();
final ListObjectsV2Result result = new ListObjectsV2Result();
result.getObjectSummaries().add(object0);
result.getObjectSummaries().add(object1);
result.setKeyCount(2);
result.setTruncated(false);
EasyMock.expect(s3Client.listObjectsV2(EasyMock.anyObject(ListObjectsV2Request.class)))
.andReturn(result)
.once();
S3TimestampVersionedDataFinder finder = new S3TimestampVersionedDataFinder(s3Client);
Pattern pattern = Pattern.compile("renames-[0-9]*\\.gz");
@ -71,25 +78,19 @@ public class S3TimestampVersionedDataFinderTest
}
@Test
public void testMissing() throws S3ServiceException
public void testMissing()
{
String bucket = "bucket";
String keyPrefix = "prefix/dir/0";
RestS3Service s3Client = EasyMock.createStrictMock(RestS3Service.class);
AmazonS3Client s3Client = EasyMock.createStrictMock(AmazonS3Client.class);
S3Object object0 = new S3Object(), object1 = new S3Object();
final ListObjectsV2Result result = new ListObjectsV2Result();
result.setKeyCount(0);
result.setTruncated(false);
object0.setBucketName(bucket);
object0.setKey(keyPrefix + "/renames-0.gz");
object0.setLastModifiedDate(new Date(0));
object1.setBucketName(bucket);
object1.setKey(keyPrefix + "/renames-1.gz");
object1.setLastModifiedDate(new Date(1));
EasyMock.expect(s3Client.listObjects(EasyMock.eq(bucket), EasyMock.anyString(), EasyMock.<String>isNull())).andReturn(
null
).once();
EasyMock.expect(s3Client.listObjectsV2(EasyMock.anyObject(ListObjectsV2Request.class)))
.andReturn(result)
.once();
S3TimestampVersionedDataFinder finder = new S3TimestampVersionedDataFinder(s3Client);
Pattern pattern = Pattern.compile("renames-[0-9]*\\.gz");
@ -105,21 +106,26 @@ public class S3TimestampVersionedDataFinderTest
}
@Test
public void testFindSelf() throws S3ServiceException
public void testFindSelf()
{
String bucket = "bucket";
String keyPrefix = "prefix/dir/0";
RestS3Service s3Client = EasyMock.createStrictMock(RestS3Service.class);
AmazonS3Client s3Client = EasyMock.createStrictMock(AmazonS3Client.class);
S3Object object0 = new S3Object();
S3ObjectSummary object0 = new S3ObjectSummary();
object0.setBucketName(bucket);
object0.setKey(keyPrefix + "/renames-0.gz");
object0.setLastModifiedDate(new Date(0));
object0.setLastModified(new Date(0));
EasyMock.expect(s3Client.listObjects(EasyMock.eq(bucket), EasyMock.anyString(), EasyMock.<String>isNull())).andReturn(
new S3Object[]{object0}
).once();
final ListObjectsV2Result result = new ListObjectsV2Result();
result.getObjectSummaries().add(object0);
result.setKeyCount(1);
result.setTruncated(false);
EasyMock.expect(s3Client.listObjectsV2(EasyMock.anyObject(ListObjectsV2Request.class)))
.andReturn(result)
.once();
S3TimestampVersionedDataFinder finder = new S3TimestampVersionedDataFinder(s3Client);
Pattern pattern = Pattern.compile("renames-[0-9]*\\.gz");
@ -137,21 +143,26 @@ public class S3TimestampVersionedDataFinderTest
}
@Test
public void testFindExact() throws S3ServiceException
public void testFindExact()
{
String bucket = "bucket";
String keyPrefix = "prefix/dir/0";
RestS3Service s3Client = EasyMock.createStrictMock(RestS3Service.class);
AmazonS3Client s3Client = EasyMock.createStrictMock(AmazonS3Client.class);
S3Object object0 = new S3Object();
S3ObjectSummary object0 = new S3ObjectSummary();
object0.setBucketName(bucket);
object0.setKey(keyPrefix + "/renames-0.gz");
object0.setLastModifiedDate(new Date(0));
object0.setLastModified(new Date(0));
EasyMock.expect(s3Client.listObjects(EasyMock.eq(bucket), EasyMock.anyString(), EasyMock.<String>isNull())).andReturn(
new S3Object[]{object0}
).once();
final ListObjectsV2Result result = new ListObjectsV2Result();
result.getObjectSummaries().add(object0);
result.setKeyCount(1);
result.setTruncated(false);
EasyMock.expect(s3Client.listObjectsV2(EasyMock.anyObject(ListObjectsV2Request.class)))
.andReturn(result)
.once();
S3TimestampVersionedDataFinder finder = new S3TimestampVersionedDataFinder(s3Client);

View File

@ -24,6 +24,8 @@ import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSSessionCredentials;
import com.google.common.io.Files;
import io.druid.common.aws.AWSCredentialsConfig;
import io.druid.common.aws.AWSEndpointConfig;
import io.druid.common.aws.AWSProxyConfig;
import io.druid.guice.AWSModule;
import io.druid.metadata.DefaultPasswordProvider;
import org.easymock.EasyMock;
@ -58,7 +60,7 @@ public class TestAWSCredentialsProvider
assertEquals(credentials.getAWSSecretKey(), "secretKeySample");
// try to create
s3Module.getRestS3Service(provider);
s3Module.getAmazonS3Client(provider, new AWSProxyConfig(), new AWSEndpointConfig());
}
@Rule
@ -86,6 +88,6 @@ public class TestAWSCredentialsProvider
assertEquals(sessionCredentials.getSessionToken(), "sessionTokenSample");
// try to create
s3Module.getRestS3Service(provider);
s3Module.getAmazonS3Client(provider, new AWSProxyConfig(), new AWSEndpointConfig());
}
}
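
getAmazonS3Client now accepts proxy and endpoint configurations. A hedged sketch of how such options map onto the SDK's builder; the host, port, endpoint, and region values are made up, and the module's actual wiring may differ:

import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;

public class ClientWiringSketch
{
  static AmazonS3 build(AWSCredentialsProvider credentials)
  {
    final ClientConfiguration clientConfig = new ClientConfiguration()
        .withProxyHost("proxy.example.com") // hypothetical proxy
        .withProxyPort(8888);
    return AmazonS3ClientBuilder
        .standard()
        .withCredentials(credentials)
        .withClientConfiguration(clientConfig)
        // A custom endpoint (e.g. an S3-compatible store) must be paired with a signing region.
        .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration("http://localhost:9000", "us-east-1"))
        .build();
  }
}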

View File

@ -56,23 +56,6 @@
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<!-- override jets3t from hadoop-core -->
<dependency>
<groupId>net.java.dev.jets3t</groupId>
<artifactId>jets3t</artifactId>
<scope>test</scope>
</dependency>
<!-- override httpclient / httpcore version from jets3t -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
@ -100,6 +83,21 @@
</dependency>
<!-- Tests -->
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-bundle</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>

View File

@ -395,7 +395,7 @@ public class DetermineHashedPartitionsJob implements Jobby
public int getPartition(LongWritable interval, BytesWritable text, int numPartitions)
{
if (config.get("mapred.job.tracker").equals("local") || determineIntervals) {
if ("local".equals(config.get("mapred.job.tracker")) || determineIntervals) {
return 0;
} else {
return reducerLookup.get(interval);
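
The reordered comparison above is a null-safety fix: config.get("mapred.job.tracker") may return null, and calling equals on it would throw. Putting the constant first makes the check safe, as this small example shows:

public class NullSafeEqualsSketch
{
  public static void main(String[] args)
  {
    final String tracker = null; // e.g. the property is unset in the Hadoop config
    System.out.println("local".equals(tracker)); // prints false; no NullPointerException
    // tracker.equals("local") would throw a NullPointerException here
  }
}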

View File

@ -31,7 +31,7 @@ import java.util.List;
public class TaskConfig
{
public static final List<String> DEFAULT_DEFAULT_HADOOP_COORDINATES = ImmutableList.of(
"org.apache.hadoop:hadoop-client:2.7.3"
"org.apache.hadoop:hadoop-client:2.8.3"
);
private static final Period DEFAULT_DIRECTORY_LOCK_TIMEOUT = new Period("PT10M");

View File

@ -35,7 +35,6 @@ import javax.annotation.Nullable;
import java.io.File;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.URLClassLoader;
@ -127,7 +126,6 @@ public abstract class HadoopTask extends AbstractTask
*
* @param toolbox The toolbox to pull the default coordinates from if not present in the task
* @return An isolated URLClassLoader not tied by parent chain to the ApplicationClassLoader
* @throws MalformedURLException from Initialization.getClassLoaderForExtension
*/
protected ClassLoader buildClassLoader(final TaskToolbox toolbox)
{

View File

@ -74,12 +74,12 @@ public class EC2AutoScalerSerdeTest
}
);
final EC2AutoScaler autoScaler = objectMapper.readValue(json, EC2AutoScaler.class);
final EC2AutoScaler autoScaler = (EC2AutoScaler) objectMapper.readValue(json, AutoScaler.class);
verifyAutoScaler(autoScaler);
final EC2AutoScaler roundTripAutoScaler = objectMapper.readValue(
final EC2AutoScaler roundTripAutoScaler = (EC2AutoScaler) objectMapper.readValue(
objectMapper.writeValueAsBytes(autoScaler),
EC2AutoScaler.class
AutoScaler.class
);
verifyAutoScaler(roundTripAutoScaler);
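
This test, like several below, now deserializes through the registered base type instead of the concrete class, which exercises Jackson's polymorphic type resolution rather than bypassing it. A self-contained sketch of the difference, with made-up types:

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.ObjectMapper;

public class PolymorphicSerdeSketch
{
  @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
  @JsonSubTypes(@JsonSubTypes.Type(name = "ec2", value = Ec2Scaler.class))
  interface Scaler {}

  static class Ec2Scaler implements Scaler
  {
    @JsonProperty
    String region;
  }

  public static void main(String[] args) throws Exception
  {
    final ObjectMapper mapper = new ObjectMapper();
    // Reading via the base type forces Jackson to consult the "type" discriminator...
    final Scaler scaler = mapper.readValue("{\"type\": \"ec2\", \"region\": \"us-east-1\"}", Scaler.class);
    System.out.println(scaler.getClass().getSimpleName()); // prints Ec2Scaler
    // ...whereas readValue(json, Ec2Scaler.class) would skip that resolution entirely.
  }
}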

View File

@ -86,7 +86,7 @@ public class JavaScriptWorkerSelectStrategyTest
STRATEGY,
mapper.readValue(
mapper.writeValueAsString(STRATEGY),
JavaScriptWorkerSelectStrategy.class
WorkerSelectStrategy.class
)
);
}
@ -108,7 +108,7 @@ public class JavaScriptWorkerSelectStrategyTest
expectedException.expectCause(CoreMatchers.<Throwable>instanceOf(IllegalStateException.class));
expectedException.expectMessage("JavaScript is disabled");
mapper.readValue(strategyString, JavaScriptWorkerSelectStrategy.class);
mapper.readValue(strategyString, WorkerSelectStrategy.class);
}
@Test

pom.xml
View File

@ -68,8 +68,8 @@
<guice.version>4.1.0</guice.version>
<jetty.version>9.3.19.v20170502</jetty.version>
<jersey.version>1.19.3</jersey.version>
<!-- Watch out for Hadoop compatibility when updating to >= 2.5; see https://github.com/druid-io/druid/pull/1669 -->
<jackson.version>2.4.6</jackson.version>
<!-- jackson 2.7.x causes injection error and 2.8.x can't be used because avatica is using 2.6.3 -->
<jackson.version>2.6.7</jackson.version>
<log4j.version>2.5</log4j.version>
<!-- HttpClient has not yet been ported to Netty 4.x -->
<netty3.version>3.10.6.Final</netty3.version>
@ -78,12 +78,10 @@
<netty4.version>4.0.52.Final</netty4.version>
<slf4j.version>1.7.12</slf4j.version>
<!-- If compiling with different hadoop version also modify default hadoop coordinates in TaskConfig.java -->
<hadoop.compile.version>2.7.3</hadoop.compile.version>
<hadoop.compile.version>2.8.3</hadoop.compile.version>
<hive.version>2.0.0</hive.version>
<powermock.version>1.6.6</powermock.version>
<!-- Cannot update to AWS SDK 1.11+ because of Jackson incompatibility.
Need to update Druid to use Jackson 2.6+ -->
<aws.sdk.version>1.10.77</aws.sdk.version>
<aws.sdk.bundle.version>1.11.199</aws.sdk.bundle.version>
<caffeine.version>2.5.5</caffeine.version>
<!-- When upgrading ZK, edit docs and integration tests as well (integration-tests/docker-base/setup.sh) -->
<zookeeper.version>3.4.11</zookeeper.version>
@ -189,49 +187,8 @@
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-ec2</artifactId>
<version>${aws.sdk.version}</version>
<exclusions>
<exclusion>
<groupId>javax.mail</groupId>
<artifactId>mail</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</exclusion>
<exclusion>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-s3</artifactId>
<version>${aws.sdk.version}</version>
<exclusions>
<exclusion>
<groupId>javax.mail</groupId>
<artifactId>mail</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</exclusion>
<exclusion>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</exclusion>
</exclusions>
<artifactId>aws-java-sdk-bundle</artifactId>
<version>${aws.sdk.bundle.version}</version>
</dependency>
<dependency>
<groupId>com.ning</groupId>
@ -612,49 +569,15 @@
<artifactId>aether-api</artifactId>
<version>0.9.0.M2</version>
</dependency>
<dependency>
<groupId>net.java.dev.jets3t</groupId>
<artifactId>jets3t</artifactId>
<version>0.9.4</version>
<exclusions>
<exclusion>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</exclusion>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- The httpcomponents artifacts have non-matching release cadences -->
<!-- Watch out for compatibility issues between aws-java-sdk 1.10.x and httpclient > 4.5.1 and
httpcore > 4.4.3; see https://github.com/druid-io/druid/issues/4456 -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.1</version>
<version>4.5.3</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
<version>4.4.3</version>
<version>4.4.4</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
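Note on the dependency swap above: aws-java-sdk-bundle packages every AWS service client in a single artifact and relocates (shades) its third-party dependencies — including Jackson and httpclient — under com.amazonaws.thirdparty, which is why the per-artifact Jackson/commons-codec exclusion blocks could be deleted along with the jets3t tree, and why httpclient/httpcore no longer need to be pinned to the SDK's versions. A minimal sketch of client construction against the bundle (the region value is illustrative; credentials resolve through the SDK's default provider chain):

import com.amazonaws.services.ec2.AmazonEC2;
import com.amazonaws.services.ec2.AmazonEC2ClientBuilder;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;

public class BundleClientSketch
{
  public static void main(String[] args)
  {
    // Both service clients come out of the single bundle artifact; its shaded
    // Jackson and httpclient copies cannot clash with the versions pinned above.
    AmazonS3 s3 = AmazonS3ClientBuilder.standard().withRegion("us-east-1").build();
    AmazonEC2 ec2 = AmazonEC2ClientBuilder.standard().withRegion("us-east-1").build();
  }
}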

View File

@ -74,9 +74,9 @@ public class DefaultLimitSpecTest
//defaults
String json = "{\"type\": \"default\"}";
DefaultLimitSpec spec = mapper.readValue(
mapper.writeValueAsString(mapper.readValue(json, DefaultLimitSpec.class)),
DefaultLimitSpec.class
DefaultLimitSpec spec = (DefaultLimitSpec) mapper.readValue(
mapper.writeValueAsString(mapper.readValue(json, LimitSpec.class)),
LimitSpec.class
);
Assert.assertEquals(
@ -90,9 +90,9 @@ public class DefaultLimitSpecTest
+ " \"columns\":[{\"dimension\":\"d\",\"direction\":\"DESCENDING\", \"dimensionOrder\":\"numeric\"}],\n"
+ " \"limit\":10\n"
+ "}";
spec = mapper.readValue(
mapper.writeValueAsString(mapper.readValue(json, DefaultLimitSpec.class)),
DefaultLimitSpec.class
spec = (DefaultLimitSpec) mapper.readValue(
mapper.writeValueAsString(mapper.readValue(json, LimitSpec.class)),
LimitSpec.class
);
Assert.assertEquals(
new DefaultLimitSpec(ImmutableList.of(new OrderByColumnSpec("d", OrderByColumnSpec.Direction.DESCENDING,
@ -106,9 +106,9 @@ public class DefaultLimitSpecTest
+ " \"limit\":10\n"
+ "}";
spec = mapper.readValue(
mapper.writeValueAsString(mapper.readValue(json, DefaultLimitSpec.class)),
DefaultLimitSpec.class
spec = (DefaultLimitSpec) mapper.readValue(
mapper.writeValueAsString(mapper.readValue(json, LimitSpec.class)),
LimitSpec.class
);
Assert.assertEquals(
@ -122,9 +122,9 @@ public class DefaultLimitSpecTest
+ " \"columns\":[{\"dimension\":\"d\"}],\n"
+ " \"limit\":10\n"
+ "}";
spec = mapper.readValue(
mapper.writeValueAsString(mapper.readValue(json, DefaultLimitSpec.class)),
DefaultLimitSpec.class
spec = (DefaultLimitSpec) mapper.readValue(
mapper.writeValueAsString(mapper.readValue(json, LimitSpec.class)),
LimitSpec.class
);
Assert.assertEquals(
new DefaultLimitSpec(ImmutableList.of(new OrderByColumnSpec("d", OrderByColumnSpec.Direction.ASCENDING,
@ -137,9 +137,9 @@ public class DefaultLimitSpecTest
+ " \"columns\":[\"d\"],\n"
+ " \"limit\":10\n"
+ "}";
spec = mapper.readValue(
mapper.writeValueAsString(mapper.readValue(json, DefaultLimitSpec.class)),
DefaultLimitSpec.class
spec = (DefaultLimitSpec) mapper.readValue(
mapper.writeValueAsString(mapper.readValue(json, LimitSpec.class)),
LimitSpec.class
);
Assert.assertEquals(
new DefaultLimitSpec(ImmutableList.of(new OrderByColumnSpec("d", OrderByColumnSpec.Direction.ASCENDING,

View File

@ -104,8 +104,8 @@ public class AlphaNumericTopNMetricSpecTest
+ " \"previousStop\": \"test\"\n"
+ "}";
ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
TopNMetricSpec actualMetricSpec = jsonMapper.readValue(jsonMapper.writeValueAsString(jsonMapper.readValue(jsonSpec, TopNMetricSpec.class)), AlphaNumericTopNMetricSpec.class);
TopNMetricSpec actualMetricSpec1 = jsonMapper.readValue(jsonMapper.writeValueAsString(jsonMapper.readValue(jsonSpec1, TopNMetricSpec.class)), AlphaNumericTopNMetricSpec.class);
TopNMetricSpec actualMetricSpec = jsonMapper.readValue(jsonMapper.writeValueAsString(jsonMapper.readValue(jsonSpec, TopNMetricSpec.class)), TopNMetricSpec.class);
TopNMetricSpec actualMetricSpec1 = jsonMapper.readValue(jsonMapper.writeValueAsString(jsonMapper.readValue(jsonSpec1, TopNMetricSpec.class)), TopNMetricSpec.class);
Assert.assertEquals(expectedMetricSpec, actualMetricSpec);
Assert.assertEquals(expectedMetricSpec1, actualMetricSpec1);
}

View File

@ -44,8 +44,8 @@ public class DimensionTopNMetricSpecTest
+ " \"previousStop\": \"test\"\n"
+ "}";
ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
TopNMetricSpec actualMetricSpec = jsonMapper.readValue(jsonMapper.writeValueAsString(jsonMapper.readValue(jsonSpec, TopNMetricSpec.class)), DimensionTopNMetricSpec.class);
TopNMetricSpec actualMetricSpec1 = jsonMapper.readValue(jsonMapper.writeValueAsString(jsonMapper.readValue(jsonSpec1, TopNMetricSpec.class)), DimensionTopNMetricSpec.class);
TopNMetricSpec actualMetricSpec = jsonMapper.readValue(jsonMapper.writeValueAsString(jsonMapper.readValue(jsonSpec, TopNMetricSpec.class)), TopNMetricSpec.class);
TopNMetricSpec actualMetricSpec1 = jsonMapper.readValue(jsonMapper.writeValueAsString(jsonMapper.readValue(jsonSpec1, TopNMetricSpec.class)), TopNMetricSpec.class);
Assert.assertEquals(expectedMetricSpec, actualMetricSpec);
Assert.assertEquals(expectedMetricSpec1, actualMetricSpec1);
}
@ -65,8 +65,8 @@ public class DimensionTopNMetricSpecTest
+ " \"previousStop\": \"test\"\n"
+ "}";
ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
TopNMetricSpec actualMetricSpec = jsonMapper.readValue(jsonMapper.writeValueAsString(jsonMapper.readValue(jsonSpec, TopNMetricSpec.class)), DimensionTopNMetricSpec.class);
TopNMetricSpec actualMetricSpec1 = jsonMapper.readValue(jsonMapper.writeValueAsString(jsonMapper.readValue(jsonSpec1, TopNMetricSpec.class)), DimensionTopNMetricSpec.class);
TopNMetricSpec actualMetricSpec = jsonMapper.readValue(jsonMapper.writeValueAsString(jsonMapper.readValue(jsonSpec, TopNMetricSpec.class)), TopNMetricSpec.class);
TopNMetricSpec actualMetricSpec1 = jsonMapper.readValue(jsonMapper.writeValueAsString(jsonMapper.readValue(jsonSpec1, TopNMetricSpec.class)), TopNMetricSpec.class);
Assert.assertEquals(expectedMetricSpec, actualMetricSpec);
Assert.assertEquals(expectedMetricSpec1, actualMetricSpec1);
}
@ -86,8 +86,8 @@ public class DimensionTopNMetricSpecTest
+ " \"previousStop\": \"test\"\n"
+ "}";
ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
TopNMetricSpec actualMetricSpec = jsonMapper.readValue(jsonMapper.writeValueAsString(jsonMapper.readValue(jsonSpec, TopNMetricSpec.class)), DimensionTopNMetricSpec.class);
TopNMetricSpec actualMetricSpec1 = jsonMapper.readValue(jsonMapper.writeValueAsString(jsonMapper.readValue(jsonSpec1, TopNMetricSpec.class)), DimensionTopNMetricSpec.class);
TopNMetricSpec actualMetricSpec = jsonMapper.readValue(jsonMapper.writeValueAsString(jsonMapper.readValue(jsonSpec, TopNMetricSpec.class)), TopNMetricSpec.class);
TopNMetricSpec actualMetricSpec1 = jsonMapper.readValue(jsonMapper.writeValueAsString(jsonMapper.readValue(jsonSpec1, TopNMetricSpec.class)), TopNMetricSpec.class);
Assert.assertEquals(expectedMetricSpec, actualMetricSpec);
Assert.assertEquals(expectedMetricSpec1, actualMetricSpec1);
}
@ -107,8 +107,8 @@ public class DimensionTopNMetricSpecTest
+ " \"previousStop\": \"test\"\n"
+ "}";
ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
TopNMetricSpec actualMetricSpec = jsonMapper.readValue(jsonMapper.writeValueAsString(jsonMapper.readValue(jsonSpec, TopNMetricSpec.class)), DimensionTopNMetricSpec.class);
TopNMetricSpec actualMetricSpec1 = jsonMapper.readValue(jsonMapper.writeValueAsString(jsonMapper.readValue(jsonSpec1, TopNMetricSpec.class)), DimensionTopNMetricSpec.class);
TopNMetricSpec actualMetricSpec = jsonMapper.readValue(jsonMapper.writeValueAsString(jsonMapper.readValue(jsonSpec, TopNMetricSpec.class)), TopNMetricSpec.class);
TopNMetricSpec actualMetricSpec1 = jsonMapper.readValue(jsonMapper.writeValueAsString(jsonMapper.readValue(jsonSpec1, TopNMetricSpec.class)), TopNMetricSpec.class);
Assert.assertEquals(expectedMetricSpec, actualMetricSpec);
Assert.assertEquals(expectedMetricSpec1, actualMetricSpec1);
}

View File

@ -70,6 +70,10 @@
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>

View File

@ -27,6 +27,8 @@ import com.google.inject.Module;
import com.google.inject.Provides;
import io.druid.common.aws.AWSCredentialsConfig;
import io.druid.common.aws.AWSCredentialsUtils;
import io.druid.common.aws.AWSEndpointConfig;
import io.druid.common.aws.AWSProxyConfig;
/**
*/
@ -36,6 +38,8 @@ public class AWSModule implements Module
public void configure(Binder binder)
{
JsonConfigProvider.bind(binder, "druid.s3", AWSCredentialsConfig.class);
JsonConfigProvider.bind(binder, "druid.s3.proxy", AWSProxyConfig.class);
JsonConfigProvider.bind(binder, "druid.s3.endpoint", AWSEndpointConfig.class);
}
@Provides
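Note: the two new bindings surface proxy and endpoint settings under the druid.s3.proxy.* and druid.s3.endpoint.* property prefixes. A sketch of what such settings typically feed into on the SDK side — the host, port, endpoint, and region values here are illustrative, and the exact AWSProxyConfig/AWSEndpointConfig field names may differ:

import com.amazonaws.ClientConfiguration;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;

public class ProxyEndpointSketch
{
  public static void main(String[] args)
  {
    // Proxy settings of the kind the druid.s3.proxy.* binding would carry.
    ClientConfiguration clientConfig = new ClientConfiguration()
        .withProxyHost("proxy.example.com") // hypothetical host
        .withProxyPort(8080);               // hypothetical port

    // Endpoint settings of the kind the druid.s3.endpoint.* binding would
    // carry, e.g. to point the client at an S3-compatible store.
    AmazonS3 s3 = AmazonS3ClientBuilder.standard()
        .withClientConfiguration(clientConfig)
        .withEndpointConfiguration(
            new AwsClientBuilder.EndpointConfiguration("http://localhost:9000", "us-east-1")
        )
        .build();
  }
}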

View File

@ -23,11 +23,11 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.net.HttpHeaders;
import io.druid.data.input.impl.prefetch.PrefetchableTextFilesFirehoseFactory;
import io.druid.java.util.common.CompressionUtils;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.logger.Logger;
import org.apache.http.HttpHeaders;
import java.io.IOException;
import java.io.InputStream;

View File

@ -58,6 +58,7 @@ import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response.Status;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
@ -465,13 +466,11 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements Qu
TimeUnit.NANOSECONDS.toMillis(requestTimeNs),
"success",
success
&& result.getResponse().getStatus() == javax.ws.rs.core.Response.Status.OK.getStatusCode()
&& result.getResponse().getStatus() == Status.OK.getStatusCode()
)
)
)
);
}
catch (Exception e) {
log.error(e, "Unable to log query [%s]!", query);

View File

@ -60,7 +60,6 @@ import io.druid.server.initialization.TLSServerConfig;
import io.druid.server.metrics.DataSourceTaskIdHolder;
import io.druid.server.metrics.MetricsModule;
import io.druid.server.metrics.MonitorsConfig;
import org.apache.http.HttpVersion;
import org.eclipse.jetty.server.ConnectionFactory;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HttpConfiguration;
@ -90,6 +89,7 @@ public class JettyServerModule extends JerseyServletModule
private static final Logger log = new Logger(JettyServerModule.class);
private static final AtomicInteger activeConnections = new AtomicInteger();
private static final String HTTP_1_1_STRING = "HTTP/1.1";
@Override
protected void configureServlets()
@ -268,7 +268,7 @@ public class JettyServerModule extends JerseyServletModule
httpsConfiguration.setRequestHeaderSize(config.getMaxRequestHeaderSize());
final ServerConnector connector = new ServerConnector(
server,
new SslConnectionFactory(sslContextFactory, HttpVersion.HTTP_1_1.toString()),
new SslConnectionFactory(sslContextFactory, HTTP_1_1_STRING),
new HttpConnectionFactory(httpsConfiguration)
);
connector.setPort(node.getTlsPort());
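Note: inlining the HTTP_1_1_STRING constant removes this module's only use of org.apache.http, whose HttpVersion enum previously supplied the string. For reference, Jetty's own org.eclipse.jetty.http.HttpVersion enum yields the same value if a typed source is ever preferred over the literal; a trivial sketch:

import org.eclipse.jetty.http.HttpVersion;

public class ProtocolStringSketch
{
  public static void main(String[] args)
  {
    // Matches what org.apache.http.HttpVersion.HTTP_1_1.toString() produced.
    String fromJetty = HttpVersion.HTTP_1_1.asString();
    System.out.println(fromJetty.equals("HTTP/1.1")); // true
  }
}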

View File

@ -21,6 +21,7 @@ package io.druid.query.dimension;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import io.druid.jackson.DefaultObjectMapper;
@ -70,12 +71,13 @@ public class LookupDimensionSpecTest
public void testSerDesr(DimensionSpec lookupDimSpec) throws IOException
{
ObjectMapper mapper = new DefaultObjectMapper();
mapper.registerSubtypes(new NamedType(LookupDimensionSpec.class, "lookup"));
InjectableValues injectableValues = new InjectableValues.Std().addValue(
LookupReferencesManager.class,
LOOKUP_REF_MANAGER
);
String serLookup = mapper.writeValueAsString(lookupDimSpec);
Assert.assertEquals(lookupDimSpec, mapper.reader(LookupDimensionSpec.class).with(injectableValues).readValue(serLookup));
Assert.assertEquals(lookupDimSpec, mapper.reader(DimensionSpec.class).with(injectableValues).readValue(serLookup));
}
private Object[] parametersForTestSerDesr()
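Note: unlike the earlier tests, this one cannot simply read through the base type — LookupDimensionSpec is evidently not resolvable on a bare DefaultObjectMapper here, hence the explicit registerSubtypes call, and the spec needs a LookupReferencesManager supplied at deserialization time via InjectableValues rather than parsed from JSON. A minimal sketch of the registration-plus-injection pattern, using hypothetical Spec/LookupLikeSpec/ManagedDep types:

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;

public class SubtypeInjectionSketch
{
  @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
  interface Spec
  {
  }

  static class ManagedDep
  {
  }

  static class LookupLikeSpec implements Spec
  {
    @JacksonInject
    public ManagedDep dep; // supplied by InjectableValues, not parsed from JSON
  }

  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new ObjectMapper();
    // Register the subtype at runtime, the way Druid modules register theirs.
    mapper.registerSubtypes(new NamedType(LookupLikeSpec.class, "lookup"));
    InjectableValues inject =
        new InjectableValues.Std().addValue(ManagedDep.class, new ManagedDep());
    Spec spec = mapper.reader(Spec.class).with(inject).readValue("{\"type\": \"lookup\"}");
    System.out.println(spec instanceof LookupLikeSpec); // true
  }
}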