HADOOP-12548. Read s3a creds from a Credential Provider. Contributed by Larry McCay.

(cherry picked from commit 76fab26c5c)

Conflicts:
	hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

(cherry picked from commit 6731bb468d)
cnauroth 2016-02-17 10:17:12 -08:00
parent b318429ac9
commit 26ce37a1a6
3 changed files with 307 additions and 71 deletions
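
For orientation, here is a minimal sketch (not part of the commit) of how a client could exercise the new behavior: the s3a keys are stored in a local JCEKS credential provider and hadoop.security.credential.provider.path points at it, so S3AFileSystem resolves them through Configuration.getPassword() instead of clear-text configuration. The keystore path, key values, and bucket name are illustrative assumptions; actually running it requires the hadoop-aws dependencies and a reachable bucket.

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.security.alias.CredentialProvider;
import org.apache.hadoop.security.alias.CredentialProviderFactory;

public class S3ACredentialProviderExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Point Hadoop at a local Java keystore credential provider.
    // The jceks path is an example; any supported provider URI works.
    conf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH,
        "jceks://file/tmp/s3a.jceks");

    // Store the s3a keys inside the provider instead of clear-text config.
    CredentialProvider provider =
        CredentialProviderFactory.getProviders(conf).get(0);
    provider.createCredentialEntry("fs.s3a.access.key",
        "AKASOMEACCESSKEY".toCharArray());
    provider.createCredentialEntry("fs.s3a.secret.key",
        "someSecretKeyMaterial".toCharArray());
    provider.flush();

    // S3AFileSystem.initialize() now finds the keys via
    // Configuration.getPassword(), which consults the provider first.
    FileSystem fs = FileSystem.get(new URI("s3a://example-bucket/"), conf);
    System.out.println("Initialized " + fs.getUri());
  }
}

A deployment would normally populate the keystore from the command line instead, e.g. hadoop credential create fs.s3a.access.key -provider jceks://file/tmp/s3a.jceks.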

hadoop-common-project/hadoop-common/CHANGES.txt

@@ -51,6 +51,9 @@ Release 2.8.0 - UNRELEASED
HADOOP-12426. Add Entry point for Kerberos health check
(Steve Loughran via cnauroth)
HADOOP-12548. Read s3a creds from a Credential Provider.
(Larry McCay via cnauroth)
IMPROVEMENTS
HADOOP-12458. Retries is typoed to spell Retires in parts of

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

@@ -158,23 +158,11 @@ public class S3AFileSystem extends FileSystem {
workingDir = new Path("/user", System.getProperty("user.name")).makeQualified(this.uri,
this.getWorkingDirectory());
// Try to get our credentials or just connect anonymously
String accessKey = conf.get(ACCESS_KEY, null);
String secretKey = conf.get(SECRET_KEY, null);
String userInfo = name.getUserInfo();
if (userInfo != null) {
int index = userInfo.indexOf(':');
if (index != -1) {
accessKey = userInfo.substring(0, index);
secretKey = userInfo.substring(index + 1);
} else {
accessKey = userInfo;
}
}
AWSAccessKeys creds = getAWSAccessKeys(name, conf);
AWSCredentialsProviderChain credentials = new AWSCredentialsProviderChain(
new BasicAWSCredentialsProvider(accessKey, secretKey),
new BasicAWSCredentialsProvider(
creds.getAccessKey(), creds.getAccessSecret()),
new InstanceProfileCredentialsProvider(),
new AnonymousAWSCredentialsProvider()
);
@@ -198,6 +186,65 @@ public class S3AFileSystem extends FileSystem {
awsConf.setSignerOverride(signerOverride);
}
initProxySupport(conf, awsConf, secureConnections);
initAmazonS3Client(conf, credentials, awsConf);
maxKeys = conf.getInt(MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS);
partSize = conf.getLong(MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE);
multiPartThreshold = conf.getLong(MIN_MULTIPART_THRESHOLD,
DEFAULT_MIN_MULTIPART_THRESHOLD);
enableMultiObjectsDelete = conf.getBoolean(ENABLE_MULTI_DELETE, true);
if (partSize < 5 * 1024 * 1024) {
LOG.error(MULTIPART_SIZE + " must be at least 5 MB");
partSize = 5 * 1024 * 1024;
}
if (multiPartThreshold < 5 * 1024 * 1024) {
LOG.error(MIN_MULTIPART_THRESHOLD + " must be at least 5 MB");
multiPartThreshold = 5 * 1024 * 1024;
}
int maxThreads = conf.getInt(MAX_THREADS, DEFAULT_MAX_THREADS);
int coreThreads = conf.getInt(CORE_THREADS, DEFAULT_CORE_THREADS);
if (maxThreads == 0) {
maxThreads = Runtime.getRuntime().availableProcessors() * 8;
}
if (coreThreads == 0) {
coreThreads = Runtime.getRuntime().availableProcessors() * 8;
}
long keepAliveTime = conf.getLong(KEEPALIVE_TIME, DEFAULT_KEEPALIVE_TIME);
LinkedBlockingQueue<Runnable> workQueue =
new LinkedBlockingQueue<>(maxThreads *
conf.getInt(MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS));
threadPoolExecutor = new ThreadPoolExecutor(
coreThreads,
maxThreads,
keepAliveTime,
TimeUnit.SECONDS,
workQueue,
newDaemonThreadFactory("s3a-transfer-shared-"));
threadPoolExecutor.allowCoreThreadTimeOut(true);
initTransferManager();
initCannedAcls(conf);
if (!s3.doesBucketExist(bucket)) {
throw new IOException("Bucket " + bucket + " does not exist");
}
initMultipartUploads(conf);
serverSideEncryptionAlgorithm = conf.get(SERVER_SIDE_ENCRYPTION_ALGORITHM);
setConf(conf);
}
void initProxySupport(Configuration conf, ClientConfiguration awsConf,
boolean secureConnections) throws IllegalArgumentException {
String proxyHost = conf.getTrimmed(PROXY_HOST, "");
int proxyPort = conf.getInt(PROXY_PORT, -1);
if (!proxyHost.isEmpty()) {
@@ -237,7 +284,11 @@ public class S3AFileSystem extends FileSystem {
LOG.error(msg);
throw new IllegalArgumentException(msg);
}
}
private void initAmazonS3Client(Configuration conf,
AWSCredentialsProviderChain credentials, ClientConfiguration awsConf)
throws IllegalArgumentException {
s3 = new AmazonS3Client(credentials, awsConf);
String endPoint = conf.getTrimmed(ENDPOINT, "");
if (!endPoint.isEmpty()) {
@@ -249,62 +300,27 @@ public class S3AFileSystem extends FileSystem {
throw new IllegalArgumentException(msg, e);
}
}
}
maxKeys = conf.getInt(MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS);
partSize = conf.getLong(MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE);
multiPartThreshold = conf.getLong(MIN_MULTIPART_THRESHOLD,
DEFAULT_MIN_MULTIPART_THRESHOLD);
enableMultiObjectsDelete = conf.getBoolean(ENABLE_MULTI_DELETE, true);
if (partSize < 5 * 1024 * 1024) {
LOG.error(MULTIPART_SIZE + " must be at least 5 MB");
partSize = 5 * 1024 * 1024;
}
if (multiPartThreshold < 5 * 1024 * 1024) {
LOG.error(MIN_MULTIPART_THRESHOLD + " must be at least 5 MB");
multiPartThreshold = 5 * 1024 * 1024;
}
int maxThreads = conf.getInt(MAX_THREADS, DEFAULT_MAX_THREADS);
int coreThreads = conf.getInt(CORE_THREADS, DEFAULT_CORE_THREADS);
if (maxThreads == 0) {
maxThreads = Runtime.getRuntime().availableProcessors() * 8;
}
if (coreThreads == 0) {
coreThreads = Runtime.getRuntime().availableProcessors() * 8;
}
long keepAliveTime = conf.getLong(KEEPALIVE_TIME, DEFAULT_KEEPALIVE_TIME);
LinkedBlockingQueue<Runnable> workQueue =
new LinkedBlockingQueue<>(maxThreads *
conf.getInt(MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS));
threadPoolExecutor = new ThreadPoolExecutor(
coreThreads,
maxThreads,
keepAliveTime,
TimeUnit.SECONDS,
workQueue,
newDaemonThreadFactory("s3a-transfer-shared-"));
threadPoolExecutor.allowCoreThreadTimeOut(true);
private void initTransferManager() {
TransferManagerConfiguration transferConfiguration = new TransferManagerConfiguration();
transferConfiguration.setMinimumUploadPartSize(partSize);
transferConfiguration.setMultipartUploadThreshold(multiPartThreshold);
transfers = new TransferManager(s3, threadPoolExecutor);
transfers.setConfiguration(transferConfiguration);
}
private void initCannedAcls(Configuration conf) {
String cannedACLName = conf.get(CANNED_ACL, DEFAULT_CANNED_ACL);
if (!cannedACLName.isEmpty()) {
cannedACL = CannedAccessControlList.valueOf(cannedACLName);
} else {
cannedACL = null;
}
}
if (!s3.doesBucketExist(bucket)) {
throw new IOException("Bucket " + bucket + " does not exist");
}
private void initMultipartUploads(Configuration conf) {
boolean purgeExistingMultipart = conf.getBoolean(PURGE_EXISTING_MULTIPART,
DEFAULT_PURGE_EXISTING_MULTIPART);
long purgeExistingMultipartAge = conf.getLong(PURGE_EXISTING_MULTIPART_AGE,
@@ -315,10 +331,51 @@ public class S3AFileSystem extends FileSystem {
transfers.abortMultipartUploads(bucket, purgeBefore);
}
}
serverSideEncryptionAlgorithm = conf.get(SERVER_SIDE_ENCRYPTION_ALGORITHM);
setConf(conf);
/**
* Return the access key and secret for S3 API use.
* Credentials may exist in configuration, within credential providers,
* or be indicated in the UserInfo of the name URI parameter.
* @param name the URI for which we need the access keys.
* @param conf the Configuration object to interrogate for keys.
* @return the access key and secret as an AWSAccessKeys object.
* @throws IOException if the keys cannot be retrieved from a credential provider.
*/
AWSAccessKeys getAWSAccessKeys(URI name, Configuration conf)
throws IOException {
String accessKey = null;
String secretKey = null;
String userInfo = name.getUserInfo();
if (userInfo != null) {
int index = userInfo.indexOf(':');
if (index != -1) {
accessKey = userInfo.substring(0, index);
secretKey = userInfo.substring(index + 1);
} else {
accessKey = userInfo;
}
}
if (accessKey == null) {
try {
final char[] key = conf.getPassword(ACCESS_KEY);
if (key != null) {
accessKey = (new String(key)).trim();
}
} catch(IOException ioe) {
throw new IOException("Cannot find AWS access key.", ioe);
}
}
if (secretKey == null) {
try {
final char[] pass = conf.getPassword(SECRET_KEY);
if (pass != null) {
secretKey = (new String(pass)).trim();
}
} catch(IOException ioe) {
throw new IOException("Cannot find AWS secret key.", ioe);
}
}
return new AWSAccessKeys(accessKey, secretKey);
}
/**
@@ -399,14 +456,14 @@
* Create an FSDataOutputStream at the indicated Path with write-progress
* reporting.
* @param f the file name to open
* @param permission
* @param permission the permission to set.
* @param overwrite if a file with this name already exists, then if true,
* the file will be overwritten, and if false an error will be thrown.
* @param bufferSize the size of the buffer to be used.
* @param replication required block replication for the file.
* @param blockSize
* @param progress
* @throws IOException
* @param blockSize the requested block size.
* @param progress the progress reporter.
* @throws IOException in the event of IO related errors.
* @see #setPermission(Path, FsPermission)
*/
@Override
@@ -435,7 +492,7 @@
* @param f the existing file to be appended.
* @param bufferSize the size of the buffer to be used.
* @param progress for reporting progress if it is not null.
* @throws IOException
* @throws IOException indicating that append is not supported.
*/
public FSDataOutputStream append(Path f, int bufferSize,
Progressable progress) throws IOException {
@@ -643,7 +700,7 @@
* true, the directory is deleted else throws an exception. In
* case of a file the recursive can be set to either true or false.
* @return true if delete is successful else false.
* @throws IOException
* @throws IOException due to inability to delete a directory or file.
*/
public boolean delete(Path f, boolean recursive) throws IOException {
if (LOG.isDebugEnabled()) {
@@ -849,7 +906,7 @@
* Set the current working directory for the given file system. All relative
* paths will be resolved relative to it.
*
* @param new_dir
* @param new_dir the current working directory.
*/
public void setWorkingDirectory(Path new_dir) {
workingDir = new_dir;
@@ -1260,4 +1317,39 @@
"such as not being able to access the network.");
LOG.info("Error Message: {}" + ace, ace);
}
/**
* This is a simple encapsulation of the
* S3 access key and secret.
*/
static class AWSAccessKeys {
private String accessKey = null;
private String accessSecret = null;
/**
* Constructor.
* @param key - AWS access key
* @param secret - AWS secret key
*/
public AWSAccessKeys(String key, String secret) {
accessKey = key;
accessSecret = secret;
}
/**
* Return the AWS access key.
* @return key
*/
public String getAccessKey() {
return accessKey;
}
/**
* Return the AWS secret key.
* @return secret
*/
public String getAccessSecret() {
return accessSecret;
}
}
}
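
For reference, a minimal sketch (not part of the patch) of the fallback behavior that getAWSAccessKeys() relies on: Configuration.getPassword() first consults any configured credential providers for the alias and then falls back to the clear-text property of the same name, so existing fs.s3a.* settings keep working. The key value below is made up for illustration.

import org.apache.hadoop.conf.Configuration;

public class GetPasswordFallbackSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // No credential provider configured: getPassword() falls back to the
    // clear-text configuration value, preserving the old behavior.
    conf.set("fs.s3a.access.key", "AKASOMEACCESSKEY");
    char[] key = conf.getPassword("fs.s3a.access.key");
    System.out.println(key == null ? "not found" : new String(key));

    // With hadoop.security.credential.provider.path set (see the sketch
    // after the commit message above), the same call returns the value
    // stored in the provider instead.
  }
}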

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AConfiguration.java

@@ -32,7 +32,21 @@ import org.slf4j.LoggerFactory;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import java.io.File;
import java.net.URI;
import java.io.IOException;
import org.apache.hadoop.security.ProviderUtils;
import org.apache.hadoop.security.alias.CredentialProvider;
import org.apache.hadoop.security.alias.CredentialProviderFactory;
import org.junit.rules.TemporaryFolder;
public class TestS3AConfiguration {
private static final String EXAMPLE_ID = "AKASOMEACCESSKEY";
private static final String EXAMPLE_KEY =
"RGV0cm9pdCBSZ/WQgY2xl/YW5lZCB1cAEXAMPLE";
private Configuration conf;
private S3AFileSystem fs;
@@ -44,6 +58,9 @@ public class TestS3AConfiguration {
@Rule
public Timeout testTimeout = new Timeout(30 * 60 * 1000);
@Rule
public final TemporaryFolder tempDir = new TemporaryFolder();
/**
* Test if custom endpoint is picked up.
* <p/>
@@ -59,7 +76,7 @@
* @throws Exception
*/
@Test
public void TestEndpoint() throws Exception {
public void testEndpoint() throws Exception {
conf = new Configuration();
String endpoint = conf.getTrimmed(TEST_ENDPOINT, "");
if (endpoint.isEmpty()) {
@@ -85,7 +102,7 @@
}
@Test
public void TestProxyConnection() throws Exception {
public void testProxyConnection() throws Exception {
conf = new Configuration();
conf.setInt(Constants.MAX_ERROR_RETRIES, 2);
conf.set(Constants.PROXY_HOST, "127.0.0.1");
@@ -103,7 +120,7 @@
}
@Test
public void TestProxyPortWithoutHost() throws Exception {
public void testProxyPortWithoutHost() throws Exception {
conf = new Configuration();
conf.setInt(Constants.MAX_ERROR_RETRIES, 2);
conf.setInt(Constants.PROXY_PORT, 1);
@@ -120,7 +137,7 @@
}
@Test
public void TestAutomaticProxyPortSelection() throws Exception {
public void testAutomaticProxyPortSelection() throws Exception {
conf = new Configuration();
conf.setInt(Constants.MAX_ERROR_RETRIES, 2);
conf.set(Constants.PROXY_HOST, "127.0.0.1");
@@ -145,7 +162,7 @@
}
@Test
public void TestUsernameInconsistentWithPassword() throws Exception {
public void testUsernameInconsistentWithPassword() throws Exception {
conf = new Configuration();
conf.setInt(Constants.MAX_ERROR_RETRIES, 2);
conf.set(Constants.PROXY_HOST, "127.0.0.1");
@@ -177,4 +194,128 @@
}
}
}
@Test
public void testCredsFromCredentialProvider() throws Exception {
// set up conf to have a cred provider
final Configuration conf = new Configuration();
final File file = tempDir.newFile("test.jks");
final URI jks = ProviderUtils.nestURIForLocalJavaKeyStoreProvider(
file.toURI());
conf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH,
jks.toString());
provisionAccessKeys(conf);
S3AFileSystem s3afs = new S3AFileSystem();
conf.set(Constants.ACCESS_KEY, EXAMPLE_ID + "LJM");
S3AFileSystem.AWSAccessKeys creds =
s3afs.getAWSAccessKeys(new URI("s3a://foobar"), conf);
assertEquals("AccessKey incorrect.", EXAMPLE_ID, creds.getAccessKey());
assertEquals("SecretKey incorrect.", EXAMPLE_KEY, creds.getAccessSecret());
}
void provisionAccessKeys(final Configuration conf) throws Exception {
// add our creds to the provider
final CredentialProvider provider =
CredentialProviderFactory.getProviders(conf).get(0);
provider.createCredentialEntry(Constants.ACCESS_KEY,
EXAMPLE_ID.toCharArray());
provider.createCredentialEntry(Constants.SECRET_KEY,
EXAMPLE_KEY.toCharArray());
provider.flush();
}
@Test
public void testCredsFromUserInfo() throws Exception {
// set up conf to have a cred provider
final Configuration conf = new Configuration();
final File file = tempDir.newFile("test.jks");
final URI jks = ProviderUtils.nestURIForLocalJavaKeyStoreProvider(
file.toURI());
conf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH,
jks.toString());
provisionAccessKeys(conf);
S3AFileSystem s3afs = new S3AFileSystem();
conf.set(Constants.ACCESS_KEY, EXAMPLE_ID + "LJM");
URI uriWithUserInfo = new URI("s3a://123:456@foobar");
S3AFileSystem.AWSAccessKeys creds =
s3afs.getAWSAccessKeys(uriWithUserInfo, conf);
assertEquals("AccessKey incorrect.", "123", creds.getAccessKey());
assertEquals("SecretKey incorrect.", "456", creds.getAccessSecret());
}
@Test
public void testIDFromUserInfoSecretFromCredentialProvider()
throws Exception {
// set up conf to have a cred provider
final Configuration conf = new Configuration();
final File file = tempDir.newFile("test.jks");
final URI jks = ProviderUtils.nestURIForLocalJavaKeyStoreProvider(
file.toURI());
conf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH,
jks.toString());
provisionAccessKeys(conf);
S3AFileSystem s3afs = new S3AFileSystem();
conf.set(Constants.ACCESS_KEY, EXAMPLE_ID + "LJM");
URI uriWithUserInfo = new URI("s3a://123@foobar");
S3AFileSystem.AWSAccessKeys creds =
s3afs.getAWSAccessKeys(uriWithUserInfo, conf);
assertEquals("AccessKey incorrect.", "123", creds.getAccessKey());
assertEquals("SecretKey incorrect.", EXAMPLE_KEY, creds.getAccessSecret());
}
@Test
public void testSecretFromCredentialProviderIDFromConfig() throws Exception {
// set up conf to have a cred provider
final Configuration conf = new Configuration();
final File file = tempDir.newFile("test.jks");
final URI jks = ProviderUtils.nestURIForLocalJavaKeyStoreProvider(
file.toURI());
conf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH,
jks.toString());
// add our creds to the provider
final CredentialProvider provider =
CredentialProviderFactory.getProviders(conf).get(0);
provider.createCredentialEntry(Constants.SECRET_KEY,
EXAMPLE_KEY.toCharArray());
provider.flush();
S3AFileSystem s3afs = new S3AFileSystem();
conf.set(Constants.ACCESS_KEY, EXAMPLE_ID);
S3AFileSystem.AWSAccessKeys creds =
s3afs.getAWSAccessKeys(new URI("s3a://foobar"), conf);
assertEquals("AccessKey incorrect.", EXAMPLE_ID, creds.getAccessKey());
assertEquals("SecretKey incorrect.", EXAMPLE_KEY, creds.getAccessSecret());
}
@Test
public void testIDFromCredentialProviderSecretFromConfig() throws Exception {
// set up conf to have a cred provider
final Configuration conf = new Configuration();
final File file = tempDir.newFile("test.jks");
final URI jks = ProviderUtils.nestURIForLocalJavaKeyStoreProvider(
file.toURI());
conf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH,
jks.toString());
// add our creds to the provider
final CredentialProvider provider =
CredentialProviderFactory.getProviders(conf).get(0);
provider.createCredentialEntry(Constants.ACCESS_KEY,
EXAMPLE_ID.toCharArray());
provider.flush();
S3AFileSystem s3afs = new S3AFileSystem();
conf.set(Constants.SECRET_KEY, EXAMPLE_KEY);
S3AFileSystem.AWSAccessKeys creds =
s3afs.getAWSAccessKeys(new URI("s3a://foobar"), conf);
assertEquals("AccessKey incorrect.", EXAMPLE_ID, creds.getAccessKey());
assertEquals("SecretKey incorrect.", EXAMPLE_KEY, creds.getAccessSecret());
}
}