HADOOP-14443. Azure: Support retry and client side failover for authorization, SASKey and delegation token generation. Contributed by Santhosh G Nayak.

This commit is contained in:
Jitendra Pandey 2017-07-10 17:30:31 -07:00
parent 20a2770d72
commit d94b30cb03
15 changed files with 1184 additions and 627 deletions

View File

@ -27,9 +27,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.charset.Charset;
import java.security.PrivilegedExceptionAction;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
@ -60,15 +58,14 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation;
import org.apache.hadoop.fs.azure.metrics.AzureFileSystemMetricsSystem;
import org.apache.hadoop.fs.azure.security.Constants;
import org.apache.hadoop.fs.azure.security.SecurityUtils;
import org.apache.hadoop.fs.azure.security.RemoteWasbDelegationTokenManager;
import org.apache.hadoop.fs.azure.security.WasbDelegationTokenManager;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticator;
import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Time;
import org.codehaus.jackson.JsonNode;
@ -1175,7 +1172,7 @@ public class NativeAzureFileSystem extends FileSystem {
private UserGroupInformation ugi;
private String delegationToken = null;
private WasbDelegationTokenManager wasbDelegationTokenManager;
public NativeAzureFileSystem() {
// set store in initialize()
@ -1325,9 +1322,7 @@ public class NativeAzureFileSystem extends FileSystem {
}
if (UserGroupInformation.isSecurityEnabled() && kerberosSupportEnabled) {
DelegationTokenAuthenticator authenticator = new KerberosDelegationTokenAuthenticator();
authURL = new DelegationTokenAuthenticatedURL(authenticator);
credServiceUrl = SecurityUtils.getCredServiceUrls(conf);
this.wasbDelegationTokenManager = new RemoteWasbDelegationTokenManager(conf);
}
}
@ -3010,31 +3005,7 @@ public class NativeAzureFileSystem extends FileSystem {
@Override
public synchronized Token<?> getDelegationToken(final String renewer) throws IOException {
if (kerberosSupportEnabled) {
try {
final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
UserGroupInformation connectUgi = ugi.getRealUser();
final UserGroupInformation proxyUser = connectUgi;
if (connectUgi == null) {
connectUgi = ugi;
}
connectUgi.checkTGTAndReloginFromKeytab();
return connectUgi.doAs(new PrivilegedExceptionAction<Token<?>>() {
@Override
public Token<?> run() throws Exception {
return authURL.getDelegationToken(new URL(credServiceUrl
+ Constants.DEFAULT_DELEGATION_TOKEN_MANAGER_ENDPOINT),
authToken, renewer, (proxyUser != null)? ugi.getShortUserName(): null);
}
});
} catch (Exception ex) {
LOG.error("Error in fetching the delegation token from remote service",
ex);
if (ex instanceof IOException) {
throw (IOException) ex;
} else {
throw new IOException(ex);
}
}
return wasbDelegationTokenManager.getDelegationToken(renewer);
} else {
return super.getDelegationToken(renewer);
}

View File

@ -21,25 +21,22 @@ package org.apache.hadoop.fs.azure;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.security.PrivilegedExceptionAction;
import java.util.List;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.Validate;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.azure.security.Constants;
import org.apache.hadoop.fs.azure.security.SecurityUtils;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.security.authentication.client.Authenticator;
import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator;
import org.apache.http.NameValuePair;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.utils.URIBuilder;
import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.ObjectReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -54,53 +51,65 @@ public class RemoteSASKeyGeneratorImpl extends SASKeyGeneratorImpl {
public static final Logger LOG =
LoggerFactory.getLogger(AzureNativeFileSystemStore.class);
private static final ObjectReader RESPONSE_READER = new ObjectMapper()
.reader(RemoteSASKeyGenerationResponse.class);
/**
* Configuration parameter name expected in the Configuration
* object to provide the url of the remote service {@value}
*/
public static final String KEY_CRED_SERVICE_URLS =
"fs.azure.cred.service.urls";
/**
* Configuration key to enable http retry policy for SAS Key generation. {@value}
*/
public static final String
SAS_KEY_GENERATOR_HTTP_CLIENT_RETRY_POLICY_ENABLED_KEY =
"fs.azure.saskeygenerator.http.retry.policy.enabled";
/**
* Configuration key for SAS Key Generation http retry policy spec. {@value}
*/
public static final String
SAS_KEY_GENERATOR_HTTP_CLIENT_RETRY_POLICY_SPEC_KEY =
"fs.azure.saskeygenerator.http.retry.policy.spec";
/**
* Container SAS Key generation OP name. {@value}
*/
private static final String CONTAINER_SAS_OP = "GET_CONTAINER_SAS";
/**
* Relative Blob SAS Key generation OP name. {@value}
*/
private static final String BLOB_SAS_OP = "GET_RELATIVE_BLOB_SAS";
/**
* Query parameter specifying the expiry period to be used for sas key
* {@value}
*/
private static final String SAS_EXPIRY_QUERY_PARAM_NAME = "sas_expiry";
/**
* Query parameter name for the storage account. {@value}
*/
private static final String STORAGE_ACCOUNT_QUERY_PARAM_NAME =
"storage_account";
/**
* Query parameter name for the storage account container. {@value}
*/
private static final String CONTAINER_QUERY_PARAM_NAME =
"container";
/**
* Query parameter name for user info {@value}
*/
private static final String DELEGATION_TOKEN_QUERY_PARAM_NAME =
"delegation";
private static final String CONTAINER_QUERY_PARAM_NAME = "container";
/**
* Query parameter name for the relative path inside the storage
* account container. {@value}
*/
private static final String RELATIVE_PATH_QUERY_PARAM_NAME =
"relative_path";
private static final String RELATIVE_PATH_QUERY_PARAM_NAME = "relative_path";
/**
* SAS Key Generation Remote http client retry policy spec. {@value}
*/
private static final String
SAS_KEY_GENERATOR_HTTP_CLIENT_RETRY_POLICY_SPEC_DEFAULT =
"1000,3,10000,2";
private String delegationToken;
private String credServiceUrl = "";
private WasbRemoteCallHelper remoteCallHelper = null;
private boolean isSecurityEnabled;
private boolean isKerberosSupportEnabled;
private RetryPolicy retryPolicy;
private String[] commaSeparatedUrls;
public RemoteSASKeyGeneratorImpl(Configuration conf) {
super(conf);
@ -109,147 +118,41 @@ public class RemoteSASKeyGeneratorImpl extends SASKeyGeneratorImpl {
public void initialize(Configuration conf) throws IOException {
LOG.debug("Initializing RemoteSASKeyGeneratorImpl instance");
setDelegationToken();
try {
credServiceUrl = SecurityUtils.getCredServiceUrls(conf);
} catch (UnknownHostException e) {
final String msg = "Invalid CredService Url, configure it correctly";
LOG.error(msg, e);
throw new IOException(msg, e);
}
if (credServiceUrl == null || credServiceUrl.isEmpty()) {
final String msg = "CredService Url not found in configuration to "
+ "initialize RemoteSASKeyGenerator";
LOG.error(msg);
throw new IOException(msg);
}
this.retryPolicy = RetryUtils.getMultipleLinearRandomRetry(conf,
SAS_KEY_GENERATOR_HTTP_CLIENT_RETRY_POLICY_ENABLED_KEY, true,
SAS_KEY_GENERATOR_HTTP_CLIENT_RETRY_POLICY_SPEC_KEY,
SAS_KEY_GENERATOR_HTTP_CLIENT_RETRY_POLICY_SPEC_DEFAULT);
remoteCallHelper = new WasbRemoteCallHelper();
this.isSecurityEnabled = UserGroupInformation.isSecurityEnabled();
this.isKerberosSupportEnabled = conf.getBoolean(
Constants.AZURE_KERBEROS_SUPPORT_PROPERTY_NAME, false);
this.isKerberosSupportEnabled =
conf.getBoolean(Constants.AZURE_KERBEROS_SUPPORT_PROPERTY_NAME, false);
this.commaSeparatedUrls = conf.getTrimmedStrings(KEY_CRED_SERVICE_URLS);
if (this.commaSeparatedUrls == null || this.commaSeparatedUrls.length <= 0) {
throw new IOException(
KEY_CRED_SERVICE_URLS + " config not set" + " in configuration.");
}
if (isKerberosSupportEnabled && UserGroupInformation.isSecurityEnabled()) {
this.remoteCallHelper = new SecureWasbRemoteCallHelper(retryPolicy, false);
} else {
this.remoteCallHelper = new WasbRemoteCallHelper(retryPolicy);
}
LOG.debug("Initialization of RemoteSASKeyGenerator instance successful");
}
@Override
public URI getContainerSASUri(String storageAccount, String container)
throws SASKeyGenerationException {
public URI getContainerSASUri(String storageAccount,
String container) throws SASKeyGenerationException {
RemoteSASKeyGenerationResponse sasKeyResponse = null;
try {
LOG.debug("Generating Container SAS Key for Container {} "
+ "inside Storage Account {} ", container, storageAccount);
setDelegationToken();
URIBuilder uriBuilder = new URIBuilder(credServiceUrl);
URIBuilder uriBuilder = new URIBuilder();
uriBuilder.setPath("/" + CONTAINER_SAS_OP);
uriBuilder.addParameter(STORAGE_ACCOUNT_QUERY_PARAM_NAME,
storageAccount);
uriBuilder.addParameter(CONTAINER_QUERY_PARAM_NAME,
container);
uriBuilder.addParameter(SAS_EXPIRY_QUERY_PARAM_NAME, ""
+ getSasKeyExpiryPeriod());
if (isSecurityEnabled && StringUtils.isNotEmpty(delegationToken)) {
uriBuilder.addParameter(DELEGATION_TOKEN_QUERY_PARAM_NAME,
this.delegationToken);
}
uriBuilder.addParameter(STORAGE_ACCOUNT_QUERY_PARAM_NAME, storageAccount);
uriBuilder.addParameter(CONTAINER_QUERY_PARAM_NAME, container);
uriBuilder.addParameter(SAS_EXPIRY_QUERY_PARAM_NAME,
"" + getSasKeyExpiryPeriod());
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
UserGroupInformation connectUgi = ugi.getRealUser();
if (connectUgi == null) {
connectUgi = ugi;
} else {
uriBuilder.addParameter(Constants.DOAS_PARAM, ugi.getShortUserName());
}
return getSASKey(uriBuilder.build(), connectUgi);
} catch (URISyntaxException uriSyntaxEx) {
throw new SASKeyGenerationException("Encountered URISyntaxException "
+ "while building the HttpGetRequest to remote cred service",
uriSyntaxEx);
} catch (IOException e) {
throw new SASKeyGenerationException("Encountered IOException"
+ " while building the HttpGetRequest to remote service", e);
}
}
@Override
public URI getRelativeBlobSASUri(String storageAccount, String container,
String relativePath) throws SASKeyGenerationException {
try {
LOG.debug("Generating RelativePath SAS Key for relativePath {} inside"
+ " Container {} inside Storage Account {} ",
relativePath, container, storageAccount);
setDelegationToken();
URIBuilder uriBuilder = new URIBuilder(credServiceUrl);
uriBuilder.setPath("/" + BLOB_SAS_OP);
uriBuilder.addParameter(STORAGE_ACCOUNT_QUERY_PARAM_NAME,
storageAccount);
uriBuilder.addParameter(CONTAINER_QUERY_PARAM_NAME,
container);
uriBuilder.addParameter(RELATIVE_PATH_QUERY_PARAM_NAME,
relativePath);
uriBuilder.addParameter(SAS_EXPIRY_QUERY_PARAM_NAME, ""
+ getSasKeyExpiryPeriod());
if (isSecurityEnabled && StringUtils.isNotEmpty(
delegationToken)) {
uriBuilder.addParameter(DELEGATION_TOKEN_QUERY_PARAM_NAME,
this.delegationToken);
}
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
UserGroupInformation connectUgi = ugi.getRealUser();
if (connectUgi == null) {
connectUgi = ugi;
} else {
uriBuilder.addParameter(Constants.DOAS_PARAM, ugi.getShortUserName());
}
return getSASKey(uriBuilder.build(), connectUgi);
} catch (URISyntaxException uriSyntaxEx) {
throw new SASKeyGenerationException("Encountered URISyntaxException"
+ " while building the HttpGetRequest to " + " remote service",
uriSyntaxEx);
} catch (IOException e) {
throw new SASKeyGenerationException("Encountered IOException"
+ " while building the HttpGetRequest to remote service", e);
}
}
private URI getSASKey(final URI uri, UserGroupInformation connectUgi)
throws URISyntaxException, SASKeyGenerationException {
final RemoteSASKeyGenerationResponse sasKeyResponse;
try {
sasKeyResponse = connectUgi.doAs(
new PrivilegedExceptionAction<RemoteSASKeyGenerationResponse>() {
@Override
public RemoteSASKeyGenerationResponse run() throws Exception {
AuthenticatedURL.Token token = null;
if (isKerberosSupportEnabled && UserGroupInformation
.isSecurityEnabled() && (delegationToken == null
|| delegationToken.isEmpty())) {
token = new AuthenticatedURL.Token();
final Authenticator kerberosAuthenticator =
new KerberosDelegationTokenAuthenticator();
try {
kerberosAuthenticator.authenticate(uri.toURL(), token);
Validate.isTrue(token.isSet(),
"Authenticated Token is NOT present. "
+ "The request cannot proceed.");
} catch (AuthenticationException e) {
throw new IOException(
"Authentication failed in check authorization", e);
}
}
return makeRemoteRequest(uri,
(token != null ? token.toString() : null));
}
});
} catch (InterruptedException | IOException e) {
final String msg = "Error fetching SAS Key from Remote Service: " + uri;
LOG.error(msg, e);
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
throw new SASKeyGenerationException(msg, e);
}
sasKeyResponse = makeRemoteRequest(commaSeparatedUrls, uriBuilder.getPath(),
uriBuilder.getQueryParams());
if (sasKeyResponse.getResponseCode() == REMOTE_CALL_SUCCESS_CODE) {
return new URI(sasKeyResponse.getSasKey());
@ -258,27 +161,59 @@ public class RemoteSASKeyGeneratorImpl extends SASKeyGeneratorImpl {
"Remote Service encountered error in SAS Key generation : "
+ sasKeyResponse.getResponseMessage());
}
} catch (URISyntaxException uriSyntaxEx) {
throw new SASKeyGenerationException("Encountered URISyntaxException"
+ " while building the HttpGetRequest to remote service for ",
uriSyntaxEx);
}
}
@Override
public URI getRelativeBlobSASUri(String storageAccount,
String container, String relativePath) throws SASKeyGenerationException {
try {
URIBuilder uriBuilder = new URIBuilder();
uriBuilder.setPath("/" + BLOB_SAS_OP);
uriBuilder.addParameter(STORAGE_ACCOUNT_QUERY_PARAM_NAME, storageAccount);
uriBuilder.addParameter(CONTAINER_QUERY_PARAM_NAME, container);
uriBuilder.addParameter(RELATIVE_PATH_QUERY_PARAM_NAME, relativePath);
uriBuilder.addParameter(SAS_EXPIRY_QUERY_PARAM_NAME,
"" + getSasKeyExpiryPeriod());
RemoteSASKeyGenerationResponse sasKeyResponse =
makeRemoteRequest(commaSeparatedUrls, uriBuilder.getPath(),
uriBuilder.getQueryParams());
if (sasKeyResponse.getResponseCode() == REMOTE_CALL_SUCCESS_CODE) {
return new URI(sasKeyResponse.getSasKey());
} else {
throw new SASKeyGenerationException(
"Remote Service encountered error in SAS Key generation : "
+ sasKeyResponse.getResponseMessage());
}
} catch (URISyntaxException uriSyntaxEx) {
throw new SASKeyGenerationException("Encountered URISyntaxException"
+ " while building the HttpGetRequest to " + " remote service",
uriSyntaxEx);
}
}
/**
* Helper method to make a remote request.
* @param uri - Uri to use for the remote request
* @param token - hadoop.auth token for the remote request
*
* @param urls - Urls to use for the remote request
* @param path - hadoop.auth token for the remote request
* @param queryParams - queryParams to be used.
* @return RemoteSASKeyGenerationResponse
*/
private RemoteSASKeyGenerationResponse makeRemoteRequest(URI uri,
String token) throws SASKeyGenerationException {
private RemoteSASKeyGenerationResponse makeRemoteRequest(String[] urls,
String path, List<NameValuePair> queryParams)
throws SASKeyGenerationException {
try {
HttpGet httpGet = new HttpGet(uri);
if (token != null) {
httpGet.setHeader("Cookie", AuthenticatedURL.AUTH_COOKIE + "=" + token);
}
String responseBody = remoteCallHelper.makeRemoteGetRequest(httpGet);
ObjectMapper objectMapper = new ObjectMapper();
return objectMapper.readValue(responseBody,
RemoteSASKeyGenerationResponse.class);
String responseBody = remoteCallHelper
.makeRemoteRequest(urls, path, queryParams, HttpGet.METHOD_NAME);
return RESPONSE_READER.readValue(responseBody);
} catch (WasbRemoteCallException remoteCallEx) {
throw new SASKeyGenerationException("Encountered RemoteCallException"
@ -286,7 +221,8 @@ public class RemoteSASKeyGeneratorImpl extends SASKeyGeneratorImpl {
} catch (JsonParseException jsonParserEx) {
throw new SASKeyGenerationException("Encountered JsonParseException "
+ "while parsing the response from remote"
+ " service into RemoteSASKeyGenerationResponse object", jsonParserEx);
+ " service into RemoteSASKeyGenerationResponse object",
jsonParserEx);
} catch (JsonMappingException jsonMappingEx) {
throw new SASKeyGenerationException("Encountered JsonMappingException"
+ " while mapping the response from remote service into "
@ -296,10 +232,6 @@ public class RemoteSASKeyGeneratorImpl extends SASKeyGeneratorImpl {
+ "accessing remote service to retrieve SAS Key", ioEx);
}
}
private void setDelegationToken() throws IOException {
this.delegationToken = SecurityUtils.getDelegationTokenFromCredentials();
}
}
/**

View File

@ -18,28 +18,23 @@
package org.apache.hadoop.fs.azure;
import com.fasterxml.jackson.core.JsonParseException;
import org.codehaus.jackson.JsonParseException;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.Validate;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.azure.security.Constants;
import org.apache.hadoop.fs.azure.security.SecurityUtils;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.security.authentication.client.Authenticator;
import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.utils.URIBuilder;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.ObjectReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
import static org.apache.hadoop.fs.azure.WasbRemoteCallHelper.REMOTE_CALL_SUCCESS_CODE;
@ -53,53 +48,60 @@ public class RemoteWasbAuthorizerImpl implements WasbAuthorizerInterface {
public static final Logger LOG = LoggerFactory
.getLogger(RemoteWasbAuthorizerImpl.class);
private String remoteAuthorizerServiceUrl = null;
private static final ObjectReader RESPONSE_READER = new ObjectMapper()
.reader(RemoteWasbAuthorizerResponse.class);
/**
* Configuration parameter name expected in the Configuration object to
* provide the url of the remote service. {@value}
* provide the urls of the remote service instances. {@value}
*/
public static final String KEY_REMOTE_AUTH_SERVICE_URL =
"fs.azure.authorization.remote.service.url";
public static final String KEY_REMOTE_AUTH_SERVICE_URLS =
"fs.azure.authorization.remote.service.urls";
/**
* Authorization operation OP name in the remote service {@value}
*/
private static final String CHECK_AUTHORIZATION_OP =
"CHECK_AUTHORIZATION";
private static final String CHECK_AUTHORIZATION_OP = "CHECK_AUTHORIZATION";
/**
* Query parameter specifying the access operation type. {@value}
*/
private static final String ACCESS_OPERATION_QUERY_PARAM_NAME =
"operation_type";
/**
* Query parameter specifying the wasb absolute path. {@value}
*/
private static final String WASB_ABSOLUTE_PATH_QUERY_PARAM_NAME =
"wasb_absolute_path";
/**
* Query parameter name for user info {@value}
*/
private static final String DELEGATION_TOKEN_QUERY_PARAM_NAME =
"delegation";
/**
* Query parameter name for sending owner of the specific resource {@value}
*/
private static final String WASB_RESOURCE_OWNER_QUERY_PARAM_NAME =
"wasb_resource_owner";
private WasbRemoteCallHelper remoteCallHelper = null;
private String delegationToken;
private boolean isSecurityEnabled;
private boolean isKerberosSupportEnabled;
/**
* Authorization Remote http client retry policy enabled configuration key. {@value}
*/
private static final String AUTHORIZER_HTTP_CLIENT_RETRY_POLICY_ENABLED_KEY =
"fs.azure.authorizer.http.retry.policy.enabled";
@VisibleForTesting
public void updateWasbRemoteCallHelper(WasbRemoteCallHelper helper) {
/**
* Authorization Remote http client retry policy spec. {@value}
*/
private static final String AUTHORIZER_HTTP_CLIENT_RETRY_POLICY_SPEC_SPEC =
"fs.azure.authorizer.http.retry.policy.spec";
/**
* Authorization Remote http client retry policy spec default value. {@value}
*/
private static final String AUTHORIZER_HTTP_CLIENT_RETRY_POLICY_SPEC_DEFAULT =
"1000,3,10000,2";
private WasbRemoteCallHelper remoteCallHelper = null;
private boolean isKerberosSupportEnabled;
private RetryPolicy retryPolicy;
private String[] commaSeparatedUrls = null;
@VisibleForTesting public void updateWasbRemoteCallHelper(
WasbRemoteCallHelper helper) {
this.remoteCallHelper = helper;
}
@ -107,21 +109,24 @@ public class RemoteWasbAuthorizerImpl implements WasbAuthorizerInterface {
public void init(Configuration conf)
throws WasbAuthorizationException, IOException {
LOG.debug("Initializing RemoteWasbAuthorizerImpl instance");
setDelegationToken();
remoteAuthorizerServiceUrl = SecurityUtils
.getRemoteAuthServiceUrls(conf);
if (remoteAuthorizerServiceUrl == null
|| remoteAuthorizerServiceUrl.isEmpty()) {
throw new WasbAuthorizationException(
"fs.azure.authorization.remote.service.url config not set"
this.isKerberosSupportEnabled =
conf.getBoolean(Constants.AZURE_KERBEROS_SUPPORT_PROPERTY_NAME, false);
this.commaSeparatedUrls =
conf.getTrimmedStrings(KEY_REMOTE_AUTH_SERVICE_URLS);
if (this.commaSeparatedUrls == null
|| this.commaSeparatedUrls.length <= 0) {
throw new IOException(KEY_REMOTE_AUTH_SERVICE_URLS + " config not set"
+ " in configuration.");
}
this.remoteCallHelper = new WasbRemoteCallHelper();
this.isSecurityEnabled = UserGroupInformation.isSecurityEnabled();
this.isKerberosSupportEnabled = conf
.getBoolean(Constants.AZURE_KERBEROS_SUPPORT_PROPERTY_NAME, false);
this.retryPolicy = RetryUtils.getMultipleLinearRandomRetry(conf,
AUTHORIZER_HTTP_CLIENT_RETRY_POLICY_ENABLED_KEY, true,
AUTHORIZER_HTTP_CLIENT_RETRY_POLICY_SPEC_SPEC,
AUTHORIZER_HTTP_CLIENT_RETRY_POLICY_SPEC_DEFAULT);
if (isKerberosSupportEnabled && UserGroupInformation.isSecurityEnabled()) {
this.remoteCallHelper = new SecureWasbRemoteCallHelper(retryPolicy, false);
} else {
this.remoteCallHelper = new WasbRemoteCallHelper(retryPolicy);
}
}
@Override
@ -130,93 +135,38 @@ public class RemoteWasbAuthorizerImpl implements WasbAuthorizerInterface {
try {
/* Make an exception for the internal -RenamePending files */
if (wasbAbsolutePath.endsWith(NativeAzureFileSystem.FolderRenamePending.SUFFIX)) {
return true;
}
setDelegationToken();
final URIBuilder uriBuilder = new URIBuilder(remoteAuthorizerServiceUrl);
final URIBuilder uriBuilder = new URIBuilder();
uriBuilder.setPath("/" + CHECK_AUTHORIZATION_OP);
uriBuilder.addParameter(WASB_ABSOLUTE_PATH_QUERY_PARAM_NAME,
wasbAbsolutePath);
uriBuilder.addParameter(ACCESS_OPERATION_QUERY_PARAM_NAME,
accessType);
if (isSecurityEnabled && StringUtils.isNotEmpty(delegationToken)) {
uriBuilder.addParameter(DELEGATION_TOKEN_QUERY_PARAM_NAME,
delegationToken);
}
uriBuilder
.addParameter(WASB_ABSOLUTE_PATH_QUERY_PARAM_NAME, wasbAbsolutePath);
uriBuilder.addParameter(ACCESS_OPERATION_QUERY_PARAM_NAME, accessType);
if (resourceOwner != null && StringUtils.isNotEmpty(resourceOwner)) {
uriBuilder.addParameter(WASB_RESOURCE_OWNER_QUERY_PARAM_NAME,
resourceOwner);
}
String responseBody = null;
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
UserGroupInformation connectUgi = ugi.getRealUser();
if (connectUgi == null) {
connectUgi = ugi;
} else {
uriBuilder.addParameter(Constants.DOAS_PARAM, ugi.getShortUserName());
}
String responseBody = remoteCallHelper
.makeRemoteRequest(commaSeparatedUrls, uriBuilder.getPath(),
uriBuilder.getQueryParams(), HttpGet.METHOD_NAME);
try {
responseBody = connectUgi
.doAs(new PrivilegedExceptionAction<String>() {
@Override
public String run() throws Exception {
AuthenticatedURL.Token token = null;
HttpGet httpGet = new HttpGet(uriBuilder.build());
if (isKerberosSupportEnabled && UserGroupInformation
.isSecurityEnabled() && (delegationToken == null
|| delegationToken.isEmpty())) {
token = new AuthenticatedURL.Token();
final Authenticator kerberosAuthenticator = new KerberosDelegationTokenAuthenticator();
try {
kerberosAuthenticator
.authenticate(uriBuilder.build().toURL(), token);
Validate.isTrue(token.isSet(),
"Authenticated Token is NOT present. The request cannot proceed.");
} catch (AuthenticationException e){
throw new IOException("Authentication failed in check authorization", e);
}
if (token != null) {
httpGet.setHeader("Cookie",
AuthenticatedURL.AUTH_COOKIE + "=" + token);
}
}
return remoteCallHelper.makeRemoteGetRequest(httpGet);
}
});
} catch (InterruptedException e) {
LOG.error("Error in check authorization", e);
throw new WasbAuthorizationException("Error in check authorize", e);
}
ObjectMapper objectMapper = new ObjectMapper();
RemoteAuthorizerResponse authorizerResponse =
objectMapper
.readValue(responseBody, RemoteAuthorizerResponse.class);
RemoteWasbAuthorizerResponse authorizerResponse = RESPONSE_READER
.readValue(responseBody);
if (authorizerResponse == null) {
throw new WasbAuthorizationException(
"RemoteAuthorizerResponse object null from remote call");
"RemoteWasbAuthorizerResponse object null from remote call");
} else if (authorizerResponse.getResponseCode()
== REMOTE_CALL_SUCCESS_CODE) {
return authorizerResponse.getAuthorizationResult();
} else {
throw new WasbAuthorizationException("Remote authorization"
+ " service encountered an error "
throw new WasbAuthorizationException(
"Remote authorization" + " service encountered an error "
+ authorizerResponse.getResponseMessage());
}
} catch (URISyntaxException | WasbRemoteCallException
| JsonParseException | JsonMappingException ex) {
} catch (WasbRemoteCallException | JsonParseException | JsonMappingException ex) {
throw new WasbAuthorizationException(ex);
}
}
private void setDelegationToken() throws IOException {
this.delegationToken = SecurityUtils.getDelegationTokenFromCredentials();
}
}
/**
@ -230,25 +180,14 @@ public class RemoteWasbAuthorizerImpl implements WasbAuthorizerInterface {
* "authorizationResult" : authorization result <boolean>
* true - if auhorization allowed
* false - otherwise.
*
* }
*/
class RemoteAuthorizerResponse {
class RemoteWasbAuthorizerResponse {
private int responseCode;
private boolean authorizationResult;
private String responseMessage;
public RemoteAuthorizerResponse(int responseCode,
boolean authorizationResult, String message) {
this.responseCode = responseCode;
this.authorizationResult = authorizationResult;
this.responseMessage = message;
}
public RemoteAuthorizerResponse() {
}
public int getResponseCode() {
return responseCode;
}

View File

@ -0,0 +1,210 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azure;
import org.apache.commons.lang.Validate;
import org.apache.hadoop.fs.azure.security.Constants;
import org.apache.hadoop.fs.azure.security.WasbDelegationTokenIdentifier;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.security.authentication.client.Authenticator;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator;
import org.apache.http.NameValuePair;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.client.utils.URIBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
import java.util.List;
/**
* Helper class the has constants and helper methods
* used in WASB when integrating with a remote http cred
* service which uses Kerberos and delegation tokens.
* Currently, remote service will be used to generate
* SAS keys, authorization and delegation token operations.
*/
public class SecureWasbRemoteCallHelper extends WasbRemoteCallHelper {
public static final Logger LOG =
LoggerFactory.getLogger(SecureWasbRemoteCallHelper.class);
/**
* Delegation token query parameter to be used when making rest call.
*/
private static final String DELEGATION_TOKEN_QUERY_PARAM_NAME = "delegation";
/**
* Delegation token to be used for making the remote call.
*/
private Token<?> delegationToken = null;
/**
* Does Remote Http Call requires Kerberos Authentication always, even if the delegation token is present.
*/
private boolean alwaysRequiresKerberosAuth;
public SecureWasbRemoteCallHelper(RetryPolicy retryPolicy,
boolean alwaysRequiresKerberosAuth) {
super(retryPolicy);
this.alwaysRequiresKerberosAuth = alwaysRequiresKerberosAuth;
}
@Override
public String makeRemoteRequest(final String[] urls,
final String path, final List<NameValuePair> queryParams,
final String httpMethod) throws IOException {
final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
UserGroupInformation connectUgi = ugi.getRealUser();
if (connectUgi == null) {
connectUgi = ugi;
}
if (delegationToken == null) {
connectUgi.checkTGTAndReloginFromKeytab();
}
String s = null;
try {
s = connectUgi.doAs(new PrivilegedExceptionAction<String>() {
@Override public String run() throws Exception {
return retryableRequest(urls, path, queryParams, httpMethod);
}
});
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException(e.getMessage(), e);
}
return s;
}
@Override
public HttpUriRequest getHttpRequest(String[] urls, String path,
List<NameValuePair> queryParams, int urlIndex, String httpMethod)
throws URISyntaxException, IOException {
final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
UserGroupInformation connectUgi = ugi.getRealUser();
if (connectUgi != null) {
queryParams.add(new NameValuePair() {
@Override public String getName() {
return Constants.DOAS_PARAM;
}
@Override public String getValue() {
return ugi.getShortUserName();
}
});
}
final Token delegationToken = getDelegationToken(ugi);
if (!alwaysRequiresKerberosAuth && delegationToken != null) {
final String delegationTokenEncodedUrlString =
delegationToken.encodeToUrlString();
queryParams.add(new NameValuePair() {
@Override public String getName() {
return DELEGATION_TOKEN_QUERY_PARAM_NAME;
}
@Override public String getValue() {
return delegationTokenEncodedUrlString;
}
});
}
URIBuilder uriBuilder =
new URIBuilder(urls[urlIndex]).setPath(path).setParameters(queryParams);
HttpUriRequest httpUriRequest = null;
switch (httpMethod) {
case HttpPut.METHOD_NAME:
httpUriRequest = new HttpPut(uriBuilder.build());
break;
case HttpPost.METHOD_NAME:
httpUriRequest = new HttpPost(uriBuilder.build());
break;
default:
httpUriRequest = new HttpGet(uriBuilder.build());
break;
}
LOG.debug("SecureWasbRemoteCallHelper#getHttpRequest() {}",
uriBuilder.build().toURL());
if (alwaysRequiresKerberosAuth || delegationToken == null) {
AuthenticatedURL.Token token = new AuthenticatedURL.Token();
final Authenticator kerberosAuthenticator =
new KerberosDelegationTokenAuthenticator();
try {
kerberosAuthenticator.authenticate(uriBuilder.build().toURL(), token);
} catch (AuthenticationException e) {
throw new WasbRemoteCallException(
Constants.AUTHENTICATION_FAILED_ERROR_MESSAGE, e);
}
Validate.isTrue(token.isSet(),
"Authenticated Token is NOT present. The request cannot proceed.");
httpUriRequest.setHeader("Cookie",
AuthenticatedURL.AUTH_COOKIE + "=" + token);
}
return httpUriRequest;
}
private synchronized Token<?> getDelegationToken(
UserGroupInformation userGroupInformation) throws IOException {
if (this.delegationToken == null) {
Token<?> token = null;
for (Token iterToken : userGroupInformation.getTokens()) {
if (iterToken.getKind()
.equals(WasbDelegationTokenIdentifier.TOKEN_KIND)) {
token = iterToken;
LOG.debug("{} token found in cache : {}",
WasbDelegationTokenIdentifier.TOKEN_KIND, iterToken);
break;
}
}
LOG.debug("UGI Information: {}", userGroupInformation.toString());
// ugi tokens are usually indicative of a task which can't
// refetch tokens. even if ugi has credentials, don't attempt
// to get another token to match hdfs/rpc behavior
if (token != null) {
LOG.debug("Using UGI token: {}", token);
setDelegationToken(token);
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Delegation token from cache - {}", delegationToken != null
? delegationToken.encodeToUrlString()
: "null");
}
return this.delegationToken;
}
private <T extends TokenIdentifier> void setDelegationToken(
final Token<T> token) {
synchronized (this) {
this.delegationToken = token;
}
}
}

View File

@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -19,19 +19,31 @@
package org.apache.hadoop.fs.azure;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.fs.azure.security.Constants;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.http.Header;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.NameValuePair;
import org.apache.http.StatusLine;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.impl.client.HttpClientBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.InterruptedIOException;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Random;
/**
* Helper class the has constants and helper methods
@ -39,101 +51,212 @@ import java.nio.charset.StandardCharsets;
* service. Currently, remote service will be used to generate
* SAS keys.
*/
class WasbRemoteCallHelper {
public class WasbRemoteCallHelper {
public static final Logger LOG =
LoggerFactory.getLogger(WasbRemoteCallHelper.class);
/**
* Return code when the remote call is successful. {@value}
*/
public static final int REMOTE_CALL_SUCCESS_CODE = 0;
/**
* Application Json content type.
*/
private static final String APPLICATION_JSON = "application/json";
/**
* Max content length of the response.
*/
private static final int MAX_CONTENT_LENGTH = 1024;
/**
* Client instance to be used for making the remote call.
*/
private HttpClient client = null;
private Random random = new Random();
private RetryPolicy retryPolicy = null;
public WasbRemoteCallHelper(RetryPolicy retryPolicy) {
this.client = HttpClientBuilder.create().build();
this.retryPolicy = retryPolicy;
}
@VisibleForTesting
public void updateHttpClient(HttpClient client) {
this.client = client;
}
public WasbRemoteCallHelper() {
this.client = HttpClientBuilder.create().build();
}
/**
* Helper method to make remote HTTP Get request.
* @param getRequest - HttpGet request object constructed by caller.
*
* @param urls - Service urls to be used, if one fails try another.
* @param path - URL endpoint for the resource.
* @param queryParams - list of query parameters
* @param httpMethod - http Method to be used.
* @return Http Response body returned as a string. The caller
* is expected to semantically understand the response.
* @throws WasbRemoteCallException
* @throws IOException
* @throws IOException when there an error in executing the remote http request.
*/
public String makeRemoteGetRequest(HttpGet getRequest)
throws WasbRemoteCallException, IOException {
public String makeRemoteRequest(String[] urls, String path,
List<NameValuePair> queryParams, String httpMethod) throws IOException {
return retryableRequest(urls, path, queryParams, httpMethod);
}
protected String retryableRequest(String[] urls, String path,
List<NameValuePair> queryParams, String httpMethod) throws IOException {
HttpResponse response = null;
HttpUriRequest httpRequest = null;
for (int retry = 0, index =
random.nextInt(urls.length);; retry++, index++) {
if (index >= urls.length) {
index = index % urls.length;
}
try {
httpRequest =
getHttpRequest(urls, path, queryParams, index, httpMethod);
final String APPLICATION_JSON = "application/json";
final int MAX_CONTENT_LENGTH = 1024;
getRequest.setHeader("Accept", APPLICATION_JSON);
HttpResponse response = client.execute(getRequest);
httpRequest.setHeader("Accept", APPLICATION_JSON);
response = client.execute(httpRequest);
StatusLine statusLine = response.getStatusLine();
if (statusLine == null || statusLine.getStatusCode() != HttpStatus.SC_OK) {
throw new WasbRemoteCallException(getRequest.getURI().toString() + ":" +
((statusLine!=null) ? statusLine.toString() : "NULL")
);
if (statusLine == null
|| statusLine.getStatusCode() != HttpStatus.SC_OK) {
throw new WasbRemoteCallException(
httpRequest.getURI().toString() + ":" + ((statusLine != null)
? statusLine.toString()
: "NULL"));
}
Header contentTypeHeader = response.getFirstHeader("Content-Type");
if (contentTypeHeader == null
|| !APPLICATION_JSON.equals(contentTypeHeader.getValue())) {
throw new WasbRemoteCallException(getRequest.getURI().toString() + ":" +
"Content-Type mismatch: expected: " + APPLICATION_JSON +
", got " + ((contentTypeHeader!=null) ? contentTypeHeader.getValue() : "NULL")
);
if (contentTypeHeader == null || !APPLICATION_JSON
.equals(contentTypeHeader.getValue())) {
throw new WasbRemoteCallException(
httpRequest.getURI().toString() + ":"
+ "Content-Type mismatch: expected: " + APPLICATION_JSON
+ ", got " + ((contentTypeHeader != null) ? contentTypeHeader
.getValue() : "NULL"));
}
Header contentLengthHeader = response.getFirstHeader("Content-Length");
if (contentLengthHeader == null) {
throw new WasbRemoteCallException(getRequest.getURI().toString() + ":" +
"Content-Length header missing"
);
throw new WasbRemoteCallException(
httpRequest.getURI().toString() + ":"
+ "Content-Length header missing");
}
try {
if (Integer.parseInt(contentLengthHeader.getValue()) > MAX_CONTENT_LENGTH) {
throw new WasbRemoteCallException(getRequest.getURI().toString() + ":" +
"Content-Length:" + contentLengthHeader.getValue() +
"exceeded max:" + MAX_CONTENT_LENGTH
);
if (Integer.parseInt(contentLengthHeader.getValue())
> MAX_CONTENT_LENGTH) {
throw new WasbRemoteCallException(
httpRequest.getURI().toString() + ":" + "Content-Length:"
+ contentLengthHeader.getValue() + "exceeded max:"
+ MAX_CONTENT_LENGTH);
}
}
catch (NumberFormatException nfe) {
throw new WasbRemoteCallException(getRequest.getURI().toString() + ":" +
"Invalid Content-Length value :" + contentLengthHeader.getValue()
);
} catch (NumberFormatException nfe) {
throw new WasbRemoteCallException(
httpRequest.getURI().toString() + ":"
+ "Invalid Content-Length value :" + contentLengthHeader
.getValue());
}
BufferedReader rd = new BufferedReader(
BufferedReader rd = null;
StringBuilder responseBody = new StringBuilder();
try {
rd = new BufferedReader(
new InputStreamReader(response.getEntity().getContent(),
StandardCharsets.UTF_8));
StringBuilder responseBody = new StringBuilder();
String responseLine = "";
while ((responseLine = rd.readLine()) != null) {
responseBody.append(responseLine);
}
} finally {
rd.close();
}
return responseBody.toString();
} catch (URISyntaxException uriSyntaxEx) {
throw new WasbRemoteCallException("Encountered URISyntaxException "
+ "while building the HttpGetRequest to remote service",
uriSyntaxEx);
} catch (IOException e) {
LOG.debug(e.getMessage(), e);
try {
shouldRetry(e, retry, (httpRequest != null)
? httpRequest.getURI().toString()
: urls[index]);
} catch (IOException ioex) {
String message =
"Encountered error while making remote call to " + String
.join(",", urls) + " retried " + retry + " time(s).";
LOG.error(message, ioex);
throw new WasbRemoteCallException(message, ioex);
}
}
}
}
} catch (ClientProtocolException clientProtocolEx) {
throw new WasbRemoteCallException(getRequest.getURI().toString() + ":" +
"Encountered ClientProtocolException while making remote call", clientProtocolEx);
} catch (IOException ioEx) {
throw new WasbRemoteCallException(getRequest.getURI().toString() + ":" +
"Encountered IOException while making remote call", ioEx);
}
protected HttpUriRequest getHttpRequest(String[] urls, String path,
List<NameValuePair> queryParams, int urlIndex, String httpMethod)
throws URISyntaxException, IOException {
URIBuilder uriBuilder = null;
uriBuilder =
new URIBuilder(urls[urlIndex]).setPath(path).setParameters(queryParams);
HttpUriRequest httpUriRequest = null;
switch (httpMethod) {
case HttpPut.METHOD_NAME:
httpUriRequest = new HttpPut(uriBuilder.build());
break;
case HttpPost.METHOD_NAME:
httpUriRequest = new HttpPost(uriBuilder.build());
break;
default:
httpUriRequest = new HttpGet(uriBuilder.build());
break;
}
return httpUriRequest;
}
private void shouldRetry(final IOException ioe, final int retry,
final String url) throws IOException {
CharSequence authenticationExceptionMessage =
Constants.AUTHENTICATION_FAILED_ERROR_MESSAGE;
if (ioe instanceof WasbRemoteCallException && ioe.getMessage()
.equals(authenticationExceptionMessage)) {
throw ioe;
}
try {
final RetryPolicy.RetryAction a = (retryPolicy != null)
? retryPolicy
.shouldRetry(ioe, retry, 0, true)
: RetryPolicy.RetryAction.FAIL;
boolean isRetry = a.action == RetryPolicy.RetryAction.RetryDecision.RETRY;
boolean isFailoverAndRetry =
a.action == RetryPolicy.RetryAction.RetryDecision.FAILOVER_AND_RETRY;
if (isRetry || isFailoverAndRetry) {
LOG.debug("Retrying connect to Remote service:{}. Already tried {}"
+ " time(s); retry policy is {}, " + "delay {}ms.", url, retry,
retryPolicy, a.delayMillis);
Thread.sleep(a.delayMillis);
return;
}
} catch(InterruptedIOException e) {
LOG.warn(e.getMessage(), e);
Thread.currentThread().interrupt();
return;
} catch (Exception e) {
LOG.warn("Original exception is ", ioe);
throw new WasbRemoteCallException(e.getMessage(), e);
}
LOG.debug("Not retrying anymore, already retried the urls {} time(s)",
retry);
throw new WasbRemoteCallException(
url + ":" + "Encountered IOException while making remote call", ioe);
}
}

View File

@ -26,21 +26,6 @@ public final class Constants {
private Constants() {
}
/**
* Configuration parameter name expected in the Configuration
* object to provide the url of the remote service {@value}
*/
public static final String KEY_CRED_SERVICE_URL = "fs.azure.cred.service.url";
/**
* Default port of the remote service used as delegation token manager and Azure storage SAS key generator.
*/
public static final int DEFAULT_CRED_SERVICE_PORT = 50911;
/**
* Default remote delegation token manager endpoint.
*/
public static final String DEFAULT_DELEGATION_TOKEN_MANAGER_ENDPOINT = "/tokenmanager/v1";
/**
* The configuration property to enable Kerberos support.
*/
@ -51,4 +36,9 @@ public final class Constants {
* Parameter to be used for impersonation.
*/
public static final String DOAS_PARAM = "doas";
/**
* Error message for Authentication failures.
*/
public static final String AUTHENTICATION_FAILED_ERROR_MESSAGE = "Authentication Failed ";
}

View File

@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azure.security;
import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Locale;
import java.util.Map;
/**
* Utility class to parse JSON.
*/
public final class JsonUtils {
public static final Logger LOG = LoggerFactory.getLogger(JsonUtils.class);
private JsonUtils() {
}
public static Map<?, ?> parse(final String jsonString) throws IOException {
try {
ObjectMapper mapper = new ObjectMapper();
return mapper.reader(Map.class).readValue(jsonString);
} catch (Exception e) {
LOG.debug("JSON Parsing exception: {} while parsing {}", e.getMessage(),
jsonString);
if (jsonString.toLowerCase(Locale.ENGLISH).contains("server error")) {
LOG.error(
"Internal Server Error was encountered while making a request");
}
throw new IOException("JSON Parsing Error: " + e.getMessage(), e);
}
}
}

View File

@ -0,0 +1,162 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azure.security;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.azure.SecureWasbRemoteCallHelper;
import org.apache.hadoop.fs.azure.WasbRemoteCallHelper;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryUtils;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.client.utils.URIBuilder;
import java.io.IOException;
import java.util.Map;
/**
* Class to manage delegation token operations by making rest call to remote service.
*/
public class RemoteWasbDelegationTokenManager
implements WasbDelegationTokenManager {
/**
* Configuration parameter name expected in the configuration
* object to provide the url of the delegation token service to fetch the delegation tokens.
*/
public static final String KEY_DELEGATION_TOKEN_SERVICE_URLS =
"fs.azure.delegation.token.service.urls";
/**
* Configuration key to enable http retry policy for delegation token service calls.
*/
public static final String DT_MANAGER_HTTP_CLIENT_RETRY_POLICY_ENABLED_KEY =
"fs.azure.delegationtokenservice.http.retry.policy.enabled";
/**
* Configuration key for delegation token service http retry policy spec.
*/
public static final String DT_MANAGER_HTTP_CLIENT_RETRY_POLICY_SPEC_KEY =
"fs.azure.delegationtokenservice.http.retry.policy.spec";
/**
* Default remote delegation token manager endpoint.
*/
private static final String DEFAULT_DELEGATION_TOKEN_MANAGER_ENDPOINT =
"/tokenmanager/v1";
/**
* Default for delegation token service http retry policy spec.
*/
private static final String DT_MANAGER_HTTP_CLIENT_RETRY_POLICY_SPEC_DEFAULT =
"1000,3,10000,2";
private static final boolean
DT_MANAGER_HTTP_CLIENT_RETRY_POLICY_ENABLED_DEFAULT = true;
private static final Text WASB_DT_SERVICE_NAME = new Text("WASB_DT_SERVICE");
/**
* Query parameter value for Getting delegation token http request
*/
private static final String GET_DELEGATION_TOKEN_OP = "GETDELEGATIONTOKEN";
/**
* Query parameter value for renewing delegation token http request
*/
private static final String RENEW_DELEGATION_TOKEN_OP = "RENEWDELEGATIONTOKEN";
/**
* Query parameter value for canceling the delegation token http request
*/
private static final String CANCEL_DELEGATION_TOKEN_OP = "CANCELDELEGATIONTOKEN";
/**
* op parameter to represent the operation.
*/
private static final String OP_PARAM_KEY_NAME = "op";
/**
* renewer parameter to represent the renewer of the delegation token.
*/
private static final String RENEWER_PARAM_KEY_NAME = "renewer";
/**
* service parameter to represent the service which returns delegation tokens.
*/
private static final String SERVICE_PARAM_KEY_NAME = "service";
/**
* token parameter to represent the delegation token.
*/
private static final String TOKEN_PARAM_KEY_NAME = "token";
private WasbRemoteCallHelper remoteCallHelper;
private String[] dtServiceUrls;
public RemoteWasbDelegationTokenManager(Configuration conf)
throws IOException {
RetryPolicy retryPolicy = RetryUtils.getMultipleLinearRandomRetry(conf,
DT_MANAGER_HTTP_CLIENT_RETRY_POLICY_ENABLED_KEY,
DT_MANAGER_HTTP_CLIENT_RETRY_POLICY_ENABLED_DEFAULT,
DT_MANAGER_HTTP_CLIENT_RETRY_POLICY_SPEC_KEY,
DT_MANAGER_HTTP_CLIENT_RETRY_POLICY_SPEC_DEFAULT);
remoteCallHelper = new SecureWasbRemoteCallHelper(retryPolicy, true);
this.dtServiceUrls =
conf.getTrimmedStrings(KEY_DELEGATION_TOKEN_SERVICE_URLS);
if (this.dtServiceUrls == null || this.dtServiceUrls.length <= 0) {
throw new IOException(
KEY_DELEGATION_TOKEN_SERVICE_URLS + " config not set"
+ " in configuration.");
}
}
@Override
public Token<DelegationTokenIdentifier> getDelegationToken(
String renewer) throws IOException {
URIBuilder uriBuilder =
new URIBuilder().setPath(DEFAULT_DELEGATION_TOKEN_MANAGER_ENDPOINT)
.addParameter(OP_PARAM_KEY_NAME, GET_DELEGATION_TOKEN_OP)
.addParameter(RENEWER_PARAM_KEY_NAME, renewer)
.addParameter(SERVICE_PARAM_KEY_NAME, WASB_DT_SERVICE_NAME.toString());
String responseBody = remoteCallHelper
.makeRemoteRequest(dtServiceUrls, uriBuilder.getPath(),
uriBuilder.getQueryParams(), HttpGet.METHOD_NAME);
return TokenUtils.toDelegationToken(JsonUtils.parse(responseBody));
}
@Override
public long renewDelegationToken(Token<?> token)
throws IOException {
URIBuilder uriBuilder =
new URIBuilder().setPath(DEFAULT_DELEGATION_TOKEN_MANAGER_ENDPOINT)
.addParameter(OP_PARAM_KEY_NAME, RENEW_DELEGATION_TOKEN_OP)
.addParameter(TOKEN_PARAM_KEY_NAME, token.encodeToUrlString());
String responseBody = remoteCallHelper
.makeRemoteRequest(dtServiceUrls, uriBuilder.getPath(),
uriBuilder.getQueryParams(), HttpPut.METHOD_NAME);
Map<?, ?> parsedResp = JsonUtils.parse(responseBody);
return ((Number) parsedResp.get("long")).longValue();
}
@Override
public void cancelDelegationToken(Token<?> token)
throws IOException {
URIBuilder uriBuilder =
new URIBuilder().setPath(DEFAULT_DELEGATION_TOKEN_MANAGER_ENDPOINT)
.addParameter(OP_PARAM_KEY_NAME, CANCEL_DELEGATION_TOKEN_OP)
.addParameter(TOKEN_PARAM_KEY_NAME, token.encodeToUrlString());
remoteCallHelper.makeRemoteRequest(dtServiceUrls, uriBuilder.getPath(),
uriBuilder.getQueryParams(), HttpPut.METHOD_NAME);
}
}

View File

@ -1,86 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azure.security;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.azure.RemoteWasbAuthorizerImpl;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Iterator;
/**
* Security Utils class for WASB.
*/
public final class SecurityUtils {
private SecurityUtils() {
}
/**
* Utility method to get remote service URLs from the configuration.
* @param conf configuration object.
* @return remote service URL
* @throws UnknownHostException thrown when getting the default value.
*/
public static String getCredServiceUrls(Configuration conf)
throws UnknownHostException {
return conf.get(Constants.KEY_CRED_SERVICE_URL, String
.format("http://%s:%s",
InetAddress.getLocalHost().getCanonicalHostName(),
Constants.DEFAULT_CRED_SERVICE_PORT));
}
/**
* Utility method to get remote Authorization service URLs from the configuration.
* @param conf Configuration object.
* @return remote Authorization server URL
* @throws UnknownHostException thrown when getting the default value.
*/
public static String getRemoteAuthServiceUrls(Configuration conf)
throws UnknownHostException {
return conf.get(RemoteWasbAuthorizerImpl.KEY_REMOTE_AUTH_SERVICE_URL, String
.format("http://%s:%s",
InetAddress.getLocalHost().getCanonicalHostName(),
Constants.DEFAULT_CRED_SERVICE_PORT));
}
/**
* Utility method to get delegation token from the UGI credentials.
* @return delegation token
* @throws IOException thrown when getting the current user.
*/
public static String getDelegationTokenFromCredentials() throws IOException {
String delegationToken = null;
Iterator<Token<? extends TokenIdentifier>> tokenIterator = UserGroupInformation
.getCurrentUser().getCredentials().getAllTokens().iterator();
while (tokenIterator.hasNext()) {
Token<? extends TokenIdentifier> iteratedToken = tokenIterator.next();
if (iteratedToken.getKind()
.equals(WasbDelegationTokenIdentifier.TOKEN_KIND)) {
delegationToken = iteratedToken.encodeToUrlString();
}
}
return delegationToken;
}
}

View File

@ -0,0 +1,60 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azure.security;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Map;
/**
* Utility methods common for token management
*/
public final class TokenUtils {
public static final Logger LOG = LoggerFactory.getLogger(TokenUtils.class);
public static final String URL_STRING = "urlString";
private TokenUtils() {
}
public static Token<DelegationTokenIdentifier> toDelegationToken(
final Map<?, ?> inputMap) throws IOException {
final Map<?, ?> m = (Map<?, ?>) inputMap.get(Token.class.getSimpleName());
return (Token<DelegationTokenIdentifier>) toToken(m);
}
public static Token<? extends TokenIdentifier> toToken(final Map<?, ?> m)
throws IOException {
if (m == null) {
return null;
}
String urlString = (String) m.get(URL_STRING);
if (urlString != null) {
final Token<DelegationTokenIdentifier> token = new Token<>();
LOG.debug("Read url string param - {}", urlString);
token.decodeFromUrlString(urlString);
return token;
}
return null;
}
}

View File

@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azure.security;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
import java.io.IOException;
/**
* Interface for Managing the Delegation tokens.
*/
public interface WasbDelegationTokenManager {
/**
* Get Delegation token
* @param renewer delegation token renewer
* @return delegation token
* @throws IOException when error in getting the delegation token
*/
Token<DelegationTokenIdentifier> getDelegationToken(String renewer)
throws IOException;
/**
* Renew the delegation token
* @param token delegation token.
* @return renewed time.
* @throws IOException when error in renewing the delegation token
*/
long renewDelegationToken(Token<?> token) throws IOException;
/**
* Cancel the delegation token
* @param token delegation token.
* @throws IOException when error in cancelling the delegation token.
*/
void cancelDelegationToken(Token<?> token) throws IOException;
}

View File

@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -20,27 +20,19 @@ package org.apache.hadoop.fs.azure.security;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenRenewer;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticator;
import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetAddress;
import java.net.URL;
import java.security.PrivilegedExceptionAction;
/**
* Token Renewer for renewing WASB delegation tokens with remote service.
*/
public class WasbTokenRenewer extends TokenRenewer {
public static final Logger LOG = LoggerFactory
.getLogger(WasbTokenRenewer.class);
public static final Logger LOG =
LoggerFactory.getLogger(WasbTokenRenewer.class);
/**
* Checks if this particular object handles the Kind of token passed.
@ -75,32 +67,7 @@ public class WasbTokenRenewer extends TokenRenewer {
public long renew(final Token<?> token, Configuration conf)
throws IOException, InterruptedException {
LOG.debug("Renewing the delegation token");
final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
UserGroupInformation connectUgi = ugi.getRealUser();
final UserGroupInformation proxyUser = connectUgi;
if (connectUgi == null) {
connectUgi = ugi;
}
connectUgi.checkTGTAndReloginFromKeytab();
final DelegationTokenAuthenticatedURL.Token authToken = new DelegationTokenAuthenticatedURL.Token();
authToken
.setDelegationToken((Token<AbstractDelegationTokenIdentifier>) token);
final String credServiceUrl = conf.get(Constants.KEY_CRED_SERVICE_URL,
String.format("http://%s:%s",
InetAddress.getLocalHost().getCanonicalHostName(),
Constants.DEFAULT_CRED_SERVICE_PORT));
DelegationTokenAuthenticator authenticator = new KerberosDelegationTokenAuthenticator();
final DelegationTokenAuthenticatedURL authURL = new DelegationTokenAuthenticatedURL(
authenticator);
return connectUgi.doAs(new PrivilegedExceptionAction<Long>() {
@Override
public Long run() throws Exception {
return authURL.renewDelegationToken(new URL(credServiceUrl
+ Constants.DEFAULT_DELEGATION_TOKEN_MANAGER_ENDPOINT),
authToken, (proxyUser != null) ? ugi.getShortUserName() : null);
}
});
return getInstance(conf).renewDelegationToken(token);
}
/**
@ -114,31 +81,11 @@ public class WasbTokenRenewer extends TokenRenewer {
public void cancel(final Token<?> token, Configuration conf)
throws IOException, InterruptedException {
LOG.debug("Cancelling the delegation token");
final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
UserGroupInformation connectUgi = ugi.getRealUser();
final UserGroupInformation proxyUser = connectUgi;
if (connectUgi == null) {
connectUgi = ugi;
getInstance(conf).cancelDelegationToken(token);
}
connectUgi.checkTGTAndReloginFromKeytab();
final DelegationTokenAuthenticatedURL.Token authToken = new DelegationTokenAuthenticatedURL.Token();
authToken
.setDelegationToken((Token<AbstractDelegationTokenIdentifier>) token);
final String credServiceUrl = conf.get(Constants.KEY_CRED_SERVICE_URL,
String.format("http://%s:%s",
InetAddress.getLocalHost().getCanonicalHostName(),
Constants.DEFAULT_CRED_SERVICE_PORT));
DelegationTokenAuthenticator authenticator = new KerberosDelegationTokenAuthenticator();
final DelegationTokenAuthenticatedURL authURL = new DelegationTokenAuthenticatedURL(
authenticator);
connectUgi.doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
authURL.cancelDelegationToken(new URL(credServiceUrl
+ Constants.DEFAULT_DELEGATION_TOKEN_MANAGER_ENDPOINT),
authToken, (proxyUser != null) ? ugi.getShortUserName() : null);
return null;
}
});
private WasbDelegationTokenManager getInstance(Configuration conf)
throws IOException {
return new RemoteWasbDelegationTokenManager(conf);
}
}

View File

@ -313,12 +313,12 @@ To enable SAS key generation locally following property needs to be set to true.
</property>
```
To use the remote SAS key generation mode, an external REST service is expected to provided required SAS keys.
To use the remote SAS key generation mode, comma separated external REST services are expected to provided required SAS keys.
Following property can used to provide the end point to use for remote SAS Key generation:
```xml
<property>
<name>fs.azure.cred.service.url</name>
<name>fs.azure.cred.service.urls</name>
<value>{URL}</value>
</property>
```
@ -351,11 +351,11 @@ Authorization support can be enabled in WASB using the following configuration:
```
The current implementation of authorization relies on the presence of an external service that can enforce
the authorization. The service is expected to be running on a URL provided by the following config.
the authorization. The service is expected to be running on comma separated URLs provided by the following config.
```xml
<property>
<name>fs.azure.authorization.remote.service.url</name>
<name>fs.azure.authorization.remote.service.urls</name>
<value>{URL}</value>
</property>
```
@ -374,6 +374,42 @@ The service is expected to return a response in JSON format:
}
```
### Delegation token support in WASB
Delegation token support support can be enabled in WASB using the following configuration:
```xml
<property>
<name>fs.azure.enable.kerberos.support</name>
<value>true</value>
</property>
```
The current implementation of delegation token implementation relies on the presence of an external service instances that can generate and manage delegation tokens. The service is expected to be running on comma separated URLs provided by the following config.
```xml
<property>
<name>fs.azure.delegation.token.service.urls</name>
<value>{URL}</value>
</property>
```
The remote service is expected to provide support for the following REST call: ```{URL}?op=GETDELEGATIONTOKEN```, ```{URL}?op=RENEWDELEGATIONTOKEN``` and ```{URL}?op=CANCELDELEGATIONTOKEN```
An example request:
```{URL}?op=GETDELEGATIONTOKEN&renewer=<renewer>```
```{URL}?op=RENEWDELEGATIONTOKEN&token=<delegation token>```
```{URL}?op=CANCELDELEGATIONTOKEN&token=<delegation token>```
The service is expected to return a response in JSON format for GETDELEGATIONTOKEN request:
```json
{
"Token" : {
"urlString": URL string of delegation token.
}
}
```
## Testing the hadoop-azure Module
The hadoop-azure module includes a full suite of unit tests. Most of the tests

View File

@ -46,7 +46,7 @@ public class TestNativeAzureFileSystemAuthorization
protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
Configuration conf = new Configuration();
conf.set(NativeAzureFileSystem.KEY_AZURE_AUTHORIZATION, "true");
conf.set(RemoteWasbAuthorizerImpl.KEY_REMOTE_AUTH_SERVICE_URL, "http://localhost/");
conf.set(RemoteWasbAuthorizerImpl.KEY_REMOTE_AUTH_SERVICE_URLS, "http://localhost/");
return AzureBlobStorageTestAccount.create(conf);
}

View File

@ -21,34 +21,48 @@ package org.apache.hadoop.fs.azure;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.http.*;
import org.apache.hadoop.io.retry.RetryUtils;
import org.apache.http.Header;
import org.apache.http.HttpResponse;
import org.apache.http.HttpEntity;
import org.apache.http.HttpStatus;
import org.apache.http.StatusLine;
import org.apache.http.ProtocolVersion;
import org.apache.http.ParseException;
import org.apache.http.HeaderElement;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.hamcrest.Description;
import org.hamcrest.TypeSafeMatcher;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.ArgumentMatcher;
import org.mockito.Mockito;
import java.io.ByteArrayInputStream;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import static org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.KEY_USE_SECURE_MODE;
import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.times;
/**
* Test class to hold all WasbRemoteCallHelper tests
*/
public class TestWasbRemoteCallHelper
extends AbstractWasbTestBase {
public static final String EMPTY_STRING = "";
private static final int INVALID_HTTP_STATUS_CODE_999 = 999;
@Override
protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
Configuration conf = new Configuration();
conf.set(NativeAzureFileSystem.KEY_AZURE_AUTHORIZATION, "true");
conf.set(RemoteWasbAuthorizerImpl.KEY_REMOTE_AUTH_SERVICE_URL, "http://localhost/");
conf.set(RemoteWasbAuthorizerImpl.KEY_REMOTE_AUTH_SERVICE_URLS, "http://localhost1/,http://localhost2/");
return AzureBlobStorageTestAccount.create(conf);
}
@ -80,7 +94,7 @@ public class TestWasbRemoteCallHelper
HttpClient mockHttpClient = Mockito.mock(HttpClient.class);
HttpResponse mockHttpResponse = Mockito.mock(HttpResponse.class);
Mockito.when(mockHttpClient.execute(Mockito.<HttpGet>any())).thenReturn(mockHttpResponse);
Mockito.when(mockHttpResponse.getStatusLine()).thenReturn(newStatusLine(999));
Mockito.when(mockHttpResponse.getStatusLine()).thenReturn(newStatusLine(INVALID_HTTP_STATUS_CODE_999));
// finished setting up mocks
performop(mockHttpClient);
@ -99,7 +113,7 @@ public class TestWasbRemoteCallHelper
HttpClient mockHttpClient = Mockito.mock(HttpClient.class);
HttpResponse mockHttpResponse = Mockito.mock(HttpResponse.class);
Mockito.when(mockHttpClient.execute(Mockito.<HttpGet>any())).thenReturn(mockHttpResponse);
Mockito.when(mockHttpResponse.getStatusLine()).thenReturn(newStatusLine(200));
Mockito.when(mockHttpResponse.getStatusLine()).thenReturn(newStatusLine(HttpStatus.SC_OK));
Mockito.when(mockHttpResponse.getFirstHeader("Content-Type"))
.thenReturn(newHeader("Content-Type", "text/plain"));
// finished setting up mocks
@ -120,7 +134,7 @@ public class TestWasbRemoteCallHelper
HttpClient mockHttpClient = Mockito.mock(HttpClient.class);
HttpResponse mockHttpResponse = Mockito.mock(HttpResponse.class);
Mockito.when(mockHttpClient.execute(Mockito.<HttpGet>any())).thenReturn(mockHttpResponse);
Mockito.when(mockHttpResponse.getStatusLine()).thenReturn(newStatusLine(200));
Mockito.when(mockHttpResponse.getStatusLine()).thenReturn(newStatusLine(HttpStatus.SC_OK));
Mockito.when(mockHttpResponse.getFirstHeader("Content-Type"))
.thenReturn(newHeader("Content-Type", "application/json"));
// finished setting up mocks
@ -141,7 +155,7 @@ public class TestWasbRemoteCallHelper
HttpClient mockHttpClient = Mockito.mock(HttpClient.class);
HttpResponse mockHttpResponse = Mockito.mock(HttpResponse.class);
Mockito.when(mockHttpClient.execute(Mockito.<HttpGet>any())).thenReturn(mockHttpResponse);
Mockito.when(mockHttpResponse.getStatusLine()).thenReturn(newStatusLine(200));
Mockito.when(mockHttpResponse.getStatusLine()).thenReturn(newStatusLine(HttpStatus.SC_OK));
Mockito.when(mockHttpResponse.getFirstHeader("Content-Type"))
.thenReturn(newHeader("Content-Type", "application/json"));
Mockito.when(mockHttpResponse.getFirstHeader("Content-Length"))
@ -164,7 +178,7 @@ public class TestWasbRemoteCallHelper
HttpClient mockHttpClient = Mockito.mock(HttpClient.class);
HttpResponse mockHttpResponse = Mockito.mock(HttpResponse.class);
Mockito.when(mockHttpClient.execute(Mockito.<HttpGet>any())).thenReturn(mockHttpResponse);
Mockito.when(mockHttpResponse.getStatusLine()).thenReturn(newStatusLine(200));
Mockito.when(mockHttpResponse.getStatusLine()).thenReturn(newStatusLine(HttpStatus.SC_OK));
Mockito.when(mockHttpResponse.getFirstHeader("Content-Type"))
.thenReturn(newHeader("Content-Type", "application/json"));
Mockito.when(mockHttpResponse.getFirstHeader("Content-Length"))
@ -188,7 +202,7 @@ public class TestWasbRemoteCallHelper
HttpEntity mockHttpEntity = Mockito.mock(HttpEntity.class);
Mockito.when(mockHttpClient.execute(Mockito.<HttpGet>any())).thenReturn(mockHttpResponse);
Mockito.when(mockHttpResponse.getStatusLine()).thenReturn(newStatusLine(200));
Mockito.when(mockHttpResponse.getStatusLine()).thenReturn(newStatusLine(HttpStatus.SC_OK));
Mockito.when(mockHttpResponse.getFirstHeader("Content-Type"))
.thenReturn(newHeader("Content-Type", "application/json"));
Mockito.when(mockHttpResponse.getFirstHeader("Content-Length"))
@ -211,7 +225,7 @@ public class TestWasbRemoteCallHelper
public void testMalFormedJSONResponse() throws Throwable {
expectedEx.expect(WasbAuthorizationException.class);
expectedEx.expectMessage("com.fasterxml.jackson.core.JsonParseException: Unexpected end-of-input in FIELD_NAME");
expectedEx.expectMessage("org.codehaus.jackson.JsonParseException: Unexpected end-of-input within/between OBJECT entries");
// set up mocks
HttpClient mockHttpClient = Mockito.mock(HttpClient.class);
@ -220,7 +234,7 @@ public class TestWasbRemoteCallHelper
HttpEntity mockHttpEntity = Mockito.mock(HttpEntity.class);
Mockito.when(mockHttpClient.execute(Mockito.<HttpGet>any())).thenReturn(mockHttpResponse);
Mockito.when(mockHttpResponse.getStatusLine()).thenReturn(newStatusLine(200));
Mockito.when(mockHttpResponse.getStatusLine()).thenReturn(newStatusLine(HttpStatus.SC_OK));
Mockito.when(mockHttpResponse.getFirstHeader("Content-Type"))
.thenReturn(newHeader("Content-Type", "application/json"));
Mockito.when(mockHttpResponse.getFirstHeader("Content-Length"))
@ -250,7 +264,7 @@ public class TestWasbRemoteCallHelper
HttpEntity mockHttpEntity = Mockito.mock(HttpEntity.class);
Mockito.when(mockHttpClient.execute(Mockito.<HttpGet>any())).thenReturn(mockHttpResponse);
Mockito.when(mockHttpResponse.getStatusLine()).thenReturn(newStatusLine(200));
Mockito.when(mockHttpResponse.getStatusLine()).thenReturn(newStatusLine(HttpStatus.SC_OK));
Mockito.when(mockHttpResponse.getFirstHeader("Content-Type"))
.thenReturn(newHeader("Content-Type", "application/json"));
Mockito.when(mockHttpResponse.getFirstHeader("Content-Length"))
@ -263,17 +277,155 @@ public class TestWasbRemoteCallHelper
performop(mockHttpClient);
}
private void setupExpectations() throws UnsupportedEncodingException {
@Test
public void testWhenOneInstanceIsDown() throws Throwable {
String path = new Path("/").makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString();
String pathEncoded = URLEncoder.encode(path, "UTF-8");
// set up mocks
HttpClient mockHttpClient = Mockito.mock(HttpClient.class);
HttpEntity mockHttpEntity = Mockito.mock(HttpEntity.class);
String requestURI = String.format("http://localhost/CHECK_AUTHORIZATION?wasb_absolute_path=%s&operation_type=write", pathEncoded);
HttpResponse mockHttpResponseService1 = Mockito.mock(HttpResponse.class);
Mockito.when(mockHttpResponseService1.getStatusLine())
.thenReturn(newStatusLine(HttpStatus.SC_INTERNAL_SERVER_ERROR));
Mockito.when(mockHttpResponseService1.getFirstHeader("Content-Type"))
.thenReturn(newHeader("Content-Type", "application/json"));
Mockito.when(mockHttpResponseService1.getFirstHeader("Content-Length"))
.thenReturn(newHeader("Content-Length", "1024"));
Mockito.when(mockHttpResponseService1.getEntity())
.thenReturn(mockHttpEntity);
HttpResponse mockHttpResponseService2 = Mockito.mock(HttpResponse.class);
Mockito.when(mockHttpResponseService2.getStatusLine())
.thenReturn(newStatusLine(HttpStatus.SC_OK));
Mockito.when(mockHttpResponseService2.getFirstHeader("Content-Type"))
.thenReturn(newHeader("Content-Type", "application/json"));
Mockito.when(mockHttpResponseService2.getFirstHeader("Content-Length"))
.thenReturn(newHeader("Content-Length", "1024"));
Mockito.when(mockHttpResponseService2.getEntity())
.thenReturn(mockHttpEntity);
class HttpGetForService1 extends ArgumentMatcher<HttpGet>{
@Override public boolean matches(Object o) {
return checkHttpGetMatchHost((HttpGet) o, "localhost1");
}
}
class HttpGetForService2 extends ArgumentMatcher<HttpGet>{
@Override public boolean matches(Object o) {
return checkHttpGetMatchHost((HttpGet) o, "localhost2");
}
}
Mockito.when(mockHttpClient.execute(argThat(new HttpGetForService1())))
.thenReturn(mockHttpResponseService1);
Mockito.when(mockHttpClient.execute(argThat(new HttpGetForService2())))
.thenReturn(mockHttpResponseService2);
//Need 3 times because performop() does 3 fs operations.
Mockito.when(mockHttpEntity.getContent())
.thenReturn(new ByteArrayInputStream(validJsonResponse()
.getBytes(StandardCharsets.UTF_8)))
.thenReturn(new ByteArrayInputStream(validJsonResponse()
.getBytes(StandardCharsets.UTF_8)))
.thenReturn(new ByteArrayInputStream(validJsonResponse()
.getBytes(StandardCharsets.UTF_8)));
// finished setting up mocks
performop(mockHttpClient);
Mockito.verify(mockHttpClient, times(3)).execute(Mockito.argThat(new HttpGetForService2()));
}
@Test
public void testWhenServiceInstancesAreDown() throws Throwable {
//expectedEx.expect(WasbAuthorizationException.class);
// set up mocks
HttpClient mockHttpClient = Mockito.mock(HttpClient.class);
HttpEntity mockHttpEntity = Mockito.mock(HttpEntity.class);
HttpResponse mockHttpResponseService1 = Mockito.mock(HttpResponse.class);
Mockito.when(mockHttpResponseService1.getStatusLine())
.thenReturn(newStatusLine(HttpStatus.SC_INTERNAL_SERVER_ERROR));
Mockito.when(mockHttpResponseService1.getFirstHeader("Content-Type"))
.thenReturn(newHeader("Content-Type", "application/json"));
Mockito.when(mockHttpResponseService1.getFirstHeader("Content-Length"))
.thenReturn(newHeader("Content-Length", "1024"));
Mockito.when(mockHttpResponseService1.getEntity())
.thenReturn(mockHttpEntity);
HttpResponse mockHttpResponseService2 = Mockito.mock(HttpResponse.class);
Mockito.when(mockHttpResponseService2.getStatusLine())
.thenReturn(newStatusLine(
HttpStatus.SC_INTERNAL_SERVER_ERROR));
Mockito.when(mockHttpResponseService2.getFirstHeader("Content-Type"))
.thenReturn(newHeader("Content-Type", "application/json"));
Mockito.when(mockHttpResponseService2.getFirstHeader("Content-Length"))
.thenReturn(newHeader("Content-Length", "1024"));
Mockito.when(mockHttpResponseService2.getEntity())
.thenReturn(mockHttpEntity);
class HttpGetForService1 extends ArgumentMatcher<HttpGet>{
@Override public boolean matches(Object o) {
return checkHttpGetMatchHost((HttpGet) o, "localhost1");
}
}
class HttpGetForService2 extends ArgumentMatcher<HttpGet>{
@Override public boolean matches(Object o) {
return checkHttpGetMatchHost((HttpGet) o, "localhost2");
}
}
Mockito.when(mockHttpClient.execute(argThat(new HttpGetForService1())))
.thenReturn(mockHttpResponseService1);
Mockito.when(mockHttpClient.execute(argThat(new HttpGetForService2())))
.thenReturn(mockHttpResponseService2);
//Need 3 times because performop() does 3 fs operations.
Mockito.when(mockHttpEntity.getContent())
.thenReturn(new ByteArrayInputStream(
validJsonResponse().getBytes(StandardCharsets.UTF_8)))
.thenReturn(new ByteArrayInputStream(
validJsonResponse().getBytes(StandardCharsets.UTF_8)))
.thenReturn(new ByteArrayInputStream(
validJsonResponse().getBytes(StandardCharsets.UTF_8)));
// finished setting up mocks
try {
performop(mockHttpClient);
}catch (WasbAuthorizationException e){
e.printStackTrace();
Mockito.verify(mockHttpClient, atLeast(3))
.execute(argThat(new HttpGetForService1()));
Mockito.verify(mockHttpClient, atLeast(3))
.execute(argThat(new HttpGetForService2()));
Mockito.verify(mockHttpClient, times(7)).execute(Mockito.<HttpGet>any());
}
}
private void setupExpectations() {
expectedEx.expect(WasbAuthorizationException.class);
expectedEx.expectMessage("org.apache.hadoop.fs.azure.WasbRemoteCallException: "
+ requestURI
+ ":Encountered IOException while making remote call"
);
class MatchesPattern extends TypeSafeMatcher<String> {
private String pattern;
MatchesPattern(String pattern) {
this.pattern = pattern;
}
@Override protected boolean matchesSafely(String item) {
return item.matches(pattern);
}
@Override public void describeTo(Description description) {
description.appendText("matches pattern ").appendValue(pattern);
}
@Override protected void describeMismatchSafely(String item,
Description mismatchDescription) {
mismatchDescription.appendText("does not match");
}
}
expectedEx.expectMessage(new MatchesPattern(
"org\\.apache\\.hadoop\\.fs\\.azure\\.WasbRemoteCallException: "
+ "Encountered error while making remote call to "
+ "http:\\/\\/localhost1\\/,http:\\/\\/localhost2\\/ retried 6 time\\(s\\)\\."));
}
private void performop(HttpClient mockHttpClient) throws Throwable {
@ -282,7 +434,10 @@ public class TestWasbRemoteCallHelper
RemoteWasbAuthorizerImpl authorizer = new RemoteWasbAuthorizerImpl();
authorizer.init(fs.getConf());
WasbRemoteCallHelper mockWasbRemoteCallHelper = new WasbRemoteCallHelper();
WasbRemoteCallHelper mockWasbRemoteCallHelper = new WasbRemoteCallHelper(
RetryUtils.getMultipleLinearRandomRetry(new Configuration(),
EMPTY_STRING, true,
EMPTY_STRING, "1000,3,10000,2"));
mockWasbRemoteCallHelper.updateHttpClient(mockHttpClient);
authorizer.updateWasbRemoteCallHelper(mockWasbRemoteCallHelper);
fs.updateWasbAuthorizer(authorizer);
@ -293,21 +448,26 @@ public class TestWasbRemoteCallHelper
}
private String validJsonResponse() {
return new String(
"{\"responseCode\": 0, \"authorizationResult\": true, \"responseMessage\": \"Authorized\"}"
);
return "{"
+ "\"responseCode\": 0,"
+ "\"authorizationResult\": true,"
+ "\"responseMessage\": \"Authorized\""
+ "}";
}
private String malformedJsonResponse() {
return new String(
"{\"responseCode\": 0, \"authorizationResult\": true, \"responseMessage\":"
);
return "{"
+ "\"responseCode\": 0,"
+ "\"authorizationResult\": true,"
+ "\"responseMessage\":";
}
private String failureCodeJsonResponse() {
return new String(
"{\"responseCode\": 1, \"authorizationResult\": false, \"responseMessage\": \"Unauthorized\"}"
);
return "{"
+ "\"responseCode\": 1,"
+ "\"authorizationResult\": false,"
+ "\"responseMessage\": \"Unauthorized\""
+ "}";
}
private StatusLine newStatusLine(final int statusCode) {
@ -347,4 +507,10 @@ public class TestWasbRemoteCallHelper
}
};
}
/** Check that a HttpGet request is with given remote host. */
private static boolean checkHttpGetMatchHost(HttpGet g, String h) {
return g != null && g.getURI().getHost().equals(h);
}
}