From 583fa6ed48ad3df40bcaa9c591d5ccd07ce3ea81 Mon Sep 17 00:00:00 2001 From: Xiao Chen Date: Tue, 10 Apr 2018 15:26:33 -0700 Subject: [PATCH] HADOOP-14445. Delegation tokens are not shared between KMS instances. Contributed by Xiao Chen and Rushabh S Shah. --- .../crypto/key/kms/KMSClientProvider.java | 212 +++---- .../crypto/key/kms/KMSDelegationToken.java | 22 +- .../crypto/key/kms/KMSLegacyTokenRenewer.java | 56 ++ .../crypto/key/kms/KMSTokenRenewer.java | 103 ++++ .../hadoop/crypto/key/kms/package-info.java | 18 + .../fs/CommonConfigurationKeysPublic.java | 10 + .../web/DelegationTokenAuthenticatedURL.java | 21 +- .../DelegationTokenAuthenticationHandler.java | 8 +- .../web/DelegationTokenAuthenticator.java | 2 +- .../java/org/apache/hadoop/util/KMSUtil.java | 45 +- .../hadoop/util/KMSUtilFaultInjector.java | 49 ++ ...ache.hadoop.security.token.TokenIdentifier | 1 + ....apache.hadoop.security.token.TokenRenewer | 3 +- .../src/main/resources/core-default.xml | 20 + .../crypto/key/kms/TestKMSClientProvider.java | 162 ++++++ .../TestLoadBalancingKMSClientProvider.java | 67 ++- .../org/apache/hadoop/util/TestKMSUtil.java | 65 +++ .../hadoop/crypto/key/kms/server/TestKMS.java | 521 +++++++++++++++--- 18 files changed, 1181 insertions(+), 204 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSLegacyTokenRenewer.java create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSTokenRenewer.java create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/package-info.java create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/KMSUtilFaultInjector.java create mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/kms/TestKMSClientProvider.java create mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestKMSUtil.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java index 2eb2e211377..f97fde7ac9c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java @@ -36,8 +36,9 @@ import org.apache.hadoop.security.authentication.client.ConnectionConfigurator; import org.apache.hadoop.security.ssl.SSLFactory; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; -import org.apache.hadoop.security.token.TokenRenewer; +import org.apache.hadoop.security.token.TokenSelector; import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier; +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector; import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL; import org.apache.hadoop.util.HttpExceptionUtils; import org.apache.hadoop.util.KMSUtil; @@ -82,6 +83,8 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Strings; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.KMS_CLIENT_COPY_LEGACY_TOKEN_KEY; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.KMS_CLIENT_COPY_LEGACY_TOKEN_DEFAULT; import static org.apache.hadoop.util.KMSUtil.checkNotEmpty; import static org.apache.hadoop.util.KMSUtil.checkNotNull; import static org.apache.hadoop.util.KMSUtil.parseJSONEncKeyVersion; @@ -96,16 +99,13 @@ import static org.apache.hadoop.util.KMSUtil.parseJSONMetadata; public class KMSClientProvider extends KeyProvider implements CryptoExtension, KeyProviderDelegationTokenExtension.DelegationTokenExtension { - private static final Logger LOG = + public static final Logger LOG = LoggerFactory.getLogger(KMSClientProvider.class); private static final String INVALID_SIGNATURE = "Invalid signature"; private static final String ANONYMOUS_REQUESTS_DISALLOWED = "Anonymous requests are disallowed"; - public static final String TOKEN_KIND_STR = KMSDelegationToken.TOKEN_KIND_STR; - public static final Text TOKEN_KIND = KMSDelegationToken.TOKEN_KIND; - public static final String SCHEME_NAME = "kms"; private static final String UTF8 = "UTF-8"; @@ -133,12 +133,17 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension, private static final ObjectWriter WRITER = new ObjectMapper().writerWithDefaultPrettyPrinter(); + /* dtService defines the token service value for the kms token. + * The value can be legacy format which is ip:port format or it can be uri. + * If it's uri format, then the value is read from + * CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH at key + * provider creation time, and set to token's Service field. + * When a token is renewed / canceled, its Service field will be used to + * instantiate a KeyProvider, eliminating the need to read configs + * at that time. + */ private final Text dtService; - - // Allow fallback to default kms server port 9600 for certain tests that do - // not specify the port explicitly in the kms provider url. - @VisibleForTesting - public static volatile boolean fallbackDefaultPortForTesting = false; + private final boolean copyLegacyToken; private class EncryptedQueueRefiller implements ValueQueue.QueueRefiller { @@ -162,66 +167,6 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension, } } - /** - * The KMS implementation of {@link TokenRenewer}. - */ - public static class KMSTokenRenewer extends TokenRenewer { - private static final Logger LOG = - LoggerFactory.getLogger(KMSTokenRenewer.class); - - @Override - public boolean handleKind(Text kind) { - return kind.equals(TOKEN_KIND); - } - - @Override - public boolean isManaged(Token token) throws IOException { - return true; - } - - @Override - public long renew(Token token, Configuration conf) throws IOException { - LOG.debug("Renewing delegation token {}", token); - KeyProvider keyProvider = KMSUtil.createKeyProvider(conf, - KeyProviderFactory.KEY_PROVIDER_PATH); - try { - if (!(keyProvider instanceof - KeyProviderDelegationTokenExtension.DelegationTokenExtension)) { - LOG.warn("keyProvider {} cannot renew dt.", keyProvider == null ? - "null" : keyProvider.getClass()); - return 0; - } - return ((KeyProviderDelegationTokenExtension.DelegationTokenExtension) - keyProvider).renewDelegationToken(token); - } finally { - if (keyProvider != null) { - keyProvider.close(); - } - } - } - - @Override - public void cancel(Token token, Configuration conf) throws IOException { - LOG.debug("Canceling delegation token {}", token); - KeyProvider keyProvider = KMSUtil.createKeyProvider(conf, - KeyProviderFactory.KEY_PROVIDER_PATH); - try { - if (!(keyProvider instanceof - KeyProviderDelegationTokenExtension.DelegationTokenExtension)) { - LOG.warn("keyProvider {} cannot cancel dt.", keyProvider == null ? - "null" : keyProvider.getClass()); - return; - } - ((KeyProviderDelegationTokenExtension.DelegationTokenExtension) - keyProvider).cancelDelegationToken(token); - } finally { - if (keyProvider != null) { - keyProvider.close(); - } - } - } - } - public static class KMSEncryptedKeyVersion extends EncryptedKeyVersion { public KMSEncryptedKeyVersion(String keyName, String keyVersionName, byte[] iv, String encryptedVersionName, byte[] keyMaterial) { @@ -281,13 +226,13 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension, } hostsPart = t[0]; } - return createProvider(conf, origUrl, port, hostsPart); + return createProvider(conf, origUrl, port, hostsPart, providerUri); } return null; } - private KeyProvider createProvider(Configuration conf, - URL origUrl, int port, String hostsPart) throws IOException { + private KeyProvider createProvider(Configuration conf, URL origUrl, + int port, String hostsPart, URI providerUri) throws IOException { String[] hosts = hostsPart.split(";"); KMSClientProvider[] providers = new KMSClientProvider[hosts.length]; for (int i = 0; i < hosts.length; i++) { @@ -295,7 +240,7 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension, providers[i] = new KMSClientProvider( new URI("kms", origUrl.getProtocol(), hosts[i], port, - origUrl.getPath(), null, null), conf); + origUrl.getPath(), null, null), conf, providerUri); } catch (URISyntaxException e) { throw new IOException("Could not instantiate KMSProvider.", e); } @@ -353,17 +298,10 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension, } } - public KMSClientProvider(URI uri, Configuration conf) throws IOException { + public KMSClientProvider(URI uri, Configuration conf, URI providerUri) throws + IOException { super(conf); kmsUrl = createServiceURL(extractKMSPath(uri)); - int kmsPort = kmsUrl.getPort(); - if ((kmsPort == -1) && fallbackDefaultPortForTesting) { - kmsPort = 9600; - } - - InetSocketAddress addr = new InetSocketAddress(kmsUrl.getHost(), kmsPort); - dtService = SecurityUtil.buildTokenService(addr); - if ("https".equalsIgnoreCase(kmsUrl.getProtocol())) { sslFactory = new SSLFactory(SSLFactory.Mode.CLIENT, conf); try { @@ -376,6 +314,9 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension, CommonConfigurationKeysPublic.KMS_CLIENT_TIMEOUT_SECONDS, CommonConfigurationKeysPublic.KMS_CLIENT_TIMEOUT_DEFAULT); authRetry = conf.getInt(AUTH_RETRY, DEFAULT_AUTH_RETRY); + copyLegacyToken = conf.getBoolean(KMS_CLIENT_COPY_LEGACY_TOKEN_KEY, + KMS_CLIENT_COPY_LEGACY_TOKEN_DEFAULT); + configurator = new TimeoutConnConfigurator(timeout, sslFactory); encKeyVersionQueue = new ValueQueue( @@ -400,6 +341,7 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension, KMS_CLIENT_ENC_KEY_CACHE_NUM_REFILL_THREADS_DEFAULT), new EncryptedQueueRefiller()); authToken = new DelegationTokenAuthenticatedURL.Token(); + dtService = new Text(providerUri.toString()); LOG.info("KMSClientProvider for KMS url: {} delegation token service: {}" + " created.", kmsUrl, dtService); } @@ -473,7 +415,7 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension, @Override public HttpURLConnection run() throws Exception { DelegationTokenAuthenticatedURL authUrl = - new DelegationTokenAuthenticatedURL(configurator); + createKMSAuthenticatedURL(); return authUrl.openConnection(url, authToken, doAsUser); } }); @@ -924,7 +866,7 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension, LOG.debug("Renewing delegation token {} with url:{}, as:{}", token, url, doAsUser); final DelegationTokenAuthenticatedURL authUrl = - new DelegationTokenAuthenticatedURL(configurator); + createKMSAuthenticatedURL(); return getActualUgi().doAs( new PrivilegedExceptionAction() { @Override @@ -956,7 +898,7 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension, LOG.debug("Cancelling delegation token {} with url:{}, as:{}", dToken, url, doAsUser); final DelegationTokenAuthenticatedURL authUrl = - new DelegationTokenAuthenticatedURL(configurator); + createKMSAuthenticatedURL(); authUrl.cancelDelegationToken(url, token, doAsUser); return null; } @@ -1008,6 +950,17 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension, return token; } + @VisibleForTesting + DelegationTokenAuthenticatedURL createKMSAuthenticatedURL() { + return new DelegationTokenAuthenticatedURL(configurator) { + @Override + public org.apache.hadoop.security.token.Token + getDelegationToken(URL url, Credentials creds) { + return selectKMSDelegationToken(creds); + } + }; + } + @Override public Token[] addDelegationTokens(final String renewer, Credentials credentials) throws IOException { @@ -1016,7 +969,7 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension, if (token == null) { final URL url = createURL(null, null, null, null); final DelegationTokenAuthenticatedURL authUrl = - new DelegationTokenAuthenticatedURL(configurator); + createKMSAuthenticatedURL(); try { final String doAsUser = getDoAsUser(); token = getActualUgi().doAs(new PrivilegedExceptionAction>() { @@ -1030,9 +983,16 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension, } }); if (token != null) { - LOG.debug("New token received: ({})", token); + if (KMSDelegationToken.TOKEN_KIND.equals(token.getKind())) { + // do not set service for legacy kind, for compatibility. + token.setService(dtService); + } + LOG.info("New token created: ({})", token); credentials.addToken(token.getService(), token); - tokens = new Token[] { token }; + Token legacyToken = createAndAddLegacyToken(credentials, token); + tokens = legacyToken == null ? + new Token[] {token} : + new Token[] {token, legacyToken}; } else { throw new IOException("Got NULL as delegation token"); } @@ -1049,13 +1009,75 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension, return tokens; } - private boolean containsKmsDt(UserGroupInformation ugi) throws IOException { - // Add existing credentials from the UGI, since provider is cached. - Credentials creds = ugi.getCredentials(); + /** + * If {@link CommonConfigurationKeysPublic#KMS_CLIENT_COPY_LEGACY_TOKEN_KEY} + * is true when creating the provider, then copy the passed-in token of + * {@link KMSDelegationToken#TOKEN_KIND} and create a new token of + * {@link KMSDelegationToken#TOKEN_LEGACY_KIND}, and add it to credentials. + * + * @return The legacy token, or null if one should not be created. + */ + private Token createAndAddLegacyToken(Credentials credentials, + Token token) { + if (!copyLegacyToken || !KMSDelegationToken.TOKEN_KIND + .equals(token.getKind())) { + LOG.debug("Not creating legacy token because copyLegacyToken={}, " + + "token={}", copyLegacyToken, token); + return null; + } + // copy a KMS_DELEGATION_TOKEN and create a new kms-dt with the same + // underlying token for backwards-compatibility. Old clients/renewers + // does not parse the new token and can only work with kms-dt. + final Token legacyToken = token.copyToken(); + legacyToken.setKind(KMSDelegationToken.TOKEN_LEGACY_KIND); + final InetSocketAddress addr = + new InetSocketAddress(kmsUrl.getHost(), kmsUrl.getPort()); + final Text fallBackServiceText = SecurityUtil.buildTokenService(addr); + legacyToken.setService(fallBackServiceText); + LOG.info("Copied token to legacy kind: {}", legacyToken); + credentials.addToken(legacyToken.getService(), legacyToken); + return legacyToken; + } + + @VisibleForTesting + public Text getDelegationTokenService() { + return dtService; + } + + /** + * Given a list of tokens, return the token that should be used for KMS + * authentication. + */ + @VisibleForTesting + Token selectKMSDelegationToken(Credentials creds) { + // always look for TOKEN_KIND first + final TokenSelector tokenSelector = + new AbstractDelegationTokenSelector( + KMSDelegationToken.TOKEN_KIND) { + }; + Token token = tokenSelector.selectToken(dtService, creds.getAllTokens()); + LOG.debug("Searching service {} found token {}", dtService, token); + if (token != null) { + return token; + } + + // fall back to look for token by service, regardless of kind. + // this is old behavior, keeping for compatibility reasons (for example, + // even if KMS server is new, if the job is submitted with an old kms + // client, job runners on new version should be able to find the token). + final InetSocketAddress addr = + new InetSocketAddress(kmsUrl.getHost(), kmsUrl.getPort()); + final Text fallBackServiceText = SecurityUtil.buildTokenService(addr); + token = creds.getToken(fallBackServiceText); + LOG.debug("Selected delegation token {} using service:{}", token, + fallBackServiceText); + return token; + } + + private boolean containsKmsDt(UserGroupInformation ugi) { + final Credentials creds = ugi.getCredentials(); if (!creds.getAllTokens().isEmpty()) { - LOG.debug("Searching for token that matches service: {}", dtService); - org.apache.hadoop.security.token.Token - dToken = creds.getToken(dtService); + final Token dToken = selectKMSDelegationToken(creds); if (dToken != null) { return true; } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSDelegationToken.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSDelegationToken.java index adeebf21c52..2642e79d643 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSDelegationToken.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSDelegationToken.java @@ -27,7 +27,10 @@ import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier @InterfaceAudience.Private public final class KMSDelegationToken { - public static final String TOKEN_KIND_STR = "kms-dt"; + public static final String TOKEN_LEGACY_KIND_STR = "kms-dt"; + public static final Text TOKEN_LEGACY_KIND = new Text(TOKEN_LEGACY_KIND_STR); + + public static final String TOKEN_KIND_STR = "KMS_DELEGATION_TOKEN"; public static final Text TOKEN_KIND = new Text(TOKEN_KIND_STR); // Utility class is not supposed to be instantiated. @@ -49,4 +52,21 @@ public final class KMSDelegationToken { return TOKEN_KIND; } } + + /** + * DelegationTokenIdentifier used for the KMS for legacy tokens. + */ + @Deprecated + public static class KMSLegacyDelegationTokenIdentifier + extends DelegationTokenIdentifier { + + public KMSLegacyDelegationTokenIdentifier() { + super(TOKEN_LEGACY_KIND); + } + + @Override + public Text getKind() { + return TOKEN_LEGACY_KIND; + } + } } \ No newline at end of file diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSLegacyTokenRenewer.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSLegacyTokenRenewer.java new file mode 100644 index 00000000000..fd27073f5bb --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSLegacyTokenRenewer.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.crypto.key.kms; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.crypto.key.KeyProvider; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.KMSUtil; + +import java.io.IOException; + +import static org.apache.hadoop.crypto.key.kms.KMSDelegationToken.TOKEN_LEGACY_KIND; + +/** + * The {@link KMSTokenRenewer} that supports legacy tokens. + */ +@InterfaceAudience.Private +@Deprecated +public class KMSLegacyTokenRenewer extends KMSTokenRenewer { + + @Override + public boolean handleKind(Text kind) { + return kind.equals(TOKEN_LEGACY_KIND); + } + + /** + * Create a key provider for token renewal / cancellation. + * Caller is responsible for closing the key provider. + */ + @Override + protected KeyProvider createKeyProvider(Token token, + Configuration conf) throws IOException { + assert token.getKind().equals(TOKEN_LEGACY_KIND); + // Legacy tokens get service from configuration. + return KMSUtil.createKeyProvider(conf, + CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH); + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSTokenRenewer.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSTokenRenewer.java new file mode 100644 index 00000000000..908ad39fe05 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSTokenRenewer.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.crypto.key.kms; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.crypto.key.KeyProvider; +import org.apache.hadoop.crypto.key.KeyProviderDelegationTokenExtension; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenRenewer; +import org.apache.hadoop.util.KMSUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +import static org.apache.hadoop.crypto.key.kms.KMSDelegationToken.TOKEN_KIND; + +/** + * The KMS implementation of {@link TokenRenewer}. + */ +@InterfaceAudience.Private +public class KMSTokenRenewer extends TokenRenewer { + + public static final Logger LOG = LoggerFactory + .getLogger(org.apache.hadoop.crypto.key.kms.KMSTokenRenewer.class); + + @Override + public boolean handleKind(Text kind) { + return kind.equals(TOKEN_KIND); + } + + @Override + public boolean isManaged(Token token) throws IOException { + return true; + } + + @Override + public long renew(Token token, Configuration conf) throws IOException { + LOG.debug("Renewing delegation token {}", token); + final KeyProvider keyProvider = createKeyProvider(token, conf); + try { + if (!(keyProvider instanceof + KeyProviderDelegationTokenExtension.DelegationTokenExtension)) { + LOG.warn("keyProvider {} cannot renew token {}.", + keyProvider == null ? "null" : keyProvider.getClass(), token); + return 0; + } + return ((KeyProviderDelegationTokenExtension.DelegationTokenExtension) + keyProvider).renewDelegationToken(token); + } finally { + if (keyProvider != null) { + keyProvider.close(); + } + } + } + + @Override + public void cancel(Token token, Configuration conf) throws IOException { + LOG.debug("Canceling delegation token {}", token); + final KeyProvider keyProvider = createKeyProvider(token, conf); + try { + if (!(keyProvider instanceof + KeyProviderDelegationTokenExtension.DelegationTokenExtension)) { + LOG.warn("keyProvider {} cannot cancel token {}.", + keyProvider == null ? "null" : keyProvider.getClass(), token); + return; + } + ((KeyProviderDelegationTokenExtension.DelegationTokenExtension) + keyProvider).cancelDelegationToken(token); + } finally { + if (keyProvider != null) { + keyProvider.close(); + } + } + } + + /** + * Create a key provider for token renewal / cancellation. + * Caller is responsible for closing the key provider. + */ + protected KeyProvider createKeyProvider(Token token, + Configuration conf) throws IOException { + return KMSUtil + .createKeyProviderFromTokenService(conf, token.getService().toString()); + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/package-info.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/package-info.java new file mode 100644 index 00000000000..eea93c2eacf --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/package-info.java @@ -0,0 +1,18 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.crypto.key.kms; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java index 8cd753ab064..be1c7bc2486 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java @@ -770,6 +770,16 @@ public class CommonConfigurationKeysPublic { /** Default value is 100 ms. */ public static final int KMS_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_DEFAULT = 100; + /** + * @see + * + * core-default.xml + */ + public static final String KMS_CLIENT_COPY_LEGACY_TOKEN_KEY = + "hadoop.security.kms.client.copy.legacy.token"; + /** Default value is true. */ + public static final boolean KMS_CLIENT_COPY_LEGACY_TOKEN_DEFAULT = true; + /** * @see * diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenAuthenticatedURL.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenAuthenticatedURL.java index 0b1fdf80c9f..0ddc4fc356e 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenAuthenticatedURL.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenAuthenticatedURL.java @@ -300,11 +300,7 @@ public class DelegationTokenAuthenticatedURL extends AuthenticatedURL { creds.getAllTokens()); } if (!creds.getAllTokens().isEmpty()) { - InetSocketAddress serviceAddr = new InetSocketAddress(url.getHost(), - url.getPort()); - Text service = SecurityUtil.buildTokenService(serviceAddr); - dToken = creds.getToken(service); - LOG.debug("Using delegation token {} from service:{}", dToken, service); + dToken = getDelegationToken(url, creds); if (dToken != null) { if (useQueryStringForDelegationToken()) { // delegation token will go in the query string, injecting it @@ -340,6 +336,21 @@ public class DelegationTokenAuthenticatedURL extends AuthenticatedURL { return conn; } + /** + * Select a delegation token from all tokens in credentials, based on url. + */ + @InterfaceAudience.Private + public org.apache.hadoop.security.token.Token + getDelegationToken(URL url, Credentials creds) { + final InetSocketAddress serviceAddr = + new InetSocketAddress(url.getHost(), url.getPort()); + final Text service = SecurityUtil.buildTokenService(serviceAddr); + org.apache.hadoop.security.token.Token dToken = + creds.getToken(service); + LOG.debug("Selected delegation token {} using service:{}", dToken, service); + return dToken; + } + /** * Requests a delegation token using the configured Authenticator * for authentication. diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenAuthenticationHandler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenAuthenticationHandler.java index 6ee59f1d175..0ef102e7d96 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenAuthenticationHandler.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenAuthenticationHandler.java @@ -81,7 +81,7 @@ import com.google.common.annotations.VisibleForTesting; @InterfaceStability.Evolving public abstract class DelegationTokenAuthenticationHandler implements AuthenticationHandler { - private static final Logger LOG = + public static final Logger LOG = LoggerFactory.getLogger(DelegationTokenAuthenticationHandler.class); protected static final String TYPE_POSTFIX = "-dt"; @@ -224,7 +224,8 @@ public abstract class DelegationTokenAuthenticationHandler HttpServletRequest request, HttpServletResponse response) throws IOException, AuthenticationException { boolean requestContinues = true; - LOG.trace("Processing operation for req=({}), token: {}", request, token); + LOG.trace("Processing operation for req=({}), token: {}", + request.getRequestURL(), token); String op = ServletUtils.getParameter(request, KerberosDelegationTokenAuthenticator.OP_PARAM); op = (op != null) ? StringUtils.toUpperCase(op) : null; @@ -407,7 +408,8 @@ public abstract class DelegationTokenAuthenticationHandler HttpServletResponse.SC_FORBIDDEN, new AuthenticationException(ex)); } } else { - LOG.debug("Falling back to {} (req={})", authHandler.getClass(), request); + LOG.debug("Falling back to {} (req={})", authHandler.getClass(), + request.getRequestURL()); token = authHandler.authenticate(request, response); } return token; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenAuthenticator.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenAuthenticator.java index 617773b34d1..7e837817b27 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenAuthenticator.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenAuthenticator.java @@ -50,7 +50,7 @@ import java.util.Map; @InterfaceAudience.Public @InterfaceStability.Evolving public abstract class DelegationTokenAuthenticator implements Authenticator { - private static Logger LOG = + public static final Logger LOG = LoggerFactory.getLogger(DelegationTokenAuthenticator.class); private static final String CONTENT_TYPE = "Content-Type"; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/KMSUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/KMSUtil.java index c96c6fbde3d..80770761142 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/KMSUtil.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/KMSUtil.java @@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.URI; +import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Date; import java.util.HashMap; @@ -41,8 +42,7 @@ import java.util.Map; */ @InterfaceAudience.Private public final class KMSUtil { - public static final Logger LOG = - LoggerFactory.getLogger(KMSUtil.class); + public static final Logger LOG = LoggerFactory.getLogger(KMSUtil.class); private KMSUtil() { /* Hidden constructor */ } @@ -64,6 +64,13 @@ public final class KMSUtil { if (providerUriStr == null || providerUriStr.isEmpty()) { return null; } + KeyProvider kp = KMSUtilFaultInjector.get().createKeyProviderForTests( + providerUriStr, conf); + if (kp != null) { + LOG.info("KeyProvider is created with uri: {}. This should happen only " + + "in tests.", providerUriStr); + return kp; + } return createKeyProviderFromUri(conf, URI.create(providerUriStr)); } @@ -205,4 +212,38 @@ public final class KMSUtil { } return metadata; } + + /** + * Creates a key provider from token service field, which must be URI format. + * + * @param conf + * @param tokenServiceValue + * @return new KeyProvider or null + * @throws IOException + */ + public static KeyProvider createKeyProviderFromTokenService( + final Configuration conf, final String tokenServiceValue) + throws IOException { + LOG.debug("Creating key provider from token service value {}. ", + tokenServiceValue); + final KeyProvider kp = KMSUtilFaultInjector.get() + .createKeyProviderForTests(tokenServiceValue, conf); + if (kp != null) { + LOG.info("KeyProvider is created with uri: {}. This should happen only " + + "in tests.", tokenServiceValue); + return kp; + } + if (!tokenServiceValue.contains("://")) { + throw new IllegalArgumentException( + "Invalid token service " + tokenServiceValue); + } + final URI tokenServiceUri; + try { + tokenServiceUri = new URI(tokenServiceValue); + } catch (URISyntaxException e) { + throw new IllegalArgumentException( + "Invalid token service " + tokenServiceValue, e); + } + return createKeyProviderFromUri(conf, tokenServiceUri); + } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/KMSUtilFaultInjector.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/KMSUtilFaultInjector.java new file mode 100644 index 00000000000..46d5069d0ab --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/KMSUtilFaultInjector.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.util; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.crypto.key.KeyProvider; + +import java.io.IOException; + +/** + * Used for returning custom KeyProvider from test methods. + */ +@VisibleForTesting +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class KMSUtilFaultInjector { + private static KMSUtilFaultInjector instance = new KMSUtilFaultInjector(); + + public static KMSUtilFaultInjector get() { + return instance; + } + + public static void set(KMSUtilFaultInjector injector) { + instance = injector; + } + + public KeyProvider createKeyProviderForTests(String value, Configuration conf) + throws IOException { + return null; + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier b/hadoop-common-project/hadoop-common/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier index b65f15159d2..43d06e23932 100644 --- a/hadoop-common-project/hadoop-common/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier +++ b/hadoop-common-project/hadoop-common/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier @@ -12,3 +12,4 @@ # limitations under the License. # org.apache.hadoop.crypto.key.kms.KMSDelegationToken$KMSDelegationTokenIdentifier +org.apache.hadoop.crypto.key.kms.KMSDelegationToken$KMSLegacyDelegationTokenIdentifier diff --git a/hadoop-common-project/hadoop-common/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer b/hadoop-common-project/hadoop-common/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer index 56320fb0fc5..5b6082c141a 100644 --- a/hadoop-common-project/hadoop-common/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer +++ b/hadoop-common-project/hadoop-common/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer @@ -11,4 +11,5 @@ # See the License for the specific language governing permissions and # limitations under the License. # -org.apache.hadoop.crypto.key.kms.KMSClientProvider$KMSTokenRenewer \ No newline at end of file +org.apache.hadoop.crypto.key.kms.KMSTokenRenewer +org.apache.hadoop.crypto.key.kms.KMSLegacyTokenRenewer \ No newline at end of file diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml index ad24f562432..f32268bee83 100644 --- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml +++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml @@ -2602,6 +2602,26 @@ + + hadoop.security.kms.client.copy.legacy.token + true + + Expert only. Whether the KMS client provider should copy a token to legacy + kind. This is for KMS_DELEGATION_TOKEN to be backwards compatible. With the + default value set to true, the client will locally duplicate the + KMS_DELEGATION_TOKEN token and create a kms-dt token, with the service field + conforming to kms-dt. All other parts of the token remain the same. + Then the new clients will use KMS_DELEGATION_TOKEN and old clients will + use kms-dt to authenticate. Default value is true. + You should only change this to false if you know all the KMS servers + , clients (including both job submitters and job runners) and the + token renewers (usually Yarn RM) are on a version that supports + KMS_DELEGATION_TOKEN. + Turning this off prematurely may result in old clients failing to + authenticate with new servers. + + + hadoop.security.kms.client.failover.sleep.max.millis 2000 diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/kms/TestKMSClientProvider.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/kms/TestKMSClientProvider.java new file mode 100644 index 00000000000..56aace50f74 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/kms/TestKMSClientProvider.java @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.crypto.key.kms; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL; +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; +import org.slf4j.event.Level; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.URI; +import java.net.URL; + +import static org.apache.hadoop.crypto.key.kms.KMSDelegationToken.TOKEN_KIND; +import static org.apache.hadoop.crypto.key.kms.KMSDelegationToken.TOKEN_LEGACY_KIND; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +/** + * Unit test for {@link KMSClientProvider} class. + */ +public class TestKMSClientProvider { + + public static final Logger LOG = + LoggerFactory.getLogger(TestKMSClientProvider.class); + + private final Token token = new Token(); + private final Token legacyToken = new Token(); + private final String uriString = "kms://https@host:16000/kms"; + private final String legacyTokenService = "host:16000"; + + @Rule + public Timeout globalTimeout = new Timeout(30000); + + { + GenericTestUtils.setLogLevel(KMSClientProvider.LOG, Level.TRACE); + } + + @Before + public void setup() { + SecurityUtil.setTokenServiceUseIp(false); + token.setKind(TOKEN_KIND); + token.setService(new Text(uriString)); + legacyToken.setKind(TOKEN_LEGACY_KIND); + legacyToken.setService(new Text(legacyTokenService)); + } + + @Test + public void testNotCopyFromLegacyToken() throws Exception { + final DelegationTokenAuthenticatedURL url = + mock(DelegationTokenAuthenticatedURL.class); + final Configuration conf = new Configuration(); + final URI uri = new URI(uriString); + final KMSClientProvider kp = new KMSClientProvider(uri, conf, uri); + try { + final KMSClientProvider spyKp = spy(kp); + when(spyKp.createKMSAuthenticatedURL()).thenReturn(url); + when(url.getDelegationToken(any(URL.class), + any(DelegationTokenAuthenticatedURL.Token.class), any(String.class), + any(String.class))).thenReturn(legacyToken); + + final Credentials creds = new Credentials(); + final Token[] tokens = spyKp.addDelegationTokens("yarn", creds); + LOG.info("Got tokens: {}", tokens); + assertEquals(1, tokens.length); + LOG.info("uri:" + uriString); + // if KMS server returned a legacy token, new client should leave the + // service being legacy and not set uri string + assertEquals(legacyTokenService, tokens[0].getService().toString()); + } finally { + kp.close(); + } + } + + @Test + public void testCopyFromToken() throws Exception { + final DelegationTokenAuthenticatedURL url = + mock(DelegationTokenAuthenticatedURL.class); + final Configuration conf = new Configuration(); + final URI uri = new URI(uriString); + final KMSClientProvider kp = new KMSClientProvider(uri, conf, uri); + try { + final KMSClientProvider spyKp = spy(kp); + when(spyKp.createKMSAuthenticatedURL()).thenReturn(url); + when(url.getDelegationToken(any(URL.class), + any(DelegationTokenAuthenticatedURL.Token.class), any(String.class), + any(String.class))).thenReturn(token); + + final Credentials creds = new Credentials(); + final Token[] tokens = spyKp.addDelegationTokens("yarn", creds); + LOG.info("Got tokens: {}", tokens); + assertEquals(2, tokens.length); + assertTrue(creds.getAllTokens().contains(token)); + assertNotNull(creds.getToken(legacyToken.getService())); + } finally { + kp.close(); + } + } + + @Test + public void testSelectTokenWhenBothExist() throws Exception { + final Credentials creds = new Credentials(); + final Configuration conf = new Configuration(); + final URI uri = new URI(uriString); + final KMSClientProvider kp = new KMSClientProvider(uri, conf, uri); + try { + creds.addToken(token.getService(), token); + creds.addToken(legacyToken.getService(), legacyToken); + Token t = kp.selectKMSDelegationToken(creds); + assertEquals(token, t); + } finally { + kp.close(); + } + } + + @Test + public void testSelectTokenLegacyService() throws Exception { + final Configuration conf = new Configuration(); + final URI uri = new URI(uriString); + final KMSClientProvider kp = new KMSClientProvider(uri, conf, uri); + try { + Text legacyService = new Text(legacyTokenService); + token.setService(legacyService); + final Credentials creds = new Credentials(); + creds.addToken(legacyService, token); + Token t = kp.selectKMSDelegationToken(creds); + assertEquals(token, t); + } finally { + kp.close(); + } + } +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/kms/TestLoadBalancingKMSClientProvider.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/kms/TestLoadBalancingKMSClientProvider.java index bd68dca22c7..e6a9fe0b55e 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/kms/TestLoadBalancingKMSClientProvider.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/kms/TestLoadBalancingKMSClientProvider.java @@ -42,7 +42,8 @@ import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.authentication.client.AuthenticationException; import org.apache.hadoop.security.authorize.AuthorizationException; -import org.junit.After; +import org.apache.hadoop.util.KMSUtil; +import org.apache.hadoop.util.KMSUtilFaultInjector; import org.junit.BeforeClass; import org.junit.Test; import org.mockito.Mockito; @@ -56,33 +57,68 @@ public class TestLoadBalancingKMSClientProvider { SecurityUtil.setTokenServiceUseIp(false); } - @After - public void teardown() throws IOException { - KMSClientProvider.fallbackDefaultPortForTesting = false; + private void setKMSUtilFaultInjector() { + KMSUtilFaultInjector injector = new KMSUtilFaultInjector() { + @Override + public KeyProvider createKeyProviderForTests( + String value, Configuration conf) throws IOException { + return TestLoadBalancingKMSClientProvider + .createKeyProviderForTests(value, conf); + } + }; + KMSUtilFaultInjector.set(injector); + } + + public static KeyProvider createKeyProviderForTests( + String value, Configuration conf) throws IOException { + // The syntax for kms servers will be + // kms://http@localhost:port1/kms,kms://http@localhost:port2/kms + if (!value.contains(",")) { + return null; + } + String[] keyProviderUrisStr = value.split(","); + KMSClientProvider[] keyProviderArr = + new KMSClientProvider[keyProviderUrisStr.length]; + + int i = 0; + for (String keyProviderUri: keyProviderUrisStr) { + KMSClientProvider kmcp = + new KMSClientProvider(URI.create(keyProviderUri), conf, URI + .create(value)); + keyProviderArr[i] = kmcp; + i++; + } + LoadBalancingKMSClientProvider lbkcp = + new LoadBalancingKMSClientProvider(keyProviderArr, conf); + return lbkcp; } @Test public void testCreation() throws Exception { Configuration conf = new Configuration(); - KMSClientProvider.fallbackDefaultPortForTesting = true; KeyProvider kp = new KMSClientProvider.Factory().createProvider(new URI( - "kms://http@host1/kms/foo"), conf); + "kms://http@host1:9600/kms/foo"), conf); assertTrue(kp instanceof LoadBalancingKMSClientProvider); KMSClientProvider[] providers = ((LoadBalancingKMSClientProvider) kp).getProviders(); assertEquals(1, providers.length); - assertEquals(Sets.newHashSet("http://host1/kms/foo/v1/"), + assertEquals(Sets.newHashSet("http://host1:9600/kms/foo/v1/"), Sets.newHashSet(providers[0].getKMSUrl())); - - kp = new KMSClientProvider.Factory().createProvider(new URI( - "kms://http@host1;host2;host3/kms/foo"), conf); + setKMSUtilFaultInjector(); + String uriStr = "kms://http@host1:9600/kms/foo," + + "kms://http@host2:9600/kms/foo," + + "kms://http@host3:9600/kms/foo"; + conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH, + uriStr); + kp = KMSUtil.createKeyProvider(conf, CommonConfigurationKeysPublic + .HADOOP_SECURITY_KEY_PROVIDER_PATH); assertTrue(kp instanceof LoadBalancingKMSClientProvider); providers = ((LoadBalancingKMSClientProvider) kp).getProviders(); assertEquals(3, providers.length); - assertEquals(Sets.newHashSet("http://host1/kms/foo/v1/", - "http://host2/kms/foo/v1/", - "http://host3/kms/foo/v1/"), + assertEquals(Sets.newHashSet("http://host1:9600/kms/foo/v1/", + "http://host2:9600/kms/foo/v1/", + "http://host3:9600/kms/foo/v1/"), Sets.newHashSet(providers[0].getKMSUrl(), providers[1].getKMSUrl(), providers[2].getKMSUrl())); @@ -208,7 +244,7 @@ public class TestLoadBalancingKMSClientProvider { private class MyKMSClientProvider extends KMSClientProvider { public MyKMSClientProvider(URI uri, Configuration conf) throws IOException { - super(uri, conf); + super(uri, conf, uri); } @Override @@ -245,9 +281,8 @@ public class TestLoadBalancingKMSClientProvider { @Test public void testClassCastException() throws Exception { Configuration conf = new Configuration(); - KMSClientProvider.fallbackDefaultPortForTesting = true; KMSClientProvider p1 = new MyKMSClientProvider( - new URI("kms://http@host1/kms/foo"), conf); + new URI("kms://http@host1:9600/kms/foo"), conf); LoadBalancingKMSClientProvider kp = new LoadBalancingKMSClientProvider( new KMSClientProvider[] {p1}, 0, conf); try { diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestKMSUtil.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestKMSUtil.java new file mode 100644 index 00000000000..77f52ee9730 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestKMSUtil.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.util; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.crypto.key.KeyProvider; +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +/** + * Test {@link KMSUtil}. + */ +public class TestKMSUtil { + + public static final Logger LOG = LoggerFactory.getLogger(TestKMSUtil.class); + + @Rule + public Timeout globalTimeout = new Timeout(90000); + + @Test + public void testCreateKeyProviderFromTokenService() throws Exception { + final Configuration conf = new Configuration(); + KeyProvider kp = KMSUtil.createKeyProviderFromTokenService(conf, + "kms://https@localhost:9600/kms"); + assertNotNull(kp); + kp.close(); + + kp = KMSUtil.createKeyProviderFromTokenService(conf, + "kms://https@localhost:9600/kms,kms://localhost1:9600/kms"); + assertNotNull(kp); + kp.close(); + + String invalidService = "whatever:9600"; + try { + KMSUtil.createKeyProviderFromTokenService(conf, invalidService); + } catch (Exception ex) { + LOG.info("Expected exception:", ex); + assertTrue(ex instanceof IllegalArgumentException); + GenericTestUtils.assertExceptionContains( + "Invalid token service " + invalidService, ex); + } + } +} diff --git a/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMS.java b/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMS.java index 1517b04b632..c1711439aac 100644 --- a/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMS.java +++ b/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMS.java @@ -1,3 +1,4 @@ + /** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -31,26 +32,35 @@ import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersi import org.apache.hadoop.crypto.key.KeyProviderDelegationTokenExtension; import org.apache.hadoop.crypto.key.kms.KMSClientProvider; import org.apache.hadoop.crypto.key.kms.KMSDelegationToken; +import org.apache.hadoop.crypto.key.kms.KMSTokenRenewer; import org.apache.hadoop.crypto.key.kms.LoadBalancingKMSClientProvider; +import org.apache.hadoop.crypto.key.kms.TestLoadBalancingKMSClientProvider; import org.apache.hadoop.crypto.key.kms.ValueQueue; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.MultipleIOException; +import org.apache.hadoop.io.Text; import org.apache.hadoop.minikdc.MiniKdc; import org.apache.hadoop.security.Credentials; -import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AuthorizationException; import org.apache.hadoop.security.ssl.KeyStoreTestUtil; import org.apache.hadoop.security.ssl.SSLFactory; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticationHandler; +import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticator; import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier; import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.util.KMSUtil; +import org.apache.hadoop.util.KMSUtilFaultInjector; import org.apache.hadoop.util.Time; import org.apache.http.client.utils.URIBuilder; import org.junit.After; +import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.rules.Timeout; @@ -71,7 +81,6 @@ import java.io.IOException; import java.io.InputStream; import java.io.Writer; import java.net.InetAddress; -import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.SocketTimeoutException; import java.net.URI; @@ -96,6 +105,10 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.regex.Matcher; import java.util.regex.Pattern; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.KMS_CLIENT_COPY_LEGACY_TOKEN_KEY; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH; +import static org.apache.hadoop.crypto.key.kms.KMSDelegationToken.TOKEN_KIND; +import static org.apache.hadoop.crypto.key.kms.KMSDelegationToken.TOKEN_LEGACY_KIND; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -113,6 +126,20 @@ public class TestKMS { private SSLFactory sslFactory; + private final KMSUtilFaultInjector oldInjector = + KMSUtilFaultInjector.get(); + + // Injector to create providers with different ports. Can only happen in tests + private final KMSUtilFaultInjector testInjector = + new KMSUtilFaultInjector() { + @Override + public KeyProvider createKeyProviderForTests(String value, + Configuration conf) throws IOException { + return TestLoadBalancingKMSClientProvider + .createKeyProviderForTests(value, conf); + } + }; + // Keep track of all key providers created during a test case, so they can be // closed at test tearDown. private List providersCreated = new LinkedList<>(); @@ -122,7 +149,12 @@ public class TestKMS { @Before public void setUp() throws Exception { - setUpMiniKdc(); + GenericTestUtils.setLogLevel(KMSClientProvider.LOG, Level.TRACE); + GenericTestUtils + .setLogLevel(DelegationTokenAuthenticationHandler.LOG, Level.TRACE); + GenericTestUtils + .setLogLevel(DelegationTokenAuthenticator.LOG, Level.TRACE); + GenericTestUtils.setLogLevel(KMSUtil.LOG, Level.TRACE); // resetting kerberos security Configuration conf = new Configuration(); UserGroupInformation.setConfiguration(conf); @@ -141,24 +173,78 @@ public class TestKMS { } public static abstract class KMSCallable implements Callable { - private URL kmsUrl; + private List kmsUrl; protected URL getKMSUrl() { - return kmsUrl; + return kmsUrl.get(0); + } + + protected URL[] getKMSHAUrl() { + URL[] urls = new URL[kmsUrl.size()]; + return kmsUrl.toArray(urls); + } + + protected void addKMSUrl(URL url) { + if (kmsUrl == null) { + kmsUrl = new ArrayList(); + } + kmsUrl.add(url); + } + + /* + * The format of the returned value will be + * kms://http:kms1.example1.com:port1,kms://http:kms2.example2.com:port2 + */ + protected String generateLoadBalancingKeyProviderUriString() { + if (kmsUrl == null || kmsUrl.size() == 0) { + return null; + } + StringBuffer sb = new StringBuffer(); + + for (int i = 0; i < kmsUrl.size(); i++) { + sb.append(KMSClientProvider.SCHEME_NAME + "://" + + kmsUrl.get(0).getProtocol() + "@"); + URL url = kmsUrl.get(i); + sb.append(url.getAuthority()); + if (url.getPath() != null) { + sb.append(url.getPath()); + } + if (i < kmsUrl.size() - 1) { + sb.append(","); + } + } + return sb.toString(); } } protected KeyProvider createProvider(URI uri, Configuration conf) throws IOException { final KeyProvider ret = new LoadBalancingKMSClientProvider( - new KMSClientProvider[] {new KMSClientProvider(uri, conf)}, conf); + new KMSClientProvider[] {new KMSClientProvider(uri, conf, uri)}, conf); providersCreated.add(ret); return ret; } + /** + * create a LoadBalancingKMSClientProvider from an array of URIs. + * @param uris an array of KMS URIs + * @param conf configuration object + * @return a LoadBalancingKMSClientProvider object + * @throws IOException + */ + protected LoadBalancingKMSClientProvider createHAProvider(URI[] uris, + Configuration conf, String originalUri) throws IOException { + KMSClientProvider[] providers = new KMSClientProvider[uris.length]; + for (int i = 0; i < providers.length; i++) { + providers[i] = + new KMSClientProvider(uris[i], conf, URI.create(originalUri)); + } + return new LoadBalancingKMSClientProvider(providers, conf); + } + private KMSClientProvider createKMSClientProvider(URI uri, Configuration conf) throws IOException { - final KMSClientProvider ret = new KMSClientProvider(uri, conf); + final KMSClientProvider ret = new KMSClientProvider(uri, conf, uri); providersCreated.add(ret); return ret; } @@ -170,22 +256,33 @@ public class TestKMS { protected T runServer(int port, String keystore, String password, File confDir, KMSCallable callable) throws Exception { + return runServer(new int[] {port}, keystore, password, confDir, callable); + } + + protected T runServer(int[] ports, String keystore, String password, + File confDir, KMSCallable callable) throws Exception { MiniKMS.Builder miniKMSBuilder = new MiniKMS.Builder().setKmsConfDir(confDir) .setLog4jConfFile("log4j.properties"); if (keystore != null) { miniKMSBuilder.setSslConf(new File(keystore), password); } - if (port > 0) { - miniKMSBuilder.setPort(port); + final List kmsList = new ArrayList<>(); + for (int i=0; i< ports.length; i++) { + if (ports[i] > 0) { + miniKMSBuilder.setPort(ports[i]); + } + MiniKMS miniKMS = miniKMSBuilder.build(); + kmsList.add(miniKMS); + miniKMS.start(); + LOG.info("Test KMS running at: " + miniKMS.getKMSUrl()); + callable.addKMSUrl(miniKMS.getKMSUrl()); } - MiniKMS miniKMS = miniKMSBuilder.build(); - miniKMS.start(); try { - System.out.println("Test KMS running at: " + miniKMS.getKMSUrl()); - callable.kmsUrl = miniKMS.getKMSUrl(); return callable.call(); } finally { - miniKMS.stop(); + for (MiniKMS miniKMS: kmsList) { + miniKMS.stop(); + } } } @@ -240,6 +337,13 @@ public class TestKMS { return new URI("kms://" + str); } + public static URI[] createKMSHAUri(URL[] kmsUrls) throws Exception { + URI[] uris = new URI[kmsUrls.length]; + for (int i = 0; i < kmsUrls.length; i++) { + uris[i] = createKMSUri(kmsUrls[i]); + } + return uris; + } private static class KerberosConfiguration extends javax.security.auth.login.Configuration { @@ -315,19 +419,17 @@ public class TestKMS { principals.toArray(new String[principals.size()])); } - private void setUpMiniKdc() throws Exception { + @BeforeClass + public static void setUpMiniKdc() throws Exception { Properties kdcConf = MiniKdc.createConf(); setUpMiniKdc(kdcConf); } @After public void tearDown() throws Exception { - if (kdc != null) { - kdc.stop(); - kdc = null; - } UserGroupInformation.setShouldRenewImmediatelyForTests(false); UserGroupInformation.reset(); + KMSUtilFaultInjector.set(oldInjector); if (!providersCreated.isEmpty()) { final MultipleIOException.Builder b = new MultipleIOException.Builder(); for (KeyProvider kp : providersCreated) { @@ -345,6 +447,14 @@ public class TestKMS { } } + @AfterClass + public static void shutdownMiniKdc() { + if (kdc != null) { + kdc.stop(); + kdc = null; + } + } + private T doAs(String user, final PrivilegedExceptionAction action) throws Exception { UserGroupInformation.loginUserFromKeytab(user, keytab.getAbsolutePath()); @@ -501,8 +611,10 @@ public class TestKMS { Token[] tokens = ((KeyProviderDelegationTokenExtension.DelegationTokenExtension)kp) .addDelegationTokens("myuser", new Credentials()); - Assert.assertEquals(1, tokens.length); - Assert.assertEquals("kms-dt", tokens[0].getKind().toString()); + assertEquals(2, tokens.length); + assertEquals(KMSDelegationToken.TOKEN_KIND, + tokens[0].getKind()); + kp.close(); return null; } }); @@ -518,8 +630,9 @@ public class TestKMS { Token[] tokens = ((KeyProviderDelegationTokenExtension.DelegationTokenExtension)kp) .addDelegationTokens("myuser", new Credentials()); - Assert.assertEquals(1, tokens.length); - Assert.assertEquals("kms-dt", tokens[0].getKind().toString()); + assertEquals(2, tokens.length); + assertEquals(KMSDelegationToken.TOKEN_KIND, tokens[0].getKind()); + kp.close(); } return null; } @@ -2011,7 +2124,6 @@ public class TestKMS { return null; } }); - nonKerberosUgi.addCredentials(credentials); try { @@ -2067,6 +2179,17 @@ public class TestKMS { testDelegationTokensOps(true, true); } + private Text getTokenService(KeyProvider provider) { + assertTrue("KeyProvider should be an instance of KMSClientProvider", + (provider instanceof LoadBalancingKMSClientProvider)); + assertEquals("Num client providers should be 1", 1, + ((LoadBalancingKMSClientProvider)provider).getProviders().length); + Text tokenService = + (((LoadBalancingKMSClientProvider)provider).getProviders()[0]) + .getDelegationTokenService(); + return tokenService; + } + private void testDelegationTokensOps(final boolean ssl, final boolean kerb) throws Exception { final File confDir = getTestDir(); @@ -2098,11 +2221,16 @@ public class TestKMS { final URI uri = createKMSUri(getKMSUrl()); clientConf.set(KeyProviderFactory.KEY_PROVIDER_PATH, createKMSUri(getKMSUrl()).toString()); + clientConf.setBoolean(KMS_CLIENT_COPY_LEGACY_TOKEN_KEY, false); doAs("client", new PrivilegedExceptionAction() { @Override public Void run() throws Exception { KeyProvider kp = createProvider(uri, clientConf); + // Unset the conf value for key provider path just to be sure that + // the key provider created for renew and cancel token is from + // token service field. + clientConf.unset(HADOOP_SECURITY_KEY_PROVIDER_PATH); // test delegation token retrieval KeyProviderDelegationTokenExtension kpdte = KeyProviderDelegationTokenExtension. @@ -2110,13 +2238,10 @@ public class TestKMS { final Credentials credentials = new Credentials(); final Token[] tokens = kpdte.addDelegationTokens("client1", credentials); - Assert.assertEquals(1, credentials.getAllTokens().size()); - InetSocketAddress kmsAddr = - new InetSocketAddress(getKMSUrl().getHost(), - getKMSUrl().getPort()); - Assert.assertEquals(KMSDelegationToken.TOKEN_KIND, - credentials.getToken(SecurityUtil.buildTokenService(kmsAddr)). - getKind()); + Text tokenService = getTokenService(kp); + assertEquals(1, credentials.getAllTokens().size()); + assertEquals(TOKEN_KIND, + credentials.getToken(tokenService).getKind()); // Test non-renewer user cannot renew. for (Token token : tokens) { @@ -2243,12 +2368,11 @@ public class TestKMS { final URI uri = createKMSUri(getKMSUrl()); clientConf.set(KeyProviderFactory.KEY_PROVIDER_PATH, createKMSUri(getKMSUrl()).toString()); + clientConf.setBoolean(KMS_CLIENT_COPY_LEGACY_TOKEN_KEY, false); final KeyProvider kp = createProvider(uri, clientConf); final KeyProviderDelegationTokenExtension kpdte = KeyProviderDelegationTokenExtension. createKeyProviderDelegationTokenExtension(kp); - final InetSocketAddress kmsAddr = - new InetSocketAddress(getKMSUrl().getHost(), getKMSUrl().getPort()); // Job 1 (e.g. YARN log aggregation job), with user DT. final Collection> job1Token = new HashSet<>(); @@ -2258,16 +2382,17 @@ public class TestKMS { // Get a DT and use it. final Credentials credentials = new Credentials(); kpdte.addDelegationTokens("client", credentials); + Text tokenService = getTokenService(kp); Assert.assertEquals(1, credentials.getAllTokens().size()); - Assert.assertEquals(KMSDelegationToken.TOKEN_KIND, credentials. - getToken(SecurityUtil.buildTokenService(kmsAddr)).getKind()); + UserGroupInformation.getCurrentUser().addCredentials(credentials); LOG.info("Added kms dt to credentials: {}", UserGroupInformation. getCurrentUser().getCredentials().getAllTokens()); - Token token = + final Token token = UserGroupInformation.getCurrentUser().getCredentials() - .getToken(SecurityUtil.buildTokenService(kmsAddr)); - Assert.assertNotNull(token); + .getToken(tokenService); + assertNotNull(token); + assertEquals(TOKEN_KIND, token.getKind()); job1Token.add(token); // Decode the token to get max time. @@ -2302,17 +2427,16 @@ public class TestKMS { // Get a new DT, but don't use it yet. final Credentials newCreds = new Credentials(); kpdte.addDelegationTokens("client", newCreds); - Assert.assertEquals(1, newCreds.getAllTokens().size()); - Assert.assertEquals(KMSDelegationToken.TOKEN_KIND, - newCreds.getToken(SecurityUtil.buildTokenService(kmsAddr)). - getKind()); + assertEquals(1, newCreds.getAllTokens().size()); + final Text tokenService = getTokenService(kp); + assertEquals(TOKEN_KIND, + newCreds.getToken(tokenService).getKind()); // Using job 1's DT should fail. final Credentials oldCreds = new Credentials(); for (Token token : job1Token) { - if (token.getKind().equals(KMSDelegationToken.TOKEN_KIND)) { - oldCreds - .addToken(SecurityUtil.buildTokenService(kmsAddr), token); + if (token.getKind().equals(TOKEN_KIND)) { + oldCreds.addToken(tokenService, token); } } UserGroupInformation.getCurrentUser().addCredentials(oldCreds); @@ -2326,12 +2450,11 @@ public class TestKMS { } // Using the new DT should succeed. - Assert.assertEquals(1, newCreds.getAllTokens().size()); - Assert.assertEquals(KMSDelegationToken.TOKEN_KIND, - newCreds.getToken(SecurityUtil.buildTokenService(kmsAddr)). - getKind()); + assertEquals(1, newCreds.getAllTokens().size()); + assertEquals(TOKEN_KIND, + newCreds.getToken(tokenService).getKind()); UserGroupInformation.getCurrentUser().addCredentials(newCreds); - LOG.info("Credetials now are: {}", UserGroupInformation + LOG.info("Credentials now are: {}", UserGroupInformation .getCurrentUser().getCredentials().getAllTokens()); kp.getKeys(); return null; @@ -2357,7 +2480,13 @@ public class TestKMS { doKMSWithZK(true, true); } - public void doKMSWithZK(boolean zkDTSM, boolean zkSigner) throws Exception { + private T runServerWithZooKeeper(boolean zkDTSM, boolean zkSigner, + KMSCallable callable) throws Exception { + return runServerWithZooKeeper(zkDTSM, zkSigner, callable, 1); + } + + private T runServerWithZooKeeper(boolean zkDTSM, boolean zkSigner, + KMSCallable callable, int kmsSize) throws Exception { TestingServer zkServer = null; try { zkServer = new TestingServer(); @@ -2403,43 +2532,265 @@ public class TestKMS { writeConf(testDir, conf); - KMSCallable c = - new KMSCallable() { - @Override - public KeyProvider call() throws Exception { - final Configuration conf = new Configuration(); - conf.setInt(KeyProvider.DEFAULT_BITLENGTH_NAME, 128); - final URI uri = createKMSUri(getKMSUrl()); - - final KeyProvider kp = - doAs("SET_KEY_MATERIAL", - new PrivilegedExceptionAction() { - @Override - public KeyProvider run() throws Exception { - KeyProvider kp = createProvider(uri, conf); - kp.createKey("k1", new byte[16], - new KeyProvider.Options(conf)); - kp.createKey("k2", new byte[16], - new KeyProvider.Options(conf)); - kp.createKey("k3", new byte[16], - new KeyProvider.Options(conf)); - return kp; - } - }); - return kp; - } - }; - - runServer(null, null, testDir, c); + int[] ports = new int[kmsSize]; + for (int i = 0; i < ports.length; i++) { + ports[i] = -1; + } + return runServer(ports, null, null, testDir, callable); } finally { if (zkServer != null) { zkServer.stop(); zkServer.close(); } } - } + public void doKMSWithZK(boolean zkDTSM, boolean zkSigner) throws Exception { + KMSCallable c = + new KMSCallable() { + @Override + public KeyProvider call() throws Exception { + final Configuration conf = new Configuration(); + conf.setInt(KeyProvider.DEFAULT_BITLENGTH_NAME, 128); + final URI uri = createKMSUri(getKMSUrl()); + + final KeyProvider kp = + doAs("SET_KEY_MATERIAL", + new PrivilegedExceptionAction() { + @Override + public KeyProvider run() throws Exception { + KeyProvider kp = createProvider(uri, conf); + kp.createKey("k1", new byte[16], + new KeyProvider.Options(conf)); + kp.createKey("k2", new byte[16], + new KeyProvider.Options(conf)); + kp.createKey("k3", new byte[16], + new KeyProvider.Options(conf)); + return kp; + } + }); + return kp; + } + }; + + runServerWithZooKeeper(zkDTSM, zkSigner, c); + } + + @Test + public void doKMSHAZKWithDelegationTokenAccess() throws Exception { + KMSCallable c = new KMSCallable() { + @Override + public Void call() throws Exception { + final Configuration conf = new Configuration(); + conf.setInt(KeyProvider.DEFAULT_BITLENGTH_NAME, 128); + final URI[] uris = createKMSHAUri(getKMSHAUrl()); + final Credentials credentials = new Credentials(); + final String lbUri = generateLoadBalancingKeyProviderUriString(); + final LoadBalancingKMSClientProvider lbkp = + createHAProvider(uris, conf, lbUri); + conf.unset(HADOOP_SECURITY_KEY_PROVIDER_PATH); + // Login as a Kerberos user principal using keytab. + // Connect to KMS to create a delegation token and add it to credentials + final String keyName = "k0"; + doAs("SET_KEY_MATERIAL", + new PrivilegedExceptionAction() { + @Override + public Void run() throws Exception { + KeyProviderDelegationTokenExtension kpdte = + KeyProviderDelegationTokenExtension. + createKeyProviderDelegationTokenExtension(lbkp); + kpdte.addDelegationTokens("SET_KEY_MATERIAL", credentials); + kpdte.createKey(keyName, new KeyProvider.Options(conf)); + return null; + } + }); + + assertTokenIdentifierEquals(credentials); + + final LoadBalancingKMSClientProvider lbkp1 = + createHAProvider(uris, conf, lbUri); + // verify both tokens can be used to authenticate + for (Token t : credentials.getAllTokens()) { + assertTokenAccess(lbkp1, keyName, t); + } + return null; + } + }; + runServerWithZooKeeper(true, true, c, 2); + } + + /** + * Assert that the passed in credentials have 2 tokens, of kind + * {@link KMSDelegationToken#TOKEN_KIND} and + * {@link KMSDelegationToken#TOKEN_LEGACY_KIND}. Assert that the 2 tokens have + * the same identifier. + */ + private void assertTokenIdentifierEquals(Credentials credentials) + throws IOException { + // verify the 2 tokens have the same identifier + assertEquals(2, credentials.getAllTokens().size()); + Token token = null; + Token legacyToken = null; + for (Token t : credentials.getAllTokens()) { + if (KMSDelegationToken.TOKEN_KIND.equals(t.getKind())) { + token = t; + } else if (KMSDelegationToken.TOKEN_LEGACY_KIND.equals(t.getKind())) { + legacyToken = t; + } + } + assertNotNull(token); + assertNotNull(legacyToken); + final DelegationTokenIdentifier tokenId = + (DelegationTokenIdentifier) token.decodeIdentifier(); + final DelegationTokenIdentifier legacyTokenId = + (DelegationTokenIdentifier) legacyToken.decodeIdentifier(); + assertEquals("KMS DT and legacy dt should have identical identifier", + tokenId, legacyTokenId); + } + + /** + * Tests token access with each providers in the + * {@link LoadBalancingKMSClientProvider}. This is to make sure the 2 token + * kinds are compatible and can both be used to authenticate. + */ + private void assertTokenAccess(final LoadBalancingKMSClientProvider lbkp, + final String keyName, final Token token) throws Exception { + UserGroupInformation tokenUgi = + UserGroupInformation.createUserForTesting("test", new String[] {}); + // Verify the tokens can authenticate to any KMS + tokenUgi.addToken(token); + tokenUgi.doAs(new PrivilegedExceptionAction() { + @Override + public Void run() throws Exception { + // Create a kms client with one provider at a time. Must use one + // provider so that if it fails to authenticate, it does not fall + // back to the next KMS instance. + // It should succeed because its delegation token can access any + // KMS instances. + for (KMSClientProvider provider : lbkp.getProviders()) { + if (token.getKind().equals(TOKEN_LEGACY_KIND) && !token.getService() + .equals(provider.getDelegationTokenService())) { + // Historically known issue: Legacy token can only work with the + // key provider specified in the token's Service + continue; + } + LOG.info("Rolling key {} via provider {} with token {}.", keyName, + provider, token); + provider.rollNewVersion(keyName); + } + return null; + } + }); + } + + @Test + public void testKMSHAZKDelegationTokenRenewCancel() throws Exception { + testKMSHAZKDelegationTokenRenewCancel(TOKEN_KIND); + } + + @Test + public void testKMSHAZKDelegationTokenRenewCancelLegacy() throws Exception { + testKMSHAZKDelegationTokenRenewCancel(TOKEN_LEGACY_KIND); + } + + private void testKMSHAZKDelegationTokenRenewCancel(final Text tokenKind) + throws Exception { + GenericTestUtils.setLogLevel(KMSTokenRenewer.LOG, Level.TRACE); + assertTrue(tokenKind == TOKEN_KIND || tokenKind == TOKEN_LEGACY_KIND); + KMSCallable c = new KMSCallable() { + @Override + public Void call() throws Exception { + final Configuration conf = new Configuration(); + final URI[] uris = createKMSHAUri(getKMSHAUrl()); + final Credentials credentials = new Credentials(); + // Create a UGI without Kerberos auth. It will be authenticated with + // delegation token. + final UserGroupInformation nonKerberosUgi = + UserGroupInformation.getCurrentUser(); + final String lbUri = generateLoadBalancingKeyProviderUriString(); + final LoadBalancingKMSClientProvider lbkp = + createHAProvider(uris, conf, lbUri); + conf.unset(HADOOP_SECURITY_KEY_PROVIDER_PATH); + // Login as a Kerberos user principal using keytab. + // Connect to KMS to create a delegation token and add it to credentials + doAs("SET_KEY_MATERIAL", + new PrivilegedExceptionAction() { + @Override + public Void run() throws Exception { + KeyProviderDelegationTokenExtension kpdte = + KeyProviderDelegationTokenExtension. + createKeyProviderDelegationTokenExtension(lbkp); + kpdte.addDelegationTokens("SET_KEY_MATERIAL", credentials); + return null; + } + }); + + // Test token renewal and cancellation + final Collection> tokens = + credentials.getAllTokens(); + doAs("SET_KEY_MATERIAL", new PrivilegedExceptionAction() { + @Override + public Void run() throws Exception { + Assert.assertEquals(2, tokens.size()); + boolean tokenFound = false; + for (Token token : tokens) { + if (!tokenKind.equals(token.getKind())) { + continue; + } else { + tokenFound = true; + } + KMSUtilFaultInjector.set(testInjector); + setupConfForToken(token.getKind(), conf, lbUri); + + LOG.info("Testing token: {}", token); + long tokenLife = token.renew(conf); + LOG.info("Renewed token {}, new lifetime:{}", token, tokenLife); + Thread.sleep(10); + long newTokenLife = token.renew(conf); + LOG.info("Renewed token {}, new lifetime:{}", token, + newTokenLife); + assertTrue(newTokenLife > tokenLife); + + boolean canceled = false; + // test delegation token cancellation + if (!canceled) { + token.cancel(conf); + LOG.info("Cancelled token {}", token); + canceled = true; + } + assertTrue("token should have been canceled", canceled); + try { + token.renew(conf); + fail("should not be able to renew a canceled token " + token); + } catch (Exception e) { + LOG.info("Expected exception when renewing token", e); + } + } + assertTrue("Should have found token kind " + tokenKind + " from " + + tokens, tokenFound); + return null; + } + }); + return null; + } + }; + runServerWithZooKeeper(true, true, c, 2); + } + + /** + * Set or unset the key provider configuration based on token kind. + */ + private void setupConfForToken(Text tokenKind, Configuration conf, + String lbUri) { + if (tokenKind.equals(TOKEN_KIND)) { + conf.unset(HADOOP_SECURITY_KEY_PROVIDER_PATH); + } else { + // conf is only required for legacy tokens to create provider, + // new tokens create provider by parsing its own Service field + assertEquals(TOKEN_LEGACY_KIND, tokenKind); + conf.set(HADOOP_SECURITY_KEY_PROVIDER_PATH, lbUri); + } + } @Test public void testProxyUserKerb() throws Exception { @@ -2558,6 +2909,16 @@ public class TestKMS { @Test public void testTGTRenewal() throws Exception { + shutdownMiniKdc(); + try { + testTgtRenewalInt(); + } finally { + shutdownMiniKdc(); + setUpMiniKdc(); + } + } + + private void testTgtRenewalInt() throws Exception { Properties kdcConf = MiniKdc.createConf(); kdcConf.setProperty(MiniKdc.MAX_TICKET_LIFETIME, "3"); kdcConf.setProperty(MiniKdc.MIN_TICKET_LIFETIME, "3");