HADOOP-13930. Azure: Add Authorization support to WASB. Contributed by Sivaguru Sankaridurg and Dushyanth

This commit is contained in:
Mingliang Liu 2017-03-06 17:16:36 -08:00 committed by Anu Engineer
parent bb0bc7d909
commit 223c268535
15 changed files with 1372 additions and 69 deletions

View File

@ -1292,6 +1292,16 @@
to specify the time (such as 2s, 2m, 1h, etc.).
</description>
</property>
<property>
<name>fs.azure.authorization</name>
<value>false</value>
<description>
Config flag to enable authorization support in WASB. Setting it to "true" enables
authorization support to WASB. Currently WASB authorization requires a remote service
to provide authorization that needs to be specified via fs.azure.authorization.remote.service.url
configuration
</description>
</property>
<property>

View File

@ -114,6 +114,7 @@ public class TestCommonConfigurationFields extends TestConfigurationFieldsBase {
xmlPropsToSkipCompare.add("fs.azure.sas.expiry.period");
xmlPropsToSkipCompare.add("fs.azure.local.sas.key.mode");
xmlPropsToSkipCompare.add("fs.azure.secure.mode");
xmlPropsToSkipCompare.add("fs.azure.authorization");
// Deprecated properties. These should eventually be removed from the
// class.

View File

@ -249,7 +249,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
* Default values to control SAS Key mode.
* By default we set the values to false.
*/
private static final boolean DEFAULT_USE_SECURE_MODE = false;
public static final boolean DEFAULT_USE_SECURE_MODE = false;
private static final boolean DEFAULT_USE_LOCAL_SAS_KEY_MODE = false;
/**
@ -849,6 +849,9 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
rootDirectory = container.getDirectoryReference("");
canCreateOrModifyContainer = true;
configureAzureStorageSession();
tolerateOobAppends = false;
}
/**

View File

@ -1106,7 +1106,31 @@ public class NativeAzureFileSystem extends FileSystem {
// A counter to create unique (within-process) names for my metrics sources.
private static AtomicInteger metricsSourceNameCounter = new AtomicInteger();
private boolean appendSupportEnabled = false;
/**
* Configuration key to enable authorization support in WASB.
*/
public static final String KEY_AZURE_AUTHORIZATION =
"fs.azure.authorization";
/**
* Default value for the authorization support in WASB.
*/
private static final boolean DEFAULT_AZURE_AUTHORIZATION = false;
/**
* Flag controlling authorization support in WASB.
*/
private boolean azureAuthorization = false;
/**
* Authorizer to use when authorization support is enabled in
* WASB.
*/
private WasbAuthorizerInterface authorizer = null;
private String delegationToken = null;
public NativeAzureFileSystem() {
// set store in initialize()
}
@ -1146,11 +1170,11 @@ public class NativeAzureFileSystem extends FileSystem {
return baseName + number;
}
}
/**
* Checks if the given URI scheme is a scheme that's affiliated with the Azure
* File System.
*
*
* @param scheme
* The URI scheme.
* @return true iff it's an Azure File System URI scheme.
@ -1167,7 +1191,7 @@ public class NativeAzureFileSystem extends FileSystem {
/**
* Puts in the authority of the default file system if it is a WASB file
* system and the given URI's authority is null.
*
*
* @return The URI with reconstructed authority if necessary and possible.
*/
private static URI reconstructAuthorityIfNeeded(URI uri, Configuration conf) {
@ -1237,6 +1261,24 @@ public class NativeAzureFileSystem extends FileSystem {
// Initialize thread counts from user configuration
deleteThreadCount = conf.getInt(AZURE_DELETE_THREADS, DEFAULT_AZURE_DELETE_THREADS);
renameThreadCount = conf.getInt(AZURE_RENAME_THREADS, DEFAULT_AZURE_RENAME_THREADS);
boolean useSecureMode = conf.getBoolean(AzureNativeFileSystemStore.KEY_USE_SECURE_MODE,
AzureNativeFileSystemStore.DEFAULT_USE_SECURE_MODE);
this.azureAuthorization = useSecureMode &&
conf.getBoolean(KEY_AZURE_AUTHORIZATION, DEFAULT_AZURE_AUTHORIZATION);
if (this.azureAuthorization) {
this.authorizer =
new RemoteWasbAuthorizerImpl();
authorizer.init(conf);
}
}
@VisibleForTesting
public void updateWasbAuthorizer(WasbAuthorizerInterface authorizer) {
this.authorizer = authorizer;
}
private NativeFileSystemStore createDefaultStore(Configuration conf) {
@ -1338,18 +1380,28 @@ public class NativeAzureFileSystem extends FileSystem {
/**
* For unit test purposes, retrieves the AzureNativeFileSystemStore store
* backing this file system.
*
*
* @return The store object.
*/
@VisibleForTesting
public AzureNativeFileSystemStore getStore() {
return actualStore;
}
NativeFileSystemStore getStoreInterface() {
return store;
}
private void performAuthCheck(String path, String accessType,
String operation) throws WasbAuthorizationException, IOException {
if (azureAuthorization && this.authorizer != null &&
!this.authorizer.authorize(path, accessType, delegationToken)) {
throw new WasbAuthorizationException(operation
+ " operation for Path : " + path + " not allowed");
}
}
/**
* Gets the metrics source for this file system.
* This is mainly here for unit testing purposes.
@ -1372,6 +1424,10 @@ public class NativeAzureFileSystem extends FileSystem {
LOG.debug("Opening file: {} for append", f);
Path absolutePath = makeAbsolute(f);
performAuthCheck(absolutePath.toString(),
WasbAuthorizationOperations.WRITE.toString(), "append");
String key = pathToKey(absolutePath);
FileMetadata meta = null;
try {
@ -1434,6 +1490,7 @@ public class NativeAzureFileSystem extends FileSystem {
* Get a self-renewing lease on the specified file.
* @param path path whose lease to be renewed.
* @return Lease
* @throws AzureException when not being able to acquire a lease on the path
*/
public SelfRenewingLease acquireLease(Path path) throws AzureException {
String fullKey = pathToKey(makeAbsolute(path));
@ -1572,6 +1629,10 @@ public class NativeAzureFileSystem extends FileSystem {
}
Path absolutePath = makeAbsolute(f);
performAuthCheck(absolutePath.toString(),
WasbAuthorizationOperations.WRITE.toString(), "create");
String key = pathToKey(absolutePath);
FileMetadata existingMetadata = store.retrieveMetadata(key);
@ -1652,10 +1713,10 @@ public class NativeAzureFileSystem extends FileSystem {
// Construct the data output stream from the buffered output stream.
FSDataOutputStream fsOut = new FSDataOutputStream(bufOutStream, statistics);
// Increment the counter
instrumentation.fileCreated();
// Return data output stream to caller.
return fsOut;
}
@ -1694,6 +1755,10 @@ public class NativeAzureFileSystem extends FileSystem {
LOG.debug("Deleting file: {}", f.toString());
Path absolutePath = makeAbsolute(f);
performAuthCheck(absolutePath.toString(),
WasbAuthorizationOperations.EXECUTE.toString(), "delete");
String key = pathToKey(absolutePath);
// Capture the metadata for the path.
@ -1964,6 +2029,10 @@ public class NativeAzureFileSystem extends FileSystem {
// Capture the absolute path and the path to key.
Path absolutePath = makeAbsolute(f);
performAuthCheck(absolutePath.toString(),
WasbAuthorizationOperations.EXECUTE.toString(), "getFileStatus");
String key = pathToKey(absolutePath);
if (key.length() == 0) { // root always exists
return newDirectory(null, absolutePath);
@ -2062,6 +2131,10 @@ public class NativeAzureFileSystem extends FileSystem {
LOG.debug("Listing status for {}", f.toString());
Path absolutePath = makeAbsolute(f);
performAuthCheck(absolutePath.toString(),
WasbAuthorizationOperations.EXECUTE.toString(), "list");
String key = pathToKey(absolutePath);
Set<FileStatus> status = new TreeSet<FileStatus>();
FileMetadata meta = null;
@ -2228,7 +2301,7 @@ public class NativeAzureFileSystem extends FileSystem {
/**
* Applies the applicable UMASK's on the given permission.
*
*
* @param permission
* The permission to mask.
* @param applyMode
@ -2250,7 +2323,7 @@ public class NativeAzureFileSystem extends FileSystem {
/**
* Creates the PermissionStatus object to use for the given permission, based
* on the current user in context.
*
*
* @param permission
* The permission for the file.
* @return The permission status object to use.
@ -2284,6 +2357,10 @@ public class NativeAzureFileSystem extends FileSystem {
}
Path absolutePath = makeAbsolute(f);
performAuthCheck(absolutePath.toString(),
WasbAuthorizationOperations.EXECUTE.toString(), "mkdirs");
PermissionStatus permissionStatus = null;
if(noUmask) {
// ensure owner still has wx permissions at the minimum
@ -2337,6 +2414,10 @@ public class NativeAzureFileSystem extends FileSystem {
LOG.debug("Opening file: {}", f.toString());
Path absolutePath = makeAbsolute(f);
performAuthCheck(absolutePath.toString(),
WasbAuthorizationOperations.READ.toString(), "read");
String key = pathToKey(absolutePath);
FileMetadata meta = null;
try {
@ -2393,7 +2474,12 @@ public class NativeAzureFileSystem extends FileSystem {
+ " through WASB that has colons in the name");
}
String srcKey = pathToKey(makeAbsolute(src));
Path absolutePath = makeAbsolute(src);
performAuthCheck(absolutePath.toString(),
WasbAuthorizationOperations.EXECUTE.toString(), "rename");
String srcKey = pathToKey(absolutePath);
if (srcKey.length() == 0) {
// Cannot rename root of file system
@ -2695,6 +2781,10 @@ public class NativeAzureFileSystem extends FileSystem {
@Override
public void setPermission(Path p, FsPermission permission) throws FileNotFoundException, IOException {
Path absolutePath = makeAbsolute(p);
performAuthCheck(absolutePath.toString(),
WasbAuthorizationOperations.EXECUTE.toString(), "setPermission");
String key = pathToKey(absolutePath);
FileMetadata metadata = null;
try {
@ -2733,6 +2823,10 @@ public class NativeAzureFileSystem extends FileSystem {
public void setOwner(Path p, String username, String groupname)
throws IOException {
Path absolutePath = makeAbsolute(p);
performAuthCheck(absolutePath.toString(),
WasbAuthorizationOperations.EXECUTE.toString(), "setOwner");
String key = pathToKey(absolutePath);
FileMetadata metadata = null;

View File

@ -0,0 +1,190 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azure;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.utils.URIBuilder;
import java.io.IOException;
import java.net.URISyntaxException;
import static org.apache.hadoop.fs.azure.WasbRemoteCallHelper.REMOTE_CALL_SUCCESS_CODE;
/**
* Class implementing WasbAuthorizerInterface using a remote
* service that implements the authorization operation. This
* class expects the url of the remote service to be passed
* via config.
*/
public class RemoteWasbAuthorizerImpl implements WasbAuthorizerInterface {
private String remoteAuthorizerServiceUrl = "";
/**
* Configuration parameter name expected in the Configuration object to
* provide the url of the remote service. {@value}
*/
public static final String KEY_REMOTE_AUTH_SERVICE_URL =
"fs.azure.authorization.remote.service.url";
/**
* Authorization operation OP name in the remote service {@value}
*/
private static final String CHECK_AUTHORIZATION_OP =
"CHECK_AUTHORIZATION";
/**
* Query parameter specifying the access operation type. {@value}
*/
private static final String ACCESS_OPERATION_QUERY_PARAM_NAME =
"operation_type";
/**
* Query parameter specifying the wasb absolute path. {@value}
*/
private static final String WASB_ABSOLUTE_PATH_QUERY_PARAM_NAME =
"wasb_absolute_path";
/**
* Query parameter name for user info {@value}
*/
private static final String DELEGATION_TOKEN_QUERY_PARAM_NAME =
"delegation_token";
private WasbRemoteCallHelper remoteCallHelper = null;
@VisibleForTesting
public void updateWasbRemoteCallHelper(WasbRemoteCallHelper helper) {
this.remoteCallHelper = helper;
}
@Override
public void init(Configuration conf)
throws WasbAuthorizationException, IOException {
remoteAuthorizerServiceUrl = conf.get(KEY_REMOTE_AUTH_SERVICE_URL);
if (remoteAuthorizerServiceUrl == null
|| remoteAuthorizerServiceUrl.isEmpty()) {
throw new WasbAuthorizationException(
"fs.azure.authorization.remote.service.url config not set"
+ " in configuration.");
}
this.remoteCallHelper = new WasbRemoteCallHelper();
}
@Override
public boolean authorize(String wasbAbsolutePath, String accessType,
String delegationToken) throws WasbAuthorizationException, IOException {
try {
URIBuilder uriBuilder = new URIBuilder(remoteAuthorizerServiceUrl);
uriBuilder.setPath("/" + CHECK_AUTHORIZATION_OP);
uriBuilder.addParameter(WASB_ABSOLUTE_PATH_QUERY_PARAM_NAME,
wasbAbsolutePath);
uriBuilder.addParameter(ACCESS_OPERATION_QUERY_PARAM_NAME,
accessType);
uriBuilder.addParameter(DELEGATION_TOKEN_QUERY_PARAM_NAME,
delegationToken);
String responseBody = remoteCallHelper.makeRemoteGetRequest(
new HttpGet(uriBuilder.build()));
ObjectMapper objectMapper = new ObjectMapper();
RemoteAuthorizerResponse authorizerResponse =
objectMapper.readValue(responseBody, RemoteAuthorizerResponse.class);
if (authorizerResponse == null) {
throw new WasbAuthorizationException(
"RemoteAuthorizerResponse object null from remote call");
} else if (authorizerResponse.getResponseCode()
== REMOTE_CALL_SUCCESS_CODE) {
return authorizerResponse.getAuthorizationResult();
} else {
throw new WasbAuthorizationException("Remote authorization"
+ " service encountered an error "
+ authorizerResponse.getResponseMessage());
}
} catch (URISyntaxException | WasbRemoteCallException
| JsonParseException | JsonMappingException ex) {
throw new WasbAuthorizationException(ex);
}
}
}
/**
* POJO representing the response expected from a remote
* authorization service.
* The remote service is expected to return the authorization
* response in the following JSON format
* {
* "responseCode" : 0 or non-zero <int>,
* "responseMessage" : relavant message of failure <String>
* "authorizationResult" : authorization result <boolean>
* true - if auhorization allowed
* false - otherwise.
*
* }
*/
class RemoteAuthorizerResponse {
private int responseCode;
private boolean authorizationResult;
private String responseMessage;
public RemoteAuthorizerResponse(int responseCode,
boolean authorizationResult, String message) {
this.responseCode = responseCode;
this.authorizationResult = authorizationResult;
this.responseMessage = message;
}
public RemoteAuthorizerResponse() {
}
public int getResponseCode() {
return responseCode;
}
public void setResponseCode(int responseCode) {
this.responseCode = responseCode;
}
public boolean getAuthorizationResult() {
return authorizationResult;
}
public void setAuthorizationResult(boolean authorizationResult) {
this.authorizationResult = authorizationResult;
}
public String getResponseMessage() {
return responseMessage;
}
public void setResponseMessage(String message) {
this.responseMessage = message;
}
}

View File

@ -0,0 +1,40 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azure;
/**
* Exception that gets thrown during the authorization failures
* in WASB.
*/
public class WasbAuthorizationException extends AzureException {
private static final long serialVersionUID = 1L;
public WasbAuthorizationException(String message) {
super(message);
}
public WasbAuthorizationException(String message, Throwable cause) {
super(message, cause);
}
public WasbAuthorizationException(Throwable t) {
super(t);
}
}

View File

@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azure;
/**
* Different authorization operations supported
* in WASB.
*/
public enum WasbAuthorizationOperations {
READ, WRITE, EXECUTE;
@Override
public String toString() {
switch(this) {
case READ:
return "read";
case WRITE:
return "write";
case EXECUTE:
return "execute";
default:
throw new IllegalArgumentException(
"Invalid Authorization Operation");
}
}
}

View File

@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azure;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
/**
* Interface to implement authorization support in WASB.
* API's of this interface will be implemented in the
* StorageInterface Layer before making calls to Azure
* Storage.
*/
public interface WasbAuthorizerInterface {
/**
* Initializer method
* @param conf - Configuration object
* @throws WasbAuthorizationException - On authorization exceptions
* @throws IOException - When not able to reach the authorizer
*/
public void init(Configuration conf)
throws WasbAuthorizationException, IOException;
/**
* Authorizer API to authorize access in WASB.
* @param wasbAbolutePath : Absolute WASB Path used for access.
* @param accessType : Type of access
* @param delegationToken : The user information.
* @return : true - If access allowed false - If access is not allowed.
* @throws WasbAuthorizationException - On authorization exceptions
* @throws IOException - When not able to reach the authorizer
*/
public boolean authorize(String wasbAbolutePath, String accessType,
String delegationToken) throws WasbAuthorizationException, IOException;
}

View File

@ -18,18 +18,21 @@
package org.apache.hadoop.fs.azure;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import com.google.common.annotations.VisibleForTesting;
import org.apache.http.Header;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.StatusLine;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.HttpClientBuilder;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
/**
* Helper class the has constants and helper methods
* used in WASB when integrating with a remote http cred
@ -48,6 +51,11 @@ class WasbRemoteCallHelper {
*/
private HttpClient client = null;
@VisibleForTesting
public void updateHttpClient(HttpClient client) {
this.client = client;
}
public WasbRemoteCallHelper() {
this.client = HttpClientBuilder.create().build();
}
@ -58,17 +66,54 @@ class WasbRemoteCallHelper {
* @return Http Response body returned as a string. The caller
* is expected to semantically understand the response.
* @throws WasbRemoteCallException
* @throws IOException
*/
public String makeRemoteGetRequest(HttpGet getRequest)
throws WasbRemoteCallException {
throws WasbRemoteCallException, IOException {
try {
final String APPLICATION_JSON = "application/json";
final int MAX_CONTENT_LENGTH = 1024;
getRequest.setHeader("Accept", APPLICATION_JSON);
HttpResponse response = client.execute(getRequest);
if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
throw new WasbRemoteCallException(
response.getStatusLine().toString());
StatusLine statusLine = response.getStatusLine();
if (statusLine == null || statusLine.getStatusCode() != HttpStatus.SC_OK) {
throw new WasbRemoteCallException(getRequest.getURI().toString() + ":" +
((statusLine!=null) ? statusLine.toString() : "NULL")
);
}
Header contentTypeHeader = response.getFirstHeader("Content-Type");
if (contentTypeHeader == null || contentTypeHeader.getValue() != APPLICATION_JSON) {
throw new WasbRemoteCallException(getRequest.getURI().toString() + ":" +
"Content-Type mismatch: expected: " + APPLICATION_JSON +
", got " + ((contentTypeHeader!=null) ? contentTypeHeader.getValue() : "NULL")
);
}
Header contentLengthHeader = response.getFirstHeader("Content-Length");
if (contentLengthHeader == null) {
throw new WasbRemoteCallException(getRequest.getURI().toString() + ":" +
"Content-Length header missing"
);
}
try {
if (Integer.parseInt(contentLengthHeader.getValue()) > MAX_CONTENT_LENGTH) {
throw new WasbRemoteCallException(getRequest.getURI().toString() + ":" +
"Content-Length:" + contentLengthHeader.getValue() +
"exceeded max:" + MAX_CONTENT_LENGTH
);
}
}
catch (NumberFormatException nfe) {
throw new WasbRemoteCallException(getRequest.getURI().toString() + ":" +
"Invalid Content-Length value :" + contentLengthHeader.getValue()
);
}
BufferedReader rd = new BufferedReader(
@ -83,11 +128,11 @@ class WasbRemoteCallHelper {
return responseBody.toString();
} catch (ClientProtocolException clientProtocolEx) {
throw new WasbRemoteCallException("Encountered ClientProtocolException"
+ " while making remote call", clientProtocolEx);
throw new WasbRemoteCallException(getRequest.getURI().toString() + ":" +
"Encountered ClientProtocolException while making remote call", clientProtocolEx);
} catch (IOException ioEx) {
throw new WasbRemoteCallException("Encountered IOException while making"
+ " remote call", ioEx);
throw new WasbRemoteCallException(getRequest.getURI().toString() + ":" +
"Encountered IOException while making remote call", ioEx);
}
}
}

View File

@ -333,6 +333,40 @@ The service is expected to return a response in JSON format:
"sasKey" : Requested SAS Key <String>
}
```
## <a name="WASB Authorization" />Authorization Support in WASB.
Authorization support can be enabled in WASB using the following configuration:
```
<property>
<name>fs.azure.authorization</name>
<value>true</value>
</property>
```
The current implementation of authorization relies on the presence of an external service that can enforce
the authorization. The service is expected to be running on a URL provided by the following config.
```
<property>
<name>fs.azure.authorization.remote.service.url</name>
<value>{URL}</value>
</property>
```
The remote service is expected to provide support for the following REST call: ```{URL}/CHECK_AUTHORIZATION```
An example request:
```{URL}/CHECK_AUTHORIZATION?wasb_absolute_path=<absolute_path>&operation_type=<operation type>&delegation_token=<delegation token>```
The service is expected to return a response in JSON format:
```
{
"responseCode" : 0 or non-zero <int>,
"responseMessage" : relevant message on failure <String>,
"authorizationResult" : true/false <boolean>
}
```
## <a name="Testing_the_hadoop-azure_Module" />Testing the hadoop-azure Module
The hadoop-azure module includes a full suite of unit tests. Most of the tests

View File

@ -18,17 +18,9 @@
package org.apache.hadoop.fs.azure;
import static org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.DEFAULT_STORAGE_EMULATOR_ACCOUNT_NAME;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Calendar;
import java.util.Date;
import java.util.EnumSet;
import java.util.GregorianCalendar;
import java.util.TimeZone;
import java.util.concurrent.ConcurrentLinkedQueue;
import com.microsoft.azure.storage.*;
import com.microsoft.azure.storage.blob.*;
import com.microsoft.azure.storage.core.Base64;
import org.apache.commons.configuration2.SubsetConfiguration;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@ -39,22 +31,14 @@ import org.apache.hadoop.metrics2.MetricsRecord;
import org.apache.hadoop.metrics2.MetricsSink;
import org.apache.hadoop.metrics2.MetricsTag;
import org.apache.hadoop.metrics2.impl.TestMetricsConfig;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import com.microsoft.azure.storage.AccessCondition;
import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.StorageCredentials;
import com.microsoft.azure.storage.StorageCredentialsAccountAndKey;
import com.microsoft.azure.storage.StorageCredentialsAnonymous;
import com.microsoft.azure.storage.blob.BlobContainerPermissions;
import com.microsoft.azure.storage.blob.BlobContainerPublicAccessType;
import com.microsoft.azure.storage.blob.BlobOutputStream;
import com.microsoft.azure.storage.blob.CloudBlobClient;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import com.microsoft.azure.storage.blob.CloudBlockBlob;
import com.microsoft.azure.storage.blob.SharedAccessBlobPermissions;
import com.microsoft.azure.storage.blob.SharedAccessBlobPolicy;
import com.microsoft.azure.storage.core.Base64;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import static org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.DEFAULT_STORAGE_EMULATOR_ACCOUNT_NAME;
import static org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.KEY_USE_LOCAL_SAS_KEY_MODE;
/**
* Helper class to create WASB file systems backed by either a mock in-memory
@ -92,7 +76,7 @@ public final class AzureBlobStorageTestAccount {
private static final ConcurrentLinkedQueue<MetricsRecord> allMetrics =
new ConcurrentLinkedQueue<MetricsRecord>();
private static boolean metricsConfigSaved = false;
private AzureBlobStorageTestAccount(NativeAzureFileSystem fs,
CloudStorageAccount account,
CloudBlobContainer container) {
@ -272,6 +256,7 @@ public final class AzureBlobStorageTestAccount {
store.setAzureStorageInteractionLayer(mockStorage);
NativeAzureFileSystem fs = new NativeAzureFileSystem(store);
setMockAccountKey(conf);
configureSecureModeTestSettings(conf);
// register the fs provider.
fs.initialize(new URI(MOCK_WASB_URI), conf);
@ -332,6 +317,8 @@ public final class AzureBlobStorageTestAccount {
// Set account URI and initialize Azure file system.
URI accountUri = createAccountUri(DEFAULT_STORAGE_EMULATOR_ACCOUNT_NAME,
containerName);
configureSecureModeTestSettings(conf);
fs.initialize(accountUri, conf);
// Create test account initializing the appropriate member variables.
@ -368,6 +355,7 @@ public final class AzureBlobStorageTestAccount {
// out-of-band appends.
conf.setBoolean(KEY_DISABLE_THROTTLING, true);
conf.setBoolean(KEY_READ_TOLERATE_CONCURRENT_APPEND, true);
configureSecureModeTestSettings(conf);
// Set account URI and initialize Azure file system.
URI accountUri = createAccountUri(accountName, containerName);
@ -408,6 +396,17 @@ public final class AzureBlobStorageTestAccount {
setMockAccountKey(conf, MOCK_ACCOUNT_NAME);
}
/**
* Configure default values for Secure Mode testing.
* These values are relevant only when testing in Secure Mode.
*
* @param conf
* The configuration.
*/
public static void configureSecureModeTestSettings(Configuration conf) {
conf.set(KEY_USE_LOCAL_SAS_KEY_MODE, "true"); // always use local sas-key mode for testing
}
/**
* Sets the mock account key in the given configuration.
*
@ -556,6 +555,8 @@ public final class AzureBlobStorageTestAccount {
conf.setBoolean(KEY_DISABLE_THROTTLING, true);
}
configureSecureModeTestSettings(conf);
// Set account URI and initialize Azure file system.
URI accountUri = createAccountUri(accountName, containerName);
fs.initialize(accountUri, conf);
@ -693,6 +694,8 @@ public final class AzureBlobStorageTestAccount {
// Capture the account URL and the account name.
String accountName = conf.get(TEST_ACCOUNT_NAME_PROPERTY_NAME);
configureSecureModeTestSettings(conf);
// Generate a container name and create a shared access signature string for
// it.
//
@ -764,6 +767,8 @@ public final class AzureBlobStorageTestAccount {
// Capture the account URL and the account name.
String accountName = conf.get(TEST_ACCOUNT_NAME_PROPERTY_NAME);
configureSecureModeTestSettings(conf);
// Set up public container with the specified blob name.
CloudBlockBlob blobRoot = primeRootContainer(blobClient, accountName,
blobName, fileSize);

View File

@ -0,0 +1,102 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azure;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
/**
* A mock wasb authorizer implementation.
*/
public class MockWasbAuthorizerImpl implements WasbAuthorizerInterface {
private Map<AuthorizationComponent, Boolean> authRules;
@Override
public void init(Configuration conf) {
authRules = new HashMap<AuthorizationComponent, Boolean>();
}
public void addAuthRule(String wasbAbsolutePath,
String accessType, boolean access) {
AuthorizationComponent component =
new AuthorizationComponent(wasbAbsolutePath, accessType);
this.authRules.put(component, access);
}
@Override
public boolean authorize(String wasbAbsolutePath, String accessType,
String delegationToken) throws WasbAuthorizationException {
AuthorizationComponent component =
new AuthorizationComponent(wasbAbsolutePath, accessType);
if (authRules.containsKey(component)) {
return authRules.get(component);
} else {
return false;
}
}
}
class AuthorizationComponent {
private String wasbAbsolutePath;
private String accessType;
public AuthorizationComponent(String wasbAbsolutePath,
String accessType) {
this.wasbAbsolutePath = wasbAbsolutePath;
this.accessType = accessType;
}
@Override
public int hashCode() {
return this.wasbAbsolutePath.hashCode() ^ this.accessType.hashCode();
}
@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
}
if (obj == null
|| !(obj instanceof AuthorizationComponent)) {
return false;
}
return ((AuthorizationComponent)obj).
getWasbAbsolutePath().equals(this.wasbAbsolutePath)
&& ((AuthorizationComponent)obj).
getAccessType().equals(this.accessType);
}
public String getWasbAbsolutePath() {
return this.wasbAbsolutePath;
}
public String getAccessType() {
return accessType;
}
}

View File

@ -0,0 +1,344 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azure;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import com.sun.tools.javac.util.Assert;
import org.junit.rules.ExpectedException;
import java.io.Console;
import static org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.KEY_USE_SECURE_MODE;
/**
* Test class to hold all WASB authorization tests.
*/
public class TestNativeAzureFileSystemAuthorization
extends AbstractWasbTestBase {
@Override
protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
Configuration conf = new Configuration();
conf.set(NativeAzureFileSystem.KEY_AZURE_AUTHORIZATION, "true");
conf.set(RemoteWasbAuthorizerImpl.KEY_REMOTE_AUTH_SERVICE_URL, "http://localhost/");
return AzureBlobStorageTestAccount.create(conf);
}
@Before
public void beforeMethod() {
boolean useSecureMode = fs.getConf().getBoolean(KEY_USE_SECURE_MODE, false);
boolean useAuthorization = fs.getConf().getBoolean(NativeAzureFileSystem.KEY_AZURE_AUTHORIZATION, false);
Assume.assumeTrue("Test valid when both SecureMode and Authorization are enabled .. skipping",
useSecureMode && useAuthorization);
Assume.assumeTrue(
useSecureMode && useAuthorization
);
}
@Rule
public ExpectedException expectedEx = ExpectedException.none();
/**
* Positive test to verify Create and delete access check
* @throws Throwable
*/
@Test
public void testCreateAccessCheckPositive() throws Throwable {
AzureBlobStorageTestAccount testAccount = createTestAccount();
NativeAzureFileSystem fs = testAccount.getFileSystem();
Path parentDir = new Path("/testCreateAccessCheckPositive");
Path testPath = new Path(parentDir, "test.dat");
MockWasbAuthorizerImpl authorizer = new MockWasbAuthorizerImpl();
authorizer.init(null);
authorizer.addAuthRule(testPath.toString(), WasbAuthorizationOperations.WRITE.toString(), true);
authorizer.addAuthRule(testPath.toString(), WasbAuthorizationOperations.EXECUTE.toString(), true);
authorizer.addAuthRule(parentDir.toString(), WasbAuthorizationOperations.EXECUTE.toString(), true);
fs.updateWasbAuthorizer(authorizer);
fs.create(testPath);
ContractTestUtils.assertPathExists(fs, "testPath was not created", testPath);
fs.delete(parentDir, true);
}
/**
* Negative test to verify Create access check
* @throws Throwable
*/
@Test // (expected=WasbAuthorizationException.class)
public void testCreateAccessCheckNegative() throws Throwable {
expectedEx.expect(WasbAuthorizationException.class);
expectedEx.expectMessage("create operation for Path : /testCreateAccessCheckNegative/test.dat not allowed");
AzureBlobStorageTestAccount testAccount = createTestAccount();
NativeAzureFileSystem fs = testAccount.getFileSystem();
Path parentDir = new Path("/testCreateAccessCheckNegative");
Path testPath = new Path(parentDir, "test.dat");
MockWasbAuthorizerImpl authorizer = new MockWasbAuthorizerImpl();
authorizer.init(null);
authorizer.addAuthRule(testPath.toString(),WasbAuthorizationOperations.WRITE.toString(), false);
authorizer.addAuthRule(parentDir.toString(),WasbAuthorizationOperations.EXECUTE.toString(), true);
fs.updateWasbAuthorizer(authorizer);
try {
fs.create(testPath);
}
finally {
fs.delete(parentDir, true);
}
}
/**
* Positive test to verify Create and delete access check
* @throws Throwable
*/
@Test
public void testListAccessCheckPositive() throws Throwable {
AzureBlobStorageTestAccount testAccount = createTestAccount();
NativeAzureFileSystem fs = testAccount.getFileSystem();
Path parentDir = new Path("/testListAccessCheckPositive");
Path testPath = new Path(parentDir, "test.dat");
MockWasbAuthorizerImpl authorizer = new MockWasbAuthorizerImpl();
authorizer.init(null);
authorizer.addAuthRule(testPath.toString(), WasbAuthorizationOperations.WRITE.toString(), true);
authorizer.addAuthRule(testPath.toString(), WasbAuthorizationOperations.EXECUTE.toString(), true);
authorizer.addAuthRule(parentDir.toString(), WasbAuthorizationOperations.EXECUTE.toString(), true);
fs.updateWasbAuthorizer(authorizer);
fs.create(testPath);
ContractTestUtils.assertPathExists(fs, "testPath does not exist", testPath);
try {
fs.listStatus(testPath);
}
finally {
fs.delete(parentDir, true);
}
}
/**
* Negative test to verify Create access check
* @throws Throwable
*/
@Test //(expected=WasbAuthorizationException.class)
public void testListAccessCheckNegative() throws Throwable {
expectedEx.expect(WasbAuthorizationException.class);
expectedEx.expectMessage("getFileStatus operation for Path : /testListAccessCheckNegative/test.dat not allowed");
AzureBlobStorageTestAccount testAccount = createTestAccount();
NativeAzureFileSystem fs = testAccount.getFileSystem();
Path parentDir = new Path("/testListAccessCheckNegative");
Path testPath = new Path(parentDir, "test.dat");
MockWasbAuthorizerImpl authorizer = new MockWasbAuthorizerImpl();
authorizer.init(null);
authorizer.addAuthRule(testPath.toString(), WasbAuthorizationOperations.WRITE.toString(), true);
authorizer.addAuthRule(testPath.toString(), WasbAuthorizationOperations.EXECUTE.toString(), false);
authorizer.addAuthRule(parentDir.toString(), WasbAuthorizationOperations.EXECUTE.toString(), true);
fs.updateWasbAuthorizer(authorizer);
fs.create(testPath);
ContractTestUtils.assertPathExists(fs, "testPath does not exist", testPath);
try {
fs.listStatus(testPath);
}
finally {
fs.delete(parentDir, true);
}
}
/**
* Positive test to verify rename access check.
* @throws Throwable
*/
@Test
public void testRenameAccessCheckPositive() throws Throwable {
AzureBlobStorageTestAccount testAccount = createTestAccount();
NativeAzureFileSystem fs = testAccount.getFileSystem();
Path parentDir = new Path("/testRenameAccessCheckPositive");
Path testPath = new Path(parentDir, "test.dat");
Path renamePath = new Path(parentDir, "test2.dat");
MockWasbAuthorizerImpl authorizer = new MockWasbAuthorizerImpl();
authorizer.init(null);
authorizer.addAuthRule(testPath.toString(), WasbAuthorizationOperations.WRITE.toString(), true);
authorizer.addAuthRule(testPath.toString(), WasbAuthorizationOperations.EXECUTE.toString(), true);
authorizer.addAuthRule(renamePath.toString(), WasbAuthorizationOperations.EXECUTE.toString(), true);
authorizer.addAuthRule(parentDir.toString(), WasbAuthorizationOperations.EXECUTE.toString(), true);
fs.updateWasbAuthorizer(authorizer);
fs.create(testPath);
ContractTestUtils.assertPathExists(fs, "sourcePath does not exist", testPath);
try {
fs.rename(testPath, renamePath);
ContractTestUtils.assertPathExists(fs, "destPath does not exist", renamePath);
}
finally {
fs.delete(parentDir, true);
}
}
/**
* Negative test to verify rename access check.
* @throws Throwable
*/
@Test //(expected=WasbAuthorizationException.class)
public void testRenameAccessCheckNegative() throws Throwable {
expectedEx.expect(WasbAuthorizationException.class);
expectedEx.expectMessage("rename operation for Path : /testRenameAccessCheckNegative/test.dat not allowed");
AzureBlobStorageTestAccount testAccount = createTestAccount();
NativeAzureFileSystem fs = testAccount.getFileSystem();
Path parentDir = new Path("/testRenameAccessCheckNegative");
Path testPath = new Path(parentDir, "test.dat");
Path renamePath = new Path(parentDir, "test2.dat");
MockWasbAuthorizerImpl authorizer = new MockWasbAuthorizerImpl();
authorizer.init(null);
authorizer.addAuthRule(testPath.toString(), WasbAuthorizationOperations.WRITE.toString(), true);
// set EXECUTE to true for initial assert right after creation.
authorizer.addAuthRule(testPath.toString(), WasbAuthorizationOperations.EXECUTE.toString(), true);
authorizer.addAuthRule(parentDir.toString(), WasbAuthorizationOperations.EXECUTE.toString(), true);
fs.updateWasbAuthorizer(authorizer);
fs.create(testPath);
ContractTestUtils.assertPathExists(fs, "sourcePath does not exist", testPath);
// Set EXECUTE to false for actual rename-failure test
authorizer.addAuthRule(testPath.toString(), WasbAuthorizationOperations.EXECUTE.toString(), false);
fs.updateWasbAuthorizer(authorizer);
try {
fs.rename(testPath, renamePath);
ContractTestUtils.assertPathExists(fs, "destPath does not exist", renamePath);
} finally {
fs.delete(parentDir, true);
}
}
/**
* Positive test for read access check.
* @throws Throwable
*/
@Test
public void testReadAccessCheckPositive() throws Throwable {
AzureBlobStorageTestAccount testAccount = createTestAccount();
NativeAzureFileSystem fs = testAccount.getFileSystem();
Path parentDir = new Path("/testReadAccessCheckPositive");
Path testPath = new Path(parentDir, "test.dat");
MockWasbAuthorizerImpl authorizer = new MockWasbAuthorizerImpl();
authorizer.init(null);
authorizer.addAuthRule(testPath.toString(), WasbAuthorizationOperations.WRITE.toString(), true);
authorizer.addAuthRule(testPath.toString(), WasbAuthorizationOperations.EXECUTE.toString(), true);
authorizer.addAuthRule(testPath.toString(), WasbAuthorizationOperations.READ.toString(), true);
authorizer.addAuthRule(parentDir.toString(), WasbAuthorizationOperations.EXECUTE.toString(), true);
fs.updateWasbAuthorizer(authorizer);
FSDataOutputStream fso = fs.create(testPath);
String data = "Hello World";
fso.writeBytes(data);
fso.close();
ContractTestUtils.assertPathExists(fs, "testPath does not exist", testPath);
FSDataInputStream inputStream = null;
try {
inputStream = fs.open(testPath);
ContractTestUtils.verifyRead(inputStream, data.getBytes(), 0, data.length());
}
finally {
if(inputStream != null) {
inputStream.close();
}
fs.delete(parentDir, true);
}
}
/**
* Negative test to verify read access check.
* @throws Throwable
*/
@Test //(expected=WasbAuthorizationException.class)
public void testReadAccessCheckNegative() throws Throwable {
expectedEx.expect(WasbAuthorizationException.class);
expectedEx.expectMessage("read operation for Path : /testReadAccessCheckNegative/test.dat not allowed");
AzureBlobStorageTestAccount testAccount = createTestAccount();
NativeAzureFileSystem fs = testAccount.getFileSystem();
Path parentDir = new Path("/testReadAccessCheckNegative");
Path testPath = new Path(parentDir, "test.dat");
MockWasbAuthorizerImpl authorizer = new MockWasbAuthorizerImpl();
authorizer.init(null);
authorizer.addAuthRule(testPath.toString(), WasbAuthorizationOperations.WRITE.toString(), true);
authorizer.addAuthRule(testPath.toString(), WasbAuthorizationOperations.EXECUTE.toString(), true);
authorizer.addAuthRule(testPath.toString(), WasbAuthorizationOperations.READ.toString(), false);
authorizer.addAuthRule(parentDir.toString(), WasbAuthorizationOperations.EXECUTE.toString(), true);
fs.updateWasbAuthorizer(authorizer);
FSDataOutputStream fso = fs.create(testPath);
String data = "Hello World";
fso.writeBytes(data);
fso.close();
ContractTestUtils.assertPathExists(fs, "testPath does not exist", testPath);
FSDataInputStream inputStream = null;
try {
inputStream = fs.open(testPath);
ContractTestUtils.verifyRead(inputStream, data.getBytes(), 0, data.length());
} finally {
if (inputStream != null) {
inputStream.close();
}
fs.delete(parentDir, true);
}
}
}

View File

@ -0,0 +1,344 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azure;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.http.*;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.Mockito;
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import static org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.KEY_USE_SECURE_MODE;
/**
* Test class to hold all WasbRemoteCallHelper tests
*/
public class TestWasbRemoteCallHelper
extends AbstractWasbTestBase {
@Override
protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
Configuration conf = new Configuration();
conf.set(NativeAzureFileSystem.KEY_AZURE_AUTHORIZATION, "true");
conf.set(RemoteWasbAuthorizerImpl.KEY_REMOTE_AUTH_SERVICE_URL, "http://localhost/");
return AzureBlobStorageTestAccount.create(conf);
}
@Before
public void beforeMethod() {
boolean useSecureMode = fs.getConf().getBoolean(KEY_USE_SECURE_MODE, false);
boolean useAuthorization = fs.getConf().getBoolean(NativeAzureFileSystem.KEY_AZURE_AUTHORIZATION, false);
Assume.assumeTrue("Test valid when both SecureMode and Authorization are enabled .. skipping",
useSecureMode && useAuthorization);
Assume.assumeTrue(
useSecureMode && useAuthorization
);
}
@Rule
public ExpectedException expectedEx = ExpectedException.none();
/**
* Test invalid status-code
* @throws Throwable
*/
@Test // (expected = WasbAuthorizationException.class)
public void testInvalidStatusCode() throws Throwable {
setupExpectations();
// set up mocks
HttpClient mockHttpClient = Mockito.mock(HttpClient.class);
HttpResponse mockHttpResponse = Mockito.mock(HttpResponse.class);
Mockito.when(mockHttpClient.execute(Mockito.<HttpGet>any())).thenReturn(mockHttpResponse);
Mockito.when(mockHttpResponse.getStatusLine()).thenReturn(newStatusLine(999));
// finished setting up mocks
performop(mockHttpClient);
}
/**
* Test invalid Content-Type
* @throws Throwable
*/
@Test // (expected = WasbAuthorizationException.class)
public void testInvalidContentType() throws Throwable {
setupExpectations();
// set up mocks
HttpClient mockHttpClient = Mockito.mock(HttpClient.class);
HttpResponse mockHttpResponse = Mockito.mock(HttpResponse.class);
Mockito.when(mockHttpClient.execute(Mockito.<HttpGet>any())).thenReturn(mockHttpResponse);
Mockito.when(mockHttpResponse.getStatusLine()).thenReturn(newStatusLine(200));
Mockito.when(mockHttpResponse.getFirstHeader("Content-Type"))
.thenReturn(newHeader("Content-Type", "text/plain"));
// finished setting up mocks
performop(mockHttpClient);
}
/**
* Test missing Content-Length
* @throws Throwable
*/
@Test // (expected = WasbAuthorizationException.class)
public void testMissingContentLength() throws Throwable {
setupExpectations();
// set up mocks
HttpClient mockHttpClient = Mockito.mock(HttpClient.class);
HttpResponse mockHttpResponse = Mockito.mock(HttpResponse.class);
Mockito.when(mockHttpClient.execute(Mockito.<HttpGet>any())).thenReturn(mockHttpResponse);
Mockito.when(mockHttpResponse.getStatusLine()).thenReturn(newStatusLine(200));
Mockito.when(mockHttpResponse.getFirstHeader("Content-Type"))
.thenReturn(newHeader("Content-Type", "application/json"));
// finished setting up mocks
performop(mockHttpClient);
}
/**
* Test Content-Length exceeds max
* @throws Throwable
*/
@Test // (expected = WasbAuthorizationException.class)
public void testContentLengthExceedsMax() throws Throwable {
setupExpectations();
// set up mocks
HttpClient mockHttpClient = Mockito.mock(HttpClient.class);
HttpResponse mockHttpResponse = Mockito.mock(HttpResponse.class);
Mockito.when(mockHttpClient.execute(Mockito.<HttpGet>any())).thenReturn(mockHttpResponse);
Mockito.when(mockHttpResponse.getStatusLine()).thenReturn(newStatusLine(200));
Mockito.when(mockHttpResponse.getFirstHeader("Content-Type"))
.thenReturn(newHeader("Content-Type", "application/json"));
Mockito.when(mockHttpResponse.getFirstHeader("Content-Length"))
.thenReturn(newHeader("Content-Length", "2048"));
// finished setting up mocks
performop(mockHttpClient);
}
/**
* Test invalid Content-Length value
* @throws Throwable
*/
@Test // (expected = WasbAuthorizationException.class)
public void testInvalidContentLengthValue() throws Throwable {
setupExpectations();
// set up mocks
HttpClient mockHttpClient = Mockito.mock(HttpClient.class);
HttpResponse mockHttpResponse = Mockito.mock(HttpResponse.class);
Mockito.when(mockHttpClient.execute(Mockito.<HttpGet>any())).thenReturn(mockHttpResponse);
Mockito.when(mockHttpResponse.getStatusLine()).thenReturn(newStatusLine(200));
Mockito.when(mockHttpResponse.getFirstHeader("Content-Type"))
.thenReturn(newHeader("Content-Type", "application/json"));
Mockito.when(mockHttpResponse.getFirstHeader("Content-Length"))
.thenReturn(newHeader("Content-Length", "20abc48"));
// finished setting up mocks
performop(mockHttpClient);
}
/**
* Test valid JSON response
* @throws Throwable
*/
@Test
public void testValidJSONResponse() throws Throwable {
// set up mocks
HttpClient mockHttpClient = Mockito.mock(HttpClient.class);
HttpResponse mockHttpResponse = Mockito.mock(HttpResponse.class);
HttpEntity mockHttpEntity = Mockito.mock(HttpEntity.class);
Mockito.when(mockHttpClient.execute(Mockito.<HttpGet>any())).thenReturn(mockHttpResponse);
Mockito.when(mockHttpResponse.getStatusLine()).thenReturn(newStatusLine(200));
Mockito.when(mockHttpResponse.getFirstHeader("Content-Type"))
.thenReturn(newHeader("Content-Type", "application/json"));
Mockito.when(mockHttpResponse.getFirstHeader("Content-Length"))
.thenReturn(newHeader("Content-Length", "1024"));
Mockito.when(mockHttpResponse.getEntity()).thenReturn(mockHttpEntity);
Mockito.when(mockHttpEntity.getContent())
.thenReturn(new ByteArrayInputStream(validJsonResponse().getBytes(StandardCharsets.UTF_8)))
.thenReturn(new ByteArrayInputStream(validJsonResponse().getBytes(StandardCharsets.UTF_8)))
.thenReturn(new ByteArrayInputStream(validJsonResponse().getBytes(StandardCharsets.UTF_8)));
// finished setting up mocks
performop(mockHttpClient);
}
/**
* Test malformed JSON response
* @throws Throwable
*/
@Test // (expected = WasbAuthorizationException.class)
public void testMalFormedJSONResponse() throws Throwable {
expectedEx.expect(WasbAuthorizationException.class);
expectedEx.expectMessage("com.fasterxml.jackson.core.JsonParseException: Unexpected end-of-input in FIELD_NAME");
// set up mocks
HttpClient mockHttpClient = Mockito.mock(HttpClient.class);
HttpResponse mockHttpResponse = Mockito.mock(HttpResponse.class);
HttpEntity mockHttpEntity = Mockito.mock(HttpEntity.class);
Mockito.when(mockHttpClient.execute(Mockito.<HttpGet>any())).thenReturn(mockHttpResponse);
Mockito.when(mockHttpResponse.getStatusLine()).thenReturn(newStatusLine(200));
Mockito.when(mockHttpResponse.getFirstHeader("Content-Type"))
.thenReturn(newHeader("Content-Type", "application/json"));
Mockito.when(mockHttpResponse.getFirstHeader("Content-Length"))
.thenReturn(newHeader("Content-Length", "1024"));
Mockito.when(mockHttpResponse.getEntity()).thenReturn(mockHttpEntity);
Mockito.when(mockHttpEntity.getContent())
.thenReturn(new ByteArrayInputStream(malformedJsonResponse().getBytes(StandardCharsets.UTF_8)));
// finished setting up mocks
performop(mockHttpClient);
}
/**
* Test valid JSON response failure response code
* @throws Throwable
*/
@Test // (expected = WasbAuthorizationException.class)
public void testFailureCodeJSONResponse() throws Throwable {
expectedEx.expect(WasbAuthorizationException.class);
expectedEx.expectMessage("Remote authorization service encountered an error Unauthorized");
// set up mocks
HttpClient mockHttpClient = Mockito.mock(HttpClient.class);
HttpResponse mockHttpResponse = Mockito.mock(HttpResponse.class);
HttpEntity mockHttpEntity = Mockito.mock(HttpEntity.class);
Mockito.when(mockHttpClient.execute(Mockito.<HttpGet>any())).thenReturn(mockHttpResponse);
Mockito.when(mockHttpResponse.getStatusLine()).thenReturn(newStatusLine(200));
Mockito.when(mockHttpResponse.getFirstHeader("Content-Type"))
.thenReturn(newHeader("Content-Type", "application/json"));
Mockito.when(mockHttpResponse.getFirstHeader("Content-Length"))
.thenReturn(newHeader("Content-Length", "1024"));
Mockito.when(mockHttpResponse.getEntity()).thenReturn(mockHttpEntity);
Mockito.when(mockHttpEntity.getContent())
.thenReturn(new ByteArrayInputStream(failureCodeJsonResponse().getBytes(StandardCharsets.UTF_8)));
// finished setting up mocks
performop(mockHttpClient);
}
private void setupExpectations() {
expectedEx.expect(WasbAuthorizationException.class);
expectedEx.expectMessage("org.apache.hadoop.fs.azure.WasbRemoteCallException: " +
"http://localhost/CHECK_AUTHORIZATION?wasb_absolute_path=%2Ftest.dat&" +
"operation_type=write&delegation_token:Encountered IOException while making remote call");
}
private void performop(HttpClient mockHttpClient) throws Throwable {
AzureBlobStorageTestAccount testAccount = createTestAccount();
NativeAzureFileSystem fs = testAccount.getFileSystem();
Path testPath = new Path("/", "test.dat");
RemoteWasbAuthorizerImpl authorizer = new RemoteWasbAuthorizerImpl();
authorizer.init(fs.getConf());
WasbRemoteCallHelper mockWasbRemoteCallHelper = new WasbRemoteCallHelper();
mockWasbRemoteCallHelper.updateHttpClient(mockHttpClient);
authorizer.updateWasbRemoteCallHelper(mockWasbRemoteCallHelper);
fs.updateWasbAuthorizer(authorizer);
fs.create(testPath);
ContractTestUtils.assertPathExists(fs, "testPath was not created", testPath);
fs.delete(testPath, false);
}
private String validJsonResponse() {
return new String(
"{\"responseCode\": 0, \"authorizationResult\": true, \"responseMessage\": \"Authorized\"}"
);
}
private String malformedJsonResponse() {
return new String(
"{\"responseCode\": 0, \"authorizationResult\": true, \"responseMessage\":"
);
}
private String failureCodeJsonResponse() {
return new String(
"{\"responseCode\": 1, \"authorizationResult\": false, \"responseMessage\": \"Unauthorized\"}"
);
}
private StatusLine newStatusLine(int statusCode) {
return new StatusLine() {
@Override
public ProtocolVersion getProtocolVersion() {
return new ProtocolVersion("HTTP", 1, 1);
}
@Override
public int getStatusCode() {
return statusCode;
}
@Override
public String getReasonPhrase() {
return "Reason Phrase";
}
};
}
private Header newHeader(String name, String value) {
return new Header() {
@Override
public String getName() {
return name;
}
@Override
public String getValue() {
return value;
}
@Override
public HeaderElement[] getElements() throws ParseException {
return new HeaderElement[0];
}
};
}
}

View File

@ -19,27 +19,21 @@
<!-- For tests against live azure, provide the following account information -->
<!--
<property>
<name>fs.azure.test.account.name</name>
<value>{ACCOUNTNAME}.blob.core.windows.net</value>
</property>
<property>
<name>fs.azure.account.key.{ACCOUNTNAME}.blob.core.windows.net</name>
<value>{ACCOUNTKEY}</value>
</property>
<property>
<name>fs.azure.test.account.name</name>
<value>{ACCOUNTNAME}.blob.core.windows.net</value>
</property>
<property>
<name>fs.azure.account.key.{ACCOUNTNAME}.blob.core.windows.net</name>
<value>{ACCOUNTKEY}</value>
</property>
-->
<property>
<name>fs.azure.secure.mode</name>
<value>false</value>
</property>
<property>
<name>fs.azure.local.sas.key.mode</name>
<value>false</value>
</property>
<property>
<name>fs.azure.cred.service.url</name>
<value>{CRED_SERIVCE_URL}</value>
</property>
-->
<!-- Save the above configuration properties in a separate file named -->
<!-- azure-auth-keys.xml in the same directory as this file. -->