HADOOP-14445. Delegation tokens are not shared between KMS instances. Contributed by Xiao Chen and Rushabh S Shah.
(cherry picked from commit 583fa6ed48
)
This commit is contained in:
parent
96af1af28f
commit
6d6f65f224
|
@ -36,8 +36,9 @@ import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
|
||||||
import org.apache.hadoop.security.ssl.SSLFactory;
|
import org.apache.hadoop.security.ssl.SSLFactory;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||||
import org.apache.hadoop.security.token.TokenRenewer;
|
import org.apache.hadoop.security.token.TokenSelector;
|
||||||
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
|
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
|
||||||
|
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector;
|
||||||
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
|
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
|
||||||
import org.apache.hadoop.util.HttpExceptionUtils;
|
import org.apache.hadoop.util.HttpExceptionUtils;
|
||||||
import org.apache.hadoop.util.KMSUtil;
|
import org.apache.hadoop.util.KMSUtil;
|
||||||
|
@ -82,6 +83,8 @@ import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
import com.google.common.base.Strings;
|
import com.google.common.base.Strings;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.KMS_CLIENT_COPY_LEGACY_TOKEN_KEY;
|
||||||
|
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.KMS_CLIENT_COPY_LEGACY_TOKEN_DEFAULT;
|
||||||
import static org.apache.hadoop.util.KMSUtil.checkNotEmpty;
|
import static org.apache.hadoop.util.KMSUtil.checkNotEmpty;
|
||||||
import static org.apache.hadoop.util.KMSUtil.checkNotNull;
|
import static org.apache.hadoop.util.KMSUtil.checkNotNull;
|
||||||
import static org.apache.hadoop.util.KMSUtil.parseJSONEncKeyVersion;
|
import static org.apache.hadoop.util.KMSUtil.parseJSONEncKeyVersion;
|
||||||
|
@ -96,16 +99,13 @@ import static org.apache.hadoop.util.KMSUtil.parseJSONMetadata;
|
||||||
public class KMSClientProvider extends KeyProvider implements CryptoExtension,
|
public class KMSClientProvider extends KeyProvider implements CryptoExtension,
|
||||||
KeyProviderDelegationTokenExtension.DelegationTokenExtension {
|
KeyProviderDelegationTokenExtension.DelegationTokenExtension {
|
||||||
|
|
||||||
private static final Logger LOG =
|
public static final Logger LOG =
|
||||||
LoggerFactory.getLogger(KMSClientProvider.class);
|
LoggerFactory.getLogger(KMSClientProvider.class);
|
||||||
|
|
||||||
private static final String INVALID_SIGNATURE = "Invalid signature";
|
private static final String INVALID_SIGNATURE = "Invalid signature";
|
||||||
|
|
||||||
private static final String ANONYMOUS_REQUESTS_DISALLOWED = "Anonymous requests are disallowed";
|
private static final String ANONYMOUS_REQUESTS_DISALLOWED = "Anonymous requests are disallowed";
|
||||||
|
|
||||||
public static final String TOKEN_KIND_STR = KMSDelegationToken.TOKEN_KIND_STR;
|
|
||||||
public static final Text TOKEN_KIND = KMSDelegationToken.TOKEN_KIND;
|
|
||||||
|
|
||||||
public static final String SCHEME_NAME = "kms";
|
public static final String SCHEME_NAME = "kms";
|
||||||
|
|
||||||
private static final String UTF8 = "UTF-8";
|
private static final String UTF8 = "UTF-8";
|
||||||
|
@ -133,12 +133,17 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
|
||||||
private static final ObjectWriter WRITER =
|
private static final ObjectWriter WRITER =
|
||||||
new ObjectMapper().writerWithDefaultPrettyPrinter();
|
new ObjectMapper().writerWithDefaultPrettyPrinter();
|
||||||
|
|
||||||
|
/* dtService defines the token service value for the kms token.
|
||||||
|
* The value can be legacy format which is ip:port format or it can be uri.
|
||||||
|
* If it's uri format, then the value is read from
|
||||||
|
* CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH at key
|
||||||
|
* provider creation time, and set to token's Service field.
|
||||||
|
* When a token is renewed / canceled, its Service field will be used to
|
||||||
|
* instantiate a KeyProvider, eliminating the need to read configs
|
||||||
|
* at that time.
|
||||||
|
*/
|
||||||
private final Text dtService;
|
private final Text dtService;
|
||||||
|
private final boolean copyLegacyToken;
|
||||||
// Allow fallback to default kms server port 9600 for certain tests that do
|
|
||||||
// not specify the port explicitly in the kms provider url.
|
|
||||||
@VisibleForTesting
|
|
||||||
public static volatile boolean fallbackDefaultPortForTesting = false;
|
|
||||||
|
|
||||||
private class EncryptedQueueRefiller implements
|
private class EncryptedQueueRefiller implements
|
||||||
ValueQueue.QueueRefiller<EncryptedKeyVersion> {
|
ValueQueue.QueueRefiller<EncryptedKeyVersion> {
|
||||||
|
@ -162,66 +167,6 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* The KMS implementation of {@link TokenRenewer}.
|
|
||||||
*/
|
|
||||||
public static class KMSTokenRenewer extends TokenRenewer {
|
|
||||||
private static final Logger LOG =
|
|
||||||
LoggerFactory.getLogger(KMSTokenRenewer.class);
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean handleKind(Text kind) {
|
|
||||||
return kind.equals(TOKEN_KIND);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isManaged(Token<?> token) throws IOException {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public long renew(Token<?> token, Configuration conf) throws IOException {
|
|
||||||
LOG.debug("Renewing delegation token {}", token);
|
|
||||||
KeyProvider keyProvider = KMSUtil.createKeyProvider(conf,
|
|
||||||
KeyProviderFactory.KEY_PROVIDER_PATH);
|
|
||||||
try {
|
|
||||||
if (!(keyProvider instanceof
|
|
||||||
KeyProviderDelegationTokenExtension.DelegationTokenExtension)) {
|
|
||||||
LOG.warn("keyProvider {} cannot renew dt.", keyProvider == null ?
|
|
||||||
"null" : keyProvider.getClass());
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
return ((KeyProviderDelegationTokenExtension.DelegationTokenExtension)
|
|
||||||
keyProvider).renewDelegationToken(token);
|
|
||||||
} finally {
|
|
||||||
if (keyProvider != null) {
|
|
||||||
keyProvider.close();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void cancel(Token<?> token, Configuration conf) throws IOException {
|
|
||||||
LOG.debug("Canceling delegation token {}", token);
|
|
||||||
KeyProvider keyProvider = KMSUtil.createKeyProvider(conf,
|
|
||||||
KeyProviderFactory.KEY_PROVIDER_PATH);
|
|
||||||
try {
|
|
||||||
if (!(keyProvider instanceof
|
|
||||||
KeyProviderDelegationTokenExtension.DelegationTokenExtension)) {
|
|
||||||
LOG.warn("keyProvider {} cannot cancel dt.", keyProvider == null ?
|
|
||||||
"null" : keyProvider.getClass());
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
((KeyProviderDelegationTokenExtension.DelegationTokenExtension)
|
|
||||||
keyProvider).cancelDelegationToken(token);
|
|
||||||
} finally {
|
|
||||||
if (keyProvider != null) {
|
|
||||||
keyProvider.close();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static class KMSEncryptedKeyVersion extends EncryptedKeyVersion {
|
public static class KMSEncryptedKeyVersion extends EncryptedKeyVersion {
|
||||||
public KMSEncryptedKeyVersion(String keyName, String keyVersionName,
|
public KMSEncryptedKeyVersion(String keyName, String keyVersionName,
|
||||||
byte[] iv, String encryptedVersionName, byte[] keyMaterial) {
|
byte[] iv, String encryptedVersionName, byte[] keyMaterial) {
|
||||||
|
@ -281,13 +226,13 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
|
||||||
}
|
}
|
||||||
hostsPart = t[0];
|
hostsPart = t[0];
|
||||||
}
|
}
|
||||||
return createProvider(conf, origUrl, port, hostsPart);
|
return createProvider(conf, origUrl, port, hostsPart, providerUri);
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
private KeyProvider createProvider(Configuration conf,
|
private KeyProvider createProvider(Configuration conf, URL origUrl,
|
||||||
URL origUrl, int port, String hostsPart) throws IOException {
|
int port, String hostsPart, URI providerUri) throws IOException {
|
||||||
String[] hosts = hostsPart.split(";");
|
String[] hosts = hostsPart.split(";");
|
||||||
KMSClientProvider[] providers = new KMSClientProvider[hosts.length];
|
KMSClientProvider[] providers = new KMSClientProvider[hosts.length];
|
||||||
for (int i = 0; i < hosts.length; i++) {
|
for (int i = 0; i < hosts.length; i++) {
|
||||||
|
@ -295,7 +240,7 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
|
||||||
providers[i] =
|
providers[i] =
|
||||||
new KMSClientProvider(
|
new KMSClientProvider(
|
||||||
new URI("kms", origUrl.getProtocol(), hosts[i], port,
|
new URI("kms", origUrl.getProtocol(), hosts[i], port,
|
||||||
origUrl.getPath(), null, null), conf);
|
origUrl.getPath(), null, null), conf, providerUri);
|
||||||
} catch (URISyntaxException e) {
|
} catch (URISyntaxException e) {
|
||||||
throw new IOException("Could not instantiate KMSProvider.", e);
|
throw new IOException("Could not instantiate KMSProvider.", e);
|
||||||
}
|
}
|
||||||
|
@ -353,17 +298,10 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public KMSClientProvider(URI uri, Configuration conf) throws IOException {
|
public KMSClientProvider(URI uri, Configuration conf, URI providerUri) throws
|
||||||
|
IOException {
|
||||||
super(conf);
|
super(conf);
|
||||||
kmsUrl = createServiceURL(extractKMSPath(uri));
|
kmsUrl = createServiceURL(extractKMSPath(uri));
|
||||||
int kmsPort = kmsUrl.getPort();
|
|
||||||
if ((kmsPort == -1) && fallbackDefaultPortForTesting) {
|
|
||||||
kmsPort = 9600;
|
|
||||||
}
|
|
||||||
|
|
||||||
InetSocketAddress addr = new InetSocketAddress(kmsUrl.getHost(), kmsPort);
|
|
||||||
dtService = SecurityUtil.buildTokenService(addr);
|
|
||||||
|
|
||||||
if ("https".equalsIgnoreCase(kmsUrl.getProtocol())) {
|
if ("https".equalsIgnoreCase(kmsUrl.getProtocol())) {
|
||||||
sslFactory = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
|
sslFactory = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
|
||||||
try {
|
try {
|
||||||
|
@ -376,6 +314,9 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
|
||||||
CommonConfigurationKeysPublic.KMS_CLIENT_TIMEOUT_SECONDS,
|
CommonConfigurationKeysPublic.KMS_CLIENT_TIMEOUT_SECONDS,
|
||||||
CommonConfigurationKeysPublic.KMS_CLIENT_TIMEOUT_DEFAULT);
|
CommonConfigurationKeysPublic.KMS_CLIENT_TIMEOUT_DEFAULT);
|
||||||
authRetry = conf.getInt(AUTH_RETRY, DEFAULT_AUTH_RETRY);
|
authRetry = conf.getInt(AUTH_RETRY, DEFAULT_AUTH_RETRY);
|
||||||
|
copyLegacyToken = conf.getBoolean(KMS_CLIENT_COPY_LEGACY_TOKEN_KEY,
|
||||||
|
KMS_CLIENT_COPY_LEGACY_TOKEN_DEFAULT);
|
||||||
|
|
||||||
configurator = new TimeoutConnConfigurator(timeout, sslFactory);
|
configurator = new TimeoutConnConfigurator(timeout, sslFactory);
|
||||||
encKeyVersionQueue =
|
encKeyVersionQueue =
|
||||||
new ValueQueue<KeyProviderCryptoExtension.EncryptedKeyVersion>(
|
new ValueQueue<KeyProviderCryptoExtension.EncryptedKeyVersion>(
|
||||||
|
@ -400,6 +341,7 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
|
||||||
KMS_CLIENT_ENC_KEY_CACHE_NUM_REFILL_THREADS_DEFAULT),
|
KMS_CLIENT_ENC_KEY_CACHE_NUM_REFILL_THREADS_DEFAULT),
|
||||||
new EncryptedQueueRefiller());
|
new EncryptedQueueRefiller());
|
||||||
authToken = new DelegationTokenAuthenticatedURL.Token();
|
authToken = new DelegationTokenAuthenticatedURL.Token();
|
||||||
|
dtService = new Text(providerUri.toString());
|
||||||
LOG.info("KMSClientProvider for KMS url: {} delegation token service: {}" +
|
LOG.info("KMSClientProvider for KMS url: {} delegation token service: {}" +
|
||||||
" created.", kmsUrl, dtService);
|
" created.", kmsUrl, dtService);
|
||||||
}
|
}
|
||||||
|
@ -473,7 +415,7 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
|
||||||
@Override
|
@Override
|
||||||
public HttpURLConnection run() throws Exception {
|
public HttpURLConnection run() throws Exception {
|
||||||
DelegationTokenAuthenticatedURL authUrl =
|
DelegationTokenAuthenticatedURL authUrl =
|
||||||
new DelegationTokenAuthenticatedURL(configurator);
|
createKMSAuthenticatedURL();
|
||||||
return authUrl.openConnection(url, authToken, doAsUser);
|
return authUrl.openConnection(url, authToken, doAsUser);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -924,7 +866,7 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
|
||||||
LOG.debug("Renewing delegation token {} with url:{}, as:{}",
|
LOG.debug("Renewing delegation token {} with url:{}, as:{}",
|
||||||
token, url, doAsUser);
|
token, url, doAsUser);
|
||||||
final DelegationTokenAuthenticatedURL authUrl =
|
final DelegationTokenAuthenticatedURL authUrl =
|
||||||
new DelegationTokenAuthenticatedURL(configurator);
|
createKMSAuthenticatedURL();
|
||||||
return getActualUgi().doAs(
|
return getActualUgi().doAs(
|
||||||
new PrivilegedExceptionAction<Long>() {
|
new PrivilegedExceptionAction<Long>() {
|
||||||
@Override
|
@Override
|
||||||
|
@ -956,7 +898,7 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
|
||||||
LOG.debug("Cancelling delegation token {} with url:{}, as:{}",
|
LOG.debug("Cancelling delegation token {} with url:{}, as:{}",
|
||||||
dToken, url, doAsUser);
|
dToken, url, doAsUser);
|
||||||
final DelegationTokenAuthenticatedURL authUrl =
|
final DelegationTokenAuthenticatedURL authUrl =
|
||||||
new DelegationTokenAuthenticatedURL(configurator);
|
createKMSAuthenticatedURL();
|
||||||
authUrl.cancelDelegationToken(url, token, doAsUser);
|
authUrl.cancelDelegationToken(url, token, doAsUser);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
@ -1008,6 +950,17 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
|
||||||
return token;
|
return token;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
DelegationTokenAuthenticatedURL createKMSAuthenticatedURL() {
|
||||||
|
return new DelegationTokenAuthenticatedURL(configurator) {
|
||||||
|
@Override
|
||||||
|
public org.apache.hadoop.security.token.Token<? extends TokenIdentifier>
|
||||||
|
getDelegationToken(URL url, Credentials creds) {
|
||||||
|
return selectKMSDelegationToken(creds);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Token<?>[] addDelegationTokens(final String renewer,
|
public Token<?>[] addDelegationTokens(final String renewer,
|
||||||
Credentials credentials) throws IOException {
|
Credentials credentials) throws IOException {
|
||||||
|
@ -1016,7 +969,7 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
|
||||||
if (token == null) {
|
if (token == null) {
|
||||||
final URL url = createURL(null, null, null, null);
|
final URL url = createURL(null, null, null, null);
|
||||||
final DelegationTokenAuthenticatedURL authUrl =
|
final DelegationTokenAuthenticatedURL authUrl =
|
||||||
new DelegationTokenAuthenticatedURL(configurator);
|
createKMSAuthenticatedURL();
|
||||||
try {
|
try {
|
||||||
final String doAsUser = getDoAsUser();
|
final String doAsUser = getDoAsUser();
|
||||||
token = getActualUgi().doAs(new PrivilegedExceptionAction<Token<?>>() {
|
token = getActualUgi().doAs(new PrivilegedExceptionAction<Token<?>>() {
|
||||||
|
@ -1030,9 +983,16 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
if (token != null) {
|
if (token != null) {
|
||||||
LOG.debug("New token received: ({})", token);
|
if (KMSDelegationToken.TOKEN_KIND.equals(token.getKind())) {
|
||||||
|
// do not set service for legacy kind, for compatibility.
|
||||||
|
token.setService(dtService);
|
||||||
|
}
|
||||||
|
LOG.info("New token created: ({})", token);
|
||||||
credentials.addToken(token.getService(), token);
|
credentials.addToken(token.getService(), token);
|
||||||
tokens = new Token<?>[] { token };
|
Token<?> legacyToken = createAndAddLegacyToken(credentials, token);
|
||||||
|
tokens = legacyToken == null ?
|
||||||
|
new Token<?>[] {token} :
|
||||||
|
new Token<?>[] {token, legacyToken};
|
||||||
} else {
|
} else {
|
||||||
throw new IOException("Got NULL as delegation token");
|
throw new IOException("Got NULL as delegation token");
|
||||||
}
|
}
|
||||||
|
@ -1049,13 +1009,75 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
|
||||||
return tokens;
|
return tokens;
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean containsKmsDt(UserGroupInformation ugi) throws IOException {
|
/**
|
||||||
// Add existing credentials from the UGI, since provider is cached.
|
* If {@link CommonConfigurationKeysPublic#KMS_CLIENT_COPY_LEGACY_TOKEN_KEY}
|
||||||
Credentials creds = ugi.getCredentials();
|
* is true when creating the provider, then copy the passed-in token of
|
||||||
|
* {@link KMSDelegationToken#TOKEN_KIND} and create a new token of
|
||||||
|
* {@link KMSDelegationToken#TOKEN_LEGACY_KIND}, and add it to credentials.
|
||||||
|
*
|
||||||
|
* @return The legacy token, or null if one should not be created.
|
||||||
|
*/
|
||||||
|
private Token<?> createAndAddLegacyToken(Credentials credentials,
|
||||||
|
Token<?> token) {
|
||||||
|
if (!copyLegacyToken || !KMSDelegationToken.TOKEN_KIND
|
||||||
|
.equals(token.getKind())) {
|
||||||
|
LOG.debug("Not creating legacy token because copyLegacyToken={}, "
|
||||||
|
+ "token={}", copyLegacyToken, token);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
// copy a KMS_DELEGATION_TOKEN and create a new kms-dt with the same
|
||||||
|
// underlying token for backwards-compatibility. Old clients/renewers
|
||||||
|
// does not parse the new token and can only work with kms-dt.
|
||||||
|
final Token<?> legacyToken = token.copyToken();
|
||||||
|
legacyToken.setKind(KMSDelegationToken.TOKEN_LEGACY_KIND);
|
||||||
|
final InetSocketAddress addr =
|
||||||
|
new InetSocketAddress(kmsUrl.getHost(), kmsUrl.getPort());
|
||||||
|
final Text fallBackServiceText = SecurityUtil.buildTokenService(addr);
|
||||||
|
legacyToken.setService(fallBackServiceText);
|
||||||
|
LOG.info("Copied token to legacy kind: {}", legacyToken);
|
||||||
|
credentials.addToken(legacyToken.getService(), legacyToken);
|
||||||
|
return legacyToken;
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public Text getDelegationTokenService() {
|
||||||
|
return dtService;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Given a list of tokens, return the token that should be used for KMS
|
||||||
|
* authentication.
|
||||||
|
*/
|
||||||
|
@VisibleForTesting
|
||||||
|
Token selectKMSDelegationToken(Credentials creds) {
|
||||||
|
// always look for TOKEN_KIND first
|
||||||
|
final TokenSelector<AbstractDelegationTokenIdentifier> tokenSelector =
|
||||||
|
new AbstractDelegationTokenSelector<AbstractDelegationTokenIdentifier>(
|
||||||
|
KMSDelegationToken.TOKEN_KIND) {
|
||||||
|
};
|
||||||
|
Token token = tokenSelector.selectToken(dtService, creds.getAllTokens());
|
||||||
|
LOG.debug("Searching service {} found token {}", dtService, token);
|
||||||
|
if (token != null) {
|
||||||
|
return token;
|
||||||
|
}
|
||||||
|
|
||||||
|
// fall back to look for token by service, regardless of kind.
|
||||||
|
// this is old behavior, keeping for compatibility reasons (for example,
|
||||||
|
// even if KMS server is new, if the job is submitted with an old kms
|
||||||
|
// client, job runners on new version should be able to find the token).
|
||||||
|
final InetSocketAddress addr =
|
||||||
|
new InetSocketAddress(kmsUrl.getHost(), kmsUrl.getPort());
|
||||||
|
final Text fallBackServiceText = SecurityUtil.buildTokenService(addr);
|
||||||
|
token = creds.getToken(fallBackServiceText);
|
||||||
|
LOG.debug("Selected delegation token {} using service:{}", token,
|
||||||
|
fallBackServiceText);
|
||||||
|
return token;
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean containsKmsDt(UserGroupInformation ugi) {
|
||||||
|
final Credentials creds = ugi.getCredentials();
|
||||||
if (!creds.getAllTokens().isEmpty()) {
|
if (!creds.getAllTokens().isEmpty()) {
|
||||||
LOG.debug("Searching for token that matches service: {}", dtService);
|
final Token dToken = selectKMSDelegationToken(creds);
|
||||||
org.apache.hadoop.security.token.Token<? extends TokenIdentifier>
|
|
||||||
dToken = creds.getToken(dtService);
|
|
||||||
if (dToken != null) {
|
if (dToken != null) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,7 +27,10 @@ import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public final class KMSDelegationToken {
|
public final class KMSDelegationToken {
|
||||||
|
|
||||||
public static final String TOKEN_KIND_STR = "kms-dt";
|
public static final String TOKEN_LEGACY_KIND_STR = "kms-dt";
|
||||||
|
public static final Text TOKEN_LEGACY_KIND = new Text(TOKEN_LEGACY_KIND_STR);
|
||||||
|
|
||||||
|
public static final String TOKEN_KIND_STR = "KMS_DELEGATION_TOKEN";
|
||||||
public static final Text TOKEN_KIND = new Text(TOKEN_KIND_STR);
|
public static final Text TOKEN_KIND = new Text(TOKEN_KIND_STR);
|
||||||
|
|
||||||
// Utility class is not supposed to be instantiated.
|
// Utility class is not supposed to be instantiated.
|
||||||
|
@ -49,4 +52,21 @@ public final class KMSDelegationToken {
|
||||||
return TOKEN_KIND;
|
return TOKEN_KIND;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* DelegationTokenIdentifier used for the KMS for legacy tokens.
|
||||||
|
*/
|
||||||
|
@Deprecated
|
||||||
|
public static class KMSLegacyDelegationTokenIdentifier
|
||||||
|
extends DelegationTokenIdentifier {
|
||||||
|
|
||||||
|
public KMSLegacyDelegationTokenIdentifier() {
|
||||||
|
super(TOKEN_LEGACY_KIND);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Text getKind() {
|
||||||
|
return TOKEN_LEGACY_KIND;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -0,0 +1,56 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.crypto.key.kms;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.crypto.key.KeyProvider;
|
||||||
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
import org.apache.hadoop.util.KMSUtil;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.crypto.key.kms.KMSDelegationToken.TOKEN_LEGACY_KIND;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The {@link KMSTokenRenewer} that supports legacy tokens.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@Deprecated
|
||||||
|
public class KMSLegacyTokenRenewer extends KMSTokenRenewer {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean handleKind(Text kind) {
|
||||||
|
return kind.equals(TOKEN_LEGACY_KIND);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a key provider for token renewal / cancellation.
|
||||||
|
* Caller is responsible for closing the key provider.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
protected KeyProvider createKeyProvider(Token<?> token,
|
||||||
|
Configuration conf) throws IOException {
|
||||||
|
assert token.getKind().equals(TOKEN_LEGACY_KIND);
|
||||||
|
// Legacy tokens get service from configuration.
|
||||||
|
return KMSUtil.createKeyProvider(conf,
|
||||||
|
CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,103 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.crypto.key.kms;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.crypto.key.KeyProvider;
|
||||||
|
import org.apache.hadoop.crypto.key.KeyProviderDelegationTokenExtension;
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
import org.apache.hadoop.security.token.TokenRenewer;
|
||||||
|
import org.apache.hadoop.util.KMSUtil;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.crypto.key.kms.KMSDelegationToken.TOKEN_KIND;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The KMS implementation of {@link TokenRenewer}.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public class KMSTokenRenewer extends TokenRenewer {
|
||||||
|
|
||||||
|
public static final Logger LOG = LoggerFactory
|
||||||
|
.getLogger(org.apache.hadoop.crypto.key.kms.KMSTokenRenewer.class);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean handleKind(Text kind) {
|
||||||
|
return kind.equals(TOKEN_KIND);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isManaged(Token<?> token) throws IOException {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long renew(Token<?> token, Configuration conf) throws IOException {
|
||||||
|
LOG.debug("Renewing delegation token {}", token);
|
||||||
|
final KeyProvider keyProvider = createKeyProvider(token, conf);
|
||||||
|
try {
|
||||||
|
if (!(keyProvider instanceof
|
||||||
|
KeyProviderDelegationTokenExtension.DelegationTokenExtension)) {
|
||||||
|
LOG.warn("keyProvider {} cannot renew token {}.",
|
||||||
|
keyProvider == null ? "null" : keyProvider.getClass(), token);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
return ((KeyProviderDelegationTokenExtension.DelegationTokenExtension)
|
||||||
|
keyProvider).renewDelegationToken(token);
|
||||||
|
} finally {
|
||||||
|
if (keyProvider != null) {
|
||||||
|
keyProvider.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void cancel(Token<?> token, Configuration conf) throws IOException {
|
||||||
|
LOG.debug("Canceling delegation token {}", token);
|
||||||
|
final KeyProvider keyProvider = createKeyProvider(token, conf);
|
||||||
|
try {
|
||||||
|
if (!(keyProvider instanceof
|
||||||
|
KeyProviderDelegationTokenExtension.DelegationTokenExtension)) {
|
||||||
|
LOG.warn("keyProvider {} cannot cancel token {}.",
|
||||||
|
keyProvider == null ? "null" : keyProvider.getClass(), token);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
((KeyProviderDelegationTokenExtension.DelegationTokenExtension)
|
||||||
|
keyProvider).cancelDelegationToken(token);
|
||||||
|
} finally {
|
||||||
|
if (keyProvider != null) {
|
||||||
|
keyProvider.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a key provider for token renewal / cancellation.
|
||||||
|
* Caller is responsible for closing the key provider.
|
||||||
|
*/
|
||||||
|
protected KeyProvider createKeyProvider(Token<?> token,
|
||||||
|
Configuration conf) throws IOException {
|
||||||
|
return KMSUtil
|
||||||
|
.createKeyProviderFromTokenService(conf, token.getService().toString());
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,18 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.crypto.key.kms;
|
|
@ -770,6 +770,16 @@ public class CommonConfigurationKeysPublic {
|
||||||
/** Default value is 100 ms. */
|
/** Default value is 100 ms. */
|
||||||
public static final int KMS_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_DEFAULT = 100;
|
public static final int KMS_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_DEFAULT = 100;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @see
|
||||||
|
* <a href="{@docRoot}/../hadoop-project-dist/hadoop-common/core-default.xml">
|
||||||
|
* core-default.xml</a>
|
||||||
|
*/
|
||||||
|
public static final String KMS_CLIENT_COPY_LEGACY_TOKEN_KEY =
|
||||||
|
"hadoop.security.kms.client.copy.legacy.token";
|
||||||
|
/** Default value is true. */
|
||||||
|
public static final boolean KMS_CLIENT_COPY_LEGACY_TOKEN_DEFAULT = true;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @see
|
* @see
|
||||||
* <a href="{@docRoot}/../hadoop-project-dist/hadoop-common/core-default.xml">
|
* <a href="{@docRoot}/../hadoop-project-dist/hadoop-common/core-default.xml">
|
||||||
|
|
|
@ -300,11 +300,7 @@ public class DelegationTokenAuthenticatedURL extends AuthenticatedURL {
|
||||||
creds.getAllTokens());
|
creds.getAllTokens());
|
||||||
}
|
}
|
||||||
if (!creds.getAllTokens().isEmpty()) {
|
if (!creds.getAllTokens().isEmpty()) {
|
||||||
InetSocketAddress serviceAddr = new InetSocketAddress(url.getHost(),
|
dToken = getDelegationToken(url, creds);
|
||||||
url.getPort());
|
|
||||||
Text service = SecurityUtil.buildTokenService(serviceAddr);
|
|
||||||
dToken = creds.getToken(service);
|
|
||||||
LOG.debug("Using delegation token {} from service:{}", dToken, service);
|
|
||||||
if (dToken != null) {
|
if (dToken != null) {
|
||||||
if (useQueryStringForDelegationToken()) {
|
if (useQueryStringForDelegationToken()) {
|
||||||
// delegation token will go in the query string, injecting it
|
// delegation token will go in the query string, injecting it
|
||||||
|
@ -340,6 +336,21 @@ public class DelegationTokenAuthenticatedURL extends AuthenticatedURL {
|
||||||
return conn;
|
return conn;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Select a delegation token from all tokens in credentials, based on url.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public org.apache.hadoop.security.token.Token<? extends TokenIdentifier>
|
||||||
|
getDelegationToken(URL url, Credentials creds) {
|
||||||
|
final InetSocketAddress serviceAddr =
|
||||||
|
new InetSocketAddress(url.getHost(), url.getPort());
|
||||||
|
final Text service = SecurityUtil.buildTokenService(serviceAddr);
|
||||||
|
org.apache.hadoop.security.token.Token<? extends TokenIdentifier> dToken =
|
||||||
|
creds.getToken(service);
|
||||||
|
LOG.debug("Selected delegation token {} using service:{}", dToken, service);
|
||||||
|
return dToken;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Requests a delegation token using the configured <code>Authenticator</code>
|
* Requests a delegation token using the configured <code>Authenticator</code>
|
||||||
* for authentication.
|
* for authentication.
|
||||||
|
|
|
@ -81,7 +81,7 @@ import com.google.common.annotations.VisibleForTesting;
|
||||||
@InterfaceStability.Evolving
|
@InterfaceStability.Evolving
|
||||||
public abstract class DelegationTokenAuthenticationHandler
|
public abstract class DelegationTokenAuthenticationHandler
|
||||||
implements AuthenticationHandler {
|
implements AuthenticationHandler {
|
||||||
private static final Logger LOG =
|
public static final Logger LOG =
|
||||||
LoggerFactory.getLogger(DelegationTokenAuthenticationHandler.class);
|
LoggerFactory.getLogger(DelegationTokenAuthenticationHandler.class);
|
||||||
|
|
||||||
protected static final String TYPE_POSTFIX = "-dt";
|
protected static final String TYPE_POSTFIX = "-dt";
|
||||||
|
@ -224,7 +224,8 @@ public abstract class DelegationTokenAuthenticationHandler
|
||||||
HttpServletRequest request, HttpServletResponse response)
|
HttpServletRequest request, HttpServletResponse response)
|
||||||
throws IOException, AuthenticationException {
|
throws IOException, AuthenticationException {
|
||||||
boolean requestContinues = true;
|
boolean requestContinues = true;
|
||||||
LOG.trace("Processing operation for req=({}), token: {}", request, token);
|
LOG.trace("Processing operation for req=({}), token: {}",
|
||||||
|
request.getRequestURL(), token);
|
||||||
String op = ServletUtils.getParameter(request,
|
String op = ServletUtils.getParameter(request,
|
||||||
KerberosDelegationTokenAuthenticator.OP_PARAM);
|
KerberosDelegationTokenAuthenticator.OP_PARAM);
|
||||||
op = (op != null) ? StringUtils.toUpperCase(op) : null;
|
op = (op != null) ? StringUtils.toUpperCase(op) : null;
|
||||||
|
@ -407,7 +408,8 @@ public abstract class DelegationTokenAuthenticationHandler
|
||||||
HttpServletResponse.SC_FORBIDDEN, new AuthenticationException(ex));
|
HttpServletResponse.SC_FORBIDDEN, new AuthenticationException(ex));
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
LOG.debug("Falling back to {} (req={})", authHandler.getClass(), request);
|
LOG.debug("Falling back to {} (req={})", authHandler.getClass(),
|
||||||
|
request.getRequestURL());
|
||||||
token = authHandler.authenticate(request, response);
|
token = authHandler.authenticate(request, response);
|
||||||
}
|
}
|
||||||
return token;
|
return token;
|
||||||
|
|
|
@ -50,7 +50,7 @@ import java.util.Map;
|
||||||
@InterfaceAudience.Public
|
@InterfaceAudience.Public
|
||||||
@InterfaceStability.Evolving
|
@InterfaceStability.Evolving
|
||||||
public abstract class DelegationTokenAuthenticator implements Authenticator {
|
public abstract class DelegationTokenAuthenticator implements Authenticator {
|
||||||
private static Logger LOG =
|
public static final Logger LOG =
|
||||||
LoggerFactory.getLogger(DelegationTokenAuthenticator.class);
|
LoggerFactory.getLogger(DelegationTokenAuthenticator.class);
|
||||||
|
|
||||||
private static final String CONTENT_TYPE = "Content-Type";
|
private static final String CONTENT_TYPE = "Content-Type";
|
||||||
|
|
|
@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
|
import java.net.URISyntaxException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
@ -41,8 +42,7 @@ import java.util.Map;
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public final class KMSUtil {
|
public final class KMSUtil {
|
||||||
public static final Logger LOG =
|
public static final Logger LOG = LoggerFactory.getLogger(KMSUtil.class);
|
||||||
LoggerFactory.getLogger(KMSUtil.class);
|
|
||||||
|
|
||||||
private KMSUtil() { /* Hidden constructor */ }
|
private KMSUtil() { /* Hidden constructor */ }
|
||||||
|
|
||||||
|
@ -64,6 +64,13 @@ public final class KMSUtil {
|
||||||
if (providerUriStr == null || providerUriStr.isEmpty()) {
|
if (providerUriStr == null || providerUriStr.isEmpty()) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
KeyProvider kp = KMSUtilFaultInjector.get().createKeyProviderForTests(
|
||||||
|
providerUriStr, conf);
|
||||||
|
if (kp != null) {
|
||||||
|
LOG.info("KeyProvider is created with uri: {}. This should happen only " +
|
||||||
|
"in tests.", providerUriStr);
|
||||||
|
return kp;
|
||||||
|
}
|
||||||
return createKeyProviderFromUri(conf, URI.create(providerUriStr));
|
return createKeyProviderFromUri(conf, URI.create(providerUriStr));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -205,4 +212,38 @@ public final class KMSUtil {
|
||||||
}
|
}
|
||||||
return metadata;
|
return metadata;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a key provider from token service field, which must be URI format.
|
||||||
|
*
|
||||||
|
* @param conf
|
||||||
|
* @param tokenServiceValue
|
||||||
|
* @return new KeyProvider or null
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public static KeyProvider createKeyProviderFromTokenService(
|
||||||
|
final Configuration conf, final String tokenServiceValue)
|
||||||
|
throws IOException {
|
||||||
|
LOG.debug("Creating key provider from token service value {}. ",
|
||||||
|
tokenServiceValue);
|
||||||
|
final KeyProvider kp = KMSUtilFaultInjector.get()
|
||||||
|
.createKeyProviderForTests(tokenServiceValue, conf);
|
||||||
|
if (kp != null) {
|
||||||
|
LOG.info("KeyProvider is created with uri: {}. This should happen only "
|
||||||
|
+ "in tests.", tokenServiceValue);
|
||||||
|
return kp;
|
||||||
|
}
|
||||||
|
if (!tokenServiceValue.contains("://")) {
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
"Invalid token service " + tokenServiceValue);
|
||||||
|
}
|
||||||
|
final URI tokenServiceUri;
|
||||||
|
try {
|
||||||
|
tokenServiceUri = new URI(tokenServiceValue);
|
||||||
|
} catch (URISyntaxException e) {
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
"Invalid token service " + tokenServiceValue, e);
|
||||||
|
}
|
||||||
|
return createKeyProviderFromUri(conf, tokenServiceUri);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,49 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.util;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.crypto.key.KeyProvider;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Used for returning custom KeyProvider from test methods.
|
||||||
|
*/
|
||||||
|
@VisibleForTesting
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Unstable
|
||||||
|
public class KMSUtilFaultInjector {
|
||||||
|
private static KMSUtilFaultInjector instance = new KMSUtilFaultInjector();
|
||||||
|
|
||||||
|
public static KMSUtilFaultInjector get() {
|
||||||
|
return instance;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void set(KMSUtilFaultInjector injector) {
|
||||||
|
instance = injector;
|
||||||
|
}
|
||||||
|
|
||||||
|
public KeyProvider createKeyProviderForTests(String value, Configuration conf)
|
||||||
|
throws IOException {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
|
@ -12,3 +12,4 @@
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
#
|
#
|
||||||
org.apache.hadoop.crypto.key.kms.KMSDelegationToken$KMSDelegationTokenIdentifier
|
org.apache.hadoop.crypto.key.kms.KMSDelegationToken$KMSDelegationTokenIdentifier
|
||||||
|
org.apache.hadoop.crypto.key.kms.KMSDelegationToken$KMSLegacyDelegationTokenIdentifier
|
||||||
|
|
|
@ -11,4 +11,5 @@
|
||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
#
|
#
|
||||||
org.apache.hadoop.crypto.key.kms.KMSClientProvider$KMSTokenRenewer
|
org.apache.hadoop.crypto.key.kms.KMSTokenRenewer
|
||||||
|
org.apache.hadoop.crypto.key.kms.KMSLegacyTokenRenewer
|
|
@ -2602,6 +2602,26 @@
|
||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>hadoop.security.kms.client.copy.legacy.token</name>
|
||||||
|
<value>true</value>
|
||||||
|
<description>
|
||||||
|
Expert only. Whether the KMS client provider should copy a token to legacy
|
||||||
|
kind. This is for KMS_DELEGATION_TOKEN to be backwards compatible. With the
|
||||||
|
default value set to true, the client will locally duplicate the
|
||||||
|
KMS_DELEGATION_TOKEN token and create a kms-dt token, with the service field
|
||||||
|
conforming to kms-dt. All other parts of the token remain the same.
|
||||||
|
Then the new clients will use KMS_DELEGATION_TOKEN and old clients will
|
||||||
|
use kms-dt to authenticate. Default value is true.
|
||||||
|
You should only change this to false if you know all the KMS servers
|
||||||
|
, clients (including both job submitters and job runners) and the
|
||||||
|
token renewers (usually Yarn RM) are on a version that supports
|
||||||
|
KMS_DELEGATION_TOKEN.
|
||||||
|
Turning this off prematurely may result in old clients failing to
|
||||||
|
authenticate with new servers.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>hadoop.security.kms.client.failover.sleep.max.millis</name>
|
<name>hadoop.security.kms.client.failover.sleep.max.millis</name>
|
||||||
<value>2000</value>
|
<value>2000</value>
|
||||||
|
|
|
@ -0,0 +1,162 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.crypto.key.kms;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
|
import org.apache.hadoop.security.Credentials;
|
||||||
|
import org.apache.hadoop.security.SecurityUtil;
|
||||||
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
|
||||||
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Rule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.rules.Timeout;
|
||||||
|
import org.slf4j.event.Level;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.net.URI;
|
||||||
|
import java.net.URL;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.crypto.key.kms.KMSDelegationToken.TOKEN_KIND;
|
||||||
|
import static org.apache.hadoop.crypto.key.kms.KMSDelegationToken.TOKEN_LEGACY_KIND;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.mockito.Matchers.any;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.spy;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Unit test for {@link KMSClientProvider} class.
|
||||||
|
*/
|
||||||
|
public class TestKMSClientProvider {
|
||||||
|
|
||||||
|
public static final Logger LOG =
|
||||||
|
LoggerFactory.getLogger(TestKMSClientProvider.class);
|
||||||
|
|
||||||
|
private final Token token = new Token();
|
||||||
|
private final Token legacyToken = new Token();
|
||||||
|
private final String uriString = "kms://https@host:16000/kms";
|
||||||
|
private final String legacyTokenService = "host:16000";
|
||||||
|
|
||||||
|
@Rule
|
||||||
|
public Timeout globalTimeout = new Timeout(30000);
|
||||||
|
|
||||||
|
{
|
||||||
|
GenericTestUtils.setLogLevel(KMSClientProvider.LOG, Level.TRACE);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() {
|
||||||
|
SecurityUtil.setTokenServiceUseIp(false);
|
||||||
|
token.setKind(TOKEN_KIND);
|
||||||
|
token.setService(new Text(uriString));
|
||||||
|
legacyToken.setKind(TOKEN_LEGACY_KIND);
|
||||||
|
legacyToken.setService(new Text(legacyTokenService));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNotCopyFromLegacyToken() throws Exception {
|
||||||
|
final DelegationTokenAuthenticatedURL url =
|
||||||
|
mock(DelegationTokenAuthenticatedURL.class);
|
||||||
|
final Configuration conf = new Configuration();
|
||||||
|
final URI uri = new URI(uriString);
|
||||||
|
final KMSClientProvider kp = new KMSClientProvider(uri, conf, uri);
|
||||||
|
try {
|
||||||
|
final KMSClientProvider spyKp = spy(kp);
|
||||||
|
when(spyKp.createKMSAuthenticatedURL()).thenReturn(url);
|
||||||
|
when(url.getDelegationToken(any(URL.class),
|
||||||
|
any(DelegationTokenAuthenticatedURL.Token.class), any(String.class),
|
||||||
|
any(String.class))).thenReturn(legacyToken);
|
||||||
|
|
||||||
|
final Credentials creds = new Credentials();
|
||||||
|
final Token<?>[] tokens = spyKp.addDelegationTokens("yarn", creds);
|
||||||
|
LOG.info("Got tokens: {}", tokens);
|
||||||
|
assertEquals(1, tokens.length);
|
||||||
|
LOG.info("uri:" + uriString);
|
||||||
|
// if KMS server returned a legacy token, new client should leave the
|
||||||
|
// service being legacy and not set uri string
|
||||||
|
assertEquals(legacyTokenService, tokens[0].getService().toString());
|
||||||
|
} finally {
|
||||||
|
kp.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCopyFromToken() throws Exception {
|
||||||
|
final DelegationTokenAuthenticatedURL url =
|
||||||
|
mock(DelegationTokenAuthenticatedURL.class);
|
||||||
|
final Configuration conf = new Configuration();
|
||||||
|
final URI uri = new URI(uriString);
|
||||||
|
final KMSClientProvider kp = new KMSClientProvider(uri, conf, uri);
|
||||||
|
try {
|
||||||
|
final KMSClientProvider spyKp = spy(kp);
|
||||||
|
when(spyKp.createKMSAuthenticatedURL()).thenReturn(url);
|
||||||
|
when(url.getDelegationToken(any(URL.class),
|
||||||
|
any(DelegationTokenAuthenticatedURL.Token.class), any(String.class),
|
||||||
|
any(String.class))).thenReturn(token);
|
||||||
|
|
||||||
|
final Credentials creds = new Credentials();
|
||||||
|
final Token<?>[] tokens = spyKp.addDelegationTokens("yarn", creds);
|
||||||
|
LOG.info("Got tokens: {}", tokens);
|
||||||
|
assertEquals(2, tokens.length);
|
||||||
|
assertTrue(creds.getAllTokens().contains(token));
|
||||||
|
assertNotNull(creds.getToken(legacyToken.getService()));
|
||||||
|
} finally {
|
||||||
|
kp.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSelectTokenWhenBothExist() throws Exception {
|
||||||
|
final Credentials creds = new Credentials();
|
||||||
|
final Configuration conf = new Configuration();
|
||||||
|
final URI uri = new URI(uriString);
|
||||||
|
final KMSClientProvider kp = new KMSClientProvider(uri, conf, uri);
|
||||||
|
try {
|
||||||
|
creds.addToken(token.getService(), token);
|
||||||
|
creds.addToken(legacyToken.getService(), legacyToken);
|
||||||
|
Token t = kp.selectKMSDelegationToken(creds);
|
||||||
|
assertEquals(token, t);
|
||||||
|
} finally {
|
||||||
|
kp.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSelectTokenLegacyService() throws Exception {
|
||||||
|
final Configuration conf = new Configuration();
|
||||||
|
final URI uri = new URI(uriString);
|
||||||
|
final KMSClientProvider kp = new KMSClientProvider(uri, conf, uri);
|
||||||
|
try {
|
||||||
|
Text legacyService = new Text(legacyTokenService);
|
||||||
|
token.setService(legacyService);
|
||||||
|
final Credentials creds = new Credentials();
|
||||||
|
creds.addToken(legacyService, token);
|
||||||
|
Token t = kp.selectKMSDelegationToken(creds);
|
||||||
|
assertEquals(token, t);
|
||||||
|
} finally {
|
||||||
|
kp.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -42,7 +42,8 @@ import org.apache.hadoop.security.AccessControlException;
|
||||||
import org.apache.hadoop.security.SecurityUtil;
|
import org.apache.hadoop.security.SecurityUtil;
|
||||||
import org.apache.hadoop.security.authentication.client.AuthenticationException;
|
import org.apache.hadoop.security.authentication.client.AuthenticationException;
|
||||||
import org.apache.hadoop.security.authorize.AuthorizationException;
|
import org.apache.hadoop.security.authorize.AuthorizationException;
|
||||||
import org.junit.After;
|
import org.apache.hadoop.util.KMSUtil;
|
||||||
|
import org.apache.hadoop.util.KMSUtilFaultInjector;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.mockito.Mockito;
|
import org.mockito.Mockito;
|
||||||
|
@ -56,33 +57,68 @@ public class TestLoadBalancingKMSClientProvider {
|
||||||
SecurityUtil.setTokenServiceUseIp(false);
|
SecurityUtil.setTokenServiceUseIp(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
private void setKMSUtilFaultInjector() {
|
||||||
public void teardown() throws IOException {
|
KMSUtilFaultInjector injector = new KMSUtilFaultInjector() {
|
||||||
KMSClientProvider.fallbackDefaultPortForTesting = false;
|
@Override
|
||||||
|
public KeyProvider createKeyProviderForTests(
|
||||||
|
String value, Configuration conf) throws IOException {
|
||||||
|
return TestLoadBalancingKMSClientProvider
|
||||||
|
.createKeyProviderForTests(value, conf);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
KMSUtilFaultInjector.set(injector);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static KeyProvider createKeyProviderForTests(
|
||||||
|
String value, Configuration conf) throws IOException {
|
||||||
|
// The syntax for kms servers will be
|
||||||
|
// kms://http@localhost:port1/kms,kms://http@localhost:port2/kms
|
||||||
|
if (!value.contains(",")) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
String[] keyProviderUrisStr = value.split(",");
|
||||||
|
KMSClientProvider[] keyProviderArr =
|
||||||
|
new KMSClientProvider[keyProviderUrisStr.length];
|
||||||
|
|
||||||
|
int i = 0;
|
||||||
|
for (String keyProviderUri: keyProviderUrisStr) {
|
||||||
|
KMSClientProvider kmcp =
|
||||||
|
new KMSClientProvider(URI.create(keyProviderUri), conf, URI
|
||||||
|
.create(value));
|
||||||
|
keyProviderArr[i] = kmcp;
|
||||||
|
i++;
|
||||||
|
}
|
||||||
|
LoadBalancingKMSClientProvider lbkcp =
|
||||||
|
new LoadBalancingKMSClientProvider(keyProviderArr, conf);
|
||||||
|
return lbkcp;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCreation() throws Exception {
|
public void testCreation() throws Exception {
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
KMSClientProvider.fallbackDefaultPortForTesting = true;
|
|
||||||
KeyProvider kp = new KMSClientProvider.Factory().createProvider(new URI(
|
KeyProvider kp = new KMSClientProvider.Factory().createProvider(new URI(
|
||||||
"kms://http@host1/kms/foo"), conf);
|
"kms://http@host1:9600/kms/foo"), conf);
|
||||||
assertTrue(kp instanceof LoadBalancingKMSClientProvider);
|
assertTrue(kp instanceof LoadBalancingKMSClientProvider);
|
||||||
KMSClientProvider[] providers =
|
KMSClientProvider[] providers =
|
||||||
((LoadBalancingKMSClientProvider) kp).getProviders();
|
((LoadBalancingKMSClientProvider) kp).getProviders();
|
||||||
assertEquals(1, providers.length);
|
assertEquals(1, providers.length);
|
||||||
assertEquals(Sets.newHashSet("http://host1/kms/foo/v1/"),
|
assertEquals(Sets.newHashSet("http://host1:9600/kms/foo/v1/"),
|
||||||
Sets.newHashSet(providers[0].getKMSUrl()));
|
Sets.newHashSet(providers[0].getKMSUrl()));
|
||||||
|
setKMSUtilFaultInjector();
|
||||||
kp = new KMSClientProvider.Factory().createProvider(new URI(
|
String uriStr = "kms://http@host1:9600/kms/foo," +
|
||||||
"kms://http@host1;host2;host3/kms/foo"), conf);
|
"kms://http@host2:9600/kms/foo," +
|
||||||
|
"kms://http@host3:9600/kms/foo";
|
||||||
|
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
|
||||||
|
uriStr);
|
||||||
|
kp = KMSUtil.createKeyProvider(conf, CommonConfigurationKeysPublic
|
||||||
|
.HADOOP_SECURITY_KEY_PROVIDER_PATH);
|
||||||
assertTrue(kp instanceof LoadBalancingKMSClientProvider);
|
assertTrue(kp instanceof LoadBalancingKMSClientProvider);
|
||||||
providers =
|
providers =
|
||||||
((LoadBalancingKMSClientProvider) kp).getProviders();
|
((LoadBalancingKMSClientProvider) kp).getProviders();
|
||||||
assertEquals(3, providers.length);
|
assertEquals(3, providers.length);
|
||||||
assertEquals(Sets.newHashSet("http://host1/kms/foo/v1/",
|
assertEquals(Sets.newHashSet("http://host1:9600/kms/foo/v1/",
|
||||||
"http://host2/kms/foo/v1/",
|
"http://host2:9600/kms/foo/v1/",
|
||||||
"http://host3/kms/foo/v1/"),
|
"http://host3:9600/kms/foo/v1/"),
|
||||||
Sets.newHashSet(providers[0].getKMSUrl(),
|
Sets.newHashSet(providers[0].getKMSUrl(),
|
||||||
providers[1].getKMSUrl(),
|
providers[1].getKMSUrl(),
|
||||||
providers[2].getKMSUrl()));
|
providers[2].getKMSUrl()));
|
||||||
|
@ -208,7 +244,7 @@ public class TestLoadBalancingKMSClientProvider {
|
||||||
|
|
||||||
private class MyKMSClientProvider extends KMSClientProvider {
|
private class MyKMSClientProvider extends KMSClientProvider {
|
||||||
public MyKMSClientProvider(URI uri, Configuration conf) throws IOException {
|
public MyKMSClientProvider(URI uri, Configuration conf) throws IOException {
|
||||||
super(uri, conf);
|
super(uri, conf, uri);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -245,9 +281,8 @@ public class TestLoadBalancingKMSClientProvider {
|
||||||
@Test
|
@Test
|
||||||
public void testClassCastException() throws Exception {
|
public void testClassCastException() throws Exception {
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
KMSClientProvider.fallbackDefaultPortForTesting = true;
|
|
||||||
KMSClientProvider p1 = new MyKMSClientProvider(
|
KMSClientProvider p1 = new MyKMSClientProvider(
|
||||||
new URI("kms://http@host1/kms/foo"), conf);
|
new URI("kms://http@host1:9600/kms/foo"), conf);
|
||||||
LoadBalancingKMSClientProvider kp = new LoadBalancingKMSClientProvider(
|
LoadBalancingKMSClientProvider kp = new LoadBalancingKMSClientProvider(
|
||||||
new KMSClientProvider[] {p1}, 0, conf);
|
new KMSClientProvider[] {p1}, 0, conf);
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -0,0 +1,65 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.util;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.crypto.key.KeyProvider;
|
||||||
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
|
import org.junit.Rule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.rules.Timeout;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test {@link KMSUtil}.
|
||||||
|
*/
|
||||||
|
public class TestKMSUtil {
|
||||||
|
|
||||||
|
public static final Logger LOG = LoggerFactory.getLogger(TestKMSUtil.class);
|
||||||
|
|
||||||
|
@Rule
|
||||||
|
public Timeout globalTimeout = new Timeout(90000);
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCreateKeyProviderFromTokenService() throws Exception {
|
||||||
|
final Configuration conf = new Configuration();
|
||||||
|
KeyProvider kp = KMSUtil.createKeyProviderFromTokenService(conf,
|
||||||
|
"kms://https@localhost:9600/kms");
|
||||||
|
assertNotNull(kp);
|
||||||
|
kp.close();
|
||||||
|
|
||||||
|
kp = KMSUtil.createKeyProviderFromTokenService(conf,
|
||||||
|
"kms://https@localhost:9600/kms,kms://localhost1:9600/kms");
|
||||||
|
assertNotNull(kp);
|
||||||
|
kp.close();
|
||||||
|
|
||||||
|
String invalidService = "whatever:9600";
|
||||||
|
try {
|
||||||
|
KMSUtil.createKeyProviderFromTokenService(conf, invalidService);
|
||||||
|
} catch (Exception ex) {
|
||||||
|
LOG.info("Expected exception:", ex);
|
||||||
|
assertTrue(ex instanceof IllegalArgumentException);
|
||||||
|
GenericTestUtils.assertExceptionContains(
|
||||||
|
"Invalid token service " + invalidService, ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,3 +1,4 @@
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
* or more contributor license agreements. See the NOTICE file
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
@ -31,26 +32,35 @@ import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersi
|
||||||
import org.apache.hadoop.crypto.key.KeyProviderDelegationTokenExtension;
|
import org.apache.hadoop.crypto.key.KeyProviderDelegationTokenExtension;
|
||||||
import org.apache.hadoop.crypto.key.kms.KMSClientProvider;
|
import org.apache.hadoop.crypto.key.kms.KMSClientProvider;
|
||||||
import org.apache.hadoop.crypto.key.kms.KMSDelegationToken;
|
import org.apache.hadoop.crypto.key.kms.KMSDelegationToken;
|
||||||
|
import org.apache.hadoop.crypto.key.kms.KMSTokenRenewer;
|
||||||
import org.apache.hadoop.crypto.key.kms.LoadBalancingKMSClientProvider;
|
import org.apache.hadoop.crypto.key.kms.LoadBalancingKMSClientProvider;
|
||||||
|
import org.apache.hadoop.crypto.key.kms.TestLoadBalancingKMSClientProvider;
|
||||||
import org.apache.hadoop.crypto.key.kms.ValueQueue;
|
import org.apache.hadoop.crypto.key.kms.ValueQueue;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.io.MultipleIOException;
|
import org.apache.hadoop.io.MultipleIOException;
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.minikdc.MiniKdc;
|
import org.apache.hadoop.minikdc.MiniKdc;
|
||||||
import org.apache.hadoop.security.Credentials;
|
import org.apache.hadoop.security.Credentials;
|
||||||
import org.apache.hadoop.security.SecurityUtil;
|
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.authorize.AuthorizationException;
|
import org.apache.hadoop.security.authorize.AuthorizationException;
|
||||||
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
|
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
|
||||||
import org.apache.hadoop.security.ssl.SSLFactory;
|
import org.apache.hadoop.security.ssl.SSLFactory;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||||
|
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticationHandler;
|
||||||
|
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticator;
|
||||||
import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
|
import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
|
import org.apache.hadoop.util.KMSUtil;
|
||||||
|
import org.apache.hadoop.util.KMSUtilFaultInjector;
|
||||||
import org.apache.hadoop.util.Time;
|
import org.apache.hadoop.util.Time;
|
||||||
import org.apache.http.client.utils.URIBuilder;
|
import org.apache.http.client.utils.URIBuilder;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
|
import org.junit.AfterClass;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Rule;
|
import org.junit.Rule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.rules.Timeout;
|
import org.junit.rules.Timeout;
|
||||||
|
@ -71,7 +81,6 @@ import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.io.Writer;
|
import java.io.Writer;
|
||||||
import java.net.InetAddress;
|
import java.net.InetAddress;
|
||||||
import java.net.InetSocketAddress;
|
|
||||||
import java.net.ServerSocket;
|
import java.net.ServerSocket;
|
||||||
import java.net.SocketTimeoutException;
|
import java.net.SocketTimeoutException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
|
@ -96,6 +105,10 @@ import java.util.concurrent.LinkedBlockingQueue;
|
||||||
import java.util.regex.Matcher;
|
import java.util.regex.Matcher;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.KMS_CLIENT_COPY_LEGACY_TOKEN_KEY;
|
||||||
|
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH;
|
||||||
|
import static org.apache.hadoop.crypto.key.kms.KMSDelegationToken.TOKEN_KIND;
|
||||||
|
import static org.apache.hadoop.crypto.key.kms.KMSDelegationToken.TOKEN_LEGACY_KIND;
|
||||||
import static org.junit.Assert.assertArrayEquals;
|
import static org.junit.Assert.assertArrayEquals;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
|
@ -113,6 +126,20 @@ public class TestKMS {
|
||||||
|
|
||||||
private SSLFactory sslFactory;
|
private SSLFactory sslFactory;
|
||||||
|
|
||||||
|
private final KMSUtilFaultInjector oldInjector =
|
||||||
|
KMSUtilFaultInjector.get();
|
||||||
|
|
||||||
|
// Injector to create providers with different ports. Can only happen in tests
|
||||||
|
private final KMSUtilFaultInjector testInjector =
|
||||||
|
new KMSUtilFaultInjector() {
|
||||||
|
@Override
|
||||||
|
public KeyProvider createKeyProviderForTests(String value,
|
||||||
|
Configuration conf) throws IOException {
|
||||||
|
return TestLoadBalancingKMSClientProvider
|
||||||
|
.createKeyProviderForTests(value, conf);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
// Keep track of all key providers created during a test case, so they can be
|
// Keep track of all key providers created during a test case, so they can be
|
||||||
// closed at test tearDown.
|
// closed at test tearDown.
|
||||||
private List<KeyProvider> providersCreated = new LinkedList<>();
|
private List<KeyProvider> providersCreated = new LinkedList<>();
|
||||||
|
@ -122,7 +149,12 @@ public class TestKMS {
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
setUpMiniKdc();
|
GenericTestUtils.setLogLevel(KMSClientProvider.LOG, Level.TRACE);
|
||||||
|
GenericTestUtils
|
||||||
|
.setLogLevel(DelegationTokenAuthenticationHandler.LOG, Level.TRACE);
|
||||||
|
GenericTestUtils
|
||||||
|
.setLogLevel(DelegationTokenAuthenticator.LOG, Level.TRACE);
|
||||||
|
GenericTestUtils.setLogLevel(KMSUtil.LOG, Level.TRACE);
|
||||||
// resetting kerberos security
|
// resetting kerberos security
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
UserGroupInformation.setConfiguration(conf);
|
UserGroupInformation.setConfiguration(conf);
|
||||||
|
@ -141,24 +173,78 @@ public class TestKMS {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static abstract class KMSCallable<T> implements Callable<T> {
|
public static abstract class KMSCallable<T> implements Callable<T> {
|
||||||
private URL kmsUrl;
|
private List<URL> kmsUrl;
|
||||||
|
|
||||||
protected URL getKMSUrl() {
|
protected URL getKMSUrl() {
|
||||||
return kmsUrl;
|
return kmsUrl.get(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected URL[] getKMSHAUrl() {
|
||||||
|
URL[] urls = new URL[kmsUrl.size()];
|
||||||
|
return kmsUrl.toArray(urls);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void addKMSUrl(URL url) {
|
||||||
|
if (kmsUrl == null) {
|
||||||
|
kmsUrl = new ArrayList<URL>();
|
||||||
|
}
|
||||||
|
kmsUrl.add(url);
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* The format of the returned value will be
|
||||||
|
* kms://http:kms1.example1.com:port1,kms://http:kms2.example2.com:port2
|
||||||
|
*/
|
||||||
|
protected String generateLoadBalancingKeyProviderUriString() {
|
||||||
|
if (kmsUrl == null || kmsUrl.size() == 0) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
StringBuffer sb = new StringBuffer();
|
||||||
|
|
||||||
|
for (int i = 0; i < kmsUrl.size(); i++) {
|
||||||
|
sb.append(KMSClientProvider.SCHEME_NAME + "://" +
|
||||||
|
kmsUrl.get(0).getProtocol() + "@");
|
||||||
|
URL url = kmsUrl.get(i);
|
||||||
|
sb.append(url.getAuthority());
|
||||||
|
if (url.getPath() != null) {
|
||||||
|
sb.append(url.getPath());
|
||||||
|
}
|
||||||
|
if (i < kmsUrl.size() - 1) {
|
||||||
|
sb.append(",");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return sb.toString();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected KeyProvider createProvider(URI uri, Configuration conf)
|
protected KeyProvider createProvider(URI uri, Configuration conf)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
final KeyProvider ret = new LoadBalancingKMSClientProvider(
|
final KeyProvider ret = new LoadBalancingKMSClientProvider(
|
||||||
new KMSClientProvider[] {new KMSClientProvider(uri, conf)}, conf);
|
new KMSClientProvider[] {new KMSClientProvider(uri, conf, uri)}, conf);
|
||||||
providersCreated.add(ret);
|
providersCreated.add(ret);
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* create a LoadBalancingKMSClientProvider from an array of URIs.
|
||||||
|
* @param uris an array of KMS URIs
|
||||||
|
* @param conf configuration object
|
||||||
|
* @return a LoadBalancingKMSClientProvider object
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
protected LoadBalancingKMSClientProvider createHAProvider(URI[] uris,
|
||||||
|
Configuration conf, String originalUri) throws IOException {
|
||||||
|
KMSClientProvider[] providers = new KMSClientProvider[uris.length];
|
||||||
|
for (int i = 0; i < providers.length; i++) {
|
||||||
|
providers[i] =
|
||||||
|
new KMSClientProvider(uris[i], conf, URI.create(originalUri));
|
||||||
|
}
|
||||||
|
return new LoadBalancingKMSClientProvider(providers, conf);
|
||||||
|
}
|
||||||
|
|
||||||
private KMSClientProvider createKMSClientProvider(URI uri, Configuration conf)
|
private KMSClientProvider createKMSClientProvider(URI uri, Configuration conf)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
final KMSClientProvider ret = new KMSClientProvider(uri, conf);
|
final KMSClientProvider ret = new KMSClientProvider(uri, conf, uri);
|
||||||
providersCreated.add(ret);
|
providersCreated.add(ret);
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
@ -170,24 +256,35 @@ public class TestKMS {
|
||||||
|
|
||||||
protected <T> T runServer(int port, String keystore, String password, File confDir,
|
protected <T> T runServer(int port, String keystore, String password, File confDir,
|
||||||
KMSCallable<T> callable) throws Exception {
|
KMSCallable<T> callable) throws Exception {
|
||||||
|
return runServer(new int[] {port}, keystore, password, confDir, callable);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected <T> T runServer(int[] ports, String keystore, String password,
|
||||||
|
File confDir, KMSCallable<T> callable) throws Exception {
|
||||||
MiniKMS.Builder miniKMSBuilder = new MiniKMS.Builder().setKmsConfDir(confDir)
|
MiniKMS.Builder miniKMSBuilder = new MiniKMS.Builder().setKmsConfDir(confDir)
|
||||||
.setLog4jConfFile("log4j.properties");
|
.setLog4jConfFile("log4j.properties");
|
||||||
if (keystore != null) {
|
if (keystore != null) {
|
||||||
miniKMSBuilder.setSslConf(new File(keystore), password);
|
miniKMSBuilder.setSslConf(new File(keystore), password);
|
||||||
}
|
}
|
||||||
if (port > 0) {
|
final List<MiniKMS> kmsList = new ArrayList<>();
|
||||||
miniKMSBuilder.setPort(port);
|
for (int i=0; i< ports.length; i++) {
|
||||||
|
if (ports[i] > 0) {
|
||||||
|
miniKMSBuilder.setPort(ports[i]);
|
||||||
}
|
}
|
||||||
MiniKMS miniKMS = miniKMSBuilder.build();
|
MiniKMS miniKMS = miniKMSBuilder.build();
|
||||||
|
kmsList.add(miniKMS);
|
||||||
miniKMS.start();
|
miniKMS.start();
|
||||||
|
LOG.info("Test KMS running at: " + miniKMS.getKMSUrl());
|
||||||
|
callable.addKMSUrl(miniKMS.getKMSUrl());
|
||||||
|
}
|
||||||
try {
|
try {
|
||||||
System.out.println("Test KMS running at: " + miniKMS.getKMSUrl());
|
|
||||||
callable.kmsUrl = miniKMS.getKMSUrl();
|
|
||||||
return callable.call();
|
return callable.call();
|
||||||
} finally {
|
} finally {
|
||||||
|
for (MiniKMS miniKMS: kmsList) {
|
||||||
miniKMS.stop();
|
miniKMS.stop();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
protected Configuration createBaseKMSConf(File keyStoreDir) throws Exception {
|
protected Configuration createBaseKMSConf(File keyStoreDir) throws Exception {
|
||||||
return createBaseKMSConf(keyStoreDir, null);
|
return createBaseKMSConf(keyStoreDir, null);
|
||||||
|
@ -240,6 +337,13 @@ public class TestKMS {
|
||||||
return new URI("kms://" + str);
|
return new URI("kms://" + str);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static URI[] createKMSHAUri(URL[] kmsUrls) throws Exception {
|
||||||
|
URI[] uris = new URI[kmsUrls.length];
|
||||||
|
for (int i = 0; i < kmsUrls.length; i++) {
|
||||||
|
uris[i] = createKMSUri(kmsUrls[i]);
|
||||||
|
}
|
||||||
|
return uris;
|
||||||
|
}
|
||||||
|
|
||||||
private static class KerberosConfiguration
|
private static class KerberosConfiguration
|
||||||
extends javax.security.auth.login.Configuration {
|
extends javax.security.auth.login.Configuration {
|
||||||
|
@ -315,19 +419,17 @@ public class TestKMS {
|
||||||
principals.toArray(new String[principals.size()]));
|
principals.toArray(new String[principals.size()]));
|
||||||
}
|
}
|
||||||
|
|
||||||
private void setUpMiniKdc() throws Exception {
|
@BeforeClass
|
||||||
|
public static void setUpMiniKdc() throws Exception {
|
||||||
Properties kdcConf = MiniKdc.createConf();
|
Properties kdcConf = MiniKdc.createConf();
|
||||||
setUpMiniKdc(kdcConf);
|
setUpMiniKdc(kdcConf);
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
public void tearDown() throws Exception {
|
public void tearDown() throws Exception {
|
||||||
if (kdc != null) {
|
|
||||||
kdc.stop();
|
|
||||||
kdc = null;
|
|
||||||
}
|
|
||||||
UserGroupInformation.setShouldRenewImmediatelyForTests(false);
|
UserGroupInformation.setShouldRenewImmediatelyForTests(false);
|
||||||
UserGroupInformation.reset();
|
UserGroupInformation.reset();
|
||||||
|
KMSUtilFaultInjector.set(oldInjector);
|
||||||
if (!providersCreated.isEmpty()) {
|
if (!providersCreated.isEmpty()) {
|
||||||
final MultipleIOException.Builder b = new MultipleIOException.Builder();
|
final MultipleIOException.Builder b = new MultipleIOException.Builder();
|
||||||
for (KeyProvider kp : providersCreated) {
|
for (KeyProvider kp : providersCreated) {
|
||||||
|
@ -345,6 +447,14 @@ public class TestKMS {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void shutdownMiniKdc() {
|
||||||
|
if (kdc != null) {
|
||||||
|
kdc.stop();
|
||||||
|
kdc = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private <T> T doAs(String user, final PrivilegedExceptionAction<T> action)
|
private <T> T doAs(String user, final PrivilegedExceptionAction<T> action)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
UserGroupInformation.loginUserFromKeytab(user, keytab.getAbsolutePath());
|
UserGroupInformation.loginUserFromKeytab(user, keytab.getAbsolutePath());
|
||||||
|
@ -501,8 +611,10 @@ public class TestKMS {
|
||||||
Token<?>[] tokens =
|
Token<?>[] tokens =
|
||||||
((KeyProviderDelegationTokenExtension.DelegationTokenExtension)kp)
|
((KeyProviderDelegationTokenExtension.DelegationTokenExtension)kp)
|
||||||
.addDelegationTokens("myuser", new Credentials());
|
.addDelegationTokens("myuser", new Credentials());
|
||||||
Assert.assertEquals(1, tokens.length);
|
assertEquals(2, tokens.length);
|
||||||
Assert.assertEquals("kms-dt", tokens[0].getKind().toString());
|
assertEquals(KMSDelegationToken.TOKEN_KIND,
|
||||||
|
tokens[0].getKind());
|
||||||
|
kp.close();
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -518,8 +630,9 @@ public class TestKMS {
|
||||||
Token<?>[] tokens =
|
Token<?>[] tokens =
|
||||||
((KeyProviderDelegationTokenExtension.DelegationTokenExtension)kp)
|
((KeyProviderDelegationTokenExtension.DelegationTokenExtension)kp)
|
||||||
.addDelegationTokens("myuser", new Credentials());
|
.addDelegationTokens("myuser", new Credentials());
|
||||||
Assert.assertEquals(1, tokens.length);
|
assertEquals(2, tokens.length);
|
||||||
Assert.assertEquals("kms-dt", tokens[0].getKind().toString());
|
assertEquals(KMSDelegationToken.TOKEN_KIND, tokens[0].getKind());
|
||||||
|
kp.close();
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
@ -2011,7 +2124,6 @@ public class TestKMS {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
nonKerberosUgi.addCredentials(credentials);
|
nonKerberosUgi.addCredentials(credentials);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
@ -2067,6 +2179,17 @@ public class TestKMS {
|
||||||
testDelegationTokensOps(true, true);
|
testDelegationTokensOps(true, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private Text getTokenService(KeyProvider provider) {
|
||||||
|
assertTrue("KeyProvider should be an instance of KMSClientProvider",
|
||||||
|
(provider instanceof LoadBalancingKMSClientProvider));
|
||||||
|
assertEquals("Num client providers should be 1", 1,
|
||||||
|
((LoadBalancingKMSClientProvider)provider).getProviders().length);
|
||||||
|
Text tokenService =
|
||||||
|
(((LoadBalancingKMSClientProvider)provider).getProviders()[0])
|
||||||
|
.getDelegationTokenService();
|
||||||
|
return tokenService;
|
||||||
|
}
|
||||||
|
|
||||||
private void testDelegationTokensOps(final boolean ssl, final boolean kerb)
|
private void testDelegationTokensOps(final boolean ssl, final boolean kerb)
|
||||||
throws Exception {
|
throws Exception {
|
||||||
final File confDir = getTestDir();
|
final File confDir = getTestDir();
|
||||||
|
@ -2098,11 +2221,16 @@ public class TestKMS {
|
||||||
final URI uri = createKMSUri(getKMSUrl());
|
final URI uri = createKMSUri(getKMSUrl());
|
||||||
clientConf.set(KeyProviderFactory.KEY_PROVIDER_PATH,
|
clientConf.set(KeyProviderFactory.KEY_PROVIDER_PATH,
|
||||||
createKMSUri(getKMSUrl()).toString());
|
createKMSUri(getKMSUrl()).toString());
|
||||||
|
clientConf.setBoolean(KMS_CLIENT_COPY_LEGACY_TOKEN_KEY, false);
|
||||||
|
|
||||||
doAs("client", new PrivilegedExceptionAction<Void>() {
|
doAs("client", new PrivilegedExceptionAction<Void>() {
|
||||||
@Override
|
@Override
|
||||||
public Void run() throws Exception {
|
public Void run() throws Exception {
|
||||||
KeyProvider kp = createProvider(uri, clientConf);
|
KeyProvider kp = createProvider(uri, clientConf);
|
||||||
|
// Unset the conf value for key provider path just to be sure that
|
||||||
|
// the key provider created for renew and cancel token is from
|
||||||
|
// token service field.
|
||||||
|
clientConf.unset(HADOOP_SECURITY_KEY_PROVIDER_PATH);
|
||||||
// test delegation token retrieval
|
// test delegation token retrieval
|
||||||
KeyProviderDelegationTokenExtension kpdte =
|
KeyProviderDelegationTokenExtension kpdte =
|
||||||
KeyProviderDelegationTokenExtension.
|
KeyProviderDelegationTokenExtension.
|
||||||
|
@ -2110,13 +2238,10 @@ public class TestKMS {
|
||||||
final Credentials credentials = new Credentials();
|
final Credentials credentials = new Credentials();
|
||||||
final Token<?>[] tokens =
|
final Token<?>[] tokens =
|
||||||
kpdte.addDelegationTokens("client1", credentials);
|
kpdte.addDelegationTokens("client1", credentials);
|
||||||
Assert.assertEquals(1, credentials.getAllTokens().size());
|
Text tokenService = getTokenService(kp);
|
||||||
InetSocketAddress kmsAddr =
|
assertEquals(1, credentials.getAllTokens().size());
|
||||||
new InetSocketAddress(getKMSUrl().getHost(),
|
assertEquals(TOKEN_KIND,
|
||||||
getKMSUrl().getPort());
|
credentials.getToken(tokenService).getKind());
|
||||||
Assert.assertEquals(KMSDelegationToken.TOKEN_KIND,
|
|
||||||
credentials.getToken(SecurityUtil.buildTokenService(kmsAddr)).
|
|
||||||
getKind());
|
|
||||||
|
|
||||||
// Test non-renewer user cannot renew.
|
// Test non-renewer user cannot renew.
|
||||||
for (Token<?> token : tokens) {
|
for (Token<?> token : tokens) {
|
||||||
|
@ -2243,12 +2368,11 @@ public class TestKMS {
|
||||||
final URI uri = createKMSUri(getKMSUrl());
|
final URI uri = createKMSUri(getKMSUrl());
|
||||||
clientConf.set(KeyProviderFactory.KEY_PROVIDER_PATH,
|
clientConf.set(KeyProviderFactory.KEY_PROVIDER_PATH,
|
||||||
createKMSUri(getKMSUrl()).toString());
|
createKMSUri(getKMSUrl()).toString());
|
||||||
|
clientConf.setBoolean(KMS_CLIENT_COPY_LEGACY_TOKEN_KEY, false);
|
||||||
final KeyProvider kp = createProvider(uri, clientConf);
|
final KeyProvider kp = createProvider(uri, clientConf);
|
||||||
final KeyProviderDelegationTokenExtension kpdte =
|
final KeyProviderDelegationTokenExtension kpdte =
|
||||||
KeyProviderDelegationTokenExtension.
|
KeyProviderDelegationTokenExtension.
|
||||||
createKeyProviderDelegationTokenExtension(kp);
|
createKeyProviderDelegationTokenExtension(kp);
|
||||||
final InetSocketAddress kmsAddr =
|
|
||||||
new InetSocketAddress(getKMSUrl().getHost(), getKMSUrl().getPort());
|
|
||||||
|
|
||||||
// Job 1 (e.g. YARN log aggregation job), with user DT.
|
// Job 1 (e.g. YARN log aggregation job), with user DT.
|
||||||
final Collection<Token<?>> job1Token = new HashSet<>();
|
final Collection<Token<?>> job1Token = new HashSet<>();
|
||||||
|
@ -2258,16 +2382,17 @@ public class TestKMS {
|
||||||
// Get a DT and use it.
|
// Get a DT and use it.
|
||||||
final Credentials credentials = new Credentials();
|
final Credentials credentials = new Credentials();
|
||||||
kpdte.addDelegationTokens("client", credentials);
|
kpdte.addDelegationTokens("client", credentials);
|
||||||
|
Text tokenService = getTokenService(kp);
|
||||||
Assert.assertEquals(1, credentials.getAllTokens().size());
|
Assert.assertEquals(1, credentials.getAllTokens().size());
|
||||||
Assert.assertEquals(KMSDelegationToken.TOKEN_KIND, credentials.
|
|
||||||
getToken(SecurityUtil.buildTokenService(kmsAddr)).getKind());
|
|
||||||
UserGroupInformation.getCurrentUser().addCredentials(credentials);
|
UserGroupInformation.getCurrentUser().addCredentials(credentials);
|
||||||
LOG.info("Added kms dt to credentials: {}", UserGroupInformation.
|
LOG.info("Added kms dt to credentials: {}", UserGroupInformation.
|
||||||
getCurrentUser().getCredentials().getAllTokens());
|
getCurrentUser().getCredentials().getAllTokens());
|
||||||
Token<?> token =
|
final Token<?> token =
|
||||||
UserGroupInformation.getCurrentUser().getCredentials()
|
UserGroupInformation.getCurrentUser().getCredentials()
|
||||||
.getToken(SecurityUtil.buildTokenService(kmsAddr));
|
.getToken(tokenService);
|
||||||
Assert.assertNotNull(token);
|
assertNotNull(token);
|
||||||
|
assertEquals(TOKEN_KIND, token.getKind());
|
||||||
job1Token.add(token);
|
job1Token.add(token);
|
||||||
|
|
||||||
// Decode the token to get max time.
|
// Decode the token to get max time.
|
||||||
|
@ -2302,17 +2427,16 @@ public class TestKMS {
|
||||||
// Get a new DT, but don't use it yet.
|
// Get a new DT, but don't use it yet.
|
||||||
final Credentials newCreds = new Credentials();
|
final Credentials newCreds = new Credentials();
|
||||||
kpdte.addDelegationTokens("client", newCreds);
|
kpdte.addDelegationTokens("client", newCreds);
|
||||||
Assert.assertEquals(1, newCreds.getAllTokens().size());
|
assertEquals(1, newCreds.getAllTokens().size());
|
||||||
Assert.assertEquals(KMSDelegationToken.TOKEN_KIND,
|
final Text tokenService = getTokenService(kp);
|
||||||
newCreds.getToken(SecurityUtil.buildTokenService(kmsAddr)).
|
assertEquals(TOKEN_KIND,
|
||||||
getKind());
|
newCreds.getToken(tokenService).getKind());
|
||||||
|
|
||||||
// Using job 1's DT should fail.
|
// Using job 1's DT should fail.
|
||||||
final Credentials oldCreds = new Credentials();
|
final Credentials oldCreds = new Credentials();
|
||||||
for (Token<?> token : job1Token) {
|
for (Token<?> token : job1Token) {
|
||||||
if (token.getKind().equals(KMSDelegationToken.TOKEN_KIND)) {
|
if (token.getKind().equals(TOKEN_KIND)) {
|
||||||
oldCreds
|
oldCreds.addToken(tokenService, token);
|
||||||
.addToken(SecurityUtil.buildTokenService(kmsAddr), token);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
UserGroupInformation.getCurrentUser().addCredentials(oldCreds);
|
UserGroupInformation.getCurrentUser().addCredentials(oldCreds);
|
||||||
|
@ -2326,12 +2450,11 @@ public class TestKMS {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Using the new DT should succeed.
|
// Using the new DT should succeed.
|
||||||
Assert.assertEquals(1, newCreds.getAllTokens().size());
|
assertEquals(1, newCreds.getAllTokens().size());
|
||||||
Assert.assertEquals(KMSDelegationToken.TOKEN_KIND,
|
assertEquals(TOKEN_KIND,
|
||||||
newCreds.getToken(SecurityUtil.buildTokenService(kmsAddr)).
|
newCreds.getToken(tokenService).getKind());
|
||||||
getKind());
|
|
||||||
UserGroupInformation.getCurrentUser().addCredentials(newCreds);
|
UserGroupInformation.getCurrentUser().addCredentials(newCreds);
|
||||||
LOG.info("Credetials now are: {}", UserGroupInformation
|
LOG.info("Credentials now are: {}", UserGroupInformation
|
||||||
.getCurrentUser().getCredentials().getAllTokens());
|
.getCurrentUser().getCredentials().getAllTokens());
|
||||||
kp.getKeys();
|
kp.getKeys();
|
||||||
return null;
|
return null;
|
||||||
|
@ -2357,7 +2480,13 @@ public class TestKMS {
|
||||||
doKMSWithZK(true, true);
|
doKMSWithZK(true, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void doKMSWithZK(boolean zkDTSM, boolean zkSigner) throws Exception {
|
private <T> T runServerWithZooKeeper(boolean zkDTSM, boolean zkSigner,
|
||||||
|
KMSCallable<T> callable) throws Exception {
|
||||||
|
return runServerWithZooKeeper(zkDTSM, zkSigner, callable, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
private <T> T runServerWithZooKeeper(boolean zkDTSM, boolean zkSigner,
|
||||||
|
KMSCallable<T> callable, int kmsSize) throws Exception {
|
||||||
TestingServer zkServer = null;
|
TestingServer zkServer = null;
|
||||||
try {
|
try {
|
||||||
zkServer = new TestingServer();
|
zkServer = new TestingServer();
|
||||||
|
@ -2403,6 +2532,20 @@ public class TestKMS {
|
||||||
|
|
||||||
writeConf(testDir, conf);
|
writeConf(testDir, conf);
|
||||||
|
|
||||||
|
int[] ports = new int[kmsSize];
|
||||||
|
for (int i = 0; i < ports.length; i++) {
|
||||||
|
ports[i] = -1;
|
||||||
|
}
|
||||||
|
return runServer(ports, null, null, testDir, callable);
|
||||||
|
} finally {
|
||||||
|
if (zkServer != null) {
|
||||||
|
zkServer.stop();
|
||||||
|
zkServer.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void doKMSWithZK(boolean zkDTSM, boolean zkSigner) throws Exception {
|
||||||
KMSCallable<KeyProvider> c =
|
KMSCallable<KeyProvider> c =
|
||||||
new KMSCallable<KeyProvider>() {
|
new KMSCallable<KeyProvider>() {
|
||||||
@Override
|
@Override
|
||||||
|
@ -2430,16 +2573,224 @@ public class TestKMS {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
runServer(null, null, testDir, c);
|
runServerWithZooKeeper(zkDTSM, zkSigner, c);
|
||||||
} finally {
|
|
||||||
if (zkServer != null) {
|
|
||||||
zkServer.stop();
|
|
||||||
zkServer.close();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void doKMSHAZKWithDelegationTokenAccess() throws Exception {
|
||||||
|
KMSCallable<Void> c = new KMSCallable<Void>() {
|
||||||
|
@Override
|
||||||
|
public Void call() throws Exception {
|
||||||
|
final Configuration conf = new Configuration();
|
||||||
|
conf.setInt(KeyProvider.DEFAULT_BITLENGTH_NAME, 128);
|
||||||
|
final URI[] uris = createKMSHAUri(getKMSHAUrl());
|
||||||
|
final Credentials credentials = new Credentials();
|
||||||
|
final String lbUri = generateLoadBalancingKeyProviderUriString();
|
||||||
|
final LoadBalancingKMSClientProvider lbkp =
|
||||||
|
createHAProvider(uris, conf, lbUri);
|
||||||
|
conf.unset(HADOOP_SECURITY_KEY_PROVIDER_PATH);
|
||||||
|
// Login as a Kerberos user principal using keytab.
|
||||||
|
// Connect to KMS to create a delegation token and add it to credentials
|
||||||
|
final String keyName = "k0";
|
||||||
|
doAs("SET_KEY_MATERIAL",
|
||||||
|
new PrivilegedExceptionAction<Void>() {
|
||||||
|
@Override
|
||||||
|
public Void run() throws Exception {
|
||||||
|
KeyProviderDelegationTokenExtension kpdte =
|
||||||
|
KeyProviderDelegationTokenExtension.
|
||||||
|
createKeyProviderDelegationTokenExtension(lbkp);
|
||||||
|
kpdte.addDelegationTokens("SET_KEY_MATERIAL", credentials);
|
||||||
|
kpdte.createKey(keyName, new KeyProvider.Options(conf));
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
assertTokenIdentifierEquals(credentials);
|
||||||
|
|
||||||
|
final LoadBalancingKMSClientProvider lbkp1 =
|
||||||
|
createHAProvider(uris, conf, lbUri);
|
||||||
|
// verify both tokens can be used to authenticate
|
||||||
|
for (Token t : credentials.getAllTokens()) {
|
||||||
|
assertTokenAccess(lbkp1, keyName, t);
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
runServerWithZooKeeper(true, true, c, 2);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Assert that the passed in credentials have 2 tokens, of kind
|
||||||
|
* {@link KMSDelegationToken#TOKEN_KIND} and
|
||||||
|
* {@link KMSDelegationToken#TOKEN_LEGACY_KIND}. Assert that the 2 tokens have
|
||||||
|
* the same identifier.
|
||||||
|
*/
|
||||||
|
private void assertTokenIdentifierEquals(Credentials credentials)
|
||||||
|
throws IOException {
|
||||||
|
// verify the 2 tokens have the same identifier
|
||||||
|
assertEquals(2, credentials.getAllTokens().size());
|
||||||
|
Token token = null;
|
||||||
|
Token legacyToken = null;
|
||||||
|
for (Token t : credentials.getAllTokens()) {
|
||||||
|
if (KMSDelegationToken.TOKEN_KIND.equals(t.getKind())) {
|
||||||
|
token = t;
|
||||||
|
} else if (KMSDelegationToken.TOKEN_LEGACY_KIND.equals(t.getKind())) {
|
||||||
|
legacyToken = t;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assertNotNull(token);
|
||||||
|
assertNotNull(legacyToken);
|
||||||
|
final DelegationTokenIdentifier tokenId =
|
||||||
|
(DelegationTokenIdentifier) token.decodeIdentifier();
|
||||||
|
final DelegationTokenIdentifier legacyTokenId =
|
||||||
|
(DelegationTokenIdentifier) legacyToken.decodeIdentifier();
|
||||||
|
assertEquals("KMS DT and legacy dt should have identical identifier",
|
||||||
|
tokenId, legacyTokenId);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests token access with each providers in the
|
||||||
|
* {@link LoadBalancingKMSClientProvider}. This is to make sure the 2 token
|
||||||
|
* kinds are compatible and can both be used to authenticate.
|
||||||
|
*/
|
||||||
|
private void assertTokenAccess(final LoadBalancingKMSClientProvider lbkp,
|
||||||
|
final String keyName, final Token token) throws Exception {
|
||||||
|
UserGroupInformation tokenUgi =
|
||||||
|
UserGroupInformation.createUserForTesting("test", new String[] {});
|
||||||
|
// Verify the tokens can authenticate to any KMS
|
||||||
|
tokenUgi.addToken(token);
|
||||||
|
tokenUgi.doAs(new PrivilegedExceptionAction<Void>() {
|
||||||
|
@Override
|
||||||
|
public Void run() throws Exception {
|
||||||
|
// Create a kms client with one provider at a time. Must use one
|
||||||
|
// provider so that if it fails to authenticate, it does not fall
|
||||||
|
// back to the next KMS instance.
|
||||||
|
// It should succeed because its delegation token can access any
|
||||||
|
// KMS instances.
|
||||||
|
for (KMSClientProvider provider : lbkp.getProviders()) {
|
||||||
|
if (token.getKind().equals(TOKEN_LEGACY_KIND) && !token.getService()
|
||||||
|
.equals(provider.getDelegationTokenService())) {
|
||||||
|
// Historically known issue: Legacy token can only work with the
|
||||||
|
// key provider specified in the token's Service
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
LOG.info("Rolling key {} via provider {} with token {}.", keyName,
|
||||||
|
provider, token);
|
||||||
|
provider.rollNewVersion(keyName);
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testKMSHAZKDelegationTokenRenewCancel() throws Exception {
|
||||||
|
testKMSHAZKDelegationTokenRenewCancel(TOKEN_KIND);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testKMSHAZKDelegationTokenRenewCancelLegacy() throws Exception {
|
||||||
|
testKMSHAZKDelegationTokenRenewCancel(TOKEN_LEGACY_KIND);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testKMSHAZKDelegationTokenRenewCancel(final Text tokenKind)
|
||||||
|
throws Exception {
|
||||||
|
GenericTestUtils.setLogLevel(KMSTokenRenewer.LOG, Level.TRACE);
|
||||||
|
assertTrue(tokenKind == TOKEN_KIND || tokenKind == TOKEN_LEGACY_KIND);
|
||||||
|
KMSCallable<Void> c = new KMSCallable<Void>() {
|
||||||
|
@Override
|
||||||
|
public Void call() throws Exception {
|
||||||
|
final Configuration conf = new Configuration();
|
||||||
|
final URI[] uris = createKMSHAUri(getKMSHAUrl());
|
||||||
|
final Credentials credentials = new Credentials();
|
||||||
|
// Create a UGI without Kerberos auth. It will be authenticated with
|
||||||
|
// delegation token.
|
||||||
|
final UserGroupInformation nonKerberosUgi =
|
||||||
|
UserGroupInformation.getCurrentUser();
|
||||||
|
final String lbUri = generateLoadBalancingKeyProviderUriString();
|
||||||
|
final LoadBalancingKMSClientProvider lbkp =
|
||||||
|
createHAProvider(uris, conf, lbUri);
|
||||||
|
conf.unset(HADOOP_SECURITY_KEY_PROVIDER_PATH);
|
||||||
|
// Login as a Kerberos user principal using keytab.
|
||||||
|
// Connect to KMS to create a delegation token and add it to credentials
|
||||||
|
doAs("SET_KEY_MATERIAL",
|
||||||
|
new PrivilegedExceptionAction<Void>() {
|
||||||
|
@Override
|
||||||
|
public Void run() throws Exception {
|
||||||
|
KeyProviderDelegationTokenExtension kpdte =
|
||||||
|
KeyProviderDelegationTokenExtension.
|
||||||
|
createKeyProviderDelegationTokenExtension(lbkp);
|
||||||
|
kpdte.addDelegationTokens("SET_KEY_MATERIAL", credentials);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Test token renewal and cancellation
|
||||||
|
final Collection<Token<? extends TokenIdentifier>> tokens =
|
||||||
|
credentials.getAllTokens();
|
||||||
|
doAs("SET_KEY_MATERIAL", new PrivilegedExceptionAction<Void>() {
|
||||||
|
@Override
|
||||||
|
public Void run() throws Exception {
|
||||||
|
Assert.assertEquals(2, tokens.size());
|
||||||
|
boolean tokenFound = false;
|
||||||
|
for (Token token : tokens) {
|
||||||
|
if (!tokenKind.equals(token.getKind())) {
|
||||||
|
continue;
|
||||||
|
} else {
|
||||||
|
tokenFound = true;
|
||||||
|
}
|
||||||
|
KMSUtilFaultInjector.set(testInjector);
|
||||||
|
setupConfForToken(token.getKind(), conf, lbUri);
|
||||||
|
|
||||||
|
LOG.info("Testing token: {}", token);
|
||||||
|
long tokenLife = token.renew(conf);
|
||||||
|
LOG.info("Renewed token {}, new lifetime:{}", token, tokenLife);
|
||||||
|
Thread.sleep(10);
|
||||||
|
long newTokenLife = token.renew(conf);
|
||||||
|
LOG.info("Renewed token {}, new lifetime:{}", token,
|
||||||
|
newTokenLife);
|
||||||
|
assertTrue(newTokenLife > tokenLife);
|
||||||
|
|
||||||
|
boolean canceled = false;
|
||||||
|
// test delegation token cancellation
|
||||||
|
if (!canceled) {
|
||||||
|
token.cancel(conf);
|
||||||
|
LOG.info("Cancelled token {}", token);
|
||||||
|
canceled = true;
|
||||||
|
}
|
||||||
|
assertTrue("token should have been canceled", canceled);
|
||||||
|
try {
|
||||||
|
token.renew(conf);
|
||||||
|
fail("should not be able to renew a canceled token " + token);
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.info("Expected exception when renewing token", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assertTrue("Should have found token kind " + tokenKind + " from "
|
||||||
|
+ tokens, tokenFound);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
runServerWithZooKeeper(true, true, c, 2);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set or unset the key provider configuration based on token kind.
|
||||||
|
*/
|
||||||
|
private void setupConfForToken(Text tokenKind, Configuration conf,
|
||||||
|
String lbUri) {
|
||||||
|
if (tokenKind.equals(TOKEN_KIND)) {
|
||||||
|
conf.unset(HADOOP_SECURITY_KEY_PROVIDER_PATH);
|
||||||
|
} else {
|
||||||
|
// conf is only required for legacy tokens to create provider,
|
||||||
|
// new tokens create provider by parsing its own Service field
|
||||||
|
assertEquals(TOKEN_LEGACY_KIND, tokenKind);
|
||||||
|
conf.set(HADOOP_SECURITY_KEY_PROVIDER_PATH, lbUri);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testProxyUserKerb() throws Exception {
|
public void testProxyUserKerb() throws Exception {
|
||||||
|
@ -2558,6 +2909,16 @@ public class TestKMS {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testTGTRenewal() throws Exception {
|
public void testTGTRenewal() throws Exception {
|
||||||
|
shutdownMiniKdc();
|
||||||
|
try {
|
||||||
|
testTgtRenewalInt();
|
||||||
|
} finally {
|
||||||
|
shutdownMiniKdc();
|
||||||
|
setUpMiniKdc();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testTgtRenewalInt() throws Exception {
|
||||||
Properties kdcConf = MiniKdc.createConf();
|
Properties kdcConf = MiniKdc.createConf();
|
||||||
kdcConf.setProperty(MiniKdc.MAX_TICKET_LIFETIME, "3");
|
kdcConf.setProperty(MiniKdc.MAX_TICKET_LIFETIME, "3");
|
||||||
kdcConf.setProperty(MiniKdc.MIN_TICKET_LIFETIME, "3");
|
kdcConf.setProperty(MiniKdc.MIN_TICKET_LIFETIME, "3");
|
||||||
|
|
Loading…
Reference in New Issue