HADOOP-16599. Allow a SignerInitializer to be specified along with a Custom Signer

This commit is contained in:
Siddharth Seth 2019-10-02 16:03:48 -07:00 committed by GitHub
parent b09d389001
commit 559ee277f5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 1060 additions and 161 deletions

View File

@ -352,26 +352,30 @@ public final class Constants {
/**
* List of custom Signers. The signer class will be loaded, and the signer
* name will be associated with this signer class in the S3 SDK. e.g. Single
* CustomSigner {@literal ->} 'CustomSigner:org.apache...CustomSignerClass Multiple
* CustomSigners {@literal ->} 'CSigner1:CustomSignerClass1,CSigner2:CustomerSignerClass2
* name will be associated with this signer class in the S3 SDK.
* Examples
* CustomSigner {@literal ->} 'CustomSigner:org.apache...CustomSignerClass'
* CustomSigners {@literal ->} 'CSigner1:CSigner1Class,CSigner2:CSigner2Class'
* Initializer {@literal ->} 'CSigner1:CSigner1Class:CSigner1InitializerClass'
* With Existing {@literal ->} 'AWS4Signer,CSigner1,CSigner2:CSigner2Class'
*/
public static final String CUSTOM_SIGNERS = "fs.s3a.custom.signers";
/**
* There's 3 parameters that can be used to specify a non-default signing
* algorithm. fs.s3a.signing-algorithm - This property has existed for the
* longest time. If specified, without either of the other 2 properties being
* specified, this signing algorithm will be used for S3 and DDB (S3Guard).
* The other 2 properties override this value for S3 or DDB.
* algorithm.<br>
* fs.s3a.signing-algorithm - This property has existed for the longest time.
* If specified, without either of the other 2 properties being specified,
* this signing algorithm will be used for S3 and DDB (S3Guard). <br>
* The other 2 properties override this value for S3 or DDB. <br>
* fs.s3a.s3.signing-algorithm - Allows overriding the S3 Signing algorithm.
* This does not affect DDB. Specifying this property without specifying
* fs.s3a.signing-algorithm will only update the signing algorithm for S3
* requests, and the default will be used for DDB fs.s3a.ddb.signing-algorithm
* - Allows overriding the DDB Signing algorithm. This does not affect S3.
* Specifying this property without specifying fs.s3a.signing-algorithm will
* only update the signing algorithm for DDB requests, and the default will be
* used for S3
* requests, and the default will be used for DDB.<br>
* fs.s3a.ddb.signing-algorithm - Allows overriding the DDB Signing algorithm.
* This does not affect S3. Specifying this property without specifying
* fs.s3a.signing-algorithm will only update the signing algorithm for
* DDB requests, and the default will be used for S3.
*/
public static final String SIGNING_ALGORITHM = "fs.s3a.signing-algorithm";

View File

@ -95,6 +95,8 @@ import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Globber;
import org.apache.hadoop.fs.s3a.auth.SignerManager;
import org.apache.hadoop.fs.s3a.auth.delegation.DelegationTokenProvider;
import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
import org.apache.hadoop.fs.s3a.impl.ContextAccessors;
import org.apache.hadoop.fs.s3a.impl.CopyOutcome;
@ -108,6 +110,7 @@ import org.apache.hadoop.fs.s3a.impl.StoreContext;
import org.apache.hadoop.fs.s3a.s3guard.BulkOperationState;
import org.apache.hadoop.fs.s3a.select.InternalSelectConstants;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.DurationInfo;
import org.apache.hadoop.util.LambdaUtils;
import org.apache.hadoop.fs.FileAlreadyExistsException;
@ -182,7 +185,7 @@ import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class S3AFileSystem extends FileSystem implements StreamCapabilities,
AWSPolicyProvider {
AWSPolicyProvider, DelegationTokenProvider {
/**
* Default blocksize as used in blocksize and FS status queries.
*/
@ -362,8 +365,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
}
useListV1 = (listVersion == 1);
signerManager = new SignerManager();
signerManager.initCustomSigners(conf);
signerManager = new SignerManager(bucket, this, conf, owner);
signerManager.initCustomSigners();
// creates the AWS client, including overriding auth chain if
// the FS came with a DT
@ -1335,6 +1338,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
return renameOperation.execute();
}
@Override public Token<? extends TokenIdentifier> getFsDelegationToken()
throws IOException {
return getDelegationToken(null);
}
/**
* The callbacks made by the rename and delete operations.
* This separation allows the operation to be factored out and

View File

@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.auth;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.auth.delegation.DelegationTokenProvider;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.security.UserGroupInformation;
/**
* Interface which can be implemented to allow initialization of any custom
* signers which may be used by the {@link S3AFileSystem}.
*/
public interface AwsSignerInitializer {
/**
* Register a store instance.
*
* @param bucketName the bucket name
* @param storeConf the store configuration
* @param dtProvider delegation token provider for the store
* @param storeUgi ugi under which the store is operating
*/
void registerStore(String bucketName, Configuration storeConf,
DelegationTokenProvider dtProvider, UserGroupInformation storeUgi);
/**
* Unregister a store instance.
*
* @param bucketName the bucket name
* @param storeConf the store configuration
* @param dtProvider delegation token provider for the store
* @param storeUgi ugi under which the store is operating
*/
void unregisterStore(String bucketName, Configuration storeConf,
DelegationTokenProvider dtProvider, UserGroupInformation storeUgi);
}

View File

@ -15,16 +15,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
package org.apache.hadoop.fs.s3a.auth;
import java.io.Closeable;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import com.amazonaws.auth.Signer;
import com.amazonaws.auth.SignerFactory;
import java.io.Closeable;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.auth.delegation.DelegationTokenProvider;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils;
import static org.apache.hadoop.fs.s3a.Constants.CUSTOM_SIGNERS;
@ -36,17 +42,28 @@ public class SignerManager implements Closeable {
private static final Logger LOG = LoggerFactory
.getLogger(SignerManager.class);
private final List<AwsSignerInitializer> initializers = new LinkedList<>();
public SignerManager() {
private final String bucketName;
private final DelegationTokenProvider delegationTokenProvider;
private final Configuration ownerConf;
private final UserGroupInformation ownerUgi;
public SignerManager(String bucketName,
DelegationTokenProvider delegationTokenProvider, Configuration ownerConf,
UserGroupInformation ownerUgi) {
this.bucketName = bucketName;
this.delegationTokenProvider = delegationTokenProvider;
this.ownerConf = ownerConf;
this.ownerUgi = ownerUgi;
}
/**
* Initialize custom signers and register them with the AWS SDK.
*
* @param conf Hadoop configuration
*/
public void initCustomSigners(Configuration conf) {
String[] customSigners = conf.getTrimmedStrings(CUSTOM_SIGNERS);
public void initCustomSigners() {
String[] customSigners = ownerConf.getTrimmedStrings(CUSTOM_SIGNERS);
if (customSigners == null || customSigners.length == 0) {
// No custom signers specified, nothing to do.
LOG.debug("No custom signers specified");
@ -55,15 +72,41 @@ public class SignerManager implements Closeable {
for (String customSigner : customSigners) {
String[] parts = customSigner.split(":");
if (parts.length != 2) {
String message =
"Invalid format (Expected name:SignerClass) for CustomSigner: ["
+ customSigner
+ "]";
if (!(parts.length == 1 || parts.length == 2 || parts.length == 3)) {
String message = "Invalid format (Expected name, name:SignerClass,"
+ " name:SignerClass:SignerInitializerClass)"
+ " for CustomSigner: [" + customSigner + "]";
LOG.error(message);
throw new IllegalArgumentException(message);
}
maybeRegisterSigner(parts[0], parts[1], conf);
if (parts.length == 1) {
// Nothing to do. Trying to use a pre-defined Signer
} else {
// Register any custom Signer
maybeRegisterSigner(parts[0], parts[1], ownerConf);
// If an initializer is specified, take care of instantiating it and
// setting it up
if (parts.length == 3) {
Class<? extends AwsSignerInitializer> clazz = null;
try {
clazz = (Class<? extends AwsSignerInitializer>) ownerConf
.getClassByName(parts[2]);
} catch (ClassNotFoundException e) {
throw new RuntimeException(String.format(
"SignerInitializer class" + " [%s] not found for signer [%s]",
parts[2], parts[0]), e);
}
LOG.debug("Creating signer initializer: [{}] for signer: [{}]",
parts[2], parts[0]);
AwsSignerInitializer signerInitializer = ReflectionUtils
.newInstance(clazz, null);
initializers.add(signerInitializer);
signerInitializer
.registerStore(bucketName, ownerConf, delegationTokenProvider,
ownerUgi);
}
}
}
}
@ -93,7 +136,12 @@ public class SignerManager implements Closeable {
}
}
@Override
public void close() throws IOException {
@Override public void close() throws IOException {
LOG.debug("Unregistering fs from {} initializers", initializers.size());
for (AwsSignerInitializer initializer : initializers) {
initializer
.unregisterStore(bucketName, ownerConf, delegationTokenProvider,
ownerUgi);
}
}
}
}

View File

@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.auth.delegation;
import java.io.IOException;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
/**
* Interface for S3A Delegation Token access.
*/
public interface DelegationTokenProvider {
Token<? extends TokenIdentifier> getFsDelegationToken() throws IOException;
}

View File

@ -1879,3 +1879,61 @@ To disable checksum verification in `distcp`, use the `-skipcrccheck` option:
hadoop distcp -update -skipcrccheck -numListstatusThreads 40 /user/alice/datasets s3a://alice-backup/datasets
```
### <a name="customsigners"></a> Advanced - Custom Signers
AWS uees request signing to authenticate requests. In general, there should
be no need to override the signers, and the defaults work out of the box.
If, however, this is required - this section talks about how to configure
custom signers. Theres 2 broad config categories to be set - one for
registering a custom signer and another to specify usage.
#### Registering Custom Signers
```xml
<property>
<name>fs.s3a.custom.signers</name>
<value>comma separated list of signers</value>
<!-- Example
<value>AWS4SignerType,CS1:CS1ClassName,CS2:CS2ClassName:CS2InitClass</value>
-->
</property>
```
Acceptable value for each custom signer
`SignerName`- this is used in case one of the default signers is being used.
(E.g `AWS4SignerType`, `QueryStringSignerType`, `AWSS3V4SignerType`).
If no custom signers are being used - this value does not need to be set.
`SignerName:SignerClassName` - register a new signer with the specified name,
and the class for this signer.
The Signer Class must implement `com.amazonaws.auth.Signer`.
`SignerName:SignerClassName:SignerInitializerClassName` - similar time above
except also allows for a custom SignerInitializer
(`org.apache.hadoop.fs.s3a.AwsSignerInitializer`) class to be specified.
#### Usage of the Signers
Signers can be set at a per service level(S3, dynamodb, etc) or a common
signer for all services.
```xml
<property>
<name>fs.s3a.s3.signing-algorithm</name>
<value>${S3SignerName}</value>
<description>Specify the signer for S3</description>
</property>
<property>
<name>fs.s3a.ddb.signing-algorithm</name>
<value>${DdbSignerName}</value>
<description>Specify the signer for DDB</description>
</property>
<property>
<name>fs.s3a.signing-algorithm</name>
<value>${SignerName}</value>
</property>
```
For a specific service, the service specific signer is looked up first.
If that is not specified, the common signer is looked up. If this is
not specified as well, SDK settings are used.

View File

@ -1,130 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import java.util.concurrent.TimeUnit;
import com.amazonaws.SignableRequest;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.Signer;
import com.amazonaws.auth.SignerFactory;
import org.assertj.core.api.Assertions;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.test.LambdaTestUtils;
import static org.apache.hadoop.fs.s3a.Constants.CUSTOM_SIGNERS;
/**
* Tests for the SignerManager.
*/
public class TestSignerManager {
@Rule
public Timeout testTimeout = new Timeout(
10_000L, TimeUnit.MILLISECONDS
);
@Test
public void testCustomSignerFailureIfNotRegistered() throws Exception {
LambdaTestUtils.intercept(Exception.class,
() -> SignerFactory.createSigner("testsignerUnregistered", null));
// Expecting generic Exception.class to handle future implementation
// changes.
// For now, this is an NPE
}
@Test
public void testCustomSignerInitialization() {
Configuration config = new Configuration();
SignerForTest1.reset();
SignerForTest2.reset();
config.set(CUSTOM_SIGNERS, "testsigner1:" + SignerForTest1.class.getName());
SignerManager signerManager = new SignerManager();
signerManager.initCustomSigners(config);
Signer s1 = SignerFactory.createSigner("testsigner1", null);
s1.sign(null, null);
Assertions.assertThat(SignerForTest1.initialized)
.as(SignerForTest1.class.getName() + " not initialized")
.isEqualTo(true);
}
@Test
public void testMultipleCustomSignerInitialization() {
Configuration config = new Configuration();
SignerForTest1.reset();
SignerForTest2.reset();
config.set(CUSTOM_SIGNERS,
"testsigner1:" + SignerForTest1.class.getName() + "," + "testsigner2:"
+ SignerForTest2.class.getName());
SignerManager signerManager = new SignerManager();
signerManager.initCustomSigners(config);
Signer s1 = SignerFactory.createSigner("testsigner1", null);
s1.sign(null, null);
Assertions.assertThat(SignerForTest1.initialized)
.as(SignerForTest1.class.getName() + " not initialized")
.isEqualTo(true);
Signer s2 = SignerFactory.createSigner("testsigner2", null);
s2.sign(null, null);
Assertions.assertThat(SignerForTest2.initialized)
.as(SignerForTest2.class.getName() + " not initialized")
.isEqualTo(true);
}
/**
* SignerForTest1.
*/
@Private
public static class SignerForTest1 implements Signer {
private static boolean initialized = false;
@Override
public void sign(SignableRequest<?> request, AWSCredentials credentials) {
initialized = true;
}
public static void reset() {
initialized = false;
}
}
/**
* SignerForTest2.
*/
@Private
public static class SignerForTest2 implements Signer {
private static boolean initialized = false;
@Override
public void sign(SignableRequest<?> request, AWSCredentials credentials) {
initialized = true;
}
public static void reset() {
initialized = false;
}
}
}

View File

@ -0,0 +1,237 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.auth;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import com.amazonaws.SignableRequest;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.Signer;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.internal.AWSS3V4Signer;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
import org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider;
import org.apache.hadoop.fs.s3a.auth.ITestCustomSigner.CustomSignerInitializer.StoreValue;
import org.apache.hadoop.fs.s3a.auth.delegation.DelegationTokenProvider;
import org.apache.hadoop.security.UserGroupInformation;
import static org.apache.hadoop.fs.s3a.Constants.CUSTOM_SIGNERS;
import static org.apache.hadoop.fs.s3a.Constants.SIGNING_ALGORITHM_S3;
/**
* Tests for custom Signers and SignerInitializers.
*/
public class ITestCustomSigner extends AbstractS3ATestBase {
private static final Logger LOG = LoggerFactory
.getLogger(ITestCustomSigner.class);
private static final String TEST_ID_KEY = "TEST_ID_KEY";
private static final String TEST_REGION_KEY = "TEST_REGION_KEY";
private String regionName;
@Override
public void setup() throws Exception {
super.setup();
regionName = determineRegion(getFileSystem().getBucket());
LOG.info("Determined region name to be [{}] for bucket [{}]", regionName,
getFileSystem().getBucket());
}
@Test
public void testCustomSignerAndInitializer()
throws IOException, InterruptedException {
UserGroupInformation ugi1 = UserGroupInformation.createRemoteUser("user1");
FileSystem fs1 = runMkDirAndVerify(ugi1, "/customsignerpath1", "id1");
UserGroupInformation ugi2 = UserGroupInformation.createRemoteUser("user2");
FileSystem fs2 = runMkDirAndVerify(ugi2, "/customsignerpath2", "id2");
Assertions.assertThat(CustomSignerInitializer.knownStores.size())
.as("Num registered stores mismatch").isEqualTo(2);
fs1.close();
Assertions.assertThat(CustomSignerInitializer.knownStores.size())
.as("Num registered stores mismatch").isEqualTo(1);
fs2.close();
Assertions.assertThat(CustomSignerInitializer.knownStores.size())
.as("Num registered stores mismatch").isEqualTo(0);
}
private FileSystem runMkDirAndVerify(UserGroupInformation ugi,
String pathString, String identifier)
throws IOException, InterruptedException {
Configuration conf = createTestConfig(identifier);
Path path = new Path(pathString);
path = path.makeQualified(getFileSystem().getUri(),
getFileSystem().getWorkingDirectory());
Path finalPath = path;
return ugi.doAs((PrivilegedExceptionAction<FileSystem>) () -> {
int invocationCount = CustomSigner.invocationCount;
FileSystem fs = finalPath.getFileSystem(conf);
fs.mkdirs(finalPath);
Assertions.assertThat(CustomSigner.invocationCount)
.as("Invocation count lower than expected")
.isGreaterThan(invocationCount);
Assertions.assertThat(CustomSigner.lastStoreValue)
.as("Store value should not be null").isNotNull();
Assertions.assertThat(CustomSigner.lastStoreValue.conf)
.as("Configuration should not be null").isNotNull();
Assertions.assertThat(CustomSigner.lastStoreValue.conf.get(TEST_ID_KEY))
.as("Configuration TEST_KEY mismatch").isEqualTo(identifier);
return fs;
});
}
private Configuration createTestConfig(String identifier) {
Configuration conf = createConfiguration();
conf.set(CUSTOM_SIGNERS,
"CustomS3Signer:" + CustomSigner.class.getName() + ":"
+ CustomSignerInitializer.class.getName());
conf.set(SIGNING_ALGORITHM_S3, "CustomS3Signer");
conf.set(TEST_ID_KEY, identifier);
conf.set(TEST_REGION_KEY, regionName);
return conf;
}
private String determineRegion(String bucketName) throws IOException {
AmazonS3 s3 = AmazonS3ClientBuilder.standard().withCredentials(
new SimpleAWSCredentialsProvider(null, createConfiguration()))
.withForceGlobalBucketAccessEnabled(true).withRegion("us-east-1")
.build();
String region = s3.getBucketLocation(bucketName);
// See: https://forums.aws.amazon.com/thread.jspa?messageID=796829&tstart=0
if (region.equals("US")) {
region = "us-east-1";
}
return region;
}
@Private
public static final class CustomSigner implements Signer {
private static int invocationCount = 0;
private static StoreValue lastStoreValue;
@Override
public void sign(SignableRequest<?> request, AWSCredentials credentials) {
invocationCount++;
String host = request.getEndpoint().getHost();
String bucketName = host.split("\\.")[0];
try {
lastStoreValue = CustomSignerInitializer
.getStoreValue(bucketName, UserGroupInformation.getCurrentUser());
} catch (IOException e) {
throw new RuntimeException("Failed to get current Ugi", e);
}
AWSS3V4Signer realSigner = new AWSS3V4Signer();
realSigner.setServiceName("s3");
realSigner.setRegionName(lastStoreValue.conf.get(TEST_REGION_KEY));
realSigner.sign(request, credentials);
}
}
@Private
public static final class CustomSignerInitializer
implements AwsSignerInitializer {
private static final Map<StoreKey, StoreValue> knownStores = new HashMap<>();
@Override
public void registerStore(String bucketName, Configuration storeConf,
DelegationTokenProvider dtProvider, UserGroupInformation storeUgi) {
StoreKey storeKey = new StoreKey(bucketName, storeUgi);
StoreValue storeValue = new StoreValue(storeConf, dtProvider);
knownStores.put(storeKey, storeValue);
}
@Override
public void unregisterStore(String bucketName, Configuration storeConf,
DelegationTokenProvider dtProvider, UserGroupInformation storeUgi) {
StoreKey storeKey = new StoreKey(bucketName, storeUgi);
knownStores.remove(storeKey);
}
public static StoreValue getStoreValue(String bucketName,
UserGroupInformation ugi) {
StoreKey storeKey = new StoreKey(bucketName, ugi);
return knownStores.get(storeKey);
}
private static class StoreKey {
private final String bucketName;
private final UserGroupInformation ugi;
public StoreKey(String bucketName, UserGroupInformation ugi) {
this.bucketName = bucketName;
this.ugi = ugi;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
StoreKey storeKey = (StoreKey) o;
return Objects.equals(bucketName, storeKey.bucketName) && Objects
.equals(ugi, storeKey.ugi);
}
@Override
public int hashCode() {
return Objects.hash(bucketName, ugi);
}
}
static class StoreValue {
private final Configuration conf;
private final DelegationTokenProvider dtProvider;
public StoreValue(Configuration conf,
DelegationTokenProvider dtProvider) {
this.conf = conf;
this.dtProvider = dtProvider;
}
}
}
}

View File

@ -0,0 +1,590 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.fs.s3a.auth;
import java.io.Closeable;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.security.PrivilegedExceptionAction;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import com.amazonaws.AmazonWebServiceRequest;
import com.amazonaws.DefaultRequest;
import com.amazonaws.SignableRequest;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.Signer;
import com.amazonaws.auth.SignerFactory;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.auth.TestSignerManager.SignerInitializerForTest.StoreValue;
import org.apache.hadoop.fs.s3a.auth.delegation.DelegationTokenProvider;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.test.LambdaTestUtils;
import static org.apache.hadoop.fs.s3a.Constants.CUSTOM_SIGNERS;
/**
* Tests for the SignerManager.
*/
public class TestSignerManager {
private static final Text TEST_TOKEN_KIND = new Text("TestTokenKind");
private static final Text TEST_TOKEN_SERVICE = new Text("TestTokenService");
private static final String TEST_KEY_IDENTIFIER = "TEST_KEY_IDENTIFIER";
private static final String BUCKET1 = "bucket1";
private static final String BUCKET2 = "bucket2";
private static final String TESTUSER1 = "testuser1";
private static final String TESTUSER2 = "testuser2";
@Rule public Timeout testTimeout = new Timeout(10_000L,
TimeUnit.MILLISECONDS);
@Before
public void beforeTest() {
SignerForTest1.reset();
SignerForTest2.reset();
SignerInitializerForTest.reset();
SignerForInitializerTest.reset();
SignerInitializer2ForTest.reset();
}
@Test
public void testPredefinedSignerInitialization() throws IOException {
// Try initializing a pre-defined Signer type.
// Should run through without an exception.
Configuration config = new Configuration();
// Pre-defined signer types as of AWS-SDK 1.11.563
// AWS4SignerType, QueryStringSignerType, AWSS3V4SignerType
config.set(CUSTOM_SIGNERS, "AWS4SignerType");
SignerManager signerManager = new SignerManager("dontcare", null, config,
UserGroupInformation.getCurrentUser());
signerManager.initCustomSigners();
}
@Test
public void testCustomSignerFailureIfNotRegistered() throws Exception {
Configuration config = new Configuration();
config.set(CUSTOM_SIGNERS, "testsignerUnregistered");
SignerManager signerManager = new SignerManager("dontcare", null, config,
UserGroupInformation.getCurrentUser());
// Make sure the config is respected.
signerManager.initCustomSigners();
// Simulate a call from the AWS SDK to create the signer.
LambdaTestUtils.intercept(Exception.class,
() -> SignerFactory.createSigner("testsignerUnregistered", null));
// Expecting generic Exception.class to handle future implementation
// changes.
// For now, this is an NPE
}
@Test
public void testCustomSignerInitialization() throws IOException {
Configuration config = new Configuration();
config.set(CUSTOM_SIGNERS, "testsigner1:" + SignerForTest1.class.getName());
SignerManager signerManager = new SignerManager("dontcare", null, config,
UserGroupInformation.getCurrentUser());
signerManager.initCustomSigners();
Signer s1 = SignerFactory.createSigner("testsigner1", null);
s1.sign(null, null);
Assertions.assertThat(SignerForTest1.initialized)
.as(SignerForTest1.class.getName() + " not initialized")
.isEqualTo(true);
}
@Test
public void testMultipleCustomSignerInitialization() throws IOException {
Configuration config = new Configuration();
config.set(CUSTOM_SIGNERS,
"testsigner1:" + SignerForTest1.class.getName() + "," + "testsigner2:"
+ SignerForTest2.class.getName());
SignerManager signerManager = new SignerManager("dontcare", null, config,
UserGroupInformation.getCurrentUser());
signerManager.initCustomSigners();
Signer s1 = SignerFactory.createSigner("testsigner1", null);
s1.sign(null, null);
Assertions.assertThat(SignerForTest1.initialized)
.as(SignerForTest1.class.getName() + " not initialized")
.isEqualTo(true);
Signer s2 = SignerFactory.createSigner("testsigner2", null);
s2.sign(null, null);
Assertions.assertThat(SignerForTest2.initialized)
.as(SignerForTest2.class.getName() + " not initialized")
.isEqualTo(true);
}
@Test
public void testSimpleSignerInitializer() throws IOException {
Configuration config = new Configuration();
config.set(CUSTOM_SIGNERS,
"testsigner1:" + SignerForTest1.class.getName() + ":"
+ SignerInitializerForTest.class.getName());
Token<? extends TokenIdentifier> token = createTokenForTest("identifier");
DelegationTokenProvider dtProvider = new DelegationTokenProviderForTest(
token);
UserGroupInformation ugi = UserGroupInformation
.createRemoteUser("testuser");
SignerManager signerManager = new SignerManager("bucket1", dtProvider,
config, ugi);
signerManager.initCustomSigners();
Assertions.assertThat(SignerInitializerForTest.instanceCount)
.as(SignerInitializerForTest.class.getName()
+ " creation count mismatch").isEqualTo(1);
Assertions.assertThat(SignerInitializerForTest.registerCount)
.as(SignerInitializerForTest.class.getName()
+ " registration count mismatch").isEqualTo(1);
Assertions.assertThat(SignerInitializerForTest.unregisterCount)
.as(SignerInitializerForTest.class.getName()
+ " registration count mismatch").isEqualTo(0);
signerManager.close();
Assertions.assertThat(SignerInitializerForTest.unregisterCount)
.as(SignerInitializerForTest.class.getName()
+ " registration count mismatch").isEqualTo(1);
}
@Test
public void testMultipleSignerInitializers() throws IOException {
Configuration config = new Configuration();
config.set(CUSTOM_SIGNERS,
"testsigner1:" + SignerForTest1.class.getName() + ":"
+ SignerInitializerForTest.class.getName() + "," // 2nd signer
+ "testsigner2:" + SignerForTest2.class.getName() + ","
// 3rd signer
+ "testsigner3:" + SignerForTest2.class.getName() + ":"
+ SignerInitializer2ForTest.class.getName());
Token<? extends TokenIdentifier> token = createTokenForTest("identifier");
DelegationTokenProvider dtProvider = new DelegationTokenProviderForTest(
token);
UserGroupInformation ugi = UserGroupInformation
.createRemoteUser("testuser");
SignerManager signerManager = new SignerManager("bucket1", dtProvider,
config, ugi);
signerManager.initCustomSigners();
Assertions.assertThat(SignerInitializerForTest.instanceCount)
.as(SignerInitializerForTest.class.getName()
+ " creation count mismatch").isEqualTo(1);
Assertions.assertThat(SignerInitializerForTest.registerCount)
.as(SignerInitializerForTest.class.getName()
+ " registration count mismatch").isEqualTo(1);
Assertions.assertThat(SignerInitializerForTest.unregisterCount)
.as(SignerInitializerForTest.class.getName()
+ " registration count mismatch").isEqualTo(0);
Assertions.assertThat(SignerInitializer2ForTest.instanceCount)
.as(SignerInitializer2ForTest.class.getName()
+ " creation count mismatch").isEqualTo(1);
Assertions.assertThat(SignerInitializer2ForTest.registerCount)
.as(SignerInitializer2ForTest.class.getName()
+ " registration count mismatch").isEqualTo(1);
Assertions.assertThat(SignerInitializer2ForTest.unregisterCount)
.as(SignerInitializer2ForTest.class.getName()
+ " registration count mismatch").isEqualTo(0);
signerManager.close();
Assertions.assertThat(SignerInitializerForTest.unregisterCount)
.as(SignerInitializerForTest.class.getName()
+ " registration count mismatch").isEqualTo(1);
Assertions.assertThat(SignerInitializer2ForTest.unregisterCount)
.as(SignerInitializer2ForTest.class.getName()
+ " registration count mismatch").isEqualTo(1);
}
@Test
public void testSignerInitializerMultipleInstances()
throws IOException, InterruptedException {
String id1 = "id1";
String id2 = "id2";
String id3 = "id3";
UserGroupInformation ugiU1 = UserGroupInformation
.createRemoteUser(TESTUSER1);
UserGroupInformation ugiU2 = UserGroupInformation
.createRemoteUser(TESTUSER2);
SignerManager signerManagerU1B1 = fakeS3AInstanceCreation(id1,
SignerForInitializerTest.class, SignerInitializerForTest.class, BUCKET1,
ugiU1);
SignerManager signerManagerU2B1 = fakeS3AInstanceCreation(id2,
SignerForInitializerTest.class, SignerInitializerForTest.class, BUCKET1,
ugiU2);
SignerManager signerManagerU2B2 = fakeS3AInstanceCreation(id3,
SignerForInitializerTest.class, SignerInitializerForTest.class, BUCKET2,
ugiU2);
Assertions.assertThat(SignerInitializerForTest.instanceCount)
.as(SignerInitializerForTest.class.getName()
+ " creation count mismatch").isEqualTo(3);
Assertions.assertThat(SignerInitializerForTest.registerCount)
.as(SignerInitializerForTest.class.getName()
+ " registration count mismatch").isEqualTo(3);
Assertions.assertThat(SignerInitializerForTest.unregisterCount)
.as(SignerInitializerForTest.class.getName()
+ " registration count mismatch").isEqualTo(0);
// Simulate U1B1 making a request
attemptSignAndVerify(id1, BUCKET1, ugiU1, false);
// Simulate U2B1 making a request
attemptSignAndVerify(id2, BUCKET1, ugiU2, false);
// Simulate U2B2 making a request
attemptSignAndVerify(id3, BUCKET2, ugiU2, false);
// Simulate U1B2 (not defined - so Signer should get a null)
attemptSignAndVerify("dontcare", BUCKET2, ugiU1, true);
closeAndVerifyNull(signerManagerU1B1, BUCKET1, ugiU1, 2);
closeAndVerifyNull(signerManagerU2B2, BUCKET2, ugiU2, 1);
closeAndVerifyNull(signerManagerU2B1, BUCKET1, ugiU2, 0);
Assertions.assertThat(SignerInitializerForTest.unregisterCount)
.as(SignerInitializerForTest.class.getName()
+ " registration count mismatch").isEqualTo(3);
}
private void attemptSignAndVerify(String identifier, String bucket,
UserGroupInformation ugi, boolean expectNullStoreInfo)
throws IOException, InterruptedException {
ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
Signer signer = new SignerForInitializerTest();
SignableRequest<?> signableRequest = constructSignableRequest(bucket);
signer.sign(signableRequest, null);
verifyStoreValueInSigner(expectNullStoreInfo, bucket, identifier);
return null;
});
}
private void verifyStoreValueInSigner(boolean expectNull, String bucketName,
String identifier) throws IOException {
if (expectNull) {
Assertions.assertThat(SignerForInitializerTest.retrievedStoreValue)
.as("Retrieved store value expected to be null").isNull();
} else {
StoreValue storeValue = SignerForInitializerTest.retrievedStoreValue;
Assertions.assertThat(storeValue).as("StoreValue should not be null")
.isNotNull();
Assertions.assertThat(storeValue.getBucketName())
.as("Bucket Name mismatch").isEqualTo(bucketName);
Configuration conf = storeValue.getStoreConf();
Assertions.assertThat(conf).as("Configuration should not be null")
.isNotNull();
Assertions.assertThat(conf.get(TEST_KEY_IDENTIFIER))
.as("Identifier mistmatch").isEqualTo(identifier);
Token<? extends TokenIdentifier> token = storeValue.getDtProvider()
.getFsDelegationToken();
String tokenId = new String(token.getIdentifier(),
StandardCharsets.UTF_8);
Assertions.assertThat(tokenId)
.as("Mismatch in delegation token identifier").isEqualTo(
createTokenIdentifierString(identifier, bucketName,
UserGroupInformation.getCurrentUser().getShortUserName()));
}
}
private void closeAndVerifyNull(Closeable closeable, String bucketName,
UserGroupInformation ugi, int expectedCount)
throws IOException, InterruptedException {
closeable.close();
attemptSignAndVerify("dontcare", bucketName, ugi, true);
Assertions.assertThat(SignerInitializerForTest.storeCache.size())
.as("StoreCache size mismatch").isEqualTo(expectedCount);
}
/**
* SignerForTest1.
*/
@Private
public static class SignerForTest1 implements Signer {
private static boolean initialized = false;
@Override
public void sign(SignableRequest<?> request, AWSCredentials credentials) {
initialized = true;
}
public static void reset() {
initialized = false;
}
}
/**
* SignerForTest2.
*/
@Private
public static class SignerForTest2 implements Signer {
private static boolean initialized = false;
@Override
public void sign(SignableRequest<?> request, AWSCredentials credentials) {
initialized = true;
}
public static void reset() {
initialized = false;
}
}
/**
* SignerInitializerForTest.
*/
@Private
public static class SignerInitializerForTest implements AwsSignerInitializer {
private static int registerCount = 0;
private static int unregisterCount = 0;
private static int instanceCount = 0;
private static final Map<StoreKey, StoreValue> storeCache = new HashMap<>();
public SignerInitializerForTest() {
instanceCount++;
}
@Override
public void registerStore(String bucketName, Configuration storeConf,
DelegationTokenProvider dtProvider, UserGroupInformation storeUgi) {
registerCount++;
StoreKey storeKey = new StoreKey(bucketName, storeUgi);
StoreValue storeValue = new StoreValue(bucketName, storeConf, dtProvider);
storeCache.put(storeKey, storeValue);
}
@Override
public void unregisterStore(String bucketName, Configuration storeConf,
DelegationTokenProvider dtProvider, UserGroupInformation storeUgi) {
unregisterCount++;
StoreKey storeKey = new StoreKey(bucketName, storeUgi);
storeCache.remove(storeKey);
}
public static void reset() {
registerCount = 0;
unregisterCount = 0;
instanceCount = 0;
storeCache.clear();
}
public static StoreValue getStoreInfo(String bucketName,
UserGroupInformation storeUgi) {
StoreKey storeKey = new StoreKey(bucketName, storeUgi);
return storeCache.get(storeKey);
}
private static class StoreKey {
private final String bucketName;
private final UserGroupInformation ugi;
public StoreKey(String bucketName, UserGroupInformation ugi) {
this.bucketName = bucketName;
this.ugi = ugi;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
StoreKey storeKey = (StoreKey) o;
return Objects.equals(bucketName, storeKey.bucketName) && Objects
.equals(ugi, storeKey.ugi);
}
@Override
public int hashCode() {
return Objects.hash(bucketName, ugi);
}
}
static class StoreValue {
private final String bucketName;
private final Configuration storeConf;
private final DelegationTokenProvider dtProvider;
public StoreValue(String bucketName, Configuration storeConf,
DelegationTokenProvider dtProvider) {
this.bucketName = bucketName;
this.storeConf = storeConf;
this.dtProvider = dtProvider;
}
String getBucketName() {
return bucketName;
}
Configuration getStoreConf() {
return storeConf;
}
DelegationTokenProvider getDtProvider() {
return dtProvider;
}
}
}
/**
* To be used in conjunction with {@link SignerInitializerForTest}.
*/
@Private
public static class SignerForInitializerTest implements Signer {
private static StoreValue retrievedStoreValue;
@Override
public void sign(SignableRequest<?> request, AWSCredentials credentials) {
String bucketName = request.getEndpoint().getHost();
try {
retrievedStoreValue = SignerInitializerForTest
.getStoreInfo(bucketName, UserGroupInformation.getCurrentUser());
} catch (IOException e) {
throw new RuntimeException("Failed to get current ugi", e);
}
}
public static void reset() {
retrievedStoreValue = null;
}
}
/**
* DelegationTokenProviderForTest.
*/
@Private
private static class DelegationTokenProviderForTest
implements DelegationTokenProvider {
private final Token<? extends TokenIdentifier> token;
private DelegationTokenProviderForTest(
Token<? extends TokenIdentifier> token) {
this.token = token;
}
@Override
public Token<? extends TokenIdentifier> getFsDelegationToken()
throws IOException {
return this.token;
}
}
/**
* SignerInitializer2ForTest.
*/
@Private
public static class SignerInitializer2ForTest
implements AwsSignerInitializer {
private static int registerCount = 0;
private static int unregisterCount = 0;
private static int instanceCount = 0;
public SignerInitializer2ForTest() {
instanceCount++;
}
@Override
public void registerStore(String bucketName, Configuration storeConf,
DelegationTokenProvider dtProvider, UserGroupInformation storeUgi) {
registerCount++;
}
@Override
public void unregisterStore(String bucketName, Configuration storeConf,
DelegationTokenProvider dtProvider, UserGroupInformation storeUgi) {
unregisterCount++;
}
public static void reset() {
registerCount = 0;
unregisterCount = 0;
instanceCount = 0;
}
}
private Token<? extends TokenIdentifier> createTokenForTest(String idString) {
byte[] identifier = idString.getBytes(StandardCharsets.UTF_8);
byte[] password = "notapassword".getBytes(StandardCharsets.UTF_8);
Token<? extends TokenIdentifier> token = new Token<>(identifier, password,
TEST_TOKEN_KIND, TEST_TOKEN_SERVICE);
return token;
}
private SignerManager fakeS3AInstanceCreation(String identifier,
Class<? extends Signer> signerClazz,
Class<? extends AwsSignerInitializer> signerInitializerClazz,
String bucketName, UserGroupInformation ugi) {
// Simulate new S3A instance interactions.
Objects.requireNonNull(signerClazz, "SignerClazz missing");
Objects.requireNonNull(signerInitializerClazz,
"SignerInitializerClazzMissing");
Configuration config = new Configuration();
config.set(TEST_KEY_IDENTIFIER, identifier);
config.set(CUSTOM_SIGNERS,
signerClazz.getCanonicalName() + ":" + signerClazz.getName() + ":"
+ signerInitializerClazz.getName());
Token<? extends TokenIdentifier> token1 = createTokenForTest(
createTokenIdentifierString(identifier, bucketName,
ugi.getShortUserName()));
DelegationTokenProvider dtProvider1 = new DelegationTokenProviderForTest(
token1);
SignerManager signerManager = new SignerManager(bucketName, dtProvider1,
config, ugi);
signerManager.initCustomSigners();
return signerManager;
}
private String createTokenIdentifierString(String identifier,
String bucketName, String user) {
return identifier + "_" + bucketName + "_" + user;
}
private SignableRequest<?> constructSignableRequest(String bucketName)
throws URISyntaxException {
DefaultRequest signableRequest = new DefaultRequest(
AmazonWebServiceRequest.NOOP, "fakeservice");
URI uri = new URI("s3://" + bucketName + "/");
signableRequest.setEndpoint(uri);
return signableRequest;
}
}