HADOOP-16730: ABFS: Support for Shared Access Signatures (SAS). Contributed by Sneha Vijayarajan.

Authored by Sneha Vijayarajan on 2020-02-27 17:00:15 +00:00; committed by Thomas Marquardt
parent cd2c6b1aac
commit 791270a2e5
22 changed files with 798 additions and 493 deletions


@ -20,10 +20,10 @@ package org.apache.hadoop.fs.azurebfs;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.util.Map;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
@ -40,15 +40,15 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemExc
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.ConfigurationPropertyNotFoundException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.KeyProviderException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.SASTokenProviderException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TokenAccessProviderException;
import org.apache.hadoop.fs.azurebfs.diagnostics.Base64StringConfigurationBasicValidator;
import org.apache.hadoop.fs.azurebfs.diagnostics.BooleanConfigurationBasicValidator;
import org.apache.hadoop.fs.azurebfs.diagnostics.IntegerConfigurationBasicValidator;
import org.apache.hadoop.fs.azurebfs.diagnostics.LongConfigurationBasicValidator;
import org.apache.hadoop.fs.azurebfs.diagnostics.StringConfigurationBasicValidator;
import org.apache.hadoop.fs.azurebfs.extensions.AbfsAuthorizationException;
import org.apache.hadoop.fs.azurebfs.extensions.AbfsAuthorizer;
import org.apache.hadoop.fs.azurebfs.extensions.CustomTokenProviderAdaptee;
import org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider;
import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
import org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider;
import org.apache.hadoop.fs.azurebfs.oauth2.CustomTokenProviderAdapter;
@ -170,9 +170,6 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_ENABLE_DELEGATION_TOKEN)
private boolean enableDelegationToken;
@StringConfigurationValidatorAnnotation(ConfigurationKey = ABFS_EXTERNAL_AUTHORIZATION_CLASS,
DefaultValue = "")
private String abfsExternalAuthorizationClass;
@BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ALWAYS_USE_HTTPS,
DefaultValue = DEFAULT_ENABLE_HTTPS)
@ -217,6 +214,14 @@ public class AbfsConfiguration{
}
}
/**
* Gets the Azure Storage account name corresponding to this instance of configuration.
* @return the Azure Storage account name
*/
public String getAccountName() {
return accountName;
}
/**
* Appends an account name to a configuration key yielding the
* account-specific form.
@ -436,7 +441,9 @@ public class AbfsConfiguration{
}
public boolean getCreateRemoteFileSystemDuringInitialization() {
return this.createRemoteFileSystemDuringInitialization;
// we do not support creating the filesystem when AuthType is SAS
return this.createRemoteFileSystemDuringInitialization
&& this.getAuthType(this.accountName) != AuthType.SAS;
}
public boolean getSkipUserGroupMetadataDuringInitialization() {
@ -578,35 +585,32 @@ public class AbfsConfiguration{
}
}
public String getAbfsExternalAuthorizationClass() {
return this.abfsExternalAuthorizationClass;
}
public AbfsAuthorizer getAbfsAuthorizer() throws IOException {
String authClassName = getAbfsExternalAuthorizationClass();
AbfsAuthorizer authorizer = null;
public SASTokenProvider getSASTokenProvider() throws AzureBlobFileSystemException {
AuthType authType = getEnum(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME, AuthType.SharedKey);
if (authType != AuthType.SAS) {
throw new SASTokenProviderException(String.format(
"Invalid auth type: %s is being used, expecting SAS", authType));
}
try {
if (authClassName != null && !authClassName.isEmpty()) {
@SuppressWarnings("unchecked")
Class<AbfsAuthorizer> authClass = (Class<AbfsAuthorizer>) rawConfig.getClassByName(authClassName);
authorizer = authClass.getConstructor(new Class[] {Configuration.class}).newInstance(rawConfig);
LOG.trace("Initializing {}", authClassName);
authorizer.init();
LOG.trace("{} init complete", authClassName);
}
} catch (
IllegalAccessException
| InstantiationException
| ClassNotFoundException
| IllegalArgumentException
| InvocationTargetException
| NoSuchMethodException
| SecurityException
| AbfsAuthorizationException e) {
throw new IOException(e);
String configKey = FS_AZURE_SAS_TOKEN_PROVIDER_TYPE;
Class<? extends SASTokenProvider> sasTokenProviderClass =
getClass(configKey, null, SASTokenProvider.class);
Preconditions.checkArgument(sasTokenProviderClass != null,
String.format("The configuration value for \"%s\" is invalid.", configKey));
SASTokenProvider sasTokenProvider = ReflectionUtils
.newInstance(sasTokenProviderClass, rawConfig);
Preconditions.checkArgument(sasTokenProvider != null,
String.format("Failed to initialize %s", sasTokenProviderClass));
LOG.trace("Initializing {}", sasTokenProviderClass.getName());
sasTokenProvider.initialize(rawConfig, accountName);
LOG.trace("{} init complete", sasTokenProviderClass.getName());
return sasTokenProvider;
} catch (Exception e) {
throw new TokenAccessProviderException("Unable to load SAS token provider class: " + e, e);
}
return authorizer;
}
void validateStorageAccountKeys() throws InvalidConfigurationValueException {


@ -29,7 +29,6 @@ import java.net.URISyntaxException;
import java.util.Hashtable;
import java.util.List;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
@ -66,9 +65,8 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemExc
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.FileSystemOperationUnhandledException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriAuthorityException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.SASTokenProviderException;
import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
import org.apache.hadoop.fs.azurebfs.extensions.AbfsAuthorizationException;
import org.apache.hadoop.fs.azurebfs.extensions.AbfsAuthorizer;
import org.apache.hadoop.fs.azurebfs.security.AbfsDelegationTokenManager;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
@ -96,7 +94,6 @@ public class AzureBlobFileSystem extends FileSystem {
private boolean delegationTokenEnabled = false;
private AbfsDelegationTokenManager delegationTokenManager;
private AbfsAuthorizer authorizer;
@Override
public void initialize(URI uri, Configuration configuration)
@ -139,9 +136,6 @@ public class AzureBlobFileSystem extends FileSystem {
AbfsClientThrottlingIntercept.initializeSingleton(abfsConfiguration.isAutoThrottlingEnabled());
// Initialize ABFS authorizer
//
this.authorizer = abfsConfiguration.getAbfsAuthorizer();
LOG.debug("Initializing AzureBlobFileSystem for {} complete", uri);
}
@ -170,7 +164,6 @@ public class AzureBlobFileSystem extends FileSystem {
LOG.debug("AzureBlobFileSystem.open path: {} bufferSize: {}", path, bufferSize);
Path qualifiedPath = makeQualified(path);
performAbfsAuthCheck(FsAction.READ, qualifiedPath);
try {
InputStream inputStream = abfsStore.openFileForRead(qualifiedPath, statistics);
@ -193,7 +186,6 @@ public class AzureBlobFileSystem extends FileSystem {
trailingPeriodCheck(f);
Path qualifiedPath = makeQualified(f);
performAbfsAuthCheck(FsAction.WRITE, qualifiedPath);
try {
OutputStream outputStream = abfsStore.createFile(qualifiedPath, overwrite,
@ -256,7 +248,6 @@ public class AzureBlobFileSystem extends FileSystem {
bufferSize);
Path qualifiedPath = makeQualified(f);
performAbfsAuthCheck(FsAction.WRITE, qualifiedPath);
try {
OutputStream outputStream = abfsStore.openFileForWrite(qualifiedPath, false);
@ -315,7 +306,6 @@ public class AzureBlobFileSystem extends FileSystem {
}
qualifiedDstPath = makeQualified(adjustedDst);
performAbfsAuthCheck(FsAction.READ_WRITE, qualifiedSrcPath, qualifiedDstPath);
abfsStore.rename(qualifiedSrcPath, qualifiedDstPath);
return true;
@ -340,7 +330,6 @@ public class AzureBlobFileSystem extends FileSystem {
"AzureBlobFileSystem.delete path: {} recursive: {}", f.toString(), recursive);
Path qualifiedPath = makeQualified(f);
performAbfsAuthCheck(FsAction.WRITE, qualifiedPath);
if (f.isRoot()) {
if (!recursive) {
@ -366,7 +355,6 @@ public class AzureBlobFileSystem extends FileSystem {
"AzureBlobFileSystem.listStatus path: {}", f.toString());
Path qualifiedPath = makeQualified(f);
performAbfsAuthCheck(FsAction.READ, qualifiedPath);
try {
FileStatus[] result = abfsStore.listStatus(qualifiedPath);
@ -416,7 +404,6 @@ public class AzureBlobFileSystem extends FileSystem {
}
Path qualifiedPath = makeQualified(f);
performAbfsAuthCheck(FsAction.WRITE, qualifiedPath);
try {
abfsStore.createDirectory(qualifiedPath, permission == null ? FsPermission.getDirDefault() : permission,
@ -445,7 +432,6 @@ public class AzureBlobFileSystem extends FileSystem {
LOG.debug("AzureBlobFileSystem.getFileStatus path: {}", f);
Path qualifiedPath = makeQualified(f);
performAbfsAuthCheck(FsAction.READ, qualifiedPath);
try {
return abfsStore.getFileStatus(qualifiedPath);
@ -627,7 +613,6 @@ public class AzureBlobFileSystem extends FileSystem {
}
Path qualifiedPath = makeQualified(path);
performAbfsAuthCheck(FsAction.WRITE, qualifiedPath);
try {
abfsStore.setOwner(qualifiedPath,
@ -657,9 +642,6 @@ public class AzureBlobFileSystem extends FileSystem {
throw new IllegalArgumentException("A valid name and value must be specified.");
}
Path qualifiedPath = makeQualified(path);
performAbfsAuthCheck(FsAction.READ_WRITE, qualifiedPath);
try {
Hashtable<String, String> properties = abfsStore.getPathStatus(path);
String xAttrName = ensureValidAttributeName(name);
@ -693,9 +675,6 @@ public class AzureBlobFileSystem extends FileSystem {
throw new IllegalArgumentException("A valid name must be specified.");
}
Path qualifiedPath = makeQualified(path);
performAbfsAuthCheck(FsAction.READ, qualifiedPath);
byte[] value = null;
try {
Hashtable<String, String> properties = abfsStore.getPathStatus(path);
@ -735,7 +714,6 @@ public class AzureBlobFileSystem extends FileSystem {
}
Path qualifiedPath = makeQualified(path);
performAbfsAuthCheck(FsAction.WRITE, qualifiedPath);
try {
abfsStore.setPermission(qualifiedPath,
@ -771,7 +749,6 @@ public class AzureBlobFileSystem extends FileSystem {
}
Path qualifiedPath = makeQualified(path);
performAbfsAuthCheck(FsAction.WRITE, qualifiedPath);
try {
abfsStore.modifyAclEntries(qualifiedPath,
@ -805,7 +782,6 @@ public class AzureBlobFileSystem extends FileSystem {
}
Path qualifiedPath = makeQualified(path);
performAbfsAuthCheck(FsAction.WRITE, qualifiedPath);
try {
abfsStore.removeAclEntries(qualifiedPath, aclSpec);
@ -831,7 +807,6 @@ public class AzureBlobFileSystem extends FileSystem {
}
Path qualifiedPath = makeQualified(path);
performAbfsAuthCheck(FsAction.WRITE, qualifiedPath);
try {
abfsStore.removeDefaultAcl(qualifiedPath);
@ -859,7 +834,6 @@ public class AzureBlobFileSystem extends FileSystem {
}
Path qualifiedPath = makeQualified(path);
performAbfsAuthCheck(FsAction.WRITE, qualifiedPath);
try {
abfsStore.removeAcl(qualifiedPath);
@ -894,7 +868,6 @@ public class AzureBlobFileSystem extends FileSystem {
}
Path qualifiedPath = makeQualified(path);
performAbfsAuthCheck(FsAction.WRITE, qualifiedPath);
try {
abfsStore.setAcl(qualifiedPath, aclSpec);
@ -921,7 +894,6 @@ public class AzureBlobFileSystem extends FileSystem {
}
Path qualifiedPath = makeQualified(path);
performAbfsAuthCheck(FsAction.READ, qualifiedPath);
try {
return abfsStore.getAclStatus(qualifiedPath);
@ -1107,6 +1079,8 @@ public class AzureBlobFileSystem extends FileSystem {
} else {
throw ere;
}
} else if (exception instanceof SASTokenProviderException) {
throw exception;
} else {
if (path == null) {
throw exception;
@ -1208,32 +1182,6 @@ public class AzureBlobFileSystem extends FileSystem {
return abfsStore.getIsNamespaceEnabled();
}
/**
* Use ABFS authorizer to check if user is authorized to perform specific
* {@link FsAction} on specified {@link Path}s.
*
* @param action The {@link FsAction} being requested on the provided {@link Path}s.
* @param paths The absolute paths of the storage being accessed.
* @throws AbfsAuthorizationException on authorization failure.
* @throws IOException network problems or similar.
* @throws IllegalArgumentException if the required parameters are not provided.
*/
private void performAbfsAuthCheck(FsAction action, Path... paths)
throws AbfsAuthorizationException, IOException {
if (authorizer == null) {
LOG.debug("ABFS authorizer is not initialized. No authorization check will be performed.");
} else {
Preconditions.checkArgument(paths.length > 0, "no paths supplied for authorization check");
LOG.debug("Auth check for action: {} on paths: {}", action.toString(), Arrays.toString(paths));
if (!authorizer.isAuthorized(action, paths)) {
throw new AbfsAuthorizationException(
"User is not authorized for action " + action.toString()
+ " on paths: " + Arrays.toString(paths));
}
}
}
@Override
public boolean hasPathCapability(final Path path, final String capability)
throws IOException {


@ -71,6 +71,7 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException;
import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultEntrySchema;
import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema;
import org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider;
import org.apache.hadoop.fs.azurebfs.extensions.ExtensionHelper;
import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
import org.apache.hadoop.fs.azurebfs.oauth2.IdentityTransformer;
@ -1130,8 +1131,9 @@ public class AzureBlobFileSystemStore implements Closeable {
SharedKeyCredentials creds = null;
AccessTokenProvider tokenProvider = null;
SASTokenProvider sasTokenProvider = null;
if (abfsConfiguration.getAuthType(accountName) == AuthType.SharedKey) {
if (authType == AuthType.SharedKey) {
LOG.trace("Fetching SharedKey credentials");
int dotIndex = accountName.indexOf(AbfsHttpConstants.DOT);
if (dotIndex <= 0) {
@ -1140,6 +1142,9 @@ public class AzureBlobFileSystemStore implements Closeable {
}
creds = new SharedKeyCredentials(accountName.substring(0, dotIndex),
abfsConfiguration.getStorageAccountKey());
} else if (authType == AuthType.SAS) {
LOG.trace("Fetching SAS token provider");
sasTokenProvider = abfsConfiguration.getSASTokenProvider();
} else {
LOG.trace("Fetching token provider");
tokenProvider = abfsConfiguration.getTokenProvider();
@ -1148,9 +1153,15 @@ public class AzureBlobFileSystemStore implements Closeable {
}
LOG.trace("Initializing AbfsClient for {}", baseUrl);
this.client = new AbfsClient(baseUrl, creds, abfsConfiguration,
new ExponentialRetryPolicy(abfsConfiguration.getMaxIoRetries()),
tokenProvider, abfsPerfTracker);
if (tokenProvider != null) {
this.client = new AbfsClient(baseUrl, creds, abfsConfiguration,
new ExponentialRetryPolicy(abfsConfiguration.getMaxIoRetries()),
tokenProvider, abfsPerfTracker);
} else {
this.client = new AbfsClient(baseUrl, creds, abfsConfiguration,
new ExponentialRetryPolicy(abfsConfiguration.getMaxIoRetries()),
sasTokenProvider, abfsPerfTracker);
}
LOG.trace("AbfsClient init complete");
}


@ -128,7 +128,8 @@ public final class ConfigurationKeys {
public static final String FS_AZURE_ENABLE_DELEGATION_TOKEN = "fs.azure.enable.delegation.token";
public static final String FS_AZURE_DELEGATION_TOKEN_PROVIDER_TYPE = "fs.azure.delegation.token.provider.type";
public static final String ABFS_EXTERNAL_AUTHORIZATION_CLASS = "abfs.external.authorization.class";
/** Key for the SAS token provider. */
public static final String FS_AZURE_SAS_TOKEN_PROVIDER_TYPE = "fs.azure.sas.token.provider.type";
private ConfigurationKeys() {}
}
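
For context, a minimal sketch (not part of this patch) of how these two settings could be wired together from client code; the provider class name is a hypothetical placeholder:

// Hypothetical wiring; "com.example.MySASTokenProvider" is a placeholder.
Configuration conf = new Configuration();
conf.set("fs.azure.account.auth.type", "SAS");
conf.set("fs.azure.sas.token.provider.type", "com.example.MySASTokenProvider");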


@ -16,26 +16,23 @@
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.extensions;
package org.apache.hadoop.fs.azurebfs.contracts.exceptions;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* Exception raised on ABFS Authorization failures.
* Thrown if there is an error instantiating the SASTokenProvider or getting
* a SAS token.
*/
public class AbfsAuthorizationException extends IOException {
@InterfaceAudience.Private
public class SASTokenProviderException extends AzureBlobFileSystemException {
private static final long serialVersionUID = 1L;
public AbfsAuthorizationException(String message, Exception e) {
super(message, e);
}
public AbfsAuthorizationException(String message) {
public SASTokenProviderException(String message) {
super(message);
}
public AbfsAuthorizationException(Throwable e) {
super(e);
public SASTokenProviderException(String message, Throwable cause) {
super(message);
initCause(cause);
}
}


@ -1,57 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.extensions;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
/**
* Interface to support authorization in Azure Blob File System.
*/
@InterfaceAudience.LimitedPrivate("authorization-subsystems")
@InterfaceStability.Unstable
public interface AbfsAuthorizer {
/**
* Initialize authorizer for Azure Blob File System.
*
* @throws AbfsAuthorizationException if unable to initialize the authorizer.
* @throws IOException network problems or similar.
* @throws IllegalArgumentException if the required parameters are not provided.
*/
void init() throws AbfsAuthorizationException, IOException;
/**
* Checks if the provided {@link FsAction} is allowed on the provided {@link Path}s.
*
* @param action the {@link FsAction} being requested on the provided {@link Path}s.
* @param absolutePaths The absolute paths of the storage being accessed.
* @return true if authorized, otherwise false.
* @throws AbfsAuthorizationException on authorization failure.
* @throws IOException network problems or similar.
* @throws IllegalArgumentException if the required parameters are not provided.
*/
boolean isAuthorized(FsAction action, Path... absolutePaths)
throws AbfsAuthorizationException, IOException;
}


@ -0,0 +1,74 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.extensions;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.AccessControlException;
/**
* Interface to support SAS authorization.
*/
@InterfaceAudience.LimitedPrivate("authorization-subsystems")
@InterfaceStability.Unstable
public interface SASTokenProvider {
String CONCAT_SOURCE_OPERATION = "concat-source";
String CONCAT_TARGET_OPERATION = "concat-target";
String CREATEFILE_OPERATION = "create";
String DELETE_OPERATION = "delete";
String EXECUTE_OPERATION = "execute";
String GETACL_OPERATION = "getaclstatus";
String GETFILESTATUS_OPERATION = "getfilestatus";
String LISTSTATUS_OPERATION = "liststatus";
String MKDIR_OPERATION = "mkdir";
String READ_OPERATION = "read";
String RENAME_SOURCE_OPERATION = "rename-source";
String RENAME_DESTINATION_OPERATION = "rename-destination";
String SETACL_OPERATION = "setacl";
String SETOWNER_OPERATION = "setowner";
String SETPERMISSION_OPERATION = "setpermission";
String APPEND_OPERATION = "write";
/**
* Initializes the SAS token provider for the Azure Blob File System.
* @param configuration Configuration object.
* @param accountName Account name.
* @throws IOException network problems or similar.
*/
void initialize(Configuration configuration, String accountName)
throws IOException;
/**
* Invokes the provider to obtain a SAS token.
*
* @param account the name of the storage account.
* @param fileSystem the name of the fileSystem.
* @param path the file or directory path.
* @param operation the operation to be performed on the path.
* @return a SAS token to perform the request operation.
* @throws IOException if there is a network error.
* @throws AccessControlException if access is denied.
*/
String getSASToken(String account, String fileSystem, String path,
String operation) throws IOException, AccessControlException;
}
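
To illustrate the contract, a hedged minimal sketch of an implementation; the class name and the configuration key it reads are hypothetical (imports as in this interface's file), and a real provider would mint a token scoped to the account, path, and operation:

// Hypothetical example implementation; not part of this patch.
public class FixedSASTokenProvider implements SASTokenProvider {
  private String fixedToken;

  @Override
  public void initialize(Configuration configuration, String accountName)
      throws IOException {
    // Assumption: a pre-generated SAS token is supplied via configuration.
    this.fixedToken = configuration.get("fs.azure.test.fixed.sas.token");
  }

  @Override
  public String getSASToken(String account, String fileSystem, String path,
      String operation) throws IOException, AccessControlException {
    // A real provider would generate a token scoped to the path and operation.
    return fixedToken;
  }
}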


@ -38,7 +38,9 @@ import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.SASTokenProviderException;
import org.apache.hadoop.fs.azurebfs.extensions.ExtensionHelper;
import org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
import org.apache.hadoop.io.IOUtils;
@ -62,13 +64,14 @@ public class AbfsClient implements Closeable {
private final String userAgent;
private final AbfsPerfTracker abfsPerfTracker;
private final AccessTokenProvider tokenProvider;
private final String accountName;
private final AuthType authType;
private AccessTokenProvider tokenProvider;
private SASTokenProvider sasTokenProvider;
public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
private AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
final AbfsConfiguration abfsConfiguration,
final ExponentialRetryPolicy exponentialRetryPolicy,
final AccessTokenProvider tokenProvider,
final AbfsPerfTracker abfsPerfTracker) {
this.baseUrl = baseUrl;
this.sharedKeyCredentials = sharedKeyCredentials;
@ -76,6 +79,8 @@ public class AbfsClient implements Closeable {
this.filesystem = baseUrlString.substring(baseUrlString.lastIndexOf(FORWARD_SLASH) + 1);
this.abfsConfiguration = abfsConfiguration;
this.retryPolicy = exponentialRetryPolicy;
this.accountName = abfsConfiguration.getAccountName().substring(0, abfsConfiguration.getAccountName().indexOf(AbfsHttpConstants.DOT));
this.authType = abfsConfiguration.getAuthType(accountName);
String sslProviderName = null;
@ -93,10 +98,27 @@ public class AbfsClient implements Closeable {
}
this.userAgent = initializeUserAgent(abfsConfiguration, sslProviderName);
this.tokenProvider = tokenProvider;
this.abfsPerfTracker = abfsPerfTracker;
}
public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
final AbfsConfiguration abfsConfiguration,
final ExponentialRetryPolicy exponentialRetryPolicy,
final AccessTokenProvider tokenProvider,
final AbfsPerfTracker abfsPerfTracker) {
this(baseUrl, sharedKeyCredentials, abfsConfiguration, exponentialRetryPolicy, abfsPerfTracker);
this.tokenProvider = tokenProvider;
}
public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
final AbfsConfiguration abfsConfiguration,
final ExponentialRetryPolicy exponentialRetryPolicy,
final SASTokenProvider sasTokenProvider,
final AbfsPerfTracker abfsPerfTracker) {
this(baseUrl, sharedKeyCredentials, abfsConfiguration, exponentialRetryPolicy, abfsPerfTracker);
this.sasTokenProvider = sasTokenProvider;
}
@Override
public void close() throws IOException {
if (tokenProvider instanceof Closeable) {
@ -191,6 +213,7 @@ public class AbfsClient implements Closeable {
abfsUriQueryBuilder.addQuery(QUERY_PARAM_CONTINUATION, continuation);
abfsUriQueryBuilder.addQuery(QUERY_PARAM_MAXRESULTS, String.valueOf(listMaxResults));
abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_UPN, String.valueOf(abfsConfiguration.isUpnUsed()));
appendSASTokenToQuery(relativePath, SASTokenProvider.LISTSTATUS_OPERATION, abfsUriQueryBuilder);
final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
@ -255,6 +278,11 @@ public class AbfsClient implements Closeable {
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, isFile ? FILE : DIRECTORY);
String operation = isFile
? SASTokenProvider.CREATEFILE_OPERATION
: SASTokenProvider.MKDIR_OPERATION;
appendSASTokenToQuery(path, operation, abfsUriQueryBuilder);
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.CreatePath,
@ -266,16 +294,24 @@ public class AbfsClient implements Closeable {
return op;
}
public AbfsRestOperation renamePath(final String source, final String destination, final String continuation)
public AbfsRestOperation renamePath(String source, final String destination, final String continuation)
throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
final String encodedRenameSource = urlEncode(FORWARD_SLASH + this.getFileSystem() + source);
String encodedRenameSource = urlEncode(FORWARD_SLASH + this.getFileSystem() + source);
if (authType == AuthType.SAS) {
final AbfsUriQueryBuilder srcQueryBuilder = new AbfsUriQueryBuilder();
appendSASTokenToQuery(source, SASTokenProvider.RENAME_SOURCE_OPERATION, srcQueryBuilder);
encodedRenameSource += srcQueryBuilder.toString();
}
LOG.trace("Rename source queryparam added {}", encodedRenameSource);
requestHeaders.add(new AbfsHttpHeader(X_MS_RENAME_SOURCE, encodedRenameSource));
requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, STAR));
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
abfsUriQueryBuilder.addQuery(QUERY_PARAM_CONTINUATION, continuation);
appendSASTokenToQuery(destination, SASTokenProvider.RENAME_DESTINATION_OPERATION, abfsUriQueryBuilder);
final URL url = createRequestUrl(destination, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
@ -299,6 +335,7 @@ public class AbfsClient implements Closeable {
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, APPEND_ACTION);
abfsUriQueryBuilder.addQuery(QUERY_PARAM_POSITION, Long.toString(position));
appendSASTokenToQuery(path, SASTokenProvider.APPEND_OPERATION, abfsUriQueryBuilder);
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
@ -324,6 +361,7 @@ public class AbfsClient implements Closeable {
abfsUriQueryBuilder.addQuery(QUERY_PARAM_POSITION, Long.toString(position));
abfsUriQueryBuilder.addQuery(QUERY_PARAM_RETAIN_UNCOMMITTED_DATA, String.valueOf(retainUncommittedData));
abfsUriQueryBuilder.addQuery(QUERY_PARAM_CLOSE, String.valueOf(isClose));
appendSASTokenToQuery(path, SASTokenProvider.APPEND_OPERATION, abfsUriQueryBuilder);
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
@ -365,6 +403,7 @@ public class AbfsClient implements Closeable {
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_UPN, String.valueOf(abfsConfiguration.isUpnUsed()));
appendSASTokenToQuery(path, SASTokenProvider.GETFILESTATUS_OPERATION, abfsUriQueryBuilder);
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
@ -385,6 +424,7 @@ public class AbfsClient implements Closeable {
requestHeaders.add(new AbfsHttpHeader(IF_MATCH, eTag));
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
appendSASTokenToQuery(path, SASTokenProvider.READ_OPERATION, abfsUriQueryBuilder);
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
@ -409,6 +449,7 @@ public class AbfsClient implements Closeable {
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
abfsUriQueryBuilder.addQuery(QUERY_PARAM_RECURSIVE, String.valueOf(recursive));
abfsUriQueryBuilder.addQuery(QUERY_PARAM_CONTINUATION, continuation);
appendSASTokenToQuery(path, SASTokenProvider.DELETE_OPERATION, abfsUriQueryBuilder);
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
@ -438,6 +479,7 @@ public class AbfsClient implements Closeable {
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_ACTION, AbfsHttpConstants.SET_ACCESS_CONTROL);
appendSASTokenToQuery(path, SASTokenProvider.SETOWNER_OPERATION, abfsUriQueryBuilder);
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
@ -462,6 +504,7 @@ public class AbfsClient implements Closeable {
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_ACTION, AbfsHttpConstants.SET_ACCESS_CONTROL);
appendSASTokenToQuery(path, SASTokenProvider.SETPERMISSION_OPERATION, abfsUriQueryBuilder);
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
@ -494,6 +537,7 @@ public class AbfsClient implements Closeable {
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_ACTION, AbfsHttpConstants.SET_ACCESS_CONTROL);
appendSASTokenToQuery(path, SASTokenProvider.SETACL_OPERATION, abfsUriQueryBuilder);
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
@ -516,6 +560,7 @@ public class AbfsClient implements Closeable {
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_ACTION, AbfsHttpConstants.GET_ACCESS_CONTROL);
abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_UPN, String.valueOf(useUPN));
appendSASTokenToQuery(path, SASTokenProvider.GETACL_OPERATION, abfsUriQueryBuilder);
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
@ -550,6 +595,34 @@ public class AbfsClient implements Closeable {
return op;
}
/**
* If configured for SAS AuthType, appends the SAS token to the query builder.
* @param path the path for which the SAS token is requested.
* @param operation the operation for which the SAS token is requested.
* @param queryBuilder the query builder to append the token to.
* @throws SASTokenProviderException if the SAS token cannot be acquired.
*/
private void appendSASTokenToQuery(String path, String operation, AbfsUriQueryBuilder queryBuilder) throws SASTokenProviderException {
if (this.authType == AuthType.SAS) {
try {
LOG.trace("Fetch SAS token for {} on {}", operation, path);
String sasToken = sasTokenProvider.getSASToken(this.accountName,
this.filesystem, path, operation);
if ((sasToken == null) || sasToken.isEmpty()) {
throw new UnsupportedOperationException("SASToken received is empty or null");
}
queryBuilder.setSASToken(sasToken);
LOG.trace("SAS token fetch complete for {} on {}", operation, path);
} catch (Exception ex) {
throw new SASTokenProviderException(String.format("Failed to acquire a SAS token for %s on %s due to %s",
operation,
path,
ex.toString()));
}
}
}
private URL createRequestUrl(final String query) throws AzureBlobFileSystemException {
return createRequestUrl(EMPTY_STRING, query);
}
@ -600,6 +673,10 @@ public class AbfsClient implements Closeable {
}
}
public AuthType getAuthType() {
return authType;
}
@VisibleForTesting
String initializeUserAgent(final AbfsConfiguration abfsConfiguration,
final String sslProviderName) {
@ -634,4 +711,9 @@ public class AbfsClient implements Closeable {
URL getBaseUrl() {
return baseUrl;
}
@VisibleForTesting
public SASTokenProvider getSasTokenProvider() {
return this.sasTokenProvider;
}
}


@ -160,18 +160,26 @@ public class AbfsRestOperation {
// initialize the HTTP request and open the connection
httpOperation = new AbfsHttpOperation(url, method, requestHeaders);
// sign the HTTP request
if (client.getAccessToken() == null) {
LOG.debug("Signing request with shared key");
// sign the HTTP request
client.getSharedKeyCredentials().signRequest(
httpOperation.getConnection(),
hasRequestBody ? bufferLength : 0);
} else {
LOG.debug("Authenticating request with OAuth2 access token");
httpOperation.getConnection().setRequestProperty(HttpHeaderConfigurations.AUTHORIZATION,
client.getAccessToken());
switch(client.getAuthType()) {
case Custom:
case OAuth:
LOG.debug("Authenticating request with OAuth2 access token");
httpOperation.getConnection().setRequestProperty(HttpHeaderConfigurations.AUTHORIZATION,
client.getAccessToken());
break;
case SAS:
// do nothing; the SAS token should already be appended to the query string
break;
case SharedKey:
// sign the HTTP request
LOG.debug("Signing request with shared key");
// sign the HTTP request
client.getSharedKeyCredentials().signRequest(
httpOperation.getConnection(),
hasRequestBody ? bufferLength : 0);
break;
}
// dump the headers
AbfsIoUtils.dumpHeadersToDebugLog("Request Headers",
httpOperation.getConnection().getRequestProperties());


@ -29,6 +29,7 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemExc
*/
public class AbfsUriQueryBuilder {
private Map<String, String> parameters;
private String sasToken = null;
public AbfsUriQueryBuilder() {
this.parameters = new HashMap<>();
@ -40,6 +41,10 @@ public class AbfsUriQueryBuilder {
}
}
public void setSASToken(final String sasToken) {
this.sasToken = sasToken;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
@ -59,6 +64,16 @@ public class AbfsUriQueryBuilder {
throw new IllegalArgumentException("Query string param is not encode-able: " + entry.getKey() + "=" + entry.getValue());
}
}
// append SAS Token
if (sasToken != null) {
if (first) {
sb.append(AbfsHttpConstants.QUESTION_MARK);
} else {
sb.append(AbfsHttpConstants.AND_MARK);
}
sb.append(sasToken);
}
return sb.toString();
}
}
}
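
For illustration, a rough sketch of the query string the builder produces once a SAS token is set (the token value is a fake placeholder):

// Illustration only; the SAS token below is a fake placeholder.
AbfsUriQueryBuilder builder = new AbfsUriQueryBuilder();
builder.addQuery("continuation", "token123");
builder.setSASToken("sv=2019-12-12&sr=d&sig=PLACEHOLDER");
// builder.toString() yields
// "?continuation=token123&sv=2019-12-12&sr=d&sig=PLACEHOLDER";
// the SAS token is appended verbatim after the regular parameters.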


@ -23,5 +23,6 @@ package org.apache.hadoop.fs.azurebfs.services;
public enum AuthType {
SharedKey,
OAuth,
Custom
Custom,
SAS
}
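
As elsewhere in this patch (see AbfsConfiguration.getSASTokenProvider above), the enum is resolved from configuration with a SharedKey default:

// Resolve the auth type from FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME;
// falls back to SharedKey when the key is unset.
AuthType authType = conf.getEnum("fs.azure.account.auth.type", AuthType.SharedKey);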


@ -626,7 +626,7 @@ points for third-parties to integrate their authentication and authorization
services into the ABFS client.
* `CustomDelegationTokenManager` : adds ability to issue Hadoop Delegation Tokens.
* `AbfsAuthorizer` permits client-side authorization of file operations.
* `SASTokenProvider`: allows for custom provision of Azure Storage Shared Access Signature (SAS) tokens. A usage sketch follows this list.
* `CustomTokenProviderAdaptee`: allows for custom provision of
Azure OAuth tokens.
* `KeyProvider`.
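
As referenced above, a hedged end-to-end sketch of enabling the SASTokenProvider extension point; the provider class, container, and account names are placeholders:

// Hypothetical usage; all names are placeholders, not part of this patch.
Configuration conf = new Configuration();
conf.set("fs.azure.account.auth.type", "SAS");
conf.set("fs.azure.sas.token.provider.type", "org.example.MySASTokenProvider");
FileSystem fs = FileSystem.get(
    new URI("abfs://container@account.dfs.core.windows.net"), conf);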


@ -39,6 +39,7 @@ import org.apache.hadoop.fs.azure.AzureNativeFileSystemStore;
import org.apache.hadoop.fs.azure.NativeAzureFileSystem;
import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation;
import org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes;
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.utils.UriUtils;
import org.apache.hadoop.fs.contract.ContractTestUtils;
@ -73,6 +74,7 @@ public abstract class AbstractAbfsIntegrationTest extends
private String accountName;
private String testUrl;
private AuthType authType;
private boolean useConfiguredFileSystem = false;
protected AbstractAbfsIntegrationTest() throws Exception {
fileSystemName = TEST_CONTAINER_PREFIX + UUID.randomUUID().toString();
@ -134,7 +136,9 @@ public abstract class AbstractAbfsIntegrationTest extends
createFileSystem();
// Only a live account without namespace support can run the ABFS & WASB compatibility tests
if (!isIPAddress && !abfs.getIsNamespaceEnabled()) {
if (!isIPAddress
&& (abfsConfig.getAuthType(accountName) != AuthType.SAS)
&& !abfs.getIsNamespaceEnabled()) {
final URI wasbUri = new URI(abfsUrlToWasbUrl(getTestUrl()));
final AzureNativeFileSystemStore azureNativeFileSystemStore =
new AzureNativeFileSystemStore();
@ -167,19 +171,21 @@ public abstract class AbstractAbfsIntegrationTest extends
return;
}
final AzureBlobFileSystemStore abfsStore = abfs.getAbfsStore();
abfsStore.deleteFilesystem();
// Delete all uniquely created filesystems from the account
if (!useConfiguredFileSystem) {
final AzureBlobFileSystemStore abfsStore = abfs.getAbfsStore();
abfsStore.deleteFilesystem();
AbfsRestOperationException ex = intercept(
AbfsRestOperationException.class,
new Callable<Hashtable<String, String>>() {
@Override
public Hashtable<String, String> call() throws Exception {
return abfsStore.getFilesystemProperties();
}
});
if (FILE_SYSTEM_NOT_FOUND.getStatusCode() != ex.getStatusCode()) {
LOG.warn("Deleted test filesystem may still exist: {}", abfs, ex);
AbfsRestOperationException ex = intercept(AbfsRestOperationException.class,
new Callable<Hashtable<String, String>>() {
@Override
public Hashtable<String, String> call() throws Exception {
return abfsStore.getFilesystemProperties();
}
});
if (FILE_SYSTEM_NOT_FOUND.getStatusCode() != ex.getStatusCode()) {
LOG.warn("Deleted test filesystem may still exist: {}", abfs, ex);
}
}
} catch (Exception e) {
LOG.warn("During cleanup: {}", e, e);
@ -189,6 +195,32 @@ public abstract class AbstractAbfsIntegrationTest extends
}
}
public void loadConfiguredFileSystem() throws Exception {
// disable auto-creation of filesystem
abfsConfig.setBoolean(AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION,
false);
// AbstractAbfsIntegrationTest always uses a new instance of FileSystem;
// disable that here and force the filesystem provided in the test configs.
String[] authorityParts =
(new URI(rawConfig.get(FS_AZURE_CONTRACT_TEST_URI))).getRawAuthority().split(
AbfsHttpConstants.AZURE_DISTRIBUTED_FILE_SYSTEM_AUTHORITY_DELIMITER, 2);
this.fileSystemName = authorityParts[0];
// Reset URL with configured filesystem
final String abfsUrl = this.getFileSystemName() + "@" + this.getAccountName();
URI defaultUri = new URI(abfsScheme, abfsUrl, null, null, null);
this.testUrl = defaultUri.toString();
abfsConfig.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
defaultUri.toString());
useConfiguredFileSystem = true;
}
public AzureBlobFileSystem getFileSystem() throws IOException {
return abfs;
}


@ -62,7 +62,6 @@ public class ITestAbfsIdentityTransformer extends AbstractAbfsScaleTest{
public ITestAbfsIdentityTransformer() throws Exception {
super();
UserGroupInformation.reset();
userGroupInfo = UserGroupInformation.getCurrentUser();
localUser = userGroupInfo.getShortUserName();
localGroup = userGroupInfo.getPrimaryGroupName();


@ -18,53 +18,106 @@
package org.apache.hadoop.fs.azurebfs;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import org.junit.Assume;
import org.junit.Test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
import org.apache.hadoop.fs.azurebfs.extensions.AbfsAuthorizationException;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.SASTokenProviderException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TokenAccessProviderException;
import org.apache.hadoop.fs.azurebfs.extensions.MockSASTokenProvider;
import org.apache.hadoop.fs.azurebfs.services.AuthType;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import static org.apache.hadoop.fs.azurebfs.extensions.MockAbfsAuthorizer.*;
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.MOCK_SASTOKENPROVIDER_FAIL_INIT;
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.MOCK_SASTOKENPROVIDER_RETURN_EMPTY_SAS_TOKEN;
import static org.apache.hadoop.fs.azurebfs.utils.AclTestHelpers.aclEntry;
import static org.apache.hadoop.fs.permission.AclEntryScope.ACCESS;
import static org.apache.hadoop.fs.permission.AclEntryType.GROUP;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.junit.Assume.assumeTrue;
/**
* Tests authorization of filesystem operations when a SAS token provider
* is configured.
*/
public class ITestAzureBlobFileSystemAuthorization extends AbstractAbfsIntegrationTest {
private static final Path TEST_READ_ONLY_FILE_PATH_0 = new Path(TEST_READ_ONLY_FILE_0);
private static final Path TEST_READ_ONLY_FOLDER_PATH = new Path(TEST_READ_ONLY_FOLDER);
private static final Path TEST_WRITE_ONLY_FILE_PATH_0 = new Path(TEST_WRITE_ONLY_FILE_0);
private static final Path TEST_WRITE_ONLY_FILE_PATH_1 = new Path(TEST_WRITE_ONLY_FILE_1);
private static final Path TEST_READ_WRITE_FILE_PATH_0 = new Path(TEST_READ_WRITE_FILE_0);
private static final Path TEST_READ_WRITE_FILE_PATH_1 = new Path(TEST_READ_WRITE_FILE_1);
private static final Path TEST_WRITE_ONLY_FOLDER_PATH = new Path(TEST_WRITE_ONLY_FOLDER);
private static final Path TEST_WRITE_THEN_READ_ONLY_PATH = new Path(TEST_WRITE_THEN_READ_ONLY);
private static final String TEST_AUTHZ_CLASS = "org.apache.hadoop.fs.azurebfs.extensions.MockAbfsAuthorizer";
private static final String TEST_AUTHZ_CLASS = "org.apache.hadoop.fs.azurebfs.extensions.MockSASTokenProvider";
private static final String TEST_ERR_AUTHZ_CLASS = "org.apache.hadoop.fs.azurebfs.extensions.MockErrorSASTokenProvider";
private static final String TEST_USER = UUID.randomUUID().toString();
private static final String TEST_GROUP = UUID.randomUUID().toString();
private static final String BAR = UUID.randomUUID().toString();
public ITestAzureBlobFileSystemAuthorization() throws Exception {
// The mock SAS token provider relies on the account key to generate SAS.
Assume.assumeTrue(this.getAuthType() == AuthType.SharedKey);
}
@Override
public void setup() throws Exception {
this.getConfiguration().set(ConfigurationKeys.ABFS_EXTERNAL_AUTHORIZATION_CLASS, TEST_AUTHZ_CLASS);
boolean isHNSEnabled = this.getConfiguration().getBoolean(
TestConfigurationKeys.FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT, false);
Assume.assumeTrue(isHNSEnabled);
loadConfiguredFileSystem();
this.getConfiguration().set(ConfigurationKeys.FS_AZURE_SAS_TOKEN_PROVIDER_TYPE, TEST_AUTHZ_CLASS);
this.getConfiguration().set(ConfigurationKeys.FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME, "SAS");
super.setup();
}
@Test
public void testSASTokenProviderInitializeException() throws Exception {
final AzureBlobFileSystem fs = this.getFileSystem();
final AzureBlobFileSystem testFs = new AzureBlobFileSystem();
Configuration testConfig = this.getConfiguration().getRawConfiguration();
testConfig.set(ConfigurationKeys.FS_AZURE_SAS_TOKEN_PROVIDER_TYPE, TEST_ERR_AUTHZ_CLASS);
testConfig.set(MOCK_SASTOKENPROVIDER_FAIL_INIT, "true");
intercept(TokenAccessProviderException.class,
()-> {
testFs.initialize(fs.getUri(), this.getConfiguration().getRawConfiguration());
});
}
@Test
public void testSASTokenProviderEmptySASToken() throws Exception {
final AzureBlobFileSystem fs = this.getFileSystem();
final AzureBlobFileSystem testFs = new AzureBlobFileSystem();
Configuration testConfig = this.getConfiguration().getRawConfiguration();
testConfig.set(ConfigurationKeys.FS_AZURE_SAS_TOKEN_PROVIDER_TYPE, TEST_ERR_AUTHZ_CLASS);
testConfig.set(MOCK_SASTOKENPROVIDER_RETURN_EMPTY_SAS_TOKEN, "true");
testFs.initialize(fs.getUri(),
this.getConfiguration().getRawConfiguration());
intercept(SASTokenProviderException.class,
() -> {
testFs.create(new org.apache.hadoop.fs.Path("/testFile"));
});
}
@Test
public void testSASTokenProviderNullSASToken() throws Exception {
final AzureBlobFileSystem fs = this.getFileSystem();
final AzureBlobFileSystem testFs = new AzureBlobFileSystem();
Configuration testConfig = this.getConfiguration().getRawConfiguration();
testConfig.set(ConfigurationKeys.FS_AZURE_SAS_TOKEN_PROVIDER_TYPE, TEST_ERR_AUTHZ_CLASS);
testFs.initialize(fs.getUri(), this.getConfiguration().getRawConfiguration());
intercept(SASTokenProviderException.class,
()-> {
testFs.create(new org.apache.hadoop.fs.Path("/testFile"));
});
}
@Test
public void testOpenFileWithInvalidPath() throws Exception {
final AzureBlobFileSystem fs = this.getFileSystem();
@ -76,291 +129,232 @@ public class ITestAzureBlobFileSystemAuthorization extends AbstractAbfsIntegrati
@Test
public void testOpenFileAuthorized() throws Exception {
final AzureBlobFileSystem fs = this.getFileSystem();
fs.create(TEST_WRITE_THEN_READ_ONLY_PATH).close();
fs.open(TEST_WRITE_THEN_READ_ONLY_PATH).close();
runTest(FileSystemOperations.Open, false);
}
@Test
public void testOpenFileUnauthorized() throws Exception {
final AzureBlobFileSystem fs = this.getFileSystem();
fs.create(TEST_WRITE_ONLY_FILE_PATH_0).close();
intercept(AbfsAuthorizationException.class,
()-> {
fs.open(TEST_WRITE_ONLY_FILE_PATH_0).close();
});
runTest(FileSystemOperations.Open, true);
}
@Test
public void testCreateFileAuthorized() throws Exception {
final AzureBlobFileSystem fs = this.getFileSystem();
fs.create(TEST_WRITE_ONLY_FILE_PATH_0).close();
runTest(FileSystemOperations.CreatePath, false);
}
@Test
public void testCreateFileUnauthorized() throws Exception {
final AzureBlobFileSystem fs = this.getFileSystem();
intercept(AbfsAuthorizationException.class,
()-> {
fs.create(TEST_READ_ONLY_FILE_PATH_0).close();
});
runTest(FileSystemOperations.CreatePath, true);
}
@Test
public void testAppendFileAuthorized() throws Exception {
final AzureBlobFileSystem fs = this.getFileSystem();
fs.create(TEST_WRITE_ONLY_FILE_PATH_0).close();
fs.append(TEST_WRITE_ONLY_FILE_PATH_0).close();
runTest(FileSystemOperations.Append, false);
}
@Test
public void testAppendFileUnauthorized() throws Exception {
final AzureBlobFileSystem fs = this.getFileSystem();
fs.create(TEST_WRITE_THEN_READ_ONLY_PATH).close();
intercept(AbfsAuthorizationException.class,
()-> {
fs.append(TEST_WRITE_THEN_READ_ONLY_PATH).close();
});
runTest(FileSystemOperations.Append, true);
}
@Test
public void testRenameAuthorized() throws Exception {
final AzureBlobFileSystem fs = this.getFileSystem();
fs.rename(TEST_READ_WRITE_FILE_PATH_0, TEST_READ_WRITE_FILE_PATH_1);
runTest(FileSystemOperations.RenamePath, false);
}
@Test
public void testRenameUnauthorized() throws Exception {
final AzureBlobFileSystem fs = this.getFileSystem();
intercept(AbfsAuthorizationException.class,
()-> {
fs.rename(TEST_WRITE_ONLY_FILE_PATH_0, TEST_WRITE_ONLY_FILE_PATH_1);
});
runTest(FileSystemOperations.RenamePath, true);
}
@Test
public void testDeleteFileAuthorized() throws Exception {
final AzureBlobFileSystem fs = this.getFileSystem();
fs.create(TEST_WRITE_ONLY_FILE_PATH_0).close();
fs.delete(TEST_WRITE_ONLY_FILE_PATH_0, false);
runTest(FileSystemOperations.DeletePath, false);
}
@Test
public void testDeleteFileUnauthorized() throws Exception {
final AzureBlobFileSystem fs = this.getFileSystem();
fs.create(TEST_WRITE_THEN_READ_ONLY_PATH).close();
intercept(AbfsAuthorizationException.class,
()-> {
fs.delete(TEST_WRITE_THEN_READ_ONLY_PATH, false);
});
runTest(FileSystemOperations.DeletePath, true);
}
@Test
public void testListStatusAuthorized() throws Exception {
final AzureBlobFileSystem fs = getFileSystem();
fs.create(TEST_WRITE_THEN_READ_ONLY_PATH).close();
fs.listStatus(TEST_WRITE_THEN_READ_ONLY_PATH);
runTest(FileSystemOperations.ListPaths, false);
}
@Test
public void testListStatusUnauthorized() throws Exception {
final AzureBlobFileSystem fs = getFileSystem();
fs.create(TEST_WRITE_ONLY_FILE_PATH_0).close();
intercept(AbfsAuthorizationException.class,
()-> {
fs.listStatus(TEST_WRITE_ONLY_FILE_PATH_0);
});
runTest(FileSystemOperations.ListPaths, true);
}
@Test
public void testMkDirsAuthorized() throws Exception {
final AzureBlobFileSystem fs = getFileSystem();
fs.mkdirs(TEST_WRITE_ONLY_FOLDER_PATH, new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE));
runTest(FileSystemOperations.Mkdir, false);
}
@Test
public void testMkDirsUnauthorized() throws Exception {
final AzureBlobFileSystem fs = getFileSystem();
intercept(AbfsAuthorizationException.class,
()-> {
fs.mkdirs(TEST_READ_ONLY_FOLDER_PATH, new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE));
});
runTest(FileSystemOperations.Mkdir, true);
}
@Test
public void testGetFileStatusAuthorized() throws Exception {
final AzureBlobFileSystem fs = getFileSystem();
fs.create(TEST_WRITE_THEN_READ_ONLY_PATH).close();
fs.getFileStatus(TEST_WRITE_THEN_READ_ONLY_PATH);
runTest(FileSystemOperations.GetPathStatus, false);
}
@Test
public void testGetFileStatusUnauthorized() throws Exception {
final AzureBlobFileSystem fs = getFileSystem();
fs.create(TEST_WRITE_ONLY_FILE_PATH_0).close();
intercept(AbfsAuthorizationException.class,
()-> {
fs.getFileStatus(TEST_WRITE_ONLY_FILE_PATH_0);
});
}
@Test
public void testSetOwnerAuthorized() throws Exception {
final AzureBlobFileSystem fs = getFileSystem();
assumeTrue("This test case only runs when namespace is enabled", fs.getIsNamespaceEnabled());
fs.create(TEST_WRITE_ONLY_FILE_PATH_0).close();
fs.setOwner(TEST_WRITE_ONLY_FILE_PATH_0, TEST_USER, TEST_GROUP);
runTest(FileSystemOperations.GetPathStatus, true);
}
@Test
public void testSetOwnerUnauthorized() throws Exception {
final AzureBlobFileSystem fs = getFileSystem();
assumeTrue("This test case only runs when namespace is enabled", fs.getIsNamespaceEnabled());
fs.create(TEST_WRITE_THEN_READ_ONLY_PATH).close();
intercept(AbfsAuthorizationException.class,
()-> {
fs.setOwner(TEST_WRITE_THEN_READ_ONLY_PATH, TEST_USER, TEST_GROUP);
});
}
@Test
public void testSetPermissionAuthorized() throws Exception {
final AzureBlobFileSystem fs = getFileSystem();
assumeTrue("This test case only runs when namespace is enabled", fs.getIsNamespaceEnabled());
fs.create(TEST_WRITE_ONLY_FILE_PATH_0).close();
fs.setPermission(TEST_WRITE_ONLY_FILE_PATH_0, new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE));
runTest(FileSystemOperations.SetOwner, true);
}
@Test
public void testSetPermissionUnauthorized() throws Exception {
final AzureBlobFileSystem fs = getFileSystem();
assumeTrue("This test case only runs when namespace is enabled", fs.getIsNamespaceEnabled());
fs.create(TEST_WRITE_THEN_READ_ONLY_PATH).close();
intercept(AbfsAuthorizationException.class,
()-> {
fs.setPermission(TEST_WRITE_THEN_READ_ONLY_PATH, new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE));
});
}
@Test
public void testModifyAclEntriesAuthorized() throws Exception {
final AzureBlobFileSystem fs = getFileSystem();
assumeTrue("This test case only runs when namespace is enabled", fs.getIsNamespaceEnabled());
fs.create(TEST_WRITE_ONLY_FILE_PATH_0).close();
List<AclEntry> aclSpec = Arrays.asList(aclEntry(ACCESS, GROUP, BAR, FsAction.ALL));
fs.modifyAclEntries(TEST_WRITE_ONLY_FILE_PATH_0, aclSpec);
runTest(FileSystemOperations.SetPermissions, true);
}
@Test
public void testModifyAclEntriesUnauthorized() throws Exception {
final AzureBlobFileSystem fs = getFileSystem();
assumeTrue("This test case only runs when namespace is enabled", fs.getIsNamespaceEnabled());
fs.create(TEST_WRITE_THEN_READ_ONLY_PATH).close();
List<AclEntry> aclSpec = Arrays.asList(aclEntry(ACCESS, GROUP, BAR, FsAction.ALL));
intercept(AbfsAuthorizationException.class,
()-> {
fs.modifyAclEntries(TEST_WRITE_THEN_READ_ONLY_PATH, aclSpec);
});
}
@Test
public void testRemoveAclEntriesAuthorized() throws Exception {
final AzureBlobFileSystem fs = getFileSystem();
runTest(FileSystemOperations.ModifyAclEntries, true);
}
@Test
public void testRemoveAclEntriesUnauthorized() throws Exception {
runTest(FileSystemOperations.RemoveAclEntries, true);
}
@Test
public void testRemoveDefaultAclUnauthorized() throws Exception {
runTest(FileSystemOperations.RemoveDefaultAcl, true);
}
@Test
public void testRemoveAclUnauthorized() throws Exception {
runTest(FileSystemOperations.RemoveAcl, true);
}
@Test
public void testSetAclUnauthorized() throws Exception {
runTest(FileSystemOperations.SetAcl, true);
}
@Test
public void testGetAclStatusAuthorized() throws Exception {
runTest(FileSystemOperations.GetAcl, false);
}
@Test
public void testGetAclStatusUnauthorized() throws Exception {
runTest(FileSystemOperations.GetAcl, true);
}
private void runTest(FileSystemOperations testOp,
boolean expectAbfsAuthorizationException) throws Exception {
final AzureBlobFileSystem fs = getFileSystem();
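// MockSASTokenProvider denies any path containing "unauthorized", so the
// suffix below chooses whether the operation under test should be rejected.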
Path reqPath = new Path("requestPath"
+ UUID.randomUUID().toString()
+ (expectAbfsAuthorizationException ? "unauthorized":""));
getMockSASTokenProvider(fs).setSkipAuthorizationForTestSetup(true);
if ((testOp != FileSystemOperations.CreatePath)
&& (testOp != FileSystemOperations.Mkdir)) {
fs.create(reqPath).close();
fs.getFileStatus(reqPath);
}
getMockSASTokenProvider(fs).setSkipAuthorizationForTestSetup(false);
// Test Operation
if (expectAbfsAuthorizationException) {
intercept(SASTokenProviderException.class, () -> {
executeOp(reqPath, fs, testOp);
});
} else {
executeOp(reqPath, fs, testOp);
}
}
private void executeOp(Path reqPath, AzureBlobFileSystem fs,
FileSystemOperations op) throws IOException {
switch (op) {
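// Dispatch to the FileSystem call whose request must be authorized by the
// SAS token fetched for this operation.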
case ListPaths:
fs.listStatus(reqPath);
break;
case CreatePath:
fs.create(reqPath);
break;
case RenamePath:
fs.rename(reqPath,
new Path("renameDest" + UUID.randomUUID().toString()));
break;
case GetAcl:
fs.getAclStatus(reqPath);
break;
case GetPathStatus:
fs.getFileStatus(reqPath);
break;
case SetAcl:
fs.setAcl(reqPath, Arrays
.asList(aclEntry(ACCESS, GROUP, BAR, FsAction.ALL)));
break;
case SetOwner:
fs.setOwner(reqPath, TEST_USER, TEST_GROUP);
break;
case SetPermissions:
fs.setPermission(reqPath,
new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE));
break;
case Append:
fs.append(reqPath);
break;
case ReadFile:
fs.open(reqPath);
break;
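// Open exercises the same read path as ReadFile.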
case Open:
fs.open(reqPath);
break;
case DeletePath:
fs.delete(reqPath, false);
break;
case Mkdir:
fs.mkdirs(reqPath,
new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE));
break;
case RemoveAclEntries:
fs.removeAclEntries(reqPath, Arrays
.asList(aclEntry(ACCESS, GROUP, BAR, FsAction.ALL)));
break;
case ModifyAclEntries:
fs.modifyAclEntries(reqPath, Arrays
.asList(aclEntry(ACCESS, GROUP, BAR, FsAction.ALL)));
break;
case RemoveAcl:
fs.removeAcl(reqPath);
break;
case RemoveDefaultAcl:
fs.removeDefaultAcl(reqPath);
break;
default:
throw new IllegalStateException("Unexpected value: " + op);
}
}
private MockSASTokenProvider getMockSASTokenProvider(AzureBlobFileSystem fs)
throws Exception {
return ((MockSASTokenProvider) fs.getAbfsStore().getClient().getSasTokenProvider());
}
enum FileSystemOperations {
None, ListPaths, CreatePath, RenamePath, GetAcl, GetPathStatus, SetAcl,
SetOwner, SetPermissions, Append, ReadFile, DeletePath, Mkdir,
RemoveAclEntries, RemoveDefaultAcl, RemoveAcl, ModifyAclEntries,
Open
}
}
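The test class above selects its token provider purely through configuration. A minimal sketch, assuming a placeholder account name (fs.azure.account.auth.type and fs.azure.sas.token.provider.type are the settings this patch consumes):

Configuration conf = new Configuration();
// Select SAS auth for the account and point ABFS at the mock provider.
conf.set("fs.azure.account.auth.type.testaccount.dfs.core.windows.net", "SAS");
conf.set("fs.azure.sas.token.provider.type", MockSASTokenProvider.class.getName());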

View File

@ -42,6 +42,9 @@ public final class TestConfigurationKeys {
public static final String FS_AZURE_BLOB_FS_CHECKACCESS_TEST_USER_GUID = "fs.azure.check.access.testuser.guid";
public static final String MOCK_SASTOKENPROVIDER_FAIL_INIT = "mock.sastokenprovider.fail.init";
public static final String MOCK_SASTOKENPROVIDER_RETURN_EMPTY_SAS_TOKEN = "mock.sastokenprovider.return.empty.sasToken";
public static final String TEST_CONFIGURATION_FILE_NAME = "azure-test.xml";
public static final String TEST_CONTAINER_PREFIX = "abfs-testcontainer-";
public static final int TEST_TIMEOUT = 15 * 60 * 1000;

View File

@ -1,87 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.extensions;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
/**
* A mock Azure Blob File System authorization implementation.
*/
public class MockAbfsAuthorizer implements AbfsAuthorizer {
public static final String TEST_READ_ONLY_FILE_0 = "readOnlyFile0";
public static final String TEST_READ_ONLY_FILE_1 = "readOnlyFile1";
public static final String TEST_READ_ONLY_FOLDER = "readOnlyFolder";
public static final String TEST_WRITE_ONLY_FILE_0 = "writeOnlyFile0";
public static final String TEST_WRITE_ONLY_FILE_1 = "writeOnlyFile1";
public static final String TEST_WRITE_ONLY_FOLDER = "writeOnlyFolder";
public static final String TEST_READ_WRITE_FILE_0 = "readWriteFile0";
public static final String TEST_READ_WRITE_FILE_1 = "readWriteFile1";
public static final String TEST_WRITE_THEN_READ_ONLY = "writeThenReadOnlyFile";
private Configuration conf;
private Set<Path> readOnlyPaths = new HashSet<Path>();
private Set<Path> writeOnlyPaths = new HashSet<Path>();
private Set<Path> readWritePaths = new HashSet<Path>();
private int writeThenReadOnly = 0;
public MockAbfsAuthorizer(Configuration conf) {
this.conf = conf;
}
@Override
public void init() throws AbfsAuthorizationException, IOException {
readOnlyPaths.add(new Path(TEST_READ_ONLY_FILE_0));
readOnlyPaths.add(new Path(TEST_READ_ONLY_FILE_1));
readOnlyPaths.add(new Path(TEST_READ_ONLY_FOLDER));
writeOnlyPaths.add(new Path(TEST_WRITE_ONLY_FILE_0));
writeOnlyPaths.add(new Path(TEST_WRITE_ONLY_FILE_1));
writeOnlyPaths.add(new Path(TEST_WRITE_ONLY_FOLDER));
readWritePaths.add(new Path(TEST_READ_WRITE_FILE_0));
readWritePaths.add(new Path(TEST_READ_WRITE_FILE_1));
}
@Override
public boolean isAuthorized(FsAction action, Path... absolutePaths) throws AbfsAuthorizationException, IOException {
Set<Path> paths = new HashSet<Path>();
for (Path path : absolutePaths) {
paths.add(new Path(path.getName()));
}
if (action.equals(FsAction.READ) && Stream.concat(readOnlyPaths.stream(), readWritePaths.stream()).collect(Collectors.toSet()).containsAll(paths)) {
return true;
} else if (action.equals(FsAction.READ) && paths.contains(new Path(TEST_WRITE_THEN_READ_ONLY)) && writeThenReadOnly == 1) {
return true;
} else if (action.equals(FsAction.WRITE)
&& Stream.concat(writeOnlyPaths.stream(), readWritePaths.stream()).collect(Collectors.toSet()).containsAll(paths)) {
return true;
} else if (action.equals(FsAction.WRITE) && paths.contains(new Path(TEST_WRITE_THEN_READ_ONLY)) && writeThenReadOnly == 0) {
writeThenReadOnly = 1;
return true;
} else {
return action.equals(FsAction.READ_WRITE) && readWritePaths.containsAll(paths);
}
}
}

View File

@ -0,0 +1,63 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.extensions;
import org.apache.hadoop.conf.Configuration;
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.MOCK_SASTOKENPROVIDER_FAIL_INIT;
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.MOCK_SASTOKENPROVIDER_RETURN_EMPTY_SAS_TOKEN;
/**
* A mock SAS token provider to test error conditions.
*/
public class MockErrorSASTokenProvider implements SASTokenProvider {
Configuration config = null;
@Override
public void initialize(Configuration configuration, String accountName) {
boolean throwExceptionAtInit = configuration.getBoolean(MOCK_SASTOKENPROVIDER_FAIL_INIT, false);
if (throwExceptionAtInit) {
throw new RuntimeException("MockSASTokenProvider initialize exception");
}
this.config = configuration;
}
/**
* Returns a null SAS token query, or the empty string when
* MOCK_SASTOKENPROVIDER_RETURN_EMPTY_SAS_TOKEN is set.
* @param accountName the name of the storage account.
* @param fileSystem the name of the fileSystem.
* @param path the file or directory path.
* @param operation the operation to be performed on the path.
* @return null, or the empty string when configured to return an empty SAS token.
*/
@Override
public String getSASToken(String accountName, String fileSystem, String path,
String operation) {
boolean returnEmptySASTokenQuery = this.config.getBoolean(
MOCK_SASTOKENPROVIDER_RETURN_EMPTY_SAS_TOKEN, false);
return returnEmptySASTokenQuery ? "" : null;
}
}
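A sketch of how a test can trip the two failure modes, assuming the flags are set on the raw Configuration before the filesystem is instantiated:

Configuration conf = new Configuration();
conf.setBoolean(MOCK_SASTOKENPROVIDER_FAIL_INIT, true); // initialize() throws
conf.setBoolean(MOCK_SASTOKENPROVIDER_RETURN_EMPTY_SAS_TOKEN, true); // getSASToken() returns ""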

View File

@ -0,0 +1,85 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.extensions;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.fs.azurebfs.utils.Base64;
import org.apache.hadoop.fs.azurebfs.utils.SASGenerator;
/**
* A mock SAS token provider implementation.
*/
public class MockSASTokenProvider implements SASTokenProvider {
private byte[] accountKey;
private SASGenerator generator;
private boolean skipAuthorizationForTestSetup = false;
// For testing we use a container SAS for all operations.
private String generateSAS(byte[] accountKey, String accountName, String fileSystemName) {
return generator.getContainerSASWithFullControl(accountName, fileSystemName);
}
@Override
public void initialize(Configuration configuration, String accountName) throws IOException {
try {
AbfsConfiguration abfsConfig = new AbfsConfiguration(configuration, accountName);
accountKey = Base64.decode(abfsConfig.getStorageAccountKey());
} catch (Exception ex) {
throw new IOException(ex);
}
generator = new SASGenerator(accountKey);
}
/**
* Applies the mock authorization rules, then returns a container SAS token.
*
* @param accountName the name of the storage account.
* @param fileSystem the name of the fileSystem.
* @param path the file or directory path.
* @param operation the operation to be performed on the path.
* @return a SAS token to perform the request operation.
* @throws IOException if there is a network error.
* @throws AccessControlException if access is denied.
*/
@Override
public String getSASToken(String accountName, String fileSystem, String path,
String operation) throws IOException, AccessControlException {
if (!isSkipAuthorizationForTestSetup() && path.contains("unauthorized")) {
throw new AccessControlException(
"The user is not authorized to perform this operation.");
}
return generateSAS(accountKey, accountName, fileSystem);
}
public boolean isSkipAuthorizationForTestSetup() {
return skipAuthorizationForTestSetup;
}
public void setSkipAuthorizationForTestSetup(
boolean skipAuthorizationForTestSetup) {
this.skipAuthorizationForTestSetup = skipAuthorizationForTestSetup;
}
}
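Roughly how the ABFS client consults a provider per request, sketched with illustrative names (the operation string, paths, and account are placeholders, and the account key is assumed to be present in conf):

Configuration conf = new Configuration(); // assumed to carry fs.azure.account.key for the account
SASTokenProvider provider = new MockSASTokenProvider();
provider.initialize(conf, "testaccount.dfs.core.windows.net");
// Paths containing "unauthorized" raise AccessControlException; any other
// path receives a container SAS with full control.
String sas = provider.getSASToken("testaccount", "testcontainer",
"/requestPath", "create-file");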

View File

@ -21,10 +21,12 @@ package org.apache.hadoop.fs.azurebfs.services;
import java.net.URL;
import java.util.regex.Pattern;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.junit.Assert;
import org.junit.Test;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
@ -36,14 +38,15 @@ import org.apache.hadoop.util.VersionInfo;
*/
public final class TestAbfsClient {
private final String accountName = "bogusAccountName";
private final String accountName = "bogusAccountName.dfs.core.windows.net";
private void validateUserAgent(String expectedPattern,
URL baseUrl,
AbfsConfiguration config,
boolean includeSSLProvider) {
boolean includeSSLProvider)
throws AzureBlobFileSystemException {
AbfsClient client = new AbfsClient(baseUrl, null,
config, null, null, null);
config, null, (AccessTokenProvider) null, null);
String sslProviderName = null;
if (includeSSLProvider) {
sslProviderName = DelegatingSSLSocketFactory.getDefaultFactory().getProviderName();

View File

@ -0,0 +1,129 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.utils;
import java.io.UnsupportedEncodingException;
import java.time.format.DateTimeFormatter;
import java.time.Instant;
import java.time.ZoneId;
import java.util.Locale;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
import org.apache.hadoop.fs.azurebfs.services.AbfsUriQueryBuilder;
/**
* Test container SAS generator.
*/
public class SASGenerator {
private static final String HMAC_SHA256 = "HmacSHA256";
private static final int TOKEN_START_PERIOD_IN_SECONDS = 5 * 60;
private static final int TOKEN_EXPIRY_PERIOD_IN_SECONDS = 24 * 60 * 60;
public static final DateTimeFormatter ISO_8601_UTC_DATE_FORMATTER =
DateTimeFormatter
.ofPattern("yyyy-MM-dd'T'HH:mm:ss'Z'", Locale.ROOT)
.withZone(ZoneId.of("UTC"));
private Mac hmacSha256;
private byte[] key;
public SASGenerator(byte[] key) {
this.key = key;
initializeMac();
}
public String getContainerSASWithFullControl(String accountName, String containerName) {
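// sp: (r)ead, (c)reate, (w)rite, (d)elete, (l)ist permissions; sv is the
// storage service version used for signing; sr=c scopes the SAS to the container.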
String sp = "rcwdl";
String sv = "2018-11-09";
String sr = "c";
String st = ISO_8601_UTC_DATE_FORMATTER.format(Instant.now().minusSeconds(TOKEN_START_PERIOD_IN_SECONDS));
String se =
ISO_8601_UTC_DATE_FORMATTER.format(Instant.now().plusSeconds(TOKEN_EXPIRY_PERIOD_IN_SECONDS));
String signature = computeSignatureForSAS(sp, st, se, sv, sr,
accountName, containerName);
AbfsUriQueryBuilder qb = new AbfsUriQueryBuilder();
qb.addQuery("sp", sp);
qb.addQuery("st", st);
qb.addQuery("se", se);
qb.addQuery("sv", sv);
qb.addQuery("sr", sr);
qb.addQuery("sig", signature);
return qb.toString().substring(1);
}
private String computeSignatureForSAS(String sp, String st,
String se, String sv, String sr, String accountName, String containerName) {
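// Service SAS string-to-sign (sv 2018-11-09): sp, st, se, canonicalized
// resource, si, sip, spr, sv, sr, then the five optional response-header
// overrides, one field per line.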
StringBuilder sb = new StringBuilder();
sb.append(sp);
sb.append("\n");
sb.append(st);
sb.append("\n");
sb.append(se);
sb.append("\n");
// canonicalized resource
sb.append("/blob/");
sb.append(accountName);
sb.append("/");
sb.append(containerName);
sb.append("\n");
sb.append("\n"); // si
sb.append("\n"); // sip
sb.append("\n"); // spr
sb.append(sv);
sb.append("\n");
sb.append(sr);
sb.append("\n");
sb.append("\n"); // - For optional : rscc - ResponseCacheControl
sb.append("\n"); // - For optional : rscd - ResponseContentDisposition
sb.append("\n"); // - For optional : rsce - ResponseContentEncoding
sb.append("\n"); // - For optional : rscl - ResponseContentLanguage
sb.append("\n"); // - For optional : rsct - ResponseContentType
String stringToSign = sb.toString();
return computeHmac256(stringToSign);
}
private void initializeMac() {
// Initializes the HMAC-SHA256 Mac and SecretKey.
try {
hmacSha256 = Mac.getInstance(HMAC_SHA256);
hmacSha256.init(new SecretKeySpec(key, HMAC_SHA256));
} catch (final Exception e) {
throw new IllegalArgumentException(e);
}
}
private String computeHmac256(final String stringToSign) {
byte[] utf8Bytes;
try {
utf8Bytes = stringToSign.getBytes(AbfsHttpConstants.UTF_8);
} catch (final UnsupportedEncodingException e) {
throw new IllegalArgumentException(e);
}
byte[] hmac;
synchronized (this) {
hmac = hmacSha256.doFinal(utf8Bytes);
}
return Base64.encode(hmac);
}
}
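A minimal usage sketch; the key below is a placeholder, whereas real tests decode the account key from AbfsConfiguration:

byte[] accountKey = Base64.decode("cGxhY2Vob2xkZXIta2V5"); // base64 of "placeholder-key"
SASGenerator generator = new SASGenerator(accountKey);
String sasQuery = generator.getContainerSASWithFullControl("testaccount", "testcontainer");
// sasQuery has the form: sp=rcwdl&st=...&se=...&sv=2018-11-09&sr=c&sig=...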

View File

@ -30,7 +30,7 @@
<property>
<name>fs.azure.test.namespace.enabled</name>
<value>false</value>
<value>true</value>
</property>
<property>