HADOOP-13075. Add support for SSE-KMS and SSE-C in s3a filesystem. (Steve Moist via lei)

Lei Xu 2017-02-11 13:59:03 -08:00
parent 649deb72fb
commit 839b690ed5
20 changed files with 904 additions and 114 deletions

View File

@@ -183,6 +183,7 @@
             <exclude>**/ITestJets3tNativeS3FileSystemContract.java</exclude>
             <exclude>**/ITest*Root*.java</exclude>
             <exclude>**/ITestS3AFileContextStatistics.java</exclude>
+            <exclude>**/ITestS3AEncryptionSSE*.java</exclude>
             <include>**/ITestS3AHuge*.java</include>
           </excludes>
         </configuration>
@@ -211,6 +212,7 @@
             <include>**/ITest*Root*.java</include>
             <include>**/ITestS3AFileContextStatistics.java</include>
             <include>**/ITestS3AHuge*.java</include>
+            <include>**/ITestS3AEncryptionSSE*.java</include>
           </includes>
         </configuration>
       </execution>

View File

@@ -216,17 +216,28 @@ public final class Constants {
       "fs.s3a.multipart.purge.age";
   public static final long DEFAULT_PURGE_EXISTING_MULTIPART_AGE = 86400;

-  // s3 server-side encryption
+  // s3 server-side encryption, see S3AEncryptionMethods for valid options
   public static final String SERVER_SIDE_ENCRYPTION_ALGORITHM =
       "fs.s3a.server-side-encryption-algorithm";

   /**
    * The standard encryption algorithm AWS supports.
    * Different implementations may support others (or none).
+   * Use the S3AEncryptionMethods instead when configuring
+   * which Server Side Encryption to use.
    */
+  @Deprecated
   public static final String SERVER_SIDE_ENCRYPTION_AES256 =
       "AES256";

+  /**
+   * Used to specify which AWS KMS key to use if
+   * SERVER_SIDE_ENCRYPTION_ALGORITHM is AWS_KMS (will default to aws/s3
+   * master key if left blank) or with SSE_C, the actual AES 256 key.
+   */
+  public static final String SERVER_SIDE_ENCRYPTION_KEY =
+      "fs.s3a.server-side-encryption-key";
+
   //override signature algorithm used for signing requests
   public static final String SIGNING_ALGORITHM = "fs.s3a.signing-algorithm";

@@ -296,4 +307,13 @@ public final class Constants {
    */
   @InterfaceAudience.Private
   public static final int MAX_MULTIPART_COUNT = 10000;
+
+  @InterfaceAudience.Private
+  public static final String SSE_C_NO_KEY_ERROR = S3AEncryptionMethods.SSE_C
+      .getMethod() + " is enabled and no encryption key is provided.";
+
+  @InterfaceAudience.Private
+  public static final String SSE_S3_WITH_KEY_ERROR = S3AEncryptionMethods.SSE_S3
+      .getMethod() + " is configured and an encryption key is provided";
 }
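These constants back the settings that S3AFileSystem.initialize() validates later in this commit: SSE-C demands a key and SSE-S3 rejects one. A minimal sketch of a configuration that passes validation for SSE-KMS; the key ARN is a placeholder, not part of the patch:

import org.apache.hadoop.conf.Configuration;

public class SseKmsConfigSketch {
  public static Configuration sseKmsConf() {
    Configuration conf = new Configuration();
    // Algorithm names are the ones S3AEncryptionMethods accepts.
    conf.set("fs.s3a.server-side-encryption-algorithm", "SSE-KMS");
    // Placeholder ARN; leaving the key blank selects the default aws/s3 key.
    conf.set("fs.s3a.server-side-encryption-key",
        "arn:aws:kms:us-west-2:000000000000:key/example-key-id");
    return conf;
  }
}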

View File

@@ -382,6 +382,7 @@ class S3ABlockOutputStream extends OutputStream {
           writeOperationHelper.newPutRequest(
               block.startUpload(),
               size);
+      fs.setOptionalPutRequestParameters(putObjectRequest);
       long transferQueueTime = now();
       BlockUploadProgress callback =
           new BlockUploadProgress(

View File

@@ -0,0 +1,61 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import java.io.IOException;
import org.apache.commons.lang3.StringUtils;
/**
* This enum is to centralize the encryption methods and
* the value required in the configuration.
*/
public enum S3AEncryptionMethods {
SSE_S3("AES256"),
SSE_KMS("SSE-KMS"),
SSE_C("SSE-C"),
NONE("");
private String method;
S3AEncryptionMethods(String method) {
this.method = method;
}
public String getMethod() {
return method;
}
public static S3AEncryptionMethods getMethod(String name) throws IOException {
if(StringUtils.isBlank(name)) {
return NONE;
}
switch(name) {
case "AES256":
return SSE_S3;
case "SSE-KMS":
return SSE_KMS;
case "SSE-C":
return SSE_C;
default:
throw new IOException("Unknown Server Side algorithm "+name);
}
}
}
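Given getMethod() above, the mapping from configuration strings round-trips as follows. A small sketch, assuming org.apache.hadoop.fs.s3a is on the classpath:

import java.io.IOException;

import org.apache.hadoop.fs.s3a.S3AEncryptionMethods;

public class EncryptionMethodSketch {
  public static void main(String[] args) throws IOException {
    System.out.println(S3AEncryptionMethods.getMethod("AES256"));  // SSE_S3
    System.out.println(S3AEncryptionMethods.getMethod(""));        // NONE
    // Any other name, e.g. "DES", throws
    // IOException("Unknown Server Side algorithm DES"),
    // which the validation tests later in this commit expect.
    S3AEncryptionMethods.getMethod("DES");
  }
}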

View File

@@ -43,6 +43,7 @@ import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
 import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
 import com.amazonaws.services.s3.model.CopyObjectRequest;
 import com.amazonaws.services.s3.model.DeleteObjectsRequest;
+import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
 import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
 import com.amazonaws.services.s3.model.ListObjectsRequest;
 import com.amazonaws.services.s3.model.ObjectListing;

@@ -51,6 +52,8 @@ import com.amazonaws.services.s3.model.PartETag;
 import com.amazonaws.services.s3.model.PutObjectRequest;
 import com.amazonaws.services.s3.model.PutObjectResult;
 import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.amazonaws.services.s3.model.SSEAwsKeyManagementParams;
+import com.amazonaws.services.s3.model.SSECustomerKey;
 import com.amazonaws.services.s3.model.UploadPartRequest;
 import com.amazonaws.services.s3.model.UploadPartResult;
 import com.amazonaws.services.s3.transfer.Copy;
@@ -135,7 +138,7 @@ public class S3AFileSystem extends FileSystem {
       LoggerFactory.getLogger("org.apache.hadoop.fs.s3a.S3AFileSystem.Progress");
   private LocalDirAllocator directoryAllocator;
   private CannedAccessControlList cannedACL;
-  private String serverSideEncryptionAlgorithm;
+  private S3AEncryptionMethods serverSideEncryptionAlgorithm;
   private S3AInstrumentation instrumentation;
   private S3AStorageStatistics storageStatistics;
   private long readAhead;
@@ -227,8 +230,17 @@ public class S3AFileSystem extends FileSystem {
       initMultipartUploads(conf);

-      serverSideEncryptionAlgorithm =
-          conf.getTrimmed(SERVER_SIDE_ENCRYPTION_ALGORITHM);
+      serverSideEncryptionAlgorithm = S3AEncryptionMethods.getMethod(
+          conf.getTrimmed(SERVER_SIDE_ENCRYPTION_ALGORITHM));
+      if(S3AEncryptionMethods.SSE_C.equals(serverSideEncryptionAlgorithm) &&
+          StringUtils.isBlank(getServerSideEncryptionKey(getConf()))) {
+        throw new IOException(Constants.SSE_C_NO_KEY_ERROR);
+      }
+      if(S3AEncryptionMethods.SSE_S3.equals(serverSideEncryptionAlgorithm) &&
+          StringUtils.isNotBlank(getServerSideEncryptionKey(getConf()))) {
+        throw new IOException(Constants.SSE_S3_WITH_KEY_ERROR);
+      }
       LOG.debug("Using encryption {}", serverSideEncryptionAlgorithm);
       inputPolicy = S3AInputPolicy.getPolicy(
           conf.getTrimmed(INPUT_FADVISE, INPUT_FADV_NORMAL));
@@ -514,9 +526,18 @@
           + " because it is a directory");
     }

-    return new FSDataInputStream(new S3AInputStream(bucket, pathToKey(f),
-        fileStatus.getLen(), s3, statistics, instrumentation, readAhead,
-        inputPolicy));
+    return new FSDataInputStream(
+        new S3AInputStream(new S3ObjectAttributes(
+          bucket,
+          pathToKey(f),
+          serverSideEncryptionAlgorithm,
+          getServerSideEncryptionKey(getConf())),
+            fileStatus.getLen(),
+            s3,
+            statistics,
+            instrumentation,
+            readAhead,
+            inputPolicy));
   }

   /**
@@ -892,7 +913,14 @@
    */
   protected ObjectMetadata getObjectMetadata(String key) {
     incrementStatistic(OBJECT_METADATA_REQUESTS);
-    ObjectMetadata meta = s3.getObjectMetadata(bucket, key);
+    GetObjectMetadataRequest request =
+        new GetObjectMetadataRequest(bucket, key);
+    //with SSE-C enabled, the key must also be supplied on metadata requests
+    if(S3AEncryptionMethods.SSE_C.equals(serverSideEncryptionAlgorithm) &&
+        StringUtils.isNotBlank(getServerSideEncryptionKey(getConf()))){
+      request.setSSECustomerKey(generateSSECustomerKey());
+    }
+    ObjectMetadata meta = s3.getObjectMetadata(request);
     incrementReadOperations();
     return meta;
   }
@@ -986,6 +1014,7 @@
       ObjectMetadata metadata, File srcfile) {
     PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key,
         srcfile);
+    setOptionalPutRequestParameters(putObjectRequest);
     putObjectRequest.setCannedAcl(cannedACL);
     putObjectRequest.setMetadata(metadata);
     return putObjectRequest;
@@ -1004,6 +1033,7 @@
       ObjectMetadata metadata, InputStream inputStream) {
     PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key,
         inputStream, metadata);
+    setOptionalPutRequestParameters(putObjectRequest);
     putObjectRequest.setCannedAcl(cannedACL);
     return putObjectRequest;
   }
@@ -1016,9 +1046,7 @@
    */
   public ObjectMetadata newObjectMetadata() {
     final ObjectMetadata om = new ObjectMetadata();
-    if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
-      om.setSSEAlgorithm(serverSideEncryptionAlgorithm);
-    }
+    setOptionalObjectMetadata(om);
     return om;
   }
@@ -1752,11 +1780,10 @@
     try {
       ObjectMetadata srcom = getObjectMetadata(srcKey);
       ObjectMetadata dstom = cloneObjectMetadata(srcom);
-      if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
-        dstom.setSSEAlgorithm(serverSideEncryptionAlgorithm);
-      }
+      setOptionalObjectMetadata(dstom);
       CopyObjectRequest copyObjectRequest =
           new CopyObjectRequest(bucket, srcKey, bucket, dstKey);
+      setOptionalCopyObjectRequestParameters(copyObjectRequest);
       copyObjectRequest.setCannedAccessControlList(cannedACL);
       copyObjectRequest.setNewObjectMetadata(dstom);
@@ -1788,6 +1815,83 @@
     }
   }

+  protected void setOptionalMultipartUploadRequestParameters(
+      InitiateMultipartUploadRequest req) {
+    switch (serverSideEncryptionAlgorithm) {
+    case SSE_KMS:
+      req.setSSEAwsKeyManagementParams(generateSSEAwsKeyParams());
+      break;
+    case SSE_C:
+      if (StringUtils.isNotBlank(getServerSideEncryptionKey(getConf()))) {
+        //at the moment, only supports copy using the same key
+        req.setSSECustomerKey(generateSSECustomerKey());
+      }
+      break;
+    default:
+    }
+  }
+
+  protected void setOptionalCopyObjectRequestParameters(
+      CopyObjectRequest copyObjectRequest) throws IOException {
+    switch (serverSideEncryptionAlgorithm) {
+    case SSE_KMS:
+      copyObjectRequest.setSSEAwsKeyManagementParams(
+          generateSSEAwsKeyParams()
+      );
+      break;
+    case SSE_C:
+      if (StringUtils.isNotBlank(getServerSideEncryptionKey(getConf()))) {
+        //at the moment, only supports copy using the same key
+        SSECustomerKey customerKey = generateSSECustomerKey();
+        copyObjectRequest.setSourceSSECustomerKey(customerKey);
+        copyObjectRequest.setDestinationSSECustomerKey(customerKey);
+      }
+      break;
+    default:
+    }
+  }
+
+  protected void setOptionalPutRequestParameters(PutObjectRequest request) {
+    switch (serverSideEncryptionAlgorithm) {
+    case SSE_KMS:
+      request.setSSEAwsKeyManagementParams(generateSSEAwsKeyParams());
+      break;
+    case SSE_C:
+      if (StringUtils.isNotBlank(getServerSideEncryptionKey(getConf()))) {
+        request.setSSECustomerKey(generateSSECustomerKey());
+      }
+      break;
+    default:
+    }
+  }
+
+  private void setOptionalObjectMetadata(ObjectMetadata metadata) {
+    if (S3AEncryptionMethods.SSE_S3.equals(serverSideEncryptionAlgorithm)) {
+      metadata.setSSEAlgorithm(serverSideEncryptionAlgorithm.getMethod());
+    }
+  }
+
+  private SSEAwsKeyManagementParams generateSSEAwsKeyParams() {
+    //Use the specified key; otherwise AWS falls back to the
+    //default aws/s3 master key
+    SSEAwsKeyManagementParams sseAwsKeyManagementParams =
+        new SSEAwsKeyManagementParams();
+    if (StringUtils.isNotBlank(getServerSideEncryptionKey(getConf()))) {
+      sseAwsKeyManagementParams =
+          new SSEAwsKeyManagementParams(
+              getServerSideEncryptionKey(getConf())
+          );
+    }
+    return sseAwsKeyManagementParams;
+  }
+
+  private SSECustomerKey generateSSECustomerKey() {
+    SSECustomerKey customerKey = new SSECustomerKey(
+        getServerSideEncryptionKey(getConf())
+    );
+    return customerKey;
+  }
+
   /**
    * Perform post-write actions.
    * @param key key written to
@@ -2240,6 +2344,7 @@
         key,
         newObjectMetadata(-1));
     initiateMPURequest.setCannedACL(cannedACL);
+    setOptionalMultipartUploadRequestParameters(initiateMPURequest);
     try {
       return s3.initiateMultipartUpload(initiateMPURequest)
           .getUploadId();
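None of these helpers change the public FileSystem API: once the algorithm and key are set, encryption is transparent to callers. A hedged usage sketch, with placeholder bucket and path:

import java.net.URI;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class TransparentSseSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.s3a.server-side-encryption-algorithm", "AES256"); // SSE-S3
    // Placeholder bucket; every request below is decorated by the helpers.
    FileSystem fs = FileSystem.get(new URI("s3a://example-bucket/"), conf);
    Path p = new Path("/data/report.csv");
    // This PUT goes through setOptionalPutRequestParameters().
    try (FSDataOutputStream out = fs.create(p)) {
      out.write("id,value\n1,42\n".getBytes(StandardCharsets.UTF_8));
    }
    // Reads go through S3AInputStream, which re-attaches an SSE-C key if set.
    try (FSDataInputStream in = fs.open(p)) {
      System.out.println(in.read());
    }
    fs.close();
  }
}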

View File

@@ -22,6 +22,7 @@ import com.amazonaws.AmazonClientException;
 import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.model.GetObjectRequest;
 import com.amazonaws.services.s3.model.S3ObjectInputStream;
+import com.amazonaws.services.s3.model.SSECustomerKey;
 import com.google.common.base.Preconditions;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.classification.InterfaceAudience;

@@ -36,6 +37,7 @@ import org.slf4j.Logger;
 import java.io.EOFException;
 import java.io.IOException;

+import static org.apache.commons.lang3.StringUtils.isNotEmpty;
 import static org.apache.hadoop.fs.s3a.S3AUtils.*;

 /**
@@ -78,6 +80,8 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
   private final String uri;
   public static final Logger LOG = S3AFileSystem.LOG;
   private final S3AInstrumentation.InputStreamStatistics streamStatistics;
+  private S3AEncryptionMethods serverSideEncryptionAlgorithm;
+  private String serverSideEncryptionKey;
   private final S3AInputPolicy inputPolicy;
   private long readahead = Constants.DEFAULT_READAHEAD_RANGE;
@@ -98,24 +102,26 @@
    */
   private long contentRangeStart;

-  public S3AInputStream(String bucket,
-      String key,
+  public S3AInputStream(S3ObjectAttributes s3Attributes,
       long contentLength,
       AmazonS3 client,
       FileSystem.Statistics stats,
       S3AInstrumentation instrumentation,
       long readahead,
       S3AInputPolicy inputPolicy) {
-    Preconditions.checkArgument(StringUtils.isNotEmpty(bucket), "No Bucket");
-    Preconditions.checkArgument(StringUtils.isNotEmpty(key), "No Key");
-    Preconditions.checkArgument(contentLength >= 0 , "Negative content length");
-    this.bucket = bucket;
-    this.key = key;
+    Preconditions.checkArgument(isNotEmpty(s3Attributes.getBucket()),
+        "No Bucket");
+    Preconditions.checkArgument(isNotEmpty(s3Attributes.getKey()), "No Key");
+    Preconditions.checkArgument(contentLength >= 0, "Negative content length");
+    this.bucket = s3Attributes.getBucket();
+    this.key = s3Attributes.getKey();
     this.contentLength = contentLength;
     this.client = client;
     this.stats = stats;
     this.uri = "s3a://" + this.bucket + "/" + this.key;
     this.streamStatistics = instrumentation.newInputStreamStatistics();
+    this.serverSideEncryptionAlgorithm =
+        s3Attributes.getServerSideEncryptionAlgorithm();
+    this.serverSideEncryptionKey = s3Attributes.getServerSideEncryptionKey();
     this.inputPolicy = inputPolicy;
     setReadahead(readahead);
   }
@@ -145,6 +151,10 @@
     try {
       GetObjectRequest request = new GetObjectRequest(bucket, key)
           .withRange(targetPos, contentRangeFinish);
+      if (S3AEncryptionMethods.SSE_C.equals(serverSideEncryptionAlgorithm) &&
+          StringUtils.isNotBlank(serverSideEncryptionKey)){
+        request.setSSECustomerKey(new SSECustomerKey(serverSideEncryptionKey));
+      }
       wrappedStream = client.getObject(request).getObjectContent();
       contentRangeStart = targetPos;
       if (wrappedStream == null) {

View File

@@ -723,4 +723,14 @@ public final class S3AUtils {
           "patch of " + S3A_SECURITY_CREDENTIAL_PROVIDER_PATH);
     }
   }
+
+  static String getServerSideEncryptionKey(Configuration conf) {
+    try {
+      return getPassword(conf, Constants.SERVER_SIDE_ENCRYPTION_KEY,
+          conf.getTrimmed(SERVER_SIDE_ENCRYPTION_KEY));
+    } catch (IOException e) {
+      LOG.error("Cannot retrieve SERVER_SIDE_ENCRYPTION_KEY", e);
+    }
+    return null;
+  }
 }
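Since getServerSideEncryptionKey() resolves the key via getPassword(), the key may live in a Hadoop credential provider (e.g. one created with `hadoop credential create fs.s3a.server-side-encryption-key -provider <path>`) instead of clear text in core-site.xml. A sketch of the lookup path, with a hypothetical provider file:

import org.apache.hadoop.conf.Configuration;

public class SseKeyLookupSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Hypothetical jceks path; Configuration.getPassword() consults the
    // providers listed here before falling back to the plain config value.
    conf.set("hadoop.security.credential.provider.path",
        "jceks://file/home/user/s3a.jceks");
    char[] key = conf.getPassword("fs.s3a.server-side-encryption-key");
    System.out.println(key == null ? "no key configured" : "key found");
  }
}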

View File

@@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
/**
* This class is only a holder for bucket, key, SSE Algorithm and SSE key
* attributes. It is used only in {@link S3AInputStream}, as a way to
* reduce the number of parameters passed to its constructor.
*/
class S3ObjectAttributes {
private String bucket;
private String key;
private S3AEncryptionMethods serverSideEncryptionAlgorithm;
private String serverSideEncryptionKey;
public S3ObjectAttributes(
String bucket,
String key,
S3AEncryptionMethods serverSideEncryptionAlgorithm,
String serverSideEncryptionKey) {
this.bucket = bucket;
this.key = key;
this.serverSideEncryptionAlgorithm = serverSideEncryptionAlgorithm;
this.serverSideEncryptionKey = serverSideEncryptionKey;
}
public String getBucket() {
return bucket;
}
public String getKey() {
return key;
}
public S3AEncryptionMethods getServerSideEncryptionAlgorithm() {
return serverSideEncryptionAlgorithm;
}
public String getServerSideEncryptionKey() {
return serverSideEncryptionKey;
}
}

View File

@@ -840,10 +840,20 @@ from placing its declaration on the command line.
 <property>
   <name>fs.s3a.server-side-encryption-algorithm</name>
   <description>Specify a server-side encryption algorithm for s3a: file system.
-    Unset by default, and the only other currently allowable value is AES256.
+    Unset by default. It supports the following values: 'AES256' (for SSE-S3),
+    'SSE-KMS' and 'SSE-C'.
   </description>
 </property>

+<property>
+  <name>fs.s3a.server-side-encryption-key</name>
+  <description>Specific encryption key to use if fs.s3a.server-side-encryption-algorithm
+    has been set to 'SSE-KMS' or 'SSE-C'. In the case of SSE-C, the value of this property
+    should be the Base64 encoded key. If you are using SSE-KMS and leave this property empty,
+    you'll be using your default S3 KMS key, otherwise you should set this property to
+    the specific KMS key id.</description>
+</property>
+
 <property>
   <name>fs.s3a.buffer.dir</name>
   <value>${hadoop.tmp.dir}/s3a</value>
@@ -2160,6 +2170,23 @@ that the file `contract-test-options.xml` does not contain any
 secret credentials itself. As the auth keys XML file is kept out of the
 source code tree, it is not going to get accidentally committed.

+### Configuring S3a Encryption
+
+For S3a encryption tests to run correctly, the
+`fs.s3a.server-side-encryption-key` must be configured in the s3a contract xml
+file with an AWS KMS encryption key ARN, as this value is different for each
+AWS account and region.
+
+Example:
+
+    <property>
+      <name>fs.s3a.server-side-encryption-key</name>
+      <value>arn:aws:kms:us-west-2:360379543683:key/071a86ff-8881-4ba0-9230-95af6d01ca01</value>
+    </property>
+
+You can also force all the tests to run with a specific SSE encryption method
+by configuring the property `fs.s3a.server-side-encryption-algorithm` in the
+s3a contract file.
+
 ### Running the Tests
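For the SSE-C variant of this configuration, the key is a Base64-encoded 256-bit AES key rather than a KMS ARN. A sketch of the corresponding contract-file entries, reusing the key the SSE-C tests in this commit hard-code (any `openssl rand -base64 32` output would do):

    <property>
      <name>fs.s3a.server-side-encryption-algorithm</name>
      <value>SSE-C</value>
    </property>
    <property>
      <name>fs.s3a.server-side-encryption-key</name>
      <value>4niV/jPK5VFRHY+KNb6wtqYd4xXyMgdJ9XQJpcQUVbs=</value>
    </property>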

View File

@@ -19,6 +19,8 @@
 package org.apache.hadoop.fs.s3a;

 import com.amazonaws.services.s3.model.ObjectMetadata;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.net.util.Base64;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.ContractTestUtils;

@@ -34,15 +36,14 @@ import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
  * are made for different file sizes as there have been reports that the
  * file length may be rounded up to match word boundaries.
  */
-public class ITestS3AEncryption extends AbstractS3ATestBase {
-  private static final String AES256 = Constants.SERVER_SIDE_ENCRYPTION_AES256;
+public abstract class AbstractTestS3AEncryption extends AbstractS3ATestBase {

   @Override
   protected Configuration createConfiguration() {
     Configuration conf = super.createConfiguration();
     S3ATestUtils.disableFilesystemCaching(conf);
     conf.set(Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM,
-        AES256);
+        getSSEAlgorithm().getMethod());
     return conf;
   }

@@ -80,7 +81,7 @@
     rm(getFileSystem(), path, false, false);
   }

-  private String createFilename(int len) {
+  protected String createFilename(int len) {
     return String.format("%s-%04x", methodName.getMethodName(), len);
   }

@@ -89,9 +90,43 @@
    * @param path path
    * @throws IOException on a failure
    */
-  private void assertEncrypted(Path path) throws IOException {
+  protected void assertEncrypted(Path path) throws IOException {
     ObjectMetadata md = getFileSystem().getObjectMetadata(path);
-    assertEquals(AES256, md.getSSEAlgorithm());
+    switch(getSSEAlgorithm()) {
+    case SSE_C:
+      assertEquals("AES256", md.getSSECustomerAlgorithm());
+      String md5Key = convertKeyToMd5();
+      assertEquals(md5Key, md.getSSECustomerKeyMd5());
+      break;
+    case SSE_KMS:
+      assertEquals("aws:kms", md.getSSEAlgorithm());
+      //S3 will return full arn of the key, so specify global arn in properties
+      assertEquals(this.getConfiguration().
+          getTrimmed(Constants.SERVER_SIDE_ENCRYPTION_KEY),
+          md.getSSEAwsKmsKeyId());
+      break;
+    default:
+      assertEquals("AES256", md.getSSEAlgorithm());
+    }
+  }
+
+  /**
+   * Decodes the SERVER_SIDE_ENCRYPTION_KEY from base64 into an AES key, then
+   * gets the md5 of it, then encodes it in base64 so it will match the version
+   * that AWS returns to us.
+   *
+   * @return md5'd base64 encoded representation of the server side encryption
+   * key
+   */
+  private String convertKeyToMd5() {
+    String base64Key = getConfiguration().getTrimmed(
+        Constants.SERVER_SIDE_ENCRYPTION_KEY
+    );
+    byte[] key = Base64.decodeBase64(base64Key);
+    byte[] md5 = DigestUtils.md5(key);
+    return Base64.encodeBase64String(md5).trim();
+  }
+
+  protected abstract S3AEncryptionMethods getSSEAlgorithm();
 }
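The convertKeyToMd5() helper mirrors the value S3 echoes back in its SSE-C key-MD5 header. A standalone sketch of the same computation, using the same test key and the same commons-codec and commons-net calls as the class above:

import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.net.util.Base64;

public class SseCKeyMd5Sketch {
  public static void main(String[] args) {
    // Base64 of 32 random bytes; the key the SSE-C tests below hard-code.
    String base64Key = "4niV/jPK5VFRHY+KNb6wtqYd4xXyMgdJ9XQJpcQUVbs=";
    byte[] rawKey = Base64.decodeBase64(base64Key);
    // AWS reports base64(md5(raw key)); assertEncrypted() compares against it.
    String md5 = Base64.encodeBase64String(DigestUtils.md5(rawKey)).trim();
    System.out.println(md5);
  }
}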

View File

@@ -1,76 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.junit.Test;
import java.io.IOException;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
/**
* Test whether or not encryption settings propagate by choosing an invalid
* one. We expect the write to fail with a 400 bad request error
*/
public class ITestS3AEncryptionAlgorithmPropagation
extends AbstractS3ATestBase {
@Override
protected Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
S3ATestUtils.disableFilesystemCaching(conf);
conf.set(Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM,
"DES");
return conf;
}
@Test
public void testEncrypt0() throws Throwable {
writeThenReadFileToFailure(0);
}
@Test
public void testEncrypt256() throws Throwable {
writeThenReadFileToFailure(256);
}
/**
* Make this a no-op so test setup doesn't fail.
* @param path path path
* @throws IOException on any failure
*/
@Override
protected void mkdirs(Path path) throws IOException {
}
protected void writeThenReadFileToFailure(int len) throws IOException {
skipIfEncryptionTestsDisabled(getConfiguration());
describe("Create an encrypted file of size " + len);
try {
writeThenReadFile(methodName.getMethodName() + '-' + len, len);
fail("Expected an exception about an illegal encryption algorithm");
} catch (AWSS3IOException e) {
assertStatusCode(e, 400);
}
}
}

View File

@@ -0,0 +1,152 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.s3a.S3AContract;
import org.junit.Ignore;
import org.junit.Test;
/**
* Test whether or not encryption settings propagate by choosing an invalid
* one. We expect the S3AFileSystem to fail to initialize.
*/
@Ignore
public class ITestS3AEncryptionAlgorithmValidation
extends AbstractS3ATestBase {
@Test
public void testEncryptionAlgorithmSetToDES() throws Throwable {
//skip tests if they aren't enabled
assumeEnabled();
intercept(IOException.class, "Unknown Server Side algorithm DES", () -> {
Configuration conf = super.createConfiguration();
//DES is an invalid encryption algorithm
conf.set(Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM, "DES");
S3AContract contract = (S3AContract) createContract(conf);
contract.init();
//extract the test FS
FileSystem fileSystem = contract.getTestFileSystem();
assertNotNull("null filesystem", fileSystem);
URI fsURI = fileSystem.getUri();
LOG.info("Test filesystem = {} implemented by {}", fsURI, fileSystem);
assertEquals("wrong filesystem of " + fsURI,
contract.getScheme(), fsURI.getScheme());
fileSystem.initialize(fsURI, conf);
throw new Exception("Do not reach here");
});
}
@Test
public void testEncryptionAlgorithmSSECWithNoEncryptionKey() throws
Throwable {
//skip tests if they aren't enabled
assumeEnabled();
intercept(IllegalArgumentException.class, "The value of property " +
Constants.SERVER_SIDE_ENCRYPTION_KEY + " must not be null", () -> {
Configuration conf = super.createConfiguration();
//SSE-C must be configured with an encryption key
conf.set(Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM,
S3AEncryptionMethods.SSE_C.getMethod());
conf.set(Constants.SERVER_SIDE_ENCRYPTION_KEY, null);
S3AContract contract = (S3AContract) createContract(conf);
contract.init();
//extract the test FS
FileSystem fileSystem = contract.getTestFileSystem();
assertNotNull("null filesystem", fileSystem);
URI fsURI = fileSystem.getUri();
LOG.info("Test filesystem = {} implemented by {}", fsURI, fileSystem);
assertEquals("wrong filesystem of " + fsURI,
contract.getScheme(), fsURI.getScheme());
fileSystem.initialize(fsURI, conf);
throw new Exception("Do not reach here");
});
}
@Test
public void testEncryptionAlgorithmSSECWithBlankEncryptionKey() throws
Throwable {
intercept(IOException.class, Constants.SSE_C_NO_KEY_ERROR, () -> {
Configuration conf = super.createConfiguration();
//SSE-C must be configured with an encryption key
conf.set(Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM,
S3AEncryptionMethods.SSE_C.getMethod());
conf.set(Constants.SERVER_SIDE_ENCRYPTION_KEY, "");
S3AContract contract = (S3AContract) createContract(conf);
contract.init();
//extract the test FS
FileSystem fileSystem = contract.getTestFileSystem();
assertNotNull("null filesystem", fileSystem);
URI fsURI = fileSystem.getUri();
LOG.info("Test filesystem = {} implemented by {}", fsURI, fileSystem);
assertEquals("wrong filesystem of " + fsURI,
contract.getScheme(), fsURI.getScheme());
fileSystem.initialize(fsURI, conf);
throw new Exception("Do not reach here");
});
}
@Test
public void testEncryptionAlgorithmSSES3WithEncryptionKey() throws
Throwable {
//skip tests if they aren't enabled
assumeEnabled();
intercept(IOException.class, Constants.SSE_S3_WITH_KEY_ERROR, () -> {
Configuration conf = super.createConfiguration();
//SSE-S3 cannot be configured with an encryption key
conf.set(Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM,
S3AEncryptionMethods.SSE_S3.getMethod());
conf.set(Constants.SERVER_SIDE_ENCRYPTION_KEY,
"4niV/jPK5VFRHY+KNb6wtqYd4xXyMgdJ9XQJpcQUVbs=");
S3AContract contract = (S3AContract) createContract(conf);
contract.init();
//extract the test FS
FileSystem fileSystem = contract.getTestFileSystem();
assertNotNull("null filesystem", fileSystem);
URI fsURI = fileSystem.getUri();
LOG.info("Test filesystem = {} implemented by {}", fsURI, fileSystem);
assertEquals("wrong filesystem of " + fsURI,
contract.getScheme(), fsURI.getScheme());
fileSystem.initialize(fsURI, conf);
throw new Exception("Do not reach here");
});
}
/**
* Make this a no-op so test setup doesn't fail.
* @param path path path
* @throws IOException on any failure
*/
@Override
protected void mkdirs(Path path) throws IOException {
}
}

View File

@@ -0,0 +1,90 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.contract.ContractTestUtils.rm;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfEncryptionTestsDisabled;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.contract.s3a.S3AContract;
import org.junit.Test;
/**
* Concrete class that extends {@link AbstractTestS3AEncryption}
* and tests SSE-C encryption.
*/
public class ITestS3AEncryptionSSEC extends AbstractTestS3AEncryption {
@Override
protected Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
S3ATestUtils.disableFilesystemCaching(conf);
conf.set(Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM,
getSSEAlgorithm().getMethod());
conf.set(Constants.SERVER_SIDE_ENCRYPTION_KEY,
"4niV/jPK5VFRHY+KNb6wtqYd4xXyMgdJ9XQJpcQUVbs=");
return conf;
}
/**
* This will create and write to a file using encryption key A, then attempt
* to read from it again with encryption key B. This will not work as it
* cannot decrypt the file.
* @throws Exception
*/
@Test
public void testCreateFileAndReadWithDifferentEncryptionKey() throws
Exception {
final Path[] path = new Path[1];
intercept(java.nio.file.AccessDeniedException.class,
"Forbidden (Service: Amazon S3; Status Code: 403;", () -> {
int len = 2048;
skipIfEncryptionTestsDisabled(getConfiguration());
describe("Create an encrypted file of size " + len);
String src = createFilename(len);
path[0] = writeThenReadFile(src, len);
Configuration conf = this.createConfiguration();
conf.set(Constants.SERVER_SIDE_ENCRYPTION_KEY,
"kX7SdwVc/1VXJr76kfKnkQ3ONYhxianyL2+C3rPVT9s=");
S3AContract contract = (S3AContract) createContract(conf);
contract.init();
//skip tests if they aren't enabled
assumeEnabled();
//extract the test FS
FileSystem fileSystem = contract.getTestFileSystem();
byte[] data = dataset(len, 'a', 'z');
ContractTestUtils.verifyFileContents(fileSystem, path[0], data);
throw new Exception("Fail");
});
rm(getFileSystem(), path[0], false, false);
}
@Override
protected S3AEncryptionMethods getSSEAlgorithm() {
return S3AEncryptionMethods.SSE_C;
}
}

View File

@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import org.apache.hadoop.conf.Configuration;
/**
* Run the encryption tests against the Fast output stream.
* This verifies that both file writing paths can encrypt their data.
*/
public class ITestS3AEncryptionSSECBlockOutputStream
extends AbstractTestS3AEncryption {
@Override
protected Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
conf.setBoolean(Constants.FAST_UPLOAD, true);
conf.set(Constants.FAST_UPLOAD_BUFFER,
Constants.FAST_UPLOAD_BYTEBUFFER);
conf.set(Constants.SERVER_SIDE_ENCRYPTION_KEY,
"4niV/jPK5VFRHY+KNb6wtqYd4xXyMgdJ9XQJpcQUVbs=");
return conf;
}
@Override
protected S3AEncryptionMethods getSSEAlgorithm() {
return S3AEncryptionMethods.SSE_C;
}
}

View File

@@ -0,0 +1,57 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import static org.hamcrest.CoreMatchers.containsString;
import java.io.IOException;
import com.amazonaws.services.s3.model.ObjectMetadata;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
/**
* Concrete class that extends {@link AbstractTestS3AEncryption}
* and tests SSE-KMS encryption when no KMS encryption key is provided and AWS
* uses the default. Since this resource changes for every account and region,
* there is no good way to explicitly set this value to do an equality check
* in the response.
*/
public class ITestS3AEncryptionSSEKMSDefaultKey
extends AbstractTestS3AEncryption {
@Override
protected Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
conf.set(Constants.SERVER_SIDE_ENCRYPTION_KEY, "");
return conf;
}
@Override
protected S3AEncryptionMethods getSSEAlgorithm() {
return S3AEncryptionMethods.SSE_KMS;
}
@Override
protected void assertEncrypted(Path path) throws IOException {
ObjectMetadata md = getFileSystem().getObjectMetadata(path);
assertEquals("aws:kms", md.getSSEAlgorithm());
assertThat(md.getSSEAwsKmsKeyId(), containsString("arn:aws:kms:"));
}
}

View File

@@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
/**
* Concrete class that extends {@link AbstractTestS3AEncryption}
* and tests SSE-KMS encryption. This requires the SERVER_SIDE_ENCRYPTION_KEY
* to be set in auth-keys.xml for it to run.
*/
public class ITestS3AEncryptionSSEKMSUserDefinedKey
extends AbstractTestS3AEncryption {
@Override
protected Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
if(StringUtils.isBlank(conf.get(Constants.SERVER_SIDE_ENCRYPTION_KEY))){
skip(Constants.SERVER_SIDE_ENCRYPTION_KEY+ " is not set for " +
S3AEncryptionMethods.SSE_KMS.getMethod());
}
return conf;
}
@Override
protected S3AEncryptionMethods getSSEAlgorithm() {
return S3AEncryptionMethods.SSE_KMS;
}
}

View File

@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
/**
* Run the encryption tests against the Fast output stream.
* This verifies that both file writing paths can encrypt their data. This
* requires the SERVER_SIDE_ENCRYPTION_KEY to be set in auth-keys.xml for it
* to run.
*/
public class ITestS3AEncryptionSSEKMSUserDefinedKeyBlockOutputStream
extends AbstractTestS3AEncryption {
@Override
protected Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
if(StringUtils.isBlank(conf.get(Constants.SERVER_SIDE_ENCRYPTION_KEY))){
skip(Constants.SERVER_SIDE_ENCRYPTION_KEY+ " is not set for " +
S3AEncryptionMethods.SSE_KMS.getMethod());
}
conf.setBoolean(Constants.FAST_UPLOAD, true);
conf.set(Constants.FAST_UPLOAD_BUFFER,
Constants.FAST_UPLOAD_BYTEBUFFER);
return conf;
}
@Override
protected S3AEncryptionMethods getSSEAlgorithm() {
return S3AEncryptionMethods.SSE_KMS;
}
}

View File

@@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import org.apache.hadoop.conf.Configuration;
/**
* Concrete class that extends {@link AbstractTestS3AEncryption}
* and tests SSE-S3 encryption.
*/
public class ITestS3AEncryptionSSES3 extends AbstractTestS3AEncryption {
@Override
protected Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
S3ATestUtils.disableFilesystemCaching(conf);
//must specify encryption key as empty because SSE-S3 does not allow it,
//nor can it be null.
conf.set(Constants.SERVER_SIDE_ENCRYPTION_KEY, "");
return conf;
}
@Override
protected S3AEncryptionMethods getSSEAlgorithm() {
return S3AEncryptionMethods.SSE_S3;
}
}

View File

@@ -23,7 +23,8 @@ import org.apache.hadoop.conf.Configuration;
 /**
  * Run the encryption tests against the block output stream.
  */
-public class ITestS3AEncryptionBlockOutputStream extends ITestS3AEncryption {
+public class ITestS3AEncryptionSSES3BlockOutputStream
+    extends AbstractTestS3AEncryption {

   @Override
   protected Configuration createConfiguration() {
@@ -31,6 +32,14 @@ public class ITestS3AEncryptionBlockOutputStream extends ITestS3AEncryption {
     conf.setBoolean(Constants.FAST_UPLOAD, true);
     conf.set(Constants.FAST_UPLOAD_BUFFER,
         Constants.FAST_UPLOAD_BYTEBUFFER);
+    //must specify encryption key as empty because SSE-S3 does not allow it,
+    //nor can it be null.
+    conf.set(Constants.SERVER_SIDE_ENCRYPTION_KEY, "");
     return conf;
   }
+
+  @Override
+  protected S3AEncryptionMethods getSSEAlgorithm() {
+    return S3AEncryptionMethods.SSE_S3;
+  }
 }

View File

@@ -26,6 +26,7 @@ import java.io.FileNotFoundException;
 import java.util.Collections;
 import java.util.Date;

+import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
 import com.amazonaws.services.s3.model.ListObjectsRequest;
 import com.amazonaws.services.s3.model.ObjectListing;
 import com.amazonaws.services.s3.model.ObjectMetadata;

@@ -34,6 +35,9 @@ import com.amazonaws.services.s3.model.S3ObjectSummary;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
+import org.hamcrest.BaseMatcher;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
 import org.junit.Test;

 /**
@@ -48,7 +52,8 @@ public class TestS3AGetFileStatus extends AbstractS3AMockTest {
     ObjectMetadata meta = new ObjectMetadata();
     meta.setContentLength(1L);
     meta.setLastModified(new Date(2L));
-    when(s3.getObjectMetadata(BUCKET, key)).thenReturn(meta);
+    when(s3.getObjectMetadata(argThat(correctGetMetadataRequest(BUCKET, key))))
+        .thenReturn(meta);
     FileStatus stat = fs.getFileStatus(path);
     assertNotNull(stat);
     assertEquals(fs.makeQualified(path), stat.getPath());
@@ -61,10 +66,13 @@ public class TestS3AGetFileStatus extends AbstractS3AMockTest {
   public void testFakeDirectory() throws Exception {
     Path path = new Path("/dir");
     String key = path.toUri().getPath().substring(1);
-    when(s3.getObjectMetadata(BUCKET, key)).thenThrow(NOT_FOUND);
+    when(s3.getObjectMetadata(argThat(correctGetMetadataRequest(BUCKET, key))))
+        .thenThrow(NOT_FOUND);
     ObjectMetadata meta = new ObjectMetadata();
     meta.setContentLength(0L);
-    when(s3.getObjectMetadata(BUCKET, key + "/")).thenReturn(meta);
+    when(s3.getObjectMetadata(argThat(
+        correctGetMetadataRequest(BUCKET, key + "/"))
+    )).thenReturn(meta);
     FileStatus stat = fs.getFileStatus(path);
     assertNotNull(stat);
     assertEquals(fs.makeQualified(path), stat.getPath());
@@ -75,8 +83,11 @@ public class TestS3AGetFileStatus extends AbstractS3AMockTest {
   public void testImplicitDirectory() throws Exception {
     Path path = new Path("/dir");
     String key = path.toUri().getPath().substring(1);
-    when(s3.getObjectMetadata(BUCKET, key)).thenThrow(NOT_FOUND);
-    when(s3.getObjectMetadata(BUCKET, key + "/")).thenThrow(NOT_FOUND);
+    when(s3.getObjectMetadata(argThat(correctGetMetadataRequest(BUCKET, key))))
+        .thenThrow(NOT_FOUND);
+    when(s3.getObjectMetadata(argThat(
+        correctGetMetadataRequest(BUCKET, key + "/"))
+    )).thenThrow(NOT_FOUND);
     ObjectListing objects = mock(ObjectListing.class);
     when(objects.getCommonPrefixes()).thenReturn(
         Collections.singletonList("dir/"));
@@ -93,8 +104,11 @@ public class TestS3AGetFileStatus extends AbstractS3AMockTest {
   public void testRoot() throws Exception {
     Path path = new Path("/");
     String key = path.toUri().getPath().substring(1);
-    when(s3.getObjectMetadata(BUCKET, key)).thenThrow(NOT_FOUND);
-    when(s3.getObjectMetadata(BUCKET, key + "/")).thenThrow(NOT_FOUND);
+    when(s3.getObjectMetadata(argThat(correctGetMetadataRequest(BUCKET, key))))
+        .thenThrow(NOT_FOUND);
+    when(s3.getObjectMetadata(argThat(
+        correctGetMetadataRequest(BUCKET, key + "/")
+    ))).thenThrow(NOT_FOUND);
     ObjectListing objects = mock(ObjectListing.class);
     when(objects.getCommonPrefixes()).thenReturn(
         Collections.<String>emptyList());
@@ -112,8 +126,11 @@ public class TestS3AGetFileStatus extends AbstractS3AMockTest {
   public void testNotFound() throws Exception {
     Path path = new Path("/dir");
     String key = path.toUri().getPath().substring(1);
-    when(s3.getObjectMetadata(BUCKET, key)).thenThrow(NOT_FOUND);
-    when(s3.getObjectMetadata(BUCKET, key + "/")).thenThrow(NOT_FOUND);
+    when(s3.getObjectMetadata(argThat(correctGetMetadataRequest(BUCKET, key))))
+        .thenThrow(NOT_FOUND);
+    when(s3.getObjectMetadata(argThat(
+        correctGetMetadataRequest(BUCKET, key + "/")
+    ))).thenThrow(NOT_FOUND);
     ObjectListing objects = mock(ObjectListing.class);
     when(objects.getCommonPrefixes()).thenReturn(
         Collections.<String>emptyList());
@@ -123,4 +140,26 @@ public class TestS3AGetFileStatus extends AbstractS3AMockTest {
     exception.expect(FileNotFoundException.class);
     fs.getFileStatus(path);
   }
+
+  private Matcher<GetObjectMetadataRequest> correctGetMetadataRequest(
+      String bucket, String key) {
+    return new BaseMatcher<GetObjectMetadataRequest>() {
+      @Override
+      public void describeTo(Description description) {
+        description.appendText("bucket and key match");
+      }
+
+      @Override
+      public boolean matches(Object o) {
+        if(o instanceof GetObjectMetadataRequest) {
+          GetObjectMetadataRequest getObjectMetadataRequest =
+              (GetObjectMetadataRequest)o;
+          return getObjectMetadataRequest.getBucketName().equals(bucket)
+              && getObjectMetadataRequest.getKey().equals(key);
+        }
+        return false;
+      }
+    };
+  }
 }