HADOOP-14507. Extend per-bucket secret key config with explicit getPassword() on fs.s3a.$bucket.secret.key.
Contributed by Steve Loughran.
This commit is contained in:
parent
82f029f7b5
commit
7ac88244c5
|
@ -79,7 +79,6 @@ import com.google.common.base.Preconditions;
|
|||
import com.google.common.util.concurrent.ListeningExecutorService;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -122,6 +121,8 @@ import static org.apache.hadoop.fs.s3a.Constants.*;
|
|||
import static org.apache.hadoop.fs.s3a.Invoker.*;
|
||||
import static org.apache.hadoop.fs.s3a.S3AUtils.*;
|
||||
import static org.apache.hadoop.fs.s3a.Statistic.*;
|
||||
import static org.apache.commons.lang.StringUtils.isNotBlank;
|
||||
import static org.apache.commons.lang.StringUtils.isNotEmpty;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -300,7 +301,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
|
|||
|
||||
verifyBucketExists();
|
||||
|
||||
serverSideEncryptionAlgorithm = getEncryptionAlgorithm(conf);
|
||||
serverSideEncryptionAlgorithm = getEncryptionAlgorithm(bucket, conf);
|
||||
inputPolicy = S3AInputPolicy.getPolicy(
|
||||
conf.getTrimmed(INPUT_FADVISE, INPUT_FADV_NORMAL));
|
||||
LOG.debug("Input fadvise policy = {}", inputPolicy);
|
||||
|
@ -700,7 +701,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
|
|||
bucket,
|
||||
pathToKey(f),
|
||||
serverSideEncryptionAlgorithm,
|
||||
getServerSideEncryptionKey(getConf())),
|
||||
getServerSideEncryptionKey(bucket, getConf())),
|
||||
fileStatus.getLen(),
|
||||
s3,
|
||||
statistics,
|
||||
|
@ -1217,7 +1218,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
|
|||
new GetObjectMetadataRequest(bucket, key);
|
||||
//SSE-C requires to be filled in if enabled for object metadata
|
||||
if(S3AEncryptionMethods.SSE_C.equals(serverSideEncryptionAlgorithm) &&
|
||||
StringUtils.isNotBlank(getServerSideEncryptionKey(getConf()))){
|
||||
isNotBlank(getServerSideEncryptionKey(bucket, getConf()))){
|
||||
request.setSSECustomerKey(generateSSECustomerKey());
|
||||
}
|
||||
ObjectMetadata meta = invoker.retryUntranslated("GET " + key, true,
|
||||
|
@ -1440,7 +1441,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
|
|||
ObjectMetadata metadata,
|
||||
InputStream inputStream) {
|
||||
Preconditions.checkNotNull(inputStream);
|
||||
Preconditions.checkArgument(StringUtils.isNotEmpty(key), "Null/empty key");
|
||||
Preconditions.checkArgument(isNotEmpty(key), "Null/empty key");
|
||||
PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key,
|
||||
inputStream, metadata);
|
||||
setOptionalPutRequestParameters(putObjectRequest);
|
||||
|
@ -2545,7 +2546,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
|
|||
req.setSSEAwsKeyManagementParams(generateSSEAwsKeyParams());
|
||||
break;
|
||||
case SSE_C:
|
||||
if (StringUtils.isNotBlank(getServerSideEncryptionKey(getConf()))) {
|
||||
if (isNotBlank(getServerSideEncryptionKey(bucket, getConf()))) {
|
||||
//at the moment, only supports copy using the same key
|
||||
req.setSSECustomerKey(generateSSECustomerKey());
|
||||
}
|
||||
|
@ -2579,7 +2580,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
|
|||
);
|
||||
break;
|
||||
case SSE_C:
|
||||
if (StringUtils.isNotBlank(getServerSideEncryptionKey(getConf()))) {
|
||||
if (isNotBlank(getServerSideEncryptionKey(bucket, getConf()))) {
|
||||
//at the moment, only supports copy using the same key
|
||||
SSECustomerKey customerKey = generateSSECustomerKey();
|
||||
copyObjectRequest.setSourceSSECustomerKey(customerKey);
|
||||
|
@ -2596,7 +2597,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
|
|||
request.setSSEAwsKeyManagementParams(generateSSEAwsKeyParams());
|
||||
break;
|
||||
case SSE_C:
|
||||
if (StringUtils.isNotBlank(getServerSideEncryptionKey(getConf()))) {
|
||||
if (isNotBlank(getServerSideEncryptionKey(bucket, getConf()))) {
|
||||
request.setSSECustomerKey(generateSSECustomerKey());
|
||||
}
|
||||
break;
|
||||
|
@ -2610,23 +2611,32 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create the AWS SDK structure used to configure SSE, based on the
|
||||
* configuration.
|
||||
* @return an instance of the class, which main contain the encryption key
|
||||
*/
|
||||
@Retries.OnceExceptionsSwallowed
|
||||
private SSEAwsKeyManagementParams generateSSEAwsKeyParams() {
|
||||
//Use specified key, otherwise default to default master aws/s3 key by AWS
|
||||
SSEAwsKeyManagementParams sseAwsKeyManagementParams =
|
||||
new SSEAwsKeyManagementParams();
|
||||
if (StringUtils.isNotBlank(getServerSideEncryptionKey(getConf()))) {
|
||||
sseAwsKeyManagementParams =
|
||||
new SSEAwsKeyManagementParams(
|
||||
getServerSideEncryptionKey(getConf())
|
||||
);
|
||||
String encryptionKey = getServerSideEncryptionKey(bucket, getConf());
|
||||
if (isNotBlank(encryptionKey)) {
|
||||
sseAwsKeyManagementParams = new SSEAwsKeyManagementParams(encryptionKey);
|
||||
}
|
||||
return sseAwsKeyManagementParams;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create the SSE-C structure for the AWS SDK.
|
||||
* This will contain a secret extracted from the bucket/configuration.
|
||||
* @return the customer key.
|
||||
*/
|
||||
@Retries.OnceExceptionsSwallowed
|
||||
private SSECustomerKey generateSSECustomerKey() {
|
||||
SSECustomerKey customerKey = new SSECustomerKey(
|
||||
getServerSideEncryptionKey(getConf())
|
||||
);
|
||||
getServerSideEncryptionKey(bucket, getConf()));
|
||||
return customerKey;
|
||||
}
|
||||
|
||||
|
|
|
@ -118,6 +118,8 @@ public final class S3AUtils {
|
|||
private static final String EOF_MESSAGE_IN_XML_PARSER
|
||||
= "Failed to sanitize XML document destined for handler class";
|
||||
|
||||
private static final String BUCKET_PATTERN = FS_S3A_BUCKET_PREFIX + "%s.%s";
|
||||
|
||||
|
||||
private S3AUtils() {
|
||||
}
|
||||
|
@ -540,7 +542,8 @@ public final class S3AUtils {
|
|||
/**
|
||||
* Create the AWS credentials from the providers, the URI and
|
||||
* the key {@link Constants#AWS_CREDENTIALS_PROVIDER} in the configuration.
|
||||
* @param binding Binding URI, may contain user:pass login details
|
||||
* @param binding Binding URI, may contain user:pass login details;
|
||||
* may be null
|
||||
* @param conf filesystem configuration
|
||||
* @return a credentials provider list
|
||||
* @throws IOException Problems loading the providers (including reading
|
||||
|
@ -560,7 +563,9 @@ public final class S3AUtils {
|
|||
credentials.add(InstanceProfileCredentialsProvider.getInstance());
|
||||
} else {
|
||||
for (Class<?> aClass : awsClasses) {
|
||||
credentials.add(createAWSCredentialProvider(conf, aClass));
|
||||
credentials.add(createAWSCredentialProvider(conf,
|
||||
aClass,
|
||||
binding));
|
||||
}
|
||||
}
|
||||
// make sure the logging message strips out any auth details
|
||||
|
@ -594,8 +599,8 @@ public final class S3AUtils {
|
|||
* attempted in order:
|
||||
*
|
||||
* <ol>
|
||||
* <li>a public constructor accepting
|
||||
* org.apache.hadoop.conf.Configuration</li>
|
||||
* <li>a public constructor accepting java.net.URI and
|
||||
* org.apache.hadoop.conf.Configuration</li>
|
||||
* <li>a public static method named getInstance that accepts no
|
||||
* arguments and returns an instance of
|
||||
* com.amazonaws.auth.AWSCredentialsProvider, or</li>
|
||||
|
@ -604,11 +609,14 @@ public final class S3AUtils {
|
|||
*
|
||||
* @param conf configuration
|
||||
* @param credClass credential class
|
||||
* @param uri URI of the FS
|
||||
* @return the instantiated class
|
||||
* @throws IOException on any instantiation failure.
|
||||
*/
|
||||
public static AWSCredentialsProvider createAWSCredentialProvider(
|
||||
Configuration conf, Class<?> credClass) throws IOException {
|
||||
Configuration conf,
|
||||
Class<?> credClass,
|
||||
URI uri) throws IOException {
|
||||
AWSCredentialsProvider credentials;
|
||||
String className = credClass.getName();
|
||||
if (!AWSCredentialsProvider.class.isAssignableFrom(credClass)) {
|
||||
|
@ -620,8 +628,15 @@ public final class S3AUtils {
|
|||
LOG.debug("Credential provider class is {}", className);
|
||||
|
||||
try {
|
||||
// new X(uri, conf)
|
||||
Constructor cons = getConstructor(credClass, URI.class,
|
||||
Configuration.class);
|
||||
if (cons != null) {
|
||||
credentials = (AWSCredentialsProvider)cons.newInstance(uri, conf);
|
||||
return credentials;
|
||||
}
|
||||
// new X(conf)
|
||||
Constructor cons = getConstructor(credClass, Configuration.class);
|
||||
cons = getConstructor(credClass, Configuration.class);
|
||||
if (cons != null) {
|
||||
credentials = (AWSCredentialsProvider)cons.newInstance(conf);
|
||||
return credentials;
|
||||
|
@ -676,7 +691,7 @@ public final class S3AUtils {
|
|||
* Return the access key and secret for S3 API use.
|
||||
* Credentials may exist in configuration, within credential providers
|
||||
* or indicated in the UserInfo of the name URI param.
|
||||
* @param name the URI for which we need the access keys.
|
||||
* @param name the URI for which we need the access keys; may be null
|
||||
* @param conf the Configuration object to interrogate for keys.
|
||||
* @return AWSAccessKeys
|
||||
* @throws IOException problems retrieving passwords from KMS.
|
||||
|
@ -687,11 +702,64 @@ public final class S3AUtils {
|
|||
S3xLoginHelper.extractLoginDetailsWithWarnings(name);
|
||||
Configuration c = ProviderUtils.excludeIncompatibleCredentialProviders(
|
||||
conf, S3AFileSystem.class);
|
||||
String accessKey = getPassword(c, ACCESS_KEY, login.getUser());
|
||||
String secretKey = getPassword(c, SECRET_KEY, login.getPassword());
|
||||
String bucket = name != null ? name.getHost() : "";
|
||||
|
||||
// build the secrets. as getPassword() uses the last arg as
|
||||
// the return value if non-null, the ordering of
|
||||
// login -> bucket -> base is critical
|
||||
|
||||
// get the bucket values
|
||||
String accessKey = lookupPassword(bucket, c, ACCESS_KEY,
|
||||
login.getUser());
|
||||
|
||||
// finally the base
|
||||
String secretKey = lookupPassword(bucket, c, SECRET_KEY,
|
||||
login.getPassword());
|
||||
|
||||
// and override with any per bucket values
|
||||
return new S3xLoginHelper.Login(accessKey, secretKey);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a password from a configuration, including JCEKS files, handling both
|
||||
* the absolute key and bucket override.
|
||||
* @param bucket bucket or "" if none known
|
||||
* @param conf configuration
|
||||
* @param baseKey base key to look up, e.g "fs.s3a.secret.key"
|
||||
* @param overrideVal override value: if non empty this is used instead of
|
||||
* querying the configuration.
|
||||
* @return a password or "".
|
||||
* @throws IOException on any IO problem
|
||||
* @throws IllegalArgumentException bad arguments
|
||||
*/
|
||||
public static String lookupPassword(
|
||||
String bucket,
|
||||
Configuration conf,
|
||||
String baseKey,
|
||||
String overrideVal)
|
||||
throws IOException {
|
||||
String initialVal;
|
||||
Preconditions.checkArgument(baseKey.startsWith(FS_S3A_PREFIX),
|
||||
"%s does not start with $%s", baseKey, FS_S3A_PREFIX);
|
||||
// if there's a bucket, work with it
|
||||
if (StringUtils.isNotEmpty(bucket)) {
|
||||
String subkey = baseKey.substring(FS_S3A_PREFIX.length());
|
||||
String shortBucketKey = String.format(
|
||||
BUCKET_PATTERN, bucket, subkey);
|
||||
String longBucketKey = String.format(
|
||||
BUCKET_PATTERN, bucket, baseKey);
|
||||
|
||||
// set from the long key unless overidden.
|
||||
initialVal = getPassword(conf, longBucketKey, overrideVal);
|
||||
// then override from the short one if it is set
|
||||
initialVal = getPassword(conf, shortBucketKey, initialVal);
|
||||
} else {
|
||||
// no bucket, make the initial value the override value
|
||||
initialVal = overrideVal;
|
||||
}
|
||||
return getPassword(conf, baseKey, initialVal);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a password from a configuration, or, if a value is passed in,
|
||||
* pick that up instead.
|
||||
|
@ -702,10 +770,9 @@ public final class S3AUtils {
|
|||
* @return a password or "".
|
||||
* @throws IOException on any problem
|
||||
*/
|
||||
static String getPassword(Configuration conf, String key, String val)
|
||||
private static String getPassword(Configuration conf, String key, String val)
|
||||
throws IOException {
|
||||
String defVal = "";
|
||||
return getPassword(conf, key, val, defVal);
|
||||
return getPassword(conf, key, val, "");
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1124,16 +1191,21 @@ public final class S3AUtils {
|
|||
* This operation handles the case where the option has been
|
||||
* set in the provider or configuration to the option
|
||||
* {@code OLD_S3A_SERVER_SIDE_ENCRYPTION_KEY}.
|
||||
* IOExceptions raised during retrieval are swallowed.
|
||||
* @param bucket bucket to query for
|
||||
* @param conf configuration to examine
|
||||
* @return the encryption key or null
|
||||
* @return the encryption key or ""
|
||||
* @throws IllegalArgumentException bad arguments.
|
||||
*/
|
||||
static String getServerSideEncryptionKey(Configuration conf) {
|
||||
static String getServerSideEncryptionKey(String bucket,
|
||||
Configuration conf) {
|
||||
try {
|
||||
return lookupPassword(conf, SERVER_SIDE_ENCRYPTION_KEY,
|
||||
return lookupPassword(bucket, conf,
|
||||
SERVER_SIDE_ENCRYPTION_KEY,
|
||||
getPassword(conf, OLD_S3A_SERVER_SIDE_ENCRYPTION_KEY,
|
||||
null, null));
|
||||
} catch (IOException e) {
|
||||
LOG.error("Cannot retrieve SERVER_SIDE_ENCRYPTION_KEY", e);
|
||||
LOG.error("Cannot retrieve " + SERVER_SIDE_ENCRYPTION_KEY, e);
|
||||
return "";
|
||||
}
|
||||
}
|
||||
|
@ -1142,16 +1214,19 @@ public final class S3AUtils {
|
|||
* Get the server-side encryption algorithm.
|
||||
* This includes validation of the configuration, checking the state of
|
||||
* the encryption key given the chosen algorithm.
|
||||
*
|
||||
* @param bucket bucket to query for
|
||||
* @param conf configuration to scan
|
||||
* @return the encryption mechanism (which will be {@code NONE} unless
|
||||
* one is set.
|
||||
* @throws IOException on any validation problem.
|
||||
*/
|
||||
static S3AEncryptionMethods getEncryptionAlgorithm(Configuration conf)
|
||||
throws IOException {
|
||||
static S3AEncryptionMethods getEncryptionAlgorithm(String bucket,
|
||||
Configuration conf) throws IOException {
|
||||
S3AEncryptionMethods sse = S3AEncryptionMethods.getMethod(
|
||||
conf.getTrimmed(SERVER_SIDE_ENCRYPTION_ALGORITHM));
|
||||
String sseKey = getServerSideEncryptionKey(conf);
|
||||
lookupPassword(bucket, conf,
|
||||
SERVER_SIDE_ENCRYPTION_ALGORITHM, null));
|
||||
String sseKey = getServerSideEncryptionKey(bucket, conf);
|
||||
int sseKeyLen = StringUtils.isBlank(sseKey) ? 0 : sseKey.length();
|
||||
String diagnostics = passwordDiagnostics(sseKey, "key");
|
||||
switch (sse) {
|
||||
|
|
|
@ -34,9 +34,7 @@ import org.apache.hadoop.classification.InterfaceStability;
|
|||
public interface S3ClientFactory {
|
||||
|
||||
/**
|
||||
* Creates a new {@link AmazonS3} client. This method accepts the S3A file
|
||||
* system URI both in raw input form and validated form as separate arguments,
|
||||
* because both values may be useful in logging.
|
||||
* Creates a new {@link AmazonS3} client.
|
||||
*
|
||||
* @param name raw input S3A file system URI
|
||||
* @return S3 client
|
||||
|
|
|
@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.security.ProviderUtils;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
|
||||
import static org.apache.hadoop.fs.s3a.Constants.ACCESS_KEY;
|
||||
import static org.apache.hadoop.fs.s3a.Constants.SECRET_KEY;
|
||||
|
@ -50,12 +51,13 @@ public class SimpleAWSCredentialsProvider implements AWSCredentialsProvider {
|
|||
private String secretKey;
|
||||
private IOException lookupIOE;
|
||||
|
||||
public SimpleAWSCredentialsProvider(Configuration conf) {
|
||||
public SimpleAWSCredentialsProvider(URI uri, Configuration conf) {
|
||||
try {
|
||||
String bucket = uri != null ? uri.getHost() : "";
|
||||
Configuration c = ProviderUtils.excludeIncompatibleCredentialProviders(
|
||||
conf, S3AFileSystem.class);
|
||||
this.accessKey = S3AUtils.lookupPassword(c, ACCESS_KEY, null);
|
||||
this.secretKey = S3AUtils.lookupPassword(c, SECRET_KEY, null);
|
||||
this.accessKey = S3AUtils.lookupPassword(bucket, c, ACCESS_KEY, null);
|
||||
this.secretKey = S3AUtils.lookupPassword(bucket, c, SECRET_KEY, null);
|
||||
} catch (IOException e) {
|
||||
lookupIOE = e;
|
||||
}
|
||||
|
@ -71,7 +73,7 @@ public class SimpleAWSCredentialsProvider implements AWSCredentialsProvider {
|
|||
return new BasicAWSCredentials(accessKey, secretKey);
|
||||
}
|
||||
throw new CredentialInitializationException(
|
||||
"Access key, secret key or session token is unset");
|
||||
"Access key or secret key is unset");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -24,6 +24,7 @@ import com.amazonaws.auth.AWSCredentials;
|
|||
import org.apache.commons.lang.StringUtils;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
|
@ -31,6 +32,7 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.security.ProviderUtils;
|
||||
|
||||
import static org.apache.hadoop.fs.s3a.Constants.*;
|
||||
import static org.apache.hadoop.fs.s3a.S3AUtils.lookupPassword;
|
||||
|
||||
/**
|
||||
* Support session credentials for authenticating with AWS.
|
||||
|
@ -51,12 +53,18 @@ public class TemporaryAWSCredentialsProvider implements AWSCredentialsProvider {
|
|||
private IOException lookupIOE;
|
||||
|
||||
public TemporaryAWSCredentialsProvider(Configuration conf) {
|
||||
this(null, conf);
|
||||
}
|
||||
|
||||
public TemporaryAWSCredentialsProvider(URI uri, Configuration conf) {
|
||||
try {
|
||||
// determine the bucket
|
||||
String bucket = uri != null ? uri.getHost(): "";
|
||||
Configuration c = ProviderUtils.excludeIncompatibleCredentialProviders(
|
||||
conf, S3AFileSystem.class);
|
||||
this.accessKey = S3AUtils.lookupPassword(c, ACCESS_KEY, null);
|
||||
this.secretKey = S3AUtils.lookupPassword(c, SECRET_KEY, null);
|
||||
this.sessionToken = S3AUtils.lookupPassword(c, SESSION_TOKEN, null);
|
||||
this.accessKey = lookupPassword(bucket, c, ACCESS_KEY, null);
|
||||
this.secretKey = lookupPassword(bucket, c, SECRET_KEY, null);
|
||||
this.sessionToken = lookupPassword(bucket, c, SESSION_TOKEN, null);
|
||||
} catch (IOException e) {
|
||||
lookupIOE = e;
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.fs.s3a.auth;
|
|||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.Locale;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
|
@ -80,12 +81,14 @@ public class AssumedRoleCredentialProvider implements AWSCredentialsProvider,
|
|||
* Instantiate.
|
||||
* This calls {@link #getCredentials()} to fail fast on the inner
|
||||
* role credential retrieval.
|
||||
* @param uri URI of endpoint.
|
||||
* @param conf configuration
|
||||
* @throws IOException on IO problems and some parameter checking
|
||||
* @throws IllegalArgumentException invalid parameters
|
||||
* @throws AWSSecurityTokenServiceException problems getting credentials
|
||||
*/
|
||||
public AssumedRoleCredentialProvider(Configuration conf) throws IOException {
|
||||
public AssumedRoleCredentialProvider(URI uri, Configuration conf)
|
||||
throws IOException {
|
||||
|
||||
arn = conf.getTrimmed(ASSUMED_ROLE_ARN, "");
|
||||
if (StringUtils.isEmpty(arn)) {
|
||||
|
@ -101,7 +104,7 @@ public class AssumedRoleCredentialProvider implements AWSCredentialsProvider,
|
|||
if (this.getClass().equals(aClass)) {
|
||||
throw new IOException(E_FORBIDDEN_PROVIDER);
|
||||
}
|
||||
credentials.add(createAWSCredentialProvider(conf, aClass));
|
||||
credentials.add(createAWSCredentialProvider(conf, aClass, uri));
|
||||
}
|
||||
|
||||
// then the STS binding
|
||||
|
|
|
@ -548,15 +548,33 @@ to keep secrets outside Hadoop configuration files, storing them in encrypted
|
|||
files in local or Hadoop filesystems, and including them in requests.
|
||||
|
||||
The S3A configuration options with sensitive data
|
||||
(`fs.s3a.secret.key`, `fs.s3a.access.key` and `fs.s3a.session.token`) can
|
||||
(`fs.s3a.secret.key`, `fs.s3a.access.key`, `fs.s3a.session.token`
|
||||
and `fs.s3a.server-side-encryption.key`) can
|
||||
have their data saved to a binary file stored, with the values being read in
|
||||
when the S3A filesystem URL is used for data access. The reference to this
|
||||
credential provider is all that is passed as a direct configuration option.
|
||||
credential provider then declareed in the hadoop configuration.
|
||||
|
||||
For additional reading on the Hadoop Credential Provider API see:
|
||||
[Credential Provider API](../../../hadoop-project-dist/hadoop-common/CredentialProviderAPI.html).
|
||||
|
||||
|
||||
The following configuration options can be storeed in Hadoop Credential Provider
|
||||
stores.
|
||||
|
||||
```
|
||||
fs.s3a.access.key
|
||||
fs.s3a.secret.key
|
||||
fs.s3a.session.token
|
||||
fs.s3a.server-side-encryption.key
|
||||
fs.s3a.server-side-encryption-algorithm
|
||||
```
|
||||
|
||||
The first three are for authentication; the final two for
|
||||
[encryption](./encryption.html). Of the latter, only the encryption key can
|
||||
be considered "sensitive". However, being able to include the algorithm in
|
||||
the credentials allows for a JCECKS file to contain all the options needed
|
||||
to encrypt new data written to S3.
|
||||
|
||||
### Step 1: Create a credential file
|
||||
|
||||
A credential file can be created on any Hadoop filesystem; when creating one on HDFS or
|
||||
|
@ -565,7 +583,6 @@ private to the reader —though as directory permissions are not touched,
|
|||
users should verify that the directory containing the file is readable only by
|
||||
the current user.
|
||||
|
||||
|
||||
```bash
|
||||
hadoop credential create fs.s3a.access.key -value 123 \
|
||||
-provider jceks://hdfs@nn1.example.com:9001/user/backup/s3.jceks
|
||||
|
@ -621,9 +638,12 @@ over that of the `hadoop.security` list (i.e. they are prepended to the common l
|
|||
</property>
|
||||
```
|
||||
|
||||
Supporting a separate list in an `fs.s3a.` prefix permits per-bucket configuration
|
||||
of credential files.
|
||||
|
||||
This was added to suppport binding different credential providers on a per
|
||||
bucket basis, without adding alternative secrets in the credential list.
|
||||
However, some applications (e.g Hive) prevent the list of credential providers
|
||||
from being dynamically updated by users. As per-bucket secrets are now supported,
|
||||
it is better to include per-bucket keys in JCEKS files and other sources
|
||||
of credentials.
|
||||
|
||||
### Using secrets from credential providers
|
||||
|
||||
|
@ -1133,16 +1153,28 @@ Finally, the public `s3a://landsat-pds/` bucket can be accessed anonymously:
|
|||
|
||||
### Customizing S3A secrets held in credential files
|
||||
|
||||
Although most properties are automatically propagated from their
|
||||
`fs.s3a.bucket.`-prefixed custom entry to that of the base `fs.s3a.` option
|
||||
supporting secrets kept in Hadoop credential files is slightly more complex.
|
||||
This is because the property values are kept in these files, and cannot be
|
||||
dynamically patched.
|
||||
|
||||
Instead, callers need to create different configuration files for each
|
||||
bucket, setting the base secrets (`fs.s3a.access.key`, etc),
|
||||
then declare the path to the appropriate credential file in
|
||||
a bucket-specific version of the property `fs.s3a.security.credential.provider.path`.
|
||||
Secrets in JCEKS files or provided by other Hadoop credential providers
|
||||
can also be configured on a per bucket basis. The S3A client will
|
||||
look for the per-bucket secrets be
|
||||
|
||||
|
||||
Consider a JCEKS file with six keys:
|
||||
|
||||
```
|
||||
fs.s3a.access.key
|
||||
fs.s3a.secret.key
|
||||
fs.s3a.server-side-encryption-algorithm
|
||||
fs.s3a.bucket.nightly.access.key
|
||||
fs.s3a.bucket.nightly.secret.key
|
||||
fs.s3a.bucket.nightly.session.token
|
||||
fs.s3a.bucket.nightly.server-side-encryption.key
|
||||
fs.s3a.bucket.nightly.server-side-encryption-algorithm
|
||||
```
|
||||
|
||||
When accessing the bucket `s3a://nightly/`, the per-bucket configuration
|
||||
options for that backet will be used, here the access keys and token,
|
||||
and including the encryption algorithm and key.
|
||||
|
||||
|
||||
### <a name="per_bucket_endpoints"></a>Using Per-Bucket Configuration to access data round the world
|
||||
|
|
|
@ -21,7 +21,6 @@ package org.apache.hadoop.fs.s3a;
|
|||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.concurrent.Callable;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.junit.Rule;
|
||||
|
@ -41,9 +40,14 @@ import static org.apache.hadoop.test.LambdaTestUtils.*;
|
|||
|
||||
/**
|
||||
* Test SSE setup operations and errors raised.
|
||||
* Tests related to secret providers and AWS credentials are also
|
||||
* included, as they share some common setup operations.
|
||||
*/
|
||||
public class TestSSEConfiguration extends Assert {
|
||||
|
||||
/** Bucket to use for per-bucket options. */
|
||||
public static final String BUCKET = "dataset-1";
|
||||
|
||||
@Rule
|
||||
public Timeout testTimeout = new Timeout(
|
||||
S3ATestConstants.S3A_TEST_TIMEOUT
|
||||
|
@ -54,12 +58,12 @@ public class TestSSEConfiguration extends Assert {
|
|||
|
||||
@Test
|
||||
public void testSSECNoKey() throws Throwable {
|
||||
assertExceptionTextEquals(SSE_C_NO_KEY_ERROR, SSE_C.getMethod(), null);
|
||||
assertGetAlgorithmFails(SSE_C_NO_KEY_ERROR, SSE_C.getMethod(), null);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSSECBlankKey() throws Throwable {
|
||||
assertExceptionTextEquals(SSE_C_NO_KEY_ERROR, SSE_C.getMethod(), "");
|
||||
assertGetAlgorithmFails(SSE_C_NO_KEY_ERROR, SSE_C.getMethod(), "");
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -74,74 +78,67 @@ public class TestSSEConfiguration extends Assert {
|
|||
|
||||
@Test
|
||||
public void testKMSGoodOldOptionName() throws Throwable {
|
||||
Configuration conf = new Configuration(false);
|
||||
Configuration conf = emptyConf();
|
||||
conf.set(SERVER_SIDE_ENCRYPTION_ALGORITHM, SSE_KMS.getMethod());
|
||||
conf.set(OLD_S3A_SERVER_SIDE_ENCRYPTION_KEY, "kmskeyID");
|
||||
// verify key round trip
|
||||
assertEquals("kmskeyID", getServerSideEncryptionKey(conf));
|
||||
assertEquals("kmskeyID", getServerSideEncryptionKey(BUCKET, conf));
|
||||
// and that KMS lookup finds it
|
||||
assertEquals(SSE_KMS, getEncryptionAlgorithm(conf));
|
||||
assertEquals(SSE_KMS, getEncryptionAlgorithm(BUCKET, conf));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAESKeySet() throws Throwable {
|
||||
assertExceptionTextEquals(SSE_S3_WITH_KEY_ERROR,
|
||||
assertGetAlgorithmFails(SSE_S3_WITH_KEY_ERROR,
|
||||
SSE_S3.getMethod(), "setkey");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSSEEmptyKey() throws Throwable {
|
||||
public void testSSEEmptyKey() {
|
||||
// test the internal logic of the test setup code
|
||||
Configuration c = buildConf(SSE_C.getMethod(), "");
|
||||
assertEquals("", getServerSideEncryptionKey(c));
|
||||
assertEquals("", getServerSideEncryptionKey(BUCKET, c));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSSEKeyNull() throws Throwable {
|
||||
// test the internal logic of the test setup code
|
||||
final Configuration c = buildConf(SSE_C.getMethod(), null);
|
||||
assertNull("", getServerSideEncryptionKey(c));
|
||||
assertEquals("", getServerSideEncryptionKey(BUCKET, c));
|
||||
|
||||
intercept(IOException.class, SSE_C_NO_KEY_ERROR,
|
||||
new Callable<S3AEncryptionMethods>() {
|
||||
@Override
|
||||
public S3AEncryptionMethods call() throws Exception {
|
||||
return getEncryptionAlgorithm(c);
|
||||
}
|
||||
});
|
||||
() -> getEncryptionAlgorithm(BUCKET, c));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSSEKeyFromCredentialProvider() throws Exception {
|
||||
// set up conf to have a cred provider
|
||||
final Configuration conf = new Configuration();
|
||||
addFileProvider(conf);
|
||||
final Configuration conf = confWithProvider();
|
||||
String key = "provisioned";
|
||||
provisionSSEKey(conf, SERVER_SIDE_ENCRYPTION_KEY, key);
|
||||
setProviderOption(conf, SERVER_SIDE_ENCRYPTION_KEY, key);
|
||||
// let's set the password in config and ensure that it uses the credential
|
||||
// provider provisioned value instead.
|
||||
conf.set(SERVER_SIDE_ENCRYPTION_KEY, "keyInConfObject");
|
||||
|
||||
String sseKey = getServerSideEncryptionKey(conf);
|
||||
String sseKey = getServerSideEncryptionKey(BUCKET, conf);
|
||||
assertNotNull("Proxy password should not retrun null.", sseKey);
|
||||
assertEquals("Proxy password override did NOT work.", key, sseKey);
|
||||
}
|
||||
|
||||
/**
|
||||
* Very that the old key is picked up via the properties
|
||||
* Very that the old key is picked up via the properties.
|
||||
* @throws Exception failure
|
||||
*/
|
||||
@Test
|
||||
public void testOldKeyFromCredentialProvider() throws Exception {
|
||||
// set up conf to have a cred provider
|
||||
final Configuration conf = new Configuration();
|
||||
addFileProvider(conf);
|
||||
final Configuration conf = confWithProvider();
|
||||
String key = "provisioned";
|
||||
provisionSSEKey(conf, OLD_S3A_SERVER_SIDE_ENCRYPTION_KEY, key);
|
||||
setProviderOption(conf, OLD_S3A_SERVER_SIDE_ENCRYPTION_KEY, key);
|
||||
// let's set the password in config and ensure that it uses the credential
|
||||
// provider provisioned value instead.
|
||||
//conf.set(OLD_S3A_SERVER_SIDE_ENCRYPTION_KEY, "oldKeyInConf");
|
||||
String sseKey = getServerSideEncryptionKey(conf);
|
||||
String sseKey = getServerSideEncryptionKey(BUCKET, conf);
|
||||
assertNotNull("Proxy password should not retrun null.", sseKey);
|
||||
assertEquals("Proxy password override did NOT work.", key, sseKey);
|
||||
}
|
||||
|
@ -161,38 +158,35 @@ public class TestSSEConfiguration extends Assert {
|
|||
}
|
||||
|
||||
/**
|
||||
* Set the SSE Key via the provision API, not the config itself.
|
||||
* Set the an option under the configuration via the
|
||||
* {@link CredentialProviderFactory} APIs.
|
||||
* @param conf config
|
||||
* @param option option name
|
||||
* @param key key to set
|
||||
* @param value value to set option to.
|
||||
* @throws Exception failure
|
||||
*/
|
||||
void provisionSSEKey(final Configuration conf,
|
||||
String option, String key) throws Exception {
|
||||
void setProviderOption(final Configuration conf,
|
||||
String option, String value) throws Exception {
|
||||
// add our password to the provider
|
||||
final CredentialProvider provider =
|
||||
CredentialProviderFactory.getProviders(conf).get(0);
|
||||
provider.createCredentialEntry(option,
|
||||
key.toCharArray());
|
||||
value.toCharArray());
|
||||
provider.flush();
|
||||
}
|
||||
|
||||
/**
|
||||
* Assert that the exception text from a config contains the expected string
|
||||
* @param expected expected substring
|
||||
* Assert that the exception text from {@link #getAlgorithm(String, String)}
|
||||
* is as expected.
|
||||
* @param expected expected substring in error
|
||||
* @param alg algorithm to ask for
|
||||
* @param key optional key value
|
||||
* @throws Exception anything else which gets raised
|
||||
*/
|
||||
public void assertExceptionTextEquals(String expected,
|
||||
public void assertGetAlgorithmFails(String expected,
|
||||
final String alg, final String key) throws Exception {
|
||||
intercept(IOException.class, expected,
|
||||
new Callable<S3AEncryptionMethods>() {
|
||||
@Override
|
||||
public S3AEncryptionMethods call() throws Exception {
|
||||
return getAlgorithm(alg, key);
|
||||
}
|
||||
});
|
||||
() -> getAlgorithm(alg, key));
|
||||
}
|
||||
|
||||
private S3AEncryptionMethods getAlgorithm(S3AEncryptionMethods algorithm,
|
||||
|
@ -203,11 +197,18 @@ public class TestSSEConfiguration extends Assert {
|
|||
|
||||
private S3AEncryptionMethods getAlgorithm(String algorithm, String key)
|
||||
throws IOException {
|
||||
return getEncryptionAlgorithm(buildConf(algorithm, key));
|
||||
return getEncryptionAlgorithm(BUCKET, buildConf(algorithm, key));
|
||||
}
|
||||
|
||||
/**
|
||||
* Build a new configuration with the given S3-SSE algorithm
|
||||
* and key.
|
||||
* @param algorithm algorithm to use, may be null
|
||||
* @param key key, may be null
|
||||
* @return the new config.
|
||||
*/
|
||||
private Configuration buildConf(String algorithm, String key) {
|
||||
Configuration conf = new Configuration(false);
|
||||
Configuration conf = emptyConf();
|
||||
if (algorithm != null) {
|
||||
conf.set(SERVER_SIDE_ENCRYPTION_ALGORITHM, algorithm);
|
||||
} else {
|
||||
|
@ -220,4 +221,92 @@ public class TestSSEConfiguration extends Assert {
|
|||
}
|
||||
return conf;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create an empty conf: no -default or -site values.
|
||||
* @return an empty configuration
|
||||
*/
|
||||
private Configuration emptyConf() {
|
||||
return new Configuration(false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a configuration with no defaults and bonded to a file
|
||||
* provider, so that
|
||||
* {@link #setProviderOption(Configuration, String, String)}
|
||||
* can be used to set a secret.
|
||||
* @return the configuration
|
||||
* @throws Exception any failure
|
||||
*/
|
||||
private Configuration confWithProvider() throws Exception {
|
||||
final Configuration conf = emptyConf();
|
||||
addFileProvider(conf);
|
||||
return conf;
|
||||
}
|
||||
|
||||
|
||||
private static final String SECRET = "*secret*";
|
||||
|
||||
private static final String BUCKET_PATTERN = FS_S3A_BUCKET_PREFIX + "%s.%s";
|
||||
|
||||
@Test
|
||||
public void testGetPasswordFromConf() throws Throwable {
|
||||
final Configuration conf = emptyConf();
|
||||
conf.set(SECRET_KEY, SECRET);
|
||||
assertEquals(SECRET, lookupPassword(conf, SECRET_KEY, ""));
|
||||
assertEquals(SECRET, lookupPassword(conf, SECRET_KEY, "defVal"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetPasswordFromProvider() throws Throwable {
|
||||
final Configuration conf = confWithProvider();
|
||||
setProviderOption(conf, SECRET_KEY, SECRET);
|
||||
assertEquals(SECRET, lookupPassword(conf, SECRET_KEY, ""));
|
||||
assertSecretKeyEquals(conf, null, SECRET, "");
|
||||
assertSecretKeyEquals(conf, null, "overidden", "overidden");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetBucketPasswordFromProvider() throws Throwable {
|
||||
final Configuration conf = confWithProvider();
|
||||
URI bucketURI = new URI("s3a://"+ BUCKET +"/");
|
||||
setProviderOption(conf, SECRET_KEY, "unbucketed");
|
||||
|
||||
String bucketedKey = String.format(BUCKET_PATTERN, BUCKET, SECRET_KEY);
|
||||
setProviderOption(conf, bucketedKey, SECRET);
|
||||
String overrideVal;
|
||||
overrideVal = "";
|
||||
assertSecretKeyEquals(conf, BUCKET, SECRET, overrideVal);
|
||||
assertSecretKeyEquals(conf, bucketURI.getHost(), SECRET, "");
|
||||
assertSecretKeyEquals(conf, bucketURI.getHost(), "overidden", "overidden");
|
||||
}
|
||||
|
||||
/**
|
||||
* Assert that a secret key is as expected.
|
||||
* @param conf configuration to examine
|
||||
* @param bucket bucket name
|
||||
* @param expected expected value
|
||||
* @param overrideVal override value in {@code S3AUtils.lookupPassword()}
|
||||
* @throws IOException IO problem
|
||||
*/
|
||||
private void assertSecretKeyEquals(Configuration conf,
|
||||
String bucket,
|
||||
String expected, String overrideVal) throws IOException {
|
||||
assertEquals(expected,
|
||||
S3AUtils.lookupPassword(bucket, conf, SECRET_KEY, overrideVal));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetBucketPasswordFromProviderShort() throws Throwable {
|
||||
final Configuration conf = confWithProvider();
|
||||
URI bucketURI = new URI("s3a://"+ BUCKET +"/");
|
||||
setProviderOption(conf, SECRET_KEY, "unbucketed");
|
||||
|
||||
String bucketedKey = String.format(BUCKET_PATTERN, BUCKET, "secret.key");
|
||||
setProviderOption(conf, bucketedKey, SECRET);
|
||||
assertSecretKeyEquals(conf, BUCKET, SECRET, "");
|
||||
assertSecretKeyEquals(conf, bucketURI.getHost(), SECRET, "");
|
||||
assertSecretKeyEquals(conf, bucketURI.getHost(), "overidden", "overidden");
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.apache.hadoop.fs.s3a.auth;
|
|||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.nio.file.AccessDeniedException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
@ -43,6 +44,7 @@ import org.apache.hadoop.fs.s3a.AWSBadRequestException;
|
|||
import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
|
||||
import org.apache.hadoop.fs.s3a.MultipartUtils;
|
||||
import org.apache.hadoop.fs.s3a.S3AFileSystem;
|
||||
import org.apache.hadoop.fs.s3a.S3ATestConstants;
|
||||
import org.apache.hadoop.fs.s3a.S3AUtils;
|
||||
import org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider;
|
||||
import org.apache.hadoop.fs.s3a.commit.CommitConstants;
|
||||
|
@ -73,6 +75,11 @@ public class ITestAssumeRole extends AbstractS3ATestBase {
|
|||
|
||||
private static final Path ROOT = new Path("/");
|
||||
|
||||
/**
|
||||
* test URI, built in setup.
|
||||
*/
|
||||
private URI uri;
|
||||
|
||||
/**
|
||||
* A role FS; if non-null it is closed in teardown.
|
||||
*/
|
||||
|
@ -82,6 +89,7 @@ public class ITestAssumeRole extends AbstractS3ATestBase {
|
|||
public void setup() throws Exception {
|
||||
super.setup();
|
||||
assumeRoleTests();
|
||||
uri = new URI(S3ATestConstants.DEFAULT_CSVTEST_FILE);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -128,7 +136,7 @@ public class ITestAssumeRole extends AbstractS3ATestBase {
|
|||
conf.set(ASSUMED_ROLE_SESSION_DURATION, "45m");
|
||||
bindRolePolicy(conf, RESTRICTED_POLICY);
|
||||
try (AssumedRoleCredentialProvider provider
|
||||
= new AssumedRoleCredentialProvider(conf)) {
|
||||
= new AssumedRoleCredentialProvider(uri, conf)) {
|
||||
LOG.info("Provider is {}", provider);
|
||||
AWSCredentials credentials = provider.getCredentials();
|
||||
assertNotNull("Null credentials from " + provider, credentials);
|
||||
|
@ -141,7 +149,7 @@ public class ITestAssumeRole extends AbstractS3ATestBase {
|
|||
conf.set(ASSUMED_ROLE_ARN, ROLE_ARN_EXAMPLE);
|
||||
interceptClosing(AWSSecurityTokenServiceException.class,
|
||||
E_BAD_ROLE,
|
||||
() -> new AssumedRoleCredentialProvider(conf));
|
||||
() -> new AssumedRoleCredentialProvider(uri, conf));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -264,7 +272,7 @@ public class ITestAssumeRole extends AbstractS3ATestBase {
|
|||
conf.set(ASSUMED_ROLE_ARN, "");
|
||||
interceptClosing(IOException.class,
|
||||
AssumedRoleCredentialProvider.E_NO_ROLE,
|
||||
() -> new AssumedRoleCredentialProvider(conf));
|
||||
() -> new AssumedRoleCredentialProvider(uri, conf));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -273,7 +281,7 @@ public class ITestAssumeRole extends AbstractS3ATestBase {
|
|||
Configuration conf = new Configuration();
|
||||
conf.set(ASSUMED_ROLE_SESSION_DURATION, "30s");
|
||||
interceptClosing(IllegalArgumentException.class, "",
|
||||
() -> new AssumedRoleCredentialProvider(conf));
|
||||
() -> new AssumedRoleCredentialProvider(uri, conf));
|
||||
}
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue