HADOOP-14445. Use DelegationTokenIssuer to create KMS delegation tokens that can authenticate to all KMS instances.

Contributed by Daryn Sharp, Xiao Chen, Rushabh S Shah.
This commit is contained in:
Xiao Chen 2018-10-12 11:50:54 -07:00
parent 53b522af6d
commit ff7ca472d2
18 changed files with 965 additions and 315 deletions

View File

@ -17,8 +17,12 @@
*/ */
package org.apache.hadoop.crypto.key; package org.apache.hadoop.crypto.key;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.org.apache.hadoop.security.token.DelegationTokenIssuer;
import java.io.IOException; import java.io.IOException;
@ -28,7 +32,8 @@ import java.io.IOException;
*/ */
public class KeyProviderDelegationTokenExtension extends public class KeyProviderDelegationTokenExtension extends
KeyProviderExtension KeyProviderExtension
<KeyProviderDelegationTokenExtension.DelegationTokenExtension> { <KeyProviderDelegationTokenExtension.DelegationTokenExtension>
implements DelegationTokenIssuer {
private static DelegationTokenExtension DEFAULT_EXTENSION = private static DelegationTokenExtension DEFAULT_EXTENSION =
new DefaultDelegationTokenExtension(); new DefaultDelegationTokenExtension();
@ -37,21 +42,8 @@ public class KeyProviderDelegationTokenExtension extends
* DelegationTokenExtension is a type of Extension that exposes methods * DelegationTokenExtension is a type of Extension that exposes methods
* needed to work with Delegation Tokens. * needed to work with Delegation Tokens.
*/ */
public interface DelegationTokenExtension extends public interface DelegationTokenExtension
KeyProviderExtension.Extension { extends KeyProviderExtension.Extension, DelegationTokenIssuer {
/**
* The implementer of this class will take a renewer and add all
* delegation tokens associated with the renewer to the
* <code>Credentials</code> object if it is not already present,
* @param renewer the user allowed to renew the delegation tokens
* @param credentials cache in which to add new delegation tokens
* @return list of new delegation tokens
* @throws IOException thrown if IOException if an IO error occurs.
*/
Token<?>[] addDelegationTokens(final String renewer,
Credentials credentials) throws IOException;
/** /**
* Renews the given token. * Renews the given token.
* @param token The token to be renewed. * @param token The token to be renewed.
@ -66,6 +58,12 @@ public class KeyProviderDelegationTokenExtension extends
* @throws IOException * @throws IOException
*/ */
Void cancelDelegationToken(final Token<?> token) throws IOException; Void cancelDelegationToken(final Token<?> token) throws IOException;
// Do NOT call this. Only intended for internal use.
@VisibleForTesting
@InterfaceAudience.Private
@InterfaceStability.Unstable
Token<?> selectDelegationToken(Credentials creds);
} }
/** /**
@ -81,6 +79,16 @@ public class KeyProviderDelegationTokenExtension extends
return null; return null;
} }
@Override
public String getCanonicalServiceName() {
return null;
}
@Override
public Token<?> getDelegationToken(String renewer) {
return null;
}
@Override @Override
public long renewDelegationToken(final Token<?> token) throws IOException { public long renewDelegationToken(final Token<?> token) throws IOException {
return 0; return 0;
@ -90,6 +98,12 @@ public class KeyProviderDelegationTokenExtension extends
public Void cancelDelegationToken(final Token<?> token) throws IOException { public Void cancelDelegationToken(final Token<?> token) throws IOException {
return null; return null;
} }
@Override
public Token<?> selectDelegationToken(Credentials creds) {
return null;
}
} }
private KeyProviderDelegationTokenExtension(KeyProvider keyProvider, private KeyProviderDelegationTokenExtension(KeyProvider keyProvider,
@ -97,17 +111,14 @@ public class KeyProviderDelegationTokenExtension extends
super(keyProvider, extensions); super(keyProvider, extensions);
} }
/** @Override
* Passes the renewer and Credentials object to the underlying public String getCanonicalServiceName() {
* {@link DelegationTokenExtension} return getExtension().getCanonicalServiceName();
* @param renewer the user allowed to renew the delegation tokens }
* @param credentials cache in which to add new delegation tokens
* @return list of new delegation tokens @Override
* @throws IOException thrown if IOException if an IO error occurs. public Token<?> getDelegationToken(final String renewer) throws IOException {
*/ return getExtension().getDelegationToken(renewer);
public Token<?>[] addDelegationTokens(final String renewer,
Credentials credentials) throws IOException {
return getExtension().addDelegationTokens(renewer, credentials);
} }
/** /**

View File

@ -22,15 +22,17 @@ import java.net.URI;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.security.token.org.apache.hadoop.security.token.DelegationTokenIssuer;
/** /**
* File systems that support Encryption Zones have to implement this interface. * File systems that support Encryption Zones have to implement this interface.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Unstable @InterfaceStability.Unstable
public interface KeyProviderTokenIssuer { public interface KeyProviderTokenIssuer extends DelegationTokenIssuer {
KeyProvider getKeyProvider() throws IOException; KeyProvider getKeyProvider() throws IOException;
URI getKeyProviderUri() throws IOException; URI getKeyProviderUri() throws IOException;
} }

View File

@ -32,14 +32,13 @@ import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.ProviderUtils; import org.apache.hadoop.security.ProviderUtils;
import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
import org.apache.hadoop.security.authentication.client.ConnectionConfigurator; import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
import org.apache.hadoop.security.authentication.client.KerberosAuthenticator;
import org.apache.hadoop.security.ssl.SSLFactory; import org.apache.hadoop.security.ssl.SSLFactory;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.TokenRenewer; import org.apache.hadoop.security.token.TokenRenewer;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier; import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL; import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
import org.apache.hadoop.util.HttpExceptionUtils; import org.apache.hadoop.util.HttpExceptionUtils;
import org.apache.hadoop.util.KMSUtil; import org.apache.hadoop.util.KMSUtil;
@ -56,7 +55,6 @@ import java.io.OutputStreamWriter;
import java.io.Writer; import java.io.Writer;
import java.lang.reflect.UndeclaredThrowableException; import java.lang.reflect.UndeclaredThrowableException;
import java.net.HttpURLConnection; import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.MalformedURLException; import java.net.MalformedURLException;
import java.net.SocketTimeoutException; import java.net.SocketTimeoutException;
import java.net.URI; import java.net.URI;
@ -98,7 +96,7 @@ import static org.apache.hadoop.util.KMSUtil.parseJSONMetadata;
public class KMSClientProvider extends KeyProvider implements CryptoExtension, public class KMSClientProvider extends KeyProvider implements CryptoExtension,
KeyProviderDelegationTokenExtension.DelegationTokenExtension { KeyProviderDelegationTokenExtension.DelegationTokenExtension {
private static final Logger LOG = static final Logger LOG =
LoggerFactory.getLogger(KMSClientProvider.class); LoggerFactory.getLogger(KMSClientProvider.class);
private static final String INVALID_SIGNATURE = "Invalid signature"; private static final String INVALID_SIGNATURE = "Invalid signature";
@ -135,12 +133,12 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
private static final ObjectWriter WRITER = private static final ObjectWriter WRITER =
new ObjectMapper().writerWithDefaultPrettyPrinter(); new ObjectMapper().writerWithDefaultPrettyPrinter();
private KeyProviderDelegationTokenExtension.DelegationTokenExtension
clientTokenProvider = this;
// the token's service.
private final Text dtService; private final Text dtService;
// alias in the credentials.
// Allow fallback to default kms server port 9600 for certain tests that do private final Text canonicalService;
// not specify the port explicitly in the kms provider url.
@VisibleForTesting
public static volatile boolean fallbackDefaultPortForTesting = false;
private class EncryptedQueueRefiller implements private class EncryptedQueueRefiller implements
ValueQueue.QueueRefiller<EncryptedKeyVersion> { ValueQueue.QueueRefiller<EncryptedKeyVersion> {
@ -164,6 +162,14 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
} }
} }
static class TokenSelector extends AbstractDelegationTokenSelector {
static final TokenSelector INSTANCE = new TokenSelector();
TokenSelector() {
super(TOKEN_KIND);
}
}
/** /**
* The KMS implementation of {@link TokenRenewer}. * The KMS implementation of {@link TokenRenewer}.
*/ */
@ -184,8 +190,7 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
@Override @Override
public long renew(Token<?> token, Configuration conf) throws IOException { public long renew(Token<?> token, Configuration conf) throws IOException {
LOG.debug("Renewing delegation token {}", token); LOG.debug("Renewing delegation token {}", token);
KeyProvider keyProvider = KMSUtil.createKeyProvider(conf, KeyProvider keyProvider = createKeyProvider(token, conf);
KeyProviderFactory.KEY_PROVIDER_PATH);
try { try {
if (!(keyProvider instanceof if (!(keyProvider instanceof
KeyProviderDelegationTokenExtension.DelegationTokenExtension)) { KeyProviderDelegationTokenExtension.DelegationTokenExtension)) {
@ -206,8 +211,7 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
@Override @Override
public void cancel(Token<?> token, Configuration conf) throws IOException { public void cancel(Token<?> token, Configuration conf) throws IOException {
LOG.debug("Canceling delegation token {}", token); LOG.debug("Canceling delegation token {}", token);
KeyProvider keyProvider = KMSUtil.createKeyProvider(conf, KeyProvider keyProvider = createKeyProvider(token, conf);
KeyProviderFactory.KEY_PROVIDER_PATH);
try { try {
if (!(keyProvider instanceof if (!(keyProvider instanceof
KeyProviderDelegationTokenExtension.DelegationTokenExtension)) { KeyProviderDelegationTokenExtension.DelegationTokenExtension)) {
@ -224,6 +228,19 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
} }
} }
} }
private static KeyProvider createKeyProvider(
Token<?> token, Configuration conf) throws IOException {
String service = token.getService().toString();
URI uri;
if (service != null && service.startsWith(SCHEME_NAME + ":/")) {
LOG.debug("Creating key provider with token service value {}", service);
uri = URI.create(service);
} else { // conf fallback
uri = KMSUtil.getKeyProviderUri(conf);
}
return (uri != null) ? KMSUtil.createKeyProviderFromUri(conf, uri) : null;
}
} }
public static class KMSEncryptedKeyVersion extends EncryptedKeyVersion { public static class KMSEncryptedKeyVersion extends EncryptedKeyVersion {
@ -285,12 +302,14 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
} }
hostsPart = t[0]; hostsPart = t[0];
} }
return createProvider(conf, origUrl, port, hostsPart); KMSClientProvider[] providers =
createProviders(conf, origUrl, port, hostsPart);
return new LoadBalancingKMSClientProvider(providerUri, providers, conf);
} }
return null; return null;
} }
private KeyProvider createProvider(Configuration conf, private KMSClientProvider[] createProviders(Configuration conf,
URL origUrl, int port, String hostsPart) throws IOException { URL origUrl, int port, String hostsPart) throws IOException {
String[] hosts = hostsPart.split(";"); String[] hosts = hostsPart.split(";");
KMSClientProvider[] providers = new KMSClientProvider[hosts.length]; KMSClientProvider[] providers = new KMSClientProvider[hosts.length];
@ -304,7 +323,7 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
throw new IOException("Could not instantiate KMSProvider.", e); throw new IOException("Could not instantiate KMSProvider.", e);
} }
} }
return new LoadBalancingKMSClientProvider(providers, conf); return providers;
} }
} }
@ -360,13 +379,13 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
public KMSClientProvider(URI uri, Configuration conf) throws IOException { public KMSClientProvider(URI uri, Configuration conf) throws IOException {
super(conf); super(conf);
kmsUrl = createServiceURL(extractKMSPath(uri)); kmsUrl = createServiceURL(extractKMSPath(uri));
int kmsPort = kmsUrl.getPort(); // the token's service so it can be instantiated for renew/cancel.
if ((kmsPort == -1) && fallbackDefaultPortForTesting) { dtService = getDtService(uri);
kmsPort = 9600; // the canonical service is the alias for the token in the credentials.
} // typically it's the actual service in the token but older clients expect
// an address.
InetSocketAddress addr = new InetSocketAddress(kmsUrl.getHost(), kmsPort); URI serviceUri = URI.create(kmsUrl.toString());
dtService = SecurityUtil.buildTokenService(addr); canonicalService = SecurityUtil.buildTokenService(serviceUri);
if ("https".equalsIgnoreCase(kmsUrl.getProtocol())) { if ("https".equalsIgnoreCase(kmsUrl.getProtocol())) {
sslFactory = new SSLFactory(SSLFactory.Mode.CLIENT, conf); sslFactory = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
@ -404,8 +423,22 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
KMS_CLIENT_ENC_KEY_CACHE_NUM_REFILL_THREADS_DEFAULT), KMS_CLIENT_ENC_KEY_CACHE_NUM_REFILL_THREADS_DEFAULT),
new EncryptedQueueRefiller()); new EncryptedQueueRefiller());
authToken = new DelegationTokenAuthenticatedURL.Token(); authToken = new DelegationTokenAuthenticatedURL.Token();
LOG.debug("KMSClientProvider for KMS url: {} delegation token service: {}" + LOG.debug("KMSClientProvider created for KMS url: {} delegation token "
" created.", kmsUrl, dtService); + "service: {} canonical service: {}.", kmsUrl, dtService,
canonicalService);
}
protected static Text getDtService(URI uri) {
Text service;
// remove fragment for forward compatibility with logical naming.
final String fragment = uri.getFragment();
if (fragment != null) {
service = new Text(
uri.getScheme() + ":" + uri.getSchemeSpecificPart());
} else {
service = new Text(uri.toString());
}
return service;
} }
private static Path extractKMSPath(URI uri) throws MalformedURLException, IOException { private static Path extractKMSPath(URI uri) throws MalformedURLException, IOException {
@ -477,7 +510,7 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
@Override @Override
public HttpURLConnection run() throws Exception { public HttpURLConnection run() throws Exception {
DelegationTokenAuthenticatedURL authUrl = DelegationTokenAuthenticatedURL authUrl =
new DelegationTokenAuthenticatedURL(configurator); createAuthenticatedURL();
return authUrl.openConnection(url, authToken, doAsUser); return authUrl.openConnection(url, authToken, doAsUser);
} }
}); });
@ -929,6 +962,96 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
return encKeyVersionQueue.getSize(keyName); return encKeyVersionQueue.getSize(keyName);
} }
// note: this is only a crutch for backwards compatibility.
// override the instance that will be used to select a token, intended
// to allow load balancing provider to find a token issued by any of its
// sub-providers.
protected void setClientTokenProvider(
KeyProviderDelegationTokenExtension.DelegationTokenExtension provider) {
clientTokenProvider = provider;
}
@VisibleForTesting
DelegationTokenAuthenticatedURL createAuthenticatedURL() {
return new DelegationTokenAuthenticatedURL(configurator) {
@Override
public org.apache.hadoop.security.token.Token<? extends TokenIdentifier>
selectDelegationToken(URL url, Credentials creds) {
if (LOG.isDebugEnabled()) {
LOG.debug("Looking for delegation token. creds: {}",
creds.getAllTokens());
}
// clientTokenProvider is either "this" or a load balancing instance.
// if the latter, it will first look for the load balancer's uri
// service followed by each sub-provider for backwards-compatibility.
return clientTokenProvider.selectDelegationToken(creds);
}
};
}
@InterfaceAudience.Private
@Override
public Token<?> selectDelegationToken(Credentials creds) {
Token<?> token = selectDelegationToken(creds, dtService);
if (token == null) {
token = selectDelegationToken(creds, canonicalService);
}
return token;
}
protected static Token<?> selectDelegationToken(Credentials creds,
Text service) {
Token<?> token = creds.getToken(service);
LOG.debug("selected by alias={} token={}", service, token);
if (token != null && TOKEN_KIND.equals(token.getKind())) {
return token;
}
token = TokenSelector.INSTANCE.selectToken(service, creds.getAllTokens());
LOG.debug("selected by service={} token={}", service, token);
return token;
}
@Override
public String getCanonicalServiceName() {
return canonicalService.toString();
}
@Override
public Token<?> getDelegationToken(final String renewer) throws IOException {
final URL url = createURL(null, null, null, null);
final DelegationTokenAuthenticatedURL authUrl =
new DelegationTokenAuthenticatedURL(configurator);
Token<?> token = null;
try {
final String doAsUser = getDoAsUser();
token = getActualUgi().doAs(new PrivilegedExceptionAction<Token<?>>() {
@Override
public Token<?> run() throws Exception {
// Not using the cached token here.. Creating a new token here
// everytime.
LOG.debug("Getting new token from {}, renewer:{}", url, renewer);
return authUrl.getDelegationToken(url,
new DelegationTokenAuthenticatedURL.Token(), renewer, doAsUser);
}
});
if (token != null) {
token.setService(dtService);
LOG.info("New token created: ({})", token);
} else {
throw new IOException("Got NULL as delegation token");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
if (e instanceof IOException) {
throw (IOException) e;
} else {
throw new IOException(e);
}
}
return token;
}
@Override @Override
public long renewDelegationToken(final Token<?> dToken) throws IOException { public long renewDelegationToken(final Token<?> dToken) throws IOException {
try { try {
@ -939,7 +1062,7 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
LOG.debug("Renewing delegation token {} with url:{}, as:{}", LOG.debug("Renewing delegation token {} with url:{}, as:{}",
token, url, doAsUser); token, url, doAsUser);
final DelegationTokenAuthenticatedURL authUrl = final DelegationTokenAuthenticatedURL authUrl =
new DelegationTokenAuthenticatedURL(configurator); createAuthenticatedURL();
return getActualUgi().doAs( return getActualUgi().doAs(
new PrivilegedExceptionAction<Long>() { new PrivilegedExceptionAction<Long>() {
@Override @Override
@ -971,7 +1094,7 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
LOG.debug("Cancelling delegation token {} with url:{}, as:{}", LOG.debug("Cancelling delegation token {} with url:{}, as:{}",
dToken, url, doAsUser); dToken, url, doAsUser);
final DelegationTokenAuthenticatedURL authUrl = final DelegationTokenAuthenticatedURL authUrl =
new DelegationTokenAuthenticatedURL(configurator); createAuthenticatedURL();
authUrl.cancelDelegationToken(url, token, doAsUser); authUrl.cancelDelegationToken(url, token, doAsUser);
return null; return null;
} }
@ -1023,47 +1146,6 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
return token; return token;
} }
@Override
public Token<?>[] addDelegationTokens(final String renewer,
Credentials credentials) throws IOException {
Token<?>[] tokens = null;
Token<?> token = credentials.getToken(dtService);
if (token == null) {
final URL url = createURL(null, null, null, null);
final DelegationTokenAuthenticatedURL authUrl =
new DelegationTokenAuthenticatedURL(configurator);
try {
final String doAsUser = getDoAsUser();
token = getActualUgi().doAs(new PrivilegedExceptionAction<Token<?>>() {
@Override
public Token<?> run() throws Exception {
// Not using the cached token here.. Creating a new token here
// everytime.
LOG.debug("Getting new token from {}, renewer:{}", url, renewer);
return authUrl.getDelegationToken(url,
new DelegationTokenAuthenticatedURL.Token(), renewer, doAsUser);
}
});
if (token != null) {
LOG.debug("New token received: ({})", token);
credentials.addToken(token.getService(), token);
tokens = new Token<?>[] { token };
} else {
throw new IOException("Got NULL as delegation token");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
if (e instanceof IOException) {
throw (IOException) e;
} else {
throw new IOException(e);
}
}
}
return tokens;
}
private boolean containsKmsDt(UserGroupInformation ugi) throws IOException { private boolean containsKmsDt(UserGroupInformation ugi) throws IOException {
// Add existing credentials from the UGI, since provider is cached. // Add existing credentials from the UGI, since provider is cached.
Credentials creds = ugi.getCredentials(); Credentials creds = ugi.getCredentials();

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.crypto.key.kms;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import java.net.ConnectException; import java.net.ConnectException;
import java.net.URI;
import java.security.GeneralSecurityException; import java.security.GeneralSecurityException;
import java.security.NoSuchAlgorithmException; import java.security.NoSuchAlgorithmException;
import java.util.Arrays; import java.util.Arrays;
@ -36,12 +37,15 @@ import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.CryptoExtension;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion; import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
import org.apache.hadoop.crypto.key.KeyProviderDelegationTokenExtension; import org.apache.hadoop.crypto.key.KeyProviderDelegationTokenExtension;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction; import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.KMSUtil;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -76,19 +80,42 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
private final KMSClientProvider[] providers; private final KMSClientProvider[] providers;
private final AtomicInteger currentIdx; private final AtomicInteger currentIdx;
private final Text dtService; // service in token.
private final Text canonicalService; // credentials alias for token.
private RetryPolicy retryPolicy = null; private RetryPolicy retryPolicy = null;
public LoadBalancingKMSClientProvider(KMSClientProvider[] providers, public LoadBalancingKMSClientProvider(URI providerUri,
Configuration conf) { KMSClientProvider[] providers, Configuration conf) {
this(shuffle(providers), Time.monotonicNow(), conf); this(providerUri, providers, Time.monotonicNow(), conf);
} }
@VisibleForTesting @VisibleForTesting
LoadBalancingKMSClientProvider(KMSClientProvider[] providers, long seed, LoadBalancingKMSClientProvider(KMSClientProvider[] providers, long seed,
Configuration conf) { Configuration conf) {
this(URI.create("kms://testing"), providers, seed, conf);
}
private LoadBalancingKMSClientProvider(URI uri,
KMSClientProvider[] providers, long seed, Configuration conf) {
super(conf); super(conf);
this.providers = providers; // uri is the token service so it can be instantiated for renew/cancel.
dtService = KMSClientProvider.getDtService(uri);
// if provider not in conf, new client will alias on uri else addr.
if (KMSUtil.getKeyProviderUri(conf) == null) {
canonicalService = dtService;
} else {
// canonical service (credentials alias) will be the first underlying
// provider's service. must be deterministic before shuffle so multiple
// calls for a token do not obtain another unnecessary token.
canonicalService = new Text(providers[0].getCanonicalServiceName());
}
// shuffle unless seed is 0 which is used by tests for determinism.
this.providers = (seed != 0) ? shuffle(providers) : providers;
for (KMSClientProvider provider : providers) {
provider.setClientTokenProvider(this);
}
this.currentIdx = new AtomicInteger((int)(seed % providers.length)); this.currentIdx = new AtomicInteger((int)(seed % providers.length));
int maxNumRetries = conf.getInt(CommonConfigurationKeysPublic. int maxNumRetries = conf.getInt(CommonConfigurationKeysPublic.
KMS_CLIENT_FAILOVER_MAX_RETRIES_KEY, providers.length); KMS_CLIENT_FAILOVER_MAX_RETRIES_KEY, providers.length);
@ -106,6 +133,9 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
this.retryPolicy = RetryPolicies.failoverOnNetworkException( this.retryPolicy = RetryPolicies.failoverOnNetworkException(
RetryPolicies.TRY_ONCE_THEN_FAIL, maxNumRetries, 0, sleepBaseMillis, RetryPolicies.TRY_ONCE_THEN_FAIL, maxNumRetries, 0, sleepBaseMillis,
sleepMaxMillis); sleepMaxMillis);
LOG.debug("Created LoadBalancingKMSClientProvider for KMS url: {} with {} "
+ "providers. delegation token service: {}, canonical service: {}",
uri, providers.length, dtService, canonicalService);
} }
@VisibleForTesting @VisibleForTesting
@ -113,6 +143,23 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
return providers; return providers;
} }
@Override
public org.apache.hadoop.security.token.Token<? extends TokenIdentifier>
selectDelegationToken(Credentials creds) {
Token<? extends TokenIdentifier> token =
KMSClientProvider.selectDelegationToken(creds, canonicalService);
// fallback to querying each sub-provider.
if (token == null) {
for (KMSClientProvider provider : getProviders()) {
token = provider.selectDelegationToken(creds);
if (token != null) {
break;
}
}
}
return token;
}
private <T> T doOp(ProviderCallable<T> op, int currPos) private <T> T doOp(ProviderCallable<T> op, int currPos)
throws IOException { throws IOException {
if (providers.length == 0) { if (providers.length == 0) {
@ -193,13 +240,21 @@ public class LoadBalancingKMSClientProvider extends KeyProvider implements
} }
@Override @Override
public Token<?>[] public String getCanonicalServiceName() {
addDelegationTokens(final String renewer, final Credentials credentials) return canonicalService.toString();
throws IOException { }
return doOp(new ProviderCallable<Token<?>[]>() {
@Override
public Token<?> getDelegationToken(String renewer) throws IOException {
return doOp(new ProviderCallable<Token<?>>() {
@Override @Override
public Token<?>[] call(KMSClientProvider provider) throws IOException { public Token<?> call(KMSClientProvider provider) throws IOException {
return provider.addDelegationTokens(renewer, credentials); Token<?> token = provider.getDelegationToken(renewer);
// override sub-providers service with our own so it can be used
// across all providers.
token.setService(dtService);
LOG.debug("New token service set. Token: ({})", token);
return token;
} }
}, nextIdx()); }, nextIdx());
} }

View File

@ -57,13 +57,13 @@ import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsCreateModes; import org.apache.hadoop.fs.permission.FsCreateModes;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.MultipleIOException; import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.org.apache.hadoop.security.token.DelegationTokenIssuer;
import org.apache.hadoop.util.ClassUtil; import org.apache.hadoop.util.ClassUtil;
import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.Progressable;
@ -120,7 +120,8 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.*;
@SuppressWarnings("DeprecatedIsStillUsed") @SuppressWarnings("DeprecatedIsStillUsed")
@InterfaceAudience.Public @InterfaceAudience.Public
@InterfaceStability.Stable @InterfaceStability.Stable
public abstract class FileSystem extends Configured implements Closeable { public abstract class FileSystem extends Configured
implements Closeable, DelegationTokenIssuer {
public static final String FS_DEFAULT_NAME_KEY = public static final String FS_DEFAULT_NAME_KEY =
CommonConfigurationKeys.FS_DEFAULT_NAME_KEY; CommonConfigurationKeys.FS_DEFAULT_NAME_KEY;
public static final String DEFAULT_FS = public static final String DEFAULT_FS =
@ -385,6 +386,7 @@ public abstract class FileSystem extends Configured implements Closeable {
*/ */
@InterfaceAudience.Public @InterfaceAudience.Public
@InterfaceStability.Evolving @InterfaceStability.Evolving
@Override
public String getCanonicalServiceName() { public String getCanonicalServiceName() {
return (getChildFileSystems() == null) return (getChildFileSystems() == null)
? SecurityUtil.buildDTServiceName(getUri(), getDefaultPort()) ? SecurityUtil.buildDTServiceName(getUri(), getDefaultPort())
@ -599,71 +601,11 @@ public abstract class FileSystem extends Configured implements Closeable {
* @throws IOException on any problem obtaining a token * @throws IOException on any problem obtaining a token
*/ */
@InterfaceAudience.Private() @InterfaceAudience.Private()
@Override
public Token<?> getDelegationToken(String renewer) throws IOException { public Token<?> getDelegationToken(String renewer) throws IOException {
return null; return null;
} }
/**
* Obtain all delegation tokens used by this FileSystem that are not
* already present in the given Credentials. Existing tokens will neither
* be verified as valid nor having the given renewer. Missing tokens will
* be acquired and added to the given Credentials.
*
* Default Impl: works for simple FS with its own token
* and also for an embedded FS whose tokens are those of its
* child FileSystems (i.e. the embedded FS has no tokens of its own).
*
* @param renewer the user allowed to renew the delegation tokens
* @param credentials cache in which to add new delegation tokens
* @return list of new delegation tokens
* @throws IOException problems obtaining a token
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public Token<?>[] addDelegationTokens(
final String renewer, Credentials credentials) throws IOException {
if (credentials == null) {
credentials = new Credentials();
}
final List<Token<?>> tokens = new ArrayList<>();
collectDelegationTokens(renewer, credentials, tokens);
return tokens.toArray(new Token<?>[tokens.size()]);
}
/**
* Recursively obtain the tokens for this FileSystem and all descendant
* FileSystems as determined by {@link #getChildFileSystems()}.
* @param renewer the user allowed to renew the delegation tokens
* @param credentials cache in which to add the new delegation tokens
* @param tokens list in which to add acquired tokens
* @throws IOException problems obtaining a token
*/
private void collectDelegationTokens(final String renewer,
final Credentials credentials,
final List<Token<?>> tokens)
throws IOException {
final String serviceName = getCanonicalServiceName();
// Collect token of the this filesystem and then of its embedded children
if (serviceName != null) { // fs has token, grab it
final Text service = new Text(serviceName);
Token<?> token = credentials.getToken(service);
if (token == null) {
token = getDelegationToken(renewer);
if (token != null) {
tokens.add(token);
credentials.addToken(service, token);
}
}
}
// Now collect the tokens from the children
final FileSystem[] children = getChildFileSystems();
if (children != null) {
for (final FileSystem fs : children) {
fs.collectDelegationTokens(renewer, credentials, tokens);
}
}
}
/** /**
* Get all the immediate child FileSystems embedded in this FileSystem. * Get all the immediate child FileSystems embedded in this FileSystem.
* It does not recurse and get grand children. If a FileSystem * It does not recurse and get grand children. If a FileSystem
@ -679,6 +621,13 @@ public abstract class FileSystem extends Configured implements Closeable {
return null; return null;
} }
@InterfaceAudience.Private
@Override
public DelegationTokenIssuer[] getAdditionalTokenIssuers()
throws IOException {
return getChildFileSystems();
}
/** /**
* Create a file with the provided permission. * Create a file with the provided permission.
* *

View File

@ -296,15 +296,11 @@ public class DelegationTokenAuthenticatedURL extends AuthenticatedURL {
Credentials creds = UserGroupInformation.getCurrentUser(). Credentials creds = UserGroupInformation.getCurrentUser().
getCredentials(); getCredentials();
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Token not set, looking for delegation token. Creds:{}", LOG.debug("Token not set, looking for delegation token. Creds:{},"
creds.getAllTokens()); + " size:{}", creds.getAllTokens(), creds.numberOfTokens());
} }
if (!creds.getAllTokens().isEmpty()) { if (!creds.getAllTokens().isEmpty()) {
InetSocketAddress serviceAddr = new InetSocketAddress(url.getHost(), dToken = selectDelegationToken(url, creds);
url.getPort());
Text service = SecurityUtil.buildTokenService(serviceAddr);
dToken = creds.getToken(service);
LOG.debug("Using delegation token {} from service:{}", dToken, service);
if (dToken != null) { if (dToken != null) {
if (useQueryStringForDelegationToken()) { if (useQueryStringForDelegationToken()) {
// delegation token will go in the query string, injecting it // delegation token will go in the query string, injecting it
@ -340,6 +336,21 @@ public class DelegationTokenAuthenticatedURL extends AuthenticatedURL {
return conn; return conn;
} }
/**
* Select a delegation token from all tokens in credentials, based on url.
*/
@InterfaceAudience.Private
public org.apache.hadoop.security.token.Token<? extends TokenIdentifier>
selectDelegationToken(URL url, Credentials creds) {
final InetSocketAddress serviceAddr = new InetSocketAddress(url.getHost(),
url.getPort());
final Text service = SecurityUtil.buildTokenService(serviceAddr);
org.apache.hadoop.security.token.Token<? extends TokenIdentifier> dToken =
creds.getToken(service);
LOG.debug("Using delegation token {} from service:{}", dToken, service);
return dToken;
}
/** /**
* Requests a delegation token using the configured <code>Authenticator</code> * Requests a delegation token using the configured <code>Authenticator</code>
* for authentication. * for authentication.

View File

@ -0,0 +1,112 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.security.token.org.apache.hadoop.security.token;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* Class for issuing delegation tokens.
*/
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce", "Yarn"})
@InterfaceStability.Unstable
public interface DelegationTokenIssuer {
/**
* The service name used as the alias for the token in the credential
* token map. addDelegationTokens will use this to determine if
* a token exists, and if not, add a new token with this alias.
*/
String getCanonicalServiceName();
/**
* Unconditionally get a new token with the optional renewer. Returning
* null indicates the service does not issue tokens.
*/
Token<?> getDelegationToken(String renewer) throws IOException;
/**
* Issuers may need tokens from additional services.
*/
default DelegationTokenIssuer[] getAdditionalTokenIssuers()
throws IOException {
return null;
}
/**
* Given a renewer, add delegation tokens for issuer and it's child issuers
* to the <code>Credentials</code> object if it is not already present.
*<p>
* Note: This method is not intended to be overridden. Issuers should
* implement getCanonicalService and getDelegationToken to ensure
* consistent token acquisition behavior.
*
* @param renewer the user allowed to renew the delegation tokens
* @param credentials cache in which to add new delegation tokens
* @return list of new delegation tokens
* @throws IOException thrown if IOException if an IO error occurs.
*/
default Token<?>[] addDelegationTokens(
final String renewer, Credentials credentials) throws IOException {
if (credentials == null) {
credentials = new Credentials();
}
final List<Token<?>> tokens = new ArrayList<>();
collectDelegationTokens(this, renewer, credentials, tokens);
return tokens.toArray(new Token<?>[tokens.size()]);
}
/**
* NEVER call this method directly.
*/
@InterfaceAudience.Private
static void collectDelegationTokens(
final DelegationTokenIssuer issuer,
final String renewer,
final Credentials credentials,
final List<Token<?>> tokens) throws IOException {
final String serviceName = issuer.getCanonicalServiceName();
// Collect token of the this issuer and then of its embedded children
if (serviceName != null) {
final Text service = new Text(serviceName);
Token<?> token = credentials.getToken(service);
if (token == null) {
token = issuer.getDelegationToken(renewer);
if (token != null) {
tokens.add(token);
credentials.addToken(service, token);
}
}
}
// Now collect the tokens from the children.
final DelegationTokenIssuer[] ancillary =
issuer.getAdditionalTokenIssuers();
if (ancillary != null) {
for (DelegationTokenIssuer subIssuer : ancillary) {
collectDelegationTokens(subIssuer, renewer, credentials, tokens);
}
}
}
}

View File

@ -59,12 +59,23 @@ public final class KMSUtil {
public static KeyProvider createKeyProvider(final Configuration conf, public static KeyProvider createKeyProvider(final Configuration conf,
final String configKeyName) throws IOException { final String configKeyName) throws IOException {
LOG.debug("Creating key provider with config key {}", configKeyName); LOG.debug("Creating key provider with config key {}", configKeyName);
URI uri = getKeyProviderUri(conf, configKeyName);
return (uri != null) ? createKeyProviderFromUri(conf, uri) : null;
}
public static URI getKeyProviderUri(final Configuration conf) {
return KMSUtil.getKeyProviderUri(
conf, KeyProviderFactory.KEY_PROVIDER_PATH);
}
public static URI getKeyProviderUri(final Configuration conf,
final String configKeyName) {
final String providerUriStr = conf.getTrimmed(configKeyName); final String providerUriStr = conf.getTrimmed(configKeyName);
// No provider set in conf // No provider set in conf
if (providerUriStr == null || providerUriStr.isEmpty()) { if (providerUriStr == null || providerUriStr.isEmpty()) {
return null; return null;
} }
return createKeyProviderFromUri(conf, URI.create(providerUriStr)); return URI.create(providerUriStr);
} }
public static KeyProvider createKeyProviderFromUri(final Configuration conf, public static KeyProvider createKeyProviderFromUri(final Configuration conf,

View File

@ -51,23 +51,27 @@ public class TestKeyProviderDelegationTokenExtension {
KeyProviderDelegationTokenExtension KeyProviderDelegationTokenExtension
.createKeyProviderDelegationTokenExtension(kp); .createKeyProviderDelegationTokenExtension(kp);
Assert.assertNotNull(kpDTE1); Assert.assertNotNull(kpDTE1);
// Default implementation should be a no-op and return null Token<?>[] tokens = kpDTE1.addDelegationTokens("user", credentials);
Assert.assertNull(kpDTE1.addDelegationTokens("user", credentials)); // Default implementation should return no tokens.
Assert.assertNotNull(tokens);
Assert.assertEquals(0, tokens.length);
MockKeyProvider mock = mock(MockKeyProvider.class); MockKeyProvider mock = mock(MockKeyProvider.class);
Mockito.when(mock.getConf()).thenReturn(new Configuration()); Mockito.when(mock.getConf()).thenReturn(new Configuration());
when(mock.addDelegationTokens("renewer", credentials)).thenReturn( when(mock.getCanonicalServiceName()).thenReturn("cservice");
new Token<?>[]{new Token(null, null, new Text("kind"), new Text( when(mock.getDelegationToken("renewer")).thenReturn(
"service"))} new Token(null, null, new Text("kind"), new Text(
"tservice"))
); );
KeyProviderDelegationTokenExtension kpDTE2 = KeyProviderDelegationTokenExtension kpDTE2 =
KeyProviderDelegationTokenExtension KeyProviderDelegationTokenExtension
.createKeyProviderDelegationTokenExtension(mock); .createKeyProviderDelegationTokenExtension(mock);
Token<?>[] tokens = tokens = kpDTE2.addDelegationTokens("renewer", credentials);
kpDTE2.addDelegationTokens("renewer", credentials);
Assert.assertNotNull(tokens); Assert.assertNotNull(tokens);
Assert.assertEquals(1, tokens.length);
Assert.assertEquals("kind", tokens[0].getKind().toString()); Assert.assertEquals("kind", tokens[0].getKind().toString());
Assert.assertEquals("tservice", tokens[0].getService().toString());
Assert.assertNotNull(credentials.getToken(new Text("cservice")));
} }
} }

View File

@ -0,0 +1,138 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.crypto.key.kms;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import static org.apache.hadoop.crypto.key.kms.KMSDelegationToken.TOKEN_KIND;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
/**
* Unit test for {@link KMSClientProvider} class.
*/
public class TestKMSClientProvider {
public static final Logger LOG =
LoggerFactory.getLogger(TestKMSClientProvider.class);
private final Token token = new Token();
private final Token oldToken = new Token();
private final String urlString = "https://host:16000/kms";
private final String providerUriString = "kms://https@host:16000/kms";
private final String oldTokenService = "host:16000";
@Rule
public Timeout globalTimeout = new Timeout(60000);
{
GenericTestUtils.setLogLevel(KMSClientProvider.LOG, Level.TRACE);
}
@Before
public void setup() {
SecurityUtil.setTokenServiceUseIp(false);
token.setKind(TOKEN_KIND);
token.setService(new Text(providerUriString));
oldToken.setKind(TOKEN_KIND);
oldToken.setService(new Text(oldTokenService));
}
@Test
public void testSelectDelegationToken() throws Exception {
final Credentials creds = new Credentials();
creds.addToken(new Text(providerUriString), token);
assertNull(KMSClientProvider.selectDelegationToken(creds, null));
assertNull(KMSClientProvider
.selectDelegationToken(creds, new Text(oldTokenService)));
assertEquals(token, KMSClientProvider
.selectDelegationToken(creds, new Text(providerUriString)));
}
@Test
public void testSelectTokenOldService() throws Exception {
final Configuration conf = new Configuration();
final URI uri = new URI(providerUriString);
final KMSClientProvider kp = new KMSClientProvider(uri, conf);
try {
final Credentials creds = new Credentials();
creds.addToken(new Text(oldTokenService), oldToken);
final Token t = kp.selectDelegationToken(creds);
assertEquals(oldToken, t);
} finally {
kp.close();
}
}
@Test
public void testSelectTokenWhenBothExist() throws Exception {
final Credentials creds = new Credentials();
final Configuration conf = new Configuration();
final URI uri = new URI(providerUriString);
final KMSClientProvider kp = new KMSClientProvider(uri, conf);
try {
creds.addToken(token.getService(), token);
creds.addToken(oldToken.getService(), oldToken);
final Token t = kp.selectDelegationToken(creds);
assertEquals("new token should be selected when both exist", token, t);
} finally {
kp.close();
}
}
@Test
public void testURLSelectTokenUriFormat() throws Exception {
testURLSelectToken(token);
}
@Test
public void testURLSelectTokenIpPort() throws Exception {
testURLSelectToken(oldToken);
}
private void testURLSelectToken(final Token tok)
throws URISyntaxException, IOException {
final Configuration conf = new Configuration();
final URI uri = new URI(providerUriString);
final KMSClientProvider kp = new KMSClientProvider(uri, conf);
final DelegationTokenAuthenticatedURL url = kp.createAuthenticatedURL();
final Credentials creds = new Credentials();
creds.addToken(tok.getService(), tok);
final Token chosen = url.selectDelegationToken(new URL(urlString), creds);
assertEquals(tok, chosen);
}
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.crypto.key.kms;
import static org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion; import static org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
import static org.apache.hadoop.test.LambdaTestUtils.intercept; import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
@ -46,7 +47,6 @@ import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.authentication.client.AuthenticationException; import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.security.authorize.AuthorizationException; import org.apache.hadoop.security.authorize.AuthorizationException;
import org.junit.After;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
@ -65,33 +65,27 @@ public class TestLoadBalancingKMSClientProvider {
SecurityUtil.setTokenServiceUseIp(false); SecurityUtil.setTokenServiceUseIp(false);
} }
@After
public void teardown() throws IOException {
KMSClientProvider.fallbackDefaultPortForTesting = false;
}
@Test @Test
public void testCreation() throws Exception { public void testCreation() throws Exception {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
KMSClientProvider.fallbackDefaultPortForTesting = true;
KeyProvider kp = new KMSClientProvider.Factory().createProvider(new URI( KeyProvider kp = new KMSClientProvider.Factory().createProvider(new URI(
"kms://http@host1/kms/foo"), conf); "kms://http@host1:9600/kms/foo"), conf);
assertTrue(kp instanceof LoadBalancingKMSClientProvider); assertTrue(kp instanceof LoadBalancingKMSClientProvider);
KMSClientProvider[] providers = KMSClientProvider[] providers =
((LoadBalancingKMSClientProvider) kp).getProviders(); ((LoadBalancingKMSClientProvider) kp).getProviders();
assertEquals(1, providers.length); assertEquals(1, providers.length);
assertEquals(Sets.newHashSet("http://host1/kms/foo/v1/"), assertEquals(Sets.newHashSet("http://host1:9600/kms/foo/v1/"),
Sets.newHashSet(providers[0].getKMSUrl())); Sets.newHashSet(providers[0].getKMSUrl()));
kp = new KMSClientProvider.Factory().createProvider(new URI( kp = new KMSClientProvider.Factory().createProvider(new URI(
"kms://http@host1;host2;host3/kms/foo"), conf); "kms://http@host1;host2;host3:9600/kms/foo"), conf);
assertTrue(kp instanceof LoadBalancingKMSClientProvider); assertTrue(kp instanceof LoadBalancingKMSClientProvider);
providers = providers =
((LoadBalancingKMSClientProvider) kp).getProviders(); ((LoadBalancingKMSClientProvider) kp).getProviders();
assertEquals(3, providers.length); assertEquals(3, providers.length);
assertEquals(Sets.newHashSet("http://host1/kms/foo/v1/", assertEquals(Sets.newHashSet("http://host1:9600/kms/foo/v1/",
"http://host2/kms/foo/v1/", "http://host2:9600/kms/foo/v1/",
"http://host3/kms/foo/v1/"), "http://host3:9600/kms/foo/v1/"),
Sets.newHashSet(providers[0].getKMSUrl(), Sets.newHashSet(providers[0].getKMSUrl(),
providers[1].getKMSUrl(), providers[1].getKMSUrl(),
providers[2].getKMSUrl())); providers[2].getKMSUrl()));
@ -254,10 +248,9 @@ public class TestLoadBalancingKMSClientProvider {
@Test @Test
public void testClassCastException() throws Exception { public void testClassCastException() throws Exception {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
KMSClientProvider.fallbackDefaultPortForTesting = true;
KMSClientProvider p1 = new MyKMSClientProvider( KMSClientProvider p1 = new MyKMSClientProvider(
new URI("kms://http@host1/kms/foo"), conf); new URI("kms://http@host1:9600/kms/foo"), conf);
LoadBalancingKMSClientProvider kp = new LoadBalancingKMSClientProvider( LoadBalancingKMSClientProvider kp = new LoadBalancingKMSClientProvider(
new KMSClientProvider[] {p1}, 0, conf); new KMSClientProvider[] {p1}, 0, conf);
try { try {
kp.generateEncryptedKey("foo"); kp.generateEncryptedKey("foo");
@ -717,4 +710,42 @@ public class TestLoadBalancingKMSClientProvider {
verify(p2, Mockito.times(1)).createKey(Mockito.eq(keyName), verify(p2, Mockito.times(1)).createKey(Mockito.eq(keyName),
Mockito.any(Options.class)); Mockito.any(Options.class));
} }
@Test
public void testTokenServiceCreationWithLegacyFormat() throws Exception {
Configuration conf = new Configuration();
// Create keyprovider with old token format (ip:port)
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
"kms:/something");
String authority = "host1:9600";
URI kmsUri = URI.create("kms://http@" + authority + "/kms/foo");
KeyProvider kp =
new KMSClientProvider.Factory().createProvider(kmsUri, conf);
assertTrue(kp instanceof LoadBalancingKMSClientProvider);
LoadBalancingKMSClientProvider lbkp = (LoadBalancingKMSClientProvider) kp;
assertEquals(1, lbkp.getProviders().length);
assertEquals(authority, lbkp.getCanonicalServiceName());
for (KMSClientProvider provider : lbkp.getProviders()) {
assertEquals(authority, provider.getCanonicalServiceName());
}
}
@Test
public void testTokenServiceCreationWithUriFormat() throws Exception {
final Configuration conf = new Configuration();
final URI kmsUri = URI.create("kms://http@host1;host2;host3:9600/kms/foo");
final KeyProvider kp =
new KMSClientProvider.Factory().createProvider(kmsUri, conf);
assertTrue(kp instanceof LoadBalancingKMSClientProvider);
final LoadBalancingKMSClientProvider lbkp =
(LoadBalancingKMSClientProvider) kp;
assertEquals(kmsUri.toString(), lbkp.getCanonicalServiceName());
KMSClientProvider[] providers = lbkp.getProviders();
assertEquals(3, providers.length);
for (int i = 0; i < providers.length; i++) {
assertEquals(URI.create(providers[i].getKMSUrl()).getAuthority(),
providers[i].getCanonicalServiceName());
assertNotEquals(kmsUri, providers[i].getCanonicalServiceName());
}
}
} }

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.fs.Options.CreateOpts;
import org.apache.hadoop.fs.Options.Rename; import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.org.apache.hadoop.security.token.DelegationTokenIssuer;
import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.Progressable;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
@ -124,6 +125,8 @@ public class TestFilterFileSystem {
public int getDefaultPort(); public int getDefaultPort();
public String getCanonicalServiceName(); public String getCanonicalServiceName();
public Token<?> getDelegationToken(String renewer) throws IOException; public Token<?> getDelegationToken(String renewer) throws IOException;
public DelegationTokenIssuer[] getAdditionalTokenIssuers()
throws IOException;
public boolean deleteOnExit(Path f) throws IOException; public boolean deleteOnExit(Path f) throws IOException;
public boolean cancelDeleteOnExit(Path f) throws IOException; public boolean cancelDeleteOnExit(Path f) throws IOException;
public Token<?>[] addDelegationTokens(String renewer, Credentials creds) public Token<?>[] addDelegationTokens(String renewer, Credentials creds)

View File

@ -25,6 +25,7 @@ import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.org.apache.hadoop.security.token.DelegationTokenIssuer;
import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.Progressable;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -144,6 +145,8 @@ public class TestHarFileSystem {
public int getDefaultPort(); public int getDefaultPort();
public String getCanonicalServiceName(); public String getCanonicalServiceName();
public Token<?> getDelegationToken(String renewer) throws IOException; public Token<?> getDelegationToken(String renewer) throws IOException;
public DelegationTokenIssuer[] getAdditionalTokenIssuers()
throws IOException;
public FileChecksum getFileChecksum(Path f) throws IOException; public FileChecksum getFileChecksum(Path f) throws IOException;
public boolean deleteOnExit(Path f) throws IOException; public boolean deleteOnExit(Path f) throws IOException;
public boolean cancelDeleteOnExit(Path f) throws IOException; public boolean cancelDeleteOnExit(Path f) throws IOException;

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.crypto.key.kms.LoadBalancingKMSClientProvider;
import org.apache.hadoop.crypto.key.kms.ValueQueue; import org.apache.hadoop.crypto.key.kms.ValueQueue;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.MultipleIOException; import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.minikdc.MiniKdc; import org.apache.hadoop.minikdc.MiniKdc;
import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.Credentials;
@ -44,6 +45,7 @@ import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.ssl.KeyStoreTestUtil; import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
import org.apache.hadoop.security.ssl.SSLFactory; import org.apache.hadoop.security.ssl.SSLFactory;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier; import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
@ -96,6 +98,8 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH;
import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
@ -141,21 +145,78 @@ public class TestKMS {
} }
public static abstract class KMSCallable<T> implements Callable<T> { public static abstract class KMSCallable<T> implements Callable<T> {
private URL kmsUrl; private List<URL> kmsUrl;
protected URL getKMSUrl() { protected URL getKMSUrl() {
return kmsUrl; return kmsUrl.get(0);
}
protected URL[] getKMSHAUrl() {
URL[] urls = new URL[kmsUrl.size()];
return kmsUrl.toArray(urls);
}
protected void addKMSUrl(URL url) {
if (kmsUrl == null) {
kmsUrl = new ArrayList<URL>();
}
kmsUrl.add(url);
}
/*
* The format of the returned value will be
* kms://http:kms1.example1.com:port1,kms://http:kms2.example2.com:port2
*/
protected String generateLoadBalancingKeyProviderUriString() {
if (kmsUrl == null || kmsUrl.size() == 0) {
return null;
}
StringBuffer sb = new StringBuffer();
for (int i = 0; i < kmsUrl.size(); i++) {
sb.append(KMSClientProvider.SCHEME_NAME + "://" +
kmsUrl.get(0).getProtocol() + "@");
URL url = kmsUrl.get(i);
sb.append(url.getAuthority());
if (url.getPath() != null) {
sb.append(url.getPath());
}
if (i < kmsUrl.size() - 1) {
sb.append(",");
}
}
return sb.toString();
} }
} }
protected KeyProvider createProvider(URI uri, Configuration conf) protected KeyProvider createProvider(URI uri, Configuration conf)
throws IOException { throws IOException {
final KeyProvider ret = new LoadBalancingKMSClientProvider( final KeyProvider ret = new LoadBalancingKMSClientProvider(uri,
new KMSClientProvider[] {new KMSClientProvider(uri, conf)}, conf); new KMSClientProvider[] {new KMSClientProvider(uri, conf)}, conf);
providersCreated.add(ret); providersCreated.add(ret);
return ret; return ret;
} }
/**
* create a LoadBalancingKMSClientProvider from an array of URIs.
* @param uris an array of KMS URIs
* @param conf configuration object
* @return a LoadBalancingKMSClientProvider object
* @throws IOException
*/
protected LoadBalancingKMSClientProvider createHAProvider(URI lbUri,
URI[] uris, Configuration conf) throws IOException {
KMSClientProvider[] providers = new KMSClientProvider[uris.length];
for (int i = 0; i < providers.length; i++) {
providers[i] =
new KMSClientProvider(uris[i], conf);
}
final LoadBalancingKMSClientProvider ret =
new LoadBalancingKMSClientProvider(lbUri, providers, conf);
providersCreated.add(ret);
return ret;
}
private KMSClientProvider createKMSClientProvider(URI uri, Configuration conf) private KMSClientProvider createKMSClientProvider(URI uri, Configuration conf)
throws IOException { throws IOException {
final KMSClientProvider ret = new KMSClientProvider(uri, conf); final KMSClientProvider ret = new KMSClientProvider(uri, conf);
@ -170,22 +231,34 @@ public class TestKMS {
protected <T> T runServer(int port, String keystore, String password, File confDir, protected <T> T runServer(int port, String keystore, String password, File confDir,
KMSCallable<T> callable) throws Exception { KMSCallable<T> callable) throws Exception {
return runServer(new int[] {port}, keystore, password, confDir, callable);
}
protected <T> T runServer(int[] ports, String keystore, String password,
File confDir, KMSCallable<T> callable) throws Exception {
MiniKMS.Builder miniKMSBuilder = new MiniKMS.Builder().setKmsConfDir(confDir) MiniKMS.Builder miniKMSBuilder = new MiniKMS.Builder().setKmsConfDir(confDir)
.setLog4jConfFile("log4j.properties"); .setLog4jConfFile("log4j.properties");
if (keystore != null) { if (keystore != null) {
miniKMSBuilder.setSslConf(new File(keystore), password); miniKMSBuilder.setSslConf(new File(keystore), password);
} }
if (port > 0) { final List<MiniKMS> kmsList = new ArrayList<>();
miniKMSBuilder.setPort(port);
for (int i = 0; i < ports.length; i++) {
if (ports[i] > 0) {
miniKMSBuilder.setPort(ports[i]);
}
MiniKMS miniKMS = miniKMSBuilder.build();
kmsList.add(miniKMS);
miniKMS.start();
LOG.info("Test KMS running at: " + miniKMS.getKMSUrl());
callable.addKMSUrl(miniKMS.getKMSUrl());
} }
MiniKMS miniKMS = miniKMSBuilder.build();
miniKMS.start();
try { try {
System.out.println("Test KMS running at: " + miniKMS.getKMSUrl());
callable.kmsUrl = miniKMS.getKMSUrl();
return callable.call(); return callable.call();
} finally { } finally {
miniKMS.stop(); for (MiniKMS miniKMS: kmsList) {
miniKMS.stop();
}
} }
} }
@ -240,6 +313,13 @@ public class TestKMS {
return new URI("kms://" + str); return new URI("kms://" + str);
} }
public static URI[] createKMSHAUri(URL[] kmsUrls) throws Exception {
URI[] uris = new URI[kmsUrls.length];
for (int i=0; i< kmsUrls.length; i++) {
uris[i] = createKMSUri(kmsUrls[i]);
}
return uris;
}
private static class KerberosConfiguration private static class KerberosConfiguration
extends javax.security.auth.login.Configuration { extends javax.security.auth.login.Configuration {
@ -306,6 +386,7 @@ public class TestKMS {
principals.add("otheradmin"); principals.add("otheradmin");
principals.add("client/host"); principals.add("client/host");
principals.add("client1"); principals.add("client1");
principals.add("foo");
for (KMSACLs.Type type : KMSACLs.Type.values()) { for (KMSACLs.Type type : KMSACLs.Type.values()) {
principals.add(type.toString()); principals.add(type.toString());
} }
@ -2011,7 +2092,6 @@ public class TestKMS {
return null; return null;
} }
}); });
nonKerberosUgi.addCredentials(credentials); nonKerberosUgi.addCredentials(credentials);
try { try {
@ -2067,6 +2147,18 @@ public class TestKMS {
testDelegationTokensOps(true, true); testDelegationTokensOps(true, true);
} }
private Text getTokenService(KeyProvider provider) {
assertTrue("KeyProvider should be an instance of " +
"LoadBalancingKMSClientProvider", (provider instanceof
LoadBalancingKMSClientProvider));
assertEquals("Num client providers should be 1", 1,
((LoadBalancingKMSClientProvider)provider).getProviders().length);
final Text tokenService = new Text(
(((LoadBalancingKMSClientProvider)provider).getProviders()[0])
.getCanonicalServiceName());
return tokenService;
}
private void testDelegationTokensOps(final boolean ssl, final boolean kerb) private void testDelegationTokensOps(final boolean ssl, final boolean kerb)
throws Exception { throws Exception {
final File confDir = getTestDir(); final File confDir = getTestDir();
@ -2103,6 +2195,10 @@ public class TestKMS {
@Override @Override
public Void run() throws Exception { public Void run() throws Exception {
KeyProvider kp = createProvider(uri, clientConf); KeyProvider kp = createProvider(uri, clientConf);
// Unset the conf value for key provider path just to be sure that
// the key provider created for renew and cancel token is from
// token service field.
clientConf.unset(HADOOP_SECURITY_KEY_PROVIDER_PATH);
// test delegation token retrieval // test delegation token retrieval
KeyProviderDelegationTokenExtension kpdte = KeyProviderDelegationTokenExtension kpdte =
KeyProviderDelegationTokenExtension. KeyProviderDelegationTokenExtension.
@ -2110,13 +2206,10 @@ public class TestKMS {
final Credentials credentials = new Credentials(); final Credentials credentials = new Credentials();
final Token<?>[] tokens = final Token<?>[] tokens =
kpdte.addDelegationTokens("client1", credentials); kpdte.addDelegationTokens("client1", credentials);
Text tokenService = getTokenService(kp);
Assert.assertEquals(1, credentials.getAllTokens().size()); Assert.assertEquals(1, credentials.getAllTokens().size());
InetSocketAddress kmsAddr =
new InetSocketAddress(getKMSUrl().getHost(),
getKMSUrl().getPort());
Assert.assertEquals(KMSDelegationToken.TOKEN_KIND, Assert.assertEquals(KMSDelegationToken.TOKEN_KIND,
credentials.getToken(SecurityUtil.buildTokenService(kmsAddr)). credentials.getToken(tokenService).getKind());
getKind());
// Test non-renewer user cannot renew. // Test non-renewer user cannot renew.
for (Token<?> token : tokens) { for (Token<?> token : tokens) {
@ -2258,15 +2351,16 @@ public class TestKMS {
// Get a DT and use it. // Get a DT and use it.
final Credentials credentials = new Credentials(); final Credentials credentials = new Credentials();
kpdte.addDelegationTokens("client", credentials); kpdte.addDelegationTokens("client", credentials);
Text tokenService = getTokenService(kp);
Assert.assertEquals(1, credentials.getAllTokens().size()); Assert.assertEquals(1, credentials.getAllTokens().size());
Assert.assertEquals(KMSDelegationToken.TOKEN_KIND, credentials. Assert.assertEquals(KMSDelegationToken.TOKEN_KIND, credentials.
getToken(SecurityUtil.buildTokenService(kmsAddr)).getKind()); getToken(tokenService).getKind());
UserGroupInformation.getCurrentUser().addCredentials(credentials); UserGroupInformation.getCurrentUser().addCredentials(credentials);
LOG.info("Added kms dt to credentials: {}", UserGroupInformation. LOG.info("Added kms dt to credentials: {}", UserGroupInformation.
getCurrentUser().getCredentials().getAllTokens()); getCurrentUser().getCredentials().getAllTokens());
Token<?> token = Token<?> token =
UserGroupInformation.getCurrentUser().getCredentials() UserGroupInformation.getCurrentUser().getCredentials()
.getToken(SecurityUtil.buildTokenService(kmsAddr)); .getToken(tokenService);
Assert.assertNotNull(token); Assert.assertNotNull(token);
job1Token.add(token); job1Token.add(token);
@ -2302,17 +2396,17 @@ public class TestKMS {
// Get a new DT, but don't use it yet. // Get a new DT, but don't use it yet.
final Credentials newCreds = new Credentials(); final Credentials newCreds = new Credentials();
kpdte.addDelegationTokens("client", newCreds); kpdte.addDelegationTokens("client", newCreds);
Text tokenService = getTokenService(kp);
Assert.assertEquals(1, newCreds.getAllTokens().size()); Assert.assertEquals(1, newCreds.getAllTokens().size());
Assert.assertEquals(KMSDelegationToken.TOKEN_KIND, Assert.assertEquals(KMSDelegationToken.TOKEN_KIND,
newCreds.getToken(SecurityUtil.buildTokenService(kmsAddr)). newCreds.getToken(tokenService).
getKind()); getKind());
// Using job 1's DT should fail. // Using job 1's DT should fail.
final Credentials oldCreds = new Credentials(); final Credentials oldCreds = new Credentials();
for (Token<?> token : job1Token) { for (Token<?> token : job1Token) {
if (token.getKind().equals(KMSDelegationToken.TOKEN_KIND)) { if (token.getKind().equals(KMSDelegationToken.TOKEN_KIND)) {
oldCreds oldCreds.addToken(tokenService, token);
.addToken(SecurityUtil.buildTokenService(kmsAddr), token);
} }
} }
UserGroupInformation.getCurrentUser().addCredentials(oldCreds); UserGroupInformation.getCurrentUser().addCredentials(oldCreds);
@ -2328,7 +2422,7 @@ public class TestKMS {
// Using the new DT should succeed. // Using the new DT should succeed.
Assert.assertEquals(1, newCreds.getAllTokens().size()); Assert.assertEquals(1, newCreds.getAllTokens().size());
Assert.assertEquals(KMSDelegationToken.TOKEN_KIND, Assert.assertEquals(KMSDelegationToken.TOKEN_KIND,
newCreds.getToken(SecurityUtil.buildTokenService(kmsAddr)). newCreds.getToken(tokenService).
getKind()); getKind());
UserGroupInformation.getCurrentUser().addCredentials(newCreds); UserGroupInformation.getCurrentUser().addCredentials(newCreds);
LOG.info("Credetials now are: {}", UserGroupInformation LOG.info("Credetials now are: {}", UserGroupInformation
@ -2357,7 +2451,14 @@ public class TestKMS {
doKMSWithZK(true, true); doKMSWithZK(true, true);
} }
public void doKMSWithZK(boolean zkDTSM, boolean zkSigner) throws Exception { private <T> T runServerWithZooKeeper(boolean zkDTSM, boolean zkSigner,
KMSCallable<T> callable) throws Exception {
return runServerWithZooKeeper(zkDTSM, zkSigner, callable, 1);
}
private <T> T runServerWithZooKeeper(boolean zkDTSM, boolean zkSigner,
KMSCallable<T> callable, int kmsSize) throws Exception {
TestingServer zkServer = null; TestingServer zkServer = null;
try { try {
zkServer = new TestingServer(); zkServer = new TestingServer();
@ -2403,43 +2504,189 @@ public class TestKMS {
writeConf(testDir, conf); writeConf(testDir, conf);
KMSCallable<KeyProvider> c = int[] ports = new int[kmsSize];
new KMSCallable<KeyProvider>() { for (int i = 0; i < ports.length; i++) {
@Override ports[i] = -1;
public KeyProvider call() throws Exception { }
final Configuration conf = new Configuration(); return runServer(ports, null, null, testDir, callable);
conf.setInt(KeyProvider.DEFAULT_BITLENGTH_NAME, 128);
final URI uri = createKMSUri(getKMSUrl());
final KeyProvider kp =
doAs("SET_KEY_MATERIAL",
new PrivilegedExceptionAction<KeyProvider>() {
@Override
public KeyProvider run() throws Exception {
KeyProvider kp = createProvider(uri, conf);
kp.createKey("k1", new byte[16],
new KeyProvider.Options(conf));
kp.createKey("k2", new byte[16],
new KeyProvider.Options(conf));
kp.createKey("k3", new byte[16],
new KeyProvider.Options(conf));
return kp;
}
});
return kp;
}
};
runServer(null, null, testDir, c);
} finally { } finally {
if (zkServer != null) { if (zkServer != null) {
zkServer.stop(); zkServer.stop();
zkServer.close(); zkServer.close();
} }
} }
} }
public void doKMSWithZK(boolean zkDTSM, boolean zkSigner) throws Exception {
KMSCallable<KeyProvider> c =
new KMSCallable<KeyProvider>() {
@Override
public KeyProvider call() throws Exception {
final Configuration conf = new Configuration();
conf.setInt(KeyProvider.DEFAULT_BITLENGTH_NAME, 128);
final URI uri = createKMSUri(getKMSUrl());
final KeyProvider kp =
doAs("SET_KEY_MATERIAL",
new PrivilegedExceptionAction<KeyProvider>() {
@Override
public KeyProvider run() throws Exception {
KeyProvider kp = createProvider(uri, conf);
kp.createKey("k1", new byte[16],
new KeyProvider.Options(conf));
kp.createKey("k2", new byte[16],
new KeyProvider.Options(conf));
kp.createKey("k3", new byte[16],
new KeyProvider.Options(conf));
return kp;
}
});
return kp;
}
};
runServerWithZooKeeper(zkDTSM, zkSigner, c);
}
@Test
public void testKMSHAZooKeeperDelegationToken() throws Exception {
final int kmsSize = 2;
doKMSWithZKWithDelegationToken(true, true, kmsSize);
}
private void doKMSWithZKWithDelegationToken(boolean zkDTSM, boolean zkSigner,
int kmsSize) throws Exception {
// Create a KMSCallable to execute requests after ZooKeeper and KMS are up.
KMSCallable<Void> c = new KMSCallable<Void>() {
@Override
public Void call() throws Exception {
final Configuration conf = new Configuration();
conf.setInt(KeyProvider.DEFAULT_BITLENGTH_NAME, 128);
final URI[] uris = createKMSHAUri(getKMSHAUrl());
final Credentials credentials = new Credentials();
// Create a UGI without Kerberos auth. It will authenticate with tokens.
final UserGroupInformation nonKerberosUgi =
UserGroupInformation.getCurrentUser();
final String lbUri = generateLoadBalancingKeyProviderUriString();
final LoadBalancingKMSClientProvider lbkp =
createHAProvider(URI.create(lbUri), uris, conf);
conf.unset(HADOOP_SECURITY_KEY_PROVIDER_PATH);
// get delegation tokens using kerberos login
doAs("SET_KEY_MATERIAL",
new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
KeyProviderDelegationTokenExtension kpdte =
KeyProviderDelegationTokenExtension.
createKeyProviderDelegationTokenExtension(lbkp);
kpdte.addDelegationTokens("foo", credentials);
return null;
}
});
nonKerberosUgi.addCredentials(credentials);
// Access KMS using delegation token for authentication, no Kerberos.
nonKerberosUgi.doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
// Create a kms client with one provider at a time. Must use one
// provider so that if it fails to authenticate, it does not fall
// back to the next KMS instance.
// Should succeed because it has delegation tokens for any instance.
int i = 0;
for (KMSClientProvider provider : lbkp.getProviders()) {
final String key = "k" + i++;
LOG.info("Connect to {} to create key {}.", provider, key);
provider.createKey(key, new KeyProvider.Options(conf));
}
return null;
}
});
final Collection<Token<? extends TokenIdentifier>> tokens =
credentials.getAllTokens();
doAs("foo", new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
assertEquals(1, tokens.size());
Token token = tokens.iterator().next();
assertEquals(KMSDelegationToken.TOKEN_KIND, token.getKind());
LOG.info("Got dt for token: {}", token);
final long tokenLife = token.renew(conf);
LOG.info("Renewed token {}, new lifetime:{}", token, tokenLife);
Thread.sleep(10);
final long newTokenLife = token.renew(conf);
LOG.info("Renewed token {}, new lifetime:{}", token, newTokenLife);
assertTrue(newTokenLife > tokenLife);
// test delegation token cancellation
LOG.info("Got dt for token: {}", token);
token.cancel(conf);
LOG.info("Cancelled token {}", token);
try {
token.renew(conf);
fail("should not be able to renew a canceled token");
} catch (Exception e) {
LOG.info("Expected exception when renewing token", e);
}
return null;
}
});
final Credentials newCredentials = new Credentials();
doAs("SET_KEY_MATERIAL",
new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
KeyProviderDelegationTokenExtension kpdte =
KeyProviderDelegationTokenExtension.
createKeyProviderDelegationTokenExtension(lbkp);
kpdte.addDelegationTokens("foo", newCredentials);
return null;
}
});
doAs("foo", new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
KMSClientProvider kp1 = lbkp.getProviders()[0];
URL[] urls = getKMSHAUrl();
final Collection<Token<? extends TokenIdentifier>> tokens =
newCredentials.getAllTokens();
assertEquals(1, tokens.size());
Token token = tokens.iterator().next();
assertEquals(KMSDelegationToken.TOKEN_KIND,
token.getKind());
// Testing backward compatibility of token renewal and cancellation.
// Set the token service to ip:port format and test to renew/cancel.
Text text = SecurityUtil.buildTokenService(
new InetSocketAddress(urls[0].getHost(), urls[0].getPort()));
token.setService(text);
conf.set(HADOOP_SECURITY_KEY_PROVIDER_PATH, lbUri);
long tokenLife = 0L;
for (KMSClientProvider kp : lbkp.getProviders()) {
long renewedTokenLife = token.renew(conf);
LOG.info("Renewed token of kind {}, new lifetime:{}",
token.getKind(), renewedTokenLife);
assertTrue(renewedTokenLife > tokenLife);
tokenLife = renewedTokenLife;
Thread.sleep(10);
}
token.cancel(conf);
try {
token.renew(conf);
fail("should not be able to renew a canceled token");
} catch (IOException e) {
LOG.info("Expected exception when renewing token", e);
}
return null;
}
});
return null;
}
};
runServerWithZooKeeper(zkDTSM, zkSigner, c, kmsSize);
}
@Test @Test
public void testProxyUserKerb() throws Exception { public void testProxyUserKerb() throws Exception {

View File

@ -61,6 +61,7 @@ import org.apache.hadoop.crypto.CryptoInputStream;
import org.apache.hadoop.crypto.CryptoOutputStream; import org.apache.hadoop.crypto.CryptoOutputStream;
import org.apache.hadoop.crypto.key.KeyProvider; import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion; import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
import org.apache.hadoop.crypto.key.KeyProviderTokenIssuer;
import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.CacheFlag; import org.apache.hadoop.fs.CacheFlag;
import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.ContentSummary;
@ -199,7 +200,7 @@ import com.google.common.net.InetAddresses;
********************************************************/ ********************************************************/
@InterfaceAudience.Private @InterfaceAudience.Private
public class DFSClient implements java.io.Closeable, RemotePeerFactory, public class DFSClient implements java.io.Closeable, RemotePeerFactory,
DataEncryptionKeyFactory { DataEncryptionKeyFactory, KeyProviderTokenIssuer {
public static final Logger LOG = LoggerFactory.getLogger(DFSClient.class); public static final Logger LOG = LoggerFactory.getLogger(DFSClient.class);
private final Configuration conf; private final Configuration conf;
@ -678,6 +679,11 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
return (dtService != null) ? dtService.toString() : null; return (dtService != null) ? dtService.toString() : null;
} }
@Override
public Token<?>getDelegationToken(String renewer) throws IOException {
return getDelegationToken(renewer == null ? null : new Text(renewer));
}
/** /**
* @see ClientProtocol#getDelegationToken(Text) * @see ClientProtocol#getDelegationToken(Text)
*/ */
@ -2907,7 +2913,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
return HEDGED_READ_METRIC; return HEDGED_READ_METRIC;
} }
URI getKeyProviderUri() throws IOException { @Override
public URI getKeyProviderUri() throws IOException {
return HdfsKMSUtil.getKeyProviderUri(ugi, namenodeUri, return HdfsKMSUtil.getKeyProviderUri(ugi, namenodeUri,
getServerDefaults().getKeyProviderUri(), conf); getServerDefaults().getKeyProviderUri(), conf);
} }

View File

@ -94,8 +94,8 @@ import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.org.apache.hadoop.security.token.DelegationTokenIssuer;
import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.Progressable;
import javax.annotation.Nonnull; import javax.annotation.Nonnull;
@ -2601,11 +2601,13 @@ public class DistributedFileSystem extends FileSystem
} }
@Override @Override
public Token<?>[] addDelegationTokens( public DelegationTokenIssuer[] getAdditionalTokenIssuers()
final String renewer, Credentials credentials) throws IOException { throws IOException {
Token<?>[] tokens = super.addDelegationTokens(renewer, credentials); KeyProvider keyProvider = getKeyProvider();
return HdfsKMSUtil.addDelegationTokensForKeyProvider( if (keyProvider instanceof DelegationTokenIssuer) {
this, renewer, credentials, uri, tokens); return new DelegationTokenIssuer[]{(DelegationTokenIssuer)keyProvider};
}
return null;
} }
public DFSInotifyEventInputStream getInotifyEventStream() throws IOException { public DFSInotifyEventInputStream getInotifyEventStream() throws IOException {

View File

@ -35,14 +35,12 @@ import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion; import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension; import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion; import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
import org.apache.hadoop.crypto.key.KeyProviderDelegationTokenExtension;
import org.apache.hadoop.crypto.key.KeyProviderTokenIssuer; import org.apache.hadoop.crypto.key.KeyProviderTokenIssuer;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileEncryptionInfo; import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.KMSUtil; import org.apache.hadoop.util.KMSUtil;
/** /**
@ -71,32 +69,6 @@ public final class HdfsKMSUtil {
return KMSUtil.createKeyProvider(conf, keyProviderUriKeyName); return KMSUtil.createKeyProvider(conf, keyProviderUriKeyName);
} }
public static Token<?>[] addDelegationTokensForKeyProvider(
KeyProviderTokenIssuer kpTokenIssuer, final String renewer,
Credentials credentials, URI namenodeUri, Token<?>[] tokens)
throws IOException {
KeyProvider keyProvider = kpTokenIssuer.getKeyProvider();
if (keyProvider != null) {
KeyProviderDelegationTokenExtension keyProviderDelegationTokenExtension
= KeyProviderDelegationTokenExtension.
createKeyProviderDelegationTokenExtension(keyProvider);
Token<?>[] kpTokens = keyProviderDelegationTokenExtension.
addDelegationTokens(renewer, credentials);
credentials.addSecretKey(getKeyProviderMapKey(namenodeUri),
DFSUtilClient.string2Bytes(
kpTokenIssuer.getKeyProviderUri().toString()));
if (tokens != null && kpTokens != null) {
Token<?>[] all = new Token<?>[tokens.length + kpTokens.length];
System.arraycopy(tokens, 0, all, 0, tokens.length);
System.arraycopy(kpTokens, 0, all, tokens.length, kpTokens.length);
tokens = all;
} else {
tokens = (tokens != null) ? tokens : kpTokens;
}
}
return tokens;
}
/** /**
* Obtain the crypto protocol version from the provided FileEncryptionInfo, * Obtain the crypto protocol version from the provided FileEncryptionInfo,
* checking to see if this version is supported by. * checking to see if this version is supported by.
@ -161,30 +133,38 @@ public final class HdfsKMSUtil {
URI keyProviderUri = null; URI keyProviderUri = null;
// Lookup the secret in credentials object for namenodeuri. // Lookup the secret in credentials object for namenodeuri.
Credentials credentials = ugi.getCredentials(); Credentials credentials = ugi.getCredentials();
Text credsKey = getKeyProviderMapKey(namenodeUri);
byte[] keyProviderUriBytes = byte[] keyProviderUriBytes =
credentials.getSecretKey(getKeyProviderMapKey(namenodeUri)); credentials.getSecretKey(credsKey);
if(keyProviderUriBytes != null) { if(keyProviderUriBytes != null) {
keyProviderUri = keyProviderUri =
URI.create(DFSUtilClient.bytes2String(keyProviderUriBytes)); URI.create(DFSUtilClient.bytes2String(keyProviderUriBytes));
return keyProviderUri;
} }
if (keyProviderUri == null) {
if (keyProviderUriStr != null) { // NN is old and doesn't report provider, so use conf.
if (!keyProviderUriStr.isEmpty()) { if (keyProviderUriStr == null) {
keyProviderUri = KMSUtil.getKeyProviderUri(conf, keyProviderUriKeyName);
} else if (!keyProviderUriStr.isEmpty()) {
keyProviderUri = URI.create(keyProviderUriStr); keyProviderUri = URI.create(keyProviderUriStr);
} }
return keyProviderUri; if (keyProviderUri != null) {
} credentials.addSecretKey(
credsKey, DFSUtilClient.string2Bytes(keyProviderUri.toString()));
// Last thing is to trust its own conf to be backwards compatible. }
String keyProviderUriFromConf = conf.getTrimmed(
CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH);
if (keyProviderUriFromConf != null && !keyProviderUriFromConf.isEmpty()) {
keyProviderUri = URI.create(keyProviderUriFromConf);
} }
return keyProviderUri; return keyProviderUri;
} }
public static KeyProvider getKeyProvider(KeyProviderTokenIssuer issuer,
Configuration conf)
throws IOException {
URI keyProviderUri = issuer.getKeyProviderUri();
if (keyProviderUri != null) {
return KMSUtil.createKeyProviderFromUri(conf, keyProviderUri);
}
return null;
}
/** /**
* Returns a key to map namenode uri to key provider uri. * Returns a key to map namenode uri to key provider uri.
* Tasks will lookup this key to find key Provider. * Tasks will lookup this key to find key Provider.

View File

@ -111,7 +111,6 @@ import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.StandbyException; import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.token.SecretManager.InvalidToken;
@ -119,6 +118,7 @@ import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.TokenSelector; import org.apache.hadoop.security.token.TokenSelector;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector; import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector;
import org.apache.hadoop.security.token.org.apache.hadoop.security.token.DelegationTokenIssuer;
import org.apache.hadoop.util.KMSUtil; import org.apache.hadoop.util.KMSUtil;
import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
@ -1642,6 +1642,16 @@ public class WebHdfsFileSystem extends FileSystem
return token; return token;
} }
@Override
public DelegationTokenIssuer[] getAdditionalTokenIssuers()
throws IOException {
KeyProvider keyProvider = getKeyProvider();
if (keyProvider instanceof DelegationTokenIssuer) {
return new DelegationTokenIssuer[] {(DelegationTokenIssuer) keyProvider};
}
return null;
}
@Override @Override
public synchronized Token<?> getRenewToken() { public synchronized Token<?> getRenewToken() {
return delegationToken; return delegationToken;
@ -1677,14 +1687,6 @@ public class WebHdfsFileSystem extends FileSystem
).run(); ).run();
} }
@Override
public Token<?>[] addDelegationTokens(String renewer,
Credentials credentials) throws IOException {
Token<?>[] tokens = super.addDelegationTokens(renewer, credentials);
return HdfsKMSUtil.addDelegationTokensForKeyProvider(this, renewer,
credentials, getUri(), tokens);
}
public BlockLocation[] getFileBlockLocations(final FileStatus status, public BlockLocation[] getFileBlockLocations(final FileStatus status,
final long offset, final long length) throws IOException { final long offset, final long length) throws IOException {
if (status == null) { if (status == null) {