From 839b690ed5edc2ac4984640d58c005bb63cd8a07 Mon Sep 17 00:00:00 2001
From: Lei Xu
Date: Sat, 11 Feb 2017 13:59:03 -0800
Subject: [PATCH] HADOOP-13075. Add support for SSE-KMS and SSE-C in s3a filesystem. (Steve Moist via lei)
---
 hadoop-tools/hadoop-aws/pom.xml | 2 +
 .../org/apache/hadoop/fs/s3a/Constants.java | 22 ++-
 .../hadoop/fs/s3a/S3ABlockOutputStream.java | 1 +
 .../hadoop/fs/s3a/S3AEncryptionMethods.java | 61 +++++++
 .../apache/hadoop/fs/s3a/S3AFileSystem.java | 131 +++++++++++++--
 .../apache/hadoop/fs/s3a/S3AInputStream.java | 24 ++-
 .../org/apache/hadoop/fs/s3a/S3AUtils.java | 10 ++
 .../hadoop/fs/s3a/S3ObjectAttributes.java | 59 +++++++
 .../site/markdown/tools/hadoop-aws/index.md | 29 +++-
 ...on.java => AbstractTestS3AEncryption.java} | 47 +++++-
 ...TestS3AEncryptionAlgorithmPropagation.java | 76 --------
 ...ITestS3AEncryptionAlgorithmValidation.java | 152 ++++++++++++++++++
 .../hadoop/fs/s3a/ITestS3AEncryptionSSEC.java | 90 +++++++++++
 ...estS3AEncryptionSSECBlockOutputStream.java | 46 ++++++
 .../ITestS3AEncryptionSSEKMSDefaultKey.java | 57 +++++++
 ...TestS3AEncryptionSSEKMSUserDefinedKey.java | 48 ++++++
 ...SSEKMSUserDefinedKeyBlockOutputStream.java | 52 ++++++
 .../fs/s3a/ITestS3AEncryptionSSES3.java | 43 +++++
 ...tS3AEncryptionSSES3BlockOutputStream.java} | 11 +-
 .../hadoop/fs/s3a/TestS3AGetFileStatus.java | 57 +++++--
 20 files changed, 904 insertions(+), 114 deletions(-)
 create mode 100644 hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AEncryptionMethods.java
 create mode 100644 hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ObjectAttributes.java
 rename hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/{ITestS3AEncryption.java => AbstractTestS3AEncryption.java} (66%)
 delete mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionAlgorithmPropagation.java
 create mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionAlgorithmValidation.java
 create mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEC.java
 create mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSECBlockOutputStream.java
 create mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEKMSDefaultKey.java
 create mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEKMSUserDefinedKey.java
 create mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEKMSUserDefinedKeyBlockOutputStream.java
 create mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSES3.java
 rename hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/{ITestS3AEncryptionBlockOutputStream.java => ITestS3AEncryptionSSES3BlockOutputStream.java} (77%)
diff --git a/hadoop-tools/hadoop-aws/pom.xml b/hadoop-tools/hadoop-aws/pom.xml
index 6fd503ce99b..1f64b022293 100644
--- a/hadoop-tools/hadoop-aws/pom.xml
+++ b/hadoop-tools/hadoop-aws/pom.xml
@@ -183,6 +183,7 @@
                     <exclude>**/ITestJets3tNativeS3FileSystemContract.java</exclude>
                     <exclude>**/ITest*Root*.java</exclude>
                     <exclude>**/ITestS3AFileContextStatistics.java</exclude>
+                    <exclude>**/ITestS3AEncryptionSSE*.java</exclude>
                     <exclude>**/ITestS3AHuge*.java</exclude>
@@ -211,6 +212,7 @@
                     <exclude>**/ITest*Root*.java</exclude>
                     <exclude>**/ITestS3AFileContextStatistics.java</exclude>
                     <exclude>**/ITestS3AHuge*.java</exclude>
+                    <exclude>**/ITestS3AEncryptionSSE*.java</exclude>
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index 45d974c7ed1..414f9517939 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -216,17 +216,28 @@ public final class Constants {
       "fs.s3a.multipart.purge.age";
   public static final long DEFAULT_PURGE_EXISTING_MULTIPART_AGE = 86400;
 
-  // s3 server-side encryption
+  // s3 server-side encryption, see S3AEncryptionMethods for valid options
   public static final String SERVER_SIDE_ENCRYPTION_ALGORITHM =
       "fs.s3a.server-side-encryption-algorithm";
 
   /**
    * The standard encryption algorithm AWS supports.
    * Different implementations may support others (or none).
+   * Use S3AEncryptionMethods instead when configuring
+   * which server-side encryption method to use.
    */
+  @Deprecated
   public static final String SERVER_SIDE_ENCRYPTION_AES256 =
       "AES256";
 
+  /**
+   * Used to specify which AWS KMS key to use when
+   * SERVER_SIDE_ENCRYPTION_ALGORITHM is SSE_KMS (defaults to the aws/s3
+   * master key if left blank), or, with SSE_C, the actual AES-256 key.
+   */
+  public static final String SERVER_SIDE_ENCRYPTION_KEY =
+      "fs.s3a.server-side-encryption-key";
+
   //override signature algorithm used for signing requests
   public static final String SIGNING_ALGORITHM = "fs.s3a.signing-algorithm";
 
@@ -296,4 +307,13 @@ public final class Constants {
    */
   @InterfaceAudience.Private
   public static final int MAX_MULTIPART_COUNT = 10000;
+
+  @InterfaceAudience.Private
+  public static final String SSE_C_NO_KEY_ERROR = S3AEncryptionMethods.SSE_C
+      .getMethod() + " is enabled and no encryption key is provided.";
+
+  @InterfaceAudience.Private
+  public static final String SSE_S3_WITH_KEY_ERROR = S3AEncryptionMethods.SSE_S3
+      .getMethod() + " is configured and an encryption key is provided";
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
index b66a23ff6f3..89b9b29726d 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
@@ -382,6 +382,7 @@ class S3ABlockOutputStream extends OutputStream {
           writeOperationHelper.newPutRequest(
               block.startUpload(),
               size);
+      fs.setOptionalPutRequestParameters(putObjectRequest);
       long transferQueueTime = now();
       BlockUploadProgress callback =
           new BlockUploadProgress(
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AEncryptionMethods.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AEncryptionMethods.java
new file mode 100644
index 00000000000..1b3d86ca017
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AEncryptionMethods.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import java.io.IOException; + +import org.apache.commons.lang3.StringUtils; + +/** + * This enum is to centralize the encryption methods and + * the value required in the configuration. + */ +public enum S3AEncryptionMethods { + + SSE_S3("AES256"), + SSE_KMS("SSE-KMS"), + SSE_C("SSE-C"), + NONE(""); + + private String method; + + S3AEncryptionMethods(String method) { + this.method = method; + } + + public String getMethod() { + return method; + } + + public static S3AEncryptionMethods getMethod(String name) throws IOException { + if(StringUtils.isBlank(name)) { + return NONE; + } + switch(name) { + case "AES256": + return SSE_S3; + case "SSE-KMS": + return SSE_KMS; + case "SSE-C": + return SSE_C; + default: + throw new IOException("Unknown Server Side algorithm "+name); + } + } +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 8152bf80d34..bffc210b037 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -43,6 +43,7 @@ import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest; import com.amazonaws.services.s3.model.CompleteMultipartUploadResult; import com.amazonaws.services.s3.model.CopyObjectRequest; import com.amazonaws.services.s3.model.DeleteObjectsRequest; +import com.amazonaws.services.s3.model.GetObjectMetadataRequest; import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; import com.amazonaws.services.s3.model.ListObjectsRequest; import com.amazonaws.services.s3.model.ObjectListing; @@ -51,6 +52,8 @@ import com.amazonaws.services.s3.model.PartETag; import com.amazonaws.services.s3.model.PutObjectRequest; import com.amazonaws.services.s3.model.PutObjectResult; import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.amazonaws.services.s3.model.SSEAwsKeyManagementParams; +import com.amazonaws.services.s3.model.SSECustomerKey; import com.amazonaws.services.s3.model.UploadPartRequest; import com.amazonaws.services.s3.model.UploadPartResult; import com.amazonaws.services.s3.transfer.Copy; @@ -135,7 +138,7 @@ public class S3AFileSystem extends FileSystem { LoggerFactory.getLogger("org.apache.hadoop.fs.s3a.S3AFileSystem.Progress"); private LocalDirAllocator directoryAllocator; private CannedAccessControlList cannedACL; - private String serverSideEncryptionAlgorithm; + private S3AEncryptionMethods serverSideEncryptionAlgorithm; private S3AInstrumentation instrumentation; private S3AStorageStatistics storageStatistics; private long readAhead; @@ -227,8 +230,17 @@ public class S3AFileSystem extends FileSystem { initMultipartUploads(conf); - serverSideEncryptionAlgorithm = - conf.getTrimmed(SERVER_SIDE_ENCRYPTION_ALGORITHM); + serverSideEncryptionAlgorithm = S3AEncryptionMethods.getMethod( + conf.getTrimmed(SERVER_SIDE_ENCRYPTION_ALGORITHM)); + if(S3AEncryptionMethods.SSE_C.equals(serverSideEncryptionAlgorithm) && + StringUtils.isBlank(getServerSideEncryptionKey(getConf()))) { + throw new 
IOException(Constants.SSE_C_NO_KEY_ERROR);
+    }
+    if(S3AEncryptionMethods.SSE_S3.equals(serverSideEncryptionAlgorithm) &&
+        StringUtils.isNotBlank(getServerSideEncryptionKey(
+          getConf()))) {
+      throw new IOException(Constants.SSE_S3_WITH_KEY_ERROR);
+    }
     LOG.debug("Using encryption {}", serverSideEncryptionAlgorithm);
     inputPolicy = S3AInputPolicy.getPolicy(
         conf.getTrimmed(INPUT_FADVISE, INPUT_FADV_NORMAL));
@@ -514,9 +526,18 @@
           + " because it is a directory");
     }
 
-    return new FSDataInputStream(new S3AInputStream(bucket, pathToKey(f),
-        fileStatus.getLen(), s3, statistics, instrumentation, readAhead,
-        inputPolicy));
+    return new FSDataInputStream(
+        new S3AInputStream(new S3ObjectAttributes(
+          bucket,
+          pathToKey(f),
+          serverSideEncryptionAlgorithm,
+          getServerSideEncryptionKey(getConf())),
+          fileStatus.getLen(),
+          s3,
+          statistics,
+          instrumentation,
+          readAhead,
+          inputPolicy));
   }
 
   /**
@@ -892,7 +913,14 @@
    */
   protected ObjectMetadata getObjectMetadata(String key) {
     incrementStatistic(OBJECT_METADATA_REQUESTS);
-    ObjectMetadata meta = s3.getObjectMetadata(bucket, key);
+    GetObjectMetadataRequest request =
+        new GetObjectMetadataRequest(bucket, key);
+    //SSE-C requires the encryption key to be supplied on object metadata
+    //requests as well, if enabled
+    if(S3AEncryptionMethods.SSE_C.equals(serverSideEncryptionAlgorithm) &&
+        StringUtils.isNotBlank(getServerSideEncryptionKey(getConf()))){
+      request.setSSECustomerKey(generateSSECustomerKey());
+    }
+    ObjectMetadata meta = s3.getObjectMetadata(request);
     incrementReadOperations();
     return meta;
   }
@@ -986,6 +1014,7 @@
       ObjectMetadata metadata, File srcfile) {
     PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key,
         srcfile);
+    setOptionalPutRequestParameters(putObjectRequest);
     putObjectRequest.setCannedAcl(cannedACL);
     putObjectRequest.setMetadata(metadata);
     return putObjectRequest;
@@ -1004,6 +1033,7 @@
       ObjectMetadata metadata, InputStream inputStream) {
     PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key,
         inputStream, metadata);
+    setOptionalPutRequestParameters(putObjectRequest);
     putObjectRequest.setCannedAcl(cannedACL);
     return putObjectRequest;
   }
@@ -1016,9 +1046,7 @@
    */
   public ObjectMetadata newObjectMetadata() {
     final ObjectMetadata om = new ObjectMetadata();
-    if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
-      om.setSSEAlgorithm(serverSideEncryptionAlgorithm);
-    }
+    setOptionalObjectMetadata(om);
     return om;
   }
 
@@ -1752,11 +1780,10 @@
     try {
       ObjectMetadata srcom = getObjectMetadata(srcKey);
       ObjectMetadata dstom = cloneObjectMetadata(srcom);
-      if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
-        dstom.setSSEAlgorithm(serverSideEncryptionAlgorithm);
-      }
+      setOptionalObjectMetadata(dstom);
       CopyObjectRequest copyObjectRequest =
           new CopyObjectRequest(bucket, srcKey, bucket, dstKey);
+      setOptionalCopyObjectRequestParameters(copyObjectRequest);
       copyObjectRequest.setCannedAccessControlList(cannedACL);
       copyObjectRequest.setNewObjectMetadata(dstom);
 
@@ -1788,6 +1815,83 @@
     }
   }
 
+  protected void setOptionalMultipartUploadRequestParameters(
+      InitiateMultipartUploadRequest req) {
+    switch (serverSideEncryptionAlgorithm) {
+    case SSE_KMS:
+      req.setSSEAwsKeyManagementParams(generateSSEAwsKeyParams());
+      break;
+    case
SSE_C: + if (StringUtils.isNotBlank(getServerSideEncryptionKey(getConf()))) { + //at the moment, only supports copy using the same key + req.setSSECustomerKey(generateSSECustomerKey()); + } + break; + default: + } + } + + + protected void setOptionalCopyObjectRequestParameters( + CopyObjectRequest copyObjectRequest) throws IOException { + switch (serverSideEncryptionAlgorithm) { + case SSE_KMS: + copyObjectRequest.setSSEAwsKeyManagementParams( + generateSSEAwsKeyParams() + ); + break; + case SSE_C: + if (StringUtils.isNotBlank(getServerSideEncryptionKey(getConf()))) { + //at the moment, only supports copy using the same key + SSECustomerKey customerKey = generateSSECustomerKey(); + copyObjectRequest.setSourceSSECustomerKey(customerKey); + copyObjectRequest.setDestinationSSECustomerKey(customerKey); + } + break; + default: + } + } + + protected void setOptionalPutRequestParameters(PutObjectRequest request) { + switch (serverSideEncryptionAlgorithm) { + case SSE_KMS: + request.setSSEAwsKeyManagementParams(generateSSEAwsKeyParams()); + break; + case SSE_C: + if (StringUtils.isNotBlank(getServerSideEncryptionKey(getConf()))) { + request.setSSECustomerKey(generateSSECustomerKey()); + } + break; + default: + } + } + + private void setOptionalObjectMetadata(ObjectMetadata metadata) { + if (S3AEncryptionMethods.SSE_S3.equals(serverSideEncryptionAlgorithm)) { + metadata.setSSEAlgorithm(serverSideEncryptionAlgorithm.getMethod()); + } + } + + private SSEAwsKeyManagementParams generateSSEAwsKeyParams() { + //Use specified key, otherwise default to default master aws/s3 key by AWS + SSEAwsKeyManagementParams sseAwsKeyManagementParams = + new SSEAwsKeyManagementParams(); + if (StringUtils.isNotBlank(getServerSideEncryptionKey(getConf()))) { + sseAwsKeyManagementParams = + new SSEAwsKeyManagementParams( + getServerSideEncryptionKey(getConf()) + ); + } + return sseAwsKeyManagementParams; + } + + private SSECustomerKey generateSSECustomerKey() { + SSECustomerKey customerKey = new SSECustomerKey( + getServerSideEncryptionKey(getConf()) + ); + return customerKey; + } + /** * Perform post-write actions. 
* @param key key written to @@ -2240,6 +2344,7 @@ public class S3AFileSystem extends FileSystem { key, newObjectMetadata(-1)); initiateMPURequest.setCannedACL(cannedACL); + setOptionalMultipartUploadRequestParameters(initiateMPURequest); try { return s3.initiateMultipartUpload(initiateMPURequest) .getUploadId(); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java index 3c4093d8cd1..7d322a50f7d 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java @@ -22,6 +22,7 @@ import com.amazonaws.AmazonClientException; import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.model.GetObjectRequest; import com.amazonaws.services.s3.model.S3ObjectInputStream; +import com.amazonaws.services.s3.model.SSECustomerKey; import com.google.common.base.Preconditions; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.classification.InterfaceAudience; @@ -36,6 +37,7 @@ import org.slf4j.Logger; import java.io.EOFException; import java.io.IOException; +import static org.apache.commons.lang3.StringUtils.isNotEmpty; import static org.apache.hadoop.fs.s3a.S3AUtils.*; /** @@ -78,6 +80,8 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead { private final String uri; public static final Logger LOG = S3AFileSystem.LOG; private final S3AInstrumentation.InputStreamStatistics streamStatistics; + private S3AEncryptionMethods serverSideEncryptionAlgorithm; + private String serverSideEncryptionKey; private final S3AInputPolicy inputPolicy; private long readahead = Constants.DEFAULT_READAHEAD_RANGE; @@ -98,24 +102,26 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead { */ private long contentRangeStart; - public S3AInputStream(String bucket, - String key, + public S3AInputStream(S3ObjectAttributes s3Attributes, long contentLength, AmazonS3 client, FileSystem.Statistics stats, S3AInstrumentation instrumentation, long readahead, S3AInputPolicy inputPolicy) { - Preconditions.checkArgument(StringUtils.isNotEmpty(bucket), "No Bucket"); - Preconditions.checkArgument(StringUtils.isNotEmpty(key), "No Key"); - Preconditions.checkArgument(contentLength >= 0 , "Negative content length"); - this.bucket = bucket; - this.key = key; + Preconditions.checkArgument(isNotEmpty(s3Attributes.getBucket()), "No Bucket"); + Preconditions.checkArgument(isNotEmpty(s3Attributes.getKey()), "No Key"); + Preconditions.checkArgument(contentLength >= 0, "Negative content length"); + this.bucket = s3Attributes.getBucket(); + this.key = s3Attributes.getKey(); this.contentLength = contentLength; this.client = client; this.stats = stats; this.uri = "s3a://" + this.bucket + "/" + this.key; this.streamStatistics = instrumentation.newInputStreamStatistics(); + this.serverSideEncryptionAlgorithm = + s3Attributes.getServerSideEncryptionAlgorithm(); + this.serverSideEncryptionKey = s3Attributes.getServerSideEncryptionKey(); this.inputPolicy = inputPolicy; setReadahead(readahead); } @@ -145,6 +151,10 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead { try { GetObjectRequest request = new GetObjectRequest(bucket, key) .withRange(targetPos, contentRangeFinish); + if (S3AEncryptionMethods.SSE_C.equals(serverSideEncryptionAlgorithm) && + StringUtils.isNotBlank(serverSideEncryptionKey)){ + 
request.setSSECustomerKey(new SSECustomerKey(serverSideEncryptionKey)); + } wrappedStream = client.getObject(request).getObjectContent(); contentRangeStart = targetPos; if (wrappedStream == null) { diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java index c4ff638f923..53112118bf1 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java @@ -723,4 +723,14 @@ public final class S3AUtils { "patch of " + S3A_SECURITY_CREDENTIAL_PROVIDER_PATH); } } + + static String getServerSideEncryptionKey(Configuration conf) { + try { + return getPassword(conf, Constants.SERVER_SIDE_ENCRYPTION_KEY, + conf.getTrimmed(SERVER_SIDE_ENCRYPTION_KEY)); + } catch (IOException e) { + LOG.error("Cannot retrieve SERVER_SIDE_ENCRYPTION_KEY", e); + } + return null; + } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ObjectAttributes.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ObjectAttributes.java new file mode 100644 index 00000000000..7c73a23f792 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ObjectAttributes.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+/**
+ * This class is only a holder for bucket, key, SSE Algorithm and SSE key
+ * attributes. It is only used in {@link S3AInputStream}
+ * to reduce the number of parameters passed
+ * to that class's constructor.
+ */
+class S3ObjectAttributes {
+  private String bucket;
+  private String key;
+  private S3AEncryptionMethods serverSideEncryptionAlgorithm;
+  private String serverSideEncryptionKey;
+
+  public S3ObjectAttributes(
+      String bucket,
+      String key,
+      S3AEncryptionMethods serverSideEncryptionAlgorithm,
+      String serverSideEncryptionKey) {
+    this.bucket = bucket;
+    this.key = key;
+    this.serverSideEncryptionAlgorithm = serverSideEncryptionAlgorithm;
+    this.serverSideEncryptionKey = serverSideEncryptionKey;
+  }
+
+  public String getBucket() {
+    return bucket;
+  }
+
+  public String getKey() {
+    return key;
+  }
+
+  public S3AEncryptionMethods getServerSideEncryptionAlgorithm() {
+    return serverSideEncryptionAlgorithm;
+  }
+
+  public String getServerSideEncryptionKey() {
+    return serverSideEncryptionKey;
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
index d804a596207..2471a521316 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
@@ -840,10 +840,20 @@ from placing its declaration on the command line.
 <property>
   <name>fs.s3a.server-side-encryption-algorithm</name>
   <description>Specify a server-side encryption algorithm for s3a: file system.
-    Unset by default, and the only other currently allowable value is AES256.
+    Unset by default. It supports the following values: 'AES256' (for SSE-S3),
+    'SSE-KMS', and 'SSE-C'.
   </description>
 </property>
 
+<property>
+  <name>fs.s3a.server-side-encryption-key</name>
+  <description>Specific encryption key to use if fs.s3a.server-side-encryption-algorithm
+    has been set to 'SSE-KMS' or 'SSE-C'. In the case of SSE-C, the value of this property
+    should be the Base64 encoded key. If you are using SSE-KMS and leave this property
+    empty, you'll be using the default S3 KMS key; otherwise you should set this property
+    to the specific KMS key id.
+  </description>
+</property>
+
 <property>
   <name>fs.s3a.buffer.dir</name>
   <value>${hadoop.tmp.dir}/s3a</value>
@@ -2160,6 +2170,23 @@
 that the file `contract-test-options.xml` does not contain any
 secret credentials itself. As the auth keys XML file is kept out of the
 source code tree, it is not going to get accidentally committed.
 
+### Configuring S3A Encryption
+
+For the S3A encryption tests to run correctly, the
+`fs.s3a.server-side-encryption-key` property must be configured in the s3a
+contract XML file with an AWS KMS encryption key ARN, as this value is
+different for every AWS account and region.
+
+Example:
+
+    <property>
+      <name>fs.s3a.server-side-encryption-key</name>
+      <value>arn:aws:kms:us-west-2:360379543683:key/071a86ff-8881-4ba0-9230-95af6d01ca01</value>
+    </property>
+
+You can also force all the tests to run with a specific SSE encryption method
+by configuring the property `fs.s3a.server-side-encryption-algorithm` in the
+s3a contract file.
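+
+For example, to force every test run to use SSE-C, the contract file could set
+both properties. This is an illustrative sketch only; the Base64 key below is
+the same throwaway test key used by the SSE-C tests in this patch, not a key
+to reuse in production:
+
+    <property>
+      <name>fs.s3a.server-side-encryption-algorithm</name>
+      <value>SSE-C</value>
+    </property>
+    <property>
+      <name>fs.s3a.server-side-encryption-key</name>
+      <value>4niV/jPK5VFRHY+KNb6wtqYd4xXyMgdJ9XQJpcQUVbs=</value>
+    </property>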
### Running the Tests diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryption.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractTestS3AEncryption.java similarity index 66% rename from hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryption.java rename to hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractTestS3AEncryption.java index 8432789979e..515094295b6 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryption.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractTestS3AEncryption.java @@ -19,6 +19,8 @@ package org.apache.hadoop.fs.s3a; import com.amazonaws.services.s3.model.ObjectMetadata; +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.commons.net.util.Base64; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.contract.ContractTestUtils; @@ -34,15 +36,14 @@ import static org.apache.hadoop.fs.s3a.S3ATestUtils.*; * are made for different file sizes as there have been reports that the * file length may be rounded up to match word boundaries. */ -public class ITestS3AEncryption extends AbstractS3ATestBase { - private static final String AES256 = Constants.SERVER_SIDE_ENCRYPTION_AES256; +public abstract class AbstractTestS3AEncryption extends AbstractS3ATestBase { @Override protected Configuration createConfiguration() { Configuration conf = super.createConfiguration(); S3ATestUtils.disableFilesystemCaching(conf); conf.set(Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM, - AES256); + getSSEAlgorithm().getMethod()); return conf; } @@ -80,7 +81,7 @@ public class ITestS3AEncryption extends AbstractS3ATestBase { rm(getFileSystem(), path, false, false); } - private String createFilename(int len) { + protected String createFilename(int len) { return String.format("%s-%04x", methodName.getMethodName(), len); } @@ -89,9 +90,43 @@ public class ITestS3AEncryption extends AbstractS3ATestBase { * @param path path * @throws IOException on a failure */ - private void assertEncrypted(Path path) throws IOException { + protected void assertEncrypted(Path path) throws IOException { ObjectMetadata md = getFileSystem().getObjectMetadata(path); - assertEquals(AES256, md.getSSEAlgorithm()); + switch(getSSEAlgorithm()) { + case SSE_C: + assertEquals("AES256", md.getSSECustomerAlgorithm()); + String md5Key = convertKeyToMd5(); + assertEquals(md5Key, md.getSSECustomerKeyMd5()); + break; + case SSE_KMS: + assertEquals("aws:kms", md.getSSEAlgorithm()); + //S3 will return full arn of the key, so specify global arn in properties + assertEquals(this.getConfiguration(). + getTrimmed(Constants.SERVER_SIDE_ENCRYPTION_KEY), + md.getSSEAwsKmsKeyId()); + break; + default: + assertEquals("AES256", md.getSSEAlgorithm()); + } } + /** + * Decodes the SERVER_SIDE_ENCRYPTION_KEY from base64 into an AES key, then + * gets the md5 of it, then encodes it in base64 so it will match the version + * that AWS returns to us. 
+ * + * @return md5'd base64 encoded representation of the server side encryption + * key + */ + private String convertKeyToMd5() { + String base64Key = getConfiguration().getTrimmed( + Constants.SERVER_SIDE_ENCRYPTION_KEY + ); + byte[] key = Base64.decodeBase64(base64Key); + byte[] md5 = DigestUtils.md5(key); + return Base64.encodeBase64String(md5).trim(); + } + + protected abstract S3AEncryptionMethods getSSEAlgorithm(); + } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionAlgorithmPropagation.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionAlgorithmPropagation.java deleted file mode 100644 index 96deb255864..00000000000 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionAlgorithmPropagation.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.fs.s3a; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.junit.Test; - -import java.io.IOException; - -import static org.apache.hadoop.fs.s3a.S3ATestUtils.*; - -/** - * Test whether or not encryption settings propagate by choosing an invalid - * one. We expect the write to fail with a 400 bad request error - */ -public class ITestS3AEncryptionAlgorithmPropagation - extends AbstractS3ATestBase { - - @Override - protected Configuration createConfiguration() { - Configuration conf = super.createConfiguration(); - S3ATestUtils.disableFilesystemCaching(conf); - conf.set(Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM, - "DES"); - return conf; - } - - @Test - public void testEncrypt0() throws Throwable { - writeThenReadFileToFailure(0); - } - - @Test - public void testEncrypt256() throws Throwable { - writeThenReadFileToFailure(256); - } - - /** - * Make this a no-op so test setup doesn't fail. 
- * @param path path path - * @throws IOException on any failure - */ - @Override - protected void mkdirs(Path path) throws IOException { - - } - - protected void writeThenReadFileToFailure(int len) throws IOException { - skipIfEncryptionTestsDisabled(getConfiguration()); - describe("Create an encrypted file of size " + len); - try { - writeThenReadFile(methodName.getMethodName() + '-' + len, len); - fail("Expected an exception about an illegal encryption algorithm"); - } catch (AWSS3IOException e) { - assertStatusCode(e, 400); - } - } - -} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionAlgorithmValidation.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionAlgorithmValidation.java new file mode 100644 index 00000000000..0cd8ff4ee3f --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionAlgorithmValidation.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import static org.apache.hadoop.test.LambdaTestUtils.intercept; + +import java.io.IOException; +import java.net.URI; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.contract.s3a.S3AContract; +import org.junit.Ignore; +import org.junit.Test; + +/** + * Test whether or not encryption settings propagate by choosing an invalid + * one. We expect the S3AFileSystem to fail to initialize. 
+ */ +@Ignore +public class ITestS3AEncryptionAlgorithmValidation + extends AbstractS3ATestBase { + + @Test + public void testEncryptionAlgorithmSetToDES() throws Throwable { + //skip tests if they aren't enabled + assumeEnabled(); + intercept(IOException.class, "Unknown Server Side algorithm DES", () -> { + + Configuration conf = super.createConfiguration(); + //DES is an invalid encryption algorithm + conf.set(Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM, "DES"); + S3AContract contract = (S3AContract) createContract(conf); + contract.init(); + //extract the test FS + FileSystem fileSystem = contract.getTestFileSystem(); + assertNotNull("null filesystem", fileSystem); + URI fsURI = fileSystem.getUri(); + LOG.info("Test filesystem = {} implemented by {}", fsURI, fileSystem); + assertEquals("wrong filesystem of " + fsURI, + contract.getScheme(), fsURI.getScheme()); + fileSystem.initialize(fsURI, conf); + throw new Exception("Do not reach here"); + }); + } + + @Test + public void testEncryptionAlgorithmSSECWithNoEncryptionKey() throws + Throwable { + //skip tests if they aren't enabled + assumeEnabled(); + intercept(IllegalArgumentException.class, "The value of property " + + Constants.SERVER_SIDE_ENCRYPTION_KEY + " must not be null", () -> { + + Configuration conf = super.createConfiguration(); + //SSE-C must be configured with an encryption key + conf.set(Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM, + S3AEncryptionMethods.SSE_C.getMethod()); + conf.set(Constants.SERVER_SIDE_ENCRYPTION_KEY, null); + S3AContract contract = (S3AContract) createContract(conf); + contract.init(); + //extract the test FS + FileSystem fileSystem = contract.getTestFileSystem(); + assertNotNull("null filesystem", fileSystem); + URI fsURI = fileSystem.getUri(); + LOG.info("Test filesystem = {} implemented by {}", fsURI, fileSystem); + assertEquals("wrong filesystem of " + fsURI, + contract.getScheme(), fsURI.getScheme()); + fileSystem.initialize(fsURI, conf); + throw new Exception("Do not reach here"); + }); + } + + @Test + public void testEncryptionAlgorithmSSECWithBlankEncryptionKey() throws + Throwable { + intercept(IOException.class, Constants.SSE_C_NO_KEY_ERROR, () -> { + + Configuration conf = super.createConfiguration(); + //SSE-C must be configured with an encryption key + conf.set(Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM, + S3AEncryptionMethods.SSE_C.getMethod()); + conf.set(Constants.SERVER_SIDE_ENCRYPTION_KEY, ""); + S3AContract contract = (S3AContract) createContract(conf); + contract.init(); + //extract the test FS + FileSystem fileSystem = contract.getTestFileSystem(); + assertNotNull("null filesystem", fileSystem); + URI fsURI = fileSystem.getUri(); + LOG.info("Test filesystem = {} implemented by {}", fsURI, fileSystem); + assertEquals("wrong filesystem of " + fsURI, + contract.getScheme(), fsURI.getScheme()); + fileSystem.initialize(fsURI, conf); + throw new Exception("Do not reach here"); + }); + } + + @Test + public void testEncryptionAlgorithmSSES3WithEncryptionKey() throws + Throwable { + //skip tests if they aren't enabled + assumeEnabled(); + intercept(IOException.class, Constants.SSE_S3_WITH_KEY_ERROR, () -> { + + Configuration conf = super.createConfiguration(); + //SSE-S3 cannot be configured with an encryption key + conf.set(Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM, + S3AEncryptionMethods.SSE_S3.getMethod()); + conf.set(Constants.SERVER_SIDE_ENCRYPTION_KEY, + "4niV/jPK5VFRHY+KNb6wtqYd4xXyMgdJ9XQJpcQUVbs="); + S3AContract contract = (S3AContract) createContract(conf); + contract.init(); + 
//extract the test FS
+      FileSystem fileSystem = contract.getTestFileSystem();
+      assertNotNull("null filesystem", fileSystem);
+      URI fsURI = fileSystem.getUri();
+      LOG.info("Test filesystem = {} implemented by {}", fsURI, fileSystem);
+      assertEquals("wrong filesystem of " + fsURI,
+          contract.getScheme(), fsURI.getScheme());
+      fileSystem.initialize(fsURI, conf);
+      throw new Exception("Do not reach here");
+    });
+  }
+
+  /**
+   * Make this a no-op so test setup doesn't fail.
+   * @param path path
+   * @throws IOException on any failure
+   */
+  @Override
+  protected void mkdirs(Path path) throws IOException {
+
+  }
+
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEC.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEC.java
new file mode 100644
index 00000000000..71586b89849
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEC.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; +import static org.apache.hadoop.fs.contract.ContractTestUtils.rm; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfEncryptionTestsDisabled; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.apache.hadoop.fs.contract.s3a.S3AContract; +import org.junit.Test; + +/** + * Concrete class that extends {@link AbstractTestS3AEncryption} + * and tests SSE-C encryption. + */ +public class ITestS3AEncryptionSSEC extends AbstractTestS3AEncryption { + + @Override + protected Configuration createConfiguration() { + Configuration conf = super.createConfiguration(); + S3ATestUtils.disableFilesystemCaching(conf); + conf.set(Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM, + getSSEAlgorithm().getMethod()); + conf.set(Constants.SERVER_SIDE_ENCRYPTION_KEY, + "4niV/jPK5VFRHY+KNb6wtqYd4xXyMgdJ9XQJpcQUVbs="); + return conf; + } + + /** + * This will create and write to a file using encryption key A, then attempt + * to read from it again with encryption key B. This will not work as it + * cannot decrypt the file. + * @throws Exception + */ + @Test + public void testCreateFileAndReadWithDifferentEncryptionKey() throws + Exception { + final Path[] path = new Path[1]; + intercept(java.nio.file.AccessDeniedException.class, + "Forbidden (Service: Amazon S3; Status Code: 403;", () -> { + + int len = 2048; + skipIfEncryptionTestsDisabled(getConfiguration()); + describe("Create an encrypted file of size " + len); + String src = createFilename(len); + path[0] = writeThenReadFile(src, len); + + Configuration conf = this.createConfiguration(); + conf.set(Constants.SERVER_SIDE_ENCRYPTION_KEY, + "kX7SdwVc/1VXJr76kfKnkQ3ONYhxianyL2+C3rPVT9s="); + + S3AContract contract = (S3AContract) createContract(conf); + contract.init(); + //skip tests if they aren't enabled + assumeEnabled(); + //extract the test FS + FileSystem fileSystem = contract.getTestFileSystem(); + byte[] data = dataset(len, 'a', 'z'); + ContractTestUtils.verifyFileContents(fileSystem, path[0], data); + throw new Exception("Fail"); + }); + rm(getFileSystem(), path[0], false, false); + } + + @Override + protected S3AEncryptionMethods getSSEAlgorithm() { + return S3AEncryptionMethods.SSE_C; + } +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSECBlockOutputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSECBlockOutputStream.java new file mode 100644 index 00000000000..afa04412935 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSECBlockOutputStream.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import org.apache.hadoop.conf.Configuration; + +/** + * Run the encryption tests against the Fast output stream. + * This verifies that both file writing paths can encrypt their data. + */ + +public class ITestS3AEncryptionSSECBlockOutputStream + extends AbstractTestS3AEncryption { + + @Override + protected Configuration createConfiguration() { + Configuration conf = super.createConfiguration(); + conf.setBoolean(Constants.FAST_UPLOAD, true); + conf.set(Constants.FAST_UPLOAD_BUFFER, + Constants.FAST_UPLOAD_BYTEBUFFER); + conf.set(Constants.SERVER_SIDE_ENCRYPTION_KEY, + "4niV/jPK5VFRHY+KNb6wtqYd4xXyMgdJ9XQJpcQUVbs="); + return conf; + } + + @Override + protected S3AEncryptionMethods getSSEAlgorithm() { + return S3AEncryptionMethods.SSE_C; + } +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEKMSDefaultKey.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEKMSDefaultKey.java new file mode 100644 index 00000000000..8b68fcfb880 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEKMSDefaultKey.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import static org.hamcrest.CoreMatchers.containsString;
+
+import java.io.IOException;
+
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Concrete class that extends {@link AbstractTestS3AEncryption}
+ * and tests SSE-KMS encryption when no KMS encryption key is provided and AWS
+ * uses the default. Since this resource changes for every account and region,
+ * there is no good way to explicitly set this value to do an equality check
+ * on the response.
+ */
+public class ITestS3AEncryptionSSEKMSDefaultKey
+    extends AbstractTestS3AEncryption {
+
+  @Override
+  protected Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    conf.set(Constants.SERVER_SIDE_ENCRYPTION_KEY, "");
+    return conf;
+  }
+
+  @Override
+  protected S3AEncryptionMethods getSSEAlgorithm() {
+    return S3AEncryptionMethods.SSE_KMS;
+  }
+
+  @Override
+  protected void assertEncrypted(Path path) throws IOException {
+    ObjectMetadata md = getFileSystem().getObjectMetadata(path);
+    assertEquals("aws:kms", md.getSSEAlgorithm());
+    assertThat(md.getSSEAwsKmsKeyId(), containsString("arn:aws:kms:"));
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEKMSUserDefinedKey.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEKMSUserDefinedKey.java
new file mode 100644
index 00000000000..50c9fb554e2
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEKMSUserDefinedKey.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import static org.apache.hadoop.fs.contract.ContractTestUtils.skip; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; + +/** + * Concrete class that extends {@link AbstractTestS3AEncryption} + * and tests SSE-KMS encryption. This requires the SERVER_SIDE_ENCRYPTION_KEY + * to be set in auth-keys.xml for it to run. + */ +public class ITestS3AEncryptionSSEKMSUserDefinedKey + extends AbstractTestS3AEncryption { + + @Override + protected Configuration createConfiguration() { + Configuration conf = super.createConfiguration(); + if(StringUtils.isBlank(conf.get(Constants.SERVER_SIDE_ENCRYPTION_KEY))){ + skip(Constants.SERVER_SIDE_ENCRYPTION_KEY+ " is not set for " + + S3AEncryptionMethods.SSE_KMS.getMethod()); + } + return conf; + } + + @Override + protected S3AEncryptionMethods getSSEAlgorithm() { + return S3AEncryptionMethods.SSE_KMS; + } +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEKMSUserDefinedKeyBlockOutputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEKMSUserDefinedKeyBlockOutputStream.java new file mode 100644 index 00000000000..8ce3a137914 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEKMSUserDefinedKeyBlockOutputStream.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import static org.apache.hadoop.fs.contract.ContractTestUtils.skip; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; + +/** + * Run the encryption tests against the Fast output stream. + * This verifies that both file writing paths can encrypt their data. This + * requires the SERVER_SIDE_ENCRYPTION_KEY to be set in auth-keys.xml for it + * to run. 
+ */ +public class ITestS3AEncryptionSSEKMSUserDefinedKeyBlockOutputStream + extends AbstractTestS3AEncryption { + + @Override + protected Configuration createConfiguration() { + Configuration conf = super.createConfiguration(); + if(StringUtils.isBlank(conf.get(Constants.SERVER_SIDE_ENCRYPTION_KEY))){ + skip(Constants.SERVER_SIDE_ENCRYPTION_KEY+ " is not set for " + + S3AEncryptionMethods.SSE_KMS.getMethod()); + } + conf.setBoolean(Constants.FAST_UPLOAD, true); + conf.set(Constants.FAST_UPLOAD_BUFFER, + Constants.FAST_UPLOAD_BYTEBUFFER); + return conf; + } + + @Override + protected S3AEncryptionMethods getSSEAlgorithm() { + return S3AEncryptionMethods.SSE_KMS; + } +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSES3.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSES3.java new file mode 100644 index 00000000000..33a252a68b3 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSES3.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import org.apache.hadoop.conf.Configuration; + +/** + * Concrete class that extends {@link AbstractTestS3AEncryption} + * and tests SSE-S3 encryption. + */ +public class ITestS3AEncryptionSSES3 extends AbstractTestS3AEncryption { + + @Override + protected Configuration createConfiguration() { + Configuration conf = super.createConfiguration(); + S3ATestUtils.disableFilesystemCaching(conf); + //must specify encryption key as empty because SSE-S3 does not allow it, + //nor can it be null. + conf.set(Constants.SERVER_SIDE_ENCRYPTION_KEY, ""); + return conf; + } + + @Override + protected S3AEncryptionMethods getSSEAlgorithm() { + return S3AEncryptionMethods.SSE_S3; + } +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionBlockOutputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSES3BlockOutputStream.java similarity index 77% rename from hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionBlockOutputStream.java rename to hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSES3BlockOutputStream.java index 5239f30b4a1..407601f1a03 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionBlockOutputStream.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSES3BlockOutputStream.java @@ -23,7 +23,8 @@ import org.apache.hadoop.conf.Configuration; /** * Run the encryption tests against the block output stream. */ -public class ITestS3AEncryptionBlockOutputStream extends ITestS3AEncryption { +public class ITestS3AEncryptionSSES3BlockOutputStream + extends AbstractTestS3AEncryption { @Override protected Configuration createConfiguration() { @@ -31,6 +32,14 @@ public class ITestS3AEncryptionBlockOutputStream extends ITestS3AEncryption { conf.setBoolean(Constants.FAST_UPLOAD, true); conf.set(Constants.FAST_UPLOAD_BUFFER, Constants.FAST_UPLOAD_BYTEBUFFER); + //must specify encryption key as empty because SSE-S3 does not allow it, + //nor can it be null. 
+ conf.set(Constants.SERVER_SIDE_ENCRYPTION_KEY, ""); return conf; } + + @Override + protected S3AEncryptionMethods getSSEAlgorithm() { + return S3AEncryptionMethods.SSE_S3; + } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AGetFileStatus.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AGetFileStatus.java index f9e9c6bc74a..a5dc01ab87a 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AGetFileStatus.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AGetFileStatus.java @@ -26,6 +26,7 @@ import java.io.FileNotFoundException; import java.util.Collections; import java.util.Date; +import com.amazonaws.services.s3.model.GetObjectMetadataRequest; import com.amazonaws.services.s3.model.ListObjectsRequest; import com.amazonaws.services.s3.model.ObjectListing; import com.amazonaws.services.s3.model.ObjectMetadata; @@ -34,6 +35,9 @@ import com.amazonaws.services.s3.model.S3ObjectSummary; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; +import org.hamcrest.Matcher; import org.junit.Test; /** @@ -48,7 +52,8 @@ public class TestS3AGetFileStatus extends AbstractS3AMockTest { ObjectMetadata meta = new ObjectMetadata(); meta.setContentLength(1L); meta.setLastModified(new Date(2L)); - when(s3.getObjectMetadata(BUCKET, key)).thenReturn(meta); + when(s3.getObjectMetadata(argThat(correctGetMetadataRequest(BUCKET, key)))) + .thenReturn(meta); FileStatus stat = fs.getFileStatus(path); assertNotNull(stat); assertEquals(fs.makeQualified(path), stat.getPath()); @@ -61,10 +66,13 @@ public class TestS3AGetFileStatus extends AbstractS3AMockTest { public void testFakeDirectory() throws Exception { Path path = new Path("/dir"); String key = path.toUri().getPath().substring(1); - when(s3.getObjectMetadata(BUCKET, key)).thenThrow(NOT_FOUND); + when(s3.getObjectMetadata(argThat(correctGetMetadataRequest(BUCKET, key)))) + .thenThrow(NOT_FOUND); ObjectMetadata meta = new ObjectMetadata(); meta.setContentLength(0L); - when(s3.getObjectMetadata(BUCKET, key + "/")).thenReturn(meta); + when(s3.getObjectMetadata(argThat( + correctGetMetadataRequest(BUCKET, key + "/")) + )).thenReturn(meta); FileStatus stat = fs.getFileStatus(path); assertNotNull(stat); assertEquals(fs.makeQualified(path), stat.getPath()); @@ -75,8 +83,11 @@ public class TestS3AGetFileStatus extends AbstractS3AMockTest { public void testImplicitDirectory() throws Exception { Path path = new Path("/dir"); String key = path.toUri().getPath().substring(1); - when(s3.getObjectMetadata(BUCKET, key)).thenThrow(NOT_FOUND); - when(s3.getObjectMetadata(BUCKET, key + "/")).thenThrow(NOT_FOUND); + when(s3.getObjectMetadata(argThat(correctGetMetadataRequest(BUCKET, key)))) + .thenThrow(NOT_FOUND); + when(s3.getObjectMetadata(argThat( + correctGetMetadataRequest(BUCKET, key + "/")) + )).thenThrow(NOT_FOUND); ObjectListing objects = mock(ObjectListing.class); when(objects.getCommonPrefixes()).thenReturn( Collections.singletonList("dir/")); @@ -93,8 +104,11 @@ public class TestS3AGetFileStatus extends AbstractS3AMockTest { public void testRoot() throws Exception { Path path = new Path("/"); String key = path.toUri().getPath().substring(1); - when(s3.getObjectMetadata(BUCKET, key)).thenThrow(NOT_FOUND); - when(s3.getObjectMetadata(BUCKET, key + "/")).thenThrow(NOT_FOUND); + when(s3.getObjectMetadata(argThat(correctGetMetadataRequest(BUCKET, key)))) + 
.thenThrow(NOT_FOUND); + when(s3.getObjectMetadata(argThat( + correctGetMetadataRequest(BUCKET, key + "/") + ))).thenThrow(NOT_FOUND); ObjectListing objects = mock(ObjectListing.class); when(objects.getCommonPrefixes()).thenReturn( Collections.emptyList()); @@ -112,8 +126,11 @@ public class TestS3AGetFileStatus extends AbstractS3AMockTest { public void testNotFound() throws Exception { Path path = new Path("/dir"); String key = path.toUri().getPath().substring(1); - when(s3.getObjectMetadata(BUCKET, key)).thenThrow(NOT_FOUND); - when(s3.getObjectMetadata(BUCKET, key + "/")).thenThrow(NOT_FOUND); + when(s3.getObjectMetadata(argThat(correctGetMetadataRequest(BUCKET, key)))) + .thenThrow(NOT_FOUND); + when(s3.getObjectMetadata(argThat( + correctGetMetadataRequest(BUCKET, key + "/") + ))).thenThrow(NOT_FOUND); ObjectListing objects = mock(ObjectListing.class); when(objects.getCommonPrefixes()).thenReturn( Collections.emptyList()); @@ -123,4 +140,26 @@ public class TestS3AGetFileStatus extends AbstractS3AMockTest { exception.expect(FileNotFoundException.class); fs.getFileStatus(path); } + + private Matcher correctGetMetadataRequest( + String bucket, String key) { + return new BaseMatcher() { + + @Override + public void describeTo(Description description) { + description.appendText("bucket and key match"); + } + + @Override + public boolean matches(Object o) { + if(o instanceof GetObjectMetadataRequest) { + GetObjectMetadataRequest getObjectMetadataRequest = + (GetObjectMetadataRequest)o; + return getObjectMetadataRequest.getBucketName().equals(bucket) + && getObjectMetadataRequest.getKey().equals(key); + } + return false; + } + }; + } }
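
Usage sketch (editorial addition, not part of the patch): once the patch is
applied, a client selects a server-side encryption mode purely through
configuration; nothing changes in the write path for the caller. The snippet
below is a minimal sketch assuming a hypothetical bucket name `my-bucket`;
the KMS key ARN is the illustrative one from the documentation above, and the
two configuration keys are the ones this patch defines.

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SseKmsWriteSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Pick the SSE mode; the accepted values are listed in S3AEncryptionMethods.
    conf.set("fs.s3a.server-side-encryption-algorithm", "SSE-KMS");
    // Optional for SSE-KMS: a specific key. Left blank, the account's
    // default aws/s3 master key is used instead.
    conf.set("fs.s3a.server-side-encryption-key",
        "arn:aws:kms:us-west-2:360379543683:key/071a86ff-8881-4ba0-9230-95af6d01ca01");
    FileSystem fs = FileSystem.get(URI.create("s3a://my-bucket/"), conf);
    // Every PUT issued for this stream now carries the SSE-KMS parameters
    // (see setOptionalPutRequestParameters in the patch above).
    try (FSDataOutputStream out = fs.create(new Path("/example/encrypted.txt"))) {
      out.writeUTF("stored encrypted at rest");
    }
  }
}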