HADOOP-17922. move to fs.s3a.encryption.algorithm - JCEKS integration (#3466)
The ordering of the resolution of new and deprecated s3a encryption options & secrets is the same when JCEKS and other hadoop credentials stores are used to store them as when they are in XML files: per-bucket settings always take priority over global values, even when the bucket-level options use the old option names. Contributed by Mehakmeet Singh and Steve Loughran
This commit is contained in:
parent
7097e5b793
commit
d609f44aa0
|
@ -58,8 +58,9 @@ import static org.apache.hadoop.fs.s3a.Constants.AWS_REGION;
|
||||||
import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_CENTRAL_REGION;
|
import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_CENTRAL_REGION;
|
||||||
import static org.apache.hadoop.fs.s3a.Constants.EXPERIMENTAL_AWS_INTERNAL_THROTTLING;
|
import static org.apache.hadoop.fs.s3a.Constants.EXPERIMENTAL_AWS_INTERNAL_THROTTLING;
|
||||||
import static org.apache.hadoop.fs.s3a.Constants.EXPERIMENTAL_AWS_INTERNAL_THROTTLING_DEFAULT;
|
import static org.apache.hadoop.fs.s3a.Constants.EXPERIMENTAL_AWS_INTERNAL_THROTTLING_DEFAULT;
|
||||||
import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_ALGORITHM;
|
|
||||||
import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_KEY;
|
import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_KEY;
|
||||||
|
import static org.apache.hadoop.fs.s3a.S3AUtils.getEncryptionAlgorithm;
|
||||||
|
import static org.apache.hadoop.fs.s3a.S3AUtils.getS3EncryptionKey;
|
||||||
import static org.apache.hadoop.fs.s3a.S3AUtils.translateException;
|
import static org.apache.hadoop.fs.s3a.S3AUtils.translateException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -96,6 +97,9 @@ public class DefaultS3ClientFactory extends Configured
|
||||||
/** Exactly once log to inform about ignoring the AWS-SDK Warnings for CSE. */
|
/** Exactly once log to inform about ignoring the AWS-SDK Warnings for CSE. */
|
||||||
private static final LogExactlyOnce IGNORE_CSE_WARN = new LogExactlyOnce(LOG);
|
private static final LogExactlyOnce IGNORE_CSE_WARN = new LogExactlyOnce(LOG);
|
||||||
|
|
||||||
|
/** Bucket name. */
|
||||||
|
private String bucket;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create the client by preparing the AwsConf configuration
|
* Create the client by preparing the AwsConf configuration
|
||||||
* and then invoking {@code buildAmazonS3Client()}.
|
* and then invoking {@code buildAmazonS3Client()}.
|
||||||
|
@ -105,9 +109,10 @@ public class DefaultS3ClientFactory extends Configured
|
||||||
final URI uri,
|
final URI uri,
|
||||||
final S3ClientCreationParameters parameters) throws IOException {
|
final S3ClientCreationParameters parameters) throws IOException {
|
||||||
Configuration conf = getConf();
|
Configuration conf = getConf();
|
||||||
|
bucket = uri.getHost();
|
||||||
final ClientConfiguration awsConf = S3AUtils
|
final ClientConfiguration awsConf = S3AUtils
|
||||||
.createAwsConf(conf,
|
.createAwsConf(conf,
|
||||||
uri.getHost(),
|
bucket,
|
||||||
Constants.AWS_SERVICE_IDENTIFIER_S3);
|
Constants.AWS_SERVICE_IDENTIFIER_S3);
|
||||||
// add any headers
|
// add any headers
|
||||||
parameters.getHeaders().forEach((h, v) ->
|
parameters.getHeaders().forEach((h, v) ->
|
||||||
|
@ -126,10 +131,13 @@ public class DefaultS3ClientFactory extends Configured
|
||||||
awsConf.setUserAgentSuffix(parameters.getUserAgentSuffix());
|
awsConf.setUserAgentSuffix(parameters.getUserAgentSuffix());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Get the encryption method for this bucket.
|
||||||
|
S3AEncryptionMethods encryptionMethods =
|
||||||
|
getEncryptionAlgorithm(bucket, conf);
|
||||||
try {
|
try {
|
||||||
if (S3AEncryptionMethods.getMethod(S3AUtils.
|
// If CSE is enabled then build a S3EncryptionClient.
|
||||||
lookupPassword(conf, S3_ENCRYPTION_ALGORITHM, null))
|
if (S3AEncryptionMethods.CSE_KMS.getMethod()
|
||||||
.equals(S3AEncryptionMethods.CSE_KMS)) {
|
.equals(encryptionMethods.getMethod())) {
|
||||||
return buildAmazonS3EncryptionClient(
|
return buildAmazonS3EncryptionClient(
|
||||||
awsConf,
|
awsConf,
|
||||||
parameters);
|
parameters);
|
||||||
|
@ -163,12 +171,11 @@ public class DefaultS3ClientFactory extends Configured
|
||||||
new AmazonS3EncryptionClientV2Builder();
|
new AmazonS3EncryptionClientV2Builder();
|
||||||
Configuration conf = getConf();
|
Configuration conf = getConf();
|
||||||
|
|
||||||
//CSE-KMS Method
|
// CSE-KMS Method
|
||||||
String kmsKeyId = S3AUtils.lookupPassword(conf,
|
String kmsKeyId = getS3EncryptionKey(bucket, conf, true);
|
||||||
S3_ENCRYPTION_KEY, null);
|
|
||||||
// Check if kmsKeyID is not null
|
// Check if kmsKeyID is not null
|
||||||
Preconditions.checkArgument(kmsKeyId != null, "CSE-KMS method "
|
Preconditions.checkArgument(!StringUtils.isBlank(kmsKeyId), "CSE-KMS "
|
||||||
+ "requires KMS key ID. Use " + S3_ENCRYPTION_KEY
|
+ "method requires KMS key ID. Use " + S3_ENCRYPTION_KEY
|
||||||
+ " property to set it. ");
|
+ " property to set it. ");
|
||||||
|
|
||||||
EncryptionMaterialsProvider materialsProvider =
|
EncryptionMaterialsProvider materialsProvider =
|
||||||
|
|
|
@ -25,31 +25,45 @@ import org.apache.commons.lang3.StringUtils;
|
||||||
/**
|
/**
|
||||||
* This enum is to centralize the encryption methods and
|
* This enum is to centralize the encryption methods and
|
||||||
* the value required in the configuration.
|
* the value required in the configuration.
|
||||||
*
|
|
||||||
* There's two enum values for the two client encryption mechanisms the AWS
|
|
||||||
* S3 SDK supports, even though these are not currently supported in S3A.
|
|
||||||
* This is to aid supporting CSE in some form in future, fundamental
|
|
||||||
* issues about file length of encrypted data notwithstanding.
|
|
||||||
*
|
|
||||||
*/
|
*/
|
||||||
public enum S3AEncryptionMethods {
|
public enum S3AEncryptionMethods {
|
||||||
|
|
||||||
NONE("", false),
|
NONE("", false, false),
|
||||||
SSE_S3("AES256", true),
|
SSE_S3("AES256", true, false),
|
||||||
SSE_KMS("SSE-KMS", true),
|
SSE_KMS("SSE-KMS", true, false),
|
||||||
SSE_C("SSE-C", true),
|
SSE_C("SSE-C", true, true),
|
||||||
CSE_KMS("CSE-KMS", false),
|
CSE_KMS("CSE-KMS", false, true),
|
||||||
CSE_CUSTOM("CSE-CUSTOM", false);
|
CSE_CUSTOM("CSE-CUSTOM", false, true);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Error string when {@link #getMethod(String)} fails.
|
||||||
|
* Used in tests.
|
||||||
|
*/
|
||||||
static final String UNKNOWN_ALGORITHM
|
static final String UNKNOWN_ALGORITHM
|
||||||
= "Unknown encryption algorithm ";
|
= "Unknown encryption algorithm ";
|
||||||
|
|
||||||
private String method;
|
/**
|
||||||
private boolean serverSide;
|
* What is the encryption method?
|
||||||
|
*/
|
||||||
|
private final String method;
|
||||||
|
|
||||||
S3AEncryptionMethods(String method, final boolean serverSide) {
|
/**
|
||||||
|
* Is this server side?
|
||||||
|
*/
|
||||||
|
private final boolean serverSide;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Does the encryption method require a
|
||||||
|
* secret in the encryption.key property?
|
||||||
|
*/
|
||||||
|
private final boolean requiresSecret;
|
||||||
|
|
||||||
|
S3AEncryptionMethods(String method,
|
||||||
|
final boolean serverSide,
|
||||||
|
final boolean requiresSecret) {
|
||||||
this.method = method;
|
this.method = method;
|
||||||
this.serverSide = serverSide;
|
this.serverSide = serverSide;
|
||||||
|
this.requiresSecret = requiresSecret;
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getMethod() {
|
public String getMethod() {
|
||||||
|
@ -64,6 +78,14 @@ public enum S3AEncryptionMethods {
|
||||||
return serverSide;
|
return serverSide;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Does this encryption algorithm require a secret?
|
||||||
|
* @return true if a secret must be retrieved.
|
||||||
|
*/
|
||||||
|
public boolean requiresSecret() {
|
||||||
|
return requiresSecret;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the encryption mechanism from the value provided.
|
* Get the encryption mechanism from the value provided.
|
||||||
* @param name algorithm name
|
* @param name algorithm name
|
||||||
|
@ -75,7 +97,7 @@ public enum S3AEncryptionMethods {
|
||||||
return NONE;
|
return NONE;
|
||||||
}
|
}
|
||||||
for (S3AEncryptionMethods v : values()) {
|
for (S3AEncryptionMethods v : values()) {
|
||||||
if (v.getMethod().equals(name)) {
|
if (v.getMethod().equalsIgnoreCase(name)) {
|
||||||
return v;
|
return v;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -458,17 +458,15 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
|
||||||
|
|
||||||
// look for encryption data
|
// look for encryption data
|
||||||
// DT Bindings may override this
|
// DT Bindings may override this
|
||||||
setEncryptionSecrets(new EncryptionSecrets(
|
setEncryptionSecrets(
|
||||||
getEncryptionAlgorithm(bucket, conf),
|
buildEncryptionSecrets(bucket, conf));
|
||||||
getS3EncryptionKey(bucket, getConf())));
|
|
||||||
|
|
||||||
invoker = new Invoker(new S3ARetryPolicy(getConf()), onRetry);
|
invoker = new Invoker(new S3ARetryPolicy(getConf()), onRetry);
|
||||||
instrumentation = new S3AInstrumentation(uri);
|
instrumentation = new S3AInstrumentation(uri);
|
||||||
initializeStatisticsBinding();
|
initializeStatisticsBinding();
|
||||||
// If CSE-KMS method is set then CSE is enabled.
|
// If CSE-KMS method is set then CSE is enabled.
|
||||||
isCSEEnabled = S3AUtils.lookupPassword(conf,
|
isCSEEnabled = S3AEncryptionMethods.CSE_KMS.getMethod()
|
||||||
Constants.S3_ENCRYPTION_ALGORITHM, "")
|
.equals(getS3EncryptionAlgorithm().getMethod());
|
||||||
.equals(S3AEncryptionMethods.CSE_KMS.getMethod());
|
|
||||||
LOG.debug("Client Side Encryption enabled: {}", isCSEEnabled);
|
LOG.debug("Client Side Encryption enabled: {}", isCSEEnabled);
|
||||||
setCSEGauge();
|
setCSEGauge();
|
||||||
// Username is the current user at the time the FS was instantiated.
|
// Username is the current user at the time the FS was instantiated.
|
||||||
|
|
|
@ -47,6 +47,7 @@ import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.PathFilter;
|
import org.apache.hadoop.fs.PathFilter;
|
||||||
import org.apache.hadoop.fs.RemoteIterator;
|
import org.apache.hadoop.fs.RemoteIterator;
|
||||||
import org.apache.hadoop.util.functional.RemoteIterators;
|
import org.apache.hadoop.util.functional.RemoteIterators;
|
||||||
|
import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
|
||||||
import org.apache.hadoop.fs.s3a.auth.IAMInstanceCredentialsProvider;
|
import org.apache.hadoop.fs.s3a.auth.IAMInstanceCredentialsProvider;
|
||||||
import org.apache.hadoop.fs.s3a.impl.NetworkBinding;
|
import org.apache.hadoop.fs.s3a.impl.NetworkBinding;
|
||||||
import org.apache.hadoop.fs.s3native.S3xLoginHelper;
|
import org.apache.hadoop.fs.s3native.S3xLoginHelper;
|
||||||
|
@ -64,6 +65,7 @@ import java.io.EOFException;
|
||||||
import java.io.FileNotFoundException;
|
import java.io.FileNotFoundException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InterruptedIOException;
|
import java.io.InterruptedIOException;
|
||||||
|
import java.io.UncheckedIOException;
|
||||||
import java.lang.reflect.Constructor;
|
import java.lang.reflect.Constructor;
|
||||||
import java.lang.reflect.InvocationTargetException;
|
import java.lang.reflect.InvocationTargetException;
|
||||||
import java.lang.reflect.Method;
|
import java.lang.reflect.Method;
|
||||||
|
@ -1568,22 +1570,101 @@ public final class S3AUtils {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get any SSE/CSE key from a configuration/credential provider.
|
* Lookup a per-bucket-secret from a configuration including JCEKS files.
|
||||||
* This operation handles the case where the option has been
|
* No attempt is made to look for the global configuration.
|
||||||
* set in the provider or configuration to the option
|
* @param bucket bucket or "" if none known
|
||||||
* {@code OLD_S3A_SERVER_SIDE_ENCRYPTION_KEY}.
|
* @param conf configuration
|
||||||
* IOExceptions raised during retrieval are swallowed.
|
* @param baseKey base key to look up, e.g "fs.s3a.secret.key"
|
||||||
|
* @return the secret or null.
|
||||||
|
* @throws IOException on any IO problem
|
||||||
|
* @throws IllegalArgumentException bad arguments
|
||||||
|
*/
|
||||||
|
private static String lookupBucketSecret(
|
||||||
|
String bucket,
|
||||||
|
Configuration conf,
|
||||||
|
String baseKey)
|
||||||
|
throws IOException {
|
||||||
|
|
||||||
|
Preconditions.checkArgument(!isEmpty(bucket), "null/empty bucket argument");
|
||||||
|
Preconditions.checkArgument(baseKey.startsWith(FS_S3A_PREFIX),
|
||||||
|
"%s does not start with $%s", baseKey, FS_S3A_PREFIX);
|
||||||
|
String subkey = baseKey.substring(FS_S3A_PREFIX.length());
|
||||||
|
|
||||||
|
// set from the long key unless overidden.
|
||||||
|
String longBucketKey = String.format(
|
||||||
|
BUCKET_PATTERN, bucket, baseKey);
|
||||||
|
String initialVal = getPassword(conf, longBucketKey, null, null);
|
||||||
|
// then override from the short one if it is set
|
||||||
|
String shortBucketKey = String.format(
|
||||||
|
BUCKET_PATTERN, bucket, subkey);
|
||||||
|
return getPassword(conf, shortBucketKey, initialVal, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get any S3 encryption key, without propagating exceptions from
|
||||||
|
* JCEKs files.
|
||||||
* @param bucket bucket to query for
|
* @param bucket bucket to query for
|
||||||
* @param conf configuration to examine
|
* @param conf configuration to examine
|
||||||
* @return the encryption key or ""
|
* @return the encryption key or ""
|
||||||
* @throws IllegalArgumentException bad arguments.
|
* @throws IllegalArgumentException bad arguments.
|
||||||
*/
|
*/
|
||||||
public static String getS3EncryptionKey(String bucket,
|
public static String getS3EncryptionKey(
|
||||||
|
String bucket,
|
||||||
Configuration conf) {
|
Configuration conf) {
|
||||||
try {
|
try {
|
||||||
return lookupPassword(bucket, conf, Constants.S3_ENCRYPTION_KEY);
|
return getS3EncryptionKey(bucket, conf, false);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.error("Cannot retrieve " + Constants.S3_ENCRYPTION_KEY, e);
|
// never going to happen, but to make sure, covert to
|
||||||
|
// runtime exception
|
||||||
|
throw new UncheckedIOException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get any SSE/CSE key from a configuration/credential provider.
|
||||||
|
* This operation handles the case where the option has been
|
||||||
|
* set in the provider or configuration to the option
|
||||||
|
* {@code SERVER_SIDE_ENCRYPTION_KEY}.
|
||||||
|
* IOExceptions raised during retrieval are swallowed.
|
||||||
|
* @param bucket bucket to query for
|
||||||
|
* @param conf configuration to examine
|
||||||
|
* @param propagateExceptions should IO exceptions be rethrown?
|
||||||
|
* @return the encryption key or ""
|
||||||
|
* @throws IllegalArgumentException bad arguments.
|
||||||
|
* @throws IOException if propagateExceptions==true and reading a JCEKS file raised an IOE
|
||||||
|
*/
|
||||||
|
@SuppressWarnings("deprecation")
|
||||||
|
public static String getS3EncryptionKey(
|
||||||
|
String bucket,
|
||||||
|
Configuration conf,
|
||||||
|
boolean propagateExceptions) throws IOException {
|
||||||
|
try {
|
||||||
|
// look up the per-bucket value of the new key,
|
||||||
|
// which implicitly includes the deprecation remapping
|
||||||
|
String key = lookupBucketSecret(bucket, conf, S3_ENCRYPTION_KEY);
|
||||||
|
if (key == null) {
|
||||||
|
// old key in bucket, jceks
|
||||||
|
key = lookupBucketSecret(bucket, conf, SERVER_SIDE_ENCRYPTION_KEY);
|
||||||
|
}
|
||||||
|
if (key == null) {
|
||||||
|
// new key, global; implicit translation of old key in XML files.
|
||||||
|
key = lookupPassword(null, conf, S3_ENCRYPTION_KEY);
|
||||||
|
}
|
||||||
|
if (key == null) {
|
||||||
|
// old key, JCEKS
|
||||||
|
key = lookupPassword(null, conf, SERVER_SIDE_ENCRYPTION_KEY);
|
||||||
|
}
|
||||||
|
if (key == null) {
|
||||||
|
// no key, return ""
|
||||||
|
key = "";
|
||||||
|
}
|
||||||
|
return key;
|
||||||
|
} catch (IOException e) {
|
||||||
|
if (propagateExceptions) {
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
LOG.warn("Cannot retrieve {} for bucket {}",
|
||||||
|
S3_ENCRYPTION_KEY, bucket, e);
|
||||||
return "";
|
return "";
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1597,14 +1678,50 @@ public final class S3AUtils {
|
||||||
* @param conf configuration to scan
|
* @param conf configuration to scan
|
||||||
* @return the encryption mechanism (which will be {@code NONE} unless
|
* @return the encryption mechanism (which will be {@code NONE} unless
|
||||||
* one is set.
|
* one is set.
|
||||||
* @throws IOException on any validation problem.
|
* @throws IOException on JCKES lookup or invalid method/key configuration.
|
||||||
*/
|
*/
|
||||||
public static S3AEncryptionMethods getEncryptionAlgorithm(String bucket,
|
public static S3AEncryptionMethods getEncryptionAlgorithm(String bucket,
|
||||||
Configuration conf) throws IOException {
|
Configuration conf) throws IOException {
|
||||||
S3AEncryptionMethods encryptionMethod = S3AEncryptionMethods.getMethod(
|
return buildEncryptionSecrets(bucket, conf).getEncryptionMethod();
|
||||||
lookupPassword(bucket, conf,
|
}
|
||||||
Constants.S3_ENCRYPTION_ALGORITHM));
|
|
||||||
String encryptionKey = getS3EncryptionKey(bucket, conf);
|
/**
|
||||||
|
* Get the server-side encryption or client side encryption algorithm.
|
||||||
|
* This includes validation of the configuration, checking the state of
|
||||||
|
* the encryption key given the chosen algorithm.
|
||||||
|
*
|
||||||
|
* @param bucket bucket to query for
|
||||||
|
* @param conf configuration to scan
|
||||||
|
* @return the encryption mechanism (which will be {@code NONE} unless
|
||||||
|
* one is set and secrets.
|
||||||
|
* @throws IOException on JCKES lookup or invalid method/key configuration.
|
||||||
|
*/
|
||||||
|
@SuppressWarnings("deprecation")
|
||||||
|
public static EncryptionSecrets buildEncryptionSecrets(String bucket,
|
||||||
|
Configuration conf) throws IOException {
|
||||||
|
|
||||||
|
// new key, per-bucket
|
||||||
|
// this will include fixup of the old key in config XML entries
|
||||||
|
String algorithm = lookupBucketSecret(bucket, conf, S3_ENCRYPTION_ALGORITHM);
|
||||||
|
if (algorithm == null) {
|
||||||
|
// try the old key, per-bucket setting, which will find JCEKS values
|
||||||
|
algorithm = lookupBucketSecret(bucket, conf, SERVER_SIDE_ENCRYPTION_ALGORITHM);
|
||||||
|
}
|
||||||
|
if (algorithm == null) {
|
||||||
|
// new key, global setting
|
||||||
|
// this will include fixup of the old key in config XML entries
|
||||||
|
algorithm = lookupPassword(null, conf, S3_ENCRYPTION_ALGORITHM);
|
||||||
|
}
|
||||||
|
if (algorithm == null) {
|
||||||
|
// old key, global setting, for JCEKS entries.
|
||||||
|
algorithm = lookupPassword(null, conf, SERVER_SIDE_ENCRYPTION_ALGORITHM);
|
||||||
|
}
|
||||||
|
// now determine the algorithm
|
||||||
|
final S3AEncryptionMethods encryptionMethod = S3AEncryptionMethods.getMethod(algorithm);
|
||||||
|
|
||||||
|
// look up the encryption key
|
||||||
|
String encryptionKey = getS3EncryptionKey(bucket, conf,
|
||||||
|
encryptionMethod.requiresSecret());
|
||||||
int encryptionKeyLen =
|
int encryptionKeyLen =
|
||||||
StringUtils.isBlank(encryptionKey) ? 0 : encryptionKey.length();
|
StringUtils.isBlank(encryptionKey) ? 0 : encryptionKey.length();
|
||||||
String diagnostics = passwordDiagnostics(encryptionKey, "key");
|
String diagnostics = passwordDiagnostics(encryptionKey, "key");
|
||||||
|
@ -1638,7 +1755,7 @@ public final class S3AUtils {
|
||||||
LOG.debug("Data is unencrypted");
|
LOG.debug("Data is unencrypted");
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
return encryptionMethod;
|
return new EncryptionSecrets(encryptionMethod, encryptionKey);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -1437,32 +1437,47 @@ Finally, the public `s3a://landsat-pds/` bucket can be accessed anonymously:
|
||||||
</property>
|
</property>
|
||||||
```
|
```
|
||||||
|
|
||||||
### Customizing S3A secrets held in credential files
|
#### per-bucket configuration and deprecated configuration options
|
||||||
|
|
||||||
|
Per-bucket declaration of the deprecated encryption options
|
||||||
|
will take priority over a global option -even when the
|
||||||
|
global option uses the newer configuration keys.
|
||||||
|
|
||||||
|
This means that when setting encryption options in XML files,
|
||||||
|
the option, `fs.bucket.BUCKET.fs.s3a.server-side-encryption-algorithm`
|
||||||
|
will take priority over the global value of `fs.bucket.s3a.encryption.algorithm`.
|
||||||
|
The same holds for the encryption key option `fs.s3a.encryption.key`
|
||||||
|
and its predecessor `fs.s3a.server-side-encryption.key`.
|
||||||
|
|
||||||
|
|
||||||
Secrets in JCEKS files or provided by other Hadoop credential providers
|
For a site configuration of:
|
||||||
can also be configured on a per bucket basis. The S3A client will
|
|
||||||
look for the per-bucket secrets be
|
|
||||||
|
|
||||||
|
```xml
|
||||||
|
<property>
|
||||||
|
<name>fs.s3a.bucket.nightly.server-side-encryption-algorithm</name>
|
||||||
|
<value>SSE-KMS</value>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>fs.s3a.bucket.nightly.server-side-encryption.key</name>
|
||||||
|
<value>arn:aws:kms:eu-west-2:1528130000000:key/753778e4-2d0f-42e6-b894-6a3ae4ea4e5f</value>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>fs.s3a.encryption.algorithm</name>
|
||||||
|
<value>AES256</value>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>fs.s3a.encryption.key</name>
|
||||||
|
<value>unset</value>
|
||||||
|
</property>
|
||||||
|
|
||||||
Consider a JCEKS file with six keys:
|
|
||||||
|
|
||||||
```
|
|
||||||
fs.s3a.access.key
|
|
||||||
fs.s3a.secret.key
|
|
||||||
fs.s3a.encryption.algorithm
|
|
||||||
fs.s3a.encryption.key
|
|
||||||
fs.s3a.bucket.nightly.access.key
|
|
||||||
fs.s3a.bucket.nightly.secret.key
|
|
||||||
fs.s3a.bucket.nightly.session.token
|
|
||||||
fs.s3a.bucket.nightly.server-side-encryption.key
|
|
||||||
fs.s3a.bucket.nightly.server-side-encryption-algorithm
|
|
||||||
```
|
```
|
||||||
|
|
||||||
When accessing the bucket `s3a://nightly/`, the per-bucket configuration
|
The bucket "nightly" will be encrypted with SSE-KMS using the KMS key
|
||||||
options for that bucket will be used, here the access keys and token,
|
`arn:aws:kms:eu-west-2:1528130000000:key/753778e4-2d0f-42e6-b894-6a3ae4ea4e5f`
|
||||||
and including the encryption algorithm and key.
|
|
||||||
|
|
||||||
|
|
||||||
### <a name="per_bucket_endpoints"></a>Using Per-Bucket Configuration to access data round the world
|
### <a name="per_bucket_endpoints"></a>Using Per-Bucket Configuration to access data round the world
|
||||||
|
|
||||||
|
|
|
@ -32,9 +32,11 @@ import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_ALGORITHM;
|
||||||
import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_KEY;
|
import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_KEY;
|
||||||
import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM;
|
import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM;
|
||||||
import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY;
|
import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY;
|
||||||
|
import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
|
||||||
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
|
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
|
||||||
import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfEncryptionTestsDisabled;
|
import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfEncryptionTestsDisabled;
|
||||||
import static org.apache.hadoop.fs.s3a.S3AUtils.getEncryptionAlgorithm;
|
import static org.apache.hadoop.fs.s3a.S3AUtils.getEncryptionAlgorithm;
|
||||||
|
import static org.apache.hadoop.fs.s3a.S3AUtils.getS3EncryptionKey;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test whether or not encryption works by turning it on. Some checks
|
* Test whether or not encryption works by turning it on. Some checks
|
||||||
|
@ -163,8 +165,9 @@ public abstract class AbstractTestS3AEncryption extends AbstractS3ATestBase {
|
||||||
*/
|
*/
|
||||||
protected void assertEncrypted(Path path) throws IOException {
|
protected void assertEncrypted(Path path) throws IOException {
|
||||||
//S3 will return full arn of the key, so specify global arn in properties
|
//S3 will return full arn of the key, so specify global arn in properties
|
||||||
String kmsKeyArn = this.getConfiguration().
|
String kmsKeyArn =
|
||||||
getTrimmed(S3_ENCRYPTION_KEY);
|
getS3EncryptionKey(getTestBucketName(getConfiguration()),
|
||||||
|
getConfiguration());
|
||||||
S3AEncryptionMethods algorithm = getSSEAlgorithm();
|
S3AEncryptionMethods algorithm = getSSEAlgorithm();
|
||||||
EncryptionTestUtils.assertEncrypted(getFileSystem(),
|
EncryptionTestUtils.assertEncrypted(getFileSystem(),
|
||||||
path,
|
path,
|
||||||
|
|
|
@ -45,8 +45,12 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
|
||||||
import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_MIN_SIZE;
|
import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_MIN_SIZE;
|
||||||
import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_ALGORITHM;
|
import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_ALGORITHM;
|
||||||
import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_KEY;
|
import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_KEY;
|
||||||
|
import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM;
|
||||||
|
import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY;
|
||||||
import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume;
|
import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume;
|
||||||
|
import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
|
||||||
import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestPropertyBool;
|
import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestPropertyBool;
|
||||||
|
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
|
||||||
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
|
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -193,6 +197,7 @@ public abstract class ITestS3AClientSideEncryption extends AbstractS3ATestBase {
|
||||||
* Testing how unencrypted and encrypted data behaves when read through
|
* Testing how unencrypted and encrypted data behaves when read through
|
||||||
* CSE enabled and disabled FS respectively.
|
* CSE enabled and disabled FS respectively.
|
||||||
*/
|
*/
|
||||||
|
@SuppressWarnings("deprecation")
|
||||||
@Test
|
@Test
|
||||||
public void testEncryptionEnabledAndDisabledFS() throws Exception {
|
public void testEncryptionEnabledAndDisabledFS() throws Exception {
|
||||||
maybeSkipTest();
|
maybeSkipTest();
|
||||||
|
@ -203,8 +208,12 @@ public abstract class ITestS3AClientSideEncryption extends AbstractS3ATestBase {
|
||||||
Path encryptedFilePath = path(getMethodName() + "cse");
|
Path encryptedFilePath = path(getMethodName() + "cse");
|
||||||
|
|
||||||
// Initialize a CSE disabled FS.
|
// Initialize a CSE disabled FS.
|
||||||
cseDisabledConf.unset(S3_ENCRYPTION_ALGORITHM);
|
removeBaseAndBucketOverrides(getTestBucketName(cseDisabledConf),
|
||||||
cseDisabledConf.unset(S3_ENCRYPTION_KEY);
|
cseDisabledConf,
|
||||||
|
S3_ENCRYPTION_ALGORITHM,
|
||||||
|
S3_ENCRYPTION_KEY,
|
||||||
|
SERVER_SIDE_ENCRYPTION_ALGORITHM,
|
||||||
|
SERVER_SIDE_ENCRYPTION_KEY);
|
||||||
cseDisabledFS.initialize(getFileSystem().getUri(),
|
cseDisabledFS.initialize(getFileSystem().getUri(),
|
||||||
cseDisabledConf);
|
cseDisabledConf);
|
||||||
|
|
||||||
|
@ -288,7 +297,7 @@ public abstract class ITestS3AClientSideEncryption extends AbstractS3ATestBase {
|
||||||
/**
|
/**
|
||||||
* Skip tests if certain conditions are met.
|
* Skip tests if certain conditions are met.
|
||||||
*/
|
*/
|
||||||
protected abstract void maybeSkipTest();
|
protected abstract void maybeSkipTest() throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Assert that at path references an encrypted blob.
|
* Assert that at path references an encrypted blob.
|
||||||
|
|
|
@ -28,8 +28,10 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.s3a.impl.HeaderProcessing;
|
import org.apache.hadoop.fs.s3a.impl.HeaderProcessing;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
|
||||||
import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfEncryptionNotSet;
|
import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfEncryptionNotSet;
|
||||||
import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfEncryptionTestsDisabled;
|
import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfEncryptionTestsDisabled;
|
||||||
|
import static org.apache.hadoop.fs.s3a.S3AUtils.getS3EncryptionKey;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Testing the S3 CSE - KMS method.
|
* Testing the S3 CSE - KMS method.
|
||||||
|
@ -53,7 +55,7 @@ public class ITestS3AClientSideEncryptionKms
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void maybeSkipTest() {
|
protected void maybeSkipTest() throws IOException {
|
||||||
skipIfEncryptionTestsDisabled(getConfiguration());
|
skipIfEncryptionTestsDisabled(getConfiguration());
|
||||||
// skip the test if CSE-KMS or KMS key is not set.
|
// skip the test if CSE-KMS or KMS key is not set.
|
||||||
skipIfEncryptionNotSet(getConfiguration(), S3AEncryptionMethods.CSE_KMS);
|
skipIfEncryptionNotSet(getConfiguration(), S3AEncryptionMethods.CSE_KMS);
|
||||||
|
@ -71,8 +73,8 @@ public class ITestS3AClientSideEncryptionKms
|
||||||
|
|
||||||
// Assert content encryption algo for KMS, is present in the
|
// Assert content encryption algo for KMS, is present in the
|
||||||
// materials description and KMS key ID isn't.
|
// materials description and KMS key ID isn't.
|
||||||
String keyId =
|
String keyId = getS3EncryptionKey(getTestBucketName(getConfiguration()),
|
||||||
getConfiguration().get(Constants.S3_ENCRYPTION_KEY);
|
getConfiguration());
|
||||||
Assertions.assertThat(processHeader(fsXAttrs,
|
Assertions.assertThat(processHeader(fsXAttrs,
|
||||||
xAttrPrefix + Headers.MATERIALS_DESCRIPTION))
|
xAttrPrefix + Headers.MATERIALS_DESCRIPTION))
|
||||||
.describedAs("Materials Description should contain the content "
|
.describedAs("Materials Description should contain the content "
|
||||||
|
|
|
@ -42,7 +42,6 @@ import java.io.IOException;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
import java.util.Collection;
|
|
||||||
|
|
||||||
import org.apache.hadoop.security.ProviderUtils;
|
import org.apache.hadoop.security.ProviderUtils;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
@ -511,81 +510,6 @@ public class ITestS3AConfiguration {
|
||||||
return fieldType.cast(obj);
|
return fieldType.cast(obj);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testBucketConfigurationPropagation() throws Throwable {
|
|
||||||
Configuration config = new Configuration(false);
|
|
||||||
setBucketOption(config, "b", "base", "1024");
|
|
||||||
String basekey = "fs.s3a.base";
|
|
||||||
assertOptionEquals(config, basekey, null);
|
|
||||||
String bucketKey = "fs.s3a.bucket.b.base";
|
|
||||||
assertOptionEquals(config, bucketKey, "1024");
|
|
||||||
Configuration updated = propagateBucketOptions(config, "b");
|
|
||||||
assertOptionEquals(updated, basekey, "1024");
|
|
||||||
// original conf is not updated
|
|
||||||
assertOptionEquals(config, basekey, null);
|
|
||||||
|
|
||||||
String[] sources = updated.getPropertySources(basekey);
|
|
||||||
assertEquals(1, sources.length);
|
|
||||||
String sourceInfo = sources[0];
|
|
||||||
assertTrue("Wrong source " + sourceInfo, sourceInfo.contains(bucketKey));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testBucketConfigurationPropagationResolution() throws Throwable {
|
|
||||||
Configuration config = new Configuration(false);
|
|
||||||
String basekey = "fs.s3a.base";
|
|
||||||
String baseref = "fs.s3a.baseref";
|
|
||||||
String baseref2 = "fs.s3a.baseref2";
|
|
||||||
config.set(basekey, "orig");
|
|
||||||
config.set(baseref2, "${fs.s3a.base}");
|
|
||||||
setBucketOption(config, "b", basekey, "1024");
|
|
||||||
setBucketOption(config, "b", baseref, "${fs.s3a.base}");
|
|
||||||
Configuration updated = propagateBucketOptions(config, "b");
|
|
||||||
assertOptionEquals(updated, basekey, "1024");
|
|
||||||
assertOptionEquals(updated, baseref, "1024");
|
|
||||||
assertOptionEquals(updated, baseref2, "1024");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testMultipleBucketConfigurations() throws Throwable {
|
|
||||||
Configuration config = new Configuration(false);
|
|
||||||
setBucketOption(config, "b", USER_AGENT_PREFIX, "UA-b");
|
|
||||||
setBucketOption(config, "c", USER_AGENT_PREFIX, "UA-c");
|
|
||||||
config.set(USER_AGENT_PREFIX, "UA-orig");
|
|
||||||
Configuration updated = propagateBucketOptions(config, "c");
|
|
||||||
assertOptionEquals(updated, USER_AGENT_PREFIX, "UA-c");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testClearBucketOption() throws Throwable {
|
|
||||||
Configuration config = new Configuration();
|
|
||||||
config.set(USER_AGENT_PREFIX, "base");
|
|
||||||
setBucketOption(config, "bucket", USER_AGENT_PREFIX, "overridden");
|
|
||||||
clearBucketOption(config, "bucket", USER_AGENT_PREFIX);
|
|
||||||
Configuration updated = propagateBucketOptions(config, "c");
|
|
||||||
assertOptionEquals(updated, USER_AGENT_PREFIX, "base");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testBucketConfigurationSkipsUnmodifiable() throws Throwable {
|
|
||||||
Configuration config = new Configuration(false);
|
|
||||||
String impl = "fs.s3a.impl";
|
|
||||||
config.set(impl, "orig");
|
|
||||||
setBucketOption(config, "b", impl, "b");
|
|
||||||
String metastoreImpl = "fs.s3a.metadatastore.impl";
|
|
||||||
String ddb = "org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore";
|
|
||||||
setBucketOption(config, "b", metastoreImpl, ddb);
|
|
||||||
setBucketOption(config, "b", "impl2", "b2");
|
|
||||||
setBucketOption(config, "b", "bucket.b.loop", "b3");
|
|
||||||
assertOptionEquals(config, "fs.s3a.bucket.b.impl", "b");
|
|
||||||
|
|
||||||
Configuration updated = propagateBucketOptions(config, "b");
|
|
||||||
assertOptionEquals(updated, impl, "orig");
|
|
||||||
assertOptionEquals(updated, "fs.s3a.impl2", "b2");
|
|
||||||
assertOptionEquals(updated, metastoreImpl, ddb);
|
|
||||||
assertOptionEquals(updated, "fs.s3a.bucket.b.loop", null);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testConfOptionPropagationToFS() throws Exception {
|
public void testConfOptionPropagationToFS() throws Exception {
|
||||||
Configuration config = new Configuration();
|
Configuration config = new Configuration();
|
||||||
|
@ -597,53 +521,6 @@ public class ITestS3AConfiguration {
|
||||||
assertOptionEquals(updated, "fs.s3a.propagation", "propagated");
|
assertOptionEquals(updated, "fs.s3a.propagation", "propagated");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testSecurityCredentialPropagationNoOverride() throws Exception {
|
|
||||||
Configuration config = new Configuration();
|
|
||||||
config.set(CREDENTIAL_PROVIDER_PATH, "base");
|
|
||||||
patchSecurityCredentialProviders(config);
|
|
||||||
assertOptionEquals(config, CREDENTIAL_PROVIDER_PATH,
|
|
||||||
"base");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testSecurityCredentialPropagationOverrideNoBase()
|
|
||||||
throws Exception {
|
|
||||||
Configuration config = new Configuration();
|
|
||||||
config.unset(CREDENTIAL_PROVIDER_PATH);
|
|
||||||
config.set(S3A_SECURITY_CREDENTIAL_PROVIDER_PATH, "override");
|
|
||||||
patchSecurityCredentialProviders(config);
|
|
||||||
assertOptionEquals(config, CREDENTIAL_PROVIDER_PATH,
|
|
||||||
"override");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testSecurityCredentialPropagationOverride() throws Exception {
|
|
||||||
Configuration config = new Configuration();
|
|
||||||
config.set(CREDENTIAL_PROVIDER_PATH, "base");
|
|
||||||
config.set(S3A_SECURITY_CREDENTIAL_PROVIDER_PATH, "override");
|
|
||||||
patchSecurityCredentialProviders(config);
|
|
||||||
assertOptionEquals(config, CREDENTIAL_PROVIDER_PATH,
|
|
||||||
"override,base");
|
|
||||||
Collection<String> all = config.getStringCollection(
|
|
||||||
CREDENTIAL_PROVIDER_PATH);
|
|
||||||
assertTrue(all.contains("override"));
|
|
||||||
assertTrue(all.contains("base"));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testSecurityCredentialPropagationEndToEnd() throws Exception {
|
|
||||||
Configuration config = new Configuration();
|
|
||||||
config.set(CREDENTIAL_PROVIDER_PATH, "base");
|
|
||||||
setBucketOption(config, "b", S3A_SECURITY_CREDENTIAL_PROVIDER_PATH,
|
|
||||||
"override");
|
|
||||||
Configuration updated = propagateBucketOptions(config, "b");
|
|
||||||
|
|
||||||
patchSecurityCredentialProviders(updated);
|
|
||||||
assertOptionEquals(updated, CREDENTIAL_PROVIDER_PATH,
|
|
||||||
"override,base");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test(timeout = 10_000L)
|
@Test(timeout = 10_000L)
|
||||||
public void testS3SpecificSignerOverride() throws IOException {
|
public void testS3SpecificSignerOverride() throws IOException {
|
||||||
ClientConfiguration clientConfiguration = null;
|
ClientConfiguration clientConfiguration = null;
|
||||||
|
|
|
@ -18,11 +18,13 @@
|
||||||
|
|
||||||
package org.apache.hadoop.fs.s3a;
|
package org.apache.hadoop.fs.s3a;
|
||||||
|
|
||||||
|
import org.apache.commons.lang.StringUtils;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
|
||||||
import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_KEY;
|
import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_KEY;
|
||||||
import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.SSE_KMS;
|
import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.SSE_KMS;
|
||||||
import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfEncryptionNotSet;
|
import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Concrete class that extends {@link AbstractTestS3AEncryption}
|
* Concrete class that extends {@link AbstractTestS3AEncryption}
|
||||||
|
@ -36,9 +38,12 @@ public class ITestS3AEncryptionSSEKMSUserDefinedKey
|
||||||
protected Configuration createConfiguration() {
|
protected Configuration createConfiguration() {
|
||||||
// get the KMS key for this test.
|
// get the KMS key for this test.
|
||||||
Configuration c = new Configuration();
|
Configuration c = new Configuration();
|
||||||
String kmsKey = c.get(S3_ENCRYPTION_KEY);
|
String kmsKey = S3AUtils.getS3EncryptionKey(getTestBucketName(c), c);
|
||||||
// skip the test if SSE-KMS or KMS key not set.
|
// skip the test if SSE-KMS or KMS key not set.
|
||||||
skipIfEncryptionNotSet(c, getSSEAlgorithm());
|
if (StringUtils.isBlank(kmsKey)) {
|
||||||
|
skip(S3_ENCRYPTION_KEY + " is not set for " +
|
||||||
|
SSE_KMS.getMethod());
|
||||||
|
}
|
||||||
Configuration conf = super.createConfiguration();
|
Configuration conf = super.createConfiguration();
|
||||||
conf.set(S3_ENCRYPTION_KEY, kmsKey);
|
conf.set(S3_ENCRYPTION_KEY, kmsKey);
|
||||||
return conf;
|
return conf;
|
||||||
|
|
|
@ -34,11 +34,13 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
|
||||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
|
||||||
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
|
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
|
||||||
import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_ALGORITHM;
|
import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_ALGORITHM;
|
||||||
import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_KEY;
|
import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM;
|
||||||
import static org.apache.hadoop.fs.s3a.EncryptionTestUtils.AWS_KMS_SSE_ALGORITHM;
|
import static org.apache.hadoop.fs.s3a.EncryptionTestUtils.AWS_KMS_SSE_ALGORITHM;
|
||||||
import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.SSE_KMS;
|
import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.SSE_KMS;
|
||||||
|
import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
|
||||||
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
|
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
|
||||||
import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfEncryptionNotSet;
|
import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfEncryptionNotSet;
|
||||||
|
import static org.apache.hadoop.fs.s3a.S3AUtils.getS3EncryptionKey;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Concrete class that extends {@link AbstractTestS3AEncryption}
|
* Concrete class that extends {@link AbstractTestS3AEncryption}
|
||||||
|
@ -60,11 +62,13 @@ public class ITestS3AEncryptionWithDefaultS3Settings extends
|
||||||
skipIfEncryptionNotSet(c, getSSEAlgorithm());
|
skipIfEncryptionNotSet(c, getSSEAlgorithm());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("deprecation")
|
||||||
@Override
|
@Override
|
||||||
protected void patchConfigurationEncryptionSettings(
|
protected void patchConfigurationEncryptionSettings(
|
||||||
final Configuration conf) {
|
final Configuration conf) {
|
||||||
removeBaseAndBucketOverrides(conf,
|
removeBaseAndBucketOverrides(conf,
|
||||||
S3_ENCRYPTION_ALGORITHM);
|
S3_ENCRYPTION_ALGORITHM,
|
||||||
|
SERVER_SIDE_ENCRYPTION_ALGORITHM);
|
||||||
conf.set(S3_ENCRYPTION_ALGORITHM,
|
conf.set(S3_ENCRYPTION_ALGORITHM,
|
||||||
getSSEAlgorithm().getMethod());
|
getSSEAlgorithm().getMethod());
|
||||||
}
|
}
|
||||||
|
@ -89,7 +93,7 @@ public class ITestS3AEncryptionWithDefaultS3Settings extends
|
||||||
protected void assertEncrypted(Path path) throws IOException {
|
protected void assertEncrypted(Path path) throws IOException {
|
||||||
S3AFileSystem fs = getFileSystem();
|
S3AFileSystem fs = getFileSystem();
|
||||||
Configuration c = fs.getConf();
|
Configuration c = fs.getConf();
|
||||||
String kmsKey = c.getTrimmed(S3_ENCRYPTION_KEY);
|
String kmsKey = getS3EncryptionKey(getTestBucketName(c), c);
|
||||||
EncryptionTestUtils.assertEncrypted(fs, path, SSE_KMS, kmsKey);
|
EncryptionTestUtils.assertEncrypted(fs, path, SSE_KMS, kmsKey);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -145,7 +149,7 @@ public class ITestS3AEncryptionWithDefaultS3Settings extends
|
||||||
ContractTestUtils.rename(kmsFS, src, targetDir);
|
ContractTestUtils.rename(kmsFS, src, targetDir);
|
||||||
Path renamedFile = new Path(targetDir, src.getName());
|
Path renamedFile = new Path(targetDir, src.getName());
|
||||||
ContractTestUtils.verifyFileContents(fs, renamedFile, data);
|
ContractTestUtils.verifyFileContents(fs, renamedFile, data);
|
||||||
String kmsKey = fs2Conf.getTrimmed(S3_ENCRYPTION_KEY);
|
String kmsKey = getS3EncryptionKey(getTestBucketName(fs2Conf), fs2Conf);
|
||||||
// we assert that the renamed file has picked up the KMS key of our FS
|
// we assert that the renamed file has picked up the KMS key of our FS
|
||||||
EncryptionTestUtils.assertEncrypted(fs, renamedFile, SSE_KMS, kmsKey);
|
EncryptionTestUtils.assertEncrypted(fs, renamedFile, SSE_KMS, kmsKey);
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,6 +34,7 @@ import org.apache.hadoop.fs.RemoteIterator;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.fs.s3a.auth.MarshalledCredentialBinding;
|
import org.apache.hadoop.fs.s3a.auth.MarshalledCredentialBinding;
|
||||||
import org.apache.hadoop.fs.s3a.auth.MarshalledCredentials;
|
import org.apache.hadoop.fs.s3a.auth.MarshalledCredentials;
|
||||||
|
import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
|
||||||
import org.apache.hadoop.fs.s3a.commit.CommitConstants;
|
import org.apache.hadoop.fs.s3a.commit.CommitConstants;
|
||||||
|
|
||||||
import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
|
import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
|
||||||
|
@ -60,6 +61,7 @@ import org.apache.hadoop.util.ReflectionUtils;
|
||||||
import org.apache.hadoop.util.functional.CallableRaisingIOE;
|
import org.apache.hadoop.util.functional.CallableRaisingIOE;
|
||||||
|
|
||||||
import com.amazonaws.auth.AWSCredentialsProvider;
|
import com.amazonaws.auth.AWSCredentialsProvider;
|
||||||
|
import org.assertj.core.api.Assertions;
|
||||||
import org.hamcrest.core.Is;
|
import org.hamcrest.core.Is;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Assume;
|
import org.junit.Assume;
|
||||||
|
@ -90,6 +92,8 @@ import static org.apache.hadoop.fs.impl.FutureIOSupport.awaitFuture;
|
||||||
import static org.apache.hadoop.fs.s3a.FailureInjectionPolicy.*;
|
import static org.apache.hadoop.fs.s3a.FailureInjectionPolicy.*;
|
||||||
import static org.apache.hadoop.fs.s3a.S3ATestConstants.*;
|
import static org.apache.hadoop.fs.s3a.S3ATestConstants.*;
|
||||||
import static org.apache.hadoop.fs.s3a.Constants.*;
|
import static org.apache.hadoop.fs.s3a.Constants.*;
|
||||||
|
import static org.apache.hadoop.fs.s3a.S3AUtils.buildEncryptionSecrets;
|
||||||
|
import static org.apache.hadoop.fs.s3a.S3AUtils.getEncryptionAlgorithm;
|
||||||
import static org.apache.hadoop.fs.s3a.S3AUtils.propagateBucketOptions;
|
import static org.apache.hadoop.fs.s3a.S3AUtils.propagateBucketOptions;
|
||||||
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
|
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
|
||||||
import static org.junit.Assert.*;
|
import static org.junit.Assert.*;
|
||||||
|
@ -245,9 +249,10 @@ public final class S3ATestUtils {
|
||||||
*
|
*
|
||||||
* @param conf Test Configuration.
|
* @param conf Test Configuration.
|
||||||
*/
|
*/
|
||||||
private static void skipIfS3GuardAndS3CSEEnabled(Configuration conf) {
|
private static void skipIfS3GuardAndS3CSEEnabled(Configuration conf)
|
||||||
String encryptionMethod =
|
throws IOException {
|
||||||
conf.getTrimmed(Constants.S3_ENCRYPTION_ALGORITHM, "");
|
String encryptionMethod = getEncryptionAlgorithm(getTestBucketName(conf),
|
||||||
|
conf).getMethod();
|
||||||
String metaStore = conf.getTrimmed(S3_METADATA_STORE_IMPL, "");
|
String metaStore = conf.getTrimmed(S3_METADATA_STORE_IMPL, "");
|
||||||
if (encryptionMethod.equals(S3AEncryptionMethods.CSE_KMS.getMethod()) &&
|
if (encryptionMethod.equals(S3AEncryptionMethods.CSE_KMS.getMethod()) &&
|
||||||
!metaStore.equals(S3GUARD_METASTORE_NULL)) {
|
!metaStore.equals(S3GUARD_METASTORE_NULL)) {
|
||||||
|
@ -1239,7 +1244,13 @@ public final class S3ATestUtils {
|
||||||
public static void assertOptionEquals(Configuration conf,
|
public static void assertOptionEquals(Configuration conf,
|
||||||
String key,
|
String key,
|
||||||
String expected) {
|
String expected) {
|
||||||
assertEquals("Value of " + key, expected, conf.get(key));
|
String actual = conf.get(key);
|
||||||
|
String origin = actual == null
|
||||||
|
? "(none)"
|
||||||
|
: "[" + StringUtils.join(conf.getPropertySources(key), ", ") + "]";
|
||||||
|
Assertions.assertThat(actual)
|
||||||
|
.describedAs("Value of %s with origin %s", key, origin)
|
||||||
|
.isEqualTo(expected);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1539,15 +1550,17 @@ public final class S3ATestUtils {
|
||||||
* @param configuration configuration to probe.
|
* @param configuration configuration to probe.
|
||||||
*/
|
*/
|
||||||
public static void skipIfEncryptionNotSet(Configuration configuration,
|
public static void skipIfEncryptionNotSet(Configuration configuration,
|
||||||
S3AEncryptionMethods s3AEncryptionMethod) {
|
S3AEncryptionMethods s3AEncryptionMethod) throws IOException {
|
||||||
// if S3 encryption algorithm is not set to desired method or AWS encryption
|
// if S3 encryption algorithm is not set to desired method or AWS encryption
|
||||||
// key is not set, then skip.
|
// key is not set, then skip.
|
||||||
if (!configuration.getTrimmed(S3_ENCRYPTION_ALGORITHM, "")
|
String bucket = getTestBucketName(configuration);
|
||||||
.equals(s3AEncryptionMethod.getMethod())
|
final EncryptionSecrets secrets = buildEncryptionSecrets(bucket, configuration);
|
||||||
|| configuration.get(Constants.S3_ENCRYPTION_KEY) == null) {
|
if (!s3AEncryptionMethod.getMethod().equals(secrets.getEncryptionMethod().getMethod())
|
||||||
|
|| StringUtils.isBlank(secrets.getEncryptionKey())) {
|
||||||
skip(S3_ENCRYPTION_KEY + " is not set for " + s3AEncryptionMethod
|
skip(S3_ENCRYPTION_KEY + " is not set for " + s3AEncryptionMethod
|
||||||
.getMethod() + " or " + S3_ENCRYPTION_ALGORITHM + " is not set to "
|
.getMethod() + " or " + S3_ENCRYPTION_ALGORITHM + " is not set to "
|
||||||
+ s3AEncryptionMethod.getMethod());
|
+ s3AEncryptionMethod.getMethod()
|
||||||
|
+ " in " + secrets);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,261 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.fs.s3a;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.net.URI;
|
||||||
|
import java.util.Collection;
|
||||||
|
|
||||||
|
import org.assertj.core.api.Assertions;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Rule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.rules.TemporaryFolder;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
|
||||||
|
import org.apache.hadoop.security.ProviderUtils;
|
||||||
|
import org.apache.hadoop.security.alias.CredentialProvider;
|
||||||
|
import org.apache.hadoop.security.alias.CredentialProviderFactory;
|
||||||
|
import org.apache.hadoop.test.AbstractHadoopTestBase;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.fs.s3a.Constants.FS_S3A_BUCKET_PREFIX;
|
||||||
|
import static org.apache.hadoop.fs.s3a.Constants.S3A_SECURITY_CREDENTIAL_PROVIDER_PATH;
|
||||||
|
import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_ALGORITHM;
|
||||||
|
import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_KEY;
|
||||||
|
import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM;
|
||||||
|
import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY;
|
||||||
|
import static org.apache.hadoop.fs.s3a.Constants.USER_AGENT_PREFIX;
|
||||||
|
import static org.apache.hadoop.fs.s3a.S3ATestUtils.assertOptionEquals;
|
||||||
|
import static org.apache.hadoop.fs.s3a.S3AUtils.CREDENTIAL_PROVIDER_PATH;
|
||||||
|
import static org.apache.hadoop.fs.s3a.S3AUtils.clearBucketOption;
|
||||||
|
import static org.apache.hadoop.fs.s3a.S3AUtils.getEncryptionAlgorithm;
|
||||||
|
import static org.apache.hadoop.fs.s3a.S3AUtils.patchSecurityCredentialProviders;
|
||||||
|
import static org.apache.hadoop.fs.s3a.S3AUtils.propagateBucketOptions;
|
||||||
|
import static org.apache.hadoop.fs.s3a.S3AUtils.setBucketOption;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* S3A tests for configuration option propagation.
|
||||||
|
*/
|
||||||
|
@SuppressWarnings("deprecation")
|
||||||
|
public class TestBucketConfiguration extends AbstractHadoopTestBase {
|
||||||
|
|
||||||
|
private static final String NEW_ALGORITHM_KEY_GLOBAL = "CSE-KMS";
|
||||||
|
private static final String OLD_ALGORITHM_KEY_BUCKET = "SSE-KMS";
|
||||||
|
@Rule
|
||||||
|
public final TemporaryFolder tempDir = new TemporaryFolder();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Setup: create the contract then init it.
|
||||||
|
* @throws Exception on any failure
|
||||||
|
*/
|
||||||
|
@Before
|
||||||
|
public void setup() throws Exception {
|
||||||
|
// forces in deprecation wireup, even when this test method is running isolated
|
||||||
|
S3AFileSystem.initializeClass();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBucketConfigurationPropagation() throws Throwable {
|
||||||
|
Configuration config = new Configuration(false);
|
||||||
|
setBucketOption(config, "b", "base", "1024");
|
||||||
|
String basekey = "fs.s3a.base";
|
||||||
|
assertOptionEquals(config, basekey, null);
|
||||||
|
String bucketKey = "fs.s3a.bucket.b.base";
|
||||||
|
assertOptionEquals(config, bucketKey, "1024");
|
||||||
|
Configuration updated = propagateBucketOptions(config, "b");
|
||||||
|
assertOptionEquals(updated, basekey, "1024");
|
||||||
|
// original conf is not updated
|
||||||
|
assertOptionEquals(config, basekey, null);
|
||||||
|
|
||||||
|
String[] sources = updated.getPropertySources(basekey);
|
||||||
|
assertEquals(1, sources.length);
|
||||||
|
Assertions.assertThat(sources)
|
||||||
|
.describedAs("base key property sources")
|
||||||
|
.hasSize(1);
|
||||||
|
Assertions.assertThat(sources[0])
|
||||||
|
.describedAs("Property source")
|
||||||
|
.contains(bucketKey);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBucketConfigurationPropagationResolution() throws Throwable {
|
||||||
|
Configuration config = new Configuration(false);
|
||||||
|
String basekey = "fs.s3a.base";
|
||||||
|
String baseref = "fs.s3a.baseref";
|
||||||
|
String baseref2 = "fs.s3a.baseref2";
|
||||||
|
config.set(basekey, "orig");
|
||||||
|
config.set(baseref2, "${fs.s3a.base}");
|
||||||
|
setBucketOption(config, "b", basekey, "1024");
|
||||||
|
setBucketOption(config, "b", baseref, "${fs.s3a.base}");
|
||||||
|
Configuration updated = propagateBucketOptions(config, "b");
|
||||||
|
assertOptionEquals(updated, basekey, "1024");
|
||||||
|
assertOptionEquals(updated, baseref, "1024");
|
||||||
|
assertOptionEquals(updated, baseref2, "1024");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMultipleBucketConfigurations() throws Throwable {
|
||||||
|
Configuration config = new Configuration(false);
|
||||||
|
setBucketOption(config, "b", USER_AGENT_PREFIX, "UA-b");
|
||||||
|
setBucketOption(config, "c", USER_AGENT_PREFIX, "UA-c");
|
||||||
|
config.set(USER_AGENT_PREFIX, "UA-orig");
|
||||||
|
Configuration updated = propagateBucketOptions(config, "c");
|
||||||
|
assertOptionEquals(updated, USER_AGENT_PREFIX, "UA-c");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testClearBucketOption() throws Throwable {
|
||||||
|
Configuration config = new Configuration();
|
||||||
|
config.set(USER_AGENT_PREFIX, "base");
|
||||||
|
setBucketOption(config, "bucket", USER_AGENT_PREFIX, "overridden");
|
||||||
|
clearBucketOption(config, "bucket", USER_AGENT_PREFIX);
|
||||||
|
Configuration updated = propagateBucketOptions(config, "c");
|
||||||
|
assertOptionEquals(updated, USER_AGENT_PREFIX, "base");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBucketConfigurationSkipsUnmodifiable() throws Throwable {
|
||||||
|
Configuration config = new Configuration(false);
|
||||||
|
String impl = "fs.s3a.impl";
|
||||||
|
config.set(impl, "orig");
|
||||||
|
setBucketOption(config, "b", impl, "b");
|
||||||
|
String metastoreImpl = "fs.s3a.metadatastore.impl";
|
||||||
|
String ddb = "org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore";
|
||||||
|
setBucketOption(config, "b", metastoreImpl, ddb);
|
||||||
|
setBucketOption(config, "b", "impl2", "b2");
|
||||||
|
setBucketOption(config, "b", "bucket.b.loop", "b3");
|
||||||
|
assertOptionEquals(config, "fs.s3a.bucket.b.impl", "b");
|
||||||
|
|
||||||
|
Configuration updated = propagateBucketOptions(config, "b");
|
||||||
|
assertOptionEquals(updated, impl, "orig");
|
||||||
|
assertOptionEquals(updated, "fs.s3a.impl2", "b2");
|
||||||
|
assertOptionEquals(updated, metastoreImpl, ddb);
|
||||||
|
assertOptionEquals(updated, "fs.s3a.bucket.b.loop", null);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSecurityCredentialPropagationNoOverride() throws Exception {
|
||||||
|
Configuration config = new Configuration();
|
||||||
|
config.set(CREDENTIAL_PROVIDER_PATH, "base");
|
||||||
|
patchSecurityCredentialProviders(config);
|
||||||
|
assertOptionEquals(config, CREDENTIAL_PROVIDER_PATH,
|
||||||
|
"base");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSecurityCredentialPropagationOverrideNoBase()
|
||||||
|
throws Exception {
|
||||||
|
Configuration config = new Configuration();
|
||||||
|
config.unset(CREDENTIAL_PROVIDER_PATH);
|
||||||
|
config.set(S3A_SECURITY_CREDENTIAL_PROVIDER_PATH, "override");
|
||||||
|
patchSecurityCredentialProviders(config);
|
||||||
|
assertOptionEquals(config, CREDENTIAL_PROVIDER_PATH,
|
||||||
|
"override");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSecurityCredentialPropagationOverride() throws Exception {
|
||||||
|
Configuration config = new Configuration();
|
||||||
|
config.set(CREDENTIAL_PROVIDER_PATH, "base");
|
||||||
|
config.set(S3A_SECURITY_CREDENTIAL_PROVIDER_PATH, "override");
|
||||||
|
patchSecurityCredentialProviders(config);
|
||||||
|
assertOptionEquals(config, CREDENTIAL_PROVIDER_PATH,
|
||||||
|
"override,base");
|
||||||
|
Collection<String> all = config.getStringCollection(
|
||||||
|
CREDENTIAL_PROVIDER_PATH);
|
||||||
|
assertTrue(all.contains("override"));
|
||||||
|
assertTrue(all.contains("base"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSecurityCredentialPropagationEndToEnd() throws Exception {
|
||||||
|
Configuration config = new Configuration();
|
||||||
|
config.set(CREDENTIAL_PROVIDER_PATH, "base");
|
||||||
|
setBucketOption(config, "b", S3A_SECURITY_CREDENTIAL_PROVIDER_PATH,
|
||||||
|
"override");
|
||||||
|
Configuration updated = propagateBucketOptions(config, "b");
|
||||||
|
|
||||||
|
patchSecurityCredentialProviders(updated);
|
||||||
|
assertOptionEquals(updated, CREDENTIAL_PROVIDER_PATH,
|
||||||
|
"override,base");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This test shows that a per-bucket value of the older key takes priority
|
||||||
|
* over a global value of a new key in XML configuration file.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testBucketConfigurationDeprecatedEncryptionAlgorithm()
|
||||||
|
throws Throwable {
|
||||||
|
Configuration config = new Configuration(false);
|
||||||
|
config.set(S3_ENCRYPTION_ALGORITHM, NEW_ALGORITHM_KEY_GLOBAL);
|
||||||
|
setBucketOption(config, "b", SERVER_SIDE_ENCRYPTION_ALGORITHM,
|
||||||
|
OLD_ALGORITHM_KEY_BUCKET);
|
||||||
|
Configuration updated = propagateBucketOptions(config, "b");
|
||||||
|
|
||||||
|
// Get the encryption method and verify that the value is per-bucket of
|
||||||
|
// old keys.
|
||||||
|
String value = getEncryptionAlgorithm("b", updated).getMethod();
|
||||||
|
Assertions.assertThat(value)
|
||||||
|
.describedAs("lookupPassword(%s)", S3_ENCRYPTION_ALGORITHM)
|
||||||
|
.isEqualTo(OLD_ALGORITHM_KEY_BUCKET);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testJceksDeprecatedEncryptionAlgorithm() throws Exception {
|
||||||
|
// set up conf to have a cred provider
|
||||||
|
final Configuration conf = new Configuration(false);
|
||||||
|
final File file = tempDir.newFile("test.jks");
|
||||||
|
final URI jks = ProviderUtils.nestURIForLocalJavaKeyStoreProvider(
|
||||||
|
file.toURI());
|
||||||
|
conf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH,
|
||||||
|
jks.toString());
|
||||||
|
|
||||||
|
// add our creds to the provider
|
||||||
|
final CredentialProvider provider =
|
||||||
|
CredentialProviderFactory.getProviders(conf).get(0);
|
||||||
|
provider.createCredentialEntry(S3_ENCRYPTION_ALGORITHM,
|
||||||
|
NEW_ALGORITHM_KEY_GLOBAL.toCharArray());
|
||||||
|
provider.createCredentialEntry(S3_ENCRYPTION_KEY,
|
||||||
|
"global s3 encryption key".toCharArray());
|
||||||
|
provider.createCredentialEntry(
|
||||||
|
FS_S3A_BUCKET_PREFIX + "b." + SERVER_SIDE_ENCRYPTION_ALGORITHM,
|
||||||
|
OLD_ALGORITHM_KEY_BUCKET.toCharArray());
|
||||||
|
final String bucketKey = "bucket-server-side-encryption-key";
|
||||||
|
provider.createCredentialEntry(
|
||||||
|
FS_S3A_BUCKET_PREFIX + "b." + SERVER_SIDE_ENCRYPTION_KEY,
|
||||||
|
bucketKey.toCharArray());
|
||||||
|
provider.flush();
|
||||||
|
|
||||||
|
// Get the encryption method and verify that the value is per-bucket of
|
||||||
|
// old keys.
|
||||||
|
final EncryptionSecrets secrets = S3AUtils.buildEncryptionSecrets("b", conf);
|
||||||
|
Assertions.assertThat(secrets.getEncryptionMethod().getMethod())
|
||||||
|
.describedAs("buildEncryptionSecrets() encryption algorithm resolved to %s", secrets)
|
||||||
|
.isEqualTo(OLD_ALGORITHM_KEY_BUCKET);
|
||||||
|
|
||||||
|
Assertions.assertThat(secrets.getEncryptionKey())
|
||||||
|
.describedAs("buildEncryptionSecrets() encryption key resolved to %s", secrets)
|
||||||
|
.isEqualTo(bucketKey);
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
|
@ -66,8 +66,10 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY
|
||||||
import static org.apache.hadoop.fs.s3a.Constants.*;
|
import static org.apache.hadoop.fs.s3a.Constants.*;
|
||||||
import static org.apache.hadoop.fs.s3a.S3ATestUtils.assumeSessionTestsEnabled;
|
import static org.apache.hadoop.fs.s3a.S3ATestUtils.assumeSessionTestsEnabled;
|
||||||
import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
|
import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
|
||||||
|
import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
|
||||||
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
|
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
|
||||||
import static org.apache.hadoop.fs.s3a.S3ATestUtils.unsetHadoopCredentialProviders;
|
import static org.apache.hadoop.fs.s3a.S3ATestUtils.unsetHadoopCredentialProviders;
|
||||||
|
import static org.apache.hadoop.fs.s3a.S3AUtils.getS3EncryptionKey;
|
||||||
import static org.apache.hadoop.fs.s3a.auth.delegation.DelegationConstants.*;
|
import static org.apache.hadoop.fs.s3a.auth.delegation.DelegationConstants.*;
|
||||||
import static org.apache.hadoop.fs.s3a.auth.delegation.DelegationTokenIOException.TOKEN_MISMATCH;
|
import static org.apache.hadoop.fs.s3a.auth.delegation.DelegationTokenIOException.TOKEN_MISMATCH;
|
||||||
import static org.apache.hadoop.fs.s3a.auth.delegation.MiniKerberizedHadoopCluster.ALICE;
|
import static org.apache.hadoop.fs.s3a.auth.delegation.MiniKerberizedHadoopCluster.ALICE;
|
||||||
|
@ -146,7 +148,7 @@ public class ITestSessionDelegationInFileystem extends AbstractDelegationIT {
|
||||||
String s3EncryptionMethod =
|
String s3EncryptionMethod =
|
||||||
conf.getTrimmed(Constants.S3_ENCRYPTION_ALGORITHM,
|
conf.getTrimmed(Constants.S3_ENCRYPTION_ALGORITHM,
|
||||||
S3AEncryptionMethods.SSE_KMS.getMethod());
|
S3AEncryptionMethods.SSE_KMS.getMethod());
|
||||||
String s3EncryptionKey = conf.getTrimmed(Constants.S3_ENCRYPTION_KEY, "");
|
String s3EncryptionKey = getS3EncryptionKey(getTestBucketName(conf), conf);
|
||||||
removeBaseAndBucketOverrides(conf,
|
removeBaseAndBucketOverrides(conf,
|
||||||
DELEGATION_TOKEN_BINDING,
|
DELEGATION_TOKEN_BINDING,
|
||||||
Constants.S3_ENCRYPTION_ALGORITHM,
|
Constants.S3_ENCRYPTION_ALGORITHM,
|
||||||
|
|
|
@ -20,15 +20,17 @@ package org.apache.hadoop.fs.s3a.scale;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.commons.lang.StringUtils;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.s3a.Constants;
|
import org.apache.hadoop.fs.s3a.Constants;
|
||||||
import org.apache.hadoop.fs.s3a.EncryptionTestUtils;
|
import org.apache.hadoop.fs.s3a.EncryptionTestUtils;
|
||||||
import org.apache.hadoop.fs.s3a.S3AFileSystem;
|
import org.apache.hadoop.fs.s3a.S3AFileSystem;
|
||||||
|
|
||||||
import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_KEY;
|
|
||||||
import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.SSE_KMS;
|
import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.SSE_KMS;
|
||||||
|
import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
|
||||||
import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfEncryptionNotSet;
|
import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfEncryptionNotSet;
|
||||||
|
import static org.apache.hadoop.fs.s3a.S3AUtils.getS3EncryptionKey;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Class to test SSE_KMS encryption settings for huge files.
|
* Class to test SSE_KMS encryption settings for huge files.
|
||||||
|
@ -58,13 +60,13 @@ public class ITestS3AHugeFilesEncryption extends AbstractSTestS3AHugeFiles {
|
||||||
@Override
|
@Override
|
||||||
protected boolean isEncrypted(S3AFileSystem fileSystem) {
|
protected boolean isEncrypted(S3AFileSystem fileSystem) {
|
||||||
Configuration c = new Configuration();
|
Configuration c = new Configuration();
|
||||||
return c.get(S3_ENCRYPTION_KEY) != null;
|
return StringUtils.isNotBlank(getS3EncryptionKey(getTestBucketName(c), c));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void assertEncrypted(Path hugeFile) throws IOException {
|
protected void assertEncrypted(Path hugeFile) throws IOException {
|
||||||
Configuration c = new Configuration();
|
Configuration c = new Configuration();
|
||||||
String kmsKey = c.get(S3_ENCRYPTION_KEY);
|
String kmsKey = getS3EncryptionKey(getTestBucketName(c), c);
|
||||||
EncryptionTestUtils.assertEncrypted(getFileSystem(), hugeFile,
|
EncryptionTestUtils.assertEncrypted(getFileSystem(), hugeFile,
|
||||||
SSE_KMS, kmsKey);
|
SSE_KMS, kmsKey);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue