HADOOP-13155. Implement TokenRenewer to renew and cancel delegation tokens in KMS. Contributed by Xiao Chen.
(cherry picked from commit713cb71820
) Conflicts: hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProviderDelegationTokenExtension.java hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java (cherry picked from commit03c4724c88
) Conflicts: hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
This commit is contained in:
parent
e54f073cfe
commit
d712b2ee3b
|
@ -34,7 +34,7 @@ public class KeyProviderDelegationTokenExtension extends
|
|||
new DefaultDelegationTokenExtension();
|
||||
|
||||
/**
|
||||
* DelegationTokenExtension is a type of Extension that exposes methods to
|
||||
* DelegationTokenExtension is a type of Extension that exposes methods
|
||||
* needed to work with Delegation Tokens.
|
||||
*/
|
||||
public interface DelegationTokenExtension extends
|
||||
|
@ -49,8 +49,23 @@ public class KeyProviderDelegationTokenExtension extends
|
|||
* @return list of new delegation tokens
|
||||
* @throws IOException thrown if IOException if an IO error occurs.
|
||||
*/
|
||||
public Token<?>[] addDelegationTokens(final String renewer,
|
||||
Token<?>[] addDelegationTokens(final String renewer,
|
||||
Credentials credentials) throws IOException;
|
||||
|
||||
/**
|
||||
* Renews the given token.
|
||||
* @param token The token to be renewed.
|
||||
* @return The token's lifetime after renewal, or 0 if it can't be renewed.
|
||||
* @throws IOException
|
||||
*/
|
||||
long renewDelegationToken(final Token<?> token) throws IOException;
|
||||
|
||||
/**
|
||||
* Cancels the given token.
|
||||
* @param token The token to be cancelled.
|
||||
* @throws IOException
|
||||
*/
|
||||
Void cancelDelegationToken(final Token<?> token) throws IOException;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -66,6 +81,15 @@ public class KeyProviderDelegationTokenExtension extends
|
|||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long renewDelegationToken(final Token<?> token) throws IOException {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Void cancelDelegationToken(final Token<?> token) throws IOException {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private KeyProviderDelegationTokenExtension(KeyProvider keyProvider,
|
||||
|
|
|
@ -38,8 +38,11 @@ import org.apache.hadoop.security.authentication.client.AuthenticationException;
|
|||
import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
|
||||
import org.apache.hadoop.security.ssl.SSLFactory;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenRenewer;
|
||||
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
|
||||
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
|
||||
import org.apache.hadoop.util.HttpExceptionUtils;
|
||||
import org.apache.hadoop.util.KMSUtil;
|
||||
import org.apache.http.client.utils.URIBuilder;
|
||||
import org.codehaus.jackson.map.ObjectMapper;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -94,7 +97,8 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
|
|||
|
||||
private static final String ANONYMOUS_REQUESTS_DISALLOWED = "Anonymous requests are disallowed";
|
||||
|
||||
public static final String TOKEN_KIND = "kms-dt";
|
||||
public static final String TOKEN_KIND_STR = "kms-dt";
|
||||
public static final Text TOKEN_KIND = new Text(TOKEN_KIND_STR);
|
||||
|
||||
public static final String SCHEME_NAME = "kms";
|
||||
|
||||
|
@ -146,6 +150,54 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The KMS implementation of {@link TokenRenewer}.
|
||||
*/
|
||||
public static class KMSTokenRenewer extends TokenRenewer {
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(KMSTokenRenewer.class);
|
||||
|
||||
@Override
|
||||
public boolean handleKind(Text kind) {
|
||||
return kind.equals(TOKEN_KIND);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isManaged(Token<?> token) throws IOException {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long renew(Token<?> token, Configuration conf) throws IOException {
|
||||
LOG.debug("Renewing delegation token {}", token);
|
||||
KeyProvider keyProvider = KMSUtil.createKeyProvider(conf,
|
||||
KeyProviderFactory.KEY_PROVIDER_PATH);
|
||||
if (!(keyProvider instanceof
|
||||
KeyProviderDelegationTokenExtension.DelegationTokenExtension)) {
|
||||
LOG.warn("keyProvider {} cannot renew dt.", keyProvider == null ?
|
||||
"null" : keyProvider.getClass());
|
||||
return 0;
|
||||
}
|
||||
return ((KeyProviderDelegationTokenExtension.DelegationTokenExtension)
|
||||
keyProvider).renewDelegationToken(token);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancel(Token<?> token, Configuration conf) throws IOException {
|
||||
LOG.debug("Canceling delegation token {}", token);
|
||||
KeyProvider keyProvider = KMSUtil.createKeyProvider(conf,
|
||||
KeyProviderFactory.KEY_PROVIDER_PATH);
|
||||
if (!(keyProvider instanceof
|
||||
KeyProviderDelegationTokenExtension.DelegationTokenExtension)) {
|
||||
LOG.warn("keyProvider {} cannot cancel dt.", keyProvider == null ?
|
||||
"null" : keyProvider.getClass());
|
||||
return;
|
||||
}
|
||||
((KeyProviderDelegationTokenExtension.DelegationTokenExtension)
|
||||
keyProvider).cancelDelegationToken(token);
|
||||
}
|
||||
}
|
||||
|
||||
public static class KMSEncryptedKeyVersion extends EncryptedKeyVersion {
|
||||
public KMSEncryptedKeyVersion(String keyName, String keyVersionName,
|
||||
byte[] iv, String encryptedVersionName, byte[] keyMaterial) {
|
||||
|
@ -853,6 +905,100 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
|
|||
return encKeyVersionQueue.getSize(keyName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long renewDelegationToken(final Token<?> dToken) throws IOException {
|
||||
try {
|
||||
final String doAsUser = getDoAsUser();
|
||||
final DelegationTokenAuthenticatedURL.Token token =
|
||||
generateDelegationToken(dToken);
|
||||
final URL url = createURL(null, null, null, null);
|
||||
LOG.debug("Renewing delegation token {} with url:{}, as:{}",
|
||||
token, url, doAsUser);
|
||||
final DelegationTokenAuthenticatedURL authUrl =
|
||||
new DelegationTokenAuthenticatedURL(configurator);
|
||||
return actualUgi.doAs(
|
||||
new PrivilegedExceptionAction<Long>() {
|
||||
@Override
|
||||
public Long run() throws Exception {
|
||||
return authUrl.renewDelegationToken(url, token, doAsUser);
|
||||
}
|
||||
}
|
||||
);
|
||||
} catch (Exception ex) {
|
||||
if (ex instanceof IOException) {
|
||||
throw (IOException) ex;
|
||||
} else {
|
||||
throw new IOException(ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Void cancelDelegationToken(final Token<?> dToken) throws IOException {
|
||||
try {
|
||||
final String doAsUser = getDoAsUser();
|
||||
final DelegationTokenAuthenticatedURL.Token token =
|
||||
generateDelegationToken(dToken);
|
||||
return actualUgi.doAs(
|
||||
new PrivilegedExceptionAction<Void>() {
|
||||
@Override
|
||||
public Void run() throws Exception {
|
||||
final URL url = createURL(null, null, null, null);
|
||||
LOG.debug("Cancelling delegation token {} with url:{}, as:{}",
|
||||
dToken, url, doAsUser);
|
||||
final DelegationTokenAuthenticatedURL authUrl =
|
||||
new DelegationTokenAuthenticatedURL(configurator);
|
||||
authUrl.cancelDelegationToken(url, token, doAsUser);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
);
|
||||
} catch (Exception ex) {
|
||||
if (ex instanceof IOException) {
|
||||
throw (IOException) ex;
|
||||
} else {
|
||||
throw new IOException(ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the doAs user name.
|
||||
*
|
||||
* 'actualUGI' is the UGI of the user creating the client
|
||||
* It is possible that the creator of the KMSClientProvier
|
||||
* calls this method on behalf of a proxyUser (the doAsUser).
|
||||
* In which case this call has to be made as the proxy user.
|
||||
*
|
||||
* @return the doAs user name.
|
||||
* @throws IOException
|
||||
*/
|
||||
private String getDoAsUser() throws IOException {
|
||||
UserGroupInformation currentUgi = UserGroupInformation.getCurrentUser();
|
||||
return (currentUgi.getAuthenticationMethod() ==
|
||||
UserGroupInformation.AuthenticationMethod.PROXY)
|
||||
? currentUgi.getShortUserName() : null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Generate a DelegationTokenAuthenticatedURL.Token from the given generic
|
||||
* typed delegation token.
|
||||
*
|
||||
* @param dToken The delegation token.
|
||||
* @return The DelegationTokenAuthenticatedURL.Token, with its delegation
|
||||
* token set to the delegation token passed in.
|
||||
*/
|
||||
private DelegationTokenAuthenticatedURL.Token generateDelegationToken(
|
||||
final Token<?> dToken) {
|
||||
DelegationTokenAuthenticatedURL.Token token =
|
||||
new DelegationTokenAuthenticatedURL.Token();
|
||||
Token<AbstractDelegationTokenIdentifier> dt =
|
||||
new Token<>(dToken.getIdentifier(), dToken.getPassword(),
|
||||
dToken.getKind(), dToken.getService());
|
||||
token.setDelegationToken(dt);
|
||||
return token;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Token<?>[] addDelegationTokens(final String renewer,
|
||||
Credentials credentials) throws IOException {
|
||||
|
@ -864,15 +1010,7 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
|
|||
final DelegationTokenAuthenticatedURL authUrl =
|
||||
new DelegationTokenAuthenticatedURL(configurator);
|
||||
try {
|
||||
// 'actualUGI' is the UGI of the user creating the client
|
||||
// It is possible that the creator of the KMSClientProvier
|
||||
// calls this method on behalf of a proxyUser (the doAsUser).
|
||||
// In which case this call has to be made as the proxy user.
|
||||
UserGroupInformation currentUgi = UserGroupInformation.getCurrentUser();
|
||||
final String doAsUser = (currentUgi.getAuthenticationMethod() ==
|
||||
UserGroupInformation.AuthenticationMethod.PROXY)
|
||||
? currentUgi.getShortUserName() : null;
|
||||
|
||||
final String doAsUser = getDoAsUser();
|
||||
token = actualUgi.doAs(new PrivilegedExceptionAction<Token<?>>() {
|
||||
@Override
|
||||
public Token<?> run() throws Exception {
|
||||
|
|
|
@ -134,6 +134,27 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
|
|||
}, nextIdx());
|
||||
}
|
||||
|
||||
@Override
|
||||
public long renewDelegationToken(final Token<?> token) throws IOException {
|
||||
return doOp(new ProviderCallable<Long>() {
|
||||
@Override
|
||||
public Long call(KMSClientProvider provider) throws IOException {
|
||||
return provider.renewDelegationToken(token);
|
||||
}
|
||||
}, nextIdx());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Void cancelDelegationToken(final Token<?> token) throws IOException {
|
||||
return doOp(new ProviderCallable<Void>() {
|
||||
@Override
|
||||
public Void call(KMSClientProvider provider) throws IOException {
|
||||
provider.cancelDelegationToken(token);
|
||||
return null;
|
||||
}
|
||||
}, nextIdx());
|
||||
}
|
||||
|
||||
// This request is sent to all providers in the load-balancing group
|
||||
@Override
|
||||
public void warmUpEncryptedKeys(String... keyNames) throws IOException {
|
||||
|
|
|
@ -0,0 +1,76 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.util;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.crypto.key.KeyProvider;
|
||||
import org.apache.hadoop.crypto.key.KeyProviderFactory;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
|
||||
/**
|
||||
* Utils for KMS.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public final class KMSUtil {
|
||||
public static final Logger LOG =
|
||||
LoggerFactory.getLogger(KMSUtil.class);
|
||||
|
||||
private KMSUtil() { /* Hidden constructor */ }
|
||||
|
||||
/**
|
||||
* Creates a new KeyProvider from the given Configuration
|
||||
* and configuration key name.
|
||||
*
|
||||
* @param conf Configuration
|
||||
* @param configKeyName The configuration key name
|
||||
* @return new KeyProvider, or null if no provider was found.
|
||||
* @throws IOException if the KeyProvider is improperly specified in
|
||||
* the Configuration
|
||||
*/
|
||||
public static KeyProvider createKeyProvider(final Configuration conf,
|
||||
final String configKeyName) throws IOException {
|
||||
LOG.debug("Creating key provider with config key {}", configKeyName);
|
||||
final String providerUriStr = conf.getTrimmed(configKeyName, "");
|
||||
// No provider set in conf
|
||||
if (providerUriStr.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
final URI providerUri;
|
||||
try {
|
||||
providerUri = new URI(providerUriStr);
|
||||
} catch (URISyntaxException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
KeyProvider keyProvider = KeyProviderFactory.get(providerUri, conf);
|
||||
if (keyProvider == null) {
|
||||
throw new IOException("Could not instantiate KeyProvider from " +
|
||||
configKeyName + " setting of '" + providerUriStr + "'");
|
||||
}
|
||||
if (keyProvider.isTransient()) {
|
||||
throw new IOException("KeyProvider " + keyProvider.toString()
|
||||
+ " was found but it is a transient provider.");
|
||||
}
|
||||
return keyProvider;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,14 @@
|
|||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
org.apache.hadoop.crypto.key.kms.KMSClientProvider$KMSTokenRenewer
|
|
@ -72,7 +72,7 @@ public class KMSAuthenticationFilter
|
|||
KerberosDelegationTokenAuthenticationHandler.class.getName());
|
||||
}
|
||||
props.setProperty(DelegationTokenAuthenticationHandler.TOKEN_KIND,
|
||||
KMSClientProvider.TOKEN_KIND);
|
||||
KMSClientProvider.TOKEN_KIND_STR);
|
||||
return props;
|
||||
}
|
||||
|
||||
|
|
|
@ -19,7 +19,7 @@ package org.apache.hadoop.crypto.key.kms.server;
|
|||
|
||||
import org.apache.curator.test.TestingServer;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.crypto.key.kms.server.KeyAuthorizationKeyProvider;
|
||||
import org.apache.hadoop.crypto.key.KeyProviderFactory;
|
||||
import org.apache.hadoop.crypto.key.KeyProvider;
|
||||
import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
|
||||
import org.apache.hadoop.crypto.key.KeyProvider.Options;
|
||||
|
@ -31,11 +31,14 @@ import org.apache.hadoop.crypto.key.kms.KMSClientProvider;
|
|||
import org.apache.hadoop.crypto.key.kms.LoadBalancingKMSClientProvider;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.minikdc.MiniKdc;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
|
||||
import org.apache.hadoop.security.authentication.client.Authenticator;
|
||||
import org.apache.hadoop.security.authentication.client.KerberosAuthenticator;
|
||||
import org.apache.hadoop.security.authentication.client.PseudoAuthenticator;
|
||||
import org.apache.hadoop.security.authorize.AuthorizationException;
|
||||
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
|
@ -45,11 +48,12 @@ import org.junit.AfterClass;
|
|||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.Timeout;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import javax.security.auth.kerberos.KerberosPrincipal;
|
||||
import javax.security.auth.login.AppConfigurationEntry;
|
||||
|
||||
import java.io.File;
|
||||
|
@ -72,9 +76,17 @@ import java.util.Properties;
|
|||
import java.util.UUID;
|
||||
import java.util.concurrent.Callable;
|
||||
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Mockito.doNothing;
|
||||
import static org.mockito.Mockito.doThrow;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
public class TestKMS {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(TestKMS.class);
|
||||
|
||||
@Rule
|
||||
public final Timeout testTimeout = new Timeout(180000);
|
||||
|
||||
@Before
|
||||
public void cleanUp() {
|
||||
// resetting kerberos security
|
||||
|
@ -649,20 +661,6 @@ public class TestKMS {
|
|||
Assert.assertEquals("d", meta.getDescription());
|
||||
Assert.assertEquals(attributes, meta.getAttributes());
|
||||
|
||||
// test delegation token retrieval
|
||||
KeyProviderDelegationTokenExtension kpdte =
|
||||
KeyProviderDelegationTokenExtension.
|
||||
createKeyProviderDelegationTokenExtension(kp);
|
||||
Credentials credentials = new Credentials();
|
||||
kpdte.addDelegationTokens("foo", credentials);
|
||||
Assert.assertEquals(1, credentials.getAllTokens().size());
|
||||
InetSocketAddress kmsAddr = new InetSocketAddress(getKMSUrl().getHost(),
|
||||
getKMSUrl().getPort());
|
||||
|
||||
Assert.assertEquals(new Text("kms-dt"), credentials.getToken(
|
||||
SecurityUtil.buildTokenService(kmsAddr)).getKind());
|
||||
|
||||
|
||||
// test rollover draining
|
||||
KeyProviderCryptoExtension kpce = KeyProviderCryptoExtension.
|
||||
createKeyProviderCryptoExtension(kp);
|
||||
|
@ -1746,6 +1744,101 @@ public class TestKMS {
|
|||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDelegationTokensOpsSimple() throws Exception {
|
||||
final Configuration conf = new Configuration();
|
||||
final Authenticator mock = mock(PseudoAuthenticator.class);
|
||||
testDelegationTokensOps(conf, mock);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDelegationTokensOpsKerberized() throws Exception {
|
||||
final Configuration conf = new Configuration();
|
||||
conf.set("hadoop.security.authentication", "kerberos");
|
||||
final Authenticator mock = mock(KerberosAuthenticator.class);
|
||||
testDelegationTokensOps(conf, mock);
|
||||
}
|
||||
|
||||
private void testDelegationTokensOps(Configuration conf,
|
||||
final Authenticator mockAuthenticator) throws Exception {
|
||||
UserGroupInformation.setConfiguration(conf);
|
||||
File confDir = getTestDir();
|
||||
conf = createBaseKMSConf(confDir);
|
||||
writeConf(confDir, conf);
|
||||
doNothing().when(mockAuthenticator).authenticate(any(URL.class),
|
||||
any(AuthenticatedURL.Token.class));
|
||||
|
||||
runServer(null, null, confDir, new KMSCallable<Void>() {
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
URI uri = createKMSUri(getKMSUrl());
|
||||
KeyProvider kp = createProvider(uri, conf);
|
||||
conf.set(KeyProviderFactory.KEY_PROVIDER_PATH,
|
||||
createKMSUri(getKMSUrl()).toString());
|
||||
|
||||
// test delegation token retrieval
|
||||
KeyProviderDelegationTokenExtension kpdte =
|
||||
KeyProviderDelegationTokenExtension.
|
||||
createKeyProviderDelegationTokenExtension(kp);
|
||||
Credentials credentials = new Credentials();
|
||||
final Token<?>[] tokens = kpdte.addDelegationTokens(
|
||||
UserGroupInformation.getCurrentUser().getUserName(), credentials);
|
||||
Assert.assertEquals(1, credentials.getAllTokens().size());
|
||||
InetSocketAddress kmsAddr = new InetSocketAddress(getKMSUrl().getHost(),
|
||||
getKMSUrl().getPort());
|
||||
Assert.assertEquals(KMSClientProvider.TOKEN_KIND,
|
||||
credentials.getToken(SecurityUtil.buildTokenService(kmsAddr)).
|
||||
getKind());
|
||||
|
||||
// After this point, we're supposed to use the delegation token to auth.
|
||||
doThrow(new IOException("Authenticator should not fall back"))
|
||||
.when(mockAuthenticator).authenticate(any(URL.class),
|
||||
any(AuthenticatedURL.Token.class));
|
||||
|
||||
// test delegation token renewal
|
||||
boolean renewed = false;
|
||||
for (Token<?> token : tokens) {
|
||||
if (!(token.getKind().equals(KMSClientProvider.TOKEN_KIND))) {
|
||||
LOG.info("Skipping token {}", token);
|
||||
continue;
|
||||
}
|
||||
LOG.info("Got dt for " + uri + "; " + token);
|
||||
long tokenLife = token.renew(conf);
|
||||
LOG.info("Renewed token of kind {}, new lifetime:{}",
|
||||
token.getKind(), tokenLife);
|
||||
Thread.sleep(100);
|
||||
long newTokenLife = token.renew(conf);
|
||||
LOG.info("Renewed token of kind {}, new lifetime:{}",
|
||||
token.getKind(), newTokenLife);
|
||||
Assert.assertTrue(newTokenLife > tokenLife);
|
||||
renewed = true;
|
||||
}
|
||||
Assert.assertTrue(renewed);
|
||||
|
||||
// test delegation token cancellation
|
||||
for (Token<?> token : tokens) {
|
||||
if (!(token.getKind().equals(KMSClientProvider.TOKEN_KIND))) {
|
||||
LOG.info("Skipping token {}", token);
|
||||
continue;
|
||||
}
|
||||
LOG.info("Got dt for " + uri + "; " + token);
|
||||
token.cancel(conf);
|
||||
LOG.info("Cancelled token of kind {}", token.getKind());
|
||||
doNothing().when(mockAuthenticator).
|
||||
authenticate(any(URL.class), any(AuthenticatedURL.Token.class));
|
||||
try {
|
||||
token.renew(conf);
|
||||
Assert.fail("should not be able to renew a canceled token");
|
||||
} catch (Exception e) {
|
||||
LOG.info("Expected exception when trying to renew token", e);
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testKMSWithZKSigner() throws Exception {
|
||||
doKMSWithZK(true, false);
|
||||
|
|
|
@ -43,7 +43,6 @@ import javax.net.SocketFactory;
|
|||
import org.apache.commons.io.Charsets;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.crypto.key.KeyProvider;
|
||||
import org.apache.hadoop.crypto.key.KeyProviderFactory;
|
||||
import org.apache.hadoop.fs.BlockLocation;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
|
@ -68,6 +67,7 @@ import org.apache.hadoop.net.NetUtils;
|
|||
import org.apache.hadoop.net.NodeBase;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.util.KMSUtil;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -498,6 +498,17 @@ public class DFSUtilClient {
|
|||
return new ClientDatanodeProtocolTranslatorPB(addr, ticket, conf, factory);
|
||||
}
|
||||
|
||||
private static String keyProviderUriKeyName =
|
||||
HdfsClientConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI;
|
||||
|
||||
/**
|
||||
* Set the key provider uri configuration key name for creating key providers.
|
||||
* @param keyName The configuration key name.
|
||||
*/
|
||||
public static void setKeyProviderUriKeyName(final String keyName) {
|
||||
keyProviderUriKeyName = keyName;
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new KeyProvider from the given Configuration.
|
||||
*
|
||||
|
@ -508,29 +519,7 @@ public class DFSUtilClient {
|
|||
*/
|
||||
public static KeyProvider createKeyProvider(
|
||||
final Configuration conf) throws IOException {
|
||||
final String providerUriStr =
|
||||
conf.getTrimmed(HdfsClientConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI, "");
|
||||
// No provider set in conf
|
||||
if (providerUriStr.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
final URI providerUri;
|
||||
try {
|
||||
providerUri = new URI(providerUriStr);
|
||||
} catch (URISyntaxException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
KeyProvider keyProvider = KeyProviderFactory.get(providerUri, conf);
|
||||
if (keyProvider == null) {
|
||||
throw new IOException("Could not instantiate KeyProvider from " +
|
||||
HdfsClientConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI + " setting of '"
|
||||
+ providerUriStr + "'");
|
||||
}
|
||||
if (keyProvider.isTransient()) {
|
||||
throw new IOException("KeyProvider " + keyProvider.toString()
|
||||
+ " was found but it is a transient provider.");
|
||||
}
|
||||
return keyProvider;
|
||||
return KMSUtil.createKeyProvider(conf, keyProviderUriKeyName);
|
||||
}
|
||||
|
||||
public static Peer peerFromSocket(Socket socket)
|
||||
|
|
Loading…
Reference in New Issue