From 5ec86b445cc492f52c33639efb6a09a0d2f27475 Mon Sep 17 00:00:00 2001 From: Xiao Chen Date: Fri, 12 Oct 2018 09:32:21 -0700 Subject: [PATCH] HADOOP-14445. Use DelegationTokenIssuer to create KMS delegation tokens that can authenticate to all KMS instances. Contributed by Daryn Sharp, Xiao Chen, Rushabh S Shah. --- .../KeyProviderDelegationTokenExtension.java | 71 ++-- .../crypto/key/KeyProviderTokenIssuer.java | 4 +- .../crypto/key/kms/KMSClientProvider.java | 220 +++++++---- .../kms/LoadBalancingKMSClientProvider.java | 75 +++- .../java/org/apache/hadoop/fs/FileSystem.java | 75 +--- .../web/DelegationTokenAuthenticatedURL.java | 25 +- .../security/token/DelegationTokenIssuer.java | 112 ++++++ .../java/org/apache/hadoop/util/KMSUtil.java | 13 +- ...stKeyProviderDelegationTokenExtension.java | 20 +- .../crypto/key/kms/TestKMSClientProvider.java | 138 +++++++ .../TestLoadBalancingKMSClientProvider.java | 63 +++- .../hadoop/fs/TestFilterFileSystem.java | 3 + .../apache/hadoop/fs/TestHarFileSystem.java | 3 + .../hadoop/crypto/key/kms/server/TestKMS.java | 351 +++++++++++++++--- .../org/apache/hadoop/hdfs/DFSClient.java | 11 +- .../hadoop/hdfs/DistributedFileSystem.java | 14 +- .../org/apache/hadoop/hdfs/HdfsKMSUtil.java | 62 ++-- .../hadoop/hdfs/web/WebHdfsFileSystem.java | 20 +- 18 files changed, 965 insertions(+), 315 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/org/apache/hadoop/security/token/DelegationTokenIssuer.java create mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/kms/TestKMSClientProvider.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProviderDelegationTokenExtension.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProviderDelegationTokenExtension.java index a63b7d50090..29c5bcd370d 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProviderDelegationTokenExtension.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProviderDelegationTokenExtension.java @@ -17,8 +17,12 @@ */ package org.apache.hadoop.crypto.key; +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.org.apache.hadoop.security.token.DelegationTokenIssuer; import java.io.IOException; @@ -28,7 +32,8 @@ import java.io.IOException; */ public class KeyProviderDelegationTokenExtension extends KeyProviderExtension - { + + implements DelegationTokenIssuer { private static DelegationTokenExtension DEFAULT_EXTENSION = new DefaultDelegationTokenExtension(); @@ -36,22 +41,9 @@ public class KeyProviderDelegationTokenExtension extends /** * DelegationTokenExtension is a type of Extension that exposes methods * needed to work with Delegation Tokens. - */ - public interface DelegationTokenExtension extends - KeyProviderExtension.Extension { - - /** - * The implementer of this class will take a renewer and add all - * delegation tokens associated with the renewer to the - * Credentials object if it is not already present, - * @param renewer the user allowed to renew the delegation tokens - * @param credentials cache in which to add new delegation tokens - * @return list of new delegation tokens - * @throws IOException thrown if IOException if an IO error occurs. - */ - Token[] addDelegationTokens(final String renewer, - Credentials credentials) throws IOException; - + */ + public interface DelegationTokenExtension + extends KeyProviderExtension.Extension, DelegationTokenIssuer { /** * Renews the given token. * @param token The token to be renewed. @@ -66,6 +58,12 @@ public class KeyProviderDelegationTokenExtension extends * @throws IOException */ Void cancelDelegationToken(final Token token) throws IOException; + + // Do NOT call this. Only intended for internal use. + @VisibleForTesting + @InterfaceAudience.Private + @InterfaceStability.Unstable + Token selectDelegationToken(Credentials creds); } /** @@ -81,6 +79,16 @@ public class KeyProviderDelegationTokenExtension extends return null; } + @Override + public String getCanonicalServiceName() { + return null; + } + + @Override + public Token getDelegationToken(String renewer) { + return null; + } + @Override public long renewDelegationToken(final Token token) throws IOException { return 0; @@ -90,26 +98,29 @@ public class KeyProviderDelegationTokenExtension extends public Void cancelDelegationToken(final Token token) throws IOException { return null; } + + @Override + public Token selectDelegationToken(Credentials creds) { + return null; + } + } private KeyProviderDelegationTokenExtension(KeyProvider keyProvider, DelegationTokenExtension extensions) { super(keyProvider, extensions); } - - /** - * Passes the renewer and Credentials object to the underlying - * {@link DelegationTokenExtension} - * @param renewer the user allowed to renew the delegation tokens - * @param credentials cache in which to add new delegation tokens - * @return list of new delegation tokens - * @throws IOException thrown if IOException if an IO error occurs. - */ - public Token[] addDelegationTokens(final String renewer, - Credentials credentials) throws IOException { - return getExtension().addDelegationTokens(renewer, credentials); + + @Override + public String getCanonicalServiceName() { + return getExtension().getCanonicalServiceName(); } - + + @Override + public Token getDelegationToken(final String renewer) throws IOException { + return getExtension().getDelegationToken(renewer); + } + /** * Creates a KeyProviderDelegationTokenExtension using a given * {@link KeyProvider}. diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProviderTokenIssuer.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProviderTokenIssuer.java index aa5de2cda75..81caff45f2c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProviderTokenIssuer.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProviderTokenIssuer.java @@ -22,15 +22,17 @@ import java.net.URI; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.security.token.org.apache.hadoop.security.token.DelegationTokenIssuer; /** * File systems that support Encryption Zones have to implement this interface. */ @InterfaceAudience.Private @InterfaceStability.Unstable -public interface KeyProviderTokenIssuer { +public interface KeyProviderTokenIssuer extends DelegationTokenIssuer { KeyProvider getKeyProvider() throws IOException; URI getKeyProviderUri() throws IOException; + } \ No newline at end of file diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java index 26b528c6dcc..1718a1fa0dc 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java @@ -32,14 +32,13 @@ import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.ProviderUtils; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.authentication.client.AuthenticatedURL; import org.apache.hadoop.security.authentication.client.ConnectionConfigurator; -import org.apache.hadoop.security.authentication.client.KerberosAuthenticator; import org.apache.hadoop.security.ssl.SSLFactory; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenRenewer; import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier; +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector; import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL; import org.apache.hadoop.util.HttpExceptionUtils; import org.apache.hadoop.util.JsonSerialization; @@ -58,7 +57,6 @@ import java.io.Writer; import java.lang.reflect.UndeclaredThrowableException; import java.net.ConnectException; import java.net.HttpURLConnection; -import java.net.InetSocketAddress; import java.net.MalformedURLException; import java.net.SocketTimeoutException; import java.net.URI; @@ -99,7 +97,7 @@ import static org.apache.hadoop.util.KMSUtil.parseJSONMetadata; public class KMSClientProvider extends KeyProvider implements CryptoExtension, KeyProviderDelegationTokenExtension.DelegationTokenExtension { - private static final Logger LOG = + static final Logger LOG = LoggerFactory.getLogger(KMSClientProvider.class); private static final String INVALID_SIGNATURE = "Invalid signature"; @@ -133,12 +131,12 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension, private final ValueQueue encKeyVersionQueue; + private KeyProviderDelegationTokenExtension.DelegationTokenExtension + clientTokenProvider = this; + // the token's service. private final Text dtService; - - // Allow fallback to default kms server port 9600 for certain tests that do - // not specify the port explicitly in the kms provider url. - @VisibleForTesting - public static volatile boolean fallbackDefaultPortForTesting = false; + // alias in the credentials. + private final Text canonicalService; private class EncryptedQueueRefiller implements ValueQueue.QueueRefiller { @@ -162,6 +160,14 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension, } } + static class TokenSelector extends AbstractDelegationTokenSelector { + static final TokenSelector INSTANCE = new TokenSelector(); + + TokenSelector() { + super(TOKEN_KIND); + } + } + /** * The KMS implementation of {@link TokenRenewer}. */ @@ -182,8 +188,7 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension, @Override public long renew(Token token, Configuration conf) throws IOException { LOG.debug("Renewing delegation token {}", token); - KeyProvider keyProvider = KMSUtil.createKeyProvider(conf, - KeyProviderFactory.KEY_PROVIDER_PATH); + KeyProvider keyProvider = createKeyProvider(token, conf); try { if (!(keyProvider instanceof KeyProviderDelegationTokenExtension.DelegationTokenExtension)) { @@ -204,8 +209,7 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension, @Override public void cancel(Token token, Configuration conf) throws IOException { LOG.debug("Canceling delegation token {}", token); - KeyProvider keyProvider = KMSUtil.createKeyProvider(conf, - KeyProviderFactory.KEY_PROVIDER_PATH); + KeyProvider keyProvider = createKeyProvider(token, conf); try { if (!(keyProvider instanceof KeyProviderDelegationTokenExtension.DelegationTokenExtension)) { @@ -222,6 +226,19 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension, } } } + + private static KeyProvider createKeyProvider( + Token token, Configuration conf) throws IOException { + String service = token.getService().toString(); + URI uri; + if (service != null && service.startsWith(SCHEME_NAME + ":/")) { + LOG.debug("Creating key provider with token service value {}", service); + uri = URI.create(service); + } else { // conf fallback + uri = KMSUtil.getKeyProviderUri(conf); + } + return (uri != null) ? KMSUtil.createKeyProviderFromUri(conf, uri) : null; + } } public static class KMSEncryptedKeyVersion extends EncryptedKeyVersion { @@ -283,12 +300,14 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension, } hostsPart = t[0]; } - return createProvider(conf, origUrl, port, hostsPart); + KMSClientProvider[] providers = + createProviders(conf, origUrl, port, hostsPart); + return new LoadBalancingKMSClientProvider(providerUri, providers, conf); } return null; } - private KeyProvider createProvider(Configuration conf, + private KMSClientProvider[] createProviders(Configuration conf, URL origUrl, int port, String hostsPart) throws IOException { String[] hosts = hostsPart.split(";"); KMSClientProvider[] providers = new KMSClientProvider[hosts.length]; @@ -302,7 +321,7 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension, throw new IOException("Could not instantiate KMSProvider.", e); } } - return new LoadBalancingKMSClientProvider(providers, conf); + return providers; } } @@ -358,13 +377,13 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension, public KMSClientProvider(URI uri, Configuration conf) throws IOException { super(conf); kmsUrl = createServiceURL(extractKMSPath(uri)); - int kmsPort = kmsUrl.getPort(); - if ((kmsPort == -1) && fallbackDefaultPortForTesting) { - kmsPort = 9600; - } - - InetSocketAddress addr = new InetSocketAddress(kmsUrl.getHost(), kmsPort); - dtService = SecurityUtil.buildTokenService(addr); + // the token's service so it can be instantiated for renew/cancel. + dtService = getDtService(uri); + // the canonical service is the alias for the token in the credentials. + // typically it's the actual service in the token but older clients expect + // an address. + URI serviceUri = URI.create(kmsUrl.toString()); + canonicalService = SecurityUtil.buildTokenService(serviceUri); if ("https".equalsIgnoreCase(kmsUrl.getProtocol())) { sslFactory = new SSLFactory(SSLFactory.Mode.CLIENT, conf); @@ -402,8 +421,22 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension, KMS_CLIENT_ENC_KEY_CACHE_NUM_REFILL_THREADS_DEFAULT), new EncryptedQueueRefiller()); authToken = new DelegationTokenAuthenticatedURL.Token(); - LOG.debug("KMSClientProvider for KMS url: {} delegation token service: {}" + - " created.", kmsUrl, dtService); + LOG.debug("KMSClientProvider created for KMS url: {} delegation token " + + "service: {} canonical service: {}.", kmsUrl, dtService, + canonicalService); + } + + protected static Text getDtService(URI uri) { + Text service; + // remove fragment for forward compatibility with logical naming. + final String fragment = uri.getFragment(); + if (fragment != null) { + service = new Text( + uri.getScheme() + ":" + uri.getSchemeSpecificPart()); + } else { + service = new Text(uri.toString()); + } + return service; } private static Path extractKMSPath(URI uri) throws MalformedURLException, IOException { @@ -475,7 +508,7 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension, @Override public HttpURLConnection run() throws Exception { DelegationTokenAuthenticatedURL authUrl = - new DelegationTokenAuthenticatedURL(configurator); + createAuthenticatedURL(); return authUrl.openConnection(url, authToken, doAsUser); } }); @@ -931,6 +964,96 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension, return encKeyVersionQueue.getSize(keyName); } + // note: this is only a crutch for backwards compatibility. + // override the instance that will be used to select a token, intended + // to allow load balancing provider to find a token issued by any of its + // sub-providers. + protected void setClientTokenProvider( + KeyProviderDelegationTokenExtension.DelegationTokenExtension provider) { + clientTokenProvider = provider; + } + + @VisibleForTesting + DelegationTokenAuthenticatedURL createAuthenticatedURL() { + return new DelegationTokenAuthenticatedURL(configurator) { + @Override + public org.apache.hadoop.security.token.Token + selectDelegationToken(URL url, Credentials creds) { + if (LOG.isDebugEnabled()) { + LOG.debug("Looking for delegation token. creds: {}", + creds.getAllTokens()); + } + // clientTokenProvider is either "this" or a load balancing instance. + // if the latter, it will first look for the load balancer's uri + // service followed by each sub-provider for backwards-compatibility. + return clientTokenProvider.selectDelegationToken(creds); + } + }; + } + + @InterfaceAudience.Private + @Override + public Token selectDelegationToken(Credentials creds) { + Token token = selectDelegationToken(creds, dtService); + if (token == null) { + token = selectDelegationToken(creds, canonicalService); + } + return token; + } + + protected static Token selectDelegationToken(Credentials creds, + Text service) { + Token token = creds.getToken(service); + LOG.debug("selected by alias={} token={}", service, token); + if (token != null && TOKEN_KIND.equals(token.getKind())) { + return token; + } + token = TokenSelector.INSTANCE.selectToken(service, creds.getAllTokens()); + LOG.debug("selected by service={} token={}", service, token); + return token; + } + + @Override + public String getCanonicalServiceName() { + return canonicalService.toString(); + } + + @Override + public Token getDelegationToken(final String renewer) throws IOException { + final URL url = createURL(null, null, null, null); + final DelegationTokenAuthenticatedURL authUrl = + new DelegationTokenAuthenticatedURL(configurator); + Token token = null; + try { + final String doAsUser = getDoAsUser(); + token = getActualUgi().doAs(new PrivilegedExceptionAction>() { + @Override + public Token run() throws Exception { + // Not using the cached token here.. Creating a new token here + // everytime. + LOG.debug("Getting new token from {}, renewer:{}", url, renewer); + return authUrl.getDelegationToken(url, + new DelegationTokenAuthenticatedURL.Token(), renewer, doAsUser); + } + }); + if (token != null) { + token.setService(dtService); + LOG.info("New token created: ({})", token); + } else { + throw new IOException("Got NULL as delegation token"); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (Exception e) { + if (e instanceof IOException) { + throw (IOException) e; + } else { + throw new IOException(e); + } + } + return token; + } + @Override public long renewDelegationToken(final Token dToken) throws IOException { try { @@ -941,7 +1064,7 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension, LOG.debug("Renewing delegation token {} with url:{}, as:{}", token, url, doAsUser); final DelegationTokenAuthenticatedURL authUrl = - new DelegationTokenAuthenticatedURL(configurator); + createAuthenticatedURL(); return getActualUgi().doAs( new PrivilegedExceptionAction() { @Override @@ -973,7 +1096,7 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension, LOG.debug("Cancelling delegation token {} with url:{}, as:{}", dToken, url, doAsUser); final DelegationTokenAuthenticatedURL authUrl = - new DelegationTokenAuthenticatedURL(configurator); + createAuthenticatedURL(); authUrl.cancelDelegationToken(url, token, doAsUser); return null; } @@ -1025,47 +1148,6 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension, return token; } - @Override - public Token[] addDelegationTokens(final String renewer, - Credentials credentials) throws IOException { - Token[] tokens = null; - Token token = credentials.getToken(dtService); - if (token == null) { - final URL url = createURL(null, null, null, null); - final DelegationTokenAuthenticatedURL authUrl = - new DelegationTokenAuthenticatedURL(configurator); - try { - final String doAsUser = getDoAsUser(); - token = getActualUgi().doAs(new PrivilegedExceptionAction>() { - @Override - public Token run() throws Exception { - // Not using the cached token here.. Creating a new token here - // everytime. - LOG.info("Getting new token from {}, renewer:{}", url, renewer); - return authUrl.getDelegationToken(url, - new DelegationTokenAuthenticatedURL.Token(), renewer, doAsUser); - } - }); - if (token != null) { - LOG.info("New token received: ({})", token); - credentials.addToken(token.getService(), token); - tokens = new Token[] { token }; - } else { - throw new IOException("Got NULL as delegation token"); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } catch (Exception e) { - if (e instanceof IOException) { - throw (IOException) e; - } else { - throw new IOException(e); - } - } - } - return tokens; - } - private boolean containsKmsDt(UserGroupInformation ugi) throws IOException { // Add existing credentials from the UGI, since provider is cached. Credentials creds = ugi.getCredentials(); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/LoadBalancingKMSClientProvider.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/LoadBalancingKMSClientProvider.java index e68e8448aa3..6cb2cdc2d03 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/LoadBalancingKMSClientProvider.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/LoadBalancingKMSClientProvider.java @@ -21,6 +21,7 @@ package org.apache.hadoop.crypto.key.kms; import java.io.IOException; import java.io.InterruptedIOException; import java.net.ConnectException; +import java.net.URI; import java.security.GeneralSecurityException; import java.security.NoSuchAlgorithmException; import java.util.Arrays; @@ -36,12 +37,15 @@ import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.CryptoExtension; import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion; import org.apache.hadoop.crypto.key.KeyProviderDelegationTokenExtension; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.io.Text; import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.io.retry.RetryPolicy.RetryAction; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.util.KMSUtil; import org.apache.hadoop.util.Time; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -76,19 +80,42 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements private final KMSClientProvider[] providers; private final AtomicInteger currentIdx; + private final Text dtService; // service in token. + private final Text canonicalService; // credentials alias for token. private RetryPolicy retryPolicy = null; - public LoadBalancingKMSClientProvider(KMSClientProvider[] providers, - Configuration conf) { - this(shuffle(providers), Time.monotonicNow(), conf); + public LoadBalancingKMSClientProvider(URI providerUri, + KMSClientProvider[] providers, Configuration conf) { + this(providerUri, providers, Time.monotonicNow(), conf); } @VisibleForTesting LoadBalancingKMSClientProvider(KMSClientProvider[] providers, long seed, Configuration conf) { + this(URI.create("kms://testing"), providers, seed, conf); + } + + private LoadBalancingKMSClientProvider(URI uri, + KMSClientProvider[] providers, long seed, Configuration conf) { super(conf); - this.providers = providers; + // uri is the token service so it can be instantiated for renew/cancel. + dtService = KMSClientProvider.getDtService(uri); + // if provider not in conf, new client will alias on uri else addr. + if (KMSUtil.getKeyProviderUri(conf) == null) { + canonicalService = dtService; + } else { + // canonical service (credentials alias) will be the first underlying + // provider's service. must be deterministic before shuffle so multiple + // calls for a token do not obtain another unnecessary token. + canonicalService = new Text(providers[0].getCanonicalServiceName()); + } + + // shuffle unless seed is 0 which is used by tests for determinism. + this.providers = (seed != 0) ? shuffle(providers) : providers; + for (KMSClientProvider provider : providers) { + provider.setClientTokenProvider(this); + } this.currentIdx = new AtomicInteger((int)(seed % providers.length)); int maxNumRetries = conf.getInt(CommonConfigurationKeysPublic. KMS_CLIENT_FAILOVER_MAX_RETRIES_KEY, providers.length); @@ -106,6 +133,9 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements this.retryPolicy = RetryPolicies.failoverOnNetworkException( RetryPolicies.TRY_ONCE_THEN_FAIL, maxNumRetries, 0, sleepBaseMillis, sleepMaxMillis); + LOG.debug("Created LoadBalancingKMSClientProvider for KMS url: {} with {} " + + "providers. delegation token service: {}, canonical service: {}", + uri, providers.length, dtService, canonicalService); } @VisibleForTesting @@ -113,6 +143,23 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements return providers; } + @Override + public org.apache.hadoop.security.token.Token + selectDelegationToken(Credentials creds) { + Token token = + KMSClientProvider.selectDelegationToken(creds, canonicalService); + // fallback to querying each sub-provider. + if (token == null) { + for (KMSClientProvider provider : getProviders()) { + token = provider.selectDelegationToken(creds); + if (token != null) { + break; + } + } + } + return token; + } + private T doOp(ProviderCallable op, int currPos, boolean isIdempotent) throws IOException { if (providers.length == 0) { @@ -193,13 +240,21 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements } @Override - public Token[] - addDelegationTokens(final String renewer, final Credentials credentials) - throws IOException { - return doOp(new ProviderCallable[]>() { + public String getCanonicalServiceName() { + return canonicalService.toString(); + } + + @Override + public Token getDelegationToken(String renewer) throws IOException { + return doOp(new ProviderCallable>() { @Override - public Token[] call(KMSClientProvider provider) throws IOException { - return provider.addDelegationTokens(renewer, credentials); + public Token call(KMSClientProvider provider) throws IOException { + Token token = provider.getDelegationToken(renewer); + // override sub-providers service with our own so it can be used + // across all providers. + token.setService(dtService); + LOG.debug("New token service set. Token: ({})", token); + return token; } }, nextIdx(), false); } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java index 66b6d44e7fa..3d40b6aadaf 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java @@ -58,13 +58,13 @@ import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsCreateModes; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.MultipleIOException; -import org.apache.hadoop.io.Text; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.org.apache.hadoop.security.token.DelegationTokenIssuer; import org.apache.hadoop.util.ClassUtil; import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.Progressable; @@ -121,7 +121,8 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.*; @SuppressWarnings("DeprecatedIsStillUsed") @InterfaceAudience.Public @InterfaceStability.Stable -public abstract class FileSystem extends Configured implements Closeable { +public abstract class FileSystem extends Configured + implements Closeable, DelegationTokenIssuer { public static final String FS_DEFAULT_NAME_KEY = CommonConfigurationKeys.FS_DEFAULT_NAME_KEY; public static final String DEFAULT_FS = @@ -386,6 +387,7 @@ public abstract class FileSystem extends Configured implements Closeable { */ @InterfaceAudience.Public @InterfaceStability.Evolving + @Override public String getCanonicalServiceName() { return (getChildFileSystems() == null) ? SecurityUtil.buildDTServiceName(getUri(), getDefaultPort()) @@ -600,71 +602,11 @@ public abstract class FileSystem extends Configured implements Closeable { * @throws IOException on any problem obtaining a token */ @InterfaceAudience.Private() + @Override public Token getDelegationToken(String renewer) throws IOException { return null; } - /** - * Obtain all delegation tokens used by this FileSystem that are not - * already present in the given Credentials. Existing tokens will neither - * be verified as valid nor having the given renewer. Missing tokens will - * be acquired and added to the given Credentials. - * - * Default Impl: works for simple FS with its own token - * and also for an embedded FS whose tokens are those of its - * child FileSystems (i.e. the embedded FS has no tokens of its own). - * - * @param renewer the user allowed to renew the delegation tokens - * @param credentials cache in which to add new delegation tokens - * @return list of new delegation tokens - * @throws IOException problems obtaining a token - */ - @InterfaceAudience.Public - @InterfaceStability.Evolving - public Token[] addDelegationTokens( - final String renewer, Credentials credentials) throws IOException { - if (credentials == null) { - credentials = new Credentials(); - } - final List> tokens = new ArrayList<>(); - collectDelegationTokens(renewer, credentials, tokens); - return tokens.toArray(new Token[tokens.size()]); - } - - /** - * Recursively obtain the tokens for this FileSystem and all descendant - * FileSystems as determined by {@link #getChildFileSystems()}. - * @param renewer the user allowed to renew the delegation tokens - * @param credentials cache in which to add the new delegation tokens - * @param tokens list in which to add acquired tokens - * @throws IOException problems obtaining a token - */ - private void collectDelegationTokens(final String renewer, - final Credentials credentials, - final List> tokens) - throws IOException { - final String serviceName = getCanonicalServiceName(); - // Collect token of the this filesystem and then of its embedded children - if (serviceName != null) { // fs has token, grab it - final Text service = new Text(serviceName); - Token token = credentials.getToken(service); - if (token == null) { - token = getDelegationToken(renewer); - if (token != null) { - tokens.add(token); - credentials.addToken(service, token); - } - } - } - // Now collect the tokens from the children - final FileSystem[] children = getChildFileSystems(); - if (children != null) { - for (final FileSystem fs : children) { - fs.collectDelegationTokens(renewer, credentials, tokens); - } - } - } - /** * Get all the immediate child FileSystems embedded in this FileSystem. * It does not recurse and get grand children. If a FileSystem @@ -680,6 +622,13 @@ public abstract class FileSystem extends Configured implements Closeable { return null; } + @InterfaceAudience.Private + @Override + public DelegationTokenIssuer[] getAdditionalTokenIssuers() + throws IOException { + return getChildFileSystems(); + } + /** * Create a file with the provided permission. * diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenAuthenticatedURL.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenAuthenticatedURL.java index 35589a2a424..4e9881bc343 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenAuthenticatedURL.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenAuthenticatedURL.java @@ -296,15 +296,11 @@ public class DelegationTokenAuthenticatedURL extends AuthenticatedURL { Credentials creds = UserGroupInformation.getCurrentUser(). getCredentials(); if (LOG.isDebugEnabled()) { - LOG.debug("Token not set, looking for delegation token. Creds:{}", - creds.getAllTokens()); + LOG.debug("Token not set, looking for delegation token. Creds:{}," + + " size:{}", creds.getAllTokens(), creds.numberOfTokens()); } if (!creds.getAllTokens().isEmpty()) { - InetSocketAddress serviceAddr = new InetSocketAddress(url.getHost(), - url.getPort()); - Text service = SecurityUtil.buildTokenService(serviceAddr); - dToken = creds.getToken(service); - LOG.debug("Using delegation token {} from service:{}", dToken, service); + dToken = selectDelegationToken(url, creds); if (dToken != null) { if (useQueryStringForDelegationToken()) { // delegation token will go in the query string, injecting it @@ -340,6 +336,21 @@ public class DelegationTokenAuthenticatedURL extends AuthenticatedURL { return conn; } + /** + * Select a delegation token from all tokens in credentials, based on url. + */ + @InterfaceAudience.Private + public org.apache.hadoop.security.token.Token + selectDelegationToken(URL url, Credentials creds) { + final InetSocketAddress serviceAddr = new InetSocketAddress(url.getHost(), + url.getPort()); + final Text service = SecurityUtil.buildTokenService(serviceAddr); + org.apache.hadoop.security.token.Token dToken = + creds.getToken(service); + LOG.debug("Using delegation token {} from service:{}", dToken, service); + return dToken; + } + /** * Requests a delegation token using the configured Authenticator * for authentication. diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/org/apache/hadoop/security/token/DelegationTokenIssuer.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/org/apache/hadoop/security/token/DelegationTokenIssuer.java new file mode 100644 index 00000000000..90e72b9fe50 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/org/apache/hadoop/security/token/DelegationTokenIssuer.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.security.token.org.apache.hadoop.security.token; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.token.Token; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * Class for issuing delegation tokens. + */ +@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce", "Yarn"}) +@InterfaceStability.Unstable +public interface DelegationTokenIssuer { + + /** + * The service name used as the alias for the token in the credential + * token map. addDelegationTokens will use this to determine if + * a token exists, and if not, add a new token with this alias. + */ + String getCanonicalServiceName(); + + /** + * Unconditionally get a new token with the optional renewer. Returning + * null indicates the service does not issue tokens. + */ + Token getDelegationToken(String renewer) throws IOException; + + /** + * Issuers may need tokens from additional services. + */ + default DelegationTokenIssuer[] getAdditionalTokenIssuers() + throws IOException { + return null; + } + + /** + * Given a renewer, add delegation tokens for issuer and it's child issuers + * to the Credentials object if it is not already present. + *

+ * Note: This method is not intended to be overridden. Issuers should + * implement getCanonicalService and getDelegationToken to ensure + * consistent token acquisition behavior. + * + * @param renewer the user allowed to renew the delegation tokens + * @param credentials cache in which to add new delegation tokens + * @return list of new delegation tokens + * @throws IOException thrown if IOException if an IO error occurs. + */ + default Token[] addDelegationTokens( + final String renewer, Credentials credentials) throws IOException { + if (credentials == null) { + credentials = new Credentials(); + } + final List> tokens = new ArrayList<>(); + collectDelegationTokens(this, renewer, credentials, tokens); + return tokens.toArray(new Token[tokens.size()]); + } + + /** + * NEVER call this method directly. + */ + @InterfaceAudience.Private + static void collectDelegationTokens( + final DelegationTokenIssuer issuer, + final String renewer, + final Credentials credentials, + final List> tokens) throws IOException { + final String serviceName = issuer.getCanonicalServiceName(); + // Collect token of the this issuer and then of its embedded children + if (serviceName != null) { + final Text service = new Text(serviceName); + Token token = credentials.getToken(service); + if (token == null) { + token = issuer.getDelegationToken(renewer); + if (token != null) { + tokens.add(token); + credentials.addToken(service, token); + } + } + } + // Now collect the tokens from the children. + final DelegationTokenIssuer[] ancillary = + issuer.getAdditionalTokenIssuers(); + if (ancillary != null) { + for (DelegationTokenIssuer subIssuer : ancillary) { + collectDelegationTokens(subIssuer, renewer, credentials, tokens); + } + } + } +} \ No newline at end of file diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/KMSUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/KMSUtil.java index c96c6fbde3d..5b48da15556 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/KMSUtil.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/KMSUtil.java @@ -59,12 +59,23 @@ public final class KMSUtil { public static KeyProvider createKeyProvider(final Configuration conf, final String configKeyName) throws IOException { LOG.debug("Creating key provider with config key {}", configKeyName); + URI uri = getKeyProviderUri(conf, configKeyName); + return (uri != null) ? createKeyProviderFromUri(conf, uri) : null; + } + + public static URI getKeyProviderUri(final Configuration conf) { + return KMSUtil.getKeyProviderUri( + conf, KeyProviderFactory.KEY_PROVIDER_PATH); + } + + public static URI getKeyProviderUri(final Configuration conf, + final String configKeyName) { final String providerUriStr = conf.getTrimmed(configKeyName); // No provider set in conf if (providerUriStr == null || providerUriStr.isEmpty()) { return null; } - return createKeyProviderFromUri(conf, URI.create(providerUriStr)); + return URI.create(providerUriStr); } public static KeyProvider createKeyProviderFromUri(final Configuration conf, diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/TestKeyProviderDelegationTokenExtension.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/TestKeyProviderDelegationTokenExtension.java index df5d3e88846..4fabc5b4f60 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/TestKeyProviderDelegationTokenExtension.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/TestKeyProviderDelegationTokenExtension.java @@ -51,23 +51,27 @@ public class TestKeyProviderDelegationTokenExtension { KeyProviderDelegationTokenExtension .createKeyProviderDelegationTokenExtension(kp); Assert.assertNotNull(kpDTE1); - // Default implementation should be a no-op and return null - Assert.assertNull(kpDTE1.addDelegationTokens("user", credentials)); + Token[] tokens = kpDTE1.addDelegationTokens("user", credentials); + // Default implementation should return no tokens. + Assert.assertNotNull(tokens); + Assert.assertEquals(0, tokens.length); MockKeyProvider mock = mock(MockKeyProvider.class); Mockito.when(mock.getConf()).thenReturn(new Configuration()); - when(mock.addDelegationTokens("renewer", credentials)).thenReturn( - new Token[]{new Token(null, null, new Text("kind"), new Text( - "service"))} + when(mock.getCanonicalServiceName()).thenReturn("cservice"); + when(mock.getDelegationToken("renewer")).thenReturn( + new Token(null, null, new Text("kind"), new Text( + "tservice")) ); KeyProviderDelegationTokenExtension kpDTE2 = KeyProviderDelegationTokenExtension .createKeyProviderDelegationTokenExtension(mock); - Token[] tokens = - kpDTE2.addDelegationTokens("renewer", credentials); + tokens = kpDTE2.addDelegationTokens("renewer", credentials); Assert.assertNotNull(tokens); + Assert.assertEquals(1, tokens.length); Assert.assertEquals("kind", tokens[0].getKind().toString()); - + Assert.assertEquals("tservice", tokens[0].getService().toString()); + Assert.assertNotNull(credentials.getToken(new Text("cservice"))); } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/kms/TestKMSClientProvider.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/kms/TestKMSClientProvider.java new file mode 100644 index 00000000000..b87f45ac97a --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/kms/TestKMSClientProvider.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.crypto.key.kms; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL; +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.event.Level; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; + +import static org.apache.hadoop.crypto.key.kms.KMSDelegationToken.TOKEN_KIND; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +/** + * Unit test for {@link KMSClientProvider} class. + */ +public class TestKMSClientProvider { + + public static final Logger LOG = + LoggerFactory.getLogger(TestKMSClientProvider.class); + + private final Token token = new Token(); + private final Token oldToken = new Token(); + private final String urlString = "https://host:16000/kms"; + private final String providerUriString = "kms://https@host:16000/kms"; + private final String oldTokenService = "host:16000"; + + @Rule + public Timeout globalTimeout = new Timeout(60000); + + { + GenericTestUtils.setLogLevel(KMSClientProvider.LOG, Level.TRACE); + } + + @Before + public void setup() { + SecurityUtil.setTokenServiceUseIp(false); + token.setKind(TOKEN_KIND); + token.setService(new Text(providerUriString)); + oldToken.setKind(TOKEN_KIND); + oldToken.setService(new Text(oldTokenService)); + } + + @Test + public void testSelectDelegationToken() throws Exception { + final Credentials creds = new Credentials(); + creds.addToken(new Text(providerUriString), token); + assertNull(KMSClientProvider.selectDelegationToken(creds, null)); + assertNull(KMSClientProvider + .selectDelegationToken(creds, new Text(oldTokenService))); + assertEquals(token, KMSClientProvider + .selectDelegationToken(creds, new Text(providerUriString))); + } + + @Test + public void testSelectTokenOldService() throws Exception { + final Configuration conf = new Configuration(); + final URI uri = new URI(providerUriString); + final KMSClientProvider kp = new KMSClientProvider(uri, conf); + try { + final Credentials creds = new Credentials(); + creds.addToken(new Text(oldTokenService), oldToken); + final Token t = kp.selectDelegationToken(creds); + assertEquals(oldToken, t); + } finally { + kp.close(); + } + } + + @Test + public void testSelectTokenWhenBothExist() throws Exception { + final Credentials creds = new Credentials(); + final Configuration conf = new Configuration(); + final URI uri = new URI(providerUriString); + final KMSClientProvider kp = new KMSClientProvider(uri, conf); + try { + creds.addToken(token.getService(), token); + creds.addToken(oldToken.getService(), oldToken); + final Token t = kp.selectDelegationToken(creds); + assertEquals("new token should be selected when both exist", token, t); + } finally { + kp.close(); + } + } + + @Test + public void testURLSelectTokenUriFormat() throws Exception { + testURLSelectToken(token); + } + + @Test + public void testURLSelectTokenIpPort() throws Exception { + testURLSelectToken(oldToken); + } + + private void testURLSelectToken(final Token tok) + throws URISyntaxException, IOException { + final Configuration conf = new Configuration(); + final URI uri = new URI(providerUriString); + final KMSClientProvider kp = new KMSClientProvider(uri, conf); + final DelegationTokenAuthenticatedURL url = kp.createAuthenticatedURL(); + final Credentials creds = new Credentials(); + creds.addToken(tok.getService(), tok); + final Token chosen = url.selectDelegationToken(new URL(urlString), creds); + assertEquals(tok, chosen); + } +} \ No newline at end of file diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/kms/TestLoadBalancingKMSClientProvider.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/kms/TestLoadBalancingKMSClientProvider.java index 058db921793..c99c63dfffd 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/kms/TestLoadBalancingKMSClientProvider.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/kms/TestLoadBalancingKMSClientProvider.java @@ -20,6 +20,7 @@ package org.apache.hadoop.crypto.key.kms; import static org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion; import static org.apache.hadoop.test.LambdaTestUtils.intercept; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; @@ -49,7 +50,6 @@ import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.authentication.client.AuthenticationException; import org.apache.hadoop.security.authorize.AuthorizationException; -import org.junit.After; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; @@ -68,33 +68,27 @@ public class TestLoadBalancingKMSClientProvider { SecurityUtil.setTokenServiceUseIp(false); } - @After - public void teardown() throws IOException { - KMSClientProvider.fallbackDefaultPortForTesting = false; - } - @Test public void testCreation() throws Exception { Configuration conf = new Configuration(); - KMSClientProvider.fallbackDefaultPortForTesting = true; KeyProvider kp = new KMSClientProvider.Factory().createProvider(new URI( - "kms://http@host1/kms/foo"), conf); + "kms://http@host1:9600/kms/foo"), conf); assertTrue(kp instanceof LoadBalancingKMSClientProvider); KMSClientProvider[] providers = ((LoadBalancingKMSClientProvider) kp).getProviders(); assertEquals(1, providers.length); - assertEquals(Sets.newHashSet("http://host1/kms/foo/v1/"), + assertEquals(Sets.newHashSet("http://host1:9600/kms/foo/v1/"), Sets.newHashSet(providers[0].getKMSUrl())); kp = new KMSClientProvider.Factory().createProvider(new URI( - "kms://http@host1;host2;host3/kms/foo"), conf); + "kms://http@host1;host2;host3:9600/kms/foo"), conf); assertTrue(kp instanceof LoadBalancingKMSClientProvider); providers = ((LoadBalancingKMSClientProvider) kp).getProviders(); assertEquals(3, providers.length); - assertEquals(Sets.newHashSet("http://host1/kms/foo/v1/", - "http://host2/kms/foo/v1/", - "http://host3/kms/foo/v1/"), + assertEquals(Sets.newHashSet("http://host1:9600/kms/foo/v1/", + "http://host2:9600/kms/foo/v1/", + "http://host3:9600/kms/foo/v1/"), Sets.newHashSet(providers[0].getKMSUrl(), providers[1].getKMSUrl(), providers[2].getKMSUrl())); @@ -257,10 +251,9 @@ public class TestLoadBalancingKMSClientProvider { @Test public void testClassCastException() throws Exception { Configuration conf = new Configuration(); - KMSClientProvider.fallbackDefaultPortForTesting = true; KMSClientProvider p1 = new MyKMSClientProvider( - new URI("kms://http@host1/kms/foo"), conf); - LoadBalancingKMSClientProvider kp = new LoadBalancingKMSClientProvider( + new URI("kms://http@host1:9600/kms/foo"), conf); + LoadBalancingKMSClientProvider kp = new LoadBalancingKMSClientProvider( new KMSClientProvider[] {p1}, 0, conf); try { kp.generateEncryptedKey("foo"); @@ -878,4 +871,42 @@ public class TestLoadBalancingKMSClientProvider { verify(kp.getProviders()[2], Mockito.times(1)) .createKey(Mockito.eq(keyName), Mockito.any(Options.class)); } + + @Test + public void testTokenServiceCreationWithLegacyFormat() throws Exception { + Configuration conf = new Configuration(); + // Create keyprovider with old token format (ip:port) + conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH, + "kms:/something"); + String authority = "host1:9600"; + URI kmsUri = URI.create("kms://http@" + authority + "/kms/foo"); + KeyProvider kp = + new KMSClientProvider.Factory().createProvider(kmsUri, conf); + assertTrue(kp instanceof LoadBalancingKMSClientProvider); + LoadBalancingKMSClientProvider lbkp = (LoadBalancingKMSClientProvider) kp; + assertEquals(1, lbkp.getProviders().length); + assertEquals(authority, lbkp.getCanonicalServiceName()); + for (KMSClientProvider provider : lbkp.getProviders()) { + assertEquals(authority, provider.getCanonicalServiceName()); + } + } + + @Test + public void testTokenServiceCreationWithUriFormat() throws Exception { + final Configuration conf = new Configuration(); + final URI kmsUri = URI.create("kms://http@host1;host2;host3:9600/kms/foo"); + final KeyProvider kp = + new KMSClientProvider.Factory().createProvider(kmsUri, conf); + assertTrue(kp instanceof LoadBalancingKMSClientProvider); + final LoadBalancingKMSClientProvider lbkp = + (LoadBalancingKMSClientProvider) kp; + assertEquals(kmsUri.toString(), lbkp.getCanonicalServiceName()); + KMSClientProvider[] providers = lbkp.getProviders(); + assertEquals(3, providers.length); + for (int i = 0; i < providers.length; i++) { + assertEquals(URI.create(providers[i].getKMSUrl()).getAuthority(), + providers[i].getCanonicalServiceName()); + assertNotEquals(kmsUri, providers[i].getCanonicalServiceName()); + } + } } \ No newline at end of file diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java index 0e9a6125f40..a766cfb4710 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java @@ -36,6 +36,7 @@ import org.apache.hadoop.fs.Options.CreateOpts; import org.apache.hadoop.fs.Options.Rename; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.org.apache.hadoop.security.token.DelegationTokenIssuer; import org.apache.hadoop.util.Progressable; import org.junit.BeforeClass; import org.junit.Test; @@ -125,6 +126,8 @@ public class TestFilterFileSystem { public int getDefaultPort(); public String getCanonicalServiceName(); public Token getDelegationToken(String renewer) throws IOException; + public DelegationTokenIssuer[] getAdditionalTokenIssuers() + throws IOException; public boolean deleteOnExit(Path f) throws IOException; public boolean cancelDeleteOnExit(Path f) throws IOException; public Token[] addDelegationTokens(String renewer, Credentials creds) diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java index 1b696938559..870a8286830 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java @@ -25,6 +25,7 @@ import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.org.apache.hadoop.security.token.DelegationTokenIssuer; import org.apache.hadoop.util.Progressable; import org.junit.Assert; import org.junit.Test; @@ -145,6 +146,8 @@ public class TestHarFileSystem { public int getDefaultPort(); public String getCanonicalServiceName(); public Token getDelegationToken(String renewer) throws IOException; + public DelegationTokenIssuer[] getAdditionalTokenIssuers() + throws IOException; public FileChecksum getFileChecksum(Path f) throws IOException; public boolean deleteOnExit(Path f) throws IOException; public boolean cancelDeleteOnExit(Path f) throws IOException; diff --git a/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMS.java b/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMS.java index d709ba8baae..af59877a316 100644 --- a/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMS.java +++ b/hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMS.java @@ -35,6 +35,7 @@ import org.apache.hadoop.crypto.key.kms.LoadBalancingKMSClientProvider; import org.apache.hadoop.crypto.key.kms.ValueQueue; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; import org.apache.hadoop.io.MultipleIOException; import org.apache.hadoop.minikdc.MiniKdc; import org.apache.hadoop.security.Credentials; @@ -44,6 +45,7 @@ import org.apache.hadoop.security.authorize.AuthorizationException; import org.apache.hadoop.security.ssl.KeyStoreTestUtil; import org.apache.hadoop.security.ssl.SSLFactory; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.Whitebox; @@ -96,6 +98,8 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.regex.Matcher; import java.util.regex.Pattern; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH; + import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -141,21 +145,78 @@ public class TestKMS { } public static abstract class KMSCallable implements Callable { - private URL kmsUrl; + private List kmsUrl; protected URL getKMSUrl() { - return kmsUrl; + return kmsUrl.get(0); + } + + protected URL[] getKMSHAUrl() { + URL[] urls = new URL[kmsUrl.size()]; + return kmsUrl.toArray(urls); + } + + protected void addKMSUrl(URL url) { + if (kmsUrl == null) { + kmsUrl = new ArrayList(); + } + kmsUrl.add(url); + } + + /* + * The format of the returned value will be + * kms://http:kms1.example1.com:port1,kms://http:kms2.example2.com:port2 + */ + protected String generateLoadBalancingKeyProviderUriString() { + if (kmsUrl == null || kmsUrl.size() == 0) { + return null; + } + StringBuffer sb = new StringBuffer(); + + for (int i = 0; i < kmsUrl.size(); i++) { + sb.append(KMSClientProvider.SCHEME_NAME + "://" + + kmsUrl.get(0).getProtocol() + "@"); + URL url = kmsUrl.get(i); + sb.append(url.getAuthority()); + if (url.getPath() != null) { + sb.append(url.getPath()); + } + if (i < kmsUrl.size() - 1) { + sb.append(","); + } + } + return sb.toString(); } } protected KeyProvider createProvider(URI uri, Configuration conf) throws IOException { - final KeyProvider ret = new LoadBalancingKMSClientProvider( + final KeyProvider ret = new LoadBalancingKMSClientProvider(uri, new KMSClientProvider[] {new KMSClientProvider(uri, conf)}, conf); providersCreated.add(ret); return ret; } + /** + * create a LoadBalancingKMSClientProvider from an array of URIs. + * @param uris an array of KMS URIs + * @param conf configuration object + * @return a LoadBalancingKMSClientProvider object + * @throws IOException + */ + protected LoadBalancingKMSClientProvider createHAProvider(URI lbUri, + URI[] uris, Configuration conf) throws IOException { + KMSClientProvider[] providers = new KMSClientProvider[uris.length]; + for (int i = 0; i < providers.length; i++) { + providers[i] = + new KMSClientProvider(uris[i], conf); + } + final LoadBalancingKMSClientProvider ret = + new LoadBalancingKMSClientProvider(lbUri, providers, conf); + providersCreated.add(ret); + return ret; + } + private KMSClientProvider createKMSClientProvider(URI uri, Configuration conf) throws IOException { final KMSClientProvider ret = new KMSClientProvider(uri, conf); @@ -170,22 +231,34 @@ public class TestKMS { protected T runServer(int port, String keystore, String password, File confDir, KMSCallable callable) throws Exception { + return runServer(new int[] {port}, keystore, password, confDir, callable); + } + + protected T runServer(int[] ports, String keystore, String password, + File confDir, KMSCallable callable) throws Exception { MiniKMS.Builder miniKMSBuilder = new MiniKMS.Builder().setKmsConfDir(confDir) .setLog4jConfFile("log4j.properties"); if (keystore != null) { miniKMSBuilder.setSslConf(new File(keystore), password); } - if (port > 0) { - miniKMSBuilder.setPort(port); + final List kmsList = new ArrayList<>(); + + for (int i = 0; i < ports.length; i++) { + if (ports[i] > 0) { + miniKMSBuilder.setPort(ports[i]); + } + MiniKMS miniKMS = miniKMSBuilder.build(); + kmsList.add(miniKMS); + miniKMS.start(); + LOG.info("Test KMS running at: " + miniKMS.getKMSUrl()); + callable.addKMSUrl(miniKMS.getKMSUrl()); } - MiniKMS miniKMS = miniKMSBuilder.build(); - miniKMS.start(); try { - System.out.println("Test KMS running at: " + miniKMS.getKMSUrl()); - callable.kmsUrl = miniKMS.getKMSUrl(); return callable.call(); } finally { - miniKMS.stop(); + for (MiniKMS miniKMS: kmsList) { + miniKMS.stop(); + } } } @@ -240,6 +313,13 @@ public class TestKMS { return new URI("kms://" + str); } + public static URI[] createKMSHAUri(URL[] kmsUrls) throws Exception { + URI[] uris = new URI[kmsUrls.length]; + for (int i=0; i< kmsUrls.length; i++) { + uris[i] = createKMSUri(kmsUrls[i]); + } + return uris; + } private static class KerberosConfiguration extends javax.security.auth.login.Configuration { @@ -306,6 +386,7 @@ public class TestKMS { principals.add("otheradmin"); principals.add("client/host"); principals.add("client1"); + principals.add("foo"); for (KMSACLs.Type type : KMSACLs.Type.values()) { principals.add(type.toString()); } @@ -2011,7 +2092,6 @@ public class TestKMS { return null; } }); - nonKerberosUgi.addCredentials(credentials); try { @@ -2067,6 +2147,18 @@ public class TestKMS { testDelegationTokensOps(true, true); } + private Text getTokenService(KeyProvider provider) { + assertTrue("KeyProvider should be an instance of " + + "LoadBalancingKMSClientProvider", (provider instanceof + LoadBalancingKMSClientProvider)); + assertEquals("Num client providers should be 1", 1, + ((LoadBalancingKMSClientProvider)provider).getProviders().length); + final Text tokenService = new Text( + (((LoadBalancingKMSClientProvider)provider).getProviders()[0]) + .getCanonicalServiceName()); + return tokenService; + } + private void testDelegationTokensOps(final boolean ssl, final boolean kerb) throws Exception { final File confDir = getTestDir(); @@ -2103,6 +2195,10 @@ public class TestKMS { @Override public Void run() throws Exception { KeyProvider kp = createProvider(uri, clientConf); + // Unset the conf value for key provider path just to be sure that + // the key provider created for renew and cancel token is from + // token service field. + clientConf.unset(HADOOP_SECURITY_KEY_PROVIDER_PATH); // test delegation token retrieval KeyProviderDelegationTokenExtension kpdte = KeyProviderDelegationTokenExtension. @@ -2110,13 +2206,10 @@ public class TestKMS { final Credentials credentials = new Credentials(); final Token[] tokens = kpdte.addDelegationTokens("client1", credentials); + Text tokenService = getTokenService(kp); Assert.assertEquals(1, credentials.getAllTokens().size()); - InetSocketAddress kmsAddr = - new InetSocketAddress(getKMSUrl().getHost(), - getKMSUrl().getPort()); Assert.assertEquals(KMSDelegationToken.TOKEN_KIND, - credentials.getToken(SecurityUtil.buildTokenService(kmsAddr)). - getKind()); + credentials.getToken(tokenService).getKind()); // Test non-renewer user cannot renew. for (Token token : tokens) { @@ -2258,15 +2351,16 @@ public class TestKMS { // Get a DT and use it. final Credentials credentials = new Credentials(); kpdte.addDelegationTokens("client", credentials); + Text tokenService = getTokenService(kp); Assert.assertEquals(1, credentials.getAllTokens().size()); Assert.assertEquals(KMSDelegationToken.TOKEN_KIND, credentials. - getToken(SecurityUtil.buildTokenService(kmsAddr)).getKind()); + getToken(tokenService).getKind()); UserGroupInformation.getCurrentUser().addCredentials(credentials); LOG.info("Added kms dt to credentials: {}", UserGroupInformation. getCurrentUser().getCredentials().getAllTokens()); Token token = UserGroupInformation.getCurrentUser().getCredentials() - .getToken(SecurityUtil.buildTokenService(kmsAddr)); + .getToken(tokenService); Assert.assertNotNull(token); job1Token.add(token); @@ -2302,17 +2396,17 @@ public class TestKMS { // Get a new DT, but don't use it yet. final Credentials newCreds = new Credentials(); kpdte.addDelegationTokens("client", newCreds); + Text tokenService = getTokenService(kp); Assert.assertEquals(1, newCreds.getAllTokens().size()); Assert.assertEquals(KMSDelegationToken.TOKEN_KIND, - newCreds.getToken(SecurityUtil.buildTokenService(kmsAddr)). + newCreds.getToken(tokenService). getKind()); // Using job 1's DT should fail. final Credentials oldCreds = new Credentials(); for (Token token : job1Token) { if (token.getKind().equals(KMSDelegationToken.TOKEN_KIND)) { - oldCreds - .addToken(SecurityUtil.buildTokenService(kmsAddr), token); + oldCreds.addToken(tokenService, token); } } UserGroupInformation.getCurrentUser().addCredentials(oldCreds); @@ -2328,7 +2422,7 @@ public class TestKMS { // Using the new DT should succeed. Assert.assertEquals(1, newCreds.getAllTokens().size()); Assert.assertEquals(KMSDelegationToken.TOKEN_KIND, - newCreds.getToken(SecurityUtil.buildTokenService(kmsAddr)). + newCreds.getToken(tokenService). getKind()); UserGroupInformation.getCurrentUser().addCredentials(newCreds); LOG.info("Credetials now are: {}", UserGroupInformation @@ -2357,7 +2451,14 @@ public class TestKMS { doKMSWithZK(true, true); } - public void doKMSWithZK(boolean zkDTSM, boolean zkSigner) throws Exception { + private T runServerWithZooKeeper(boolean zkDTSM, boolean zkSigner, + KMSCallable callable) throws Exception { + return runServerWithZooKeeper(zkDTSM, zkSigner, callable, 1); + } + + private T runServerWithZooKeeper(boolean zkDTSM, boolean zkSigner, + KMSCallable callable, int kmsSize) throws Exception { + TestingServer zkServer = null; try { zkServer = new TestingServer(); @@ -2403,43 +2504,189 @@ public class TestKMS { writeConf(testDir, conf); - KMSCallable c = - new KMSCallable() { - @Override - public KeyProvider call() throws Exception { - final Configuration conf = new Configuration(); - conf.setInt(KeyProvider.DEFAULT_BITLENGTH_NAME, 128); - final URI uri = createKMSUri(getKMSUrl()); - - final KeyProvider kp = - doAs("SET_KEY_MATERIAL", - new PrivilegedExceptionAction() { - @Override - public KeyProvider run() throws Exception { - KeyProvider kp = createProvider(uri, conf); - kp.createKey("k1", new byte[16], - new KeyProvider.Options(conf)); - kp.createKey("k2", new byte[16], - new KeyProvider.Options(conf)); - kp.createKey("k3", new byte[16], - new KeyProvider.Options(conf)); - return kp; - } - }); - return kp; - } - }; - - runServer(null, null, testDir, c); + int[] ports = new int[kmsSize]; + for (int i = 0; i < ports.length; i++) { + ports[i] = -1; + } + return runServer(ports, null, null, testDir, callable); } finally { if (zkServer != null) { zkServer.stop(); zkServer.close(); } } - } + public void doKMSWithZK(boolean zkDTSM, boolean zkSigner) throws Exception { + KMSCallable c = + new KMSCallable() { + @Override + public KeyProvider call() throws Exception { + final Configuration conf = new Configuration(); + conf.setInt(KeyProvider.DEFAULT_BITLENGTH_NAME, 128); + final URI uri = createKMSUri(getKMSUrl()); + + final KeyProvider kp = + doAs("SET_KEY_MATERIAL", + new PrivilegedExceptionAction() { + @Override + public KeyProvider run() throws Exception { + KeyProvider kp = createProvider(uri, conf); + kp.createKey("k1", new byte[16], + new KeyProvider.Options(conf)); + kp.createKey("k2", new byte[16], + new KeyProvider.Options(conf)); + kp.createKey("k3", new byte[16], + new KeyProvider.Options(conf)); + return kp; + } + }); + return kp; + } + }; + + runServerWithZooKeeper(zkDTSM, zkSigner, c); + } + + @Test + public void testKMSHAZooKeeperDelegationToken() throws Exception { + final int kmsSize = 2; + doKMSWithZKWithDelegationToken(true, true, kmsSize); + } + + private void doKMSWithZKWithDelegationToken(boolean zkDTSM, boolean zkSigner, + int kmsSize) throws Exception { + // Create a KMSCallable to execute requests after ZooKeeper and KMS are up. + KMSCallable c = new KMSCallable() { + @Override + public Void call() throws Exception { + final Configuration conf = new Configuration(); + conf.setInt(KeyProvider.DEFAULT_BITLENGTH_NAME, 128); + final URI[] uris = createKMSHAUri(getKMSHAUrl()); + final Credentials credentials = new Credentials(); + // Create a UGI without Kerberos auth. It will authenticate with tokens. + final UserGroupInformation nonKerberosUgi = + UserGroupInformation.getCurrentUser(); + final String lbUri = generateLoadBalancingKeyProviderUriString(); + final LoadBalancingKMSClientProvider lbkp = + createHAProvider(URI.create(lbUri), uris, conf); + conf.unset(HADOOP_SECURITY_KEY_PROVIDER_PATH); + // get delegation tokens using kerberos login + doAs("SET_KEY_MATERIAL", + new PrivilegedExceptionAction() { + @Override + public Void run() throws Exception { + KeyProviderDelegationTokenExtension kpdte = + KeyProviderDelegationTokenExtension. + createKeyProviderDelegationTokenExtension(lbkp); + kpdte.addDelegationTokens("foo", credentials); + return null; + } + }); + + nonKerberosUgi.addCredentials(credentials); + // Access KMS using delegation token for authentication, no Kerberos. + nonKerberosUgi.doAs(new PrivilegedExceptionAction() { + @Override + public Void run() throws Exception { + // Create a kms client with one provider at a time. Must use one + // provider so that if it fails to authenticate, it does not fall + // back to the next KMS instance. + // Should succeed because it has delegation tokens for any instance. + int i = 0; + for (KMSClientProvider provider : lbkp.getProviders()) { + final String key = "k" + i++; + LOG.info("Connect to {} to create key {}.", provider, key); + provider.createKey(key, new KeyProvider.Options(conf)); + } + return null; + } + }); + + final Collection> tokens = + credentials.getAllTokens(); + doAs("foo", new PrivilegedExceptionAction() { + @Override + public Void run() throws Exception { + assertEquals(1, tokens.size()); + Token token = tokens.iterator().next(); + assertEquals(KMSDelegationToken.TOKEN_KIND, token.getKind()); + LOG.info("Got dt for token: {}", token); + final long tokenLife = token.renew(conf); + LOG.info("Renewed token {}, new lifetime:{}", token, tokenLife); + Thread.sleep(10); + final long newTokenLife = token.renew(conf); + LOG.info("Renewed token {}, new lifetime:{}", token, newTokenLife); + assertTrue(newTokenLife > tokenLife); + + // test delegation token cancellation + LOG.info("Got dt for token: {}", token); + token.cancel(conf); + LOG.info("Cancelled token {}", token); + try { + token.renew(conf); + fail("should not be able to renew a canceled token"); + } catch (Exception e) { + LOG.info("Expected exception when renewing token", e); + } + return null; + } + }); + + final Credentials newCredentials = new Credentials(); + doAs("SET_KEY_MATERIAL", + new PrivilegedExceptionAction() { + @Override + public Void run() throws Exception { + KeyProviderDelegationTokenExtension kpdte = + KeyProviderDelegationTokenExtension. + createKeyProviderDelegationTokenExtension(lbkp); + kpdte.addDelegationTokens("foo", newCredentials); + return null; + } + }); + + doAs("foo", new PrivilegedExceptionAction() { + @Override + public Void run() throws Exception { + KMSClientProvider kp1 = lbkp.getProviders()[0]; + URL[] urls = getKMSHAUrl(); + final Collection> tokens = + newCredentials.getAllTokens(); + assertEquals(1, tokens.size()); + Token token = tokens.iterator().next(); + assertEquals(KMSDelegationToken.TOKEN_KIND, + token.getKind()); + // Testing backward compatibility of token renewal and cancellation. + // Set the token service to ip:port format and test to renew/cancel. + Text text = SecurityUtil.buildTokenService( + new InetSocketAddress(urls[0].getHost(), urls[0].getPort())); + token.setService(text); + conf.set(HADOOP_SECURITY_KEY_PROVIDER_PATH, lbUri); + long tokenLife = 0L; + for (KMSClientProvider kp : lbkp.getProviders()) { + long renewedTokenLife = token.renew(conf); + LOG.info("Renewed token of kind {}, new lifetime:{}", + token.getKind(), renewedTokenLife); + assertTrue(renewedTokenLife > tokenLife); + tokenLife = renewedTokenLife; + Thread.sleep(10); + } + token.cancel(conf); + try { + token.renew(conf); + fail("should not be able to renew a canceled token"); + } catch (IOException e) { + LOG.info("Expected exception when renewing token", e); + } + return null; + } + }); + return null; + } + }; + runServerWithZooKeeper(zkDTSM, zkSigner, c, kmsSize); + } @Test public void testProxyUserKerb() throws Exception { diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 5511657e1b3..ce1083deb7b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -61,6 +61,7 @@ import org.apache.hadoop.crypto.CryptoInputStream; import org.apache.hadoop.crypto.CryptoOutputStream; import org.apache.hadoop.crypto.key.KeyProvider; import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion; +import org.apache.hadoop.crypto.key.KeyProviderTokenIssuer; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.CacheFlag; import org.apache.hadoop.fs.ContentSummary; @@ -205,7 +206,7 @@ import com.google.common.net.InetAddresses; ********************************************************/ @InterfaceAudience.Private public class DFSClient implements java.io.Closeable, RemotePeerFactory, - DataEncryptionKeyFactory { + DataEncryptionKeyFactory, KeyProviderTokenIssuer { public static final Logger LOG = LoggerFactory.getLogger(DFSClient.class); private final Configuration conf; @@ -684,6 +685,11 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, return (dtService != null) ? dtService.toString() : null; } + @Override + public TokengetDelegationToken(String renewer) throws IOException { + return getDelegationToken(renewer == null ? null : new Text(renewer)); + } + /** * @see ClientProtocol#getDelegationToken(Text) */ @@ -3029,7 +3035,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, return HEDGED_READ_METRIC; } - URI getKeyProviderUri() throws IOException { + @Override + public URI getKeyProviderUri() throws IOException { return HdfsKMSUtil.getKeyProviderUri(ugi, namenodeUri, getServerDefaults().getKeyProviderUri(), conf); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java index 3532a71da5b..12bc73cf821 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java @@ -102,8 +102,8 @@ import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.io.Text; import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.org.apache.hadoop.security.token.DelegationTokenIssuer; import org.apache.hadoop.util.ChunkedArrayList; import org.apache.hadoop.util.Progressable; @@ -2818,11 +2818,13 @@ public class DistributedFileSystem extends FileSystem } @Override - public Token[] addDelegationTokens( - final String renewer, Credentials credentials) throws IOException { - Token[] tokens = super.addDelegationTokens(renewer, credentials); - return HdfsKMSUtil.addDelegationTokensForKeyProvider( - this, renewer, credentials, uri, tokens); + public DelegationTokenIssuer[] getAdditionalTokenIssuers() + throws IOException { + KeyProvider keyProvider = getKeyProvider(); + if (keyProvider instanceof DelegationTokenIssuer) { + return new DelegationTokenIssuer[]{(DelegationTokenIssuer)keyProvider}; + } + return null; } public DFSInotifyEventInputStream getInotifyEventStream() throws IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/HdfsKMSUtil.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/HdfsKMSUtil.java index de27f7e0ede..30e8aa7b479 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/HdfsKMSUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/HdfsKMSUtil.java @@ -35,14 +35,12 @@ import org.apache.hadoop.crypto.key.KeyProvider; import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion; import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension; import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion; -import org.apache.hadoop.crypto.key.KeyProviderDelegationTokenExtension; import org.apache.hadoop.crypto.key.KeyProviderTokenIssuer; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FileEncryptionInfo; import org.apache.hadoop.io.Text; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.KMSUtil; /** @@ -71,32 +69,6 @@ public final class HdfsKMSUtil { return KMSUtil.createKeyProvider(conf, keyProviderUriKeyName); } - public static Token[] addDelegationTokensForKeyProvider( - KeyProviderTokenIssuer kpTokenIssuer, final String renewer, - Credentials credentials, URI namenodeUri, Token[] tokens) - throws IOException { - KeyProvider keyProvider = kpTokenIssuer.getKeyProvider(); - if (keyProvider != null) { - KeyProviderDelegationTokenExtension keyProviderDelegationTokenExtension - = KeyProviderDelegationTokenExtension. - createKeyProviderDelegationTokenExtension(keyProvider); - Token[] kpTokens = keyProviderDelegationTokenExtension. - addDelegationTokens(renewer, credentials); - credentials.addSecretKey(getKeyProviderMapKey(namenodeUri), - DFSUtilClient.string2Bytes( - kpTokenIssuer.getKeyProviderUri().toString())); - if (tokens != null && kpTokens != null) { - Token[] all = new Token[tokens.length + kpTokens.length]; - System.arraycopy(tokens, 0, all, 0, tokens.length); - System.arraycopy(kpTokens, 0, all, tokens.length, kpTokens.length); - tokens = all; - } else { - tokens = (tokens != null) ? tokens : kpTokens; - } - } - return tokens; - } - /** * Obtain the crypto protocol version from the provided FileEncryptionInfo, * checking to see if this version is supported by. @@ -161,30 +133,38 @@ public final class HdfsKMSUtil { URI keyProviderUri = null; // Lookup the secret in credentials object for namenodeuri. Credentials credentials = ugi.getCredentials(); + Text credsKey = getKeyProviderMapKey(namenodeUri); byte[] keyProviderUriBytes = - credentials.getSecretKey(getKeyProviderMapKey(namenodeUri)); + credentials.getSecretKey(credsKey); if(keyProviderUriBytes != null) { keyProviderUri = URI.create(DFSUtilClient.bytes2String(keyProviderUriBytes)); - return keyProviderUri; } - - if (keyProviderUriStr != null) { - if (!keyProviderUriStr.isEmpty()) { + if (keyProviderUri == null) { + // NN is old and doesn't report provider, so use conf. + if (keyProviderUriStr == null) { + keyProviderUri = KMSUtil.getKeyProviderUri(conf, keyProviderUriKeyName); + } else if (!keyProviderUriStr.isEmpty()) { keyProviderUri = URI.create(keyProviderUriStr); } - return keyProviderUri; - } - - // Last thing is to trust its own conf to be backwards compatible. - String keyProviderUriFromConf = conf.getTrimmed( - CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH); - if (keyProviderUriFromConf != null && !keyProviderUriFromConf.isEmpty()) { - keyProviderUri = URI.create(keyProviderUriFromConf); + if (keyProviderUri != null) { + credentials.addSecretKey( + credsKey, DFSUtilClient.string2Bytes(keyProviderUri.toString())); + } } return keyProviderUri; } + public static KeyProvider getKeyProvider(KeyProviderTokenIssuer issuer, + Configuration conf) + throws IOException { + URI keyProviderUri = issuer.getKeyProviderUri(); + if (keyProviderUri != null) { + return KMSUtil.createKeyProviderFromUri(conf, keyProviderUri); + } + return null; + } + /** * Returns a key to map namenode uri to key provider uri. * Tasks will lookup this key to find key Provider. diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java index d504cfeb3ce..b7325ba8796 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java @@ -111,7 +111,6 @@ import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.StandbyException; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.AccessControlException; -import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.SecretManager.InvalidToken; @@ -119,6 +118,7 @@ import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenSelector; import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector; +import org.apache.hadoop.security.token.org.apache.hadoop.security.token.DelegationTokenIssuer; import org.apache.hadoop.util.JsonSerialization; import org.apache.hadoop.util.KMSUtil; import org.apache.hadoop.util.Progressable; @@ -1690,6 +1690,16 @@ public class WebHdfsFileSystem extends FileSystem return token; } + @Override + public DelegationTokenIssuer[] getAdditionalTokenIssuers() + throws IOException { + KeyProvider keyProvider = getKeyProvider(); + if (keyProvider instanceof DelegationTokenIssuer) { + return new DelegationTokenIssuer[] {(DelegationTokenIssuer) keyProvider}; + } + return null; + } + @Override public synchronized Token getRenewToken() { return delegationToken; @@ -1725,14 +1735,6 @@ public class WebHdfsFileSystem extends FileSystem ).run(); } - @Override - public Token[] addDelegationTokens(String renewer, - Credentials credentials) throws IOException { - Token[] tokens = super.addDelegationTokens(renewer, credentials); - return HdfsKMSUtil.addDelegationTokensForKeyProvider(this, renewer, - credentials, getUri(), tokens); - } - public BlockLocation[] getFileBlockLocations(final FileStatus status, final long offset, final long length) throws IOException { if (status == null) {