HADOOP-16645. S3A Delegation Token extension point to use StoreContext.

Contributed by Steve Loughran.

This is part of the ongoing refactoring of the S3A codebase, with the
delegation token support (HADOOP-14556) no longer given a direct reference
to the owning S3AFileSystem. Instead it gets a StoreContext and a new
interface, DelegationOperations, to access those operations offered by S3AFS
which are specifically needed by the DT bindings.

The sole operation needed is listAWSPolicyRules(), which is used to allow
S3A FS and the S3Guard metastore to return the AWS policy rules needed to
access their specific services/buckets/tables, allowing the AssumedRole
delegation token to be locked down.

As further restructuring takes place, that interface's implementation
can be moved to wherever the new home for those operations ends up.

Although it changes the API of an extension point, that feature (S3
Delegation Tokens) has not shipped; backwards compatibility is not a
problem except for anyone who has implemented DT support against trunk.
To those developers: sorry.

Change-Id: I770f58b49ff7634a34875ba37b7d51c94d7c21da
This commit is contained in:
Steve Loughran 2020-01-07 11:15:53 +00:00
parent 59aac00283
commit 2bbf73f1df
11 changed files with 111 additions and 38 deletions

View File

@ -96,6 +96,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Globber;
import org.apache.hadoop.fs.s3a.auth.SignerManager;
import org.apache.hadoop.fs.s3a.auth.delegation.DelegationOperations;
import org.apache.hadoop.fs.s3a.auth.delegation.DelegationTokenProvider;
import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
import org.apache.hadoop.fs.s3a.impl.ContextAccessors;
@ -541,7 +542,9 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
LOG.debug("Using delegation tokens");
S3ADelegationTokens tokens = new S3ADelegationTokens();
this.delegationTokens = Optional.of(tokens);
tokens.bindToFileSystem(getCanonicalUri(), this);
tokens.bindToFileSystem(getCanonicalUri(),
createStoreContext(),
createDelegationOperations());
tokens.init(conf);
tokens.start();
// switch to the DT provider and bypass all other configured
@ -574,6 +577,26 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
.createS3Client(getUri(), bucket, credentials, uaSuffix);
}
/**
* Implementation of all operations used by delegation tokens.
*/
private class DelegationOperationsImpl implements DelegationOperations {
@Override
public List<RoleModel.Statement> listAWSPolicyRules(final Set<AccessLevel> access) {
return S3AFileSystem.this.listAWSPolicyRules(access);
}
}
/**
* Create an instance of the delegation operations.
* @return callbacks for DT support.
*/
@VisibleForTesting
public DelegationOperations createDelegationOperations() {
return new DelegationOperationsImpl();
}
/**
* Set the encryption secrets for requests.
* @param secrets secrets

View File

@ -24,7 +24,7 @@ import java.net.URI;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.impl.StoreContext;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
@ -53,22 +53,27 @@ public abstract class AbstractDTService
/**
* URI of the filesystem.
* Valid after {@link #bindToFileSystem(URI, S3AFileSystem)}.
* Valid after {@link #bindToFileSystem(URI, StoreContext, DelegationOperations)}.
*/
private URI canonicalUri;
/**
* The owning filesystem.
* Valid after {@link #bindToFileSystem(URI, S3AFileSystem)}.
*/
private S3AFileSystem fileSystem;
/**
* Owner of the filesystem.
* Valid after {@link #bindToFileSystem(URI, S3AFileSystem)}.
* Valid after {@link #bindToFileSystem(URI, StoreContext, DelegationOperations)}.
*/
private UserGroupInformation owner;
/**
* Store Context for callbacks into the FS.
* Valid after {@link #bindToFileSystem(URI, StoreContext, DelegationOperations)}.
*/
private StoreContext storeContext;
/**
* Callbacks for DT-related operations.
*/
private DelegationOperations policyProvider;
/**
* Protected constructor.
* @param name service name.
@ -88,18 +93,21 @@ public abstract class AbstractDTService
* is not live for actual use and will not yet have interacted with
* AWS services.
* @param uri the canonical URI of the FS.
* @param fs owning FS.
* @param context store context
* @param delegationOperations delegation operations
* @throws IOException failure.
*/
public void bindToFileSystem(
final URI uri,
final S3AFileSystem fs) throws IOException {
final StoreContext context,
final DelegationOperations delegationOperations) throws IOException {
requireServiceState(STATE.NOTINITED);
Preconditions.checkState(canonicalUri == null,
"bindToFileSystem called twice");
this.canonicalUri = requireNonNull(uri);
this.fileSystem = requireNonNull(fs);
this.owner = fs.getOwner();
this.storeContext = requireNonNull(context);
this.owner = context.getOwner();
this.policyProvider = delegationOperations;
}
/**
@ -111,14 +119,6 @@ public abstract class AbstractDTService
return canonicalUri;
}
/**
* Get the owner of the FS.
* @return the owner fs
*/
protected S3AFileSystem getFileSystem() {
return fileSystem;
}
/**
* Get the owner of this Service.
* @return owner; non-null after binding to an FS.
@ -127,6 +127,14 @@ public abstract class AbstractDTService
return owner;
}
protected StoreContext getStoreContext() {
return storeContext;
}
protected DelegationOperations getPolicyProvider() {
return policyProvider;
}
/**
* Require that the service is in a given state.
* @param state desired state.

View File

@ -28,7 +28,6 @@ import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.AWSCredentialProviderList;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.auth.RoleModel;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.SecretManager;
@ -55,7 +54,8 @@ import static org.apache.hadoop.fs.s3a.auth.delegation.DelegationConstants.DURAT
* instance which created it --which itself follows the lifecycle of the FS.
*
* One big difference is that
* {@link #bindToFileSystem(URI, S3AFileSystem)} will be called
* {@link AbstractDTService#bindToFileSystem(URI, org.apache.hadoop.fs.s3a.impl.StoreContext, DelegationOperations)}
* will be called
* before the {@link #init(Configuration)} operation, this is where
* the owning FS is passed in.
*

View File

@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.auth.delegation;
/**
* All operations used for delegation which aren't in the store context.
* Initially this is just the AWS policy operations; it's
* here so that any future evolution isn't going to break the signatures of
* external DT implementations.
*/
public interface DelegationOperations extends AWSPolicyProvider {
}

