From 76fab26c5c02cef38924d04136407489fd9457d9 Mon Sep 17 00:00:00 2001 From: cnauroth Date: Wed, 17 Feb 2016 10:17:12 -0800 Subject: [PATCH] HADOOP-12548. Read s3a creds from a Credential Provider. Contributed by Larry McCay. --- .../hadoop-common/CHANGES.txt | 3 + .../apache/hadoop/fs/s3a/S3AFileSystem.java | 212 +++++++++++++----- .../hadoop/fs/s3a/TestS3AConfiguration.java | 151 ++++++++++++- 3 files changed, 301 insertions(+), 65 deletions(-) diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index cde5f3b7ccb..92699606157 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -766,6 +766,9 @@ Release 2.8.0 - UNRELEASED HADOOP-12426. Add Entry point for Kerberos health check (Steve Loughran via cnauroth) + HADOOP-12548. Read s3a creds from a Credential Provider. + (Larry McCay via cnauroth) + IMPROVEMENTS HADOOP-12458. Retries is typoed to spell Retires in parts of diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 5ea6bec80c8..b9590ea4de0 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -106,23 +106,11 @@ public class S3AFileSystem extends FileSystem { workingDir = new Path("/user", System.getProperty("user.name")).makeQualified(this.uri, this.getWorkingDirectory()); - // Try to get our credentials or just connect anonymously - String accessKey = conf.get(ACCESS_KEY, null); - String secretKey = conf.get(SECRET_KEY, null); - - String userInfo = name.getUserInfo(); - if (userInfo != null) { - int index = userInfo.indexOf(':'); - if (index != -1) { - accessKey = userInfo.substring(0, index); - secretKey = userInfo.substring(index + 1); - } else { - accessKey = userInfo; - } - 
} + AWSAccessKeys creds = getAWSAccessKeys(name, conf); AWSCredentialsProviderChain credentials = new AWSCredentialsProviderChain( - new BasicAWSCredentialsProvider(accessKey, secretKey), + new BasicAWSCredentialsProvider( + creds.getAccessKey(), creds.getAccessSecret()), new InstanceProfileCredentialsProvider(), new AnonymousAWSCredentialsProvider() ); @@ -146,6 +134,59 @@ public class S3AFileSystem extends FileSystem { awsConf.setSignerOverride(signerOverride); } + initProxySupport(conf, awsConf, secureConnections); + + initAmazonS3Client(conf, credentials, awsConf); + + maxKeys = conf.getInt(MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS); + partSize = conf.getLong(MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE); + multiPartThreshold = conf.getLong(MIN_MULTIPART_THRESHOLD, + DEFAULT_MIN_MULTIPART_THRESHOLD); + enableMultiObjectsDelete = conf.getBoolean(ENABLE_MULTI_DELETE, true); + + if (partSize < 5 * 1024 * 1024) { + LOG.error(MULTIPART_SIZE + " must be at least 5 MB"); + partSize = 5 * 1024 * 1024; + } + + if (multiPartThreshold < 5 * 1024 * 1024) { + LOG.error(MIN_MULTIPART_THRESHOLD + " must be at least 5 MB"); + multiPartThreshold = 5 * 1024 * 1024; + } + + int maxThreads = conf.getInt(MAX_THREADS, DEFAULT_MAX_THREADS); + if (maxThreads < 2) { + LOG.warn(MAX_THREADS + " must be at least 2: forcing to 2."); + maxThreads = 2; + } + int totalTasks = conf.getInt(MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS); + if (totalTasks < 1) { + LOG.warn(MAX_TOTAL_TASKS + "must be at least 1: forcing to 1."); + totalTasks = 1; + } + long keepAliveTime = conf.getLong(KEEPALIVE_TIME, DEFAULT_KEEPALIVE_TIME); + threadPoolExecutor = new BlockingThreadPoolExecutorService(maxThreads, + maxThreads + totalTasks, keepAliveTime, TimeUnit.SECONDS, + "s3a-transfer-shared"); + + initTransferManager(); + + initCannedAcls(conf); + + if (!s3.doesBucketExist(bucket)) { + throw new IOException("Bucket " + bucket + " does not exist"); + } + + initMultipartUploads(conf); + + serverSideEncryptionAlgorithm = 
conf.get(SERVER_SIDE_ENCRYPTION_ALGORITHM); + + setConf(conf); + } + + void initProxySupport(Configuration conf, ClientConfiguration awsConf, + boolean secureConnections) throws IllegalArgumentException, + IllegalArgumentException { String proxyHost = conf.getTrimmed(PROXY_HOST, ""); int proxyPort = conf.getInt(PROXY_PORT, -1); if (!proxyHost.isEmpty()) { @@ -185,7 +226,11 @@ public class S3AFileSystem extends FileSystem { LOG.error(msg); throw new IllegalArgumentException(msg); } + } + private void initAmazonS3Client(Configuration conf, + AWSCredentialsProviderChain credentials, ClientConfiguration awsConf) + throws IllegalArgumentException { s3 = new AmazonS3Client(credentials, awsConf); String endPoint = conf.getTrimmed(ENDPOINT,""); if (!endPoint.isEmpty()) { @@ -197,56 +242,27 @@ public class S3AFileSystem extends FileSystem { throw new IllegalArgumentException(msg, e); } } + } - maxKeys = conf.getInt(MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS); - partSize = conf.getLong(MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE); - multiPartThreshold = conf.getLong(MIN_MULTIPART_THRESHOLD, - DEFAULT_MIN_MULTIPART_THRESHOLD); - enableMultiObjectsDelete = conf.getBoolean(ENABLE_MULTI_DELETE, true); - - if (partSize < 5 * 1024 * 1024) { - LOG.error(MULTIPART_SIZE + " must be at least 5 MB"); - partSize = 5 * 1024 * 1024; - } - - if (multiPartThreshold < 5 * 1024 * 1024) { - LOG.error(MIN_MULTIPART_THRESHOLD + " must be at least 5 MB"); - multiPartThreshold = 5 * 1024 * 1024; - } - - int maxThreads = conf.getInt(MAX_THREADS, DEFAULT_MAX_THREADS); - if (maxThreads < 2) { - LOG.warn(MAX_THREADS + " must be at least 2: forcing to 2."); - maxThreads = 2; - } - int totalTasks = conf.getInt(MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS); - if (totalTasks < 1) { - LOG.warn(MAX_TOTAL_TASKS + "must be at least 1: forcing to 1."); - totalTasks = 1; - } - long keepAliveTime = conf.getLong(KEEPALIVE_TIME, DEFAULT_KEEPALIVE_TIME); - threadPoolExecutor = new BlockingThreadPoolExecutorService(maxThreads, 
- maxThreads + totalTasks, keepAliveTime, TimeUnit.SECONDS, - "s3a-transfer-shared"); - + private void initTransferManager() { TransferManagerConfiguration transferConfiguration = new TransferManagerConfiguration(); transferConfiguration.setMinimumUploadPartSize(partSize); transferConfiguration.setMultipartUploadThreshold(multiPartThreshold); transfers = new TransferManager(s3, threadPoolExecutor); transfers.setConfiguration(transferConfiguration); + } + private void initCannedAcls(Configuration conf) { String cannedACLName = conf.get(CANNED_ACL, DEFAULT_CANNED_ACL); if (!cannedACLName.isEmpty()) { cannedACL = CannedAccessControlList.valueOf(cannedACLName); } else { cannedACL = null; } + } - if (!s3.doesBucketExist(bucket)) { - throw new IOException("Bucket " + bucket + " does not exist"); - } - + private void initMultipartUploads(Configuration conf) { boolean purgeExistingMultipart = conf.getBoolean(PURGE_EXISTING_MULTIPART, DEFAULT_PURGE_EXISTING_MULTIPART); long purgeExistingMultipartAge = conf.getLong(PURGE_EXISTING_MULTIPART_AGE, @@ -257,10 +273,51 @@ public class S3AFileSystem extends FileSystem { transfers.abortMultipartUploads(bucket, purgeBefore); } + } - serverSideEncryptionAlgorithm = conf.get(SERVER_SIDE_ENCRYPTION_ALGORITHM); - - setConf(conf); + /** + * Return the access key and secret for S3 API use. + * Credentials may exist in configuration, within credential providers + * or indicated in the UserInfo of the name URI param. + * @param name the URI for which we need the access keys. + * @param conf the Configuration object to interrogate for keys. 
+ * @return AWSAccessKeys + */ + AWSAccessKeys getAWSAccessKeys(URI name, Configuration conf) + throws IOException { + String accessKey = null; + String secretKey = null; + String userInfo = name.getUserInfo(); + if (userInfo != null) { + int index = userInfo.indexOf(':'); + if (index != -1) { + accessKey = userInfo.substring(0, index); + secretKey = userInfo.substring(index + 1); + } else { + accessKey = userInfo; + } + } + if (accessKey == null) { + try { + final char[] key = conf.getPassword(ACCESS_KEY); + if (key != null) { + accessKey = (new String(key)).trim(); + } + } catch(IOException ioe) { + throw new IOException("Cannot find AWS access key.", ioe); + } + } + if (secretKey == null) { + try { + final char[] pass = conf.getPassword(SECRET_KEY); + if (pass != null) { + secretKey = (new String(pass)).trim(); + } + } catch(IOException ioe) { + throw new IOException("Cannot find AWS secret key.", ioe); + } + } + return new AWSAccessKeys(accessKey, secretKey); } /** @@ -341,14 +398,14 @@ public class S3AFileSystem extends FileSystem { * Create an FSDataOutputStream at the indicated Path with write-progress * reporting. * @param f the file name to open - * @param permission + * @param permission the permission to set. * @param overwrite if a file with this name already exists, then if true, * the file will be overwritten, and if false an error will be thrown. * @param bufferSize the size of the buffer to be used. * @param replication required block replication for the file. - * @param blockSize - * @param progress - * @throws IOException + * @param blockSize the requested block size. + * @param progress the progress reporter. + * @throws IOException in the event of IO related errors. * @see #setPermission(Path, FsPermission) */ @Override @@ -377,7 +434,7 @@ public class S3AFileSystem extends FileSystem { * @param f the existing file to be appended. * @param bufferSize the size of the buffer to be used. * @param progress for reporting progress if it is not null. 
- * @throws IOException + * @throws IOException indicating that append is not supported. */ public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException { @@ -585,7 +642,7 @@ public class S3AFileSystem extends FileSystem { * true, the directory is deleted else throws an exception. In * case of a file the recursive can be set to either true or false. * @return true if delete is successful else false. - * @throws IOException + * @throws IOException due to inability to delete a directory or file. */ public boolean delete(Path f, boolean recursive) throws IOException { if (LOG.isDebugEnabled()) { @@ -791,7 +848,7 @@ public class S3AFileSystem extends FileSystem { * Set the current working directory for the given file system. All relative * paths will be resolved relative to it. * - * @param new_dir + * @param new_dir the current working directory. */ public void setWorkingDirectory(Path new_dir) { workingDir = new_dir; @@ -1202,4 +1259,39 @@ public class S3AFileSystem extends FileSystem { "such as not being able to access the network."); LOG.info("Error Message: {}" + ace, ace); } + + /** + * This is a simple encapsulation of the + * S3 access key and secret. + */ + static class AWSAccessKeys { + private String accessKey = null; + private String accessSecret = null; + + /** + * Constructor. + * @param key - AWS access key + * @param secret - AWS secret key + */ + public AWSAccessKeys(String key, String secret) { + accessKey = key; + accessSecret = secret; + } + + /** + * Return the AWS access key. + * @return key + */ + public String getAccessKey() { + return accessKey; + } + + /** + * Return the AWS secret key. 
+ * @return secret + */ + public String getAccessSecret() { + return accessSecret; + } + } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AConfiguration.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AConfiguration.java index 25068f8d9aa..3db84db8489 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AConfiguration.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AConfiguration.java @@ -32,7 +32,21 @@ import org.slf4j.LoggerFactory; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; +import java.io.File; +import java.net.URI; +import java.io.IOException; + +import org.apache.hadoop.security.ProviderUtils; +import org.apache.hadoop.security.alias.CredentialProvider; +import org.apache.hadoop.security.alias.CredentialProviderFactory; + +import org.junit.rules.TemporaryFolder; + public class TestS3AConfiguration { + private static final String EXAMPLE_ID = "AKASOMEACCESSKEY"; + private static final String EXAMPLE_KEY = + "RGV0cm9pdCBSZ/WQgY2xl/YW5lZCB1cAEXAMPLE"; + private Configuration conf; private S3AFileSystem fs; @@ -44,6 +58,9 @@ public class TestS3AConfiguration { @Rule public Timeout testTimeout = new Timeout(30 * 60 * 1000); + @Rule + public final TemporaryFolder tempDir = new TemporaryFolder(); + /** * Test if custom endpoint is picked up. *

@@ -59,7 +76,7 @@ public class TestS3AConfiguration { * @throws Exception */ @Test - public void TestEndpoint() throws Exception { + public void testEndpoint() throws Exception { conf = new Configuration(); String endpoint = conf.getTrimmed(TEST_ENDPOINT, ""); if (endpoint.isEmpty()) { @@ -85,7 +102,7 @@ public class TestS3AConfiguration { } @Test - public void TestProxyConnection() throws Exception { + public void testProxyConnection() throws Exception { conf = new Configuration(); conf.setInt(Constants.MAX_ERROR_RETRIES, 2); conf.set(Constants.PROXY_HOST, "127.0.0.1"); @@ -103,7 +120,7 @@ public class TestS3AConfiguration { } @Test - public void TestProxyPortWithoutHost() throws Exception { + public void testProxyPortWithoutHost() throws Exception { conf = new Configuration(); conf.setInt(Constants.MAX_ERROR_RETRIES, 2); conf.setInt(Constants.PROXY_PORT, 1); @@ -120,7 +137,7 @@ public class TestS3AConfiguration { } @Test - public void TestAutomaticProxyPortSelection() throws Exception { + public void testAutomaticProxyPortSelection() throws Exception { conf = new Configuration(); conf.setInt(Constants.MAX_ERROR_RETRIES, 2); conf.set(Constants.PROXY_HOST, "127.0.0.1"); @@ -145,7 +162,7 @@ public class TestS3AConfiguration { } @Test - public void TestUsernameInconsistentWithPassword() throws Exception { + public void testUsernameInconsistentWithPassword() throws Exception { conf = new Configuration(); conf.setInt(Constants.MAX_ERROR_RETRIES, 2); conf.set(Constants.PROXY_HOST, "127.0.0.1"); @@ -177,4 +194,128 @@ public class TestS3AConfiguration { } } } + + @Test + public void testCredsFromCredentialProvider() throws Exception { + // set up conf to have a cred provider + final Configuration conf = new Configuration(); + final File file = tempDir.newFile("test.jks"); + final URI jks = ProviderUtils.nestURIForLocalJavaKeyStoreProvider( + file.toURI()); + conf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH, + jks.toString()); + + provisionAccessKeys(conf); + + 
S3AFileSystem s3afs = new S3AFileSystem(); + conf.set(Constants.ACCESS_KEY, EXAMPLE_ID + "LJM"); + S3AFileSystem.AWSAccessKeys creds = + s3afs.getAWSAccessKeys(new URI("s3a://foobar"), conf); + assertEquals("AccessKey incorrect.", EXAMPLE_ID, creds.getAccessKey()); + assertEquals("SecretKey incorrect.", EXAMPLE_KEY, creds.getAccessSecret()); + } + + void provisionAccessKeys(final Configuration conf) throws Exception { + // add our creds to the provider + final CredentialProvider provider = + CredentialProviderFactory.getProviders(conf).get(0); + provider.createCredentialEntry(Constants.ACCESS_KEY, + EXAMPLE_ID.toCharArray()); + provider.createCredentialEntry(Constants.SECRET_KEY, + EXAMPLE_KEY.toCharArray()); + provider.flush(); + } + + @Test + public void testCredsFromUserInfo() throws Exception { + // set up conf to have a cred provider + final Configuration conf = new Configuration(); + final File file = tempDir.newFile("test.jks"); + final URI jks = ProviderUtils.nestURIForLocalJavaKeyStoreProvider( + file.toURI()); + conf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH, + jks.toString()); + + provisionAccessKeys(conf); + + S3AFileSystem s3afs = new S3AFileSystem(); + conf.set(Constants.ACCESS_KEY, EXAMPLE_ID + "LJM"); + URI uriWithUserInfo = new URI("s3a://123:456@foobar"); + S3AFileSystem.AWSAccessKeys creds = + s3afs.getAWSAccessKeys(uriWithUserInfo, conf); + assertEquals("AccessKey incorrect.", "123", creds.getAccessKey()); + assertEquals("SecretKey incorrect.", "456", creds.getAccessSecret()); + } + + @Test + public void testIDFromUserInfoSecretFromCredentialProvider() + throws Exception { + // set up conf to have a cred provider + final Configuration conf = new Configuration(); + final File file = tempDir.newFile("test.jks"); + final URI jks = ProviderUtils.nestURIForLocalJavaKeyStoreProvider( + file.toURI()); + conf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH, + jks.toString()); + + provisionAccessKeys(conf); + + S3AFileSystem s3afs = 
new S3AFileSystem(); + conf.set(Constants.ACCESS_KEY, EXAMPLE_ID + "LJM"); + URI uriWithUserInfo = new URI("s3a://123@foobar"); + S3AFileSystem.AWSAccessKeys creds = + s3afs.getAWSAccessKeys(uriWithUserInfo, conf); + assertEquals("AccessKey incorrect.", "123", creds.getAccessKey()); + assertEquals("SecretKey incorrect.", EXAMPLE_KEY, creds.getAccessSecret()); + } + + @Test + public void testSecretFromCredentialProviderIDFromConfig() throws Exception { + // set up conf to have a cred provider + final Configuration conf = new Configuration(); + final File file = tempDir.newFile("test.jks"); + final URI jks = ProviderUtils.nestURIForLocalJavaKeyStoreProvider( + file.toURI()); + conf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH, + jks.toString()); + + // add our creds to the provider + final CredentialProvider provider = + CredentialProviderFactory.getProviders(conf).get(0); + provider.createCredentialEntry(Constants.SECRET_KEY, + EXAMPLE_KEY.toCharArray()); + provider.flush(); + + S3AFileSystem s3afs = new S3AFileSystem(); + conf.set(Constants.ACCESS_KEY, EXAMPLE_ID); + S3AFileSystem.AWSAccessKeys creds = + s3afs.getAWSAccessKeys(new URI("s3a://foobar"), conf); + assertEquals("AccessKey incorrect.", EXAMPLE_ID, creds.getAccessKey()); + assertEquals("SecretKey incorrect.", EXAMPLE_KEY, creds.getAccessSecret()); + } + + @Test + public void testIDFromCredentialProviderSecretFromConfig() throws Exception { + // set up conf to have a cred provider + final Configuration conf = new Configuration(); + final File file = tempDir.newFile("test.jks"); + final URI jks = ProviderUtils.nestURIForLocalJavaKeyStoreProvider( + file.toURI()); + conf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH, + jks.toString()); + + // add our creds to the provider + final CredentialProvider provider = + CredentialProviderFactory.getProviders(conf).get(0); + provider.createCredentialEntry(Constants.ACCESS_KEY, + EXAMPLE_ID.toCharArray()); + provider.flush(); + + S3AFileSystem s3afs 
= new S3AFileSystem(); + conf.set(Constants.SECRET_KEY, EXAMPLE_KEY); + S3AFileSystem.AWSAccessKeys creds = + s3afs.getAWSAccessKeys(new URI("s3a://foobar"), conf); + assertEquals("AccessKey incorrect.", EXAMPLE_ID, creds.getAccessKey()); + assertEquals("SecretKey incorrect.", EXAMPLE_KEY, creds.getAccessSecret()); + } }