HADOOP-13251. Authenticate with Kerberos credentials when renewing KMS delegation token. Contributed by Xiao Chen.
This commit is contained in:
parent
9683eab0e1
commit
771f798edf
|
@ -339,7 +339,7 @@ public class UserGroupInformation {
|
||||||
|
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
static void reset() {
|
public static void reset() {
|
||||||
authenticationMethod = null;
|
authenticationMethod = null;
|
||||||
conf = null;
|
conf = null;
|
||||||
groups = null;
|
groups = null;
|
||||||
|
|
|
@ -58,6 +58,7 @@ public abstract class DelegationTokenAuthenticator implements Authenticator {
|
||||||
private static final String HTTP_PUT = "PUT";
|
private static final String HTTP_PUT = "PUT";
|
||||||
|
|
||||||
public static final String OP_PARAM = "op";
|
public static final String OP_PARAM = "op";
|
||||||
|
private static final String OP_PARAM_EQUALS = OP_PARAM + "=";
|
||||||
|
|
||||||
public static final String DELEGATION_TOKEN_HEADER =
|
public static final String DELEGATION_TOKEN_HEADER =
|
||||||
"X-Hadoop-Delegation-Token";
|
"X-Hadoop-Delegation-Token";
|
||||||
|
@ -285,27 +286,41 @@ public abstract class DelegationTokenAuthenticator implements Authenticator {
|
||||||
}
|
}
|
||||||
url = new URL(sb.toString());
|
url = new URL(sb.toString());
|
||||||
AuthenticatedURL aUrl = new AuthenticatedURL(this, connConfigurator);
|
AuthenticatedURL aUrl = new AuthenticatedURL(this, connConfigurator);
|
||||||
HttpURLConnection conn = aUrl.openConnection(url, token);
|
org.apache.hadoop.security.token.Token<AbstractDelegationTokenIdentifier>
|
||||||
conn.setRequestMethod(operation.getHttpMethod());
|
dt = null;
|
||||||
HttpExceptionUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
|
if (token instanceof DelegationTokenAuthenticatedURL.Token
|
||||||
if (hasResponse) {
|
&& operation.requiresKerberosCredentials()) {
|
||||||
String contentType = conn.getHeaderField(CONTENT_TYPE);
|
// Unset delegation token to trigger fall-back authentication.
|
||||||
contentType = (contentType != null) ? StringUtils.toLowerCase(contentType)
|
dt = ((DelegationTokenAuthenticatedURL.Token) token).getDelegationToken();
|
||||||
: null;
|
((DelegationTokenAuthenticatedURL.Token) token).setDelegationToken(null);
|
||||||
if (contentType != null &&
|
}
|
||||||
contentType.contains(APPLICATION_JSON_MIME)) {
|
try {
|
||||||
try {
|
HttpURLConnection conn = aUrl.openConnection(url, token);
|
||||||
ObjectMapper mapper = new ObjectMapper();
|
conn.setRequestMethod(operation.getHttpMethod());
|
||||||
ret = mapper.readValue(conn.getInputStream(), Map.class);
|
HttpExceptionUtils.validateResponse(conn, HttpURLConnection.HTTP_OK);
|
||||||
} catch (Exception ex) {
|
if (hasResponse) {
|
||||||
throw new AuthenticationException(String.format(
|
String contentType = conn.getHeaderField(CONTENT_TYPE);
|
||||||
"'%s' did not handle the '%s' delegation token operation: %s",
|
contentType =
|
||||||
url.getAuthority(), operation, ex.getMessage()), ex);
|
(contentType != null) ? StringUtils.toLowerCase(contentType) : null;
|
||||||
|
if (contentType != null &&
|
||||||
|
contentType.contains(APPLICATION_JSON_MIME)) {
|
||||||
|
try {
|
||||||
|
ObjectMapper mapper = new ObjectMapper();
|
||||||
|
ret = mapper.readValue(conn.getInputStream(), Map.class);
|
||||||
|
} catch (Exception ex) {
|
||||||
|
throw new AuthenticationException(String.format(
|
||||||
|
"'%s' did not handle the '%s' delegation token operation: %s",
|
||||||
|
url.getAuthority(), operation, ex.getMessage()), ex);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
throw new AuthenticationException(String.format("'%s' did not " +
|
||||||
|
"respond with JSON to the '%s' delegation token operation",
|
||||||
|
url.getAuthority(), operation));
|
||||||
}
|
}
|
||||||
} else {
|
}
|
||||||
throw new AuthenticationException(String.format("'%s' did not " +
|
} finally {
|
||||||
"respond with JSON to the '%s' delegation token operation",
|
if (dt != null) {
|
||||||
url.getAuthority(), operation));
|
((DelegationTokenAuthenticatedURL.Token) token).setDelegationToken(dt);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
|
|
|
@ -30,6 +30,8 @@ import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
|
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
|
||||||
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
|
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
|
||||||
import org.apache.hadoop.security.token.delegation.ZKDelegationTokenSecretManager;
|
import org.apache.hadoop.security.token.delegation.ZKDelegationTokenSecretManager;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
|
@ -41,6 +43,8 @@ import com.google.common.annotations.VisibleForTesting;
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
@InterfaceStability.Evolving
|
@InterfaceStability.Evolving
|
||||||
public class DelegationTokenManager {
|
public class DelegationTokenManager {
|
||||||
|
private static final Logger LOG =
|
||||||
|
LoggerFactory.getLogger(DelegationTokenManager.class);
|
||||||
|
|
||||||
public static final String ENABLE_ZK_KEY = "zk-dt-secret-manager.enable";
|
public static final String ENABLE_ZK_KEY = "zk-dt-secret-manager.enable";
|
||||||
|
|
||||||
|
@ -156,6 +160,7 @@ public class DelegationTokenManager {
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public Token<? extends AbstractDelegationTokenIdentifier> createToken(
|
public Token<? extends AbstractDelegationTokenIdentifier> createToken(
|
||||||
UserGroupInformation ugi, String renewer) {
|
UserGroupInformation ugi, String renewer) {
|
||||||
|
LOG.debug("Creating token with ugi:{}, renewer:{}.", ugi, renewer);
|
||||||
renewer = (renewer == null) ? ugi.getShortUserName() : renewer;
|
renewer = (renewer == null) ? ugi.getShortUserName() : renewer;
|
||||||
String user = ugi.getUserName();
|
String user = ugi.getUserName();
|
||||||
Text owner = new Text(user);
|
Text owner = new Text(user);
|
||||||
|
@ -175,6 +180,7 @@ public class DelegationTokenManager {
|
||||||
public long renewToken(
|
public long renewToken(
|
||||||
Token<? extends AbstractDelegationTokenIdentifier> token, String renewer)
|
Token<? extends AbstractDelegationTokenIdentifier> token, String renewer)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
LOG.debug("Renewing token:{} with renewer:{}.", token, renewer);
|
||||||
return secretManager.renewToken(token, renewer);
|
return secretManager.renewToken(token, renewer);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -182,6 +188,7 @@ public class DelegationTokenManager {
|
||||||
public void cancelToken(
|
public void cancelToken(
|
||||||
Token<? extends AbstractDelegationTokenIdentifier> token,
|
Token<? extends AbstractDelegationTokenIdentifier> token,
|
||||||
String canceler) throws IOException {
|
String canceler) throws IOException {
|
||||||
|
LOG.debug("Cancelling token:{} with canceler:{}.", token, canceler);
|
||||||
canceler = (canceler != null) ? canceler :
|
canceler = (canceler != null) ? canceler :
|
||||||
verifyToken(token).getShortUserName();
|
verifyToken(token).getShortUserName();
|
||||||
secretManager.cancelToken(token, canceler);
|
secretManager.cancelToken(token, canceler);
|
||||||
|
|
|
@ -585,7 +585,7 @@ $H3 KMS Delegation Token Configuration
|
||||||
|
|
||||||
KMS supports delegation tokens to authenticate to the key providers from processes without Kerberos credentials.
|
KMS supports delegation tokens to authenticate to the key providers from processes without Kerberos credentials.
|
||||||
|
|
||||||
KMS delegation token authentication extends the default Hadoop authentication. See [Hadoop Auth](../hadoop-auth/index.html) page for more details.
|
KMS delegation token authentication extends the default Hadoop authentication. Same as Hadoop authentication, KMS delegation tokens must not be fetched or renewed using delegation token authentication. See [Hadoop Auth](../hadoop-auth/index.html) page for more details.
|
||||||
|
|
||||||
Additionally, KMS delegation token secret manager can be configured with the following properties:
|
Additionally, KMS delegation token secret manager can be configured with the following properties:
|
||||||
|
|
||||||
|
|
|
@ -35,13 +35,10 @@ import org.apache.hadoop.minikdc.MiniKdc;
|
||||||
import org.apache.hadoop.security.Credentials;
|
import org.apache.hadoop.security.Credentials;
|
||||||
import org.apache.hadoop.security.SecurityUtil;
|
import org.apache.hadoop.security.SecurityUtil;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
|
|
||||||
import org.apache.hadoop.security.authentication.client.Authenticator;
|
|
||||||
import org.apache.hadoop.security.authentication.client.KerberosAuthenticator;
|
|
||||||
import org.apache.hadoop.security.authentication.client.PseudoAuthenticator;
|
|
||||||
import org.apache.hadoop.security.authorize.AuthorizationException;
|
import org.apache.hadoop.security.authorize.AuthorizationException;
|
||||||
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
|
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
@ -73,11 +70,6 @@ import java.util.Properties;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
|
|
||||||
import static org.mockito.Matchers.any;
|
|
||||||
import static org.mockito.Mockito.doNothing;
|
|
||||||
import static org.mockito.Mockito.doThrow;
|
|
||||||
import static org.mockito.Mockito.mock;
|
|
||||||
|
|
||||||
public class TestKMS {
|
public class TestKMS {
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(TestKMS.class);
|
private static final Logger LOG = LoggerFactory.getLogger(TestKMS.class);
|
||||||
|
|
||||||
|
@ -262,6 +254,7 @@ public class TestKMS {
|
||||||
kdc = null;
|
kdc = null;
|
||||||
}
|
}
|
||||||
UserGroupInformation.setShouldRenewImmediatelyForTests(false);
|
UserGroupInformation.setShouldRenewImmediatelyForTests(false);
|
||||||
|
UserGroupInformation.reset();
|
||||||
}
|
}
|
||||||
|
|
||||||
private <T> T doAs(String user, final PrivilegedExceptionAction<T> action)
|
private <T> T doAs(String user, final PrivilegedExceptionAction<T> action)
|
||||||
|
@ -1748,93 +1741,136 @@ public class TestKMS {
|
||||||
@Test
|
@Test
|
||||||
public void testDelegationTokensOpsSimple() throws Exception {
|
public void testDelegationTokensOpsSimple() throws Exception {
|
||||||
final Configuration conf = new Configuration();
|
final Configuration conf = new Configuration();
|
||||||
final Authenticator mock = mock(PseudoAuthenticator.class);
|
testDelegationTokensOps(conf, false);
|
||||||
testDelegationTokensOps(conf, mock);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testDelegationTokensOpsKerberized() throws Exception {
|
public void testDelegationTokensOpsKerberized() throws Exception {
|
||||||
final Configuration conf = new Configuration();
|
final Configuration conf = new Configuration();
|
||||||
conf.set("hadoop.security.authentication", "kerberos");
|
conf.set("hadoop.security.authentication", "kerberos");
|
||||||
final Authenticator mock = mock(KerberosAuthenticator.class);
|
testDelegationTokensOps(conf, true);
|
||||||
testDelegationTokensOps(conf, mock);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void testDelegationTokensOps(Configuration conf,
|
private void testDelegationTokensOps(Configuration conf,
|
||||||
final Authenticator mockAuthenticator) throws Exception {
|
final boolean useKrb) throws Exception {
|
||||||
UserGroupInformation.setConfiguration(conf);
|
UserGroupInformation.setConfiguration(conf);
|
||||||
File confDir = getTestDir();
|
File confDir = getTestDir();
|
||||||
conf = createBaseKMSConf(confDir);
|
conf = createBaseKMSConf(confDir);
|
||||||
|
if (useKrb) {
|
||||||
|
conf.set("hadoop.kms.authentication.type", "kerberos");
|
||||||
|
conf.set("hadoop.kms.authentication.kerberos.keytab",
|
||||||
|
keytab.getAbsolutePath());
|
||||||
|
conf.set("hadoop.kms.authentication.kerberos.principal",
|
||||||
|
"HTTP/localhost");
|
||||||
|
conf.set("hadoop.kms.authentication.kerberos.name.rules", "DEFAULT");
|
||||||
|
}
|
||||||
writeConf(confDir, conf);
|
writeConf(confDir, conf);
|
||||||
doNothing().when(mockAuthenticator).authenticate(any(URL.class),
|
|
||||||
any(AuthenticatedURL.Token.class));
|
|
||||||
|
|
||||||
runServer(null, null, confDir, new KMSCallable<Void>() {
|
runServer(null, null, confDir, new KMSCallable<Void>() {
|
||||||
@Override
|
@Override
|
||||||
public Void call() throws Exception {
|
public Void call() throws Exception {
|
||||||
Configuration conf = new Configuration();
|
final Configuration clientConf = new Configuration();
|
||||||
URI uri = createKMSUri(getKMSUrl());
|
final URI uri = createKMSUri(getKMSUrl());
|
||||||
KeyProvider kp = createProvider(uri, conf);
|
clientConf.set(KeyProviderFactory.KEY_PROVIDER_PATH,
|
||||||
conf.set(KeyProviderFactory.KEY_PROVIDER_PATH,
|
|
||||||
createKMSUri(getKMSUrl()).toString());
|
createKMSUri(getKMSUrl()).toString());
|
||||||
|
|
||||||
// test delegation token retrieval
|
doAs("client", new PrivilegedExceptionAction<Void>() {
|
||||||
KeyProviderDelegationTokenExtension kpdte =
|
@Override
|
||||||
KeyProviderDelegationTokenExtension.
|
public Void run() throws Exception {
|
||||||
createKeyProviderDelegationTokenExtension(kp);
|
KeyProvider kp = createProvider(uri, clientConf);
|
||||||
Credentials credentials = new Credentials();
|
// test delegation token retrieval
|
||||||
final Token<?>[] tokens = kpdte.addDelegationTokens(
|
KeyProviderDelegationTokenExtension kpdte =
|
||||||
UserGroupInformation.getCurrentUser().getUserName(), credentials);
|
KeyProviderDelegationTokenExtension.
|
||||||
Assert.assertEquals(1, credentials.getAllTokens().size());
|
createKeyProviderDelegationTokenExtension(kp);
|
||||||
InetSocketAddress kmsAddr = new InetSocketAddress(getKMSUrl().getHost(),
|
final Credentials credentials = new Credentials();
|
||||||
getKMSUrl().getPort());
|
final Token<?>[] tokens =
|
||||||
Assert.assertEquals(KMSClientProvider.TOKEN_KIND,
|
kpdte.addDelegationTokens("client1", credentials);
|
||||||
credentials.getToken(SecurityUtil.buildTokenService(kmsAddr)).
|
Assert.assertEquals(1, credentials.getAllTokens().size());
|
||||||
getKind());
|
InetSocketAddress kmsAddr =
|
||||||
|
new InetSocketAddress(getKMSUrl().getHost(),
|
||||||
|
getKMSUrl().getPort());
|
||||||
|
Assert.assertEquals(KMSClientProvider.TOKEN_KIND,
|
||||||
|
credentials.getToken(SecurityUtil.buildTokenService(kmsAddr)).
|
||||||
|
getKind());
|
||||||
|
|
||||||
// After this point, we're supposed to use the delegation token to auth.
|
// Test non-renewer user cannot renew.
|
||||||
doThrow(new IOException("Authenticator should not fall back"))
|
for (Token<?> token : tokens) {
|
||||||
.when(mockAuthenticator).authenticate(any(URL.class),
|
if (!(token.getKind().equals(KMSClientProvider.TOKEN_KIND))) {
|
||||||
any(AuthenticatedURL.Token.class));
|
LOG.info("Skipping token {}", token);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
LOG.info("Got dt for " + uri + "; " + token);
|
||||||
|
try {
|
||||||
|
token.renew(clientConf);
|
||||||
|
Assert.fail("client should not be allowed to renew token with"
|
||||||
|
+ "renewer=client1");
|
||||||
|
} catch (Exception e) {
|
||||||
|
GenericTestUtils.assertExceptionContains(
|
||||||
|
"tries to renew a token with renewer", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// test delegation token renewal
|
final UserGroupInformation otherUgi;
|
||||||
boolean renewed = false;
|
if (useKrb) {
|
||||||
for (Token<?> token : tokens) {
|
UserGroupInformation
|
||||||
if (!(token.getKind().equals(KMSClientProvider.TOKEN_KIND))) {
|
.loginUserFromKeytab("client1", keytab.getAbsolutePath());
|
||||||
LOG.info("Skipping token {}", token);
|
otherUgi = UserGroupInformation.getLoginUser();
|
||||||
continue;
|
} else {
|
||||||
}
|
otherUgi = UserGroupInformation.createUserForTesting("client1",
|
||||||
LOG.info("Got dt for " + uri + "; " + token);
|
new String[] {"other group"});
|
||||||
long tokenLife = token.renew(conf);
|
}
|
||||||
LOG.info("Renewed token of kind {}, new lifetime:{}",
|
try {
|
||||||
token.getKind(), tokenLife);
|
// test delegation token renewal via renewer
|
||||||
Thread.sleep(100);
|
otherUgi.doAs(new PrivilegedExceptionAction<Void>() {
|
||||||
long newTokenLife = token.renew(conf);
|
@Override
|
||||||
LOG.info("Renewed token of kind {}, new lifetime:{}",
|
public Void run() throws Exception {
|
||||||
token.getKind(), newTokenLife);
|
boolean renewed = false;
|
||||||
Assert.assertTrue(newTokenLife > tokenLife);
|
for (Token<?> token : tokens) {
|
||||||
renewed = true;
|
if (!(token.getKind()
|
||||||
}
|
.equals(KMSClientProvider.TOKEN_KIND))) {
|
||||||
Assert.assertTrue(renewed);
|
LOG.info("Skipping token {}", token);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
LOG.info("Got dt for " + uri + "; " + token);
|
||||||
|
long tokenLife = token.renew(clientConf);
|
||||||
|
LOG.info("Renewed token of kind {}, new lifetime:{}",
|
||||||
|
token.getKind(), tokenLife);
|
||||||
|
Thread.sleep(100);
|
||||||
|
long newTokenLife = token.renew(clientConf);
|
||||||
|
LOG.info("Renewed token of kind {}, new lifetime:{}",
|
||||||
|
token.getKind(), newTokenLife);
|
||||||
|
Assert.assertTrue(newTokenLife > tokenLife);
|
||||||
|
renewed = true;
|
||||||
|
}
|
||||||
|
Assert.assertTrue(renewed);
|
||||||
|
|
||||||
// test delegation token cancellation
|
// test delegation token cancellation
|
||||||
for (Token<?> token : tokens) {
|
for (Token<?> token : tokens) {
|
||||||
if (!(token.getKind().equals(KMSClientProvider.TOKEN_KIND))) {
|
if (!(token.getKind()
|
||||||
LOG.info("Skipping token {}", token);
|
.equals(KMSClientProvider.TOKEN_KIND))) {
|
||||||
continue;
|
LOG.info("Skipping token {}", token);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
LOG.info("Got dt for " + uri + "; " + token);
|
||||||
|
token.cancel(clientConf);
|
||||||
|
LOG.info("Cancelled token of kind {}", token.getKind());
|
||||||
|
try {
|
||||||
|
token.renew(clientConf);
|
||||||
|
Assert
|
||||||
|
.fail("should not be able to renew a canceled token");
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.info("Expected exception when renewing token", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
return null;
|
||||||
|
} finally {
|
||||||
|
otherUgi.logoutUserFromKeytab();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
LOG.info("Got dt for " + uri + "; " + token);
|
});
|
||||||
token.cancel(conf);
|
|
||||||
LOG.info("Cancelled token of kind {}", token.getKind());
|
|
||||||
doNothing().when(mockAuthenticator).
|
|
||||||
authenticate(any(URL.class), any(AuthenticatedURL.Token.class));
|
|
||||||
try {
|
|
||||||
token.renew(conf);
|
|
||||||
Assert.fail("should not be able to renew a canceled token");
|
|
||||||
} catch (Exception e) {
|
|
||||||
LOG.info("Expected exception when trying to renew token", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
Loading…
Reference in New Issue