HADOOP-16658. S3A connector does not support including the token renewer in the token identifier.

Contributed by Phil Zampino.

Change-Id: Iea9d5028dcf58bda4da985604f5cd3ac283619bd
This commit is contained in:
Phil Zampino 2019-10-23 16:32:08 +01:00 committed by Steve Loughran
parent a901405ad8
commit 1d5d7d0989
No known key found for this signature in database
GPG Key ID: D22CF846DBB162A0
14 changed files with 119 additions and 32 deletions

View File

@ -110,6 +110,7 @@ import org.apache.hadoop.fs.s3a.impl.StoreContext;
import org.apache.hadoop.fs.s3a.s3guard.BulkOperationState;
import org.apache.hadoop.fs.s3a.select.InternalSelectConstants;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.DurationInfo;
import org.apache.hadoop.util.LambdaUtils;
@ -3186,7 +3187,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
entryPoint(Statistic.INVOCATION_GET_DELEGATION_TOKEN);
LOG.debug("Delegation token requested");
if (delegationTokens.isPresent()) {
return delegationTokens.get().getBoundOrNewDT(encryptionSecrets);
return delegationTokens.get().getBoundOrNewDT(encryptionSecrets,
(renewer != null ? new Text(renewer) : new Text()));
} else {
// Delegation token support is not set up
LOG.debug("Token support is not enabled");

View File

@ -129,15 +129,17 @@ public abstract class AbstractDelegationTokenBinding extends AbstractDTService {
* filesystem has been deployed unbonded.
* @param policy minimum policy to use, if known.
* @param encryptionSecrets encryption secrets for the token.
* @param renewer the principal permitted to renew the token.
* @return the token or null if the back end does not want to issue one.
* @throws IOException if one cannot be created
*/
public Token<AbstractS3ATokenIdentifier> createDelegationToken(
final Optional<RoleModel.Policy> policy,
final EncryptionSecrets encryptionSecrets) throws IOException {
final EncryptionSecrets encryptionSecrets,
final Text renewer) throws IOException {
requireServiceStarted();
final AbstractS3ATokenIdentifier tokenIdentifier =
createTokenIdentifier(policy, encryptionSecrets);
createTokenIdentifier(policy, encryptionSecrets, renewer);
if (tokenIdentifier != null) {
Token<AbstractS3ATokenIdentifier> token =
new Token<>(tokenIdentifier, secretManager);
@ -157,17 +159,19 @@ public abstract class AbstractDelegationTokenBinding extends AbstractDTService {
* This will only be called if a new DT is needed, that is: the
* filesystem has been deployed unbonded.
*
* If {@link #createDelegationToken(Optional, EncryptionSecrets)}
* If {@link #createDelegationToken(Optional, EncryptionSecrets, Text)}
* is overridden, this method can be replaced with a stub.
*
* @param policy minimum policy to use, if known.
* @param encryptionSecrets encryption secrets for the token.
* @param renewer the principal permitted to renew the token.
* @return the token data to include in the token identifier.
* @throws IOException failure creating the token data.
*/
public abstract AbstractS3ATokenIdentifier createTokenIdentifier(
Optional<RoleModel.Policy> policy,
EncryptionSecrets encryptionSecrets) throws IOException;
EncryptionSecrets encryptionSecrets,
Text renewer) throws IOException;
/**
* Verify that a token identifier is of a specific class.

View File

@ -103,7 +103,8 @@ public abstract class AbstractS3ATokenIdentifier
* Constructor.
* @param kind token kind.
* @param uri filesystem URI.
* @param owner token owner
* @param owner token owner.
* @param renewer token renewer.
* @param origin origin text for diagnostics.
* @param encryptionSecrets encryption secrets to set.
*/
@ -111,9 +112,14 @@ public abstract class AbstractS3ATokenIdentifier
final Text kind,
final URI uri,
final Text owner,
final Text renewer,
final String origin,
final EncryptionSecrets encryptionSecrets) {
this(kind, owner, new Text(), new Text(), uri);
this(kind,
owner,
(renewer != null ? renewer : new Text()),
new Text(),
uri);
this.origin = requireNonNull(origin);
this.encryptionSecrets = requireNonNull(encryptionSecrets);
}
@ -237,6 +243,7 @@ public abstract class AbstractS3ATokenIdentifier
sb.append(getKind());
sb.append("; uri=").append(uri);
sb.append("; timestamp=").append(created);
sb.append("; renewer=").append(getRenewer());
sb.append("; encryption=").append(encryptionSecrets.toString());
sb.append("; ").append(uuid);
sb.append("; ").append(origin);

View File

@ -30,6 +30,7 @@ import org.apache.hadoop.fs.s3a.auth.MarshalledCredentialProvider;
import org.apache.hadoop.fs.s3a.auth.MarshalledCredentials;
import org.apache.hadoop.fs.s3a.auth.RoleModel;
import org.apache.hadoop.fs.s3native.S3xLoginHelper;
import org.apache.hadoop.io.Text;
import static org.apache.hadoop.fs.s3a.auth.delegation.DelegationConstants.FULL_TOKEN_KIND;
@ -61,7 +62,7 @@ public class FullCredentialsTokenBinding extends
private String credentialOrigin;
/**
* Constructor, uses name of {@link #name} and token kind of
* Constructor, uses name of {@link #NAME} and token kind of
* {@link DelegationConstants#FULL_TOKEN_KIND}.
*
*/
@ -138,11 +139,13 @@ public class FullCredentialsTokenBinding extends
@Override
public AbstractS3ATokenIdentifier createTokenIdentifier(
final Optional<RoleModel.Policy> policy,
final EncryptionSecrets encryptionSecrets) throws IOException {
final EncryptionSecrets encryptionSecrets,
final Text renewer) throws IOException {
requireServiceStarted();
return new FullCredentialsTokenIdentifier(getCanonicalUri(),
getOwnerText(),
renewer,
awsCredentials,
encryptionSecrets,
credentialOrigin);

View File

@ -37,11 +37,13 @@ public class FullCredentialsTokenIdentifier extends SessionTokenIdentifier {
public FullCredentialsTokenIdentifier(final URI uri,
final Text owner,
final Text renewer,
final MarshalledCredentials marshalledCredentials,
final EncryptionSecrets encryptionSecrets,
String origin) {
super(DelegationConstants.FULL_TOKEN_KIND,
owner,
renewer,
uri,
marshalledCredentials,
encryptionSecrets,

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.fs.s3a.auth.MarshalledCredentialProvider;
import org.apache.hadoop.fs.s3a.auth.MarshalledCredentials;
import org.apache.hadoop.fs.s3a.auth.RoleModel;
import org.apache.hadoop.fs.s3a.auth.STSClientFactory;
import org.apache.hadoop.io.Text;
import static org.apache.hadoop.fs.s3a.auth.MarshalledCredentialBinding.fromSTSCredentials;
import static org.apache.hadoop.fs.s3a.auth.delegation.DelegationConstants.DELEGATION_TOKEN_CREDENTIALS_PROVIDER;
@ -75,7 +76,7 @@ public class RoleTokenBinding extends SessionTokenBinding {
/**
* Constructor.
* Name is {@link #name}; token kind is
* Name is {@link #NAME}; token kind is
* {@link DelegationConstants#ROLE_TOKEN_KIND}.
*/
public RoleTokenBinding() {
@ -129,7 +130,8 @@ public class RoleTokenBinding extends SessionTokenBinding {
@Retries.RetryTranslated
public RoleTokenIdentifier createTokenIdentifier(
final Optional<RoleModel.Policy> policy,
final EncryptionSecrets encryptionSecrets) throws IOException {
final EncryptionSecrets encryptionSecrets,
final Text renewer) throws IOException {
requireServiceStarted();
Preconditions.checkState(!roleArn.isEmpty(), E_NO_ARN);
String policyJson = policy.isPresent() ?
@ -152,6 +154,7 @@ public class RoleTokenBinding extends SessionTokenBinding {
return new RoleTokenIdentifier(
getCanonicalUri(),
getOwnerText(),
renewer,
fromSTSCredentials(credentials),
encryptionSecrets,
AbstractS3ATokenIdentifier.createDefaultOriginMessage()

View File

@ -35,11 +35,13 @@ public class RoleTokenIdentifier extends SessionTokenIdentifier {
public RoleTokenIdentifier(final URI uri,
final Text owner,
final Text renewer,
final MarshalledCredentials marshalledCredentials,
final EncryptionSecrets encryptionSecrets,
final String origin) {
super(DelegationConstants.ROLE_TOKEN_KIND,
owner,
renewer,
uri,
marshalledCredentials,
encryptionSecrets,

View File

@ -352,7 +352,7 @@ public class S3ADelegationTokens extends AbstractDTService {
/**
* Predicate: will this binding issue a DT if requested
* in a call to {@link #getBoundOrNewDT(EncryptionSecrets)}?
* in a call to {@link #getBoundOrNewDT(EncryptionSecrets, Text)}?
* That is: should the filesystem declare that it is issuing
* delegation tokens?
* @return a declaration of what will happen when asked for a token.
@ -368,10 +368,12 @@ public class S3ADelegationTokens extends AbstractDTService {
* @return a delegation token.
* @throws IOException if one cannot be created
* @param encryptionSecrets encryption secrets for any new token.
* @param renewer the token renewer.
*/
@SuppressWarnings("OptionalGetWithoutIsPresent")
public Token<AbstractS3ATokenIdentifier> getBoundOrNewDT(
final EncryptionSecrets encryptionSecrets)
final EncryptionSecrets encryptionSecrets,
final Text renewer)
throws IOException {
LOG.debug("Delegation token requested");
if (isBoundToDT()) {
@ -382,13 +384,13 @@ public class S3ADelegationTokens extends AbstractDTService {
// not bound to a token, so create a new one.
// issued DTs are not cached so that long-lived filesystems can
// reliably issue session/role tokens.
return createDelegationToken(encryptionSecrets);
return createDelegationToken(encryptionSecrets, renewer);
}
}
/**
* How many delegation tokens have been issued?
* @return the number times {@link #createDelegationToken(EncryptionSecrets)}
* @return the number times {@link #createDelegationToken(EncryptionSecrets, Text)}
* returned a token.
*/
public int getCreationCount() {
@ -400,12 +402,14 @@ public class S3ADelegationTokens extends AbstractDTService {
* This will only be called if a new DT is needed, that is: the
* filesystem has been deployed unbonded.
* @param encryptionSecrets encryption secrets for the token.
* @param renewer the token renewer
* @return the token
* @throws IOException if one cannot be created
*/
@VisibleForTesting
public Token<AbstractS3ATokenIdentifier> createDelegationToken(
final EncryptionSecrets encryptionSecrets) throws IOException {
final EncryptionSecrets encryptionSecrets,
final Text renewer) throws IOException {
requireServiceStarted();
checkArgument(encryptionSecrets != null,
"Null encryption secrets");
@ -420,7 +424,7 @@ public class S3ADelegationTokens extends AbstractDTService {
try(DurationInfo ignored = new DurationInfo(LOG, DURATION_LOG_AT_INFO,
"Creating New Delegation Token", tokenBinding.getKind())) {
Token<AbstractS3ATokenIdentifier> token
= tokenBinding.createDelegationToken(rolePolicy, encryptionSecrets);
= tokenBinding.createDelegationToken(rolePolicy, encryptionSecrets, renewer);
if (token != null) {
token.setService(service);
noteTokenCreated(token);

View File

@ -353,7 +353,8 @@ public class SessionTokenBinding extends AbstractDelegationTokenBinding {
@Retries.RetryTranslated
public SessionTokenIdentifier createTokenIdentifier(
final Optional<RoleModel.Policy> policy,
final EncryptionSecrets encryptionSecrets) throws IOException {
final EncryptionSecrets encryptionSecrets,
final Text renewer) throws IOException {
requireServiceStarted();
final MarshalledCredentials marshalledCredentials;
@ -384,11 +385,12 @@ public class SessionTokenBinding extends AbstractDelegationTokenBinding {
}
}
return new SessionTokenIdentifier(getKind(),
getOwnerText(),
getCanonicalUri(),
marshalledCredentials,
encryptionSecrets,
origin);
getOwnerText(),
renewer,
getCanonicalUri(),
marshalledCredentials,
encryptionSecrets,
origin);
}
@Override

View File

@ -73,7 +73,8 @@ public class SessionTokenIdentifier extends
/**
* Constructor.
* @param kind token kind.
* @param owner token owner
* @param owner token owner.
* @param renewer token renewer.
* @param uri filesystem URI.
* @param marshalledCredentials credentials to marshall
* @param encryptionSecrets encryption secrets
@ -82,11 +83,12 @@ public class SessionTokenIdentifier extends
public SessionTokenIdentifier(
final Text kind,
final Text owner,
final Text renewer,
final URI uri,
final MarshalledCredentials marshalledCredentials,
final EncryptionSecrets encryptionSecrets,
final String origin) {
super(kind, uri, owner, origin, encryptionSecrets);
super(kind, uri, owner, renewer, origin, encryptionSecrets);
this.marshalledCredentials = marshalledCredentials;
}

View File

@ -46,7 +46,7 @@ import static org.apache.hadoop.fs.s3a.auth.delegation.S3ADelegationTokens.looku
*/
public abstract class AbstractDelegationIT extends AbstractS3ATestBase {
protected static final String YARN_RM = "yarn-rm@EXAMPLE";
protected static final String YARN_RM = "yarn-rm@EXAMPLE.COM";
private static final Logger LOG =
LoggerFactory.getLogger(AbstractDelegationIT.class);

View File

@ -101,7 +101,7 @@ public class ITestRoleDelegationTokens extends ITestSessionDelegationTokens {
intercept(IllegalStateException.class,
E_NO_ARN,
() -> delegationTokens2.createDelegationToken(
new EncryptionSecrets()));
new EncryptionSecrets(), null));
}
}

View File

@ -118,7 +118,7 @@ public class ITestSessionDelegationTokens extends AbstractDelegationIT {
EncryptionSecrets encryptionSecrets = new EncryptionSecrets(
S3AEncryptionMethods.SSE_KMS, KMS_KEY);
Token<AbstractS3ATokenIdentifier> dt
= delegationTokens.createDelegationToken(encryptionSecrets);
= delegationTokens.createDelegationToken(encryptionSecrets, null);
final SessionTokenIdentifier origIdentifier
= (SessionTokenIdentifier) dt.decodeIdentifier();
assertEquals("kind in " + dt, getTokenKind(), dt.getKind());
@ -173,7 +173,7 @@ public class ITestSessionDelegationTokens extends AbstractDelegationIT {
EncryptionSecrets secrets = new EncryptionSecrets(
S3AEncryptionMethods.SSE_KMS, KMS_KEY);
Token<AbstractS3ATokenIdentifier> originalDT
= delegationTokens.createDelegationToken(secrets);
= delegationTokens.createDelegationToken(secrets, null);
assertEquals("Token kind mismatch", getTokenKind(), originalDT.getKind());
// decode to get the binding info
@ -195,12 +195,12 @@ public class ITestSessionDelegationTokens extends AbstractDelegationIT {
awsSessionCreds);
Token<AbstractS3ATokenIdentifier> boundDT =
dt2.getBoundOrNewDT(secrets);
dt2.getBoundOrNewDT(secrets, null);
assertEquals("Delegation Tokens", originalDT, boundDT);
// simulate marshall and transmission
creds = roundTrip(origCreds, conf);
SessionTokenIdentifier reissued
= (SessionTokenIdentifier) dt2.createDelegationToken(secrets)
= (SessionTokenIdentifier) dt2.createDelegationToken(secrets, null)
.decodeIdentifier();
reissued.validate();
String userAgentField = dt2.getUserAgentField();
@ -214,6 +214,31 @@ public class ITestSessionDelegationTokens extends AbstractDelegationIT {
verifyCredentialPropagation(fs, creds, new Configuration(conf));
}
@Test
public void testCreateWithRenewer() throws Throwable {
describe("Create a Delegation Token, round trip then reuse");
final S3AFileSystem fs = getFileSystem();
final Configuration conf = fs.getConf();
final Text renewer = new Text("yarn");
assertNull("Current User has delegation token",
delegationTokens.selectTokenFromFSOwner());
EncryptionSecrets secrets = new EncryptionSecrets(
S3AEncryptionMethods.SSE_KMS, KMS_KEY);
Token<AbstractS3ATokenIdentifier> dt
= delegationTokens.createDelegationToken(secrets, renewer);
assertEquals("Token kind mismatch", getTokenKind(), dt.getKind());
// decode to get the binding info
SessionTokenIdentifier issued =
requireNonNull(
(SessionTokenIdentifier) dt.decodeIdentifier(),
() -> "no identifier in " + dt);
issued.validate();
assertEquals("Token renewer mismatch", renewer, issued.getRenewer());
}
/**
* This verifies that AWS Session credentials can be picked up and
* returned in a DT.
@ -243,7 +268,7 @@ public class ITestSessionDelegationTokens extends AbstractDelegationIT {
delegationTokens2.start();
final Token<AbstractS3ATokenIdentifier> newDT
= delegationTokens2.getBoundOrNewDT(new EncryptionSecrets());
= delegationTokens2.getBoundOrNewDT(new EncryptionSecrets(), null);
delegationTokens2.resetTokenBindingToDT(newDT);
final AbstractS3ATokenIdentifier boundId
= delegationTokens2.getDecodedIdentifier().get();

View File

@ -37,6 +37,7 @@ import static org.apache.hadoop.fs.s3a.auth.delegation.DelegationConstants.FULL_
import static org.apache.hadoop.fs.s3a.auth.delegation.DelegationConstants.SESSION_TOKEN_KIND;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
/**
* Unit tests related to S3A DT support.
@ -60,9 +61,11 @@ public class TestS3ADelegationTokenSupport {
@Test
public void testSessionTokenDecode() throws Throwable {
Text alice = new Text("alice");
Text renewer = new Text("yarn");
AbstractS3ATokenIdentifier identifier
= new SessionTokenIdentifier(SESSION_TOKEN_KIND,
alice,
renewer,
new URI("s3a://landsat-pds/"),
new MarshalledCredentials("a", "b", ""),
new EncryptionSecrets(S3AEncryptionMethods.SSE_S3, ""),
@ -82,6 +85,7 @@ public class TestS3ADelegationTokenSupport {
assertEquals("name of " + decodedUser,
"alice",
decodedUser.getUserName());
assertEquals("renewer", renewer, decoded.getRenewer());
assertEquals("Authentication method of " + decodedUser,
UserGroupInformation.AuthenticationMethod.TOKEN,
decodedUser.getAuthenticationMethod());
@ -97,9 +101,11 @@ public class TestS3ADelegationTokenSupport {
@Test
public void testSessionTokenIdentifierRoundTrip() throws Throwable {
Text renewer = new Text("yarn");
SessionTokenIdentifier id = new SessionTokenIdentifier(
SESSION_TOKEN_KIND,
new Text(),
renewer,
landsatUri,
new MarshalledCredentials("a", "b", "c"),
new EncryptionSecrets(), "");
@ -110,6 +116,26 @@ public class TestS3ADelegationTokenSupport {
assertEquals("credentials in " + ids,
id.getMarshalledCredentials(),
result.getMarshalledCredentials());
assertEquals("renewer in " + ids, renewer, id.getRenewer());
}
@Test
public void testSessionTokenIdentifierRoundTripNoRenewer() throws Throwable {
SessionTokenIdentifier id = new SessionTokenIdentifier(
SESSION_TOKEN_KIND,
new Text(),
null,
landsatUri,
new MarshalledCredentials("a", "b", "c"),
new EncryptionSecrets(), "");
SessionTokenIdentifier result = S3ATestUtils.roundTrip(id, null);
String ids = id.toString();
assertEquals("URI in " + ids, id.getUri(), result.getUri());
assertEquals("credentials in " + ids,
id.getMarshalledCredentials(),
result.getMarshalledCredentials());
assertEquals("renewer in " + ids, new Text(), id.getRenewer());
}
@Test
@ -117,6 +143,7 @@ public class TestS3ADelegationTokenSupport {
RoleTokenIdentifier id = new RoleTokenIdentifier(
landsatUri,
new Text(),
new Text(),
new MarshalledCredentials("a", "b", "c"),
new EncryptionSecrets(), "");
@ -126,13 +153,16 @@ public class TestS3ADelegationTokenSupport {
assertEquals("credentials in " + ids,
id.getMarshalledCredentials(),
result.getMarshalledCredentials());
assertEquals("renewer in " + ids, new Text(), id.getRenewer());
}
@Test
public void testFullTokenIdentifierRoundTrip() throws Throwable {
Text renewer = new Text("renewerName");
FullCredentialsTokenIdentifier id = new FullCredentialsTokenIdentifier(
landsatUri,
new Text(),
renewer,
new MarshalledCredentials("a", "b", ""),
new EncryptionSecrets(), "");
@ -142,6 +172,7 @@ public class TestS3ADelegationTokenSupport {
assertEquals("credentials in " + ids,
id.getMarshalledCredentials(),
result.getMarshalledCredentials());
assertEquals("renewer in " + ids, renewer, result.getRenewer());
}
/**