View File

@ -120,7 +120,7 @@ public class FullCredentialsTokenBinding extends
"Full Credentials Token Binding",
new MarshalledCredentialProvider(
FULL_TOKEN,
getFileSystem().getUri(),
getStoreContext().getFsURI(),
getConfig(),
awsCredentials,
MarshalledCredentials.CredentialTypeRequired.AnyNonEmpty));
@ -159,9 +159,10 @@ public class FullCredentialsTokenBinding extends
convertTokenIdentifier(retrievedIdentifier,
FullCredentialsTokenIdentifier.class);
return new AWSCredentialProviderList(
"", new MarshalledCredentialProvider(
"Full Credentials Token Binding",
new MarshalledCredentialProvider(
FULL_TOKEN,
getFileSystem().getUri(),
getStoreContext().getFsURI(),
getConfig(),
tokenIdentifier.getMarshalledCredentials(),
MarshalledCredentials.CredentialTypeRequired.AnyNonEmpty));

View File

@ -110,7 +110,8 @@ public class RoleTokenBinding extends SessionTokenBinding {
return new AWSCredentialProviderList(
"Role Token Binding",
new MarshalledCredentialProvider(
COMPONENT, getFileSystem().getUri(),
COMPONENT,
getStoreContext().getFsURI(),
getConfig(),
marshalledCredentials,
MarshalledCredentials.CredentialTypeRequired.SessionOnly));

View File

@ -34,9 +34,9 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.AWSCredentialProviderList;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3AInstrumentation;
import org.apache.hadoop.fs.s3a.auth.RoleModel;
import org.apache.hadoop.fs.s3a.impl.StoreContext;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
@ -154,11 +154,13 @@ public class S3ADelegationTokens extends AbstractDTService {
}
@Override
public void bindToFileSystem(final URI uri, final S3AFileSystem fs)
public void bindToFileSystem(final URI uri,
final StoreContext context,
final DelegationOperations delegationOperations)
throws IOException {
super.bindToFileSystem(uri, fs);
super.bindToFileSystem(uri, context, delegationOperations);
service = getTokenService(getCanonicalUri());
stats = fs.getInstrumentation().newDelegationTokenStatistics();
stats = context.getInstrumentation().newDelegationTokenStatistics();
}
/**
@ -179,7 +181,9 @@ public class S3ADelegationTokens extends AbstractDTService {
SessionTokenBinding.class,
AbstractDelegationTokenBinding.class);
tokenBinding = binding.newInstance();
tokenBinding.bindToFileSystem(getCanonicalUri(), getFileSystem());
tokenBinding.bindToFileSystem(getCanonicalUri(),
getStoreContext(),
getPolicyProvider());
tokenBinding.init(conf);
tokenBindingName = tokenBinding.getKind().toString();
LOG.debug("Filesystem {} is using delegation tokens of kind {}",
@ -415,7 +419,7 @@ public class S3ADelegationTokens extends AbstractDTService {
"Null encryption secrets");
// this isn't done in in advance as it needs S3Guard initialized in the
// filesystem before it can generate complete policies.
List<RoleModel.Statement> statements = getFileSystem()
List<RoleModel.Statement> statements = getPolicyProvider()
.listAWSPolicyRules(ACCESS_POLICY);
Optional<RoleModel.Policy> rolePolicy =
statements.isEmpty() ?

View File

@ -226,7 +226,7 @@ public class SessionTokenBinding extends AbstractDelegationTokenBinding {
"Session Token Binding",
new MarshalledCredentialProvider(
SESSION_TOKEN,
getFileSystem().getUri(),
getStoreContext().getFsURI(),
getConfig(),
marshalledCredentials,
MarshalledCredentials.CredentialTypeRequired.SessionOnly));

View File

@ -205,7 +205,10 @@ public abstract class AbstractDelegationIT extends AbstractS3ATestBase {
throws IOException {
S3AFileSystem fs = getFileSystem();
S3ADelegationTokens tokens = new S3ADelegationTokens();
tokens.bindToFileSystem(fs.getCanonicalUri(), fs);
tokens.bindToFileSystem(
fs.getCanonicalUri(),
fs.createStoreContext(),
fs.createDelegationOperations());
tokens.init(conf);
return tokens;
}

View File

@ -93,7 +93,9 @@ public class ITestRoleDelegationTokens extends ITestSessionDelegationTokens {
conf.unset(DelegationConstants.DELEGATION_TOKEN_ROLE_ARN);
try (S3ADelegationTokens delegationTokens2 = new S3ADelegationTokens()) {
final S3AFileSystem fs = getFileSystem();
delegationTokens2.bindToFileSystem(fs.getUri(), fs);
delegationTokens2.bindToFileSystem(fs.getUri(),
fs.createStoreContext(),
fs.createDelegationOperations());
delegationTokens2.init(conf);
delegationTokens2.start();

View File

@ -263,7 +263,10 @@ public class ITestSessionDelegationTokens extends AbstractDelegationIT {
TemporaryAWSCredentialsProvider.NAME);
session.setSecretsInConfiguration(conf);
try(S3ADelegationTokens delegationTokens2 = new S3ADelegationTokens()) {
delegationTokens2.bindToFileSystem(fs.getCanonicalUri(), fs);
delegationTokens2.bindToFileSystem(
fs.getCanonicalUri(),
fs.createStoreContext(),
fs.createDelegationOperations());
delegationTokens2.init(conf);
delegationTokens2.start();