HADOOP-14640. Azure: Support affinity for service running on localhost and reuse SPNEGO hadoop.auth cookie for authorization, SASKey and delegation token generation. Contributed by Santhosh G Nayak.
This commit is contained in:
parent
fb3b5d33ff
commit
b0e78ae085
|
@ -105,10 +105,11 @@ public class RemoteSASKeyGeneratorImpl extends SASKeyGeneratorImpl {
|
||||||
*/
|
*/
|
||||||
private static final String
|
private static final String
|
||||||
SAS_KEY_GENERATOR_HTTP_CLIENT_RETRY_POLICY_SPEC_DEFAULT =
|
SAS_KEY_GENERATOR_HTTP_CLIENT_RETRY_POLICY_SPEC_DEFAULT =
|
||||||
"1000,3,10000,2";
|
"10,3,100,2";
|
||||||
|
|
||||||
private WasbRemoteCallHelper remoteCallHelper = null;
|
private WasbRemoteCallHelper remoteCallHelper = null;
|
||||||
private boolean isKerberosSupportEnabled;
|
private boolean isKerberosSupportEnabled;
|
||||||
|
private boolean isSpnegoTokenCacheEnabled;
|
||||||
private RetryPolicy retryPolicy;
|
private RetryPolicy retryPolicy;
|
||||||
private String[] commaSeparatedUrls;
|
private String[] commaSeparatedUrls;
|
||||||
|
|
||||||
|
@ -127,13 +128,16 @@ public class RemoteSASKeyGeneratorImpl extends SASKeyGeneratorImpl {
|
||||||
|
|
||||||
this.isKerberosSupportEnabled =
|
this.isKerberosSupportEnabled =
|
||||||
conf.getBoolean(Constants.AZURE_KERBEROS_SUPPORT_PROPERTY_NAME, false);
|
conf.getBoolean(Constants.AZURE_KERBEROS_SUPPORT_PROPERTY_NAME, false);
|
||||||
|
this.isSpnegoTokenCacheEnabled =
|
||||||
|
conf.getBoolean(Constants.AZURE_ENABLE_SPNEGO_TOKEN_CACHE, true);
|
||||||
this.commaSeparatedUrls = conf.getTrimmedStrings(KEY_CRED_SERVICE_URLS);
|
this.commaSeparatedUrls = conf.getTrimmedStrings(KEY_CRED_SERVICE_URLS);
|
||||||
if (this.commaSeparatedUrls == null || this.commaSeparatedUrls.length <= 0) {
|
if (this.commaSeparatedUrls == null || this.commaSeparatedUrls.length <= 0) {
|
||||||
throw new IOException(
|
throw new IOException(
|
||||||
KEY_CRED_SERVICE_URLS + " config not set" + " in configuration.");
|
KEY_CRED_SERVICE_URLS + " config not set" + " in configuration.");
|
||||||
}
|
}
|
||||||
if (isKerberosSupportEnabled && UserGroupInformation.isSecurityEnabled()) {
|
if (isKerberosSupportEnabled && UserGroupInformation.isSecurityEnabled()) {
|
||||||
this.remoteCallHelper = new SecureWasbRemoteCallHelper(retryPolicy, false);
|
this.remoteCallHelper = new SecureWasbRemoteCallHelper(retryPolicy, false,
|
||||||
|
isSpnegoTokenCacheEnabled);
|
||||||
} else {
|
} else {
|
||||||
this.remoteCallHelper = new WasbRemoteCallHelper(retryPolicy);
|
this.remoteCallHelper = new WasbRemoteCallHelper(retryPolicy);
|
||||||
}
|
}
|
||||||
|
|
|
@ -93,10 +93,11 @@ public class RemoteWasbAuthorizerImpl implements WasbAuthorizerInterface {
|
||||||
* Authorization Remote http client retry policy spec default value. {@value}
|
* Authorization Remote http client retry policy spec default value. {@value}
|
||||||
*/
|
*/
|
||||||
private static final String AUTHORIZER_HTTP_CLIENT_RETRY_POLICY_SPEC_DEFAULT =
|
private static final String AUTHORIZER_HTTP_CLIENT_RETRY_POLICY_SPEC_DEFAULT =
|
||||||
"1000,3,10000,2";
|
"10,3,100,2";
|
||||||
|
|
||||||
private WasbRemoteCallHelper remoteCallHelper = null;
|
private WasbRemoteCallHelper remoteCallHelper = null;
|
||||||
private boolean isKerberosSupportEnabled;
|
private boolean isKerberosSupportEnabled;
|
||||||
|
private boolean isSpnegoTokenCacheEnabled;
|
||||||
private RetryPolicy retryPolicy;
|
private RetryPolicy retryPolicy;
|
||||||
private String[] commaSeparatedUrls = null;
|
private String[] commaSeparatedUrls = null;
|
||||||
|
|
||||||
|
@ -111,6 +112,8 @@ public class RemoteWasbAuthorizerImpl implements WasbAuthorizerInterface {
|
||||||
LOG.debug("Initializing RemoteWasbAuthorizerImpl instance");
|
LOG.debug("Initializing RemoteWasbAuthorizerImpl instance");
|
||||||
this.isKerberosSupportEnabled =
|
this.isKerberosSupportEnabled =
|
||||||
conf.getBoolean(Constants.AZURE_KERBEROS_SUPPORT_PROPERTY_NAME, false);
|
conf.getBoolean(Constants.AZURE_KERBEROS_SUPPORT_PROPERTY_NAME, false);
|
||||||
|
this.isSpnegoTokenCacheEnabled =
|
||||||
|
conf.getBoolean(Constants.AZURE_ENABLE_SPNEGO_TOKEN_CACHE, true);
|
||||||
this.commaSeparatedUrls =
|
this.commaSeparatedUrls =
|
||||||
conf.getTrimmedStrings(KEY_REMOTE_AUTH_SERVICE_URLS);
|
conf.getTrimmedStrings(KEY_REMOTE_AUTH_SERVICE_URLS);
|
||||||
if (this.commaSeparatedUrls == null
|
if (this.commaSeparatedUrls == null
|
||||||
|
@ -123,7 +126,8 @@ public class RemoteWasbAuthorizerImpl implements WasbAuthorizerInterface {
|
||||||
AUTHORIZER_HTTP_CLIENT_RETRY_POLICY_SPEC_SPEC,
|
AUTHORIZER_HTTP_CLIENT_RETRY_POLICY_SPEC_SPEC,
|
||||||
AUTHORIZER_HTTP_CLIENT_RETRY_POLICY_SPEC_DEFAULT);
|
AUTHORIZER_HTTP_CLIENT_RETRY_POLICY_SPEC_DEFAULT);
|
||||||
if (isKerberosSupportEnabled && UserGroupInformation.isSecurityEnabled()) {
|
if (isKerberosSupportEnabled && UserGroupInformation.isSecurityEnabled()) {
|
||||||
this.remoteCallHelper = new SecureWasbRemoteCallHelper(retryPolicy, false);
|
this.remoteCallHelper = new SecureWasbRemoteCallHelper(retryPolicy, false,
|
||||||
|
isSpnegoTokenCacheEnabled);
|
||||||
} else {
|
} else {
|
||||||
this.remoteCallHelper = new WasbRemoteCallHelper(retryPolicy);
|
this.remoteCallHelper = new WasbRemoteCallHelper(retryPolicy);
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,9 +6,9 @@
|
||||||
* to you under the Apache License, Version 2.0 (the
|
* to you under the Apache License, Version 2.0 (the
|
||||||
* "License"); you may not use this file except in compliance
|
* "License"); you may not use this file except in compliance
|
||||||
* with the License. You may obtain a copy of the License at
|
* with the License. You may obtain a copy of the License at
|
||||||
* <p>
|
*
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
* <p>
|
*
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
@ -20,6 +20,7 @@ package org.apache.hadoop.fs.azure;
|
||||||
|
|
||||||
import org.apache.commons.lang.Validate;
|
import org.apache.commons.lang.Validate;
|
||||||
import org.apache.hadoop.fs.azure.security.Constants;
|
import org.apache.hadoop.fs.azure.security.Constants;
|
||||||
|
import org.apache.hadoop.fs.azure.security.SpnegoToken;
|
||||||
import org.apache.hadoop.fs.azure.security.WasbDelegationTokenIdentifier;
|
import org.apache.hadoop.fs.azure.security.WasbDelegationTokenIdentifier;
|
||||||
import org.apache.hadoop.io.retry.RetryPolicy;
|
import org.apache.hadoop.io.retry.RetryPolicy;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
@ -39,6 +40,7 @@ import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.net.InetAddress;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -69,10 +71,21 @@ public class SecureWasbRemoteCallHelper extends WasbRemoteCallHelper {
|
||||||
*/
|
*/
|
||||||
private boolean alwaysRequiresKerberosAuth;
|
private boolean alwaysRequiresKerberosAuth;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Enable caching of Spnego token.
|
||||||
|
*/
|
||||||
|
private boolean isSpnegoTokenCachingEnabled;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Cached SPNEGO token.
|
||||||
|
*/
|
||||||
|
private SpnegoToken spnegoToken;
|
||||||
|
|
||||||
public SecureWasbRemoteCallHelper(RetryPolicy retryPolicy,
|
public SecureWasbRemoteCallHelper(RetryPolicy retryPolicy,
|
||||||
boolean alwaysRequiresKerberosAuth) {
|
boolean alwaysRequiresKerberosAuth, boolean isSpnegoTokenCachingEnabled) {
|
||||||
super(retryPolicy);
|
super(retryPolicy);
|
||||||
this.alwaysRequiresKerberosAuth = alwaysRequiresKerberosAuth;
|
this.alwaysRequiresKerberosAuth = alwaysRequiresKerberosAuth;
|
||||||
|
this.isSpnegoTokenCachingEnabled = isSpnegoTokenCachingEnabled;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -81,9 +94,35 @@ public class SecureWasbRemoteCallHelper extends WasbRemoteCallHelper {
|
||||||
final String httpMethod) throws IOException {
|
final String httpMethod) throws IOException {
|
||||||
final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
|
final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
|
||||||
UserGroupInformation connectUgi = ugi.getRealUser();
|
UserGroupInformation connectUgi = ugi.getRealUser();
|
||||||
if (connectUgi == null) {
|
if (connectUgi != null) {
|
||||||
|
queryParams.add(new NameValuePair() {
|
||||||
|
@Override public String getName() {
|
||||||
|
return Constants.DOAS_PARAM;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public String getValue() {
|
||||||
|
return ugi.getShortUserName();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
} else {
|
||||||
connectUgi = ugi;
|
connectUgi = ugi;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
final Token delegationToken = getDelegationToken(ugi);
|
||||||
|
if (!alwaysRequiresKerberosAuth && delegationToken != null) {
|
||||||
|
final String delegationTokenEncodedUrlString =
|
||||||
|
delegationToken.encodeToUrlString();
|
||||||
|
queryParams.add(new NameValuePair() {
|
||||||
|
@Override public String getName() {
|
||||||
|
return DELEGATION_TOKEN_QUERY_PARAM_NAME;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public String getValue() {
|
||||||
|
return delegationTokenEncodedUrlString;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
if (delegationToken == null) {
|
if (delegationToken == null) {
|
||||||
connectUgi.checkTGTAndReloginFromKeytab();
|
connectUgi.checkTGTAndReloginFromKeytab();
|
||||||
}
|
}
|
||||||
|
@ -103,39 +142,13 @@ public class SecureWasbRemoteCallHelper extends WasbRemoteCallHelper {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public HttpUriRequest getHttpRequest(String[] urls, String path,
|
public HttpUriRequest getHttpRequest(String[] urls, String path,
|
||||||
List<NameValuePair> queryParams, int urlIndex, String httpMethod)
|
List<NameValuePair> queryParams, int urlIndex, String httpMethod,
|
||||||
throws URISyntaxException, IOException {
|
boolean requiresNewAuth) throws URISyntaxException, IOException {
|
||||||
final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
|
|
||||||
UserGroupInformation connectUgi = ugi.getRealUser();
|
|
||||||
if (connectUgi != null) {
|
|
||||||
queryParams.add(new NameValuePair() {
|
|
||||||
@Override public String getName() {
|
|
||||||
return Constants.DOAS_PARAM;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override public String getValue() {
|
|
||||||
return ugi.getShortUserName();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
final Token delegationToken = getDelegationToken(ugi);
|
|
||||||
if (!alwaysRequiresKerberosAuth && delegationToken != null) {
|
|
||||||
final String delegationTokenEncodedUrlString =
|
|
||||||
delegationToken.encodeToUrlString();
|
|
||||||
queryParams.add(new NameValuePair() {
|
|
||||||
@Override public String getName() {
|
|
||||||
return DELEGATION_TOKEN_QUERY_PARAM_NAME;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override public String getValue() {
|
|
||||||
return delegationTokenEncodedUrlString;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
URIBuilder uriBuilder =
|
URIBuilder uriBuilder =
|
||||||
new URIBuilder(urls[urlIndex]).setPath(path).setParameters(queryParams);
|
new URIBuilder(urls[urlIndex]).setPath(path).setParameters(queryParams);
|
||||||
|
if (uriBuilder.getHost().equals("localhost")) {
|
||||||
|
uriBuilder.setHost(InetAddress.getLocalHost().getCanonicalHostName());
|
||||||
|
}
|
||||||
HttpUriRequest httpUriRequest = null;
|
HttpUriRequest httpUriRequest = null;
|
||||||
switch (httpMethod) {
|
switch (httpMethod) {
|
||||||
case HttpPut.METHOD_NAME:
|
case HttpPut.METHOD_NAME:
|
||||||
|
@ -152,11 +165,18 @@ public class SecureWasbRemoteCallHelper extends WasbRemoteCallHelper {
|
||||||
LOG.debug("SecureWasbRemoteCallHelper#getHttpRequest() {}",
|
LOG.debug("SecureWasbRemoteCallHelper#getHttpRequest() {}",
|
||||||
uriBuilder.build().toURL());
|
uriBuilder.build().toURL());
|
||||||
if (alwaysRequiresKerberosAuth || delegationToken == null) {
|
if (alwaysRequiresKerberosAuth || delegationToken == null) {
|
||||||
AuthenticatedURL.Token token = new AuthenticatedURL.Token();
|
AuthenticatedURL.Token token = null;
|
||||||
final Authenticator kerberosAuthenticator =
|
final Authenticator kerberosAuthenticator =
|
||||||
new KerberosDelegationTokenAuthenticator();
|
new KerberosDelegationTokenAuthenticator();
|
||||||
try {
|
try {
|
||||||
|
if (isSpnegoTokenCachingEnabled && !requiresNewAuth
|
||||||
|
&& spnegoToken != null && spnegoToken.isTokenValid()){
|
||||||
|
token = spnegoToken.getToken();
|
||||||
|
} else {
|
||||||
|
token = new AuthenticatedURL.Token();
|
||||||
kerberosAuthenticator.authenticate(uriBuilder.build().toURL(), token);
|
kerberosAuthenticator.authenticate(uriBuilder.build().toURL(), token);
|
||||||
|
spnegoToken = new SpnegoToken(token);
|
||||||
|
}
|
||||||
} catch (AuthenticationException e) {
|
} catch (AuthenticationException e) {
|
||||||
throw new WasbRemoteCallException(
|
throw new WasbRemoteCallException(
|
||||||
Constants.AUTHENTICATION_FAILED_ERROR_MESSAGE, e);
|
Constants.AUTHENTICATION_FAILED_ERROR_MESSAGE, e);
|
||||||
|
@ -170,7 +190,7 @@ public class SecureWasbRemoteCallHelper extends WasbRemoteCallHelper {
|
||||||
return httpUriRequest;
|
return httpUriRequest;
|
||||||
}
|
}
|
||||||
|
|
||||||
private synchronized Token<?> getDelegationToken(
|
private Token<?> getDelegationToken(
|
||||||
UserGroupInformation userGroupInformation) throws IOException {
|
UserGroupInformation userGroupInformation) throws IOException {
|
||||||
if (this.delegationToken == null) {
|
if (this.delegationToken == null) {
|
||||||
Token<?> token = null;
|
Token<?> token = null;
|
||||||
|
|
|
@ -6,9 +6,9 @@
|
||||||
* to you under the Apache License, Version 2.0 (the
|
* to you under the Apache License, Version 2.0 (the
|
||||||
* "License"); you may not use this file except in compliance
|
* "License"); you may not use this file except in compliance
|
||||||
* with the License. You may obtain a copy of the License at
|
* with the License. You may obtain a copy of the License at
|
||||||
* <p>
|
*
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
* <p>
|
*
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
@ -40,6 +40,7 @@ import java.io.BufferedReader;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStreamReader;
|
import java.io.InputStreamReader;
|
||||||
import java.io.InterruptedIOException;
|
import java.io.InterruptedIOException;
|
||||||
|
import java.net.InetAddress;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -84,8 +85,7 @@ public class WasbRemoteCallHelper {
|
||||||
this.retryPolicy = retryPolicy;
|
this.retryPolicy = retryPolicy;
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting public void updateHttpClient(HttpClient client) {
|
||||||
public void updateHttpClient(HttpClient client) {
|
|
||||||
this.client = client;
|
this.client = client;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -111,25 +111,57 @@ public class WasbRemoteCallHelper {
|
||||||
HttpResponse response = null;
|
HttpResponse response = null;
|
||||||
HttpUriRequest httpRequest = null;
|
HttpUriRequest httpRequest = null;
|
||||||
|
|
||||||
for (int retry = 0, index =
|
/**
|
||||||
random.nextInt(urls.length);; retry++, index++) {
|
* Get the index of local url if any. If list of urls contains strings like
|
||||||
|
* "https://localhost:" or "http://localhost", consider it as local url and
|
||||||
|
* give it affinity more than other urls in the list.
|
||||||
|
*/
|
||||||
|
|
||||||
|
int indexOfLocalUrl = -1;
|
||||||
|
for (int i = 0; i < urls.length; i++) {
|
||||||
|
if (urls[i].toLowerCase().startsWith("https://localhost:") || urls[i]
|
||||||
|
.toLowerCase().startsWith("http://localhost:")) {
|
||||||
|
indexOfLocalUrl = i;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean requiresNewAuth = false;
|
||||||
|
for (int retry = 0, index = (indexOfLocalUrl != -1)
|
||||||
|
? indexOfLocalUrl
|
||||||
|
: random
|
||||||
|
.nextInt(urls.length);; retry++, index++) {
|
||||||
if (index >= urls.length) {
|
if (index >= urls.length) {
|
||||||
index = index % urls.length;
|
index = index % urls.length;
|
||||||
}
|
}
|
||||||
|
/**
|
||||||
|
* If the first request fails to localhost, then randomly pick the next url
|
||||||
|
* from the remaining urls in the list, so that load can be balanced.
|
||||||
|
*/
|
||||||
|
if (indexOfLocalUrl != -1 && retry == 1) {
|
||||||
|
index = (index + random.nextInt(urls.length)) % urls.length;
|
||||||
|
if (index == indexOfLocalUrl) {
|
||||||
|
index = (index + 1) % urls.length;
|
||||||
|
}
|
||||||
|
}
|
||||||
try {
|
try {
|
||||||
httpRequest =
|
httpRequest =
|
||||||
getHttpRequest(urls, path, queryParams, index, httpMethod);
|
getHttpRequest(urls, path, queryParams, index, httpMethod,
|
||||||
|
requiresNewAuth);
|
||||||
httpRequest.setHeader("Accept", APPLICATION_JSON);
|
httpRequest.setHeader("Accept", APPLICATION_JSON);
|
||||||
response = client.execute(httpRequest);
|
response = client.execute(httpRequest);
|
||||||
StatusLine statusLine = response.getStatusLine();
|
StatusLine statusLine = response.getStatusLine();
|
||||||
if (statusLine == null
|
if (statusLine == null
|
||||||
|| statusLine.getStatusCode() != HttpStatus.SC_OK) {
|
|| statusLine.getStatusCode() != HttpStatus.SC_OK) {
|
||||||
|
requiresNewAuth =
|
||||||
|
(statusLine == null)
|
||||||
|
|| (statusLine.getStatusCode() == HttpStatus.SC_UNAUTHORIZED);
|
||||||
|
|
||||||
throw new WasbRemoteCallException(
|
throw new WasbRemoteCallException(
|
||||||
httpRequest.getURI().toString() + ":" + ((statusLine != null)
|
httpRequest.getURI().toString() + ":" + ((statusLine != null)
|
||||||
? statusLine.toString()
|
? statusLine.toString()
|
||||||
: "NULL"));
|
: "NULL"));
|
||||||
|
} else {
|
||||||
|
requiresNewAuth = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
Header contentTypeHeader = response.getFirstHeader("Content-Type");
|
Header contentTypeHeader = response.getFirstHeader("Content-Type");
|
||||||
|
@ -200,11 +232,14 @@ public class WasbRemoteCallHelper {
|
||||||
}
|
}
|
||||||
|
|
||||||
protected HttpUriRequest getHttpRequest(String[] urls, String path,
|
protected HttpUriRequest getHttpRequest(String[] urls, String path,
|
||||||
List<NameValuePair> queryParams, int urlIndex, String httpMethod)
|
List<NameValuePair> queryParams, int urlIndex, String httpMethod,
|
||||||
throws URISyntaxException, IOException {
|
boolean requiresNewAuth) throws URISyntaxException, IOException {
|
||||||
URIBuilder uriBuilder = null;
|
URIBuilder uriBuilder = null;
|
||||||
uriBuilder =
|
uriBuilder =
|
||||||
new URIBuilder(urls[urlIndex]).setPath(path).setParameters(queryParams);
|
new URIBuilder(urls[urlIndex]).setPath(path).setParameters(queryParams);
|
||||||
|
if (uriBuilder.getHost().equals("localhost")) {
|
||||||
|
uriBuilder.setHost(InetAddress.getLocalHost().getCanonicalHostName());
|
||||||
|
}
|
||||||
HttpUriRequest httpUriRequest = null;
|
HttpUriRequest httpUriRequest = null;
|
||||||
switch (httpMethod) {
|
switch (httpMethod) {
|
||||||
case HttpPut.METHOD_NAME:
|
case HttpPut.METHOD_NAME:
|
||||||
|
|
|
@ -23,22 +23,27 @@ package org.apache.hadoop.fs.azure.security;
|
||||||
*/
|
*/
|
||||||
public final class Constants {
|
public final class Constants {
|
||||||
|
|
||||||
private Constants() {
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The configuration property to enable Kerberos support.
|
* The configuration property to enable Kerberos support.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
public static final String AZURE_KERBEROS_SUPPORT_PROPERTY_NAME = "fs.azure.enable.kerberos.support";
|
public static final String AZURE_KERBEROS_SUPPORT_PROPERTY_NAME =
|
||||||
|
"fs.azure.enable.kerberos.support";
|
||||||
|
/**
|
||||||
|
* The configuration property to enable SPNEGO token cache.
|
||||||
|
*/
|
||||||
|
public static final String AZURE_ENABLE_SPNEGO_TOKEN_CACHE =
|
||||||
|
"fs.azure.enable.spnego.token.cache";
|
||||||
/**
|
/**
|
||||||
* Parameter to be used for impersonation.
|
* Parameter to be used for impersonation.
|
||||||
*/
|
*/
|
||||||
public static final String DOAS_PARAM = "doas";
|
public static final String DOAS_PARAM = "doas";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Error message for Authentication failures.
|
* Error message for Authentication failures.
|
||||||
*/
|
*/
|
||||||
public static final String AUTHENTICATION_FAILED_ERROR_MESSAGE = "Authentication Failed ";
|
public static final String AUTHENTICATION_FAILED_ERROR_MESSAGE =
|
||||||
|
"Authentication Failed ";
|
||||||
|
|
||||||
|
private Constants() {
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,9 +6,9 @@
|
||||||
* to you under the Apache License, Version 2.0 (the
|
* to you under the Apache License, Version 2.0 (the
|
||||||
* "License"); you may not use this file except in compliance
|
* "License"); you may not use this file except in compliance
|
||||||
* with the License. You may obtain a copy of the License at
|
* with the License. You may obtain a copy of the License at
|
||||||
* <p>
|
*
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
* <p>
|
*
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
@ -64,7 +64,7 @@ public class RemoteWasbDelegationTokenManager
|
||||||
* Default for delegation token service http retry policy spec.
|
* Default for delegation token service http retry policy spec.
|
||||||
*/
|
*/
|
||||||
private static final String DT_MANAGER_HTTP_CLIENT_RETRY_POLICY_SPEC_DEFAULT =
|
private static final String DT_MANAGER_HTTP_CLIENT_RETRY_POLICY_SPEC_DEFAULT =
|
||||||
"1000,3,10000,2";
|
"10,3,100,2";
|
||||||
|
|
||||||
private static final boolean
|
private static final boolean
|
||||||
DT_MANAGER_HTTP_CLIENT_RETRY_POLICY_ENABLED_DEFAULT = true;
|
DT_MANAGER_HTTP_CLIENT_RETRY_POLICY_ENABLED_DEFAULT = true;
|
||||||
|
@ -77,11 +77,13 @@ public class RemoteWasbDelegationTokenManager
|
||||||
/**
|
/**
|
||||||
* Query parameter value for renewing delegation token http request
|
* Query parameter value for renewing delegation token http request
|
||||||
*/
|
*/
|
||||||
private static final String RENEW_DELEGATION_TOKEN_OP = "RENEWDELEGATIONTOKEN";
|
private static final String RENEW_DELEGATION_TOKEN_OP =
|
||||||
|
"RENEWDELEGATIONTOKEN";
|
||||||
/**
|
/**
|
||||||
* Query parameter value for canceling the delegation token http request
|
* Query parameter value for canceling the delegation token http request
|
||||||
*/
|
*/
|
||||||
private static final String CANCEL_DELEGATION_TOKEN_OP = "CANCELDELEGATIONTOKEN";
|
private static final String CANCEL_DELEGATION_TOKEN_OP =
|
||||||
|
"CANCELDELEGATIONTOKEN";
|
||||||
/**
|
/**
|
||||||
* op parameter to represent the operation.
|
* op parameter to represent the operation.
|
||||||
*/
|
*/
|
||||||
|
@ -100,6 +102,7 @@ public class RemoteWasbDelegationTokenManager
|
||||||
private static final String TOKEN_PARAM_KEY_NAME = "token";
|
private static final String TOKEN_PARAM_KEY_NAME = "token";
|
||||||
private WasbRemoteCallHelper remoteCallHelper;
|
private WasbRemoteCallHelper remoteCallHelper;
|
||||||
private String[] dtServiceUrls;
|
private String[] dtServiceUrls;
|
||||||
|
private boolean isSpnegoTokenCacheEnabled;
|
||||||
|
|
||||||
public RemoteWasbDelegationTokenManager(Configuration conf)
|
public RemoteWasbDelegationTokenManager(Configuration conf)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
@ -108,8 +111,11 @@ public class RemoteWasbDelegationTokenManager
|
||||||
DT_MANAGER_HTTP_CLIENT_RETRY_POLICY_ENABLED_DEFAULT,
|
DT_MANAGER_HTTP_CLIENT_RETRY_POLICY_ENABLED_DEFAULT,
|
||||||
DT_MANAGER_HTTP_CLIENT_RETRY_POLICY_SPEC_KEY,
|
DT_MANAGER_HTTP_CLIENT_RETRY_POLICY_SPEC_KEY,
|
||||||
DT_MANAGER_HTTP_CLIENT_RETRY_POLICY_SPEC_DEFAULT);
|
DT_MANAGER_HTTP_CLIENT_RETRY_POLICY_SPEC_DEFAULT);
|
||||||
|
this.isSpnegoTokenCacheEnabled =
|
||||||
|
conf.getBoolean(Constants.AZURE_ENABLE_SPNEGO_TOKEN_CACHE, true);
|
||||||
|
|
||||||
remoteCallHelper = new SecureWasbRemoteCallHelper(retryPolicy, true);
|
remoteCallHelper = new SecureWasbRemoteCallHelper(retryPolicy, true,
|
||||||
|
isSpnegoTokenCacheEnabled);
|
||||||
this.dtServiceUrls =
|
this.dtServiceUrls =
|
||||||
conf.getTrimmedStrings(KEY_DELEGATION_TOKEN_SERVICE_URLS);
|
conf.getTrimmedStrings(KEY_DELEGATION_TOKEN_SERVICE_URLS);
|
||||||
if (this.dtServiceUrls == null || this.dtServiceUrls.length <= 0) {
|
if (this.dtServiceUrls == null || this.dtServiceUrls.length <= 0) {
|
||||||
|
@ -126,7 +132,8 @@ public class RemoteWasbDelegationTokenManager
|
||||||
new URIBuilder().setPath(DEFAULT_DELEGATION_TOKEN_MANAGER_ENDPOINT)
|
new URIBuilder().setPath(DEFAULT_DELEGATION_TOKEN_MANAGER_ENDPOINT)
|
||||||
.addParameter(OP_PARAM_KEY_NAME, GET_DELEGATION_TOKEN_OP)
|
.addParameter(OP_PARAM_KEY_NAME, GET_DELEGATION_TOKEN_OP)
|
||||||
.addParameter(RENEWER_PARAM_KEY_NAME, renewer)
|
.addParameter(RENEWER_PARAM_KEY_NAME, renewer)
|
||||||
.addParameter(SERVICE_PARAM_KEY_NAME, WASB_DT_SERVICE_NAME.toString());
|
.addParameter(SERVICE_PARAM_KEY_NAME,
|
||||||
|
WASB_DT_SERVICE_NAME.toString());
|
||||||
String responseBody = remoteCallHelper
|
String responseBody = remoteCallHelper
|
||||||
.makeRemoteRequest(dtServiceUrls, uriBuilder.getPath(),
|
.makeRemoteRequest(dtServiceUrls, uriBuilder.getPath(),
|
||||||
uriBuilder.getQueryParams(), HttpGet.METHOD_NAME);
|
uriBuilder.getQueryParams(), HttpGet.METHOD_NAME);
|
||||||
|
|
|
@ -0,0 +1,49 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.fs.azure.security;
|
||||||
|
|
||||||
|
import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Class to represent SPNEGO token.
|
||||||
|
*/
|
||||||
|
public class SpnegoToken {
|
||||||
|
private AuthenticatedURL.Token token;
|
||||||
|
private long expiryTime;
|
||||||
|
private static final long TOKEN_VALIDITY_TIME_IN_MS = 60 * 60 * 1000L;
|
||||||
|
|
||||||
|
public SpnegoToken(AuthenticatedURL.Token token) {
|
||||||
|
this.token = token;
|
||||||
|
//set the expiry time of the token to be 60 minutes,
|
||||||
|
// actual token will be valid for more than few hours and treating token as opaque.
|
||||||
|
this.expiryTime = System.currentTimeMillis() + TOKEN_VALIDITY_TIME_IN_MS;
|
||||||
|
}
|
||||||
|
|
||||||
|
public AuthenticatedURL.Token getToken() {
|
||||||
|
return token;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getExpiryTime() {
|
||||||
|
return expiryTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isTokenValid() {
|
||||||
|
return (expiryTime >= System.currentTimeMillis());
|
||||||
|
}
|
||||||
|
}
|
|
@ -43,6 +43,8 @@ import org.mockito.ArgumentMatcher;
|
||||||
import org.mockito.Mockito;
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
import java.io.ByteArrayInputStream;
|
import java.io.ByteArrayInputStream;
|
||||||
|
import java.net.InetAddress;
|
||||||
|
import java.net.UnknownHostException;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
|
|
||||||
import static org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.KEY_USE_SECURE_MODE;
|
import static org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.KEY_USE_SECURE_MODE;
|
||||||
|
@ -62,7 +64,7 @@ public class TestWasbRemoteCallHelper
|
||||||
protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
|
protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
conf.set(NativeAzureFileSystem.KEY_AZURE_AUTHORIZATION, "true");
|
conf.set(NativeAzureFileSystem.KEY_AZURE_AUTHORIZATION, "true");
|
||||||
conf.set(RemoteWasbAuthorizerImpl.KEY_REMOTE_AUTH_SERVICE_URLS, "http://localhost1/,http://localhost2/");
|
conf.set(RemoteWasbAuthorizerImpl.KEY_REMOTE_AUTH_SERVICE_URLS, "http://localhost1/,http://localhost2/,http://localhost:8080");
|
||||||
return AzureBlobStorageTestAccount.create(conf);
|
return AzureBlobStorageTestAccount.create(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -304,6 +306,18 @@ public class TestWasbRemoteCallHelper
|
||||||
Mockito.when(mockHttpResponseService2.getEntity())
|
Mockito.when(mockHttpResponseService2.getEntity())
|
||||||
.thenReturn(mockHttpEntity);
|
.thenReturn(mockHttpEntity);
|
||||||
|
|
||||||
|
HttpResponse mockHttpResponseServiceLocal = Mockito.mock(HttpResponse.class);
|
||||||
|
Mockito.when(mockHttpResponseServiceLocal.getStatusLine())
|
||||||
|
.thenReturn(newStatusLine(HttpStatus.SC_INTERNAL_SERVER_ERROR));
|
||||||
|
Mockito.when(mockHttpResponseServiceLocal.getFirstHeader("Content-Type"))
|
||||||
|
.thenReturn(newHeader("Content-Type", "application/json"));
|
||||||
|
Mockito.when(mockHttpResponseServiceLocal.getFirstHeader("Content-Length"))
|
||||||
|
.thenReturn(newHeader("Content-Length", "1024"));
|
||||||
|
Mockito.when(mockHttpResponseServiceLocal.getEntity())
|
||||||
|
.thenReturn(mockHttpEntity);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class HttpGetForService1 extends ArgumentMatcher<HttpGet>{
|
class HttpGetForService1 extends ArgumentMatcher<HttpGet>{
|
||||||
@Override public boolean matches(Object o) {
|
@Override public boolean matches(Object o) {
|
||||||
return checkHttpGetMatchHost((HttpGet) o, "localhost1");
|
return checkHttpGetMatchHost((HttpGet) o, "localhost1");
|
||||||
|
@ -314,10 +328,21 @@ public class TestWasbRemoteCallHelper
|
||||||
return checkHttpGetMatchHost((HttpGet) o, "localhost2");
|
return checkHttpGetMatchHost((HttpGet) o, "localhost2");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
class HttpGetForServiceLocal extends ArgumentMatcher<HttpGet>{
|
||||||
|
@Override public boolean matches(Object o) {
|
||||||
|
try {
|
||||||
|
return checkHttpGetMatchHost((HttpGet) o, InetAddress.getLocalHost().getCanonicalHostName());
|
||||||
|
} catch (UnknownHostException e) {
|
||||||
|
return checkHttpGetMatchHost((HttpGet) o, "localhost");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
Mockito.when(mockHttpClient.execute(argThat(new HttpGetForService1())))
|
Mockito.when(mockHttpClient.execute(argThat(new HttpGetForService1())))
|
||||||
.thenReturn(mockHttpResponseService1);
|
.thenReturn(mockHttpResponseService1);
|
||||||
Mockito.when(mockHttpClient.execute(argThat(new HttpGetForService2())))
|
Mockito.when(mockHttpClient.execute(argThat(new HttpGetForService2())))
|
||||||
.thenReturn(mockHttpResponseService2);
|
.thenReturn(mockHttpResponseService2);
|
||||||
|
Mockito.when(mockHttpClient.execute(argThat(new HttpGetForServiceLocal())))
|
||||||
|
.thenReturn(mockHttpResponseServiceLocal);
|
||||||
|
|
||||||
//Need 3 times because performop() does 3 fs operations.
|
//Need 3 times because performop() does 3 fs operations.
|
||||||
Mockito.when(mockHttpEntity.getContent())
|
Mockito.when(mockHttpEntity.getContent())
|
||||||
|
@ -331,6 +356,7 @@ public class TestWasbRemoteCallHelper
|
||||||
|
|
||||||
performop(mockHttpClient);
|
performop(mockHttpClient);
|
||||||
|
|
||||||
|
Mockito.verify(mockHttpClient, times(3)).execute(Mockito.argThat(new HttpGetForServiceLocal()));
|
||||||
Mockito.verify(mockHttpClient, times(3)).execute(Mockito.argThat(new HttpGetForService2()));
|
Mockito.verify(mockHttpClient, times(3)).execute(Mockito.argThat(new HttpGetForService2()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -362,6 +388,17 @@ public class TestWasbRemoteCallHelper
|
||||||
Mockito.when(mockHttpResponseService2.getEntity())
|
Mockito.when(mockHttpResponseService2.getEntity())
|
||||||
.thenReturn(mockHttpEntity);
|
.thenReturn(mockHttpEntity);
|
||||||
|
|
||||||
|
HttpResponse mockHttpResponseService3 = Mockito.mock(HttpResponse.class);
|
||||||
|
Mockito.when(mockHttpResponseService3.getStatusLine())
|
||||||
|
.thenReturn(newStatusLine(
|
||||||
|
HttpStatus.SC_INTERNAL_SERVER_ERROR));
|
||||||
|
Mockito.when(mockHttpResponseService3.getFirstHeader("Content-Type"))
|
||||||
|
.thenReturn(newHeader("Content-Type", "application/json"));
|
||||||
|
Mockito.when(mockHttpResponseService3.getFirstHeader("Content-Length"))
|
||||||
|
.thenReturn(newHeader("Content-Length", "1024"));
|
||||||
|
Mockito.when(mockHttpResponseService3.getEntity())
|
||||||
|
.thenReturn(mockHttpEntity);
|
||||||
|
|
||||||
class HttpGetForService1 extends ArgumentMatcher<HttpGet>{
|
class HttpGetForService1 extends ArgumentMatcher<HttpGet>{
|
||||||
@Override public boolean matches(Object o) {
|
@Override public boolean matches(Object o) {
|
||||||
return checkHttpGetMatchHost((HttpGet) o, "localhost1");
|
return checkHttpGetMatchHost((HttpGet) o, "localhost1");
|
||||||
|
@ -372,10 +409,21 @@ public class TestWasbRemoteCallHelper
|
||||||
return checkHttpGetMatchHost((HttpGet) o, "localhost2");
|
return checkHttpGetMatchHost((HttpGet) o, "localhost2");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
class HttpGetForService3 extends ArgumentMatcher<HttpGet> {
|
||||||
|
@Override public boolean matches(Object o){
|
||||||
|
try {
|
||||||
|
return checkHttpGetMatchHost((HttpGet) o, InetAddress.getLocalHost().getCanonicalHostName());
|
||||||
|
} catch (UnknownHostException e) {
|
||||||
|
return checkHttpGetMatchHost((HttpGet) o, "localhost");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
Mockito.when(mockHttpClient.execute(argThat(new HttpGetForService1())))
|
Mockito.when(mockHttpClient.execute(argThat(new HttpGetForService1())))
|
||||||
.thenReturn(mockHttpResponseService1);
|
.thenReturn(mockHttpResponseService1);
|
||||||
Mockito.when(mockHttpClient.execute(argThat(new HttpGetForService2())))
|
Mockito.when(mockHttpClient.execute(argThat(new HttpGetForService2())))
|
||||||
.thenReturn(mockHttpResponseService2);
|
.thenReturn(mockHttpResponseService2);
|
||||||
|
Mockito.when(mockHttpClient.execute(argThat(new HttpGetForService3())))
|
||||||
|
.thenReturn(mockHttpResponseService3);
|
||||||
|
|
||||||
//Need 3 times because performop() does 3 fs operations.
|
//Need 3 times because performop() does 3 fs operations.
|
||||||
Mockito.when(mockHttpEntity.getContent())
|
Mockito.when(mockHttpEntity.getContent())
|
||||||
|
@ -390,10 +438,12 @@ public class TestWasbRemoteCallHelper
|
||||||
performop(mockHttpClient);
|
performop(mockHttpClient);
|
||||||
}catch (WasbAuthorizationException e){
|
}catch (WasbAuthorizationException e){
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
Mockito.verify(mockHttpClient, atLeast(3))
|
Mockito.verify(mockHttpClient, atLeast(2))
|
||||||
.execute(argThat(new HttpGetForService1()));
|
.execute(argThat(new HttpGetForService1()));
|
||||||
Mockito.verify(mockHttpClient, atLeast(3))
|
Mockito.verify(mockHttpClient, atLeast(2))
|
||||||
.execute(argThat(new HttpGetForService2()));
|
.execute(argThat(new HttpGetForService2()));
|
||||||
|
Mockito.verify(mockHttpClient, atLeast(3))
|
||||||
|
.execute(argThat(new HttpGetForService3()));
|
||||||
Mockito.verify(mockHttpClient, times(7)).execute(Mockito.<HttpGet>any());
|
Mockito.verify(mockHttpClient, times(7)).execute(Mockito.<HttpGet>any());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -425,7 +475,7 @@ public class TestWasbRemoteCallHelper
|
||||||
expectedEx.expectMessage(new MatchesPattern(
|
expectedEx.expectMessage(new MatchesPattern(
|
||||||
"org\\.apache\\.hadoop\\.fs\\.azure\\.WasbRemoteCallException: "
|
"org\\.apache\\.hadoop\\.fs\\.azure\\.WasbRemoteCallException: "
|
||||||
+ "Encountered error while making remote call to "
|
+ "Encountered error while making remote call to "
|
||||||
+ "http:\\/\\/localhost1\\/,http:\\/\\/localhost2\\/ retried 6 time\\(s\\)\\."));
|
+ "http:\\/\\/localhost1\\/,http:\\/\\/localhost2\\/,http:\\/\\/localhost:8080 retried 6 time\\(s\\)\\."));
|
||||||
}
|
}
|
||||||
|
|
||||||
private void performop(HttpClient mockHttpClient) throws Throwable {
|
private void performop(HttpClient mockHttpClient) throws Throwable {
|
||||||
|
|
Loading…
Reference in New Issue