HADOOP-14445. Delegation tokens are not shared between KMS instances. Contributed by Xiao Chen and Rushabh S Shah.

(cherry picked from commit 583fa6ed48)
This commit is contained in:
Xiao Chen 2018-04-10 15:26:33 -07:00
parent 46edbedd99
commit 72acda1449
18 changed files with 1181 additions and 204 deletions

View File

@ -36,8 +36,9 @@ import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
import org.apache.hadoop.security.ssl.SSLFactory;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.TokenRenewer;
import org.apache.hadoop.security.token.TokenSelector;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
import org.apache.hadoop.util.HttpExceptionUtils;
import org.apache.hadoop.util.KMSUtil;
@ -82,6 +83,8 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.KMS_CLIENT_COPY_LEGACY_TOKEN_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.KMS_CLIENT_COPY_LEGACY_TOKEN_DEFAULT;
import static org.apache.hadoop.util.KMSUtil.checkNotEmpty;
import static org.apache.hadoop.util.KMSUtil.checkNotNull;
import static org.apache.hadoop.util.KMSUtil.parseJSONEncKeyVersion;
@ -96,16 +99,13 @@ import static org.apache.hadoop.util.KMSUtil.parseJSONMetadata;
public class KMSClientProvider extends KeyProvider implements CryptoExtension,
KeyProviderDelegationTokenExtension.DelegationTokenExtension {
private static final Logger LOG =
public static final Logger LOG =
LoggerFactory.getLogger(KMSClientProvider.class);
private static final String INVALID_SIGNATURE = "Invalid signature";
private static final String ANONYMOUS_REQUESTS_DISALLOWED = "Anonymous requests are disallowed";
public static final String TOKEN_KIND_STR = KMSDelegationToken.TOKEN_KIND_STR;
public static final Text TOKEN_KIND = KMSDelegationToken.TOKEN_KIND;
public static final String SCHEME_NAME = "kms";
private static final String UTF8 = "UTF-8";
@ -133,12 +133,17 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
private static final ObjectWriter WRITER =
new ObjectMapper().writerWithDefaultPrettyPrinter();
/* dtService defines the token service value for the kms token.
* The value can be legacy format which is ip:port format or it can be uri.
* If it's uri format, then the value is read from
* CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH at key
* provider creation time, and set to token's Service field.
* When a token is renewed / canceled, its Service field will be used to
* instantiate a KeyProvider, eliminating the need to read configs
* at that time.
*/
private final Text dtService;
// Allow fallback to default kms server port 9600 for certain tests that do
// not specify the port explicitly in the kms provider url.
@VisibleForTesting
public static volatile boolean fallbackDefaultPortForTesting = false;
private final boolean copyLegacyToken;
private class EncryptedQueueRefiller implements
ValueQueue.QueueRefiller<EncryptedKeyVersion> {
@ -162,66 +167,6 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
}
}
/**
* The KMS implementation of {@link TokenRenewer}.
*/
public static class KMSTokenRenewer extends TokenRenewer {
private static final Logger LOG =
LoggerFactory.getLogger(KMSTokenRenewer.class);
@Override
public boolean handleKind(Text kind) {
return kind.equals(TOKEN_KIND);
}
@Override
public boolean isManaged(Token<?> token) throws IOException {
return true;
}
@Override
public long renew(Token<?> token, Configuration conf) throws IOException {
LOG.debug("Renewing delegation token {}", token);
KeyProvider keyProvider = KMSUtil.createKeyProvider(conf,
KeyProviderFactory.KEY_PROVIDER_PATH);
try {
if (!(keyProvider instanceof
KeyProviderDelegationTokenExtension.DelegationTokenExtension)) {
LOG.warn("keyProvider {} cannot renew dt.", keyProvider == null ?
"null" : keyProvider.getClass());
return 0;
}
return ((KeyProviderDelegationTokenExtension.DelegationTokenExtension)
keyProvider).renewDelegationToken(token);
} finally {
if (keyProvider != null) {
keyProvider.close();
}
}
}
@Override
public void cancel(Token<?> token, Configuration conf) throws IOException {
LOG.debug("Canceling delegation token {}", token);
KeyProvider keyProvider = KMSUtil.createKeyProvider(conf,
KeyProviderFactory.KEY_PROVIDER_PATH);
try {
if (!(keyProvider instanceof
KeyProviderDelegationTokenExtension.DelegationTokenExtension)) {
LOG.warn("keyProvider {} cannot cancel dt.", keyProvider == null ?
"null" : keyProvider.getClass());
return;
}
((KeyProviderDelegationTokenExtension.DelegationTokenExtension)
keyProvider).cancelDelegationToken(token);
} finally {
if (keyProvider != null) {
keyProvider.close();
}
}
}
}
public static class KMSEncryptedKeyVersion extends EncryptedKeyVersion {
public KMSEncryptedKeyVersion(String keyName, String keyVersionName,
byte[] iv, String encryptedVersionName, byte[] keyMaterial) {
@ -281,13 +226,13 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
}
hostsPart = t[0];
}
return createProvider(conf, origUrl, port, hostsPart);
return createProvider(conf, origUrl, port, hostsPart, providerUri);
}
return null;
}
private KeyProvider createProvider(Configuration conf,
URL origUrl, int port, String hostsPart) throws IOException {
private KeyProvider createProvider(Configuration conf, URL origUrl,
int port, String hostsPart, URI providerUri) throws IOException {
String[] hosts = hostsPart.split(";");
KMSClientProvider[] providers = new KMSClientProvider[hosts.length];
for (int i = 0; i < hosts.length; i++) {
@ -295,7 +240,7 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
providers[i] =
new KMSClientProvider(
new URI("kms", origUrl.getProtocol(), hosts[i], port,
origUrl.getPath(), null, null), conf);
origUrl.getPath(), null, null), conf, providerUri);
} catch (URISyntaxException e) {
throw new IOException("Could not instantiate KMSProvider.", e);
}
@ -353,17 +298,10 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
}
}
public KMSClientProvider(URI uri, Configuration conf) throws IOException {
public KMSClientProvider(URI uri, Configuration conf, URI providerUri) throws
IOException {
super(conf);
kmsUrl = createServiceURL(extractKMSPath(uri));
int kmsPort = kmsUrl.getPort();
if ((kmsPort == -1) && fallbackDefaultPortForTesting) {
kmsPort = 9600;
}
InetSocketAddress addr = new InetSocketAddress(kmsUrl.getHost(), kmsPort);
dtService = SecurityUtil.buildTokenService(addr);
if ("https".equalsIgnoreCase(kmsUrl.getProtocol())) {
sslFactory = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
try {
@ -376,6 +314,9 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
CommonConfigurationKeysPublic.KMS_CLIENT_TIMEOUT_SECONDS,
CommonConfigurationKeysPublic.KMS_CLIENT_TIMEOUT_DEFAULT);
authRetry = conf.getInt(AUTH_RETRY, DEFAULT_AUTH_RETRY);
copyLegacyToken = conf.getBoolean(KMS_CLIENT_COPY_LEGACY_TOKEN_KEY,
KMS_CLIENT_COPY_LEGACY_TOKEN_DEFAULT);
configurator = new TimeoutConnConfigurator(timeout, sslFactory);
encKeyVersionQueue =
new ValueQueue<KeyProviderCryptoExtension.EncryptedKeyVersion>(
@ -400,6 +341,7 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
KMS_CLIENT_ENC_KEY_CACHE_NUM_REFILL_THREADS_DEFAULT),
new EncryptedQueueRefiller());
authToken = new DelegationTokenAuthenticatedURL.Token();
dtService = new Text(providerUri.toString());
LOG.info("KMSClientProvider for KMS url: {} delegation token service: {}" +
" created.", kmsUrl, dtService);
}
@ -473,7 +415,7 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
@Override
public HttpURLConnection run() throws Exception {
DelegationTokenAuthenticatedURL authUrl =
new DelegationTokenAuthenticatedURL(configurator);
createKMSAuthenticatedURL();
return authUrl.openConnection(url, authToken, doAsUser);
}
});
@ -924,7 +866,7 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
LOG.debug("Renewing delegation token {} with url:{}, as:{}",
token, url, doAsUser);
final DelegationTokenAuthenticatedURL authUrl =
new DelegationTokenAuthenticatedURL(configurator);
createKMSAuthenticatedURL();
return getActualUgi().doAs(
new PrivilegedExceptionAction<Long>() {
@Override
@ -956,7 +898,7 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
LOG.debug("Cancelling delegation token {} with url:{}, as:{}",
dToken, url, doAsUser);
final DelegationTokenAuthenticatedURL authUrl =
new DelegationTokenAuthenticatedURL(configurator);
createKMSAuthenticatedURL();
authUrl.cancelDelegationToken(url, token, doAsUser);
return null;
}
@ -1008,6 +950,17 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
return token;
}
@VisibleForTesting
DelegationTokenAuthenticatedURL createKMSAuthenticatedURL() {
return new DelegationTokenAuthenticatedURL(configurator) {
@Override
public org.apache.hadoop.security.token.Token<? extends TokenIdentifier>
getDelegationToken(URL url, Credentials creds) {
return selectKMSDelegationToken(creds);
}
};
}
@Override
public Token<?>[] addDelegationTokens(final String renewer,
Credentials credentials) throws IOException {
@ -1016,7 +969,7 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
if (token == null) {
final URL url = createURL(null, null, null, null);
final DelegationTokenAuthenticatedURL authUrl =
new DelegationTokenAuthenticatedURL(configurator);
createKMSAuthenticatedURL();
try {
final String doAsUser = getDoAsUser();
token = getActualUgi().doAs(new PrivilegedExceptionAction<Token<?>>() {
@ -1030,9 +983,16 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
}
});
if (token != null) {
LOG.debug("New token received: ({})", token);
if (KMSDelegationToken.TOKEN_KIND.equals(token.getKind())) {
// do not set service for legacy kind, for compatibility.
token.setService(dtService);
}
LOG.info("New token created: ({})", token);
credentials.addToken(token.getService(), token);
tokens = new Token<?>[] { token };
Token<?> legacyToken = createAndAddLegacyToken(credentials, token);
tokens = legacyToken == null ?
new Token<?>[] {token} :
new Token<?>[] {token, legacyToken};
} else {
throw new IOException("Got NULL as delegation token");
}
@ -1049,13 +1009,75 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
return tokens;
}
private boolean containsKmsDt(UserGroupInformation ugi) throws IOException {
// Add existing credentials from the UGI, since provider is cached.
Credentials creds = ugi.getCredentials();
/**
* If {@link CommonConfigurationKeysPublic#KMS_CLIENT_COPY_LEGACY_TOKEN_KEY}
* is true when creating the provider, then copy the passed-in token of
* {@link KMSDelegationToken#TOKEN_KIND} and create a new token of
* {@link KMSDelegationToken#TOKEN_LEGACY_KIND}, and add it to credentials.
*
* @return The legacy token, or null if one should not be created.
*/
private Token<?> createAndAddLegacyToken(Credentials credentials,
Token<?> token) {
if (!copyLegacyToken || !KMSDelegationToken.TOKEN_KIND
.equals(token.getKind())) {
LOG.debug("Not creating legacy token because copyLegacyToken={}, "
+ "token={}", copyLegacyToken, token);
return null;
}
// copy a KMS_DELEGATION_TOKEN and create a new kms-dt with the same
// underlying token for backwards-compatibility. Old clients/renewers
// does not parse the new token and can only work with kms-dt.
final Token<?> legacyToken = token.copyToken();
legacyToken.setKind(KMSDelegationToken.TOKEN_LEGACY_KIND);
final InetSocketAddress addr =
new InetSocketAddress(kmsUrl.getHost(), kmsUrl.getPort());
final Text fallBackServiceText = SecurityUtil.buildTokenService(addr);
legacyToken.setService(fallBackServiceText);
LOG.info("Copied token to legacy kind: {}", legacyToken);
credentials.addToken(legacyToken.getService(), legacyToken);
return legacyToken;
}
@VisibleForTesting
public Text getDelegationTokenService() {
return dtService;
}
/**
* Given a list of tokens, return the token that should be used for KMS
* authentication.
*/
@VisibleForTesting
Token selectKMSDelegationToken(Credentials creds) {
// always look for TOKEN_KIND first
final TokenSelector<AbstractDelegationTokenIdentifier> tokenSelector =
new AbstractDelegationTokenSelector<AbstractDelegationTokenIdentifier>(
KMSDelegationToken.TOKEN_KIND) {
};
Token token = tokenSelector.selectToken(dtService, creds.getAllTokens());
LOG.debug("Searching service {} found token {}", dtService, token);
if (token != null) {
return token;
}
// fall back to look for token by service, regardless of kind.
// this is old behavior, keeping for compatibility reasons (for example,
// even if KMS server is new, if the job is submitted with an old kms
// client, job runners on new version should be able to find the token).
final InetSocketAddress addr =
new InetSocketAddress(kmsUrl.getHost(), kmsUrl.getPort());
final Text fallBackServiceText = SecurityUtil.buildTokenService(addr);
token = creds.getToken(fallBackServiceText);
LOG.debug("Selected delegation token {} using service:{}", token,
fallBackServiceText);
return token;
}
private boolean containsKmsDt(UserGroupInformation ugi) {
final Credentials creds = ugi.getCredentials();
if (!creds.getAllTokens().isEmpty()) {
LOG.debug("Searching for token that matches service: {}", dtService);
org.apache.hadoop.security.token.Token<? extends TokenIdentifier>
dToken = creds.getToken(dtService);
final Token dToken = selectKMSDelegationToken(creds);
if (dToken != null) {
return true;
}

View File

@ -27,7 +27,10 @@ import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier
@InterfaceAudience.Private
public final class KMSDelegationToken {
public static final String TOKEN_KIND_STR = "kms-dt";
public static final String TOKEN_LEGACY_KIND_STR = "kms-dt";
public static final Text TOKEN_LEGACY_KIND = new Text(TOKEN_LEGACY_KIND_STR);
public static final String TOKEN_KIND_STR = "KMS_DELEGATION_TOKEN";
public static final Text TOKEN_KIND = new Text(TOKEN_KIND_STR);
// Utility class is not supposed to be instantiated.
@ -49,4 +52,21 @@ public final class KMSDelegationToken {
return TOKEN_KIND;
}
}
/**
* DelegationTokenIdentifier used for the KMS for legacy tokens.
*/
@Deprecated
public static class KMSLegacyDelegationTokenIdentifier
extends DelegationTokenIdentifier {
public KMSLegacyDelegationTokenIdentifier() {
super(TOKEN_LEGACY_KIND);
}
@Override
public Text getKind() {
return TOKEN_LEGACY_KIND;
}
}
}

View File

@ -0,0 +1,56 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.crypto.key.kms;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.KMSUtil;
import java.io.IOException;
import static org.apache.hadoop.crypto.key.kms.KMSDelegationToken.TOKEN_LEGACY_KIND;
/**
* The {@link KMSTokenRenewer} that supports legacy tokens.
*/
@InterfaceAudience.Private
@Deprecated
public class KMSLegacyTokenRenewer extends KMSTokenRenewer {
@Override
public boolean handleKind(Text kind) {
return kind.equals(TOKEN_LEGACY_KIND);
}
/**
* Create a key provider for token renewal / cancellation.
* Caller is responsible for closing the key provider.
*/
@Override
protected KeyProvider createKeyProvider(Token<?> token,
Configuration conf) throws IOException {
assert token.getKind().equals(TOKEN_LEGACY_KIND);
// Legacy tokens get service from configuration.
return KMSUtil.createKeyProvider(conf,
CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH);
}
}

View File

@ -0,0 +1,103 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.crypto.key.kms;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.KeyProviderDelegationTokenExtension;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenRenewer;
import org.apache.hadoop.util.KMSUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import static org.apache.hadoop.crypto.key.kms.KMSDelegationToken.TOKEN_KIND;
/**
* The KMS implementation of {@link TokenRenewer}.
*/
@InterfaceAudience.Private
public class KMSTokenRenewer extends TokenRenewer {
public static final Logger LOG = LoggerFactory
.getLogger(org.apache.hadoop.crypto.key.kms.KMSTokenRenewer.class);
@Override
public boolean handleKind(Text kind) {
return kind.equals(TOKEN_KIND);
}
@Override
public boolean isManaged(Token<?> token) throws IOException {
return true;
}
@Override
public long renew(Token<?> token, Configuration conf) throws IOException {
LOG.debug("Renewing delegation token {}", token);
final KeyProvider keyProvider = createKeyProvider(token, conf);
try {
if (!(keyProvider instanceof
KeyProviderDelegationTokenExtension.DelegationTokenExtension)) {
LOG.warn("keyProvider {} cannot renew token {}.",
keyProvider == null ? "null" : keyProvider.getClass(), token);
return 0;
}
return ((KeyProviderDelegationTokenExtension.DelegationTokenExtension)
keyProvider).renewDelegationToken(token);
} finally {
if (keyProvider != null) {
keyProvider.close();
}
}
}
@Override
public void cancel(Token<?> token, Configuration conf) throws IOException {
LOG.debug("Canceling delegation token {}", token);
final KeyProvider keyProvider = createKeyProvider(token, conf);
try {
if (!(keyProvider instanceof
KeyProviderDelegationTokenExtension.DelegationTokenExtension)) {
LOG.warn("keyProvider {} cannot cancel token {}.",
keyProvider == null ? "null" : keyProvider.getClass(), token);
return;
}
((KeyProviderDelegationTokenExtension.DelegationTokenExtension)
keyProvider).cancelDelegationToken(token);
} finally {
if (keyProvider != null) {
keyProvider.close();
}
}
}
/**
* Create a key provider for token renewal / cancellation.
* Caller is responsible for closing the key provider.
*/
protected KeyProvider createKeyProvider(Token<?> token,
Configuration conf) throws IOException {
return KMSUtil
.createKeyProviderFromTokenService(conf, token.getService().toString());
}
}

View File

@ -0,0 +1,18 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.crypto.key.kms;

View File

@ -770,6 +770,16 @@ public class CommonConfigurationKeysPublic {
/** Default value is 100 ms. */
public static final int KMS_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_DEFAULT = 100;
/**
* @see
* <a href="{@docRoot}/../hadoop-project-dist/hadoop-common/core-default.xml">
* core-default.xml</a>
*/
public static final String KMS_CLIENT_COPY_LEGACY_TOKEN_KEY =
"hadoop.security.kms.client.copy.legacy.token";
/** Default value is true. */
public static final boolean KMS_CLIENT_COPY_LEGACY_TOKEN_DEFAULT = true;
/**
* @see
* <a href="{@docRoot}/../hadoop-project-dist/hadoop-common/core-default.xml">

View File

@ -300,11 +300,7 @@ public class DelegationTokenAuthenticatedURL extends AuthenticatedURL {
creds.getAllTokens());
}
if (!creds.getAllTokens().isEmpty()) {
InetSocketAddress serviceAddr = new InetSocketAddress(url.getHost(),
url.getPort());
Text service = SecurityUtil.buildTokenService(serviceAddr);
dToken = creds.getToken(service);
LOG.debug("Using delegation token {} from service:{}", dToken, service);
dToken = getDelegationToken(url, creds);
if (dToken != null) {
if (useQueryStringForDelegationToken()) {
// delegation token will go in the query string, injecting it
@ -340,6 +336,21 @@ public class DelegationTokenAuthenticatedURL extends AuthenticatedURL {
return conn;
}
/**
* Select a delegation token from all tokens in credentials, based on url.
*/
@InterfaceAudience.Private
public org.apache.hadoop.security.token.Token<? extends TokenIdentifier>
getDelegationToken(URL url, Credentials creds) {
final InetSocketAddress serviceAddr =
new InetSocketAddress(url.getHost(), url.getPort());
final Text service = SecurityUtil.buildTokenService(serviceAddr);
org.apache.hadoop.security.token.Token<? extends TokenIdentifier> dToken =
creds.getToken(service);
LOG.debug("Selected delegation token {} using service:{}", dToken, service);
return dToken;
}
/**
* Requests a delegation token using the configured <code>Authenticator</code>
* for authentication.

View File

@ -81,7 +81,7 @@ import com.google.common.annotations.VisibleForTesting;
@InterfaceStability.Evolving
public abstract class DelegationTokenAuthenticationHandler
implements AuthenticationHandler {
private static final Logger LOG =
public static final Logger LOG =
LoggerFactory.getLogger(DelegationTokenAuthenticationHandler.class);
protected static final String TYPE_POSTFIX = "-dt";
@ -224,7 +224,8 @@ public abstract class DelegationTokenAuthenticationHandler
HttpServletRequest request, HttpServletResponse response)
throws IOException, AuthenticationException {
boolean requestContinues = true;
LOG.trace("Processing operation for req=({}), token: {}", request, token);
LOG.trace("Processing operation for req=({}), token: {}",
request.getRequestURL(), token);
String op = ServletUtils.getParameter(request,
KerberosDelegationTokenAuthenticator.OP_PARAM);
op = (op != null) ? StringUtils.toUpperCase(op) : null;
@ -404,7 +405,8 @@ public abstract class DelegationTokenAuthenticationHandler
HttpServletResponse.SC_FORBIDDEN, new AuthenticationException(ex));
}
} else {
LOG.debug("Falling back to {} (req={})", authHandler.getClass(), request);
LOG.debug("Falling back to {} (req={})", authHandler.getClass(),
request.getRequestURL());
token = authHandler.authenticate(request, response);
}
return token;

View File

@ -49,7 +49,7 @@ import java.util.Map;
@InterfaceAudience.Public
@InterfaceStability.Evolving
public abstract class DelegationTokenAuthenticator implements Authenticator {
private static Logger LOG =
public static final Logger LOG =
LoggerFactory.getLogger(DelegationTokenAuthenticator.class);
private static final String CONTENT_TYPE = "Content-Type";

View File

@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
@ -41,8 +42,7 @@ import java.util.Map;
*/
@InterfaceAudience.Private
public final class KMSUtil {
public static final Logger LOG =
LoggerFactory.getLogger(KMSUtil.class);
public static final Logger LOG = LoggerFactory.getLogger(KMSUtil.class);
private KMSUtil() { /* Hidden constructor */ }
@ -64,6 +64,13 @@ public final class KMSUtil {
if (providerUriStr == null || providerUriStr.isEmpty()) {
return null;
}
KeyProvider kp = KMSUtilFaultInjector.get().createKeyProviderForTests(
providerUriStr, conf);
if (kp != null) {
LOG.info("KeyProvider is created with uri: {}. This should happen only " +
"in tests.", providerUriStr);
return kp;
}
return createKeyProviderFromUri(conf, URI.create(providerUriStr));
}
@ -205,4 +212,38 @@ public final class KMSUtil {
}
return metadata;
}
/**
* Creates a key provider from token service field, which must be URI format.
*
* @param conf
* @param tokenServiceValue
* @return new KeyProvider or null
* @throws IOException
*/
public static KeyProvider createKeyProviderFromTokenService(
final Configuration conf, final String tokenServiceValue)
throws IOException {
LOG.debug("Creating key provider from token service value {}. ",
tokenServiceValue);
final KeyProvider kp = KMSUtilFaultInjector.get()
.createKeyProviderForTests(tokenServiceValue, conf);
if (kp != null) {
LOG.info("KeyProvider is created with uri: {}. This should happen only "
+ "in tests.", tokenServiceValue);
return kp;
}
if (!tokenServiceValue.contains("://")) {
throw new IllegalArgumentException(
"Invalid token service " + tokenServiceValue);
}
final URI tokenServiceUri;
try {
tokenServiceUri = new URI(tokenServiceValue);
} catch (URISyntaxException e) {
throw new IllegalArgumentException(
"Invalid token service " + tokenServiceValue, e);
}
return createKeyProviderFromUri(conf, tokenServiceUri);
}
}

View File

@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.util;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.KeyProvider;
import java.io.IOException;
/**
* Used for returning custom KeyProvider from test methods.
*/
@VisibleForTesting
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class KMSUtilFaultInjector {
private static KMSUtilFaultInjector instance = new KMSUtilFaultInjector();
public static KMSUtilFaultInjector get() {
return instance;
}
public static void set(KMSUtilFaultInjector injector) {
instance = injector;
}
public KeyProvider createKeyProviderForTests(String value, Configuration conf)
throws IOException {
return null;
}
}

View File

@ -12,3 +12,4 @@
# limitations under the License.
#
org.apache.hadoop.crypto.key.kms.KMSDelegationToken$KMSDelegationTokenIdentifier
org.apache.hadoop.crypto.key.kms.KMSDelegationToken$KMSLegacyDelegationTokenIdentifier

View File

@ -11,4 +11,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
org.apache.hadoop.crypto.key.kms.KMSClientProvider$KMSTokenRenewer
org.apache.hadoop.crypto.key.kms.KMSTokenRenewer
org.apache.hadoop.crypto.key.kms.KMSLegacyTokenRenewer

View File

@ -2391,6 +2391,26 @@
</description>
</property>
<property>
<name>hadoop.security.kms.client.copy.legacy.token</name>
<value>true</value>
<description>
Expert only. Whether the KMS client provider should copy a token to legacy
kind. This is for KMS_DELEGATION_TOKEN to be backwards compatible. With the
default value set to true, the client will locally duplicate the
KMS_DELEGATION_TOKEN token and create a kms-dt token, with the service field
conforming to kms-dt. All other parts of the token remain the same.
Then the new clients will use KMS_DELEGATION_TOKEN and old clients will
use kms-dt to authenticate. Default value is true.
You should only change this to false if you know all the KMS servers
, clients (including both job submitters and job runners) and the
token renewers (usually Yarn RM) are on a version that supports
KMS_DELEGATION_TOKEN.
Turning this off prematurely may result in old clients failing to
authenticate with new servers.
</description>
</property>
<property>
<name>hadoop.security.kms.client.failover.sleep.max.millis</name>
<value>2000</value>

View File

@ -0,0 +1,162 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.crypto.key.kms;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.slf4j.event.Level;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URI;
import java.net.URL;
import static org.apache.hadoop.crypto.key.kms.KMSDelegationToken.TOKEN_KIND;
import static org.apache.hadoop.crypto.key.kms.KMSDelegationToken.TOKEN_LEGACY_KIND;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
/**
* Unit test for {@link KMSClientProvider} class.
*/
public class TestKMSClientProvider {
public static final Logger LOG =
LoggerFactory.getLogger(TestKMSClientProvider.class);
private final Token token = new Token();
private final Token legacyToken = new Token();
private final String uriString = "kms://https@host:16000/kms";
private final String legacyTokenService = "host:16000";
@Rule
public Timeout globalTimeout = new Timeout(30000);
{
GenericTestUtils.setLogLevel(KMSClientProvider.LOG, Level.TRACE);
}
@Before
public void setup() {
SecurityUtil.setTokenServiceUseIp(false);
token.setKind(TOKEN_KIND);
token.setService(new Text(uriString));
legacyToken.setKind(TOKEN_LEGACY_KIND);
legacyToken.setService(new Text(legacyTokenService));
}
@Test
public void testNotCopyFromLegacyToken() throws Exception {
final DelegationTokenAuthenticatedURL url =
mock(DelegationTokenAuthenticatedURL.class);
final Configuration conf = new Configuration();
final URI uri = new URI(uriString);
final KMSClientProvider kp = new KMSClientProvider(uri, conf, uri);
try {
final KMSClientProvider spyKp = spy(kp);
when(spyKp.createKMSAuthenticatedURL()).thenReturn(url);
when(url.getDelegationToken(any(URL.class),
any(DelegationTokenAuthenticatedURL.Token.class), any(String.class),
any(String.class))).thenReturn(legacyToken);
final Credentials creds = new Credentials();
final Token<?>[] tokens = spyKp.addDelegationTokens("yarn", creds);
LOG.info("Got tokens: {}", tokens);
assertEquals(1, tokens.length);
LOG.info("uri:" + uriString);
// if KMS server returned a legacy token, new client should leave the
// service being legacy and not set uri string
assertEquals(legacyTokenService, tokens[0].getService().toString());
} finally {
kp.close();
}
}
@Test
public void testCopyFromToken() throws Exception {
final DelegationTokenAuthenticatedURL url =
mock(DelegationTokenAuthenticatedURL.class);
final Configuration conf = new Configuration();
final URI uri = new URI(uriString);
final KMSClientProvider kp = new KMSClientProvider(uri, conf, uri);
try {
final KMSClientProvider spyKp = spy(kp);
when(spyKp.createKMSAuthenticatedURL()).thenReturn(url);
when(url.getDelegationToken(any(URL.class),
any(DelegationTokenAuthenticatedURL.Token.class), any(String.class),
any(String.class))).thenReturn(token);
final Credentials creds = new Credentials();
final Token<?>[] tokens = spyKp.addDelegationTokens("yarn", creds);
LOG.info("Got tokens: {}", tokens);
assertEquals(2, tokens.length);
assertTrue(creds.getAllTokens().contains(token));
assertNotNull(creds.getToken(legacyToken.getService()));
} finally {
kp.close();
}
}
@Test
public void testSelectTokenWhenBothExist() throws Exception {
final Credentials creds = new Credentials();
final Configuration conf = new Configuration();
final URI uri = new URI(uriString);
final KMSClientProvider kp = new KMSClientProvider(uri, conf, uri);
try {
creds.addToken(token.getService(), token);
creds.addToken(legacyToken.getService(), legacyToken);
Token t = kp.selectKMSDelegationToken(creds);
assertEquals(token, t);
} finally {
kp.close();
}
}
@Test
public void testSelectTokenLegacyService() throws Exception {
final Configuration conf = new Configuration();
final URI uri = new URI(uriString);
final KMSClientProvider kp = new KMSClientProvider(uri, conf, uri);
try {
Text legacyService = new Text(legacyTokenService);
token.setService(legacyService);
final Credentials creds = new Credentials();
creds.addToken(legacyService, token);
Token t = kp.selectKMSDelegationToken(creds);
assertEquals(token, t);
} finally {
kp.close();
}
}
}

View File

@ -42,7 +42,8 @@ import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.junit.After;
import org.apache.hadoop.util.KMSUtil;
import org.apache.hadoop.util.KMSUtilFaultInjector;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;
@ -56,33 +57,68 @@ public class TestLoadBalancingKMSClientProvider {
SecurityUtil.setTokenServiceUseIp(false);
}
@After
public void teardown() throws IOException {
KMSClientProvider.fallbackDefaultPortForTesting = false;
private void setKMSUtilFaultInjector() {
KMSUtilFaultInjector injector = new KMSUtilFaultInjector() {
@Override
public KeyProvider createKeyProviderForTests(
String value, Configuration conf) throws IOException {
return TestLoadBalancingKMSClientProvider
.createKeyProviderForTests(value, conf);
}
};
KMSUtilFaultInjector.set(injector);
}
public static KeyProvider createKeyProviderForTests(
String value, Configuration conf) throws IOException {
// The syntax for kms servers will be
// kms://http@localhost:port1/kms,kms://http@localhost:port2/kms
if (!value.contains(",")) {
return null;
}
String[] keyProviderUrisStr = value.split(",");
KMSClientProvider[] keyProviderArr =
new KMSClientProvider[keyProviderUrisStr.length];
int i = 0;
for (String keyProviderUri: keyProviderUrisStr) {
KMSClientProvider kmcp =
new KMSClientProvider(URI.create(keyProviderUri), conf, URI
.create(value));
keyProviderArr[i] = kmcp;
i++;
}
LoadBalancingKMSClientProvider lbkcp =
new LoadBalancingKMSClientProvider(keyProviderArr, conf);
return lbkcp;
}
@Test
public void testCreation() throws Exception {
Configuration conf = new Configuration();
KMSClientProvider.fallbackDefaultPortForTesting = true;
KeyProvider kp = new KMSClientProvider.Factory().createProvider(new URI(
"kms://http@host1/kms/foo"), conf);
"kms://http@host1:9600/kms/foo"), conf);
assertTrue(kp instanceof LoadBalancingKMSClientProvider);
KMSClientProvider[] providers =
((LoadBalancingKMSClientProvider) kp).getProviders();
assertEquals(1, providers.length);
assertEquals(Sets.newHashSet("http://host1/kms/foo/v1/"),
assertEquals(Sets.newHashSet("http://host1:9600/kms/foo/v1/"),
Sets.newHashSet(providers[0].getKMSUrl()));
kp = new KMSClientProvider.Factory().createProvider(new URI(
"kms://http@host1;host2;host3/kms/foo"), conf);
setKMSUtilFaultInjector();
String uriStr = "kms://http@host1:9600/kms/foo," +
"kms://http@host2:9600/kms/foo," +
"kms://http@host3:9600/kms/foo";
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
uriStr);
kp = KMSUtil.createKeyProvider(conf, CommonConfigurationKeysPublic
.HADOOP_SECURITY_KEY_PROVIDER_PATH);
assertTrue(kp instanceof LoadBalancingKMSClientProvider);
providers =
((LoadBalancingKMSClientProvider) kp).getProviders();
assertEquals(3, providers.length);
assertEquals(Sets.newHashSet("http://host1/kms/foo/v1/",
"http://host2/kms/foo/v1/",
"http://host3/kms/foo/v1/"),
assertEquals(Sets.newHashSet("http://host1:9600/kms/foo/v1/",
"http://host2:9600/kms/foo/v1/",
"http://host3:9600/kms/foo/v1/"),
Sets.newHashSet(providers[0].getKMSUrl(),
providers[1].getKMSUrl(),
providers[2].getKMSUrl()));
@ -208,7 +244,7 @@ public class TestLoadBalancingKMSClientProvider {
private class MyKMSClientProvider extends KMSClientProvider {
public MyKMSClientProvider(URI uri, Configuration conf) throws IOException {
super(uri, conf);
super(uri, conf, uri);
}
@Override
@ -245,9 +281,8 @@ public class TestLoadBalancingKMSClientProvider {
@Test
public void testClassCastException() throws Exception {
Configuration conf = new Configuration();
KMSClientProvider.fallbackDefaultPortForTesting = true;
KMSClientProvider p1 = new MyKMSClientProvider(
new URI("kms://http@host1/kms/foo"), conf);
new URI("kms://http@host1:9600/kms/foo"), conf);
LoadBalancingKMSClientProvider kp = new LoadBalancingKMSClientProvider(
new KMSClientProvider[] {p1}, 0, conf);
try {

View File

@ -0,0 +1,65 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.util;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
/**
* Test {@link KMSUtil}.
*/
public class TestKMSUtil {
public static final Logger LOG = LoggerFactory.getLogger(TestKMSUtil.class);
@Rule
public Timeout globalTimeout = new Timeout(90000);
@Test
public void testCreateKeyProviderFromTokenService() throws Exception {
final Configuration conf = new Configuration();
KeyProvider kp = KMSUtil.createKeyProviderFromTokenService(conf,
"kms://https@localhost:9600/kms");
assertNotNull(kp);
kp.close();
kp = KMSUtil.createKeyProviderFromTokenService(conf,
"kms://https@localhost:9600/kms,kms://localhost1:9600/kms");
assertNotNull(kp);
kp.close();
String invalidService = "whatever:9600";
try {
KMSUtil.createKeyProviderFromTokenService(conf, invalidService);
} catch (Exception ex) {
LOG.info("Expected exception:", ex);
assertTrue(ex instanceof IllegalArgumentException);
GenericTestUtils.assertExceptionContains(
"Invalid token service " + invalidService, ex);
}
}
}

View File

@ -1,3 +1,4 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@ -31,26 +32,35 @@ import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersi
import org.apache.hadoop.crypto.key.KeyProviderDelegationTokenExtension;
import org.apache.hadoop.crypto.key.kms.KMSClientProvider;
import org.apache.hadoop.crypto.key.kms.KMSDelegationToken;
import org.apache.hadoop.crypto.key.kms.KMSTokenRenewer;
import org.apache.hadoop.crypto.key.kms.LoadBalancingKMSClientProvider;
import org.apache.hadoop.crypto.key.kms.TestLoadBalancingKMSClientProvider;
import org.apache.hadoop.crypto.key.kms.ValueQueue;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.minikdc.MiniKdc;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
import org.apache.hadoop.security.ssl.SSLFactory;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticationHandler;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticator;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.KMSUtil;
import org.apache.hadoop.util.KMSUtilFaultInjector;
import org.apache.hadoop.util.Time;
import org.apache.http.client.utils.URIBuilder;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
@ -71,7 +81,6 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.Writer;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.SocketTimeoutException;
import java.net.URI;
@ -96,6 +105,10 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.KMS_CLIENT_COPY_LEGACY_TOKEN_KEY;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH;
import static org.apache.hadoop.crypto.key.kms.KMSDelegationToken.TOKEN_KIND;
import static org.apache.hadoop.crypto.key.kms.KMSDelegationToken.TOKEN_LEGACY_KIND;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@ -113,6 +126,20 @@ public class TestKMS {
private SSLFactory sslFactory;
private final KMSUtilFaultInjector oldInjector =
KMSUtilFaultInjector.get();
// Injector to create providers with different ports. Can only happen in tests
private final KMSUtilFaultInjector testInjector =
new KMSUtilFaultInjector() {
@Override
public KeyProvider createKeyProviderForTests(String value,
Configuration conf) throws IOException {
return TestLoadBalancingKMSClientProvider
.createKeyProviderForTests(value, conf);
}
};
// Keep track of all key providers created during a test case, so they can be
// closed at test tearDown.
private List<KeyProvider> providersCreated = new LinkedList<>();
@ -122,7 +149,12 @@ public class TestKMS {
@Before
public void setUp() throws Exception {
setUpMiniKdc();
GenericTestUtils.setLogLevel(KMSClientProvider.LOG, Level.TRACE);
GenericTestUtils
.setLogLevel(DelegationTokenAuthenticationHandler.LOG, Level.TRACE);
GenericTestUtils
.setLogLevel(DelegationTokenAuthenticator.LOG, Level.TRACE);
GenericTestUtils.setLogLevel(KMSUtil.LOG, Level.TRACE);
// resetting kerberos security
Configuration conf = new Configuration();
UserGroupInformation.setConfiguration(conf);
@ -141,24 +173,78 @@ public class TestKMS {
}
public static abstract class KMSCallable<T> implements Callable<T> {
private URL kmsUrl;
private List<URL> kmsUrl;
protected URL getKMSUrl() {
return kmsUrl;
return kmsUrl.get(0);
}
protected URL[] getKMSHAUrl() {
URL[] urls = new URL[kmsUrl.size()];
return kmsUrl.toArray(urls);
}
protected void addKMSUrl(URL url) {
if (kmsUrl == null) {
kmsUrl = new ArrayList<URL>();
}
kmsUrl.add(url);
}
/*
* The format of the returned value will be
* kms://http:kms1.example1.com:port1,kms://http:kms2.example2.com:port2
*/
protected String generateLoadBalancingKeyProviderUriString() {
if (kmsUrl == null || kmsUrl.size() == 0) {
return null;
}
StringBuffer sb = new StringBuffer();
for (int i = 0; i < kmsUrl.size(); i++) {
sb.append(KMSClientProvider.SCHEME_NAME + "://" +
kmsUrl.get(0).getProtocol() + "@");
URL url = kmsUrl.get(i);
sb.append(url.getAuthority());
if (url.getPath() != null) {
sb.append(url.getPath());
}
if (i < kmsUrl.size() - 1) {
sb.append(",");
}
}
return sb.toString();
}
}
protected KeyProvider createProvider(URI uri, Configuration conf)
throws IOException {
final KeyProvider ret = new LoadBalancingKMSClientProvider(
new KMSClientProvider[] {new KMSClientProvider(uri, conf)}, conf);
new KMSClientProvider[] {new KMSClientProvider(uri, conf, uri)}, conf);
providersCreated.add(ret);
return ret;
}
/**
* create a LoadBalancingKMSClientProvider from an array of URIs.
* @param uris an array of KMS URIs
* @param conf configuration object
* @return a LoadBalancingKMSClientProvider object
* @throws IOException
*/
protected LoadBalancingKMSClientProvider createHAProvider(URI[] uris,
Configuration conf, String originalUri) throws IOException {
KMSClientProvider[] providers = new KMSClientProvider[uris.length];
for (int i = 0; i < providers.length; i++) {
providers[i] =
new KMSClientProvider(uris[i], conf, URI.create(originalUri));
}
return new LoadBalancingKMSClientProvider(providers, conf);
}
private KMSClientProvider createKMSClientProvider(URI uri, Configuration conf)
throws IOException {
final KMSClientProvider ret = new KMSClientProvider(uri, conf);
final KMSClientProvider ret = new KMSClientProvider(uri, conf, uri);
providersCreated.add(ret);
return ret;
}
@ -170,22 +256,33 @@ public class TestKMS {
protected <T> T runServer(int port, String keystore, String password, File confDir,
KMSCallable<T> callable) throws Exception {
return runServer(new int[] {port}, keystore, password, confDir, callable);
}
protected <T> T runServer(int[] ports, String keystore, String password,
File confDir, KMSCallable<T> callable) throws Exception {
MiniKMS.Builder miniKMSBuilder = new MiniKMS.Builder().setKmsConfDir(confDir)
.setLog4jConfFile("log4j.properties");
if (keystore != null) {
miniKMSBuilder.setSslConf(new File(keystore), password);
}
if (port > 0) {
miniKMSBuilder.setPort(port);
final List<MiniKMS> kmsList = new ArrayList<>();
for (int i=0; i< ports.length; i++) {
if (ports[i] > 0) {
miniKMSBuilder.setPort(ports[i]);
}
MiniKMS miniKMS = miniKMSBuilder.build();
kmsList.add(miniKMS);
miniKMS.start();
LOG.info("Test KMS running at: " + miniKMS.getKMSUrl());
callable.addKMSUrl(miniKMS.getKMSUrl());
}
MiniKMS miniKMS = miniKMSBuilder.build();
miniKMS.start();
try {
System.out.println("Test KMS running at: " + miniKMS.getKMSUrl());
callable.kmsUrl = miniKMS.getKMSUrl();
return callable.call();
} finally {
miniKMS.stop();
for (MiniKMS miniKMS: kmsList) {
miniKMS.stop();
}
}
}
@ -240,6 +337,13 @@ public class TestKMS {
return new URI("kms://" + str);
}
public static URI[] createKMSHAUri(URL[] kmsUrls) throws Exception {
URI[] uris = new URI[kmsUrls.length];
for (int i = 0; i < kmsUrls.length; i++) {
uris[i] = createKMSUri(kmsUrls[i]);
}
return uris;
}
private static class KerberosConfiguration
extends javax.security.auth.login.Configuration {
@ -315,19 +419,17 @@ public class TestKMS {
principals.toArray(new String[principals.size()]));
}
private void setUpMiniKdc() throws Exception {
@BeforeClass
public static void setUpMiniKdc() throws Exception {
Properties kdcConf = MiniKdc.createConf();
setUpMiniKdc(kdcConf);
}
@After
public void tearDown() throws Exception {
if (kdc != null) {
kdc.stop();
kdc = null;
}
UserGroupInformation.setShouldRenewImmediatelyForTests(false);
UserGroupInformation.reset();
KMSUtilFaultInjector.set(oldInjector);
if (!providersCreated.isEmpty()) {
final MultipleIOException.Builder b = new MultipleIOException.Builder();
for (KeyProvider kp : providersCreated) {
@ -345,6 +447,14 @@ public class TestKMS {
}
}
@AfterClass
public static void shutdownMiniKdc() {
if (kdc != null) {
kdc.stop();
kdc = null;
}
}
private <T> T doAs(String user, final PrivilegedExceptionAction<T> action)
throws Exception {
UserGroupInformation.loginUserFromKeytab(user, keytab.getAbsolutePath());
@ -501,8 +611,10 @@ public class TestKMS {
Token<?>[] tokens =
((KeyProviderDelegationTokenExtension.DelegationTokenExtension)kp)
.addDelegationTokens("myuser", new Credentials());
Assert.assertEquals(1, tokens.length);
Assert.assertEquals("kms-dt", tokens[0].getKind().toString());
assertEquals(2, tokens.length);
assertEquals(KMSDelegationToken.TOKEN_KIND,
tokens[0].getKind());
kp.close();
return null;
}
});
@ -518,8 +630,9 @@ public class TestKMS {
Token<?>[] tokens =
((KeyProviderDelegationTokenExtension.DelegationTokenExtension)kp)
.addDelegationTokens("myuser", new Credentials());
Assert.assertEquals(1, tokens.length);
Assert.assertEquals("kms-dt", tokens[0].getKind().toString());
assertEquals(2, tokens.length);
assertEquals(KMSDelegationToken.TOKEN_KIND, tokens[0].getKind());
kp.close();
}
return null;
}
@ -2011,7 +2124,6 @@ public class TestKMS {
return null;
}
});
nonKerberosUgi.addCredentials(credentials);
try {
@ -2067,6 +2179,17 @@ public class TestKMS {
testDelegationTokensOps(true, true);
}
private Text getTokenService(KeyProvider provider) {
assertTrue("KeyProvider should be an instance of KMSClientProvider",
(provider instanceof LoadBalancingKMSClientProvider));
assertEquals("Num client providers should be 1", 1,
((LoadBalancingKMSClientProvider)provider).getProviders().length);
Text tokenService =
(((LoadBalancingKMSClientProvider)provider).getProviders()[0])
.getDelegationTokenService();
return tokenService;
}
private void testDelegationTokensOps(final boolean ssl, final boolean kerb)
throws Exception {
final File confDir = getTestDir();
@ -2098,11 +2221,16 @@ public class TestKMS {
final URI uri = createKMSUri(getKMSUrl());
clientConf.set(KeyProviderFactory.KEY_PROVIDER_PATH,
createKMSUri(getKMSUrl()).toString());
clientConf.setBoolean(KMS_CLIENT_COPY_LEGACY_TOKEN_KEY, false);
doAs("client", new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
KeyProvider kp = createProvider(uri, clientConf);
// Unset the conf value for key provider path just to be sure that
// the key provider created for renew and cancel token is from
// token service field.
clientConf.unset(HADOOP_SECURITY_KEY_PROVIDER_PATH);
// test delegation token retrieval
KeyProviderDelegationTokenExtension kpdte =
KeyProviderDelegationTokenExtension.
@ -2110,13 +2238,10 @@ public class TestKMS {
final Credentials credentials = new Credentials();
final Token<?>[] tokens =
kpdte.addDelegationTokens("client1", credentials);
Assert.assertEquals(1, credentials.getAllTokens().size());
InetSocketAddress kmsAddr =
new InetSocketAddress(getKMSUrl().getHost(),
getKMSUrl().getPort());
Assert.assertEquals(KMSDelegationToken.TOKEN_KIND,
credentials.getToken(SecurityUtil.buildTokenService(kmsAddr)).
getKind());
Text tokenService = getTokenService(kp);
assertEquals(1, credentials.getAllTokens().size());
assertEquals(TOKEN_KIND,
credentials.getToken(tokenService).getKind());
// Test non-renewer user cannot renew.
for (Token<?> token : tokens) {
@ -2243,12 +2368,11 @@ public class TestKMS {
final URI uri = createKMSUri(getKMSUrl());
clientConf.set(KeyProviderFactory.KEY_PROVIDER_PATH,
createKMSUri(getKMSUrl()).toString());
clientConf.setBoolean(KMS_CLIENT_COPY_LEGACY_TOKEN_KEY, false);
final KeyProvider kp = createProvider(uri, clientConf);
final KeyProviderDelegationTokenExtension kpdte =
KeyProviderDelegationTokenExtension.
createKeyProviderDelegationTokenExtension(kp);
final InetSocketAddress kmsAddr =
new InetSocketAddress(getKMSUrl().getHost(), getKMSUrl().getPort());
// Job 1 (e.g. YARN log aggregation job), with user DT.
final Collection<Token<?>> job1Token = new HashSet<>();
@ -2258,16 +2382,17 @@ public class TestKMS {
// Get a DT and use it.
final Credentials credentials = new Credentials();
kpdte.addDelegationTokens("client", credentials);
Text tokenService = getTokenService(kp);
Assert.assertEquals(1, credentials.getAllTokens().size());
Assert.assertEquals(KMSDelegationToken.TOKEN_KIND, credentials.
getToken(SecurityUtil.buildTokenService(kmsAddr)).getKind());
UserGroupInformation.getCurrentUser().addCredentials(credentials);
LOG.info("Added kms dt to credentials: {}", UserGroupInformation.
getCurrentUser().getCredentials().getAllTokens());
Token<?> token =
final Token<?> token =
UserGroupInformation.getCurrentUser().getCredentials()
.getToken(SecurityUtil.buildTokenService(kmsAddr));
Assert.assertNotNull(token);
.getToken(tokenService);
assertNotNull(token);
assertEquals(TOKEN_KIND, token.getKind());
job1Token.add(token);
// Decode the token to get max time.
@ -2302,17 +2427,16 @@ public class TestKMS {
// Get a new DT, but don't use it yet.
final Credentials newCreds = new Credentials();
kpdte.addDelegationTokens("client", newCreds);
Assert.assertEquals(1, newCreds.getAllTokens().size());
Assert.assertEquals(KMSDelegationToken.TOKEN_KIND,
newCreds.getToken(SecurityUtil.buildTokenService(kmsAddr)).
getKind());
assertEquals(1, newCreds.getAllTokens().size());
final Text tokenService = getTokenService(kp);
assertEquals(TOKEN_KIND,
newCreds.getToken(tokenService).getKind());
// Using job 1's DT should fail.
final Credentials oldCreds = new Credentials();
for (Token<?> token : job1Token) {
if (token.getKind().equals(KMSDelegationToken.TOKEN_KIND)) {
oldCreds
.addToken(SecurityUtil.buildTokenService(kmsAddr), token);
if (token.getKind().equals(TOKEN_KIND)) {
oldCreds.addToken(tokenService, token);
}
}
UserGroupInformation.getCurrentUser().addCredentials(oldCreds);
@ -2326,12 +2450,11 @@ public class TestKMS {
}
// Using the new DT should succeed.
Assert.assertEquals(1, newCreds.getAllTokens().size());
Assert.assertEquals(KMSDelegationToken.TOKEN_KIND,
newCreds.getToken(SecurityUtil.buildTokenService(kmsAddr)).
getKind());
assertEquals(1, newCreds.getAllTokens().size());
assertEquals(TOKEN_KIND,
newCreds.getToken(tokenService).getKind());
UserGroupInformation.getCurrentUser().addCredentials(newCreds);
LOG.info("Credetials now are: {}", UserGroupInformation
LOG.info("Credentials now are: {}", UserGroupInformation
.getCurrentUser().getCredentials().getAllTokens());
kp.getKeys();
return null;
@ -2357,7 +2480,13 @@ public class TestKMS {
doKMSWithZK(true, true);
}
public void doKMSWithZK(boolean zkDTSM, boolean zkSigner) throws Exception {
private <T> T runServerWithZooKeeper(boolean zkDTSM, boolean zkSigner,
KMSCallable<T> callable) throws Exception {
return runServerWithZooKeeper(zkDTSM, zkSigner, callable, 1);
}
private <T> T runServerWithZooKeeper(boolean zkDTSM, boolean zkSigner,
KMSCallable<T> callable, int kmsSize) throws Exception {
TestingServer zkServer = null;
try {
zkServer = new TestingServer();
@ -2403,43 +2532,265 @@ public class TestKMS {
writeConf(testDir, conf);
KMSCallable<KeyProvider> c =
new KMSCallable<KeyProvider>() {
@Override
public KeyProvider call() throws Exception {
final Configuration conf = new Configuration();
conf.setInt(KeyProvider.DEFAULT_BITLENGTH_NAME, 128);
final URI uri = createKMSUri(getKMSUrl());
final KeyProvider kp =
doAs("SET_KEY_MATERIAL",
new PrivilegedExceptionAction<KeyProvider>() {
@Override
public KeyProvider run() throws Exception {
KeyProvider kp = createProvider(uri, conf);
kp.createKey("k1", new byte[16],
new KeyProvider.Options(conf));
kp.createKey("k2", new byte[16],
new KeyProvider.Options(conf));
kp.createKey("k3", new byte[16],
new KeyProvider.Options(conf));
return kp;
}
});
return kp;
}
};
runServer(null, null, testDir, c);
int[] ports = new int[kmsSize];
for (int i = 0; i < ports.length; i++) {
ports[i] = -1;
}
return runServer(ports, null, null, testDir, callable);
} finally {
if (zkServer != null) {
zkServer.stop();
zkServer.close();
}
}
}
public void doKMSWithZK(boolean zkDTSM, boolean zkSigner) throws Exception {
KMSCallable<KeyProvider> c =
new KMSCallable<KeyProvider>() {
@Override
public KeyProvider call() throws Exception {
final Configuration conf = new Configuration();
conf.setInt(KeyProvider.DEFAULT_BITLENGTH_NAME, 128);
final URI uri = createKMSUri(getKMSUrl());
final KeyProvider kp =
doAs("SET_KEY_MATERIAL",
new PrivilegedExceptionAction<KeyProvider>() {
@Override
public KeyProvider run() throws Exception {
KeyProvider kp = createProvider(uri, conf);
kp.createKey("k1", new byte[16],
new KeyProvider.Options(conf));
kp.createKey("k2", new byte[16],
new KeyProvider.Options(conf));
kp.createKey("k3", new byte[16],
new KeyProvider.Options(conf));
return kp;
}
});
return kp;
}
};
runServerWithZooKeeper(zkDTSM, zkSigner, c);
}
@Test
public void doKMSHAZKWithDelegationTokenAccess() throws Exception {
KMSCallable<Void> c = new KMSCallable<Void>() {
@Override
public Void call() throws Exception {
final Configuration conf = new Configuration();
conf.setInt(KeyProvider.DEFAULT_BITLENGTH_NAME, 128);
final URI[] uris = createKMSHAUri(getKMSHAUrl());
final Credentials credentials = new Credentials();
final String lbUri = generateLoadBalancingKeyProviderUriString();
final LoadBalancingKMSClientProvider lbkp =
createHAProvider(uris, conf, lbUri);
conf.unset(HADOOP_SECURITY_KEY_PROVIDER_PATH);
// Login as a Kerberos user principal using keytab.
// Connect to KMS to create a delegation token and add it to credentials
final String keyName = "k0";
doAs("SET_KEY_MATERIAL",
new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
KeyProviderDelegationTokenExtension kpdte =
KeyProviderDelegationTokenExtension.
createKeyProviderDelegationTokenExtension(lbkp);
kpdte.addDelegationTokens("SET_KEY_MATERIAL", credentials);
kpdte.createKey(keyName, new KeyProvider.Options(conf));
return null;
}
});
assertTokenIdentifierEquals(credentials);
final LoadBalancingKMSClientProvider lbkp1 =
createHAProvider(uris, conf, lbUri);
// verify both tokens can be used to authenticate
for (Token t : credentials.getAllTokens()) {
assertTokenAccess(lbkp1, keyName, t);
}
return null;
}
};
runServerWithZooKeeper(true, true, c, 2);
}
/**
* Assert that the passed in credentials have 2 tokens, of kind
* {@link KMSDelegationToken#TOKEN_KIND} and
* {@link KMSDelegationToken#TOKEN_LEGACY_KIND}. Assert that the 2 tokens have
* the same identifier.
*/
private void assertTokenIdentifierEquals(Credentials credentials)
throws IOException {
// verify the 2 tokens have the same identifier
assertEquals(2, credentials.getAllTokens().size());
Token token = null;
Token legacyToken = null;
for (Token t : credentials.getAllTokens()) {
if (KMSDelegationToken.TOKEN_KIND.equals(t.getKind())) {
token = t;
} else if (KMSDelegationToken.TOKEN_LEGACY_KIND.equals(t.getKind())) {
legacyToken = t;
}
}
assertNotNull(token);
assertNotNull(legacyToken);
final DelegationTokenIdentifier tokenId =
(DelegationTokenIdentifier) token.decodeIdentifier();
final DelegationTokenIdentifier legacyTokenId =
(DelegationTokenIdentifier) legacyToken.decodeIdentifier();
assertEquals("KMS DT and legacy dt should have identical identifier",
tokenId, legacyTokenId);
}
/**
* Tests token access with each providers in the
* {@link LoadBalancingKMSClientProvider}. This is to make sure the 2 token
* kinds are compatible and can both be used to authenticate.
*/
private void assertTokenAccess(final LoadBalancingKMSClientProvider lbkp,
final String keyName, final Token token) throws Exception {
UserGroupInformation tokenUgi =
UserGroupInformation.createUserForTesting("test", new String[] {});
// Verify the tokens can authenticate to any KMS
tokenUgi.addToken(token);
tokenUgi.doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
// Create a kms client with one provider at a time. Must use one
// provider so that if it fails to authenticate, it does not fall
// back to the next KMS instance.
// It should succeed because its delegation token can access any
// KMS instances.
for (KMSClientProvider provider : lbkp.getProviders()) {
if (token.getKind().equals(TOKEN_LEGACY_KIND) && !token.getService()
.equals(provider.getDelegationTokenService())) {
// Historically known issue: Legacy token can only work with the
// key provider specified in the token's Service
continue;
}
LOG.info("Rolling key {} via provider {} with token {}.", keyName,
provider, token);
provider.rollNewVersion(keyName);
}
return null;
}
});
}
@Test
public void testKMSHAZKDelegationTokenRenewCancel() throws Exception {
testKMSHAZKDelegationTokenRenewCancel(TOKEN_KIND);
}
@Test
public void testKMSHAZKDelegationTokenRenewCancelLegacy() throws Exception {
testKMSHAZKDelegationTokenRenewCancel(TOKEN_LEGACY_KIND);
}
private void testKMSHAZKDelegationTokenRenewCancel(final Text tokenKind)
throws Exception {
GenericTestUtils.setLogLevel(KMSTokenRenewer.LOG, Level.TRACE);
assertTrue(tokenKind == TOKEN_KIND || tokenKind == TOKEN_LEGACY_KIND);
KMSCallable<Void> c = new KMSCallable<Void>() {
@Override
public Void call() throws Exception {
final Configuration conf = new Configuration();
final URI[] uris = createKMSHAUri(getKMSHAUrl());
final Credentials credentials = new Credentials();
// Create a UGI without Kerberos auth. It will be authenticated with
// delegation token.
final UserGroupInformation nonKerberosUgi =
UserGroupInformation.getCurrentUser();
final String lbUri = generateLoadBalancingKeyProviderUriString();
final LoadBalancingKMSClientProvider lbkp =
createHAProvider(uris, conf, lbUri);
conf.unset(HADOOP_SECURITY_KEY_PROVIDER_PATH);
// Login as a Kerberos user principal using keytab.
// Connect to KMS to create a delegation token and add it to credentials
doAs("SET_KEY_MATERIAL",
new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
KeyProviderDelegationTokenExtension kpdte =
KeyProviderDelegationTokenExtension.
createKeyProviderDelegationTokenExtension(lbkp);
kpdte.addDelegationTokens("SET_KEY_MATERIAL", credentials);
return null;
}
});
// Test token renewal and cancellation
final Collection<Token<? extends TokenIdentifier>> tokens =
credentials.getAllTokens();
doAs("SET_KEY_MATERIAL", new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
Assert.assertEquals(2, tokens.size());
boolean tokenFound = false;
for (Token token : tokens) {
if (!tokenKind.equals(token.getKind())) {
continue;
} else {
tokenFound = true;
}
KMSUtilFaultInjector.set(testInjector);
setupConfForToken(token.getKind(), conf, lbUri);
LOG.info("Testing token: {}", token);
long tokenLife = token.renew(conf);
LOG.info("Renewed token {}, new lifetime:{}", token, tokenLife);
Thread.sleep(10);
long newTokenLife = token.renew(conf);
LOG.info("Renewed token {}, new lifetime:{}", token,
newTokenLife);
assertTrue(newTokenLife > tokenLife);
boolean canceled = false;
// test delegation token cancellation
if (!canceled) {
token.cancel(conf);
LOG.info("Cancelled token {}", token);
canceled = true;
}
assertTrue("token should have been canceled", canceled);
try {
token.renew(conf);
fail("should not be able to renew a canceled token " + token);
} catch (Exception e) {
LOG.info("Expected exception when renewing token", e);
}
}
assertTrue("Should have found token kind " + tokenKind + " from "
+ tokens, tokenFound);
return null;
}
});
return null;
}
};
runServerWithZooKeeper(true, true, c, 2);
}
/**
* Set or unset the key provider configuration based on token kind.
*/
private void setupConfForToken(Text tokenKind, Configuration conf,
String lbUri) {
if (tokenKind.equals(TOKEN_KIND)) {
conf.unset(HADOOP_SECURITY_KEY_PROVIDER_PATH);
} else {
// conf is only required for legacy tokens to create provider,
// new tokens create provider by parsing its own Service field
assertEquals(TOKEN_LEGACY_KIND, tokenKind);
conf.set(HADOOP_SECURITY_KEY_PROVIDER_PATH, lbUri);
}
}
@Test
public void testProxyUserKerb() throws Exception {
@ -2558,6 +2909,16 @@ public class TestKMS {
@Test
public void testTGTRenewal() throws Exception {
shutdownMiniKdc();
try {
testTgtRenewalInt();
} finally {
shutdownMiniKdc();
setUpMiniKdc();
}
}
private void testTgtRenewalInt() throws Exception {
Properties kdcConf = MiniKdc.createConf();
kdcConf.setProperty(MiniKdc.MAX_TICKET_LIFETIME, "3");
kdcConf.setProperty(MiniKdc.MIN_TICKET_LIFETIME, "3");