From 9149b9703e3ab09abdc087db129e82ad3f4cefa1 Mon Sep 17 00:00:00 2001 From: Thomas Marquardt Date: Sat, 18 Aug 2018 18:53:32 +0000 Subject: [PATCH] HADOOP-15660. ABFS: Add support for OAuth Contributed by Da Zhou, Rajeev Bansal, and Junhua Gu. --- .../hadoop/fs/azurebfs/AbfsConfiguration.java | 149 ++++++-- .../fs/azurebfs/AzureBlobFileSystemStore.java | 28 +- .../azurebfs/constants/ConfigurationKeys.java | 19 + .../TokenAccessProviderException.java | 36 ++ .../services/AzureServiceErrorCode.java | 1 + .../services/ListResultEntrySchema.java | 89 ++++- .../azurebfs/oauth2/AccessTokenProvider.java | 98 +++++ .../azurebfs/oauth2/AzureADAuthenticator.java | 344 ++++++++++++++++++ .../fs/azurebfs/oauth2/AzureADToken.java | 47 +++ .../oauth2/ClientCredsTokenProvider.java | 62 ++++ .../oauth2/CustomTokenProviderAdaptee.java | 75 ++++ .../oauth2/CustomTokenProviderAdapter.java | 57 +++ .../fs/azurebfs/oauth2/MsiTokenProvider.java | 48 +++ .../fs/azurebfs/oauth2/QueryParams.java | 69 ++++ .../RefreshTokenBasedTokenProvider.java | 57 +++ .../oauth2/UserPasswordTokenProvider.java | 66 ++++ .../fs/azurebfs/oauth2/package-info.java | 18 + .../fs/azurebfs/services/AbfsClient.java | 18 +- .../fs/azurebfs/services/AbfsHttpHeader.java | 2 +- .../azurebfs/services/AbfsRestOperation.java | 19 +- .../hadoop/fs/azurebfs/services/AuthType.java | 27 ++ .../azurebfs/AbstractAbfsIntegrationTest.java | 35 +- .../hadoop/fs/azurebfs/ITestAbfsClient.java | 2 +- .../ITestAzureBlobFileSystemBackCompat.java | 4 + .../ITestAzureBlobFileSystemFileStatus.java | 3 - .../ITestAzureBlobFileSystemFinalize.java | 8 +- .../ITestAzureBlobFileSystemFlush.java | 8 +- .../ITestAzureBlobFileSystemOauth.java | 176 +++++++++ .../ITestAzureBlobFileSystemRandomRead.java | 3 + .../ITestFileSystemInitialization.java | 5 +- .../azurebfs/ITestFileSystemRegistration.java | 11 +- .../azurebfs/ITestWasbAbfsCompatibility.java | 2 + .../constants/TestConfigurationKeys.java | 6 + .../contract/ABFSContractTestBinding.java | 14 +- .../ITestAbfsFileSystemContractAppend.java | 19 +- .../ITestAbfsFileSystemContractConcat.java | 17 +- .../ITestAbfsFileSystemContractCreate.java | 17 +- .../ITestAbfsFileSystemContractDelete.java | 17 +- .../ITestAbfsFileSystemContractDistCp.java | 2 +- ...stAbfsFileSystemContractGetFileStatus.java | 17 +- .../ITestAbfsFileSystemContractMkdir.java | 17 +- .../ITestAbfsFileSystemContractOpen.java | 17 +- .../ITestAbfsFileSystemContractRename.java | 17 +- ...stAbfsFileSystemContractRootDirectory.java | 16 +- ...estAbfsFileSystemContractSecureDistCp.java | 2 +- .../ITestAbfsFileSystemContractSeek.java | 17 +- .../ITestAbfsFileSystemContractSetTimes.java | 17 +- .../ITestAzureBlobFileSystemBasics.java | 2 +- .../fs/azurebfs/services/TestAbfsClient.java | 6 +- .../fs/azurebfs/services/TestQueryParams.java | 72 ++++ .../utils/CleanUpAbfsTestContainer.java | 13 +- .../src/test/resources/azure-bfs-test.xml | 128 ++++++- 52 files changed, 1769 insertions(+), 250 deletions(-) create mode 100644 hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/TokenAccessProviderException.java create mode 100644 hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AccessTokenProvider.java create mode 100644 hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AzureADAuthenticator.java create mode 100644 hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AzureADToken.java create mode 100644 hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/ClientCredsTokenProvider.java create mode 100644 hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/CustomTokenProviderAdaptee.java create mode 100644 hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/CustomTokenProviderAdapter.java create mode 100644 hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/MsiTokenProvider.java create mode 100644 hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/QueryParams.java create mode 100644 hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/RefreshTokenBasedTokenProvider.java create mode 100644 hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/UserPasswordTokenProvider.java create mode 100644 hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/package-info.java create mode 100644 hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AuthType.java create mode 100644 hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemOauth.java create mode 100644 hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestQueryParams.java diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java index e647ae8f250..f26f56204ca 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -18,6 +18,7 @@ package org.apache.hadoop.fs.azurebfs; +import java.io.IOException; import java.lang.reflect.Field; import java.util.Map; @@ -26,7 +27,6 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys; import org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations; import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.IntegerConfigurationValidatorAnnotation; import org.apache.hadoop.fs.azurebfs.contracts.annotations.ConfigurationValidationAnnotations.LongConfigurationValidatorAnnotation; @@ -37,16 +37,26 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemExc import org.apache.hadoop.fs.azurebfs.contracts.exceptions.ConfigurationPropertyNotFoundException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.KeyProviderException; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TokenAccessProviderException; import org.apache.hadoop.fs.azurebfs.diagnostics.Base64StringConfigurationBasicValidator; import org.apache.hadoop.fs.azurebfs.diagnostics.BooleanConfigurationBasicValidator; import org.apache.hadoop.fs.azurebfs.diagnostics.IntegerConfigurationBasicValidator; import org.apache.hadoop.fs.azurebfs.diagnostics.LongConfigurationBasicValidator; import org.apache.hadoop.fs.azurebfs.diagnostics.StringConfigurationBasicValidator; +import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider; +import org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider; +import org.apache.hadoop.fs.azurebfs.oauth2.CustomTokenProviderAdaptee; +import org.apache.hadoop.fs.azurebfs.oauth2.CustomTokenProviderAdapter; +import org.apache.hadoop.fs.azurebfs.oauth2.MsiTokenProvider; +import org.apache.hadoop.fs.azurebfs.oauth2.RefreshTokenBasedTokenProvider; +import org.apache.hadoop.fs.azurebfs.oauth2.UserPasswordTokenProvider; +import org.apache.hadoop.fs.azurebfs.services.AuthType; import org.apache.hadoop.fs.azurebfs.services.KeyProvider; import org.apache.hadoop.fs.azurebfs.services.SimpleKeyProvider; import org.apache.hadoop.fs.azurebfs.utils.SSLSocketFactoryEx; +import org.apache.hadoop.util.ReflectionUtils; -import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_SSL_CHANNEL_MODE_KEY; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.*; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_FS_AZURE_SSL_CHANNEL_MODE; /** @@ -58,81 +68,81 @@ public class AbfsConfiguration{ private final Configuration configuration; private final boolean isSecure; - @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_WRITE_BUFFER_SIZE, + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_WRITE_BUFFER_SIZE, MinValue = FileSystemConfigurations.MIN_BUFFER_SIZE, MaxValue = FileSystemConfigurations.MAX_BUFFER_SIZE, DefaultValue = FileSystemConfigurations.DEFAULT_WRITE_BUFFER_SIZE) private int writeBufferSize; - @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_READ_BUFFER_SIZE, + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_READ_BUFFER_SIZE, MinValue = FileSystemConfigurations.MIN_BUFFER_SIZE, MaxValue = FileSystemConfigurations.MAX_BUFFER_SIZE, DefaultValue = FileSystemConfigurations.DEFAULT_READ_BUFFER_SIZE) private int readBufferSize; - @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_MIN_BACKOFF_INTERVAL, + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_MIN_BACKOFF_INTERVAL, DefaultValue = FileSystemConfigurations.DEFAULT_MIN_BACKOFF_INTERVAL) private int minBackoffInterval; - @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_MAX_BACKOFF_INTERVAL, + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_MAX_BACKOFF_INTERVAL, DefaultValue = FileSystemConfigurations.DEFAULT_MAX_BACKOFF_INTERVAL) private int maxBackoffInterval; - @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_BACKOFF_INTERVAL, + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_BACKOFF_INTERVAL, DefaultValue = FileSystemConfigurations.DEFAULT_BACKOFF_INTERVAL) private int backoffInterval; - @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_MAX_IO_RETRIES, + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_MAX_IO_RETRIES, MinValue = 0, DefaultValue = FileSystemConfigurations.DEFAULT_MAX_RETRY_ATTEMPTS) private int maxIoRetries; - @LongConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_BLOCK_SIZE_PROPERTY_NAME, + @LongConfigurationValidatorAnnotation(ConfigurationKey = AZURE_BLOCK_SIZE_PROPERTY_NAME, MinValue = 0, MaxValue = FileSystemConfigurations.MAX_AZURE_BLOCK_SIZE, DefaultValue = FileSystemConfigurations.MAX_AZURE_BLOCK_SIZE) private long azureBlockSize; - @StringConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME, + @StringConfigurationValidatorAnnotation(ConfigurationKey = AZURE_BLOCK_LOCATION_HOST_PROPERTY_NAME, DefaultValue = FileSystemConfigurations.AZURE_BLOCK_LOCATION_HOST_DEFAULT) private String azureBlockLocationHost; - @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_CONCURRENT_CONNECTION_VALUE_OUT, + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_CONCURRENT_CONNECTION_VALUE_OUT, MinValue = 1, DefaultValue = FileSystemConfigurations.MAX_CONCURRENT_WRITE_THREADS) private int maxConcurrentWriteThreads; - @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_CONCURRENT_CONNECTION_VALUE_IN, + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_CONCURRENT_CONNECTION_VALUE_IN, MinValue = 1, DefaultValue = FileSystemConfigurations.MAX_CONCURRENT_READ_THREADS) private int maxConcurrentReadThreads; - @BooleanConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_TOLERATE_CONCURRENT_APPEND, + @BooleanConfigurationValidatorAnnotation(ConfigurationKey = AZURE_TOLERATE_CONCURRENT_APPEND, DefaultValue = FileSystemConfigurations.DEFAULT_READ_TOLERATE_CONCURRENT_APPEND) private boolean tolerateOobAppends; - @StringConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.FS_AZURE_ATOMIC_RENAME_KEY, + @StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ATOMIC_RENAME_KEY, DefaultValue = FileSystemConfigurations.DEFAULT_FS_AZURE_ATOMIC_RENAME_DIRECTORIES) private String azureAtomicDirs; - @BooleanConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION, + @BooleanConfigurationValidatorAnnotation(ConfigurationKey = AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION, DefaultValue = FileSystemConfigurations.DEFAULT_AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION) private boolean createRemoteFileSystemDuringInitialization; - @BooleanConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.AZURE_SKIP_USER_GROUP_METADATA_DURING_INITIALIZATION, + @BooleanConfigurationValidatorAnnotation(ConfigurationKey = AZURE_SKIP_USER_GROUP_METADATA_DURING_INITIALIZATION, DefaultValue = FileSystemConfigurations.DEFAULT_AZURE_SKIP_USER_GROUP_METADATA_DURING_INITIALIZATION) private boolean skipUserGroupMetadataDuringInitialization; - @IntegerConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.FS_AZURE_READ_AHEAD_QUEUE_DEPTH, + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_READ_AHEAD_QUEUE_DEPTH, DefaultValue = FileSystemConfigurations.DEFAULT_READ_AHEAD_QUEUE_DEPTH) private int readAheadQueueDepth; - @BooleanConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.FS_AZURE_ENABLE_FLUSH, + @BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ENABLE_FLUSH, DefaultValue = FileSystemConfigurations.DEFAULT_ENABLE_FLUSH) private boolean enableFlush; - @StringConfigurationValidatorAnnotation(ConfigurationKey = ConfigurationKeys.FS_AZURE_USER_AGENT_PREFIX_KEY, + @StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_USER_AGENT_PREFIX_KEY, DefaultValue = "") private String userAgentId; @@ -140,7 +150,7 @@ public class AbfsConfiguration{ public AbfsConfiguration(final Configuration configuration) throws IllegalAccessException, InvalidConfigurationValueException { this.configuration = configuration; - this.isSecure = this.configuration.getBoolean(ConfigurationKeys.FS_AZURE_SECURE_MODE, false); + this.isSecure = this.configuration.getBoolean(FS_AZURE_SECURE_MODE, false); validateStorageAccountKeys(); Field[] fields = this.getClass().getDeclaredFields(); @@ -161,17 +171,17 @@ public class AbfsConfiguration{ } public boolean isEmulator() { - return this.getConfiguration().getBoolean(ConfigurationKeys.FS_AZURE_EMULATOR_ENABLED, false); + return this.getConfiguration().getBoolean(FS_AZURE_EMULATOR_ENABLED, false); } public boolean isSecureMode() { - return this.isSecure; + return isSecure; } public String getStorageAccountKey(final String accountName) throws AzureBlobFileSystemException { String key; String keyProviderClass = - configuration.get(ConfigurationKeys.AZURE_KEY_ACCOUNT_KEYPROVIDER_PREFIX + accountName); + configuration.get(AZURE_KEY_ACCOUNT_KEYPROVIDER_PREFIX + accountName); KeyProvider keyProvider; if (keyProviderClass == null) { @@ -278,19 +288,88 @@ public class AbfsConfiguration{ return configuration.getEnum(FS_AZURE_SSL_CHANNEL_MODE_KEY, DEFAULT_FS_AZURE_SSL_CHANNEL_MODE); } + public AuthType getAuthType(final String accountName) { + return configuration.getEnum(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME + accountName, AuthType.SharedKey); + } + + public AccessTokenProvider getTokenProvider(final String accountName) throws TokenAccessProviderException { + AuthType authType = configuration.getEnum(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME + accountName, AuthType.SharedKey); + if (authType == AuthType.OAuth) { + try { + Class tokenProviderClass = + configuration.getClass(FS_AZURE_ACCOUNT_TOKEN_PROVIDER_TYPE_PROPERTY_NAME + accountName, null, + AccessTokenProvider.class); + AccessTokenProvider tokenProvider = null; + if (tokenProviderClass == ClientCredsTokenProvider.class) { + String authEndpoint = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_CLIENT_ENDPOINT + accountName); + String clientId = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_CLIENT_ID + accountName); + String clientSecret = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_CLIENT_SECRET + accountName); + tokenProvider = new ClientCredsTokenProvider(authEndpoint, clientId, clientSecret); + } else if (tokenProviderClass == UserPasswordTokenProvider.class) { + String authEndpoint = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_CLIENT_ENDPOINT + accountName); + String username = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_USER_NAME + accountName); + String password = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_USER_PASSWORD + accountName); + tokenProvider = new UserPasswordTokenProvider(authEndpoint, username, password); + } else if (tokenProviderClass == MsiTokenProvider.class) { + String tenantGuid = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_MSI_TENANT + accountName); + String clientId = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_CLIENT_ID + accountName); + tokenProvider = new MsiTokenProvider(tenantGuid, clientId); + } else if (tokenProviderClass == RefreshTokenBasedTokenProvider.class) { + String refreshToken = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_REFRESH_TOKEN + accountName); + String clientId = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_CLIENT_ID + accountName); + tokenProvider = new RefreshTokenBasedTokenProvider(clientId, refreshToken); + } else { + throw new IllegalArgumentException("Failed to initialize " + tokenProviderClass); + } + return tokenProvider; + } catch(IllegalArgumentException e) { + throw e; + } catch (Exception e) { + throw new TokenAccessProviderException("Unable to load key provider class.", e); + } + + } else if (authType == AuthType.Custom) { + try { + String configKey = FS_AZURE_ACCOUNT_TOKEN_PROVIDER_TYPE_PROPERTY_NAME + accountName; + Class customTokenProviderClass = + configuration.getClass(configKey, null, + CustomTokenProviderAdaptee.class); + if (customTokenProviderClass == null) { + throw new IllegalArgumentException( + String.format("The configuration value for \"%s\" is invalid.", configKey)); + } + CustomTokenProviderAdaptee azureTokenProvider = ReflectionUtils + .newInstance(customTokenProviderClass, configuration); + if (azureTokenProvider == null) { + throw new IllegalArgumentException("Failed to initialize " + customTokenProviderClass); + } + azureTokenProvider.initialize(configuration, accountName); + return new CustomTokenProviderAdapter(azureTokenProvider); + } catch(IllegalArgumentException e) { + throw e; + } catch (Exception e) { + throw new TokenAccessProviderException("Unable to load custom token provider class.", e); + } + + } else { + throw new TokenAccessProviderException(String.format( + "Invalid auth type: %s is being used, expecting OAuth", authType)); + } + } + void validateStorageAccountKeys() throws InvalidConfigurationValueException { Base64StringConfigurationBasicValidator validator = new Base64StringConfigurationBasicValidator( - ConfigurationKeys.FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME, "", true); - this.storageAccountKeys = this.configuration.getValByRegex(ConfigurationKeys.FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME_REGX); + FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME, "", true); + this.storageAccountKeys = configuration.getValByRegex(FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME_REGX); - for (Map.Entry account : this.storageAccountKeys.entrySet()) { + for (Map.Entry account : storageAccountKeys.entrySet()) { validator.validate(account.getValue()); } } int validateInt(Field field) throws IllegalAccessException, InvalidConfigurationValueException { IntegerConfigurationValidatorAnnotation validator = field.getAnnotation(IntegerConfigurationValidatorAnnotation.class); - String value = this.configuration.get(validator.ConfigurationKey()); + String value = configuration.get(validator.ConfigurationKey()); // validate return new IntegerConfigurationBasicValidator( @@ -303,7 +382,7 @@ public class AbfsConfiguration{ long validateLong(Field field) throws IllegalAccessException, InvalidConfigurationValueException { LongConfigurationValidatorAnnotation validator = field.getAnnotation(LongConfigurationValidatorAnnotation.class); - String value = this.configuration.get(validator.ConfigurationKey()); + String value = configuration.get(validator.ConfigurationKey()); // validate return new LongConfigurationBasicValidator( @@ -316,7 +395,7 @@ public class AbfsConfiguration{ String validateString(Field field) throws IllegalAccessException, InvalidConfigurationValueException { StringConfigurationValidatorAnnotation validator = field.getAnnotation(StringConfigurationValidatorAnnotation.class); - String value = this.configuration.get(validator.ConfigurationKey()); + String value = configuration.get(validator.ConfigurationKey()); // validate return new StringConfigurationBasicValidator( @@ -327,7 +406,7 @@ public class AbfsConfiguration{ String validateBase64String(Field field) throws IllegalAccessException, InvalidConfigurationValueException { Base64StringConfigurationValidatorAnnotation validator = field.getAnnotation((Base64StringConfigurationValidatorAnnotation.class)); - String value = this.configuration.get(validator.ConfigurationKey()); + String value = configuration.get(validator.ConfigurationKey()); // validate return new Base64StringConfigurationBasicValidator( @@ -338,7 +417,7 @@ public class AbfsConfiguration{ boolean validateBoolean(Field field) throws IllegalAccessException, InvalidConfigurationValueException { BooleanConfigurationValidatorAnnotation validator = field.getAnnotation(BooleanConfigurationValidatorAnnotation.class); - String value = this.configuration.get(validator.ConfigurationKey()); + String value = configuration.get(validator.ConfigurationKey()); // validate return new BooleanConfigurationBasicValidator( @@ -347,6 +426,14 @@ public class AbfsConfiguration{ validator.ThrowIfInvalid()).validate(value); } + String getPasswordString(String key) throws IOException { + char[] passchars = configuration.getPassword(key); + if (passchars != null) { + return new String(passchars); + } + return null; + } + @VisibleForTesting void setReadBufferSize(int bufferSize) { this.readBufferSize = bufferSize; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index ba721497d74..b8da35b0b10 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -67,10 +67,12 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TimeoutException; import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode; import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultEntrySchema; import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema; +import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider; import org.apache.hadoop.fs.azurebfs.services.AbfsClient; import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream; import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream; import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation; +import org.apache.hadoop.fs.azurebfs.services.AuthType; import org.apache.hadoop.fs.azurebfs.services.ExponentialRetryPolicy; import org.apache.hadoop.fs.azurebfs.services.SharedKeyCredentials; import org.apache.hadoop.fs.permission.FsAction; @@ -487,16 +489,22 @@ public class AzureBlobFileSystemStore { throw new InvalidUriException(uri.toString()); } - int dotIndex = accountName.indexOf(AbfsHttpConstants.DOT); - if (dotIndex <= 0) { - throw new InvalidUriException( - uri.toString() + " - account name is not fully qualified."); - } - SharedKeyCredentials creds = - new SharedKeyCredentials(accountName.substring(0, dotIndex), - this.abfsConfiguration.getStorageAccountKey(accountName)); + SharedKeyCredentials creds = null; + AccessTokenProvider tokenProvider = null; - this.client = new AbfsClient(baseUrl, creds, abfsConfiguration, new ExponentialRetryPolicy()); + if (abfsConfiguration.getAuthType(accountName) == AuthType.SharedKey) { + int dotIndex = accountName.indexOf(AbfsHttpConstants.DOT); + if (dotIndex <= 0) { + throw new InvalidUriException( + uri.toString() + " - account name is not fully qualified."); + } + creds = new SharedKeyCredentials(accountName.substring(0, dotIndex), + abfsConfiguration.getStorageAccountKey(accountName)); + } else { + tokenProvider = abfsConfiguration.getTokenProvider(accountName); + } + + this.client = new AbfsClient(baseUrl, creds, abfsConfiguration, new ExponentialRetryPolicy(), tokenProvider); } private String getRelativePath(final Path path) { @@ -537,7 +545,7 @@ public class AzureBlobFileSystemStore { Date utcDate = new SimpleDateFormat(DATE_TIME_PATTERN).parse(lastModifiedTime); parsedTime = utcDate.getTime(); } catch (ParseException e) { - LOG.error("Failed to parse the date {0}", lastModifiedTime); + LOG.error("Failed to parse the date {}", lastModifiedTime); } finally { return parsedTime; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java index 16ddd900edd..ffdf700b78f 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java @@ -60,5 +60,24 @@ public final class ConfigurationKeys { public static final String AZURE_KEY_ACCOUNT_KEYPROVIDER_PREFIX = "fs.azure.account.keyprovider."; public static final String AZURE_KEY_ACCOUNT_SHELLKEYPROVIDER_SCRIPT = "fs.azure.shellkeyprovider.script"; + /** Prefix for auth type properties: {@value}. */ + public static final String FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME = "fs.azure.account.auth.type."; + /** Prefix for oauth token provider type: {@value}. */ + public static final String FS_AZURE_ACCOUNT_TOKEN_PROVIDER_TYPE_PROPERTY_NAME = "fs.azure.account.oauth.provider.type."; + /** Prefix for oauth AAD client id: {@value}. */ + public static final String FS_AZURE_ACCOUNT_OAUTH_CLIENT_ID = "fs.azure.account.oauth2.client.id."; + /** Prefix for oauth AAD client secret: {@value}. */ + public static final String FS_AZURE_ACCOUNT_OAUTH_CLIENT_SECRET = "fs.azure.account.oauth2.client.secret."; + /** Prefix for oauth AAD client endpoint: {@value}. */ + public static final String FS_AZURE_ACCOUNT_OAUTH_CLIENT_ENDPOINT = "fs.azure.account.oauth2.client.endpoint."; + /** Prefix for oauth msi tenant id: {@value}. */ + public static final String FS_AZURE_ACCOUNT_OAUTH_MSI_TENANT = "fs.azure.account.oauth2.msi.tenant."; + /** Prefix for oauth user name: {@value}. */ + public static final String FS_AZURE_ACCOUNT_OAUTH_USER_NAME = "fs.azure.account.oauth2.user.name."; + /** Prefix for oauth user password: {@value}. */ + public static final String FS_AZURE_ACCOUNT_OAUTH_USER_PASSWORD = "fs.azure.account.oauth2.user.password."; + /** Prefix for oauth refresh token: {@value}. */ + public static final String FS_AZURE_ACCOUNT_OAUTH_REFRESH_TOKEN = "fs.azure.account.oauth2.refresh.token."; + private ConfigurationKeys() {} } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/TokenAccessProviderException.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/TokenAccessProviderException.java new file mode 100644 index 00000000000..b40b34ac13e --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/TokenAccessProviderException.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs.azurebfs.contracts.exceptions; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * Thrown if there is a problem instantiating a TokenAccessProvider or retrieving a configuration + * using a TokenAccessProvider object. + */ +@InterfaceAudience.Private +public class TokenAccessProviderException extends AzureBlobFileSystemException { + + public TokenAccessProviderException(String message) { + super(message); + } + + public TokenAccessProviderException(String message, Throwable cause) { + super(message); + } +} \ No newline at end of file diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AzureServiceErrorCode.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AzureServiceErrorCode.java index a89f339967c..63bf8d03fda 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AzureServiceErrorCode.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AzureServiceErrorCode.java @@ -44,6 +44,7 @@ public enum AzureServiceErrorCode { INGRESS_OVER_ACCOUNT_LIMIT(null, HttpURLConnection.HTTP_UNAVAILABLE, "Ingress is over the account limit."), EGRESS_OVER_ACCOUNT_LIMIT(null, HttpURLConnection.HTTP_UNAVAILABLE, "Egress is over the account limit."), INVALID_QUERY_PARAMETER_VALUE("InvalidQueryParameterValue", HttpURLConnection.HTTP_BAD_REQUEST, null), + AUTHORIZATION_PERMISSION_MISS_MATCH("AuthorizationPermissionMismatch", HttpURLConnection.HTTP_FORBIDDEN, null), UNKNOWN(null, -1, null); private final String errorCode; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/ListResultEntrySchema.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/ListResultEntrySchema.java index 903ff69e9e3..1de9dfaeeb9 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/ListResultEntrySchema.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/ListResultEntrySchema.java @@ -57,13 +57,31 @@ public class ListResultEntrySchema { @JsonProperty(value = "contentLength") private Long contentLength; + /** + * The owner property. + */ + @JsonProperty(value = "owner") + private String owner; + + /** + * The group property. + */ + @JsonProperty(value = "group") + private String group; + + /** + * The permissions property. + */ + @JsonProperty(value = "permissions") + private String permissions; + /** * Get the name value. * * @return the name value */ public String name() { - return this.name; + return name; } /** @@ -83,7 +101,7 @@ public class ListResultEntrySchema { * @return the isDirectory value */ public Boolean isDirectory() { - return this.isDirectory; + return isDirectory; } /** @@ -103,7 +121,7 @@ public class ListResultEntrySchema { * @return the lastModified value */ public String lastModified() { - return this.lastModified; + return lastModified; } /** @@ -123,7 +141,7 @@ public class ListResultEntrySchema { * @return the etag value */ public String eTag() { - return this.eTag; + return eTag; } /** @@ -143,7 +161,7 @@ public class ListResultEntrySchema { * @return the contentLength value */ public Long contentLength() { - return this.contentLength; + return contentLength; } /** @@ -157,4 +175,65 @@ public class ListResultEntrySchema { return this; } + /** + * + Get the owner value. + * + * @return the owner value + */ + public String owner() { + return owner; + } + + /** + * Set the owner value. + * + * @param owner the owner value to set + * @return the ListEntrySchema object itself. + */ + public ListResultEntrySchema withOwner(final String owner) { + this.owner = owner; + return this; + } + + /** + * Get the group value. + * + * @return the group value + */ + public String group() { + return group; + } + + /** + * Set the group value. + * + * @param group the group value to set + * @return the ListEntrySchema object itself. + */ + public ListResultEntrySchema withGroup(final String group) { + this.group = group; + return this; + } + + /** + * Get the permissions value. + * + * @return the permissions value + */ + public String permissions() { + return permissions; + } + + /** + * Set the permissions value. + * + * @param permissions the permissions value to set + * @return the ListEntrySchema object itself. + */ + public ListResultEntrySchema withPermissions(final String permissions) { + this.permissions = permissions; + return this; + } + } \ No newline at end of file diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AccessTokenProvider.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AccessTokenProvider.java new file mode 100644 index 00000000000..72f37a1dc1a --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AccessTokenProvider.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs.azurebfs.oauth2; + +import java.io.IOException; +import java.util.Date; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Returns an Azure Active Directory token when requested. The provider can + * cache the token if it has already retrieved one. If it does, then the + * provider is responsible for checking expiry and refreshing as needed. + * + * In other words, this is is a token cache that fetches tokens when + * requested, if the cached token has expired. + * + */ +public abstract class AccessTokenProvider { + + private AzureADToken token; + private static final Logger LOG = LoggerFactory.getLogger(AccessTokenProvider.class); + + /** + * returns the {@link AzureADToken} cached (or retrieved) by this instance. + * + * @return {@link AzureADToken} containing the access token + * @throws IOException if there is an error fetching the token + */ + public synchronized AzureADToken getToken() throws IOException { + if (isTokenAboutToExpire()) { + LOG.debug("AAD Token is missing or expired:" + + " Calling refresh-token from abstract base class"); + token = refreshToken(); + } + return token; + } + + /** + * the method to fetch the access token. Derived classes should override + * this method to actually get the token from Azure Active Directory. + * + * This method will be called initially, and then once when the token + * is about to expire. + * + * + * @return {@link AzureADToken} containing the access token + * @throws IOException if there is an error fetching the token + */ + protected abstract AzureADToken refreshToken() throws IOException; + + /** + * Checks if the token is about to expire in the next 5 minutes. + * The 5 minute allowance is to allow for clock skew and also to + * allow for token to be refreshed in that much time. + * + * @return true if the token is expiring in next 5 minutes + */ + private boolean isTokenAboutToExpire() { + if (token == null) { + LOG.debug("AADToken: no token. Returning expiring=true"); + return true; // no token should have same response as expired token + } + boolean expiring = false; + // allow 5 minutes for clock skew + long approximatelyNow = System.currentTimeMillis() + FIVE_MINUTES; + if (token.getExpiry().getTime() < approximatelyNow) { + expiring = true; + } + if (expiring) { + LOG.debug("AADToken: token expiring: " + + token.getExpiry().toString() + + " : Five-minute window: " + + new Date(approximatelyNow).toString()); + } + + return expiring; + } + + // 5 minutes in milliseconds + private static final long FIVE_MINUTES = 300 * 1000; +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AzureADAuthenticator.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AzureADAuthenticator.java new file mode 100644 index 00000000000..e82dc954f59 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AzureADAuthenticator.java @@ -0,0 +1,344 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.oauth2; + +import java.io.IOException; +import java.io.InputStream; +import java.net.HttpURLConnection; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.util.Date; +import java.util.Hashtable; +import java.util.Map; + +import com.google.common.base.Preconditions; +import org.codehaus.jackson.JsonFactory; +import org.codehaus.jackson.JsonParser; +import org.codehaus.jackson.JsonToken; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.azurebfs.services.ExponentialRetryPolicy; + +/** + * This class provides convenience methods to obtain AAD tokens. + * While convenient, it is not necessary to use these methods to + * obtain the tokens. Customers can use any other method + * (e.g., using the adal4j client) to obtain tokens. + */ + +@InterfaceAudience.Private +@InterfaceStability.Evolving +public final class AzureADAuthenticator { + + private static final Logger LOG = LoggerFactory.getLogger(AzureADAuthenticator.class); + private static final String RESOURCE_NAME = "https://storage.azure.com/"; + private static final int CONNECT_TIMEOUT = 30 * 1000; + private static final int READ_TIMEOUT = 30 * 1000; + + private AzureADAuthenticator() { + // no operation + } + + /** + * gets Azure Active Directory token using the user ID and password of + * a service principal (that is, Web App in Azure Active Directory). + * + * Azure Active Directory allows users to set up a web app as a + * service principal. Users can optionally obtain service principal keys + * from AAD. This method gets a token using a service principal's client ID + * and keys. In addition, it needs the token endpoint associated with the + * user's directory. + * + * + * @param authEndpoint the OAuth 2.0 token endpoint associated + * with the user's directory (obtain from + * Active Directory configuration) + * @param clientId the client ID (GUID) of the client web app + * btained from Azure Active Directory configuration + * @param clientSecret the secret key of the client web app + * @return {@link AzureADToken} obtained using the creds + * @throws IOException throws IOException if there is a failure in connecting to Azure AD + */ + public static AzureADToken getTokenUsingClientCreds(String authEndpoint, + String clientId, String clientSecret) + throws IOException { + Preconditions.checkNotNull(authEndpoint, "authEndpoint"); + Preconditions.checkNotNull(clientId, "clientId"); + Preconditions.checkNotNull(clientSecret, "clientSecret"); + + QueryParams qp = new QueryParams(); + qp.add("resource", RESOURCE_NAME); + qp.add("grant_type", "client_credentials"); + qp.add("client_id", clientId); + qp.add("client_secret", clientSecret); + LOG.debug("AADToken: starting to fetch token using client creds for client ID " + clientId); + + return getTokenCall(authEndpoint, qp.serialize(), null, null); + } + + /** + * Gets AAD token from the local virtual machine's VM extension. This only works on + * an Azure VM with MSI extension + * enabled. + * + * @param tenantGuid (optional) The guid of the AAD tenant. Can be {@code null}. + * @param clientId (optional) The clientId guid of the MSI service + * principal to use. Can be {@code null}. + * @param bypassCache {@code boolean} specifying whether a cached token is acceptable or a fresh token + * request should me made to AAD + * @return {@link AzureADToken} obtained using the creds + * @throws IOException throws IOException if there is a failure in obtaining the token + */ + public static AzureADToken getTokenFromMsi(String tenantGuid, String clientId, + boolean bypassCache) throws IOException { + Preconditions.checkNotNull(tenantGuid, "tenantGuid"); + Preconditions.checkNotNull(clientId, "clientId"); + + String authEndpoint = "http://169.254.169.254/metadata/identity/oauth2/token"; + + QueryParams qp = new QueryParams(); + qp.add("api-version", "2018-02-01"); + qp.add("resource", RESOURCE_NAME); + + + if (tenantGuid.length() > 0) { + String authority = "https://login.microsoftonline.com/" + tenantGuid; + qp.add("authority", authority); + } + + if (clientId.length() > 0) { + qp.add("client_id", clientId); + } + + if (bypassCache) { + qp.add("bypass_cache", "true"); + } + + Hashtable headers = new Hashtable<>(); + headers.put("Metadata", "true"); + + LOG.debug("AADToken: starting to fetch token using MSI"); + return getTokenCall(authEndpoint, qp.serialize(), headers, "GET"); + } + + /** + * Gets Azure Active Directory token using refresh token. + * + * @param clientId the client ID (GUID) of the client web app obtained from Azure Active Directory configuration + * @param refreshToken the refresh token + * @return {@link AzureADToken} obtained using the refresh token + * @throws IOException throws IOException if there is a failure in connecting to Azure AD + */ + public static AzureADToken getTokenUsingRefreshToken(String clientId, + String refreshToken) throws IOException { + String authEndpoint = "https://login.microsoftonline.com/Common/oauth2/token"; + QueryParams qp = new QueryParams(); + qp.add("grant_type", "refresh_token"); + qp.add("refresh_token", refreshToken); + if (clientId != null) { + qp.add("client_id", clientId); + } + LOG.debug("AADToken: starting to fetch token using refresh token for client ID " + clientId); + return getTokenCall(authEndpoint, qp.serialize(), null, null); + } + + private static class HttpException extends IOException { + private int httpErrorCode; + private String requestId; + + public int getHttpErrorCode() { + return this.httpErrorCode; + } + + public String getRequestId() { + return this.requestId; + } + + HttpException(int httpErrorCode, String requestId, String message) { + super(message); + this.httpErrorCode = httpErrorCode; + this.requestId = requestId; + } + } + + private static AzureADToken getTokenCall(String authEndpoint, String body, + Hashtable headers, String httpMethod) + throws IOException { + AzureADToken token = null; + ExponentialRetryPolicy retryPolicy + = new ExponentialRetryPolicy(3, 0, 1000, 2); + + int httperror = 0; + String requestId; + String httpExceptionMessage = null; + IOException ex = null; + boolean succeeded = false; + int retryCount = 0; + do { + httperror = 0; + requestId = ""; + ex = null; + try { + token = getTokenSingleCall(authEndpoint, body, headers, httpMethod); + } catch (HttpException e) { + httperror = e.httpErrorCode; + requestId = e.requestId; + httpExceptionMessage = e.getMessage(); + } catch (IOException e) { + ex = e; + } + succeeded = ((httperror == 0) && (ex == null)); + retryCount++; + } while (!succeeded && retryPolicy.shouldRetry(retryCount, httperror)); + if (!succeeded) { + if (ex != null) { + throw ex; + } + if (httperror != 0) { + throw new IOException(httpExceptionMessage); + } + } + return token; + } + + private static AzureADToken getTokenSingleCall( + String authEndpoint, String payload, Hashtable headers, String httpMethod) + throws IOException { + + AzureADToken token = null; + HttpURLConnection conn = null; + String urlString = authEndpoint; + + httpMethod = (httpMethod == null) ? "POST" : httpMethod; + if (httpMethod.equals("GET")) { + urlString = urlString + "?" + payload; + } + + try { + URL url = new URL(urlString); + conn = (HttpURLConnection) url.openConnection(); + conn.setRequestMethod(httpMethod); + conn.setReadTimeout(READ_TIMEOUT); + conn.setConnectTimeout(CONNECT_TIMEOUT); + + if (headers != null && headers.size() > 0) { + for (Map.Entry entry : headers.entrySet()) { + conn.setRequestProperty(entry.getKey(), entry.getValue()); + } + } + conn.setRequestProperty("Connection", "close"); + + if (httpMethod.equals("POST")) { + conn.setDoOutput(true); + conn.getOutputStream().write(payload.getBytes("UTF-8")); + } + + int httpResponseCode = conn.getResponseCode(); + String requestId = conn.getHeaderField("x-ms-request-id"); + String responseContentType = conn.getHeaderField("Content-Type"); + long responseContentLength = conn.getHeaderFieldLong("Content-Length", 0); + + requestId = requestId == null ? "" : requestId; + if (httpResponseCode == HttpURLConnection.HTTP_OK + && responseContentType.startsWith("application/json") && responseContentLength > 0) { + InputStream httpResponseStream = conn.getInputStream(); + token = parseTokenFromStream(httpResponseStream); + } else { + String responseBody = consumeInputStream(conn.getInputStream(), 1024); + String proxies = "none"; + String httpProxy = System.getProperty("http.proxy"); + String httpsProxy = System.getProperty("https.proxy"); + if (httpProxy != null || httpsProxy != null) { + proxies = "http:" + httpProxy + "; https:" + httpsProxy; + } + String logMessage = + "AADToken: HTTP connection failed for getting token from AzureAD. Http response: " + + httpResponseCode + " " + conn.getResponseMessage() + + " Content-Type: " + responseContentType + + " Content-Length: " + responseContentLength + + " Request ID: " + requestId.toString() + + " Proxies: " + proxies + + " First 1K of Body: " + responseBody; + LOG.debug(logMessage); + throw new HttpException(httpResponseCode, requestId, logMessage); + } + } finally { + if (conn != null) { + conn.disconnect(); + } + } + return token; + } + + private static AzureADToken parseTokenFromStream(InputStream httpResponseStream) throws IOException { + AzureADToken token = new AzureADToken(); + try { + int expiryPeriod = 0; + + JsonFactory jf = new JsonFactory(); + JsonParser jp = jf.createJsonParser(httpResponseStream); + String fieldName, fieldValue; + jp.nextToken(); + while (jp.hasCurrentToken()) { + if (jp.getCurrentToken() == JsonToken.FIELD_NAME) { + fieldName = jp.getCurrentName(); + jp.nextToken(); // field value + fieldValue = jp.getText(); + + if (fieldName.equals("access_token")) { + token.setAccessToken(fieldValue); + } + if (fieldName.equals("expires_in")) { + expiryPeriod = Integer.parseInt(fieldValue); + } + } + jp.nextToken(); + } + jp.close(); + long expiry = System.currentTimeMillis(); + expiry = expiry + expiryPeriod * 1000L; // convert expiryPeriod to milliseconds and add + token.setExpiry(new Date(expiry)); + LOG.debug("AADToken: fetched token with expiry " + token.getExpiry().toString()); + } catch (Exception ex) { + LOG.debug("AADToken: got exception when parsing json token " + ex.toString()); + throw ex; + } finally { + httpResponseStream.close(); + } + return token; + } + + private static String consumeInputStream(InputStream inStream, int length) throws IOException { + byte[] b = new byte[length]; + int totalBytesRead = 0; + int bytesRead = 0; + + do { + bytesRead = inStream.read(b, totalBytesRead, length - totalBytesRead); + if (bytesRead > 0) { + totalBytesRead += bytesRead; + } + } while (bytesRead >= 0 && totalBytesRead < length); + + return new String(b, 0, totalBytesRead, StandardCharsets.UTF_8); + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AzureADToken.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AzureADToken.java new file mode 100644 index 00000000000..daa5a93bf6c --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/AzureADToken.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.oauth2; + +import java.util.Date; + + +/** + * Object representing the AAD access token to use when making HTTP requests to Azure Data Lake Storage. + */ +public class AzureADToken { + private String accessToken; + private Date expiry; + + public String getAccessToken() { + return this.accessToken; + } + + public void setAccessToken(String accessToken) { + this.accessToken = accessToken; + } + + public Date getExpiry() { + return new Date(this.expiry.getTime()); + } + + public void setExpiry(Date expiry) { + this.expiry = new Date(expiry.getTime()); + } + +} \ No newline at end of file diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/ClientCredsTokenProvider.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/ClientCredsTokenProvider.java new file mode 100644 index 00000000000..9a46018ec62 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/ClientCredsTokenProvider.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.oauth2; + +import java.io.IOException; + +import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Provides tokens based on client credentials. + */ +public class ClientCredsTokenProvider extends AccessTokenProvider { + + private final String authEndpoint; + + private final String clientId; + + private final String clientSecret; + + private static final Logger LOG = LoggerFactory.getLogger(AccessTokenProvider.class); + + + public ClientCredsTokenProvider(final String authEndpoint, + final String clientId, final String clientSecret) { + + Preconditions.checkNotNull(authEndpoint, "authEndpoint"); + Preconditions.checkNotNull(clientId, "clientId"); + Preconditions.checkNotNull(clientSecret, "clientSecret"); + + this.authEndpoint = authEndpoint; + this.clientId = clientId; + this.clientSecret = clientSecret; + } + + + @Override + protected AzureADToken refreshToken() throws IOException { + LOG.debug("AADToken: refreshing client-credential based token"); + return AzureADAuthenticator.getTokenUsingClientCreds(authEndpoint, clientId, clientSecret); + } + + +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/CustomTokenProviderAdaptee.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/CustomTokenProviderAdaptee.java new file mode 100644 index 00000000000..7366a8d7950 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/CustomTokenProviderAdaptee.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.oauth2; + +import java.io.IOException; +import java.util.Date; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; + + +/** + * This interface provides an extensibility model for customizing the acquisition + * of Azure Active Directory Access Tokens. When "fs.azure.account.auth.type" is + * set to "Custom", implementors may use the + * "fs.azure.account.oauth.provider.type.{accountName}" configuration property + * to specify a class with a custom implementation of CustomTokenProviderAdaptee. + * This class will be dynamically loaded, initialized, and invoked to provide + * AAD Access Tokens and their Expiry. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public interface CustomTokenProviderAdaptee { + + /** + * Initialize with supported configuration. This method is invoked when the + * (URI, Configuration)} method is invoked. + * + * @param configuration Configuration object + * @param accountName Account Name + * @throws IOException if instance can not be configured. + */ + void initialize(Configuration configuration, String accountName) + throws IOException; + + /** + * Obtain the access token that should be added to https connection's header. + * Will be called depending upon {@link #getExpiryTime()} expiry time is set, + * so implementations should be performant. Implementations are responsible + * for any refreshing of the token. + * + * @return String containing the access token + * @throws IOException if there is an error fetching the token + */ + String getAccessToken() throws IOException; + + /** + * Obtain expiry time of the token. If implementation is performant enough to + * maintain expiry and expect {@link #getAccessToken()} call for every + * connection then safe to return current or past time. + * + * However recommended to use the token expiry time received from Azure Active + * Directory. + * + * @return Date to expire access token retrieved from AAD. + */ + Date getExpiryTime(); +} \ No newline at end of file diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/CustomTokenProviderAdapter.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/CustomTokenProviderAdapter.java new file mode 100644 index 00000000000..7bae415daf6 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/CustomTokenProviderAdapter.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.oauth2; + + +import java.io.IOException; + +import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Provides tokens based on custom implementation, following the Adapter Design + * Pattern. + */ +public final class CustomTokenProviderAdapter extends AccessTokenProvider { + + private CustomTokenProviderAdaptee adaptee; + private static final Logger LOG = LoggerFactory.getLogger(AccessTokenProvider.class); + + /** + * Constructs a token provider based on the custom token provider. + * + * @param adaptee the custom token provider + */ + public CustomTokenProviderAdapter(CustomTokenProviderAdaptee adaptee) { + Preconditions.checkNotNull(adaptee, "adaptee"); + this.adaptee = adaptee; + } + + protected AzureADToken refreshToken() throws IOException { + LOG.debug("AADToken: refreshing custom based token"); + + AzureADToken azureADToken = new AzureADToken(); + azureADToken.setAccessToken(adaptee.getAccessToken()); + azureADToken.setExpiry(adaptee.getExpiryTime()); + + return azureADToken; + } +} \ No newline at end of file diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/MsiTokenProvider.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/MsiTokenProvider.java new file mode 100644 index 00000000000..2deb9d246d1 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/MsiTokenProvider.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.oauth2; + +import java.io.IOException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Provides tokens based on Azure VM's Managed Service Identity. + */ +public class MsiTokenProvider extends AccessTokenProvider { + + private final String tenantGuid; + + private final String clientId; + + private static final Logger LOG = LoggerFactory.getLogger(AccessTokenProvider.class); + + public MsiTokenProvider(final String tenantGuid, final String clientId) { + this.tenantGuid = tenantGuid; + this.clientId = clientId; + } + + @Override + protected AzureADToken refreshToken() throws IOException { + LOG.debug("AADToken: refreshing token from MSI"); + AzureADToken token = AzureADAuthenticator.getTokenFromMsi(tenantGuid, clientId, false); + return token; + } +} \ No newline at end of file diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/QueryParams.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/QueryParams.java new file mode 100644 index 00000000000..ff6e06f9501 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/QueryParams.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs.azurebfs.oauth2; + +import java.io.UnsupportedEncodingException; +import java.net.URLEncoder; +import java.util.HashMap; +import java.util.Map; + +/** + * Utilities class http query parameters. + */ +public class QueryParams { + private Map params = new HashMap<>(); + private String apiVersion = null; + private String separator = ""; + private String serializedString = null; + + public void add(String name, String value) { + params.put(name, value); + serializedString = null; + } + + public void setApiVersion(String apiVersion) { + this.apiVersion = apiVersion; + serializedString = null; + } + + public String serialize() { + if (serializedString == null) { + StringBuilder sb = new StringBuilder(); + for (Map.Entry entry : params.entrySet()) { + String name = entry.getKey(); + try { + sb.append(separator); + sb.append(URLEncoder.encode(name, "UTF-8")); + sb.append('='); + sb.append(URLEncoder.encode(entry.getValue(), "UTF-8")); + separator = "&"; + } catch (UnsupportedEncodingException ex) { + } + } + + if (apiVersion != null) { + sb.append(separator); + sb.append("api-version="); + sb.append(apiVersion); + separator = "&"; + } + serializedString = sb.toString(); + } + return serializedString; + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/RefreshTokenBasedTokenProvider.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/RefreshTokenBasedTokenProvider.java new file mode 100644 index 00000000000..949d5bf1d80 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/RefreshTokenBasedTokenProvider.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.oauth2; + +import java.io.IOException; + +import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Provides tokens based on refresh token. + */ +public class RefreshTokenBasedTokenProvider extends AccessTokenProvider { + private static final Logger LOG = LoggerFactory.getLogger(AccessTokenProvider.class); + + private final String clientId; + + private final String refreshToken; + + /** + * Constructs a token provider based on the refresh token provided. + * + * @param clientId the client ID (GUID) of the client web app obtained from Azure Active Directory configuration + * @param refreshToken the refresh token + */ + public RefreshTokenBasedTokenProvider(String clientId, String refreshToken) { + Preconditions.checkNotNull(clientId, "clientId"); + Preconditions.checkNotNull(refreshToken, "refreshToken"); + this.clientId = clientId; + this.refreshToken = refreshToken; + } + + + @Override + protected AzureADToken refreshToken() throws IOException { + LOG.debug("AADToken: refreshing refresh-token based token"); + return AzureADAuthenticator.getTokenUsingRefreshToken(clientId, refreshToken); + } +} \ No newline at end of file diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/UserPasswordTokenProvider.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/UserPasswordTokenProvider.java new file mode 100644 index 00000000000..7504e9d527e --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/UserPasswordTokenProvider.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs.azurebfs.oauth2; + +import java.io.IOException; + +import com.google.common.base.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; +/** + * Provides tokens based on username and password. + */ +public class UserPasswordTokenProvider extends AccessTokenProvider { + + private final String authEndpoint; + + private final String username; + + private final String password; + + private static final Logger LOG = LoggerFactory.getLogger(AccessTokenProvider.class); + + public UserPasswordTokenProvider(final String authEndpoint, + final String username, final String password) { + Preconditions.checkNotNull(authEndpoint, "authEndpoint"); + Preconditions.checkNotNull(username, "username"); + Preconditions.checkNotNull(password, "password"); + + this.authEndpoint = authEndpoint; + this.username = username; + this.password = password; + } + + @Override + protected AzureADToken refreshToken() throws IOException { + LOG.debug("AADToken: refreshing user-password based token"); + return AzureADAuthenticator.getTokenUsingClientCreds(authEndpoint, username, password); + } + + private static String getPasswordString(Configuration conf, String key) + throws IOException { + char[] passchars = conf.getPassword(key); + if (passchars == null) { + throw new IOException("Password " + key + " not found"); + } + return new String(passchars); + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/package-info.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/package-info.java new file mode 100644 index 00000000000..bad1a85b31d --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/oauth2/package-info.java @@ -0,0 +1,18 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs.azurebfs.oauth2; \ No newline at end of file diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java index e003ffd31d9..f5c9f18fc61 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException; import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; +import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.*; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.HTTPS_SCHEME; @@ -42,7 +43,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.* import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.*; /** - * AbfsClient + * AbfsClient. */ public class AbfsClient { public static final Logger LOG = LoggerFactory.getLogger(AbfsClient.class); @@ -54,9 +55,13 @@ public class AbfsClient { private final AbfsConfiguration abfsConfiguration; private final String userAgent; + private final AccessTokenProvider tokenProvider; + + public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials, final AbfsConfiguration abfsConfiguration, - final ExponentialRetryPolicy exponentialRetryPolicy) { + final ExponentialRetryPolicy exponentialRetryPolicy, + final AccessTokenProvider tokenProvider) { this.baseUrl = baseUrl; this.sharedKeyCredentials = sharedKeyCredentials; String baseUrlString = baseUrl.toString(); @@ -76,6 +81,7 @@ public class AbfsClient { } this.userAgent = initializeUserAgent(abfsConfiguration, sslProviderName); + this.tokenProvider = tokenProvider; } public String getFileSystem() { @@ -409,6 +415,14 @@ public class AbfsClient { return encodedString; } + public synchronized String getAccessToken() throws IOException { + if (tokenProvider != null) { + return "Bearer " + tokenProvider.getToken().getAccessToken(); + } else { + return null; + } + } + @VisibleForTesting String initializeUserAgent(final AbfsConfiguration abfsConfiguration, final String sslProviderName) { diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpHeader.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpHeader.java index 46b4c6d8442..0067b755460 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpHeader.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsHttpHeader.java @@ -19,7 +19,7 @@ package org.apache.hadoop.fs.azurebfs.services; /** - * The Http Request / Response Headers for Rest AbfsClient + * The Http Request / Response Headers for Rest AbfsClient. */ public class AbfsHttpHeader { private final String name; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java index 6dd32fafb7a..c0407f58d29 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java @@ -30,6 +30,7 @@ import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAbfsRestOperationException; +import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; /** * The AbfsRestOperation for Rest AbfsClient. @@ -48,7 +49,7 @@ public class AbfsRestOperation { // request body and all the download methods have a response body. private final boolean hasRequestBody; - private final Logger LOG = LoggerFactory.getLogger(AbfsClient.class); + private static final Logger LOG = LoggerFactory.getLogger(AbfsClient.class); // For uploads, this is the request entity body. For downloads, // this will hold the response entity body. @@ -139,9 +140,15 @@ public class AbfsRestOperation { httpOperation = new AbfsHttpOperation(url, method, requestHeaders); // sign the HTTP request - client.getSharedKeyCredentials().signRequest( - httpOperation.getConnection(), - hasRequestBody ? bufferLength : 0); + if (client.getAccessToken() == null) { + // sign the HTTP request + client.getSharedKeyCredentials().signRequest( + httpOperation.getConnection(), + hasRequestBody ? bufferLength : 0); + } else { + httpOperation.getConnection().setRequestProperty(HttpHeaderConfigurations.AUTHORIZATION, + client.getAccessToken()); + } if (hasRequestBody) { // HttpUrlConnection requires @@ -163,9 +170,7 @@ public class AbfsRestOperation { return false; } - if (LOG.isDebugEnabled()) { - LOG.debug("HttpRequest: " + httpOperation.toString()); - } + LOG.debug("HttpRequest: " + httpOperation.toString()); if (client.getRetryPolicy().shouldRetry(retryCount, httpOperation.getStatusCode())) { return false; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AuthType.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AuthType.java new file mode 100644 index 00000000000..c95b92cbe61 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AuthType.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs.azurebfs.services; + +/** + * Auth Type Enum. + */ +public enum AuthType { + SharedKey, + OAuth, + Custom +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java index b1f14856cf1..e0afeb4e234 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java @@ -34,6 +34,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azurebfs.services.AuthType; import org.apache.hadoop.fs.azure.AbstractWasbTestWithTimeout; import org.apache.hadoop.fs.azure.AzureNativeFileSystemStore; import org.apache.hadoop.fs.azure.NativeAzureFileSystem; @@ -62,7 +63,7 @@ public abstract class AbstractAbfsIntegrationTest extends private static final Logger LOG = LoggerFactory.getLogger(AbstractAbfsIntegrationTest.class); - private final boolean isEmulator; + private boolean isEmulator; private NativeAzureFileSystem wasb; private AzureBlobFileSystem abfs; private String abfsScheme; @@ -71,20 +72,18 @@ public abstract class AbstractAbfsIntegrationTest extends private String fileSystemName; private String accountName; private String testUrl; - - protected AbstractAbfsIntegrationTest(final boolean secure) { - this(secure ? FileSystemUriSchemes.ABFS_SECURE_SCHEME : FileSystemUriSchemes.ABFS_SCHEME); - } + private AuthType authType; protected AbstractAbfsIntegrationTest() { - this(FileSystemUriSchemes.ABFS_SCHEME); - } - - private AbstractAbfsIntegrationTest(final String scheme) { - abfsScheme = scheme; fileSystemName = ABFS_TEST_CONTAINER_PREFIX + UUID.randomUUID().toString(); configuration = new Configuration(); configuration.addResource(ABFS_TEST_RESOURCE_XML); + this.accountName = this.configuration.get(FS_AZURE_TEST_ACCOUNT_NAME); + + authType = configuration.getEnum(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME + + accountName, AuthType.SharedKey); + abfsScheme = authType == AuthType.SharedKey ? FileSystemUriSchemes.ABFS_SCHEME + : FileSystemUriSchemes.ABFS_SECURE_SCHEME; String accountName = configuration.get(FS_AZURE_TEST_ACCOUNT_NAME, ""); assumeTrue("Not set: " + FS_AZURE_TEST_ACCOUNT_NAME, @@ -94,8 +93,13 @@ public abstract class AbstractAbfsIntegrationTest extends accountName, containsString("dfs.core.windows.net")); String fullKey = FS_AZURE_TEST_ACCOUNT_KEY_PREFIX + accountName; - assumeTrue("Not set: " + fullKey, - configuration.get(fullKey) != null); + + if (authType == AuthType.SharedKey) { + assumeTrue("Not set: " + fullKey, configuration.get(fullKey) != null); + } else { + String accessTokenProviderKey = FS_AZURE_ACCOUNT_TOKEN_PROVIDER_TYPE_PROPERTY_NAME + accountName; + assumeTrue("Not set: " + accessTokenProviderKey, configuration.get(accessTokenProviderKey) != null); + } final String abfsUrl = this.getFileSystemName() + "@" + this.getAccountName(); URI defaultUri = null; @@ -110,7 +114,6 @@ public abstract class AbstractAbfsIntegrationTest extends configuration.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, defaultUri.toString()); configuration.setBoolean(AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION, true); this.isEmulator = this.configuration.getBoolean(FS_AZURE_EMULATOR_ENABLED, false); - this.accountName = this.configuration.get(FS_AZURE_TEST_ACCOUNT_NAME); } @@ -119,7 +122,7 @@ public abstract class AbstractAbfsIntegrationTest extends //Create filesystem first to make sure getWasbFileSystem() can return an existing filesystem. createFileSystem(); - if (!isEmulator) { + if (!isEmulator && authType == AuthType.SharedKey) { final URI wasbUri = new URI(abfsUrlToWasbUrl(getTestUrl())); final AzureNativeFileSystemStore azureNativeFileSystemStore = new AzureNativeFileSystemStore(); @@ -234,6 +237,10 @@ public abstract class AbstractAbfsIntegrationTest extends return isEmulator; } + protected AuthType getAuthType() { + return this.authType; + } + /** * Write a buffer to a file. * @param path path diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsClient.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsClient.java index 9c369bb2bf4..1c2083d5038 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsClient.java @@ -19,7 +19,6 @@ package org.apache.hadoop.fs.azurebfs; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; -import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; import org.apache.hadoop.fs.azurebfs.services.AbfsClient; import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation; import org.junit.Assert; @@ -30,6 +29,7 @@ import org.junit.Test; */ public final class ITestAbfsClient extends AbstractAbfsIntegrationTest { private static final int LIST_MAX_RESULTS = 5000; + @Test public void testContinuationTokenHavingEqualSign() throws Exception { final AzureBlobFileSystem fs = this.getFileSystem(); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemBackCompat.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemBackCompat.java index d6964814cc6..6207a47a452 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemBackCompat.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemBackCompat.java @@ -22,8 +22,11 @@ import com.microsoft.azure.storage.CloudStorageAccount; import com.microsoft.azure.storage.blob.CloudBlobClient; import com.microsoft.azure.storage.blob.CloudBlobContainer; import com.microsoft.azure.storage.blob.CloudBlockBlob; + +import org.junit.Assume; import org.junit.Test; +import org.apache.hadoop.fs.azurebfs.services.AuthType; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; @@ -34,6 +37,7 @@ public class ITestAzureBlobFileSystemBackCompat extends AbstractAbfsIntegrationTest { public ITestAzureBlobFileSystemBackCompat() { super(); + Assume.assumeTrue(this.getAuthType() == AuthType.SharedKey); } @Test diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFileStatus.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFileStatus.java index 791694bf0f1..13abaf88bd9 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFileStatus.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFileStatus.java @@ -34,9 +34,6 @@ public class ITestAzureBlobFileSystemFileStatus extends AbstractAbfsIntegrationTest { private static final Path TEST_FILE = new Path("testFile"); private static final Path TEST_FOLDER = new Path("testDir"); - public ITestAzureBlobFileSystemFileStatus() { - super(); - } @Test public void testEnsureStatusWorksForRoot() throws Exception { diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFinalize.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFinalize.java index e4acbaefc61..c1022b01978 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFinalize.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFinalize.java @@ -25,12 +25,14 @@ import org.junit.Test; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.azurebfs.services.AuthType; /** * Test finalize() method when "fs.abfs.impl.disable.cache" is enabled. */ public class ITestAzureBlobFileSystemFinalize extends AbstractAbfsScaleTest{ - static final String DISABLE_CACHE_KEY = "fs.abfs.impl.disable.cache"; + static final String DISABLE_ABFS_CACHE_KEY = "fs.abfs.impl.disable.cache"; + static final String DISABLE_ABFSSS_CACHE_KEY = "fs.abfss.impl.disable.cache"; public ITestAzureBlobFileSystemFinalize() throws Exception { super(); @@ -40,7 +42,9 @@ public class ITestAzureBlobFileSystemFinalize extends AbstractAbfsScaleTest{ public void testFinalize() throws Exception { // Disable the cache for filesystem to make sure there is no reference. Configuration configuration = this.getConfiguration(); - configuration.setBoolean(this.DISABLE_CACHE_KEY, true); + configuration.setBoolean( + this.getAuthType() == AuthType.SharedKey ? DISABLE_ABFS_CACHE_KEY : DISABLE_ABFSSS_CACHE_KEY, + true); AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.get(configuration); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java index 2f40b6444fd..b02d723c9ca 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java @@ -31,9 +31,9 @@ import java.io.IOException; import com.microsoft.azure.storage.blob.BlockEntry; import com.microsoft.azure.storage.blob.BlockListingFilter; import com.microsoft.azure.storage.blob.CloudBlockBlob; -import org.apache.hadoop.fs.azure.AzureBlobStorageTestAccount; import org.hamcrest.core.IsEqual; import org.hamcrest.core.IsNot; +import org.junit.Assume; import org.junit.Test; import org.apache.hadoop.fs.FSDataInputStream; @@ -42,6 +42,9 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azure.AzureBlobStorageTestAccount; +import org.apache.hadoop.fs.azurebfs.services.AuthType; + /** * Test flush operation. */ @@ -209,6 +212,8 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest { @Test public void testFlushWithFlushEnabled() throws Exception { + Assume.assumeTrue(this.getAuthType() == AuthType.SharedKey); + AzureBlobStorageTestAccount testAccount = createWasbTestAccount(); String wasbUrl = testAccount.getFileSystem().getName(); String abfsUrl = wasbUrlToAbfsUrl(wasbUrl); @@ -228,6 +233,7 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest { @Test public void testFlushWithFlushDisabled() throws Exception { + Assume.assumeTrue(this.getAuthType() == AuthType.SharedKey); AzureBlobStorageTestAccount testAccount = createWasbTestAccount(); String wasbUrl = testAccount.getFileSystem().getName(); String abfsUrl = wasbUrlToAbfsUrl(wasbUrl); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemOauth.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemOauth.java new file mode 100644 index 00000000000..f60740fce0b --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemOauth.java @@ -0,0 +1,176 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs.azurebfs; + + +import java.io.IOException; +import java.io.InputStream; +import java.util.Map; + +import org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys; +import org.junit.Assume; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; +import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode; +import org.apache.hadoop.fs.azurebfs.services.AuthType; + +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_OAUTH_CLIENT_ID; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_OAUTH_CLIENT_SECRET; +import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_BLOB_DATA_CONTRIBUTOR_CLIENT_ID; +import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_BLOB_DATA_CONTRIBUTOR_CLIENT_SECRET; +import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_BLOB_DATA_READER_CLIENT_ID; +import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_BLOB_DATA_READER_CLIENT_SECRET; + +/** + * Test Azure Oauth with Blob Data contributor role and Blob Data Reader role. + * The Test AAD client need to be configured manually through Azure Portal, then save their properties in + * configuration files. + */ +public class ITestAzureBlobFileSystemOauth extends AbstractAbfsIntegrationTest{ + + private static final Path FILE_PATH = new Path("/testFile"); + private static final Path EXISTED_FILE_PATH = new Path("/existedFile"); + private static final Path EXISTED_FOLDER_PATH = new Path("/existedFolder"); + + public ITestAzureBlobFileSystemOauth() { + Assume.assumeTrue(this.getAuthType() == AuthType.OAuth); + } + /* + * BLOB DATA CONTRIBUTOR should have full access to the container and blobs in the container. + * */ + @Test + public void testBlobDataContributor() throws Exception { + String clientId = this.getConfiguration().get(TestConfigurationKeys.FS_AZURE_BLOB_DATA_CONTRIBUTOR_CLIENT_ID); + Assume.assumeTrue("Contributor client id not provided", clientId != null); + String secret = this.getConfiguration().get(TestConfigurationKeys.FS_AZURE_BLOB_DATA_CONTRIBUTOR_CLIENT_SECRET); + Assume.assumeTrue("Contributor client secret not provided", secret != null); + + prepareFiles(); + + final AzureBlobFileSystem fs = getBlobConributor(); + + // create and write into file in current container/fs + try(FSDataOutputStream stream = fs.create(FILE_PATH)) { + stream.write(0); + } + assertTrue(fs.exists(FILE_PATH)); + FileStatus fileStatus = fs.getFileStatus(FILE_PATH); + assertEquals(1, fileStatus.getLen()); + // delete file + assertTrue(fs.delete(FILE_PATH, true)); + assertFalse(fs.exists(FILE_PATH)); + + // Verify Blob Data Contributor has full access to existed folder, file + + // READ FOLDER + assertTrue(fs.exists(EXISTED_FOLDER_PATH)); + + //DELETE FOLDER + fs.delete(EXISTED_FOLDER_PATH, true); + assertFalse(fs.exists(EXISTED_FOLDER_PATH)); + + // READ FILE + try (FSDataInputStream stream = fs.open(EXISTED_FILE_PATH)) { + assertTrue(stream.read() != 0); + } + + assertEquals(0, fs.getFileStatus(EXISTED_FILE_PATH).getLen()); + + // WRITE FILE + try (FSDataOutputStream stream = fs.append(EXISTED_FILE_PATH)) { + stream.write(0); + } + + assertEquals(1, fs.getFileStatus(EXISTED_FILE_PATH).getLen()); + + // REMOVE FILE + fs.delete(EXISTED_FILE_PATH, true); + assertFalse(fs.exists(EXISTED_FILE_PATH)); + } + + /* + * BLOB DATA READER should have only READ access to the container and blobs in the container. + * */ + @Test + public void testBlobDataReader() throws Exception { + String clientId = this.getConfiguration().get(TestConfigurationKeys.FS_AZURE_BLOB_DATA_READER_CLIENT_ID); + Assume.assumeTrue("Reader client id not provided", clientId != null); + String secret = this.getConfiguration().get(TestConfigurationKeys.FS_AZURE_BLOB_DATA_READER_CLIENT_SECRET); + Assume.assumeTrue("Reader client secret not provided", secret != null); + + prepareFiles(); + final AzureBlobFileSystem fs = getBlobReader(); + + // Use abfsStore in this test to verify the ERROR code in AbfsRestOperationException + AzureBlobFileSystemStore abfsStore = fs.getAbfsStore(); + // TEST READ FS + Map properties = abfsStore.getFilesystemProperties(); + // TEST READ FOLDER + assertTrue(fs.exists(EXISTED_FOLDER_PATH)); + + // TEST DELETE FOLDER + try { + abfsStore.delete(EXISTED_FOLDER_PATH, true); + } catch (AbfsRestOperationException e) { + assertEquals(AzureServiceErrorCode.AUTHORIZATION_PERMISSION_MISS_MATCH, e.getErrorCode()); + } + + // TEST READ FILE + try (InputStream inputStream = abfsStore.openFileForRead(EXISTED_FILE_PATH, null)) { + assertTrue(inputStream.read() != 0); + } + + // TEST WRITE FILE + try { + abfsStore.openFileForWrite(EXISTED_FILE_PATH, true); + } catch (AbfsRestOperationException e) { + assertEquals(AzureServiceErrorCode.AUTHORIZATION_PERMISSION_MISS_MATCH, e.getErrorCode()); + } + + } + + private void prepareFiles() throws IOException { + // create test files/folders to verify access control diff between + // Blob data contributor and Blob data reader + final AzureBlobFileSystem fs = this.getFileSystem(); + fs.create(EXISTED_FILE_PATH); + assertTrue(fs.exists(EXISTED_FILE_PATH)); + fs.mkdirs(EXISTED_FOLDER_PATH); + assertTrue(fs.exists(EXISTED_FOLDER_PATH)); + } + + private AzureBlobFileSystem getBlobConributor() throws Exception { + Configuration configuration = this.getConfiguration(); + configuration.set(FS_AZURE_ACCOUNT_OAUTH_CLIENT_ID + this.getAccountName(), configuration.get(FS_AZURE_BLOB_DATA_CONTRIBUTOR_CLIENT_ID)); + configuration.set(FS_AZURE_ACCOUNT_OAUTH_CLIENT_SECRET + this.getAccountName(), configuration.get(FS_AZURE_BLOB_DATA_CONTRIBUTOR_CLIENT_SECRET)); + return getFileSystem(configuration); + } + + private AzureBlobFileSystem getBlobReader() throws Exception { + Configuration configuration = this.getConfiguration(); + configuration.set(FS_AZURE_ACCOUNT_OAUTH_CLIENT_ID + this.getAccountName(), configuration.get(FS_AZURE_BLOB_DATA_READER_CLIENT_ID)); + configuration.set(FS_AZURE_ACCOUNT_OAUTH_CLIENT_SECRET + this.getAccountName(), configuration.get(FS_AZURE_BLOB_DATA_READER_CLIENT_SECRET)); + return getFileSystem(configuration); + } +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java index c61de6764ea..13c5bc8f31f 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemRandomRead.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.util.Random; import java.util.concurrent.Callable; +import org.junit.Assume; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,6 +35,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.azure.NativeAzureFileSystem; +import org.apache.hadoop.fs.azurebfs.services.AuthType; import org.apache.hadoop.fs.contract.ContractTestUtils; import static org.apache.hadoop.test.LambdaTestUtils.intercept; @@ -66,6 +68,7 @@ public class ITestAzureBlobFileSystemRandomRead extends public ITestAzureBlobFileSystemRandomRead() throws Exception { super(); + Assume.assumeTrue(this.getAuthType() == AuthType.SharedKey); } @Test diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestFileSystemInitialization.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestFileSystemInitialization.java index 3a44909a314..50b1828a828 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestFileSystemInitialization.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestFileSystemInitialization.java @@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes; +import org.apache.hadoop.fs.azurebfs.services.AuthType; /** * Test AzureBlobFileSystem initialization. @@ -41,8 +42,10 @@ public class ITestFileSystemInitialization extends AbstractAbfsIntegrationTest { final String accountName = getAccountName(); final String filesystem = getFileSystemName(); + String scheme = this.getAuthType() == AuthType.SharedKey ? FileSystemUriSchemes.ABFS_SCHEME + : FileSystemUriSchemes.ABFS_SECURE_SCHEME; assertEquals(fs.getUri(), - new URI(FileSystemUriSchemes.ABFS_SCHEME, + new URI(scheme, filesystem + "@" + accountName, null, null, diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestFileSystemRegistration.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestFileSystemRegistration.java index 5d1cf91a9c3..56a91d3eaf9 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestFileSystemRegistration.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestFileSystemRegistration.java @@ -27,6 +27,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes; +import org.apache.hadoop.fs.azurebfs.services.AuthType; /** * Test AzureBlobFileSystem registration. @@ -79,8 +80,14 @@ public class ITestFileSystemRegistration extends AbstractAbfsIntegrationTest { AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.get(getConfiguration()); assertNotNull("filesystem", fs); - Abfs afs = (Abfs) FileContext.getFileContext(getConfiguration()).getDefaultFileSystem(); - assertNotNull("filecontext", afs); + if (this.getAuthType() == AuthType.OAuth) { + Abfss afs = (Abfss) FileContext.getFileContext(getConfiguration()).getDefaultFileSystem(); + assertNotNull("filecontext", afs); + } else { + Abfs afs = (Abfs) FileContext.getFileContext(getConfiguration()).getDefaultFileSystem(); + assertNotNull("filecontext", afs); + } + } @Test diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestWasbAbfsCompatibility.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestWasbAbfsCompatibility.java index a89c0443350..ff28d3e0fdd 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestWasbAbfsCompatibility.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestWasbAbfsCompatibility.java @@ -31,6 +31,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.azure.NativeAzureFileSystem; import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.apache.hadoop.fs.azurebfs.services.AuthType; import static org.apache.hadoop.fs.contract.ContractTestUtils.assertDeleted; import static org.apache.hadoop.fs.contract.ContractTestUtils.assertIsDirectory; @@ -50,6 +51,7 @@ public class ITestWasbAbfsCompatibility extends AbstractAbfsIntegrationTest { public ITestWasbAbfsCompatibility() throws Exception { Assume.assumeFalse("Emulator is not supported", isEmulator()); + Assume.assumeTrue(this.getAuthType() == AuthType.SharedKey); } @Test diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/constants/TestConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/constants/TestConfigurationKeys.java index fc7312aadac..67301c7f8e3 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/constants/TestConfigurationKeys.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/constants/TestConfigurationKeys.java @@ -28,6 +28,12 @@ public final class TestConfigurationKeys { public static final String FS_AZURE_TEST_HOST_PORT = "fs.azure.test.host.port"; public static final String FS_AZURE_CONTRACT_TEST_URI = "fs.contract.test.fs.abfs"; + public static final String FS_AZURE_BLOB_DATA_CONTRIBUTOR_CLIENT_ID = "fs.azure.account.oauth2.contributor.client.id"; + public static final String FS_AZURE_BLOB_DATA_CONTRIBUTOR_CLIENT_SECRET = "fs.azure.account.oauth2.contributor.client.secret"; + + public static final String FS_AZURE_BLOB_DATA_READER_CLIENT_ID = "fs.azure.account.oauth2.reader.client.id"; + public static final String FS_AZURE_BLOB_DATA_READER_CLIENT_SECRET = "fs.azure.account.oauth2.reader.client.secret"; + public static final String ABFS_TEST_RESOURCE_XML = "azure-bfs-test.xml"; public static final String ABFS_TEST_CONTAINER_PREFIX = "abfs-testcontainer-"; diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ABFSContractTestBinding.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ABFSContractTestBinding.java index ffd5babc5bf..5505e6ae0f3 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ABFSContractTestBinding.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ABFSContractTestBinding.java @@ -25,6 +25,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest; import org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes; import org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys; +import org.apache.hadoop.fs.azurebfs.services.AuthType; /** * Bind ABFS contract tests to the Azure test setup/teardown. @@ -32,18 +33,17 @@ import org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys; public class ABFSContractTestBinding extends AbstractAbfsIntegrationTest { private final URI testUri; - public ABFSContractTestBinding(final boolean secure) throws Exception { - this(secure, true); + public ABFSContractTestBinding() throws Exception { + this(true); } - public ABFSContractTestBinding(final boolean secure, + public ABFSContractTestBinding( final boolean useExistingFileSystem) throws Exception{ - super(secure); if (useExistingFileSystem) { Configuration configuration = getConfiguration(); String testUrl = configuration.get(TestConfigurationKeys.FS_AZURE_CONTRACT_TEST_URI); - if (secure) { + if (getAuthType() != AuthType.SharedKey) { testUrl = testUrl.replaceFirst(FileSystemUriSchemes.ABFS_SCHEME, FileSystemUriSchemes.ABFS_SECURE_SCHEME); } setTestUrl(testUrl); @@ -61,4 +61,8 @@ public class ABFSContractTestBinding extends AbstractAbfsIntegrationTest { public Configuration getConfiguration() { return super.getConfiguration(); } + + public boolean isSecureMode() { + return this.getAuthType() == AuthType.SharedKey ? false : true; + } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractAppend.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractAppend.java index a302fccfd9c..8a955bc6062 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractAppend.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractAppend.java @@ -18,34 +18,23 @@ package org.apache.hadoop.fs.azurebfs.contract; -import java.util.Arrays; - -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.contract.AbstractContractAppendTest; import org.apache.hadoop.fs.contract.AbstractFSContract; +import org.junit.Test; import static org.apache.hadoop.fs.contract.ContractTestUtils.skip; /** * Contract test for open operation. */ -@RunWith(Parameterized.class) public class ITestAbfsFileSystemContractAppend extends AbstractContractAppendTest { - @Parameterized.Parameters(name = "SecureMode={0}") - public static Iterable secure() { - return Arrays.asList(new Object[][] { {true}, {false} }); - } - private final boolean isSecure; private final ABFSContractTestBinding binding; - public ITestAbfsFileSystemContractAppend(final boolean secure) throws Exception { - this.isSecure = secure; - binding = new ABFSContractTestBinding(this.isSecure); + public ITestAbfsFileSystemContractAppend() throws Exception { + binding = new ABFSContractTestBinding(); + this.isSecure = binding.isSecureMode(); } @Override diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractConcat.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractConcat.java index c31a6d2a5aa..383528b75a8 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractConcat.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractConcat.java @@ -17,11 +17,6 @@ */ package org.apache.hadoop.fs.azurebfs.contract; -import java.util.Arrays; - -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.contract.AbstractContractConcatTest; import org.apache.hadoop.fs.contract.AbstractFSContract; @@ -29,19 +24,13 @@ import org.apache.hadoop.fs.contract.AbstractFSContract; /** * Contract test for concat operation. */ -@RunWith(Parameterized.class) public class ITestAbfsFileSystemContractConcat extends AbstractContractConcatTest{ - @Parameterized.Parameters(name = "SecureMode={0}") - public static Iterable secure() { - return Arrays.asList(new Object[][] { {true}, {false} }); - } - private final boolean isSecure; private final ABFSContractTestBinding binding; - public ITestAbfsFileSystemContractConcat(final boolean secure) throws Exception { - isSecure = secure; - binding = new ABFSContractTestBinding(isSecure); + public ITestAbfsFileSystemContractConcat() throws Exception { + binding = new ABFSContractTestBinding(); + this.isSecure = binding.isSecureMode(); } @Override diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractCreate.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractCreate.java index ce4d22963e1..3c3e9490365 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractCreate.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractCreate.java @@ -18,11 +18,6 @@ package org.apache.hadoop.fs.azurebfs.contract; -import java.util.Arrays; - -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.contract.AbstractContractCreateTest; import org.apache.hadoop.fs.contract.AbstractFSContract; @@ -30,19 +25,13 @@ import org.apache.hadoop.fs.contract.AbstractFSContract; /** * Contract test for create operation. */ -@RunWith(Parameterized.class) public class ITestAbfsFileSystemContractCreate extends AbstractContractCreateTest{ - @Parameterized.Parameters(name = "SecureMode={0}") - public static Iterable secure() { - return Arrays.asList(new Object[][] { {true}, {false} }); - } - private final boolean isSecure; private final ABFSContractTestBinding binding; - public ITestAbfsFileSystemContractCreate(final boolean secure) throws Exception { - this.isSecure = secure; - binding = new ABFSContractTestBinding(this.isSecure); + public ITestAbfsFileSystemContractCreate() throws Exception { + binding = new ABFSContractTestBinding(); + this.isSecure = binding.isSecureMode(); } @Override diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractDelete.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractDelete.java index 310731cc0eb..1d1136c3538 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractDelete.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractDelete.java @@ -18,11 +18,6 @@ package org.apache.hadoop.fs.azurebfs.contract; -import java.util.Arrays; - -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.contract.AbstractContractDeleteTest; import org.apache.hadoop.fs.contract.AbstractFSContract; @@ -30,19 +25,13 @@ import org.apache.hadoop.fs.contract.AbstractFSContract; /** * Contract test for delete operation. */ -@RunWith(Parameterized.class) public class ITestAbfsFileSystemContractDelete extends AbstractContractDeleteTest { - @Parameterized.Parameters(name = "SecureMode={0}") - public static Iterable secure() { - return Arrays.asList(new Object[][] { {true}, {false} }); - } - private final boolean isSecure; private final ABFSContractTestBinding binding; - public ITestAbfsFileSystemContractDelete(final boolean secure) throws Exception { - this.isSecure = secure; - binding = new ABFSContractTestBinding(isSecure); + public ITestAbfsFileSystemContractDelete() throws Exception { + binding = new ABFSContractTestBinding(); + this.isSecure = binding.isSecureMode(); } @Override diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractDistCp.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractDistCp.java index c2cf25569b2..544bbbfbdb1 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractDistCp.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractDistCp.java @@ -28,7 +28,7 @@ public class ITestAbfsFileSystemContractDistCp extends AbstractContractDistCpTes private final ABFSContractTestBinding binding; public ITestAbfsFileSystemContractDistCp() throws Exception { - binding = new ABFSContractTestBinding(false); + binding = new ABFSContractTestBinding(); } @Override diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractGetFileStatus.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractGetFileStatus.java index 9ad3b215e10..08b7eefe198 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractGetFileStatus.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractGetFileStatus.java @@ -17,11 +17,6 @@ */ package org.apache.hadoop.fs.azurebfs.contract; -import java.util.Arrays; - -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.contract.AbstractContractGetFileStatusTest; import org.apache.hadoop.fs.contract.AbstractFSContract; @@ -29,19 +24,13 @@ import org.apache.hadoop.fs.contract.AbstractFSContract; /** * Contract test for getFileStatus operation. */ -@RunWith(Parameterized.class) public class ITestAbfsFileSystemContractGetFileStatus extends AbstractContractGetFileStatusTest { - @Parameterized.Parameters(name = "SecureMode={0}") - public static Iterable secure() { - return Arrays.asList(new Object[][] { {true}, {false} }); - } - private final boolean isSecure; private final ABFSContractTestBinding binding; - public ITestAbfsFileSystemContractGetFileStatus(final boolean secure) throws Exception { - this.isSecure = secure; - binding = new ABFSContractTestBinding(isSecure); + public ITestAbfsFileSystemContractGetFileStatus() throws Exception { + binding = new ABFSContractTestBinding(); + this.isSecure = binding.isSecureMode(); } @Override diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractMkdir.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractMkdir.java index 6265ca1f92c..7b785753f2c 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractMkdir.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractMkdir.java @@ -18,11 +18,6 @@ package org.apache.hadoop.fs.azurebfs.contract; -import java.util.Arrays; - -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.contract.AbstractContractMkdirTest; import org.apache.hadoop.fs.contract.AbstractFSContract; @@ -30,19 +25,13 @@ import org.apache.hadoop.fs.contract.AbstractFSContract; /** * Contract test for mkdir operation. */ -@RunWith(Parameterized.class) public class ITestAbfsFileSystemContractMkdir extends AbstractContractMkdirTest { - @Parameterized.Parameters(name = "SecureMode={0}") - public static Iterable secure() { - return Arrays.asList(new Object[][] { {true}, {false} }); - } - private final boolean isSecure; private final ABFSContractTestBinding binding; - public ITestAbfsFileSystemContractMkdir(final boolean secure) throws Exception { - this.isSecure = secure; - binding = new ABFSContractTestBinding(secure); + public ITestAbfsFileSystemContractMkdir() throws Exception { + binding = new ABFSContractTestBinding(); + this.isSecure = binding.isSecureMode(); } @Override diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractOpen.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractOpen.java index ae4bb2a4fc6..41f691d512f 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractOpen.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractOpen.java @@ -18,11 +18,6 @@ package org.apache.hadoop.fs.azurebfs.contract; -import java.util.Arrays; - -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.contract.AbstractContractOpenTest; import org.apache.hadoop.fs.contract.AbstractFSContract; @@ -30,19 +25,13 @@ import org.apache.hadoop.fs.contract.AbstractFSContract; /** * Contract test for open operation. */ -@RunWith(Parameterized.class) public class ITestAbfsFileSystemContractOpen extends AbstractContractOpenTest { - @Parameterized.Parameters(name = "SecureMode={0}") - public static Iterable secure() { - return Arrays.asList(new Object[][] { {true}, {false} }); - } - private final boolean isSecure; private final ABFSContractTestBinding binding; - public ITestAbfsFileSystemContractOpen(final boolean secure) throws Exception { - this.isSecure = secure; - binding = new ABFSContractTestBinding(this.isSecure); + public ITestAbfsFileSystemContractOpen() throws Exception { + binding = new ABFSContractTestBinding(); + this.isSecure = binding.isSecureMode(); } @Override diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractRename.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractRename.java index 6e6a7280350..82f104a44bf 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractRename.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractRename.java @@ -18,11 +18,6 @@ package org.apache.hadoop.fs.azurebfs.contract; -import java.util.Arrays; - -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.contract.AbstractContractRenameTest; import org.apache.hadoop.fs.contract.AbstractFSContract; @@ -30,19 +25,13 @@ import org.apache.hadoop.fs.contract.AbstractFSContract; /** * Contract test for rename operation. */ -@RunWith(Parameterized.class) public class ITestAbfsFileSystemContractRename extends AbstractContractRenameTest { - @Parameterized.Parameters(name = "SecureMode={0}") - public static Iterable secure() { - return Arrays.asList(new Object[][] { {true}, {false} }); - } - private final boolean isSecure; private final ABFSContractTestBinding binding; - public ITestAbfsFileSystemContractRename(final boolean secure) throws Exception { - this.isSecure = secure; - binding = new ABFSContractTestBinding(this.isSecure); + public ITestAbfsFileSystemContractRename() throws Exception { + binding = new ABFSContractTestBinding(); + this.isSecure = binding.isSecureMode(); } @Override diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractRootDirectory.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractRootDirectory.java index 01dea2d164d..5b5493fdcd0 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractRootDirectory.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractRootDirectory.java @@ -17,31 +17,21 @@ */ package org.apache.hadoop.fs.azurebfs.contract; -import java.util.Arrays; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.contract.AbstractContractRootDirectoryTest; import org.apache.hadoop.fs.contract.AbstractFSContract; import org.junit.Ignore; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; /** * Contract test for root directory operation. */ -@RunWith(Parameterized.class) public class ITestAbfsFileSystemContractRootDirectory extends AbstractContractRootDirectoryTest { - @Parameterized.Parameters(name = "SecureMode={0}") - public static Iterable secure() { - return Arrays.asList(new Object[][] { {true}, {false} }); - } - private final boolean isSecure; private final ABFSContractTestBinding binding; - public ITestAbfsFileSystemContractRootDirectory(final boolean secure) throws Exception { - this.isSecure = secure; - binding = new ABFSContractTestBinding(secure); + public ITestAbfsFileSystemContractRootDirectory() throws Exception { + binding = new ABFSContractTestBinding(); + this.isSecure = binding.isSecureMode(); } @Override diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractSecureDistCp.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractSecureDistCp.java index 5ed74668408..fc235e36c61 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractSecureDistCp.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractSecureDistCp.java @@ -28,7 +28,7 @@ public class ITestAbfsFileSystemContractSecureDistCp extends AbstractContractDis private final ABFSContractTestBinding binding; public ITestAbfsFileSystemContractSecureDistCp() throws Exception { - binding = new ABFSContractTestBinding(true); + binding = new ABFSContractTestBinding(); } @Override diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractSeek.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractSeek.java index 5e0ea0c94ea..4529e752b01 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractSeek.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractSeek.java @@ -18,11 +18,6 @@ package org.apache.hadoop.fs.azurebfs.contract; -import java.util.Arrays; - -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.contract.AbstractContractSeekTest; import org.apache.hadoop.fs.contract.AbstractFSContract; @@ -30,19 +25,13 @@ import org.apache.hadoop.fs.contract.AbstractFSContract; /** * Contract test for seek operation. */ -@RunWith(Parameterized.class) public class ITestAbfsFileSystemContractSeek extends AbstractContractSeekTest{ - @Parameterized.Parameters(name = "SecureMode={0}") - public static Iterable secure() { - return Arrays.asList(new Object[][] { {true}, {false} }); - } - private final boolean isSecure; private final ABFSContractTestBinding binding; - public ITestAbfsFileSystemContractSeek(final boolean secure) throws Exception { - this.isSecure = secure; - binding = new ABFSContractTestBinding(this.isSecure); + public ITestAbfsFileSystemContractSeek() throws Exception { + binding = new ABFSContractTestBinding(); + this.isSecure = binding.isSecureMode(); } @Override diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractSetTimes.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractSetTimes.java index 8d23b0bbffd..6c4f9badc0f 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractSetTimes.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAbfsFileSystemContractSetTimes.java @@ -17,11 +17,6 @@ */ package org.apache.hadoop.fs.azurebfs.contract; -import java.util.Arrays; - -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.contract.AbstractContractSetTimesTest; import org.apache.hadoop.fs.contract.AbstractFSContract; @@ -29,19 +24,13 @@ import org.apache.hadoop.fs.contract.AbstractFSContract; /** * Contract test for setTimes operation. */ -@RunWith(Parameterized.class) public class ITestAbfsFileSystemContractSetTimes extends AbstractContractSetTimesTest { - @Parameterized.Parameters(name = "SecureMode={0}") - public static Iterable secure() { - return Arrays.asList(new Object[][] { {true}, {false} }); - } - private final boolean isSecure; private final ABFSContractTestBinding binding; - public ITestAbfsFileSystemContractSetTimes(final boolean secure) throws Exception { - this.isSecure = secure; - binding = new ABFSContractTestBinding(this.isSecure); + public ITestAbfsFileSystemContractSetTimes() throws Exception { + binding = new ABFSContractTestBinding(); + this.isSecure = binding.isSecureMode(); } @Override diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAzureBlobFileSystemBasics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAzureBlobFileSystemBasics.java index d8854a2b0a0..a9fa2d77194 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAzureBlobFileSystemBasics.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/ITestAzureBlobFileSystemBasics.java @@ -40,7 +40,7 @@ public class ITestAzureBlobFileSystemBasics extends FileSystemContractBaseTest { public ITestAzureBlobFileSystemBasics() throws Exception { // If all contract tests are running in parallel, some root level tests in FileSystemContractBaseTest will fail // due to the race condition. Hence for this contract test it should be tested in different container - binding = new ABFSContractTestBinding(false, false); + binding = new ABFSContractTestBinding(false); } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java index 7bb27fc4514..a2fdd095ac5 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java @@ -21,11 +21,11 @@ package org.apache.hadoop.fs.azurebfs.services; import java.net.URL; import java.util.regex.Pattern; -import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; -import org.apache.hadoop.fs.azurebfs.utils.SSLSocketFactoryEx; import org.junit.Assert; import org.junit.Test; +import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; +import org.apache.hadoop.fs.azurebfs.utils.SSLSocketFactoryEx; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys; @@ -40,7 +40,7 @@ public final class TestAbfsClient { AbfsConfiguration config, boolean includeSSLProvider) { AbfsClient client = new AbfsClient(baseUrl, null, - config, null); + config, null, null); String sslProviderName = null; if (includeSSLProvider) { sslProviderName = SSLSocketFactoryEx.getDefaultFactory().getProviderName(); diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestQueryParams.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestQueryParams.java new file mode 100644 index 00000000000..e6c6993b1dc --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestQueryParams.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs.azurebfs.services; + +import java.util.HashMap; +import java.util.Map; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.hadoop.fs.azurebfs.oauth2.QueryParams; +/** + * Test query params serialization. + */ +public class TestQueryParams { + private static final String SEPARATOR = "&"; + private static final String[][] PARAM_ARRAY = {{"K0", "V0"}, {"K1", "V1"}, {"K2", "V2"}}; + + @Test + public void testOneParam() { + String key = PARAM_ARRAY[0][0]; + String value = PARAM_ARRAY[0][1]; + + Map paramMap = new HashMap<>(); + paramMap.put(key, value); + + QueryParams qp = new QueryParams(); + qp.add(key, value); + Assert.assertEquals(key + "=" + value, qp.serialize()); + } + + @Test + public void testMultipleParams() { + QueryParams qp = new QueryParams(); + for (String[] entry : PARAM_ARRAY) { + qp.add(entry[0], entry[1]); + } + Map paramMap = constructMap(qp.serialize()); + Assert.assertEquals(PARAM_ARRAY.length, paramMap.size()); + + for (String[] entry : PARAM_ARRAY) { + Assert.assertTrue(paramMap.containsKey(entry[0])); + Assert.assertEquals(entry[1], paramMap.get(entry[0])); + } + } + + private Map constructMap(String input) { + String[] entries = input.split(SEPARATOR); + Map paramMap = new HashMap<>(); + for (String entry : entries) { + String[] keyValue = entry.split("="); + paramMap.put(keyValue[0], keyValue[1]); + } + return paramMap; + } + +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/CleanUpAbfsTestContainer.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/CleanUpAbfsTestContainer.java index 9051a72e127..ef4ddb92157 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/CleanUpAbfsTestContainer.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/CleanUpAbfsTestContainer.java @@ -21,9 +21,13 @@ import com.microsoft.azure.storage.CloudStorageAccount; import com.microsoft.azure.storage.blob.CloudBlobClient; import com.microsoft.azure.storage.blob.CloudBlobContainer; -import org.apache.hadoop.fs.azure.AzureBlobStorageTestAccount; +import org.junit.Assume; import org.junit.Test; +import org.apache.hadoop.fs.azure.AzureBlobStorageTestAccount; +import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest; +import org.apache.hadoop.fs.azurebfs.services.AuthType; + import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.ABFS_TEST_CONTAINER_PREFIX; /** @@ -31,7 +35,12 @@ import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.ABFS * In that case, dev can use this tool to list and delete all test containers. * By default, all test container used in E2E tests sharing same prefix: "abfs-testcontainer-" */ -public class CleanUpAbfsTestContainer { +public class CleanUpAbfsTestContainer extends AbstractAbfsIntegrationTest{ + + public CleanUpAbfsTestContainer() { + Assume.assumeTrue(this.getAuthType() == AuthType.SharedKey); + } + @Test public void testEnumContainers() throws Throwable { int count = 0; diff --git a/hadoop-tools/hadoop-azure/src/test/resources/azure-bfs-test.xml b/hadoop-tools/hadoop-azure/src/test/resources/azure-bfs-test.xml index 464a8e670e9..9b908b06fd7 100644 --- a/hadoop-tools/hadoop-azure/src/test/resources/azure-bfs-test.xml +++ b/hadoop-tools/hadoop-azure/src/test/resources/azure-bfs-test.xml @@ -22,6 +22,27 @@ {YOURACCOUNT} + + fs.azure.account.auth.type.{YOURACCOUNT}.dfs.core.windows.net + {AUTH TYPE} + The auth type can be : SharedKey, OAuth, Custom. By default "SharedKey" is used. + + + + fs.contract.test.fs.abfs + abfs://{CONTAINERNAME}@{ACCOUNTNAME}.dfs.core.windows.net/value> + The name of the azure file system for testing. + + + + fs.contract.test.fs.abfss + abfss://{CONTAINERNAME}@{ACCOUNTNAME}.dfs.core.windows.net/value> + The name of the azure file system for testing. + + --> + + + + + + + + + + + + + + + + + + + + + + + +