HADOOP-16445. Allow separate custom signing algorithms for S3 and DDB (#1332)

This commit is contained in:
Siddharth Seth 2019-09-21 11:50:45 +05:30 committed by GitHub
parent dbdc612b3b
commit e02b1023c2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 402 additions and 8 deletions

View File

@ -350,9 +350,43 @@ private Constants() {
public static final String SERVER_SIDE_ENCRYPTION_KEY =
"fs.s3a.server-side-encryption.key";
//override signature algorithm used for signing requests
/**
* List of custom Signers. The signer class will be loaded, and the signer
* name will be associated with this signer class in the S3 SDK. e.g. Single
* CustomSigner -> 'CustomSigner:org.apache...CustomSignerClass Multiple
* CustomSigners -> 'CSigner1:CustomSignerClass1,CSigner2:CustomerSignerClass2
*/
public static final String CUSTOM_SIGNERS = "fs.s3a.custom.signers";
/**
* There's 3 parameters that can be used to specify a non-default signing
* algorithm. fs.s3a.signing-algorithm - This property has existed for the
* longest time. If specified, without either of the other 2 properties being
* specified, this signing algorithm will be used for S3 and DDB (S3Guard).
* The other 2 properties override this value for S3 or DDB.
* fs.s3a.s3.signing-algorithm - Allows overriding the S3 Signing algorithm.
* This does not affect DDB. Specifying this property without specifying
* fs.s3a.signing-algorithm will only update the signing algorithm for S3
* requests, and the default will be used for DDB fs.s3a.ddb.signing-algorithm
* - Allows overriding the DDB Signing algorithm. This does not affect S3.
* Specifying this property without specifying fs.s3a.signing-algorithm will
* only update the signing algorithm for DDB requests, and the default will be
* used for S3
*/
public static final String SIGNING_ALGORITHM = "fs.s3a.signing-algorithm";
public static final String SIGNING_ALGORITHM_S3 =
"fs.s3a." + Constants.AWS_SERVICE_IDENTIFIER_S3.toLowerCase()
+ ".signing-algorithm";
public static final String SIGNING_ALGORITHM_DDB =
"fs.s3a." + Constants.AWS_SERVICE_IDENTIFIER_DDB.toLowerCase()
+ "signing-algorithm";
public static final String SIGNING_ALGORITHM_STS =
"fs.s3a." + Constants.AWS_SERVICE_IDENTIFIER_STS.toLowerCase()
+ "signing-algorithm";
public static final String S3N_FOLDER_SUFFIX = "_$folder$";
public static final String FS_S3A_BLOCK_SIZE = "fs.s3a.block.size";
public static final String FS_S3A = "s3a";
@ -796,4 +830,7 @@ private Constants() {
public static final String S3GUARD_CONSISTENCY_RETRY_INTERVAL_DEFAULT =
"2s";
public static final String AWS_SERVICE_IDENTIFIER_S3 = "S3";
public static final String AWS_SERVICE_IDENTIFIER_DDB = "DDB";
public static final String AWS_SERVICE_IDENTIFIER_STS = "STS";
}

View File

@ -55,7 +55,8 @@ public AmazonS3 createS3Client(URI name,
final AWSCredentialsProvider credentials,
final String userAgentSuffix) throws IOException {
Configuration conf = getConf();
final ClientConfiguration awsConf = S3AUtils.createAwsConf(getConf(), bucket);
final ClientConfiguration awsConf = S3AUtils
.createAwsConf(getConf(), bucket, Constants.AWS_SERVICE_IDENTIFIER_S3);
if (!StringUtils.isEmpty(userAgentSuffix)) {
awsConf.setUserAgentSuffix(userAgentSuffix);
}

View File

@ -259,6 +259,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
private MagicCommitIntegration committerIntegration;
private AWSCredentialProviderList credentials;
private SignerManager signerManager;
private ITtlTimeProvider ttlTimeProvider;
@ -359,6 +360,9 @@ public void initialize(URI name, Configuration originalConf)
}
useListV1 = (listVersion == 1);
signerManager = new SignerManager();
signerManager.initCustomSigners(conf);
// creates the AWS client, including overriding auth chain if
// the FS came with a DT
// this may do some patching of the configuration (e.g. setting
@ -3053,6 +3057,8 @@ public void close() throws IOException {
instrumentation = null;
closeAutocloseables(LOG, credentials);
cleanupWithLogger(LOG, delegationTokens.orElse(null));
cleanupWithLogger(LOG, signerManager);
signerManager = null;
credentials = null;
}
}

View File

@ -1203,14 +1203,59 @@ public static void deleteWithWarning(FileSystem fs,
* @param bucket Optional bucket to use to look up per-bucket proxy secrets
* @return new AWS client configuration
* @throws IOException problem creating AWS client configuration
*
* @deprecated use {@link #createAwsConf(Configuration, String, String)}
*/
@Deprecated
public static ClientConfiguration createAwsConf(Configuration conf,
String bucket)
throws IOException {
return createAwsConf(conf, bucket, null);
}
/**
* Create a new AWS {@code ClientConfiguration}. All clients to AWS services
* <i>MUST</i> use this or the equivalents for the specific service for
* consistent setup of connectivity, UA, proxy settings.
*
* @param conf The Hadoop configuration
* @param bucket Optional bucket to use to look up per-bucket proxy secrets
* @param awsServiceIdentifier a string representing the AWS service (S3,
* DDB, etc) for which the ClientConfiguration is being created.
* @return new AWS client configuration
* @throws IOException problem creating AWS client configuration
*/
public static ClientConfiguration createAwsConf(Configuration conf,
String bucket, String awsServiceIdentifier)
throws IOException {
final ClientConfiguration awsConf = new ClientConfiguration();
initConnectionSettings(conf, awsConf);
initProxySupport(conf, bucket, awsConf);
initUserAgent(conf, awsConf);
if (StringUtils.isNotEmpty(awsServiceIdentifier)) {
String configKey = null;
switch (awsServiceIdentifier) {
case AWS_SERVICE_IDENTIFIER_S3:
configKey = SIGNING_ALGORITHM_S3;
break;
case AWS_SERVICE_IDENTIFIER_DDB:
configKey = SIGNING_ALGORITHM_DDB;
break;
case AWS_SERVICE_IDENTIFIER_STS:
configKey = SIGNING_ALGORITHM_STS;
break;
default:
// Nothing to do. The original signer override is already setup
}
if (configKey != null) {
String signerOverride = conf.getTrimmed(configKey, "");
if (!signerOverride.isEmpty()) {
LOG.debug("Signer override for {}} = {}", awsServiceIdentifier,
signerOverride);
awsConf.setSignerOverride(signerOverride);
}
}
}
return awsConf;
}

View File

@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import com.amazonaws.auth.Signer;
import com.amazonaws.auth.SignerFactory;
import java.io.Closeable;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import static org.apache.hadoop.fs.s3a.Constants.CUSTOM_SIGNERS;
/**
* Class to handle custom signers.
*/
public class SignerManager implements Closeable {
private static final Logger LOG = LoggerFactory
.getLogger(SignerManager.class);
public SignerManager() {
}
/**
* Initialize custom signers and register them with the AWS SDK.
*
* @param conf Hadoop configuration
*/
public void initCustomSigners(Configuration conf) {
String[] customSigners = conf.getTrimmedStrings(CUSTOM_SIGNERS);
if (customSigners == null || customSigners.length == 0) {
// No custom signers specified, nothing to do.
LOG.debug("No custom signers specified");
return;
}
for (String customSigner : customSigners) {
String[] parts = customSigner.split(":");
if (parts.length != 2) {
String message =
"Invalid format (Expected name:SignerClass) for CustomSigner: ["
+ customSigner
+ "]";
LOG.error(message);
throw new IllegalArgumentException(message);
}
maybeRegisterSigner(parts[0], parts[1], conf);
}
}
/*
* Make sure the signer class is registered once with the AWS SDK
*/
private static void maybeRegisterSigner(String signerName,
String signerClassName, Configuration conf) {
try {
SignerFactory.getSignerByTypeAndService(signerName, null);
} catch (IllegalArgumentException e) {
// Signer is not registered with the AWS SDK.
// Load the class and register the signer.
Class<? extends Signer> clazz = null;
try {
clazz = (Class<? extends Signer>) conf.getClassByName(signerClassName);
} catch (ClassNotFoundException cnfe) {
throw new RuntimeException(String
.format("Signer class [%s] not found for signer [%s]",
signerClassName, signerName), cnfe);
}
LOG.debug("Registering Custom Signer - [{}->{}]", signerName,
clazz.getName());
synchronized (SignerManager.class) {
SignerFactory.registerSigner(signerName, clazz);
}
}
}
@Override
public void close() throws IOException {
}
}

View File

@ -31,12 +31,14 @@
import com.amazonaws.services.securitytoken.model.Credentials;
import com.amazonaws.services.securitytoken.model.GetSessionTokenRequest;
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.Constants;
import org.apache.hadoop.fs.s3a.Invoker;
import org.apache.hadoop.fs.s3a.Retries;
import org.apache.hadoop.fs.s3a.S3AUtils;
@ -73,7 +75,8 @@ public static AWSSecurityTokenServiceClientBuilder builder(
final Configuration conf,
final String bucket,
final AWSCredentialsProvider credentials) throws IOException {
final ClientConfiguration awsConf = S3AUtils.createAwsConf(conf, bucket);
final ClientConfiguration awsConf = S3AUtils.createAwsConf(conf, bucket,
Constants.AWS_SERVICE_IDENTIFIER_STS);
String endpoint = conf.getTrimmed(DELEGATION_TOKEN_ENDPOINT,
DEFAULT_DELEGATION_TOKEN_ENDPOINT);
String region = conf.getTrimmed(DELEGATION_TOKEN_REGION,
@ -99,7 +102,8 @@ public static AWSSecurityTokenServiceClientBuilder builder(
final AWSCredentialsProvider credentials,
final String stsEndpoint,
final String stsRegion) throws IOException {
final ClientConfiguration awsConf = S3AUtils.createAwsConf(conf, bucket);
final ClientConfiguration awsConf = S3AUtils.createAwsConf(conf, bucket,
Constants.AWS_SERVICE_IDENTIFIER_STS);
return builder(credentials, awsConf, stsEndpoint, stsRegion);
}

View File

@ -36,6 +36,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.AWSCredentialProviderList;
import org.apache.hadoop.fs.s3a.Constants;
import org.apache.hadoop.fs.s3a.Invoker;
import org.apache.hadoop.fs.s3a.Retries;
import org.apache.hadoop.fs.s3a.S3ARetryPolicy;
@ -301,7 +302,8 @@ private synchronized Optional<STSClientFactory.STSClient> maybeInitSTS()
invoker = new Invoker(new S3ARetryPolicy(conf), LOG_EVENT);
ClientConfiguration awsConf =
S3AUtils.createAwsConf(conf, uri.getHost());
S3AUtils.createAwsConf(conf, uri.getHost(),
Constants.AWS_SERVICE_IDENTIFIER_STS);
AWSSecurityTokenService tokenService =
STSClientFactory.builder(parentAuthChain,
awsConf,

View File

@ -34,6 +34,7 @@
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.s3a.Constants;
import org.apache.hadoop.fs.s3a.S3AUtils;
import static org.apache.hadoop.fs.s3a.Constants.S3GUARD_DDB_REGION_KEY;
@ -80,7 +81,8 @@ public AmazonDynamoDB createDynamoDBClient(String defaultRegion,
"Should have been configured before usage");
final Configuration conf = getConf();
final ClientConfiguration awsConf = S3AUtils.createAwsConf(conf, bucket);
final ClientConfiguration awsConf = S3AUtils
.createAwsConf(conf, bucket, Constants.AWS_SERVICE_IDENTIFIER_DDB);
final String region = getRegion(conf, defaultRegion);
LOG.debug("Creating DynamoDB client in region {}", region);

View File

@ -22,6 +22,7 @@
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.S3ClientOptions;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.hadoop.conf.Configuration;
@ -30,12 +31,14 @@
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3native.S3xLoginHelper;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.File;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
@ -617,4 +620,69 @@ public void testSecurityCredentialPropagationEndToEnd() throws Exception {
"override,base");
}
@Test(timeout = 10_000L)
public void testS3SpecificSignerOverride() throws IOException {
ClientConfiguration clientConfiguration = null;
Configuration config;
String signerOverride = "testSigner";
String s3SignerOverride = "testS3Signer";
// Default SIGNING_ALGORITHM, overridden for S3 only
config = new Configuration();
config.set(SIGNING_ALGORITHM_S3, s3SignerOverride);
clientConfiguration = S3AUtils
.createAwsConf(config, "dontcare", AWS_SERVICE_IDENTIFIER_S3);
Assert.assertEquals(s3SignerOverride,
clientConfiguration.getSignerOverride());
clientConfiguration = S3AUtils
.createAwsConf(config, "dontcare", AWS_SERVICE_IDENTIFIER_DDB);
Assert.assertNull(clientConfiguration.getSignerOverride());
// Configured base SIGNING_ALGORITHM, overridden for S3 only
config = new Configuration();
config.set(SIGNING_ALGORITHM, signerOverride);
config.set(SIGNING_ALGORITHM_S3, s3SignerOverride);
clientConfiguration = S3AUtils
.createAwsConf(config, "dontcare", AWS_SERVICE_IDENTIFIER_S3);
Assert.assertEquals(s3SignerOverride,
clientConfiguration.getSignerOverride());
clientConfiguration = S3AUtils
.createAwsConf(config, "dontcare", AWS_SERVICE_IDENTIFIER_DDB);
Assert
.assertEquals(signerOverride, clientConfiguration.getSignerOverride());
}
@Test(timeout = 10_000L)
public void testDdbSpecificSignerOverride() throws IOException {
ClientConfiguration clientConfiguration = null;
Configuration config;
String signerOverride = "testSigner";
String ddbSignerOverride = "testDdbSigner";
// Default SIGNING_ALGORITHM, overridden for S3
config = new Configuration();
config.set(SIGNING_ALGORITHM_DDB, ddbSignerOverride);
clientConfiguration = S3AUtils
.createAwsConf(config, "dontcare", AWS_SERVICE_IDENTIFIER_DDB);
Assert.assertEquals(ddbSignerOverride,
clientConfiguration.getSignerOverride());
clientConfiguration = S3AUtils
.createAwsConf(config, "dontcare", AWS_SERVICE_IDENTIFIER_S3);
Assert.assertNull(clientConfiguration.getSignerOverride());
// Configured base SIGNING_ALGORITHM, overridden for S3
config = new Configuration();
config.set(SIGNING_ALGORITHM, signerOverride);
config.set(SIGNING_ALGORITHM_DDB, ddbSignerOverride);
clientConfiguration = S3AUtils
.createAwsConf(config, "dontcare", AWS_SERVICE_IDENTIFIER_DDB);
Assert.assertEquals(ddbSignerOverride,
clientConfiguration.getSignerOverride());
clientConfiguration = S3AUtils
.createAwsConf(config, "dontcare", AWS_SERVICE_IDENTIFIER_S3);
Assert
.assertEquals(signerOverride, clientConfiguration.getSignerOverride());
}
}

View File

@ -368,7 +368,7 @@ public <E extends Exception> E expectedSessionRequestFailure(
DurationInfo ignored = new DurationInfo(LOG, "requesting credentials")) {
Configuration conf = new Configuration(getContract().getConf());
ClientConfiguration awsConf =
S3AUtils.createAwsConf(conf, null);
S3AUtils.createAwsConf(conf, null, AWS_SERVICE_IDENTIFIER_STS);
return intercept(clazz, exceptionText,
() -> {
AWSSecurityTokenService tokenService =

View File

@ -680,7 +680,7 @@ public static MarshalledCredentials requestSessionCredentials(
MarshalledCredentials sc = MarshalledCredentialBinding
.requestSessionCredentials(
buildAwsCredentialsProvider(conf),
S3AUtils.createAwsConf(conf, bucket),
S3AUtils.createAwsConf(conf, bucket, AWS_SERVICE_IDENTIFIER_STS),
conf.getTrimmed(ASSUMED_ROLE_STS_ENDPOINT,
DEFAULT_ASSUMED_ROLE_STS_ENDPOINT),
conf.getTrimmed(ASSUMED_ROLE_STS_ENDPOINT_REGION,

View File

@ -0,0 +1,130 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import java.util.concurrent.TimeUnit;
import com.amazonaws.SignableRequest;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.Signer;
import com.amazonaws.auth.SignerFactory;
import org.assertj.core.api.Assertions;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.test.LambdaTestUtils;
import static org.apache.hadoop.fs.s3a.Constants.CUSTOM_SIGNERS;
/**
* Tests for the SignerManager.
*/
public class TestSignerManager {
@Rule
public Timeout testTimeout = new Timeout(
10_000L, TimeUnit.MILLISECONDS
);
@Test
public void testCustomSignerFailureIfNotRegistered() throws Exception {
LambdaTestUtils.intercept(Exception.class,
() -> SignerFactory.createSigner("testsignerUnregistered", null));
// Expecting generic Exception.class to handle future implementation
// changes.
// For now, this is an NPE
}
@Test
public void testCustomSignerInitialization() {
Configuration config = new Configuration();
SignerForTest1.reset();
SignerForTest2.reset();
config.set(CUSTOM_SIGNERS, "testsigner1:" + SignerForTest1.class.getName());
SignerManager signerManager = new SignerManager();
signerManager.initCustomSigners(config);
Signer s1 = SignerFactory.createSigner("testsigner1", null);
s1.sign(null, null);
Assertions.assertThat(SignerForTest1.initialized)
.as(SignerForTest1.class.getName() + " not initialized")
.isEqualTo(true);
}
@Test
public void testMultipleCustomSignerInitialization() {
Configuration config = new Configuration();
SignerForTest1.reset();
SignerForTest2.reset();
config.set(CUSTOM_SIGNERS,
"testsigner1:" + SignerForTest1.class.getName() + "," + "testsigner2:"
+ SignerForTest2.class.getName());
SignerManager signerManager = new SignerManager();
signerManager.initCustomSigners(config);
Signer s1 = SignerFactory.createSigner("testsigner1", null);
s1.sign(null, null);
Assertions.assertThat(SignerForTest1.initialized)
.as(SignerForTest1.class.getName() + " not initialized")
.isEqualTo(true);
Signer s2 = SignerFactory.createSigner("testsigner2", null);
s2.sign(null, null);
Assertions.assertThat(SignerForTest2.initialized)
.as(SignerForTest2.class.getName() + " not initialized")
.isEqualTo(true);
}
/**
* SignerForTest1.
*/
@Private
public static class SignerForTest1 implements Signer {
private static boolean initialized = false;
@Override
public void sign(SignableRequest<?> request, AWSCredentials credentials) {
initialized = true;
}
public static void reset() {
initialized = false;
}
}
/**
* SignerForTest2.
*/
@Private
public static class SignerForTest2 implements Signer {
private static boolean initialized = false;
@Override
public void sign(SignableRequest<?> request, AWSCredentials credentials) {
initialized = true;
}
public static void reset() {
initialized = false;
}
}
}