mirror of https://github.com/apache/druid.git
Support server-side encryption for s3 (#5740)
* Support server-side encryption for s3 * fix teamcity * typo * address comments * Refactoring configuration injection * fix doc * fix doc
This commit is contained in:
parent
5cbfb95e1f
commit
67ff7dacbd
|
@ -18,6 +18,9 @@ S3-compatible deep storage is basically either S3 or something like Google Stora
|
|||
|`druid.s3.secretKey`|S3 secret key.|Must be set.|
|
||||
|`druid.storage.bucket`|Bucket to store in.|Must be set.|
|
||||
|`druid.storage.baseKey`|Base key prefix to use, i.e. what directory.|Must be set.|
|
||||
|`druid.storage.sse.type`|Server-side encryption type. Should be one of `s3`, `kms`, and `custom`. See the below [Server-side encryption section](#server-side-encryption) for more details.|None|
|
||||
|`druid.storage.sse.kms.keyId`|AWS KMS key ID. Can be empty if `druid.storage.sse.type` is `kms`.|None|
|
||||
|`druid.storage.sse.custom.base64EncodedKey`|Base64-encoded key. Should be specified if `druid.storage.sse.type` is `custom`.|None|
|
||||
|`druid.s3.disableChunkedEncoding`|Disables chunked encoding. See [AWS document](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/s3/AmazonS3Builder.html#disableChunkedEncoding--) for details.|false|
|
||||
|`druid.s3.enablePathStyleAccess`|Enables path style access. See [AWS document](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/s3/AmazonS3Builder.html#enablePathStyleAccess--) for details.|false|
|
||||
|`druid.s3.forceGlobalBucketAccessEnabled`|Enables global bucket access. See [AWS document](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/services/s3/AmazonS3Builder.html#setForceGlobalBucketAccessEnabled-java.lang.Boolean-) for details.|false|
|
||||
|
@ -28,6 +31,15 @@ S3-compatible deep storage is basically either S3 or something like Google Stora
|
|||
|`druid.s3.proxy.username`|User name to use when connecting through a proxy.|None|
|
||||
|`druid.s3.proxy.password`|Password to use when connecting through a proxy.|None|
|
||||
|
||||
## Server-side encryption
|
||||
|
||||
You can enable [server-side encryption](https://docs.aws.amazon.com/AmazonS3/latest/dev/serv-side-encryption.html) by setting
|
||||
`druid.storage.sse.type` to a supported type of server-side encryption. The current supported types are:
|
||||
|
||||
- s3: [Server-side encryption with S3-managed encryption keys](https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingServerSideEncryption.html)
|
||||
- kms: [Server-side encryption with AWS KMS–Managed Keys](https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingKMSEncryption.html)
|
||||
- custom: [Server-side encryption with Customer-Provided Encryption Keys](https://docs.aws.amazon.com/AmazonS3/latest/dev/ServerSideEncryptionCustomerKeys.html)
|
||||
|
||||
## StaticS3Firehose
|
||||
|
||||
This firehose ingests events from a predefined list of S3 objects.
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
|
||||
package io.druid.firehose.s3;
|
||||
|
||||
import com.amazonaws.services.s3.AmazonS3;
|
||||
import com.amazonaws.services.s3.model.AmazonS3Exception;
|
||||
import com.amazonaws.services.s3.model.GetObjectRequest;
|
||||
import com.amazonaws.services.s3.model.ObjectMetadata;
|
||||
|
@ -37,6 +36,7 @@ import io.druid.java.util.common.IAE;
|
|||
import io.druid.java.util.common.IOE;
|
||||
import io.druid.java.util.common.ISE;
|
||||
import io.druid.java.util.common.logger.Logger;
|
||||
import io.druid.storage.s3.ServerSideEncryptingAmazonS3;
|
||||
import io.druid.storage.s3.S3Utils;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -57,13 +57,13 @@ public class StaticS3FirehoseFactory extends PrefetchableTextFilesFirehoseFactor
|
|||
private static final Logger log = new Logger(StaticS3FirehoseFactory.class);
|
||||
private static final int MAX_LISTING_LENGTH = 1024;
|
||||
|
||||
private final AmazonS3 s3Client;
|
||||
private final ServerSideEncryptingAmazonS3 s3Client;
|
||||
private final List<URI> uris;
|
||||
private final List<URI> prefixes;
|
||||
|
||||
@JsonCreator
|
||||
public StaticS3FirehoseFactory(
|
||||
@JacksonInject("s3Client") AmazonS3 s3Client,
|
||||
@JacksonInject("s3Client") ServerSideEncryptingAmazonS3 s3Client,
|
||||
@JsonProperty("uris") List<URI> uris,
|
||||
@JsonProperty("prefixes") List<URI> prefixes,
|
||||
@JsonProperty("maxCacheCapacityBytes") Long maxCacheCapacityBytes,
|
||||
|
|
|
@ -0,0 +1,66 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.druid.storage.s3;
|
||||
|
||||
import com.amazonaws.services.s3.model.CopyObjectRequest;
|
||||
import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
|
||||
import com.amazonaws.services.s3.model.GetObjectRequest;
|
||||
import com.amazonaws.services.s3.model.PutObjectRequest;
|
||||
import com.amazonaws.services.s3.model.SSECustomerKey;
|
||||
import com.fasterxml.jackson.annotation.JacksonInject;
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
|
||||
class CustomServerSideEncryption implements ServerSideEncryption
|
||||
{
|
||||
private final SSECustomerKey key;
|
||||
|
||||
@JsonCreator
|
||||
CustomServerSideEncryption(@JacksonInject S3SSECustomConfig config)
|
||||
{
|
||||
this.key = new SSECustomerKey(config.getBase64EncodedKey());
|
||||
}
|
||||
|
||||
@Override
|
||||
public PutObjectRequest decorate(PutObjectRequest request)
|
||||
{
|
||||
return request.withSSECustomerKey(key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetObjectRequest decorate(GetObjectRequest request)
|
||||
{
|
||||
return request.withSSECustomerKey(key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetObjectMetadataRequest decorate(GetObjectMetadataRequest request)
|
||||
{
|
||||
return request.withSSECustomerKey(key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CopyObjectRequest decorate(CopyObjectRequest request)
|
||||
{
|
||||
// Note: users might want to use a different key when they copy existing objects. This might additionally need to
|
||||
// manage key history or support updating keys at run time, either of which requires a huge refactoring. We simply
|
||||
// don't support changing keys for now.
|
||||
return request.withSourceSSECustomerKey(key)
|
||||
.withDestinationSSECustomerKey(key);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,55 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.druid.storage.s3;
|
||||
|
||||
import com.amazonaws.services.s3.model.CopyObjectRequest;
|
||||
import com.amazonaws.services.s3.model.PutObjectRequest;
|
||||
import com.amazonaws.services.s3.model.SSEAwsKeyManagementParams;
|
||||
import com.fasterxml.jackson.annotation.JacksonInject;
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
class KmsServerSideEncryption implements ServerSideEncryption
|
||||
{
|
||||
@Nullable
|
||||
private final String keyId;
|
||||
|
||||
@JsonCreator
|
||||
KmsServerSideEncryption(@JacksonInject S3SSEKmsConfig config)
|
||||
{
|
||||
this.keyId = config.getKeyId();
|
||||
}
|
||||
|
||||
@Override
|
||||
public PutObjectRequest decorate(PutObjectRequest request)
|
||||
{
|
||||
return request.withSSEAwsKeyManagementParams(
|
||||
keyId == null ? new SSEAwsKeyManagementParams() : new SSEAwsKeyManagementParams(keyId)
|
||||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CopyObjectRequest decorate(CopyObjectRequest request)
|
||||
{
|
||||
return request.withSSEAwsKeyManagementParams(
|
||||
keyId == null ? new SSEAwsKeyManagementParams() : new SSEAwsKeyManagementParams(keyId)
|
||||
);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,23 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.druid.storage.s3;
|
||||
|
||||
public class NoopServerSideEncryption implements ServerSideEncryption
|
||||
{
|
||||
}
|
|
@ -19,7 +19,6 @@
|
|||
|
||||
package io.druid.storage.s3;
|
||||
|
||||
import com.amazonaws.services.s3.AmazonS3;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Objects;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
|
@ -40,7 +39,7 @@ public class S3DataSegmentArchiver extends S3DataSegmentMover implements DataSeg
|
|||
@Inject
|
||||
public S3DataSegmentArchiver(
|
||||
@Json ObjectMapper mapper,
|
||||
AmazonS3 s3Client,
|
||||
ServerSideEncryptingAmazonS3 s3Client,
|
||||
S3DataSegmentArchiverConfig archiveConfig,
|
||||
S3DataSegmentPusherConfig restoreConfig
|
||||
)
|
||||
|
|
|
@ -20,7 +20,6 @@
|
|||
package io.druid.storage.s3;
|
||||
|
||||
import com.amazonaws.AmazonServiceException;
|
||||
import com.amazonaws.services.s3.AmazonS3;
|
||||
import com.amazonaws.services.s3.model.ObjectMetadata;
|
||||
import com.amazonaws.services.s3.model.S3Object;
|
||||
import com.amazonaws.services.s3.model.S3ObjectInputStream;
|
||||
|
@ -47,13 +46,13 @@ public class S3DataSegmentFinder implements DataSegmentFinder
|
|||
{
|
||||
private static final Logger log = new Logger(S3DataSegmentFinder.class);
|
||||
|
||||
private final AmazonS3 s3Client;
|
||||
private final ServerSideEncryptingAmazonS3 s3Client;
|
||||
private final ObjectMapper jsonMapper;
|
||||
private final S3DataSegmentPusherConfig config;
|
||||
|
||||
@Inject
|
||||
public S3DataSegmentFinder(
|
||||
AmazonS3 s3Client,
|
||||
ServerSideEncryptingAmazonS3 s3Client,
|
||||
S3DataSegmentPusherConfig config,
|
||||
ObjectMapper jsonMapper
|
||||
)
|
||||
|
|
|
@ -20,7 +20,6 @@
|
|||
package io.druid.storage.s3;
|
||||
|
||||
import com.amazonaws.AmazonServiceException;
|
||||
import com.amazonaws.services.s3.AmazonS3;
|
||||
import com.google.inject.Inject;
|
||||
import io.druid.java.util.common.MapUtils;
|
||||
import io.druid.java.util.common.logger.Logger;
|
||||
|
@ -36,12 +35,10 @@ public class S3DataSegmentKiller implements DataSegmentKiller
|
|||
{
|
||||
private static final Logger log = new Logger(S3DataSegmentKiller.class);
|
||||
|
||||
private final AmazonS3 s3Client;
|
||||
private final ServerSideEncryptingAmazonS3 s3Client;
|
||||
|
||||
@Inject
|
||||
public S3DataSegmentKiller(
|
||||
AmazonS3 s3Client
|
||||
)
|
||||
public S3DataSegmentKiller(ServerSideEncryptingAmazonS3 s3Client)
|
||||
{
|
||||
this.s3Client = s3Client;
|
||||
}
|
||||
|
|
|
@ -20,7 +20,6 @@
|
|||
package io.druid.storage.s3;
|
||||
|
||||
import com.amazonaws.AmazonServiceException;
|
||||
import com.amazonaws.services.s3.AmazonS3;
|
||||
import com.amazonaws.services.s3.model.CopyObjectRequest;
|
||||
import com.amazonaws.services.s3.model.ListObjectsV2Request;
|
||||
import com.amazonaws.services.s3.model.ListObjectsV2Result;
|
||||
|
@ -49,12 +48,12 @@ public class S3DataSegmentMover implements DataSegmentMover
|
|||
{
|
||||
private static final Logger log = new Logger(S3DataSegmentMover.class);
|
||||
|
||||
private final AmazonS3 s3Client;
|
||||
private final ServerSideEncryptingAmazonS3 s3Client;
|
||||
private final S3DataSegmentPusherConfig config;
|
||||
|
||||
@Inject
|
||||
public S3DataSegmentMover(
|
||||
AmazonS3 s3Client,
|
||||
ServerSideEncryptingAmazonS3 s3Client,
|
||||
S3DataSegmentPusherConfig config
|
||||
)
|
||||
{
|
||||
|
|
|
@ -20,7 +20,6 @@
|
|||
package io.druid.storage.s3;
|
||||
|
||||
import com.amazonaws.AmazonServiceException;
|
||||
import com.amazonaws.services.s3.AmazonS3;
|
||||
import com.amazonaws.services.s3.model.AmazonS3Exception;
|
||||
import com.amazonaws.services.s3.model.S3Object;
|
||||
import com.amazonaws.services.s3.model.S3ObjectSummary;
|
||||
|
@ -34,7 +33,6 @@ import io.druid.java.util.common.CompressionUtils;
|
|||
import io.druid.java.util.common.FileUtils;
|
||||
import io.druid.java.util.common.IAE;
|
||||
import io.druid.java.util.common.IOE;
|
||||
import io.druid.java.util.common.MapUtils;
|
||||
import io.druid.java.util.common.RE;
|
||||
import io.druid.java.util.common.StringUtils;
|
||||
import io.druid.java.util.common.UOE;
|
||||
|
@ -42,7 +40,6 @@ import io.druid.java.util.common.io.Closer;
|
|||
import io.druid.java.util.common.logger.Logger;
|
||||
import io.druid.segment.loading.SegmentLoadingException;
|
||||
import io.druid.segment.loading.URIDataPuller;
|
||||
import io.druid.timeline.DataSegment;
|
||||
|
||||
import javax.tools.FileObject;
|
||||
import java.io.File;
|
||||
|
@ -53,7 +50,6 @@ import java.io.OutputStream;
|
|||
import java.io.Reader;
|
||||
import java.io.Writer;
|
||||
import java.net.URI;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* A data segment puller that also hanldes URI data pulls.
|
||||
|
@ -62,7 +58,110 @@ public class S3DataSegmentPuller implements URIDataPuller
|
|||
{
|
||||
public static final int DEFAULT_RETRY_COUNT = 3;
|
||||
|
||||
private static FileObject buildFileObject(final URI uri, final AmazonS3 s3Client) throws AmazonServiceException
|
||||
public static final String scheme = S3StorageDruidModule.SCHEME;
|
||||
|
||||
private static final Logger log = new Logger(S3DataSegmentPuller.class);
|
||||
|
||||
protected static final String BUCKET = "bucket";
|
||||
protected static final String KEY = "key";
|
||||
|
||||
protected final ServerSideEncryptingAmazonS3 s3Client;
|
||||
|
||||
@Inject
|
||||
public S3DataSegmentPuller(ServerSideEncryptingAmazonS3 s3Client)
|
||||
{
|
||||
this.s3Client = s3Client;
|
||||
}
|
||||
|
||||
FileUtils.FileCopyResult getSegmentFiles(final S3Coords s3Coords, final File outDir) throws SegmentLoadingException
|
||||
{
|
||||
|
||||
log.info("Pulling index at path[%s] to outDir[%s]", s3Coords, outDir);
|
||||
|
||||
if (!isObjectInBucket(s3Coords)) {
|
||||
throw new SegmentLoadingException("IndexFile[%s] does not exist.", s3Coords);
|
||||
}
|
||||
|
||||
try {
|
||||
org.apache.commons.io.FileUtils.forceMkdir(outDir);
|
||||
|
||||
final URI uri = URI.create(StringUtils.format("s3://%s/%s", s3Coords.bucket, s3Coords.path));
|
||||
final ByteSource byteSource = new ByteSource()
|
||||
{
|
||||
@Override
|
||||
public InputStream openStream() throws IOException
|
||||
{
|
||||
try {
|
||||
return buildFileObject(uri).openInputStream();
|
||||
}
|
||||
catch (AmazonServiceException e) {
|
||||
if (e.getCause() != null) {
|
||||
if (S3Utils.S3RETRY.apply(e)) {
|
||||
throw new IOException("Recoverable exception", e);
|
||||
}
|
||||
}
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
};
|
||||
if (CompressionUtils.isZip(s3Coords.path)) {
|
||||
final FileUtils.FileCopyResult result = CompressionUtils.unzip(
|
||||
byteSource,
|
||||
outDir,
|
||||
S3Utils.S3RETRY,
|
||||
false
|
||||
);
|
||||
log.info("Loaded %d bytes from [%s] to [%s]", result.size(), s3Coords.toString(), outDir.getAbsolutePath());
|
||||
return result;
|
||||
}
|
||||
if (CompressionUtils.isGz(s3Coords.path)) {
|
||||
final String fname = Files.getNameWithoutExtension(uri.getPath());
|
||||
final File outFile = new File(outDir, fname);
|
||||
|
||||
final FileUtils.FileCopyResult result = CompressionUtils.gunzip(byteSource, outFile, S3Utils.S3RETRY);
|
||||
log.info("Loaded %d bytes from [%s] to [%s]", result.size(), s3Coords.toString(), outFile.getAbsolutePath());
|
||||
return result;
|
||||
}
|
||||
throw new IAE("Do not know how to load file type at [%s]", uri.toString());
|
||||
}
|
||||
catch (Exception e) {
|
||||
try {
|
||||
org.apache.commons.io.FileUtils.deleteDirectory(outDir);
|
||||
}
|
||||
catch (IOException ioe) {
|
||||
log.warn(
|
||||
ioe,
|
||||
"Failed to remove output directory [%s] for segment pulled from [%s]",
|
||||
outDir.getAbsolutePath(),
|
||||
s3Coords.toString()
|
||||
);
|
||||
}
|
||||
throw new SegmentLoadingException(e, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
public static URI checkURI(URI uri)
|
||||
{
|
||||
if (uri.getScheme().equalsIgnoreCase(scheme)) {
|
||||
uri = URI.create("s3" + uri.toString().substring(scheme.length()));
|
||||
} else if (!uri.getScheme().equalsIgnoreCase("s3")) {
|
||||
throw new IAE("Don't know how to load scheme for URI [%s]", uri.toString());
|
||||
}
|
||||
return uri;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InputStream getInputStream(URI uri) throws IOException
|
||||
{
|
||||
try {
|
||||
return buildFileObject(uri).openInputStream();
|
||||
}
|
||||
catch (AmazonServiceException e) {
|
||||
throw new IOE(e, "Could not load URI [%s]", uri);
|
||||
}
|
||||
}
|
||||
|
||||
private FileObject buildFileObject(final URI uri) throws AmazonServiceException
|
||||
{
|
||||
final S3Coords coords = new S3Coords(checkURI(uri));
|
||||
final S3ObjectSummary objectSummary = S3Utils.getSingleObjectSummary(s3Client, coords.bucket, coords.path);
|
||||
|
@ -154,111 +253,6 @@ public class S3DataSegmentPuller implements URIDataPuller
|
|||
};
|
||||
}
|
||||
|
||||
public static final String scheme = S3StorageDruidModule.SCHEME;
|
||||
|
||||
private static final Logger log = new Logger(S3DataSegmentPuller.class);
|
||||
|
||||
protected static final String BUCKET = "bucket";
|
||||
protected static final String KEY = "key";
|
||||
|
||||
protected final AmazonS3 s3Client;
|
||||
|
||||
@Inject
|
||||
public S3DataSegmentPuller(
|
||||
AmazonS3 s3Client
|
||||
)
|
||||
{
|
||||
this.s3Client = s3Client;
|
||||
}
|
||||
|
||||
FileUtils.FileCopyResult getSegmentFiles(final S3Coords s3Coords, final File outDir) throws SegmentLoadingException
|
||||
{
|
||||
|
||||
log.info("Pulling index at path[%s] to outDir[%s]", s3Coords, outDir);
|
||||
|
||||
if (!isObjectInBucket(s3Coords)) {
|
||||
throw new SegmentLoadingException("IndexFile[%s] does not exist.", s3Coords);
|
||||
}
|
||||
|
||||
try {
|
||||
org.apache.commons.io.FileUtils.forceMkdir(outDir);
|
||||
|
||||
final URI uri = URI.create(StringUtils.format("s3://%s/%s", s3Coords.bucket, s3Coords.path));
|
||||
final ByteSource byteSource = new ByteSource()
|
||||
{
|
||||
@Override
|
||||
public InputStream openStream() throws IOException
|
||||
{
|
||||
try {
|
||||
return buildFileObject(uri, s3Client).openInputStream();
|
||||
}
|
||||
catch (AmazonServiceException e) {
|
||||
if (e.getCause() != null) {
|
||||
if (S3Utils.S3RETRY.apply(e)) {
|
||||
throw new IOException("Recoverable exception", e);
|
||||
}
|
||||
}
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
}
|
||||
};
|
||||
if (CompressionUtils.isZip(s3Coords.path)) {
|
||||
final FileUtils.FileCopyResult result = CompressionUtils.unzip(
|
||||
byteSource,
|
||||
outDir,
|
||||
S3Utils.S3RETRY,
|
||||
false
|
||||
);
|
||||
log.info("Loaded %d bytes from [%s] to [%s]", result.size(), s3Coords.toString(), outDir.getAbsolutePath());
|
||||
return result;
|
||||
}
|
||||
if (CompressionUtils.isGz(s3Coords.path)) {
|
||||
final String fname = Files.getNameWithoutExtension(uri.getPath());
|
||||
final File outFile = new File(outDir, fname);
|
||||
|
||||
final FileUtils.FileCopyResult result = CompressionUtils.gunzip(byteSource, outFile, S3Utils.S3RETRY);
|
||||
log.info("Loaded %d bytes from [%s] to [%s]", result.size(), s3Coords.toString(), outFile.getAbsolutePath());
|
||||
return result;
|
||||
}
|
||||
throw new IAE("Do not know how to load file type at [%s]", uri.toString());
|
||||
}
|
||||
catch (Exception e) {
|
||||
try {
|
||||
org.apache.commons.io.FileUtils.deleteDirectory(outDir);
|
||||
}
|
||||
catch (IOException ioe) {
|
||||
log.warn(
|
||||
ioe,
|
||||
"Failed to remove output directory [%s] for segment pulled from [%s]",
|
||||
outDir.getAbsolutePath(),
|
||||
s3Coords.toString()
|
||||
);
|
||||
}
|
||||
throw new SegmentLoadingException(e, e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
public static URI checkURI(URI uri)
|
||||
{
|
||||
if (uri.getScheme().equalsIgnoreCase(scheme)) {
|
||||
uri = URI.create("s3" + uri.toString().substring(scheme.length()));
|
||||
} else if (!uri.getScheme().equalsIgnoreCase("s3")) {
|
||||
throw new IAE("Don't know how to load scheme for URI [%s]", uri.toString());
|
||||
}
|
||||
return uri;
|
||||
}
|
||||
|
||||
@Override
|
||||
public InputStream getInputStream(URI uri) throws IOException
|
||||
{
|
||||
try {
|
||||
return buildFileObject(uri, s3Client).openInputStream();
|
||||
}
|
||||
catch (AmazonServiceException e) {
|
||||
throw new IOE(e, "Could not load URI [%s]", uri);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Predicate<Throwable> shouldRetryPredicate()
|
||||
{
|
||||
|
@ -343,16 +337,6 @@ public class S3DataSegmentPuller implements URIDataPuller
|
|||
this.path = path;
|
||||
}
|
||||
|
||||
public S3Coords(DataSegment segment)
|
||||
{
|
||||
Map<String, Object> loadSpec = segment.getLoadSpec();
|
||||
bucket = MapUtils.getString(loadSpec, BUCKET);
|
||||
path = MapUtils.getString(loadSpec, KEY);
|
||||
if (path.startsWith("/")) {
|
||||
path = path.substring(1);
|
||||
}
|
||||
}
|
||||
|
||||
public S3Coords(String bucket, String key)
|
||||
{
|
||||
this.bucket = bucket;
|
||||
|
|
|
@ -20,7 +20,6 @@
|
|||
package io.druid.storage.s3;
|
||||
|
||||
import com.amazonaws.AmazonServiceException;
|
||||
import com.amazonaws.services.s3.AmazonS3;
|
||||
import com.amazonaws.services.s3.model.PutObjectRequest;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.base.Throwables;
|
||||
|
@ -45,12 +44,16 @@ public class S3DataSegmentPusher implements DataSegmentPusher
|
|||
{
|
||||
private static final EmittingLogger log = new EmittingLogger(S3DataSegmentPusher.class);
|
||||
|
||||
private final AmazonS3 s3Client;
|
||||
private final ServerSideEncryptingAmazonS3 s3Client;
|
||||
private final S3DataSegmentPusherConfig config;
|
||||
private final ObjectMapper jsonMapper;
|
||||
|
||||
@Inject
|
||||
public S3DataSegmentPusher(AmazonS3 s3Client, S3DataSegmentPusherConfig config, ObjectMapper jsonMapper)
|
||||
public S3DataSegmentPusher(
|
||||
ServerSideEncryptingAmazonS3 s3Client,
|
||||
S3DataSegmentPusherConfig config,
|
||||
ObjectMapper jsonMapper
|
||||
)
|
||||
{
|
||||
this.s3Client = s3Client;
|
||||
this.config = config;
|
||||
|
@ -104,9 +107,8 @@ public class S3DataSegmentPusher implements DataSegmentPusher
|
|||
try {
|
||||
return S3Utils.retryS3Operation(
|
||||
() -> {
|
||||
uploadFileIfPossible(s3Client, config.getBucket(), s3Path, zipOutFile);
|
||||
uploadFileIfPossible(config.getBucket(), s3Path, zipOutFile);
|
||||
uploadFileIfPossible(
|
||||
s3Client,
|
||||
config.getBucket(),
|
||||
S3Utils.descriptorPathForSegmentPath(s3Path),
|
||||
descriptorFile
|
||||
|
@ -155,7 +157,7 @@ public class S3DataSegmentPusher implements DataSegmentPusher
|
|||
);
|
||||
}
|
||||
|
||||
private void uploadFileIfPossible(AmazonS3 s3Client, String bucket, String key, File file)
|
||||
private void uploadFileIfPossible(String bucket, String key, File file)
|
||||
{
|
||||
final PutObjectRequest indexFilePutRequest = new PutObjectRequest(bucket, key, file);
|
||||
|
||||
|
|
|
@ -0,0 +1,32 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.druid.storage.s3;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
|
||||
public class S3SSECustomConfig
|
||||
{
|
||||
@JsonProperty
|
||||
private String base64EncodedKey;
|
||||
|
||||
public String getBase64EncodedKey()
|
||||
{
|
||||
return base64EncodedKey;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,32 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.druid.storage.s3;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
|
||||
public class S3SSEKmsConfig
|
||||
{
|
||||
@JsonProperty
|
||||
private String keyId;
|
||||
|
||||
public String getKeyId()
|
||||
{
|
||||
return keyId;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,46 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.druid.storage.s3;
|
||||
|
||||
import com.amazonaws.services.s3.model.CopyObjectRequest;
|
||||
import com.amazonaws.services.s3.model.ObjectMetadata;
|
||||
import com.amazonaws.services.s3.model.PutObjectRequest;
|
||||
|
||||
class S3ServerSideEncryption implements ServerSideEncryption
|
||||
{
|
||||
@Override
|
||||
public PutObjectRequest decorate(PutObjectRequest request)
|
||||
{
|
||||
final ObjectMetadata objectMetadata = request.getMetadata() == null ?
|
||||
new ObjectMetadata() :
|
||||
request.getMetadata().clone();
|
||||
objectMetadata.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
|
||||
return request.withMetadata(objectMetadata);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CopyObjectRequest decorate(CopyObjectRequest request)
|
||||
{
|
||||
final ObjectMetadata objectMetadata = request.getNewObjectMetadata() == null ?
|
||||
new ObjectMetadata() :
|
||||
request.getNewObjectMetadata().clone();
|
||||
objectMetadata.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
|
||||
return request.withNewObjectMetadata(objectMetadata);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,51 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.druid.storage.s3;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonCreator;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
|
||||
/**
|
||||
* General configurations for Amazon S3 storage.
|
||||
*/
|
||||
public class S3StorageConfig
|
||||
{
|
||||
/**
|
||||
* Server-side encryption type. We use a short name to match the configuration prefix with {@link S3SSEKmsConfig} and
|
||||
* {@link S3SSECustomConfig}.
|
||||
*
|
||||
* @see S3StorageDruidModule#configure
|
||||
*/
|
||||
@JsonProperty("sse")
|
||||
private final ServerSideEncryption serverSideEncryption;
|
||||
|
||||
@JsonCreator
|
||||
public S3StorageConfig(
|
||||
@JsonProperty("sse") ServerSideEncryption serverSideEncryption
|
||||
)
|
||||
{
|
||||
this.serverSideEncryption = serverSideEncryption == null ? new NoopServerSideEncryption() : serverSideEncryption;
|
||||
}
|
||||
|
||||
@JsonProperty("sse")
|
||||
public ServerSideEncryption getServerSideEncryption()
|
||||
{
|
||||
return serverSideEncryption;
|
||||
}
|
||||
}
|
|
@ -23,7 +23,6 @@ import com.amazonaws.ClientConfiguration;
|
|||
import com.amazonaws.ClientConfigurationFactory;
|
||||
import com.amazonaws.auth.AWSCredentialsProvider;
|
||||
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration;
|
||||
import com.amazonaws.services.s3.AmazonS3;
|
||||
import com.amazonaws.services.s3.AmazonS3Client;
|
||||
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
|
||||
import com.fasterxml.jackson.core.Version;
|
||||
|
@ -103,6 +102,9 @@ public class S3StorageDruidModule implements DruidModule
|
|||
Binders.dataSegmentFinderBinder(binder).addBinding("s3").to(S3DataSegmentFinder.class).in(LazySingleton.class);
|
||||
JsonConfigProvider.bind(binder, "druid.storage", S3DataSegmentPusherConfig.class);
|
||||
JsonConfigProvider.bind(binder, "druid.storage", S3DataSegmentArchiverConfig.class);
|
||||
JsonConfigProvider.bind(binder, "druid.storage", S3StorageConfig.class);
|
||||
JsonConfigProvider.bind(binder, "druid.storage.sse.kms", S3SSEKmsConfig.class);
|
||||
JsonConfigProvider.bind(binder, "druid.storage.sse.custom", S3SSECustomConfig.class);
|
||||
|
||||
Binders.taskLogsBinder(binder).addBinding("s3").to(S3TaskLogs.class);
|
||||
JsonConfigProvider.bind(binder, "druid.indexer.logs", S3TaskLogsConfig.class);
|
||||
|
@ -111,11 +113,12 @@ public class S3StorageDruidModule implements DruidModule
|
|||
|
||||
@Provides
|
||||
@LazySingleton
|
||||
public AmazonS3 getAmazonS3Client(
|
||||
public ServerSideEncryptingAmazonS3 getAmazonS3Client(
|
||||
AWSCredentialsProvider provider,
|
||||
AWSProxyConfig proxyConfig,
|
||||
AWSEndpointConfig endpointConfig,
|
||||
AWSClientConfig clientConfig
|
||||
AWSClientConfig clientConfig,
|
||||
S3StorageConfig storageConfig
|
||||
)
|
||||
{
|
||||
final ClientConfiguration configuration = new ClientConfigurationFactory().getConfig();
|
||||
|
@ -133,7 +136,10 @@ public class S3StorageDruidModule implements DruidModule
|
|||
);
|
||||
}
|
||||
|
||||
return builder.build();
|
||||
return new ServerSideEncryptingAmazonS3(
|
||||
builder.build(),
|
||||
storageConfig.getServerSideEncryption()
|
||||
);
|
||||
}
|
||||
|
||||
private static ClientConfiguration setProxyConfig(ClientConfiguration conf, AWSProxyConfig proxyConfig)
|
||||
|
|
|
@ -20,7 +20,6 @@
|
|||
package io.druid.storage.s3;
|
||||
|
||||
import com.amazonaws.AmazonServiceException;
|
||||
import com.amazonaws.services.s3.AmazonS3;
|
||||
import com.amazonaws.services.s3.model.AmazonS3Exception;
|
||||
import com.amazonaws.services.s3.model.GetObjectRequest;
|
||||
import com.amazonaws.services.s3.model.ObjectMetadata;
|
||||
|
@ -44,14 +43,14 @@ public class S3TaskLogs implements TaskLogs
|
|||
{
|
||||
private static final Logger log = new Logger(S3TaskLogs.class);
|
||||
|
||||
private final AmazonS3 service;
|
||||
private final ServerSideEncryptingAmazonS3 service;
|
||||
private final S3TaskLogsConfig config;
|
||||
|
||||
@Inject
|
||||
public S3TaskLogs(S3TaskLogsConfig config, AmazonS3 service)
|
||||
public S3TaskLogs(ServerSideEncryptingAmazonS3 service, S3TaskLogsConfig config)
|
||||
{
|
||||
this.config = config;
|
||||
this.service = service;
|
||||
this.config = config;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
|
||||
package io.druid.storage.s3;
|
||||
|
||||
import com.amazonaws.services.s3.AmazonS3;
|
||||
import com.amazonaws.services.s3.model.S3ObjectSummary;
|
||||
import com.google.common.base.Throwables;
|
||||
import com.google.inject.Inject;
|
||||
|
@ -41,7 +40,7 @@ public class S3TimestampVersionedDataFinder extends S3DataSegmentPuller implemen
|
|||
private static final int MAX_LISTING_KEYS = 1000;
|
||||
|
||||
@Inject
|
||||
public S3TimestampVersionedDataFinder(AmazonS3 s3Client)
|
||||
public S3TimestampVersionedDataFinder(ServerSideEncryptingAmazonS3 s3Client)
|
||||
{
|
||||
super(s3Client);
|
||||
}
|
||||
|
|
|
@ -20,7 +20,6 @@
|
|||
package io.druid.storage.s3;
|
||||
|
||||
import com.amazonaws.AmazonServiceException;
|
||||
import com.amazonaws.services.s3.AmazonS3;
|
||||
import com.amazonaws.services.s3.model.AccessControlList;
|
||||
import com.amazonaws.services.s3.model.AmazonS3Exception;
|
||||
import com.amazonaws.services.s3.model.CanonicalGrantee;
|
||||
|
@ -83,7 +82,11 @@ public class S3Utils
|
|||
return RetryUtils.retry(f, S3RETRY, maxTries);
|
||||
}
|
||||
|
||||
static boolean isObjectInBucketIgnoringPermission(AmazonS3 s3Client, String bucketName, String objectKey)
|
||||
static boolean isObjectInBucketIgnoringPermission(
|
||||
ServerSideEncryptingAmazonS3 s3Client,
|
||||
String bucketName,
|
||||
String objectKey
|
||||
)
|
||||
{
|
||||
try {
|
||||
return s3Client.doesObjectExist(bucketName, objectKey);
|
||||
|
@ -99,7 +102,7 @@ public class S3Utils
|
|||
}
|
||||
|
||||
public static Iterator<S3ObjectSummary> objectSummaryIterator(
|
||||
final AmazonS3 s3Client,
|
||||
final ServerSideEncryptingAmazonS3 s3Client,
|
||||
final String bucket,
|
||||
final String prefix,
|
||||
final int numMaxKeys
|
||||
|
@ -191,7 +194,7 @@ public class S3Utils
|
|||
return filename;
|
||||
}
|
||||
|
||||
static AccessControlList grantFullControlToBucketOwner(AmazonS3 s3Client, String bucket)
|
||||
static AccessControlList grantFullControlToBucketOwner(ServerSideEncryptingAmazonS3 s3Client, String bucket)
|
||||
{
|
||||
final AccessControlList acl = s3Client.getBucketAcl(bucket);
|
||||
acl.grantAllPermissions(new Grant(new CanonicalGrantee(acl.getOwner().getId()), Permission.FullControl));
|
||||
|
@ -240,7 +243,7 @@ public class S3Utils
|
|||
* @param bucket s3 bucket
|
||||
* @param key unique key for the object to be retrieved
|
||||
*/
|
||||
public static S3ObjectSummary getSingleObjectSummary(AmazonS3 s3Client, String bucket, String key)
|
||||
public static S3ObjectSummary getSingleObjectSummary(ServerSideEncryptingAmazonS3 s3Client, String bucket, String key)
|
||||
{
|
||||
final ListObjectsV2Request request = new ListObjectsV2Request()
|
||||
.withBucketName(bucket)
|
||||
|
|
|
@ -0,0 +1,121 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.druid.storage.s3;
|
||||
|
||||
import com.amazonaws.services.s3.AmazonS3;
|
||||
import com.amazonaws.services.s3.model.AccessControlList;
|
||||
import com.amazonaws.services.s3.model.CopyObjectRequest;
|
||||
import com.amazonaws.services.s3.model.CopyObjectResult;
|
||||
import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
|
||||
import com.amazonaws.services.s3.model.GetObjectRequest;
|
||||
import com.amazonaws.services.s3.model.ListObjectsV2Request;
|
||||
import com.amazonaws.services.s3.model.ListObjectsV2Result;
|
||||
import com.amazonaws.services.s3.model.ObjectMetadata;
|
||||
import com.amazonaws.services.s3.model.PutObjectRequest;
|
||||
import com.amazonaws.services.s3.model.PutObjectResult;
|
||||
import com.amazonaws.services.s3.model.S3Object;
|
||||
import io.druid.java.util.common.StringUtils;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.File;
|
||||
import java.io.InputStream;
|
||||
|
||||
/**
|
||||
* {@link AmazonS3} wrapper with {@link ServerSideEncryption}. Every {@link AmazonS3#putObject},
|
||||
* {@link AmazonS3#copyObject}, {@link AmazonS3#getObject}, and {@link AmazonS3#getObjectMetadata} methods should be
|
||||
* wrapped using ServerSideEncryption.
|
||||
*
|
||||
* Additional methods can be added to this class if needed, but subclassing AmazonS3Client is discouraged to reduce
|
||||
* human mistakes like some methods are not encoded properly.
|
||||
*/
|
||||
public class ServerSideEncryptingAmazonS3
|
||||
{
|
||||
private final AmazonS3 amazonS3;
|
||||
private final ServerSideEncryption serverSideEncryption;
|
||||
|
||||
public ServerSideEncryptingAmazonS3(AmazonS3 amazonS3, ServerSideEncryption serverSideEncryption)
|
||||
{
|
||||
this.amazonS3 = amazonS3;
|
||||
this.serverSideEncryption = serverSideEncryption;
|
||||
}
|
||||
|
||||
public boolean doesObjectExist(String bucket, String objectName)
|
||||
{
|
||||
return amazonS3.doesObjectExist(bucket, objectName);
|
||||
}
|
||||
|
||||
public ListObjectsV2Result listObjectsV2(ListObjectsV2Request request)
|
||||
{
|
||||
return amazonS3.listObjectsV2(request);
|
||||
}
|
||||
|
||||
public AccessControlList getBucketAcl(String bucket)
|
||||
{
|
||||
return amazonS3.getBucketAcl(bucket);
|
||||
}
|
||||
|
||||
public ObjectMetadata getObjectMetadata(String bucket, String key)
|
||||
{
|
||||
final GetObjectMetadataRequest getObjectMetadataRequest = serverSideEncryption.decorate(
|
||||
new GetObjectMetadataRequest(bucket, key)
|
||||
);
|
||||
return amazonS3.getObjectMetadata(getObjectMetadataRequest);
|
||||
}
|
||||
|
||||
public S3Object getObject(String bucket, String key)
|
||||
{
|
||||
return getObject(new GetObjectRequest(bucket, key));
|
||||
}
|
||||
|
||||
public S3Object getObject(GetObjectRequest request)
|
||||
{
|
||||
return amazonS3.getObject(serverSideEncryption.decorate(request));
|
||||
}
|
||||
|
||||
public PutObjectResult putObject(String bucket, String key, String content)
|
||||
{
|
||||
final InputStream in = new ByteArrayInputStream(StringUtils.toUtf8(content));
|
||||
return putObject(new PutObjectRequest(bucket, key, in, new ObjectMetadata()));
|
||||
}
|
||||
|
||||
public PutObjectResult putObject(String bucket, String key, File file)
|
||||
{
|
||||
return putObject(new PutObjectRequest(bucket, key, file));
|
||||
}
|
||||
|
||||
public PutObjectResult putObject(String bucket, String key, InputStream in, ObjectMetadata objectMetadata)
|
||||
{
|
||||
return putObject(new PutObjectRequest(bucket, key, in, objectMetadata));
|
||||
}
|
||||
|
||||
public PutObjectResult putObject(PutObjectRequest request)
|
||||
{
|
||||
return amazonS3.putObject(serverSideEncryption.decorate(request));
|
||||
}
|
||||
|
||||
public CopyObjectResult copyObject(CopyObjectRequest request)
|
||||
{
|
||||
return amazonS3.copyObject(serverSideEncryption.decorate(request));
|
||||
}
|
||||
|
||||
public void deleteObject(String bucket, String key)
|
||||
{
|
||||
amazonS3.deleteObject(bucket, key);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,60 @@
|
|||
/*
|
||||
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Metamarkets licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.druid.storage.s3;
|
||||
|
||||
import com.amazonaws.services.s3.model.CopyObjectRequest;
|
||||
import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
|
||||
import com.amazonaws.services.s3.model.GetObjectRequest;
|
||||
import com.amazonaws.services.s3.model.PutObjectRequest;
|
||||
import com.fasterxml.jackson.annotation.JsonSubTypes;
|
||||
import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
|
||||
import com.fasterxml.jackson.annotation.JsonTypeInfo;
|
||||
|
||||
/**
|
||||
* Server-side encryption decorator for Amazon S3.
|
||||
*/
|
||||
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
|
||||
@JsonSubTypes(value = {
|
||||
@Type(name = "noop", value = NoopServerSideEncryption.class),
|
||||
@Type(name = "s3", value = S3ServerSideEncryption.class),
|
||||
@Type(name = "kms", value = KmsServerSideEncryption.class),
|
||||
@Type(name = "custom", value = CustomServerSideEncryption.class)
|
||||
})
|
||||
public interface ServerSideEncryption
|
||||
{
|
||||
default PutObjectRequest decorate(PutObjectRequest request)
|
||||
{
|
||||
return request;
|
||||
}
|
||||
|
||||
default GetObjectRequest decorate(GetObjectRequest request)
|
||||
{
|
||||
return request;
|
||||
}
|
||||
|
||||
default GetObjectMetadataRequest decorate(GetObjectMetadataRequest request)
|
||||
{
|
||||
return request;
|
||||
}
|
||||
|
||||
default CopyObjectRequest decorate(CopyObjectRequest request)
|
||||
{
|
||||
return request;
|
||||
}
|
||||
}
|
|
@ -34,6 +34,8 @@ import com.google.inject.Guice;
|
|||
import com.google.inject.Injector;
|
||||
import com.google.inject.Provides;
|
||||
import io.druid.initialization.DruidModule;
|
||||
import io.druid.storage.s3.ServerSideEncryptingAmazonS3;
|
||||
import io.druid.storage.s3.NoopServerSideEncryption;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
|
@ -45,7 +47,10 @@ import java.util.List;
|
|||
*/
|
||||
public class StaticS3FirehoseFactoryTest
|
||||
{
|
||||
private static final AmazonS3Client SERVICE = new AmazonS3Client();
|
||||
private static final ServerSideEncryptingAmazonS3 SERVICE = new ServerSideEncryptingAmazonS3(
|
||||
new AmazonS3Client(),
|
||||
new NoopServerSideEncryption()
|
||||
);
|
||||
|
||||
@Test
|
||||
public void testSerde() throws Exception
|
||||
|
@ -105,7 +110,7 @@ public class StaticS3FirehoseFactoryTest
|
|||
}
|
||||
|
||||
@Provides
|
||||
public AmazonS3 getAmazonS3Client()
|
||||
public ServerSideEncryptingAmazonS3 getAmazonS3Client()
|
||||
{
|
||||
return SERVICE;
|
||||
}
|
||||
|
|
|
@ -65,7 +65,10 @@ public class S3DataSegmentArchiverTest
|
|||
}
|
||||
};
|
||||
private static final S3DataSegmentPusherConfig PUSHER_CONFIG = new S3DataSegmentPusherConfig();
|
||||
private static final AmazonS3Client S3_SERVICE = EasyMock.createStrictMock(AmazonS3Client.class);
|
||||
private static final ServerSideEncryptingAmazonS3 S3_SERVICE = new ServerSideEncryptingAmazonS3(
|
||||
EasyMock.createStrictMock(AmazonS3Client.class),
|
||||
new NoopServerSideEncryption()
|
||||
);
|
||||
private static final S3DataSegmentPuller PULLER = new S3DataSegmentPuller(S3_SERVICE);
|
||||
private static final DataSegment SOURCE_SEGMENT = DataSegment
|
||||
.builder()
|
||||
|
|
|
@ -22,6 +22,7 @@ package io.druid.storage.s3;
|
|||
import com.amazonaws.AmazonServiceException;
|
||||
import com.amazonaws.services.s3.AmazonS3Client;
|
||||
import com.amazonaws.services.s3.model.AmazonS3Exception;
|
||||
import com.amazonaws.services.s3.model.GetObjectRequest;
|
||||
import com.amazonaws.services.s3.model.ListObjectsV2Request;
|
||||
import com.amazonaws.services.s3.model.ListObjectsV2Result;
|
||||
import com.amazonaws.services.s3.model.ObjectMetadata;
|
||||
|
@ -114,7 +115,7 @@ public class S3DataSegmentFinderTest
|
|||
@Rule
|
||||
public final TemporaryFolder temporaryFolder = new TemporaryFolder();
|
||||
|
||||
MockAmazonS3Client mockS3Client;
|
||||
ServerSideEncryptingAmazonS3 mockS3Client;
|
||||
S3DataSegmentPusherConfig config;
|
||||
|
||||
private String bucket;
|
||||
|
@ -350,14 +351,14 @@ public class S3DataSegmentFinderTest
|
|||
return S3Utils.descriptorPathForSegmentPath(String.valueOf(segment.getLoadSpec().get("key")));
|
||||
}
|
||||
|
||||
private static class MockAmazonS3Client extends AmazonS3Client
|
||||
private static class MockAmazonS3Client extends ServerSideEncryptingAmazonS3
|
||||
{
|
||||
private final File baseDir;
|
||||
private final Map<String, Map<String, ObjectMetadata>> storage = Maps.newHashMap();
|
||||
|
||||
public MockAmazonS3Client(File baseDir)
|
||||
{
|
||||
super();
|
||||
super(new AmazonS3Client(), new NoopServerSideEncryption());
|
||||
this.baseDir = baseDir;
|
||||
}
|
||||
|
||||
|
@ -462,6 +463,12 @@ public class S3DataSegmentFinderTest
|
|||
return storageObject;
|
||||
}
|
||||
|
||||
@Override
|
||||
public S3Object getObject(GetObjectRequest request)
|
||||
{
|
||||
return getObject(request.getBucketName(), request.getKey());
|
||||
}
|
||||
|
||||
@Override
|
||||
public PutObjectResult putObject(String bucketName, String key, String data)
|
||||
{
|
||||
|
|
|
@ -25,11 +25,9 @@ import com.amazonaws.services.s3.model.AmazonS3Exception;
|
|||
import com.amazonaws.services.s3.model.CanonicalGrantee;
|
||||
import com.amazonaws.services.s3.model.CopyObjectRequest;
|
||||
import com.amazonaws.services.s3.model.CopyObjectResult;
|
||||
import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
|
||||
import com.amazonaws.services.s3.model.Grant;
|
||||
import com.amazonaws.services.s3.model.ListObjectsV2Request;
|
||||
import com.amazonaws.services.s3.model.ListObjectsV2Result;
|
||||
import com.amazonaws.services.s3.model.ObjectMetadata;
|
||||
import com.amazonaws.services.s3.model.Owner;
|
||||
import com.amazonaws.services.s3.model.Permission;
|
||||
import com.amazonaws.services.s3.model.PutObjectResult;
|
||||
|
@ -182,7 +180,7 @@ public class S3DataSegmentMoverTest
|
|||
), ImmutableMap.<String, Object>of("bucket", "DOES NOT EXIST", "baseKey", "baseKey2"));
|
||||
}
|
||||
|
||||
private static class MockAmazonS3Client extends AmazonS3Client
|
||||
private static class MockAmazonS3Client extends ServerSideEncryptingAmazonS3
|
||||
{
|
||||
Map<String, Set<String>> storage = Maps.newHashMap();
|
||||
boolean copied = false;
|
||||
|
@ -190,7 +188,7 @@ public class S3DataSegmentMoverTest
|
|||
|
||||
private MockAmazonS3Client()
|
||||
{
|
||||
super();
|
||||
super(new AmazonS3Client(), new NoopServerSideEncryption());
|
||||
}
|
||||
|
||||
public boolean didMove()
|
||||
|
@ -207,12 +205,6 @@ public class S3DataSegmentMoverTest
|
|||
return acl;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ObjectMetadata getObjectMetadata(GetObjectMetadataRequest getObjectMetadataRequest)
|
||||
{
|
||||
return new ObjectMetadata();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean doesObjectExist(String bucketName, String objectKey)
|
||||
{
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
|
||||
package io.druid.storage.s3;
|
||||
|
||||
import com.amazonaws.services.s3.AmazonS3Client;
|
||||
import com.amazonaws.services.s3.model.AmazonS3Exception;
|
||||
import com.amazonaws.services.s3.model.ListObjectsV2Request;
|
||||
import com.amazonaws.services.s3.model.ListObjectsV2Result;
|
||||
|
@ -48,7 +47,6 @@ import java.util.zip.GZIPOutputStream;
|
|||
*/
|
||||
public class S3DataSegmentPullerTest
|
||||
{
|
||||
|
||||
@Rule
|
||||
public TemporaryFolder temporaryFolder = new TemporaryFolder();
|
||||
|
||||
|
@ -57,7 +55,7 @@ public class S3DataSegmentPullerTest
|
|||
{
|
||||
String bucket = "bucket";
|
||||
String keyPrefix = "prefix/dir/0";
|
||||
AmazonS3Client s3Client = EasyMock.createStrictMock(AmazonS3Client.class);
|
||||
ServerSideEncryptingAmazonS3 s3Client = EasyMock.createStrictMock(ServerSideEncryptingAmazonS3.class);
|
||||
|
||||
final S3ObjectSummary objectSummary = new S3ObjectSummary();
|
||||
objectSummary.setBucketName(bucket);
|
||||
|
@ -87,7 +85,7 @@ public class S3DataSegmentPullerTest
|
|||
{
|
||||
final String bucket = "bucket";
|
||||
final String keyPrefix = "prefix/dir/0";
|
||||
final AmazonS3Client s3Client = EasyMock.createStrictMock(AmazonS3Client.class);
|
||||
final ServerSideEncryptingAmazonS3 s3Client = EasyMock.createStrictMock(ServerSideEncryptingAmazonS3.class);
|
||||
final byte[] value = bucket.getBytes("utf8");
|
||||
|
||||
final File tmpFile = temporaryFolder.newFile("gzTest.gz");
|
||||
|
@ -144,7 +142,7 @@ public class S3DataSegmentPullerTest
|
|||
{
|
||||
final String bucket = "bucket";
|
||||
final String keyPrefix = "prefix/dir/0";
|
||||
final AmazonS3Client s3Client = EasyMock.createStrictMock(AmazonS3Client.class);
|
||||
final ServerSideEncryptingAmazonS3 s3Client = EasyMock.createStrictMock(ServerSideEncryptingAmazonS3.class);
|
||||
final byte[] value = bucket.getBytes("utf8");
|
||||
|
||||
final File tmpFile = temporaryFolder.newFile("gzTest.gz");
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
|
||||
package io.druid.storage.s3;
|
||||
|
||||
import com.amazonaws.services.s3.AmazonS3Client;
|
||||
import com.amazonaws.services.s3.model.AccessControlList;
|
||||
import com.amazonaws.services.s3.model.CanonicalGrantee;
|
||||
import com.amazonaws.services.s3.model.Grant;
|
||||
|
@ -83,7 +82,7 @@ public class S3DataSegmentPusherTest
|
|||
|
||||
private void testPushInternal(boolean useUniquePath, String matcher) throws Exception
|
||||
{
|
||||
AmazonS3Client s3Client = EasyMock.createStrictMock(AmazonS3Client.class);
|
||||
ServerSideEncryptingAmazonS3 s3Client = EasyMock.createStrictMock(ServerSideEncryptingAmazonS3.class);
|
||||
|
||||
final AccessControlList acl = new AccessControlList();
|
||||
acl.setOwner(new Owner("ownerId", "owner"));
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
|
||||
package io.druid.storage.s3;
|
||||
|
||||
import com.amazonaws.services.s3.AmazonS3Client;
|
||||
import com.amazonaws.services.s3.model.ListObjectsV2Request;
|
||||
import com.amazonaws.services.s3.model.ListObjectsV2Result;
|
||||
import com.amazonaws.services.s3.model.S3ObjectSummary;
|
||||
|
@ -40,7 +39,7 @@ public class S3TimestampVersionedDataFinderTest
|
|||
{
|
||||
String bucket = "bucket";
|
||||
String keyPrefix = "prefix/dir/0";
|
||||
AmazonS3Client s3Client = EasyMock.createStrictMock(AmazonS3Client.class);
|
||||
ServerSideEncryptingAmazonS3 s3Client = EasyMock.createStrictMock(ServerSideEncryptingAmazonS3.class);
|
||||
|
||||
S3ObjectSummary object0 = new S3ObjectSummary(), object1 = new S3ObjectSummary();
|
||||
|
||||
|
@ -82,7 +81,7 @@ public class S3TimestampVersionedDataFinderTest
|
|||
{
|
||||
String bucket = "bucket";
|
||||
String keyPrefix = "prefix/dir/0";
|
||||
AmazonS3Client s3Client = EasyMock.createStrictMock(AmazonS3Client.class);
|
||||
ServerSideEncryptingAmazonS3 s3Client = EasyMock.createStrictMock(ServerSideEncryptingAmazonS3.class);
|
||||
|
||||
final ListObjectsV2Result result = new ListObjectsV2Result();
|
||||
result.setKeyCount(0);
|
||||
|
@ -110,7 +109,7 @@ public class S3TimestampVersionedDataFinderTest
|
|||
{
|
||||
String bucket = "bucket";
|
||||
String keyPrefix = "prefix/dir/0";
|
||||
AmazonS3Client s3Client = EasyMock.createStrictMock(AmazonS3Client.class);
|
||||
ServerSideEncryptingAmazonS3 s3Client = EasyMock.createStrictMock(ServerSideEncryptingAmazonS3.class);
|
||||
|
||||
S3ObjectSummary object0 = new S3ObjectSummary();
|
||||
|
||||
|
@ -147,7 +146,7 @@ public class S3TimestampVersionedDataFinderTest
|
|||
{
|
||||
String bucket = "bucket";
|
||||
String keyPrefix = "prefix/dir/0";
|
||||
AmazonS3Client s3Client = EasyMock.createStrictMock(AmazonS3Client.class);
|
||||
ServerSideEncryptingAmazonS3 s3Client = EasyMock.createStrictMock(ServerSideEncryptingAmazonS3.class);
|
||||
|
||||
S3ObjectSummary object0 = new S3ObjectSummary();
|
||||
|
||||
|
@ -165,10 +164,8 @@ public class S3TimestampVersionedDataFinderTest
|
|||
.once();
|
||||
S3TimestampVersionedDataFinder finder = new S3TimestampVersionedDataFinder(s3Client);
|
||||
|
||||
|
||||
EasyMock.replay(s3Client);
|
||||
|
||||
|
||||
URI latest = finder.getLatestVersion(URI.create(StringUtils.format("s3://%s/%s", bucket, object0.getKey())), null);
|
||||
|
||||
EasyMock.verify(s3Client);
|
||||
|
|
|
@ -64,7 +64,13 @@ public class TestAWSCredentialsProvider
|
|||
assertEquals(credentials.getAWSSecretKey(), "secretKeySample");
|
||||
|
||||
// try to create
|
||||
s3Module.getAmazonS3Client(provider, new AWSProxyConfig(), new AWSEndpointConfig(), new AWSClientConfig());
|
||||
s3Module.getAmazonS3Client(
|
||||
provider,
|
||||
new AWSProxyConfig(),
|
||||
new AWSEndpointConfig(),
|
||||
new AWSClientConfig(),
|
||||
new S3StorageConfig(new NoopServerSideEncryption())
|
||||
);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -89,6 +95,12 @@ public class TestAWSCredentialsProvider
|
|||
assertEquals(sessionCredentials.getSessionToken(), "sessionTokenSample");
|
||||
|
||||
// try to create
|
||||
s3Module.getAmazonS3Client(provider, new AWSProxyConfig(), new AWSEndpointConfig(), new AWSClientConfig());
|
||||
s3Module.getAmazonS3Client(
|
||||
provider,
|
||||
new AWSProxyConfig(),
|
||||
new AWSEndpointConfig(),
|
||||
new AWSClientConfig(),
|
||||
new S3StorageConfig(new NoopServerSideEncryption())
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue