HADOOP-16794. S3A reverts KMS encryption to the bucket's default KMS key in rename/copy.
AreContributed by Mukund Thakur. This addresses an issue which surfaced with KMS encryption: the wrong KMS key could be picked up in the S3 COPY operation, so renamed files, while encrypted, would end up with the bucket default key. As well as adding tests in the new suite ITestS3AEncryptionWithDefaultS3Settings, AbstractSTestS3AHugeFiles has a new test method to verify that the encryption settings also work for large files copied via multipart operations.
This commit is contained in:
parent
e553eda9cd
commit
f864ef7429
|
@ -3459,7 +3459,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
||||||
new CopyObjectRequest(bucket, srcKey, bucket, dstKey);
|
new CopyObjectRequest(bucket, srcKey, bucket, dstKey);
|
||||||
changeTracker.maybeApplyConstraint(copyObjectRequest);
|
changeTracker.maybeApplyConstraint(copyObjectRequest);
|
||||||
|
|
||||||
setOptionalCopyObjectRequestParameters(copyObjectRequest);
|
setOptionalCopyObjectRequestParameters(srcom, copyObjectRequest);
|
||||||
copyObjectRequest.setCannedAccessControlList(cannedACL);
|
copyObjectRequest.setCannedAccessControlList(cannedACL);
|
||||||
copyObjectRequest.setNewObjectMetadata(dstom);
|
copyObjectRequest.setNewObjectMetadata(dstom);
|
||||||
Optional.ofNullable(srcom.getStorageClass())
|
Optional.ofNullable(srcom.getStorageClass())
|
||||||
|
@ -3489,6 +3489,41 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Propagate encryption parameters from source file if set else use the
|
||||||
|
* current filesystem encryption settings.
|
||||||
|
* @param srcom source object meta.
|
||||||
|
* @param copyObjectRequest copy object request body.
|
||||||
|
*/
|
||||||
|
private void setOptionalCopyObjectRequestParameters(
|
||||||
|
ObjectMetadata srcom,
|
||||||
|
CopyObjectRequest copyObjectRequest) {
|
||||||
|
String sourceKMSId = srcom.getSSEAwsKmsKeyId();
|
||||||
|
if (isNotEmpty(sourceKMSId)) {
|
||||||
|
// source KMS ID is propagated
|
||||||
|
LOG.debug("Propagating SSE-KMS settings from source {}",
|
||||||
|
sourceKMSId);
|
||||||
|
copyObjectRequest.setSSEAwsKeyManagementParams(
|
||||||
|
new SSEAwsKeyManagementParams(sourceKMSId));
|
||||||
|
}
|
||||||
|
switch(getServerSideEncryptionAlgorithm()) {
|
||||||
|
/**
|
||||||
|
* Overriding with client encryption settings.
|
||||||
|
*/
|
||||||
|
case SSE_C:
|
||||||
|
generateSSECustomerKey().ifPresent(customerKey -> {
|
||||||
|
copyObjectRequest.setSourceSSECustomerKey(customerKey);
|
||||||
|
copyObjectRequest.setDestinationSSECustomerKey(customerKey);
|
||||||
|
});
|
||||||
|
break;
|
||||||
|
case SSE_KMS:
|
||||||
|
generateSSEAwsKeyParams().ifPresent(
|
||||||
|
copyObjectRequest::setSSEAwsKeyManagementParams);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set the optional parameters when initiating the request (encryption,
|
* Set the optional parameters when initiating the request (encryption,
|
||||||
* headers, storage, etc).
|
* headers, storage, etc).
|
||||||
|
@ -3526,23 +3561,6 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
||||||
return getAmazonS3Client().initiateMultipartUpload(request);
|
return getAmazonS3Client().initiateMultipartUpload(request);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void setOptionalCopyObjectRequestParameters(
|
|
||||||
CopyObjectRequest copyObjectRequest) throws IOException {
|
|
||||||
switch (getServerSideEncryptionAlgorithm()) {
|
|
||||||
case SSE_KMS:
|
|
||||||
generateSSEAwsKeyParams().ifPresent(
|
|
||||||
copyObjectRequest::setSSEAwsKeyManagementParams);
|
|
||||||
break;
|
|
||||||
case SSE_C:
|
|
||||||
generateSSECustomerKey().ifPresent(customerKey -> {
|
|
||||||
copyObjectRequest.setSourceSSECustomerKey(customerKey);
|
|
||||||
copyObjectRequest.setDestinationSSECustomerKey(customerKey);
|
|
||||||
});
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void setOptionalPutRequestParameters(PutObjectRequest request) {
|
private void setOptionalPutRequestParameters(PutObjectRequest request) {
|
||||||
generateSSEAwsKeyParams().ifPresent(request::setSSEAwsKeyManagementParams);
|
generateSSEAwsKeyParams().ifPresent(request::setSSEAwsKeyManagementParams);
|
||||||
generateSSECustomerKey().ifPresent(request::setSSECustomerKey);
|
generateSSECustomerKey().ifPresent(request::setSSECustomerKey);
|
||||||
|
|
|
@ -41,7 +41,6 @@ import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestDynamoTablePrefix;
|
||||||
*/
|
*/
|
||||||
public abstract class AbstractS3ATestBase extends AbstractFSContractTestBase
|
public abstract class AbstractS3ATestBase extends AbstractFSContractTestBase
|
||||||
implements S3ATestConstants {
|
implements S3ATestConstants {
|
||||||
|
|
||||||
protected static final Logger LOG =
|
protected static final Logger LOG =
|
||||||
LoggerFactory.getLogger(AbstractS3ATestBase.class);
|
LoggerFactory.getLogger(AbstractS3ATestBase.class);
|
||||||
|
|
||||||
|
@ -146,17 +145,4 @@ public abstract class AbstractS3ATestBase extends AbstractFSContractTestBase
|
||||||
protected String getTestTableName(String suffix) {
|
protected String getTestTableName(String suffix) {
|
||||||
return getTestDynamoTablePrefix(getConfiguration()) + suffix;
|
return getTestDynamoTablePrefix(getConfiguration()) + suffix;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Assert that an exception failed with a specific status code.
|
|
||||||
* @param e exception
|
|
||||||
* @param code expected status code
|
|
||||||
* @throws AWSServiceIOException rethrown if the status code does not match.
|
|
||||||
*/
|
|
||||||
protected void assertStatusCode(AWSServiceIOException e, int code)
|
|
||||||
throws AWSServiceIOException {
|
|
||||||
if (e.getStatusCode() != code) {
|
|
||||||
throw e;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,11 +20,8 @@ package org.apache.hadoop.fs.s3a;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
import com.amazonaws.services.s3.model.ObjectMetadata;
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import org.apache.commons.codec.digest.DigestUtils;
|
|
||||||
import org.apache.commons.net.util.Base64;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.contract.ContractTestUtils;
|
import org.apache.hadoop.fs.contract.ContractTestUtils;
|
||||||
|
@ -33,7 +30,6 @@ import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
|
||||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
|
||||||
import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM;
|
import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM;
|
||||||
import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY;
|
import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY;
|
||||||
import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
|
|
||||||
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
|
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
|
||||||
import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfEncryptionTestsDisabled;
|
import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfEncryptionTestsDisabled;
|
||||||
import static org.apache.hadoop.fs.s3a.S3AUtils.getEncryptionAlgorithm;
|
import static org.apache.hadoop.fs.s3a.S3AUtils.getEncryptionAlgorithm;
|
||||||
|
@ -45,20 +41,29 @@ import static org.apache.hadoop.fs.s3a.S3AUtils.getEncryptionAlgorithm;
|
||||||
*/
|
*/
|
||||||
public abstract class AbstractTestS3AEncryption extends AbstractS3ATestBase {
|
public abstract class AbstractTestS3AEncryption extends AbstractS3ATestBase {
|
||||||
|
|
||||||
protected static final String AWS_KMS_SSE_ALGORITHM = "aws:kms";
|
|
||||||
|
|
||||||
protected static final String SSE_C_ALGORITHM = "AES256";
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Configuration createConfiguration() {
|
protected Configuration createConfiguration() {
|
||||||
Configuration conf = super.createConfiguration();
|
Configuration conf = super.createConfiguration();
|
||||||
S3ATestUtils.disableFilesystemCaching(conf);
|
S3ATestUtils.disableFilesystemCaching(conf);
|
||||||
|
patchConfigurationEncryptionSettings(conf);
|
||||||
|
return conf;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This removes the encryption settings from the
|
||||||
|
* configuration and then sets the
|
||||||
|
* fs.s3a.server-side-encryption-algorithm value to
|
||||||
|
* be that of {@code getSSEAlgorithm()}.
|
||||||
|
* Called in {@code createConfiguration()}.
|
||||||
|
* @param conf configuration to patch.
|
||||||
|
*/
|
||||||
|
protected void patchConfigurationEncryptionSettings(
|
||||||
|
final Configuration conf) {
|
||||||
removeBaseAndBucketOverrides(conf,
|
removeBaseAndBucketOverrides(conf,
|
||||||
SERVER_SIDE_ENCRYPTION_ALGORITHM,
|
SERVER_SIDE_ENCRYPTION_ALGORITHM,
|
||||||
SERVER_SIDE_ENCRYPTION_KEY);
|
SERVER_SIDE_ENCRYPTION_KEY);
|
||||||
conf.set(SERVER_SIDE_ENCRYPTION_ALGORITHM,
|
conf.set(SERVER_SIDE_ENCRYPTION_ALGORITHM,
|
||||||
getSSEAlgorithm().getMethod());
|
getSSEAlgorithm().getMethod());
|
||||||
return conf;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final int[] SIZES = {
|
private static final int[] SIZES = {
|
||||||
|
@ -107,10 +112,15 @@ public abstract class AbstractTestS3AEncryption extends AbstractS3ATestBase {
|
||||||
validateEncrytionSecrets(secrets);
|
validateEncrytionSecrets(secrets);
|
||||||
writeDataset(fs, src, data, data.length, 1024 * 1024, true);
|
writeDataset(fs, src, data, data.length, 1024 * 1024, true);
|
||||||
ContractTestUtils.verifyFileContents(fs, src, data);
|
ContractTestUtils.verifyFileContents(fs, src, data);
|
||||||
Path dest = path(src.getName() + "-copy");
|
// this file will be encrypted
|
||||||
fs.rename(src, dest);
|
assertEncrypted(src);
|
||||||
ContractTestUtils.verifyFileContents(fs, dest, data);
|
|
||||||
assertEncrypted(dest);
|
Path targetDir = path("target");
|
||||||
|
mkdirs(targetDir);
|
||||||
|
fs.rename(src, targetDir);
|
||||||
|
Path renamedFile = new Path(targetDir, src.getName());
|
||||||
|
ContractTestUtils.verifyFileContents(fs, renamedFile, data);
|
||||||
|
assertEncrypted(renamedFile);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -147,42 +157,14 @@ public abstract class AbstractTestS3AEncryption extends AbstractS3ATestBase {
|
||||||
* @throws IOException on a failure
|
* @throws IOException on a failure
|
||||||
*/
|
*/
|
||||||
protected void assertEncrypted(Path path) throws IOException {
|
protected void assertEncrypted(Path path) throws IOException {
|
||||||
ObjectMetadata md = getFileSystem().getObjectMetadata(path);
|
|
||||||
switch(getSSEAlgorithm()) {
|
|
||||||
case SSE_C:
|
|
||||||
assertNull("Metadata algorithm should have been null",
|
|
||||||
md.getSSEAlgorithm());
|
|
||||||
assertEquals("Wrong SSE-C algorithm", SSE_C_ALGORITHM, md.getSSECustomerAlgorithm());
|
|
||||||
String md5Key = convertKeyToMd5();
|
|
||||||
assertEquals("getSSECustomerKeyMd5() wrong", md5Key, md.getSSECustomerKeyMd5());
|
|
||||||
break;
|
|
||||||
case SSE_KMS:
|
|
||||||
assertEquals(AWS_KMS_SSE_ALGORITHM, md.getSSEAlgorithm());
|
|
||||||
//S3 will return full arn of the key, so specify global arn in properties
|
//S3 will return full arn of the key, so specify global arn in properties
|
||||||
assertEquals(this.getConfiguration().
|
String kmsKeyArn = this.getConfiguration().
|
||||||
getTrimmed(SERVER_SIDE_ENCRYPTION_KEY),
|
getTrimmed(SERVER_SIDE_ENCRYPTION_KEY);
|
||||||
md.getSSEAwsKmsKeyId());
|
S3AEncryptionMethods algorithm = getSSEAlgorithm();
|
||||||
break;
|
EncryptionTestUtils.assertEncrypted(getFileSystem(),
|
||||||
default:
|
path,
|
||||||
assertEquals("AES256", md.getSSEAlgorithm());
|
algorithm,
|
||||||
}
|
kmsKeyArn);
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Decodes the SERVER_SIDE_ENCRYPTION_KEY from base64 into an AES key, then
|
|
||||||
* gets the md5 of it, then encodes it in base64 so it will match the version
|
|
||||||
* that AWS returns to us.
|
|
||||||
*
|
|
||||||
* @return md5'd base64 encoded representation of the server side encryption
|
|
||||||
* key
|
|
||||||
*/
|
|
||||||
private String convertKeyToMd5() {
|
|
||||||
String base64Key = getFileSystem().getConf().getTrimmed(
|
|
||||||
SERVER_SIDE_ENCRYPTION_KEY
|
|
||||||
);
|
|
||||||
byte[] key = Base64.decodeBase64(base64Key);
|
|
||||||
byte[] md5 = DigestUtils.md5(key);
|
|
||||||
return Base64.encodeBase64String(md5).trim();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected abstract S3AEncryptionMethods getSSEAlgorithm();
|
protected abstract S3AEncryptionMethods getSSEAlgorithm();
|
||||||
|
|
|
@ -0,0 +1,102 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.fs.s3a;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import com.amazonaws.services.s3.model.ObjectMetadata;
|
||||||
|
|
||||||
|
import org.apache.commons.codec.digest.DigestUtils;
|
||||||
|
import org.apache.commons.net.util.Base64;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNull;
|
||||||
|
|
||||||
|
public final class EncryptionTestUtils {
|
||||||
|
|
||||||
|
/** Private constructor */
|
||||||
|
private EncryptionTestUtils() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public static final String AWS_KMS_SSE_ALGORITHM = "aws:kms";
|
||||||
|
|
||||||
|
public static final String SSE_C_ALGORITHM = "AES256";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Decodes the SERVER_SIDE_ENCRYPTION_KEY from base64 into an AES key, then
|
||||||
|
* gets the md5 of it, then encodes it in base64 so it will match the version
|
||||||
|
* that AWS returns to us.
|
||||||
|
*
|
||||||
|
* @return md5'd base64 encoded representation of the server side encryption
|
||||||
|
* key
|
||||||
|
*/
|
||||||
|
public static String convertKeyToMd5(FileSystem fs) {
|
||||||
|
String base64Key = fs.getConf().getTrimmed(
|
||||||
|
SERVER_SIDE_ENCRYPTION_KEY
|
||||||
|
);
|
||||||
|
byte[] key = Base64.decodeBase64(base64Key);
|
||||||
|
byte[] md5 = DigestUtils.md5(key);
|
||||||
|
return Base64.encodeBase64String(md5).trim();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Assert that a path is encrypted with right encryption settings.
|
||||||
|
* @param path file path.
|
||||||
|
* @param algorithm encryption algorithm.
|
||||||
|
* @param kmsKeyArn full kms key.
|
||||||
|
*/
|
||||||
|
public static void assertEncrypted(S3AFileSystem fs,
|
||||||
|
final Path path,
|
||||||
|
final S3AEncryptionMethods algorithm,
|
||||||
|
final String kmsKeyArn)
|
||||||
|
throws IOException {
|
||||||
|
ObjectMetadata md = fs.getObjectMetadata(path);
|
||||||
|
String details = String.format(
|
||||||
|
"file %s with encryption algorithm %s and key %s",
|
||||||
|
path,
|
||||||
|
md.getSSEAlgorithm(),
|
||||||
|
md.getSSEAwsKmsKeyId());
|
||||||
|
switch(algorithm) {
|
||||||
|
case SSE_C:
|
||||||
|
assertNull("Metadata algorithm should have been null in "
|
||||||
|
+ details,
|
||||||
|
md.getSSEAlgorithm());
|
||||||
|
assertEquals("Wrong SSE-C algorithm in "
|
||||||
|
+ details,
|
||||||
|
SSE_C_ALGORITHM, md.getSSECustomerAlgorithm());
|
||||||
|
String md5Key = convertKeyToMd5(fs);
|
||||||
|
assertEquals("getSSECustomerKeyMd5() wrong in " + details,
|
||||||
|
md5Key, md.getSSECustomerKeyMd5());
|
||||||
|
break;
|
||||||
|
case SSE_KMS:
|
||||||
|
assertEquals("Wrong algorithm in " + details,
|
||||||
|
AWS_KMS_SSE_ALGORITHM, md.getSSEAlgorithm());
|
||||||
|
assertEquals("Wrong KMS key in " + details,
|
||||||
|
kmsKeyArn,
|
||||||
|
md.getSSEAwsKmsKeyId());
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
assertEquals("AES256", md.getSSEAlgorithm());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -52,7 +52,8 @@ public class ITestS3AEncryptionSSEKMSDefaultKey
|
||||||
@Override
|
@Override
|
||||||
protected void assertEncrypted(Path path) throws IOException {
|
protected void assertEncrypted(Path path) throws IOException {
|
||||||
ObjectMetadata md = getFileSystem().getObjectMetadata(path);
|
ObjectMetadata md = getFileSystem().getObjectMetadata(path);
|
||||||
assertEquals("SSE Algorithm", AWS_KMS_SSE_ALGORITHM, md.getSSEAlgorithm());
|
assertEquals("SSE Algorithm", EncryptionTestUtils.AWS_KMS_SSE_ALGORITHM,
|
||||||
|
md.getSSEAlgorithm());
|
||||||
assertThat(md.getSSEAwsKmsKeyId(), containsString("arn:aws:kms:"));
|
assertThat(md.getSSEAwsKmsKeyId(), containsString("arn:aws:kms:"));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,138 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.fs.s3a;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.junit.Ignore;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.contract.ContractTestUtils;
|
||||||
|
import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
|
||||||
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
|
||||||
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
|
||||||
|
import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM;
|
||||||
|
import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY;
|
||||||
|
import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.SSE_KMS;
|
||||||
|
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Concrete class that extends {@link AbstractTestS3AEncryption}
|
||||||
|
* and tests already configured bucket level encryption using s3 console.
|
||||||
|
* This requires the SERVER_SIDE_ENCRYPTION_KEY
|
||||||
|
* to be set in auth-keys.xml for it to run. The value should match with the
|
||||||
|
* kms key set for the bucket.
|
||||||
|
* See HADOOP-16794.
|
||||||
|
*/
|
||||||
|
public class ITestS3AEncryptionWithDefaultS3Settings extends
|
||||||
|
AbstractTestS3AEncryption {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setup() throws Exception {
|
||||||
|
super.setup();
|
||||||
|
// get the KMS key for this test.
|
||||||
|
S3AFileSystem fs = getFileSystem();
|
||||||
|
Configuration c = fs.getConf();
|
||||||
|
String kmsKey = c.get(SERVER_SIDE_ENCRYPTION_KEY);
|
||||||
|
if (StringUtils.isBlank(kmsKey)) {
|
||||||
|
skip(SERVER_SIDE_ENCRYPTION_KEY + " is not set for " +
|
||||||
|
SSE_KMS.getMethod());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void patchConfigurationEncryptionSettings(
|
||||||
|
final Configuration conf) {
|
||||||
|
removeBaseAndBucketOverrides(conf,
|
||||||
|
SERVER_SIDE_ENCRYPTION_ALGORITHM);
|
||||||
|
conf.set(SERVER_SIDE_ENCRYPTION_ALGORITHM,
|
||||||
|
getSSEAlgorithm().getMethod());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Setting this to NONE as we don't want to overwrite
|
||||||
|
* already configured encryption settings.
|
||||||
|
* @return the algorithm
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
protected S3AEncryptionMethods getSSEAlgorithm() {
|
||||||
|
return S3AEncryptionMethods.NONE;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The check here is that the object is encrypted
|
||||||
|
* <i>and</i> that the encryption key is the KMS key
|
||||||
|
* provided, not any default key.
|
||||||
|
* @param path path
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
protected void assertEncrypted(Path path) throws IOException {
|
||||||
|
S3AFileSystem fs = getFileSystem();
|
||||||
|
Configuration c = fs.getConf();
|
||||||
|
String kmsKey = c.getTrimmed(SERVER_SIDE_ENCRYPTION_KEY);
|
||||||
|
EncryptionTestUtils.assertEncrypted(fs, path, SSE_KMS, kmsKey);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@Ignore
|
||||||
|
@Test
|
||||||
|
public void testEncryptionSettingPropagation() throws Throwable {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
@Ignore
|
||||||
|
@Test
|
||||||
|
public void testEncryption() throws Throwable {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testEncryptionOverRename2() throws Throwable {
|
||||||
|
S3AFileSystem fs = getFileSystem();
|
||||||
|
|
||||||
|
// write the file with the unencrypted FS.
|
||||||
|
// this will pick up whatever defaults we have.
|
||||||
|
Path src = path(createFilename(1024));
|
||||||
|
byte[] data = dataset(1024, 'a', 'z');
|
||||||
|
EncryptionSecrets secrets = fs.getEncryptionSecrets();
|
||||||
|
validateEncrytionSecrets(secrets);
|
||||||
|
writeDataset(fs, src, data, data.length, 1024 * 1024, true);
|
||||||
|
ContractTestUtils.verifyFileContents(fs, src, data);
|
||||||
|
|
||||||
|
// fs2 conf will always use SSE-KMS
|
||||||
|
Configuration fs2Conf = new Configuration(fs.getConf());
|
||||||
|
fs2Conf.set(SERVER_SIDE_ENCRYPTION_ALGORITHM,
|
||||||
|
S3AEncryptionMethods.SSE_KMS.getMethod());
|
||||||
|
try (FileSystem kmsFS = FileSystem.newInstance(fs.getUri(), fs2Conf)) {
|
||||||
|
Path targetDir = path("target");
|
||||||
|
kmsFS.mkdirs(targetDir);
|
||||||
|
ContractTestUtils.rename(kmsFS, src, targetDir);
|
||||||
|
Path renamedFile = new Path(targetDir, src.getName());
|
||||||
|
ContractTestUtils.verifyFileContents(fs, renamedFile, data);
|
||||||
|
String kmsKey = fs2Conf.getTrimmed(SERVER_SIDE_ENCRYPTION_KEY);
|
||||||
|
// we assert that the renamed file has picked up the KMS key of our FS
|
||||||
|
EncryptionTestUtils.assertEncrypted(fs, renamedFile, SSE_KMS, kmsKey);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -457,6 +457,30 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase {
|
||||||
logFSState();
|
logFSState();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test to verify source file encryption key.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void test_090_verifyRenameSourceEncryption() throws IOException {
|
||||||
|
if(isEncrypted(getFileSystem())) {
|
||||||
|
assertEncrypted(getHugefile());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void assertEncrypted(Path hugeFile) throws IOException {
|
||||||
|
//Concrete classes will have implementation.
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Checks if the encryption is enabled for the file system.
|
||||||
|
* @param fileSystem
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
protected boolean isEncrypted(S3AFileSystem fileSystem) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void test_100_renameHugeFile() throws Throwable {
|
public void test_100_renameHugeFile() throws Throwable {
|
||||||
assumeHugeFileExists();
|
assumeHugeFileExists();
|
||||||
|
@ -485,6 +509,20 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase {
|
||||||
bandwidth(timer2, size);
|
bandwidth(timer2, size);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test to verify target file encryption key.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void test_110_verifyRenameDestEncryption() throws IOException {
|
||||||
|
if(isEncrypted(getFileSystem())) {
|
||||||
|
/**
|
||||||
|
* Using hugeFile again as hugeFileRenamed is renamed back
|
||||||
|
* to hugeFile.
|
||||||
|
*/
|
||||||
|
assertEncrypted(hugefile);
|
||||||
|
}
|
||||||
|
}
|
||||||
/**
|
/**
|
||||||
* Cleanup: delete the files.
|
* Cleanup: delete the files.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -0,0 +1,76 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.fs.s3a.scale;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.fs.s3a.Constants;
|
||||||
|
import org.apache.hadoop.fs.s3a.EncryptionTestUtils;
|
||||||
|
import org.apache.hadoop.fs.s3a.S3AFileSystem;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
|
||||||
|
import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY;
|
||||||
|
import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.SSE_KMS;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Class to test SSE_KMS encryption settings for huge files.
|
||||||
|
* Tests will only run if value of {@link Constants#SERVER_SIDE_ENCRYPTION_KEY}
|
||||||
|
* is set in the configuration. The testing bucket must be configured with this
|
||||||
|
* same key else test might fail.
|
||||||
|
*/
|
||||||
|
public class ITestS3AHugeFilesEncryption extends AbstractSTestS3AHugeFiles {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setup() throws Exception {
|
||||||
|
Configuration c = new Configuration();
|
||||||
|
String kmsKey = c.get(SERVER_SIDE_ENCRYPTION_KEY);
|
||||||
|
if (StringUtils.isBlank(kmsKey)) {
|
||||||
|
skip(SERVER_SIDE_ENCRYPTION_KEY + " is not set for " +
|
||||||
|
SSE_KMS.getMethod());
|
||||||
|
}
|
||||||
|
super.setup();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected String getBlockOutputBufferName() {
|
||||||
|
return Constants.FAST_UPLOAD_BUFFER_ARRAY;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param fileSystem
|
||||||
|
* @return true if {@link Constants#SERVER_SIDE_ENCRYPTION_KEY} is set
|
||||||
|
* in the config.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
protected boolean isEncrypted(S3AFileSystem fileSystem) {
|
||||||
|
Configuration c = new Configuration();
|
||||||
|
return c.get(SERVER_SIDE_ENCRYPTION_KEY) != null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void assertEncrypted(Path hugeFile) throws IOException {
|
||||||
|
Configuration c = new Configuration();
|
||||||
|
String kmsKey = c.get(SERVER_SIDE_ENCRYPTION_KEY);
|
||||||
|
EncryptionTestUtils.assertEncrypted(getFileSystem(), hugeFile,
|
||||||
|
SSE_KMS, kmsKey);
|
||||||
|
}
|
||||||
|
}
|
|
@ -31,6 +31,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.contract.ContractTestUtils;
|
import org.apache.hadoop.fs.contract.ContractTestUtils;
|
||||||
|
import org.apache.hadoop.fs.s3a.AWSServiceIOException;
|
||||||
import org.apache.hadoop.util.DurationInfo;
|
import org.apache.hadoop.util.DurationInfo;
|
||||||
|
|
||||||
import static org.apache.hadoop.fs.s3a.S3AUtils.applyLocatedFiles;
|
import static org.apache.hadoop.fs.s3a.S3AUtils.applyLocatedFiles;
|
||||||
|
@ -135,4 +136,17 @@ public final class ExtraAssertions {
|
||||||
thrown);
|
thrown);
|
||||||
return (T)cause;
|
return (T)cause;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Assert that an exception failed with a specific status code.
|
||||||
|
* @param e exception
|
||||||
|
* @param code expected status code
|
||||||
|
* @throws AWSServiceIOException rethrown if the status code does not match.
|
||||||
|
*/
|
||||||
|
protected void assertStatusCode(AWSServiceIOException e, int code)
|
||||||
|
throws AWSServiceIOException {
|
||||||
|
if (e.getStatusCode() != code) {
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue