HADOOP-13381. KMS clients should use KMS Delegation Tokens from current UGI. Contributed by Xiao Chen.

This commit is contained in:
Xiao Chen 2016-07-28 18:23:51 -07:00
parent 7086fc72ee
commit 8ebf2e95d2
2 changed files with 146 additions and 2 deletions

View File

@ -38,6 +38,7 @@ import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
import org.apache.hadoop.security.ssl.SSLFactory;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.TokenRenewer;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
@ -536,8 +537,12 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
UserGroupInformation.AuthenticationMethod.PROXY)
? currentUgi.getShortUserName() : null;
// creating the HTTP connection using the current UGI at constructor time
conn = actualUgi.doAs(new PrivilegedExceptionAction<HttpURLConnection>() {
// If current UGI contains kms-dt && is not proxy, doAs it to use its dt.
// Otherwise, create the HTTP connection using the UGI at constructor time
UserGroupInformation ugiToUse =
(currentUgiContainsKmsDt() && doAsUser == null) ?
currentUgi : actualUgi;
conn = ugiToUse.doAs(new PrivilegedExceptionAction<HttpURLConnection>() {
@Override
public HttpURLConnection run() throws Exception {
DelegationTokenAuthenticatedURL authUrl =
@ -1041,6 +1046,20 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
return dtService;
}
private boolean currentUgiContainsKmsDt() throws IOException {
// Add existing credentials from current UGI, since provider is cached.
Credentials creds = UserGroupInformation.getCurrentUser().
getCredentials();
if (!creds.getAllTokens().isEmpty()) {
org.apache.hadoop.security.token.Token<? extends TokenIdentifier>
dToken = creds.getToken(getDelegationTokenService());
if (dToken != null) {
return true;
}
}
return false;
}
/**
* Shutdown valueQueue executor threads
*/

View File

@ -38,7 +38,9 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -50,6 +52,8 @@ import org.slf4j.LoggerFactory;
import javax.security.auth.login.AppConfigurationEntry;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
@ -62,8 +66,10 @@ import java.net.URI;
import java.net.URL;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@ -1876,6 +1882,125 @@ public class TestKMS {
});
}
@Test
public void testDelegationTokensUpdatedInUGI() throws Exception {
Configuration conf = new Configuration();
UserGroupInformation.setConfiguration(conf);
File confDir = getTestDir();
conf = createBaseKMSConf(confDir);
conf.set(
"hadoop.kms.authentication.delegation-token.max-lifetime.sec", "5");
conf.set(
"hadoop.kms.authentication.delegation-token.renew-interval.sec", "5");
writeConf(confDir, conf);
// Running as a service (e.g. Yarn in practice).
runServer(null, null, confDir, new KMSCallable<Void>() {
@Override
public Void call() throws Exception {
final Configuration clientConf = new Configuration();
final URI uri = createKMSUri(getKMSUrl());
clientConf.set(KeyProviderFactory.KEY_PROVIDER_PATH,
createKMSUri(getKMSUrl()).toString());
final KeyProvider kp = createProvider(uri, clientConf);
final KeyProviderDelegationTokenExtension kpdte =
KeyProviderDelegationTokenExtension.
createKeyProviderDelegationTokenExtension(kp);
final InetSocketAddress kmsAddr =
new InetSocketAddress(getKMSUrl().getHost(), getKMSUrl().getPort());
// Job 1 (e.g. Yarn log aggregation job), with user DT.
final Collection<Token<?>> job1Token = new HashSet<>();
doAs("client", new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
// Get a DT and use it.
final Credentials credentials = new Credentials();
kpdte.addDelegationTokens("client", credentials);
Assert.assertEquals(1, credentials.getAllTokens().size());
Assert.assertEquals(KMSClientProvider.TOKEN_KIND, credentials.
getToken(SecurityUtil.buildTokenService(kmsAddr)).getKind());
UserGroupInformation.getCurrentUser().addCredentials(credentials);
LOG.info("Added kms dt to credentials: {}", UserGroupInformation.
getCurrentUser().getCredentials().getAllTokens());
Token<?> token =
UserGroupInformation.getCurrentUser().getCredentials()
.getToken(SecurityUtil.buildTokenService(kmsAddr));
Assert.assertNotNull(token);
job1Token.add(token);
// Decode the token to get max time.
ByteArrayInputStream buf =
new ByteArrayInputStream(token.getIdentifier());
DataInputStream dis = new DataInputStream(buf);
DelegationTokenIdentifier id =
new DelegationTokenIdentifier(token.getKind());
id.readFields(dis);
dis.close();
final long maxTime = id.getMaxDate();
// wait for token to expire.
Thread.sleep(5100);
Assert.assertTrue("maxTime " + maxTime + " is not less than now.",
maxTime > 0 && maxTime < Time.now());
try {
kp.getKeys();
Assert.fail("Operation should fail since dt is expired.");
} catch (Exception e) {
LOG.info("Expected error.", e);
}
return null;
}
});
Assert.assertFalse(job1Token.isEmpty());
// job 2 (e.g. Another Yarn log aggregation job, with user DT.
doAs("client", new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
// Get a new DT, but don't use it yet.
final Credentials newCreds = new Credentials();
kpdte.addDelegationTokens("client", newCreds);
Assert.assertEquals(1, newCreds.getAllTokens().size());
Assert.assertEquals(KMSClientProvider.TOKEN_KIND,
newCreds.getToken(SecurityUtil.buildTokenService(kmsAddr)).
getKind());
// Using job 1's DT should fail.
final Credentials oldCreds = new Credentials();
for (Token<?> token : job1Token) {
if (token.getKind().equals(KMSClientProvider.TOKEN_KIND)) {
oldCreds
.addToken(SecurityUtil.buildTokenService(kmsAddr), token);
}
}
UserGroupInformation.getCurrentUser().addCredentials(oldCreds);
LOG.info("Added old kms dt to credentials: {}", UserGroupInformation
.getCurrentUser().getCredentials().getAllTokens());
try {
kp.getKeys();
Assert.fail("Operation should fail since dt is expired.");
} catch (Exception e) {
LOG.info("Expected error.", e);
}
// Using the new DT should succeed.
Assert.assertEquals(1, newCreds.getAllTokens().size());
Assert.assertEquals(KMSClientProvider.TOKEN_KIND,
newCreds.getToken(SecurityUtil.buildTokenService(kmsAddr)).
getKind());
UserGroupInformation.getCurrentUser().addCredentials(newCreds);
LOG.info("Credetials now are: {}", UserGroupInformation
.getCurrentUser().getCredentials().getAllTokens());
kp.getKeys();
return null;
}
});
return null;
}
});
}
@Test
public void testKMSWithZKSigner() throws Exception {
doKMSWithZK(true, false);