HADOOP-15694. ABFS: Allow OAuth credentials to not be tied to accounts.
Contributed by Sean Mackrory.
This commit is contained in:
parent
13c70e9ba3
commit
e5593cbd83
|
@ -54,6 +54,7 @@ import org.apache.hadoop.fs.azurebfs.services.AuthType;
|
|||
import org.apache.hadoop.fs.azurebfs.services.KeyProvider;
|
||||
import org.apache.hadoop.fs.azurebfs.services.SimpleKeyProvider;
|
||||
import org.apache.hadoop.fs.azurebfs.utils.SSLSocketFactoryEx;
|
||||
import org.apache.hadoop.security.ProviderUtils;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
|
||||
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.*;
|
||||
|
@ -65,7 +66,8 @@ import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.*
|
|||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Evolving
|
||||
public class AbfsConfiguration{
|
||||
private final Configuration configuration;
|
||||
private final Configuration rawConfig;
|
||||
private final String accountName;
|
||||
private final boolean isSecure;
|
||||
|
||||
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_WRITE_BUFFER_SIZE,
|
||||
|
@ -155,9 +157,12 @@ public class AbfsConfiguration{
|
|||
|
||||
private Map<String, String> storageAccountKeys;
|
||||
|
||||
public AbfsConfiguration(final Configuration configuration) throws IllegalAccessException, InvalidConfigurationValueException {
|
||||
this.configuration = configuration;
|
||||
this.isSecure = this.configuration.getBoolean(FS_AZURE_SECURE_MODE, false);
|
||||
public AbfsConfiguration(final Configuration rawConfig, String accountName)
|
||||
throws IllegalAccessException, InvalidConfigurationValueException, IOException {
|
||||
this.rawConfig = ProviderUtils.excludeIncompatibleCredentialProviders(
|
||||
rawConfig, AzureBlobFileSystem.class);
|
||||
this.accountName = accountName;
|
||||
this.isSecure = getBoolean(FS_AZURE_SECURE_MODE, false);
|
||||
|
||||
validateStorageAccountKeys();
|
||||
Field[] fields = this.getClass().getDeclaredFields();
|
||||
|
@ -177,14 +182,130 @@ public class AbfsConfiguration{
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Appends an account name to a configuration key yielding the
|
||||
* account-specific form.
|
||||
* @param key Account-agnostic configuration key
|
||||
* @return Account-specific configuration key
|
||||
*/
|
||||
public String accountConf(String key) {
|
||||
return key + "." + accountName;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the account-specific value if it exists, then looks for an
|
||||
* account-agnostic value.
|
||||
* @param key Account-agnostic configuration key
|
||||
* @return value if one exists, else null
|
||||
*/
|
||||
public String get(String key) {
|
||||
return rawConfig.get(accountConf(key), rawConfig.get(key));
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the account-specific value if it exists, then looks for an
|
||||
* account-agnostic value, and finally tries the default value.
|
||||
* @param key Account-agnostic configuration key
|
||||
* @param defaultValue Value returned if none is configured
|
||||
* @return value if one exists, else the default value
|
||||
*/
|
||||
public boolean getBoolean(String key, boolean defaultValue) {
|
||||
return rawConfig.getBoolean(accountConf(key), rawConfig.getBoolean(key, defaultValue));
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the account-specific value if it exists, then looks for an
|
||||
* account-agnostic value, and finally tries the default value.
|
||||
* @param key Account-agnostic configuration key
|
||||
* @param defaultValue Value returned if none is configured
|
||||
* @return value if one exists, else the default value
|
||||
*/
|
||||
public long getLong(String key, long defaultValue) {
|
||||
return rawConfig.getLong(accountConf(key), rawConfig.getLong(key, defaultValue));
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the account-specific password in string form if it exists, then
|
||||
* looks for an account-agnostic value.
|
||||
* @param key Account-agnostic configuration key
|
||||
* @return value in String form if one exists, else null
|
||||
* @throws IOException
|
||||
*/
|
||||
public String getPasswordString(String key) throws IOException {
|
||||
char[] passchars = rawConfig.getPassword(accountConf(key));
|
||||
if (passchars == null) {
|
||||
passchars = rawConfig.getPassword(key);
|
||||
}
|
||||
if (passchars != null) {
|
||||
return new String(passchars);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the account-specific Class if it exists, then looks for an
|
||||
* account-agnostic value, and finally tries the default value.
|
||||
* @param name Account-agnostic configuration key
|
||||
* @param defaultValue Class returned if none is configured
|
||||
* @param xface Interface shared by all possible values
|
||||
* @return Highest-precedence Class object that was found
|
||||
* @throws IOException
|
||||
*/
|
||||
public <U> Class<? extends U> getClass(String name, Class<? extends U> defaultValue, Class<U> xface) {
|
||||
return rawConfig.getClass(accountConf(name),
|
||||
rawConfig.getClass(name, defaultValue, xface),
|
||||
xface);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the account-specific password in string form if it exists, then
|
||||
* looks for an account-agnostic value.
|
||||
* @param name Account-agnostic configuration key
|
||||
* @param defaultValue Value returned if none is configured
|
||||
* @return value in String form if one exists, else null
|
||||
* @throws IOException
|
||||
*/
|
||||
public <T extends Enum<T>> T getEnum(String name, T defaultValue) {
|
||||
return rawConfig.getEnum(accountConf(name),
|
||||
rawConfig.getEnum(name, defaultValue));
|
||||
}
|
||||
|
||||
/**
|
||||
* Unsets parameter in the underlying Configuration object.
|
||||
* Provided only as a convenience; does not add any account logic.
|
||||
* @param key Configuration key
|
||||
*/
|
||||
public void unset(String key) {
|
||||
rawConfig.unset(key);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets String in the underlying Configuration object.
|
||||
* Provided only as a convenience; does not add any account logic.
|
||||
* @param key Configuration key
|
||||
* @param value Configuration value
|
||||
*/
|
||||
public void set(String key, String value) {
|
||||
rawConfig.set(key, value);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets boolean in the underlying Configuration object.
|
||||
* Provided only as a convenience; does not add any account logic.
|
||||
* @param key Configuration key
|
||||
* @param value Configuration value
|
||||
*/
|
||||
public void setBoolean(String key, boolean value) {
|
||||
rawConfig.setBoolean(key, value);
|
||||
}
|
||||
|
||||
public boolean isSecureMode() {
|
||||
return isSecure;
|
||||
}
|
||||
|
||||
public String getStorageAccountKey(final String accountName) throws AzureBlobFileSystemException {
|
||||
public String getStorageAccountKey() throws AzureBlobFileSystemException {
|
||||
String key;
|
||||
String keyProviderClass =
|
||||
configuration.get(AZURE_KEY_ACCOUNT_KEYPROVIDER_PREFIX + accountName);
|
||||
String keyProviderClass = get(AZURE_KEY_ACCOUNT_KEYPROVIDER);
|
||||
KeyProvider keyProvider;
|
||||
|
||||
if (keyProviderClass == null) {
|
||||
|
@ -195,7 +316,7 @@ public class AbfsConfiguration{
|
|||
// implements KeyProvider
|
||||
Object keyProviderObject;
|
||||
try {
|
||||
Class<?> clazz = configuration.getClassByName(keyProviderClass);
|
||||
Class<?> clazz = rawConfig.getClassByName(keyProviderClass);
|
||||
keyProviderObject = clazz.newInstance();
|
||||
} catch (Exception e) {
|
||||
throw new KeyProviderException("Unable to load key provider class.", e);
|
||||
|
@ -206,7 +327,7 @@ public class AbfsConfiguration{
|
|||
}
|
||||
keyProvider = (KeyProvider) keyProviderObject;
|
||||
}
|
||||
key = keyProvider.getStorageAccountKey(accountName, configuration);
|
||||
key = keyProvider.getStorageAccountKey(accountName, rawConfig);
|
||||
|
||||
if (key == null) {
|
||||
throw new ConfigurationPropertyNotFoundException(accountName);
|
||||
|
@ -215,8 +336,8 @@ public class AbfsConfiguration{
|
|||
return key;
|
||||
}
|
||||
|
||||
public Configuration getConfiguration() {
|
||||
return this.configuration;
|
||||
public Configuration getRawConfiguration() {
|
||||
return this.rawConfig;
|
||||
}
|
||||
|
||||
public int getWriteBufferSize() {
|
||||
|
@ -292,11 +413,11 @@ public class AbfsConfiguration{
|
|||
}
|
||||
|
||||
public SSLSocketFactoryEx.SSLChannelMode getPreferredSSLFactoryOption() {
|
||||
return configuration.getEnum(FS_AZURE_SSL_CHANNEL_MODE_KEY, DEFAULT_FS_AZURE_SSL_CHANNEL_MODE);
|
||||
return getEnum(FS_AZURE_SSL_CHANNEL_MODE_KEY, DEFAULT_FS_AZURE_SSL_CHANNEL_MODE);
|
||||
}
|
||||
|
||||
public AuthType getAuthType(final String accountName) {
|
||||
return configuration.getEnum(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME + accountName, AuthType.SharedKey);
|
||||
public AuthType getAuthType(String accountName) {
|
||||
return getEnum(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME, AuthType.SharedKey);
|
||||
}
|
||||
|
||||
public boolean isDelegationTokenManagerEnabled() {
|
||||
|
@ -304,34 +425,34 @@ public class AbfsConfiguration{
|
|||
}
|
||||
|
||||
public AbfsDelegationTokenManager getDelegationTokenManager() throws IOException {
|
||||
return new AbfsDelegationTokenManager(configuration);
|
||||
return new AbfsDelegationTokenManager(getRawConfiguration());
|
||||
}
|
||||
|
||||
public AccessTokenProvider getTokenProvider(final String accountName) throws TokenAccessProviderException {
|
||||
AuthType authType = configuration.getEnum(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME + accountName, AuthType.SharedKey);
|
||||
public AccessTokenProvider getTokenProvider() throws TokenAccessProviderException {
|
||||
AuthType authType = getEnum(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME, AuthType.SharedKey);
|
||||
if (authType == AuthType.OAuth) {
|
||||
try {
|
||||
Class<? extends AccessTokenProvider> tokenProviderClass =
|
||||
configuration.getClass(FS_AZURE_ACCOUNT_TOKEN_PROVIDER_TYPE_PROPERTY_NAME + accountName, null,
|
||||
getClass(FS_AZURE_ACCOUNT_TOKEN_PROVIDER_TYPE_PROPERTY_NAME, null,
|
||||
AccessTokenProvider.class);
|
||||
AccessTokenProvider tokenProvider = null;
|
||||
if (tokenProviderClass == ClientCredsTokenProvider.class) {
|
||||
String authEndpoint = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_CLIENT_ENDPOINT + accountName);
|
||||
String clientId = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_CLIENT_ID + accountName);
|
||||
String clientSecret = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_CLIENT_SECRET + accountName);
|
||||
String authEndpoint = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_CLIENT_ENDPOINT);
|
||||
String clientId = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_CLIENT_ID);
|
||||
String clientSecret = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_CLIENT_SECRET);
|
||||
tokenProvider = new ClientCredsTokenProvider(authEndpoint, clientId, clientSecret);
|
||||
} else if (tokenProviderClass == UserPasswordTokenProvider.class) {
|
||||
String authEndpoint = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_CLIENT_ENDPOINT + accountName);
|
||||
String username = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_USER_NAME + accountName);
|
||||
String password = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_USER_PASSWORD + accountName);
|
||||
String authEndpoint = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_CLIENT_ENDPOINT);
|
||||
String username = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_USER_NAME);
|
||||
String password = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_USER_PASSWORD);
|
||||
tokenProvider = new UserPasswordTokenProvider(authEndpoint, username, password);
|
||||
} else if (tokenProviderClass == MsiTokenProvider.class) {
|
||||
String tenantGuid = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_MSI_TENANT + accountName);
|
||||
String clientId = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_CLIENT_ID + accountName);
|
||||
String tenantGuid = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_MSI_TENANT);
|
||||
String clientId = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_CLIENT_ID);
|
||||
tokenProvider = new MsiTokenProvider(tenantGuid, clientId);
|
||||
} else if (tokenProviderClass == RefreshTokenBasedTokenProvider.class) {
|
||||
String refreshToken = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_REFRESH_TOKEN + accountName);
|
||||
String clientId = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_CLIENT_ID + accountName);
|
||||
String refreshToken = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_REFRESH_TOKEN);
|
||||
String clientId = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_CLIENT_ID);
|
||||
tokenProvider = new RefreshTokenBasedTokenProvider(clientId, refreshToken);
|
||||
} else {
|
||||
throw new IllegalArgumentException("Failed to initialize " + tokenProviderClass);
|
||||
|
@ -345,20 +466,19 @@ public class AbfsConfiguration{
|
|||
|
||||
} else if (authType == AuthType.Custom) {
|
||||
try {
|
||||
String configKey = FS_AZURE_ACCOUNT_TOKEN_PROVIDER_TYPE_PROPERTY_NAME + accountName;
|
||||
String configKey = FS_AZURE_ACCOUNT_TOKEN_PROVIDER_TYPE_PROPERTY_NAME;
|
||||
Class<? extends CustomTokenProviderAdaptee> customTokenProviderClass =
|
||||
configuration.getClass(configKey, null,
|
||||
CustomTokenProviderAdaptee.class);
|
||||
getClass(configKey, null, CustomTokenProviderAdaptee.class);
|
||||
if (customTokenProviderClass == null) {
|
||||
throw new IllegalArgumentException(
|
||||
String.format("The configuration value for \"%s\" is invalid.", configKey));
|
||||
}
|
||||
CustomTokenProviderAdaptee azureTokenProvider = ReflectionUtils
|
||||
.newInstance(customTokenProviderClass, configuration);
|
||||
.newInstance(customTokenProviderClass, rawConfig);
|
||||
if (azureTokenProvider == null) {
|
||||
throw new IllegalArgumentException("Failed to initialize " + customTokenProviderClass);
|
||||
}
|
||||
azureTokenProvider.initialize(configuration, accountName);
|
||||
azureTokenProvider.initialize(rawConfig, accountName);
|
||||
return new CustomTokenProviderAdapter(azureTokenProvider);
|
||||
} catch(IllegalArgumentException e) {
|
||||
throw e;
|
||||
|
@ -375,7 +495,7 @@ public class AbfsConfiguration{
|
|||
void validateStorageAccountKeys() throws InvalidConfigurationValueException {
|
||||
Base64StringConfigurationBasicValidator validator = new Base64StringConfigurationBasicValidator(
|
||||
FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME, "", true);
|
||||
this.storageAccountKeys = configuration.getValByRegex(FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME_REGX);
|
||||
this.storageAccountKeys = rawConfig.getValByRegex(FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME_REGX);
|
||||
|
||||
for (Map.Entry<String, String> account : storageAccountKeys.entrySet()) {
|
||||
validator.validate(account.getValue());
|
||||
|
@ -384,7 +504,7 @@ public class AbfsConfiguration{
|
|||
|
||||
int validateInt(Field field) throws IllegalAccessException, InvalidConfigurationValueException {
|
||||
IntegerConfigurationValidatorAnnotation validator = field.getAnnotation(IntegerConfigurationValidatorAnnotation.class);
|
||||
String value = configuration.get(validator.ConfigurationKey());
|
||||
String value = get(validator.ConfigurationKey());
|
||||
|
||||
// validate
|
||||
return new IntegerConfigurationBasicValidator(
|
||||
|
@ -397,7 +517,7 @@ public class AbfsConfiguration{
|
|||
|
||||
long validateLong(Field field) throws IllegalAccessException, InvalidConfigurationValueException {
|
||||
LongConfigurationValidatorAnnotation validator = field.getAnnotation(LongConfigurationValidatorAnnotation.class);
|
||||
String value = configuration.get(validator.ConfigurationKey());
|
||||
String value = rawConfig.get(validator.ConfigurationKey());
|
||||
|
||||
// validate
|
||||
return new LongConfigurationBasicValidator(
|
||||
|
@ -410,7 +530,7 @@ public class AbfsConfiguration{
|
|||
|
||||
String validateString(Field field) throws IllegalAccessException, InvalidConfigurationValueException {
|
||||
StringConfigurationValidatorAnnotation validator = field.getAnnotation(StringConfigurationValidatorAnnotation.class);
|
||||
String value = configuration.get(validator.ConfigurationKey());
|
||||
String value = rawConfig.get(validator.ConfigurationKey());
|
||||
|
||||
// validate
|
||||
return new StringConfigurationBasicValidator(
|
||||
|
@ -421,7 +541,7 @@ public class AbfsConfiguration{
|
|||
|
||||
String validateBase64String(Field field) throws IllegalAccessException, InvalidConfigurationValueException {
|
||||
Base64StringConfigurationValidatorAnnotation validator = field.getAnnotation((Base64StringConfigurationValidatorAnnotation.class));
|
||||
String value = configuration.get(validator.ConfigurationKey());
|
||||
String value = rawConfig.get(validator.ConfigurationKey());
|
||||
|
||||
// validate
|
||||
return new Base64StringConfigurationBasicValidator(
|
||||
|
@ -432,7 +552,7 @@ public class AbfsConfiguration{
|
|||
|
||||
boolean validateBoolean(Field field) throws IllegalAccessException, InvalidConfigurationValueException {
|
||||
BooleanConfigurationValidatorAnnotation validator = field.getAnnotation(BooleanConfigurationValidatorAnnotation.class);
|
||||
String value = configuration.get(validator.ConfigurationKey());
|
||||
String value = rawConfig.get(validator.ConfigurationKey());
|
||||
|
||||
// validate
|
||||
return new BooleanConfigurationBasicValidator(
|
||||
|
@ -441,14 +561,6 @@ public class AbfsConfiguration{
|
|||
validator.ThrowIfInvalid()).validate(value);
|
||||
}
|
||||
|
||||
String getPasswordString(String key) throws IOException {
|
||||
char[] passchars = configuration.getPassword(key);
|
||||
if (passchars != null) {
|
||||
return new String(passchars);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
void setReadBufferSize(int bufferSize) {
|
||||
this.readBufferSize = bufferSize;
|
||||
|
@ -463,4 +575,4 @@ public class AbfsConfiguration{
|
|||
void setEnableFlush(boolean enableFlush) {
|
||||
this.enableFlush = enableFlush;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -113,10 +113,15 @@ public class AzureBlobFileSystemStore {
|
|||
private boolean isNamespaceEnabled;
|
||||
|
||||
public AzureBlobFileSystemStore(URI uri, boolean isSecure, Configuration configuration, UserGroupInformation userGroupInformation)
|
||||
throws AzureBlobFileSystemException {
|
||||
throws AzureBlobFileSystemException, IOException {
|
||||
this.uri = uri;
|
||||
|
||||
String[] authorityParts = authorityParts(uri);
|
||||
final String fileSystemName = authorityParts[0];
|
||||
final String accountName = authorityParts[1];
|
||||
|
||||
try {
|
||||
this.abfsConfiguration = new AbfsConfiguration(configuration);
|
||||
this.abfsConfiguration = new AbfsConfiguration(configuration, accountName);
|
||||
} catch (IllegalAccessException exception) {
|
||||
throw new FileSystemOperationUnhandledException(exception);
|
||||
}
|
||||
|
@ -125,7 +130,31 @@ public class AzureBlobFileSystemStore {
|
|||
this.azureAtomicRenameDirSet = new HashSet<>(Arrays.asList(
|
||||
abfsConfiguration.getAzureAtomicRenameDirs().split(AbfsHttpConstants.COMMA)));
|
||||
|
||||
initializeClient(uri, isSecure);
|
||||
initializeClient(uri, fileSystemName, accountName, isSecure);
|
||||
}
|
||||
|
||||
private String[] authorityParts(URI uri) throws InvalidUriAuthorityException, InvalidUriException {
|
||||
final String authority = uri.getRawAuthority();
|
||||
if (null == authority) {
|
||||
throw new InvalidUriAuthorityException(uri.toString());
|
||||
}
|
||||
|
||||
if (!authority.contains(AbfsHttpConstants.AZURE_DISTRIBUTED_FILE_SYSTEM_AUTHORITY_DELIMITER)) {
|
||||
throw new InvalidUriAuthorityException(uri.toString());
|
||||
}
|
||||
|
||||
final String[] authorityParts = authority.split(AbfsHttpConstants.AZURE_DISTRIBUTED_FILE_SYSTEM_AUTHORITY_DELIMITER, 2);
|
||||
|
||||
if (authorityParts.length < 2 || authorityParts[0] != null
|
||||
&& authorityParts[0].isEmpty()) {
|
||||
final String errMsg = String
|
||||
.format("'%s' has a malformed authority, expected container name. "
|
||||
+ "Authority takes the form "
|
||||
+ FileSystemUriSchemes.ABFS_SCHEME + "://[<container name>@]<account name>",
|
||||
uri.toString());
|
||||
throw new InvalidUriException(errMsg);
|
||||
}
|
||||
return authorityParts;
|
||||
}
|
||||
|
||||
public boolean getIsNamespaceEnabled() throws AzureBlobFileSystemException {
|
||||
|
@ -154,7 +183,7 @@ public class AzureBlobFileSystemStore {
|
|||
// the Azure Storage Service URI changes from
|
||||
// http[s]://[account][domain-suffix]/[filesystem] to
|
||||
// http[s]://[ip]:[port]/[account]/[filesystem].
|
||||
String endPoint = abfsConfiguration.getConfiguration().get(AZURE_ABFS_ENDPOINT);
|
||||
String endPoint = abfsConfiguration.get(AZURE_ABFS_ENDPOINT);
|
||||
if (endPoint == null || !endPoint.contains(AbfsHttpConstants.COLON)) {
|
||||
uriBuilder.setHost(hostName);
|
||||
return uriBuilder;
|
||||
|
@ -738,36 +767,12 @@ public class AzureBlobFileSystemStore {
|
|||
return isKeyForDirectorySet(key, azureAtomicRenameDirSet);
|
||||
}
|
||||
|
||||
private void initializeClient(URI uri, boolean isSeure) throws AzureBlobFileSystemException {
|
||||
private void initializeClient(URI uri, String fileSystemName, String accountName, boolean isSecure) throws AzureBlobFileSystemException {
|
||||
if (this.client != null) {
|
||||
return;
|
||||
}
|
||||
|
||||
final String authority = uri.getRawAuthority();
|
||||
if (null == authority) {
|
||||
throw new InvalidUriAuthorityException(uri.toString());
|
||||
}
|
||||
|
||||
if (!authority.contains(AbfsHttpConstants.AZURE_DISTRIBUTED_FILE_SYSTEM_AUTHORITY_DELIMITER)) {
|
||||
throw new InvalidUriAuthorityException(uri.toString());
|
||||
}
|
||||
|
||||
final String[] authorityParts = authority.split(AbfsHttpConstants.AZURE_DISTRIBUTED_FILE_SYSTEM_AUTHORITY_DELIMITER, 2);
|
||||
|
||||
if (authorityParts.length < 2 || authorityParts[0] != null
|
||||
&& authorityParts[0].isEmpty()) {
|
||||
final String errMsg = String
|
||||
.format("'%s' has a malformed authority, expected container name. "
|
||||
+ "Authority takes the form "
|
||||
+ FileSystemUriSchemes.ABFS_SCHEME + "://[<container name>@]<account name>",
|
||||
uri.toString());
|
||||
throw new InvalidUriException(errMsg);
|
||||
}
|
||||
|
||||
final String fileSystemName = authorityParts[0];
|
||||
final String accountName = authorityParts[1];
|
||||
|
||||
final URIBuilder uriBuilder = getURIBuilder(accountName, isSeure);
|
||||
final URIBuilder uriBuilder = getURIBuilder(accountName, isSecure);
|
||||
|
||||
final String url = uriBuilder.toString() + AbfsHttpConstants.FORWARD_SLASH + fileSystemName;
|
||||
|
||||
|
@ -788,9 +793,9 @@ public class AzureBlobFileSystemStore {
|
|||
uri.toString() + " - account name is not fully qualified.");
|
||||
}
|
||||
creds = new SharedKeyCredentials(accountName.substring(0, dotIndex),
|
||||
abfsConfiguration.getStorageAccountKey(accountName));
|
||||
abfsConfiguration.getStorageAccountKey());
|
||||
} else {
|
||||
tokenProvider = abfsConfiguration.getTokenProvider(accountName);
|
||||
tokenProvider = abfsConfiguration.getTokenProvider();
|
||||
}
|
||||
|
||||
this.client = new AbfsClient(baseUrl, creds, abfsConfiguration, new ExponentialRetryPolicy(), tokenProvider);
|
||||
|
|
|
@ -27,7 +27,7 @@ import org.apache.hadoop.classification.InterfaceStability;
|
|||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Evolving
|
||||
public final class ConfigurationKeys {
|
||||
public static final String FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME = "fs.azure.account.key.";
|
||||
public static final String FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME = "fs.azure.account.key";
|
||||
public static final String FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME_REGX = "fs\\.azure\\.account\\.key\\.(.*)";
|
||||
public static final String FS_AZURE_SECURE_MODE = "fs.azure.secure.mode";
|
||||
|
||||
|
@ -54,29 +54,33 @@ public final class ConfigurationKeys {
|
|||
public static final String FS_AZURE_USER_AGENT_PREFIX_KEY = "fs.azure.user.agent.prefix";
|
||||
public static final String FS_AZURE_SSL_CHANNEL_MODE_KEY = "fs.azure.ssl.channel.mode";
|
||||
|
||||
public static final String AZURE_KEY_ACCOUNT_KEYPROVIDER_PREFIX = "fs.azure.account.keyprovider.";
|
||||
public static final String AZURE_KEY_ACCOUNT_KEYPROVIDER = "fs.azure.account.keyprovider";
|
||||
public static final String AZURE_KEY_ACCOUNT_SHELLKEYPROVIDER_SCRIPT = "fs.azure.shellkeyprovider.script";
|
||||
|
||||
/** End point of ABFS account: {@value}. */
|
||||
public static final String AZURE_ABFS_ENDPOINT = "fs.azure.abfs.endpoint";
|
||||
/** Prefix for auth type properties: {@value}. */
|
||||
public static final String FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME = "fs.azure.account.auth.type.";
|
||||
/** Prefix for oauth token provider type: {@value}. */
|
||||
public static final String FS_AZURE_ACCOUNT_TOKEN_PROVIDER_TYPE_PROPERTY_NAME = "fs.azure.account.oauth.provider.type.";
|
||||
/** Prefix for oauth AAD client id: {@value}. */
|
||||
public static final String FS_AZURE_ACCOUNT_OAUTH_CLIENT_ID = "fs.azure.account.oauth2.client.id.";
|
||||
/** Prefix for oauth AAD client secret: {@value}. */
|
||||
public static final String FS_AZURE_ACCOUNT_OAUTH_CLIENT_SECRET = "fs.azure.account.oauth2.client.secret.";
|
||||
/** Prefix for oauth AAD client endpoint: {@value}. */
|
||||
public static final String FS_AZURE_ACCOUNT_OAUTH_CLIENT_ENDPOINT = "fs.azure.account.oauth2.client.endpoint.";
|
||||
/** Prefix for oauth msi tenant id: {@value}. */
|
||||
public static final String FS_AZURE_ACCOUNT_OAUTH_MSI_TENANT = "fs.azure.account.oauth2.msi.tenant.";
|
||||
/** Prefix for oauth user name: {@value}. */
|
||||
public static final String FS_AZURE_ACCOUNT_OAUTH_USER_NAME = "fs.azure.account.oauth2.user.name.";
|
||||
/** Prefix for oauth user password: {@value}. */
|
||||
public static final String FS_AZURE_ACCOUNT_OAUTH_USER_PASSWORD = "fs.azure.account.oauth2.user.password.";
|
||||
/** Prefix for oauth refresh token: {@value}. */
|
||||
public static final String FS_AZURE_ACCOUNT_OAUTH_REFRESH_TOKEN = "fs.azure.account.oauth2.refresh.token.";
|
||||
/** Key for auth type properties: {@value}. */
|
||||
public static final String FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME = "fs.azure.account.auth.type";
|
||||
/** Key for oauth token provider type: {@value}. */
|
||||
public static final String FS_AZURE_ACCOUNT_TOKEN_PROVIDER_TYPE_PROPERTY_NAME = "fs.azure.account.oauth.provider.type";
|
||||
/** Key for oauth AAD client id: {@value}. */
|
||||
public static final String FS_AZURE_ACCOUNT_OAUTH_CLIENT_ID = "fs.azure.account.oauth2.client.id";
|
||||
/** Key for oauth AAD client secret: {@value}. */
|
||||
public static final String FS_AZURE_ACCOUNT_OAUTH_CLIENT_SECRET = "fs.azure.account.oauth2.client.secret";
|
||||
/** Key for oauth AAD client endpoint: {@value}. */
|
||||
public static final String FS_AZURE_ACCOUNT_OAUTH_CLIENT_ENDPOINT = "fs.azure.account.oauth2.client.endpoint";
|
||||
/** Key for oauth msi tenant id: {@value}. */
|
||||
public static final String FS_AZURE_ACCOUNT_OAUTH_MSI_TENANT = "fs.azure.account.oauth2.msi.tenant";
|
||||
/** Key for oauth user name: {@value}. */
|
||||
public static final String FS_AZURE_ACCOUNT_OAUTH_USER_NAME = "fs.azure.account.oauth2.user.name";
|
||||
/** Key for oauth user password: {@value}. */
|
||||
public static final String FS_AZURE_ACCOUNT_OAUTH_USER_PASSWORD = "fs.azure.account.oauth2.user.password";
|
||||
/** Key for oauth refresh token: {@value}. */
|
||||
public static final String FS_AZURE_ACCOUNT_OAUTH_REFRESH_TOKEN = "fs.azure.account.oauth2.refresh.token";
|
||||
|
||||
public static String accountProperty(String property, String account) {
|
||||
return property + "." + account;
|
||||
}
|
||||
|
||||
public static final String FS_AZURE_ENABLE_DELEGATION_TOKEN = "fs.azure.enable.delegation.token";
|
||||
public static final String FS_AZURE_DELEGATION_TOKEN_PROVIDER_TYPE = "fs.azure.delegation.token.provider.type";
|
||||
|
|
|
@ -24,7 +24,6 @@ import com.google.common.base.Preconditions;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
/**
|
||||
* Provides tokens based on username and password.
|
||||
*/
|
||||
|
@ -54,13 +53,4 @@ public class UserPasswordTokenProvider extends AccessTokenProvider {
|
|||
LOG.debug("AADToken: refreshing user-password based token");
|
||||
return AzureADAuthenticator.getTokenUsingClientCreds(authEndpoint, username, password);
|
||||
}
|
||||
|
||||
private static String getPasswordString(Configuration conf, String key)
|
||||
throws IOException {
|
||||
char[] passchars = conf.getPassword(key);
|
||||
if (passchars == null) {
|
||||
throw new IOException("Password " + key + " not found");
|
||||
}
|
||||
return new String(passchars);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,6 +22,7 @@ import java.io.IOException;
|
|||
import java.util.Arrays;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
|
||||
import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
|
||||
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.KeyProviderException;
|
||||
import org.apache.hadoop.util.Shell;
|
||||
|
@ -36,11 +37,18 @@ public class ShellDecryptionKeyProvider extends SimpleKeyProvider {
|
|||
private static final Logger LOG = LoggerFactory.getLogger(ShellDecryptionKeyProvider.class);
|
||||
|
||||
@Override
|
||||
public String getStorageAccountKey(String accountName, Configuration conf)
|
||||
public String getStorageAccountKey(String accountName, Configuration rawConfig)
|
||||
throws KeyProviderException {
|
||||
String envelope = super.getStorageAccountKey(accountName, conf);
|
||||
String envelope = super.getStorageAccountKey(accountName, rawConfig);
|
||||
|
||||
final String command = conf.get(ConfigurationKeys.AZURE_KEY_ACCOUNT_SHELLKEYPROVIDER_SCRIPT);
|
||||
AbfsConfiguration abfsConfig;
|
||||
try {
|
||||
abfsConfig = new AbfsConfiguration(rawConfig, accountName);
|
||||
} catch(IllegalAccessException | IOException e) {
|
||||
throw new KeyProviderException("Unable to get key from credential providers.", e);
|
||||
}
|
||||
|
||||
final String command = abfsConfig.get(ConfigurationKeys.AZURE_KEY_ACCOUNT_SHELLKEYPROVIDER_SCRIPT);
|
||||
if (command == null) {
|
||||
throw new KeyProviderException(
|
||||
"Script path is not specified via fs.azure.shellkeyprovider.script");
|
||||
|
|
|
@ -21,10 +21,10 @@ package org.apache.hadoop.fs.azurebfs.services;
|
|||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
|
||||
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
|
||||
import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
|
||||
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.KeyProviderException;
|
||||
import org.apache.hadoop.security.ProviderUtils;
|
||||
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -36,19 +36,19 @@ public class SimpleKeyProvider implements KeyProvider {
|
|||
private static final Logger LOG = LoggerFactory.getLogger(SimpleKeyProvider.class);
|
||||
|
||||
@Override
|
||||
public String getStorageAccountKey(String accountName, Configuration conf)
|
||||
public String getStorageAccountKey(String accountName, Configuration rawConfig)
|
||||
throws KeyProviderException {
|
||||
String key = null;
|
||||
|
||||
try {
|
||||
Configuration c = ProviderUtils.excludeIncompatibleCredentialProviders(
|
||||
conf, AzureBlobFileSystem.class);
|
||||
char[] keyChars = c.getPassword(ConfigurationKeys.FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME + accountName);
|
||||
if (keyChars != null) {
|
||||
key = new String(keyChars);
|
||||
}
|
||||
AbfsConfiguration abfsConfig = new AbfsConfiguration(rawConfig, accountName);
|
||||
key = abfsConfig.getPasswordString(ConfigurationKeys.FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME);
|
||||
} catch(IllegalAccessException | InvalidConfigurationValueException e) {
|
||||
throw new KeyProviderException("Failure to initialize configuration", e);
|
||||
} catch(IOException ioe) {
|
||||
LOG.warn("Unable to get key from credential providers. {}", ioe);
|
||||
}
|
||||
|
||||
return key;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -62,6 +62,16 @@ The abfs client has a fully consistent view of the store, which has complete Cre
|
|||
* Directory Rename: `O(files)`.
|
||||
* Directory Delete: `O(files)`.
|
||||
|
||||
## Configuring ABFS
|
||||
|
||||
Any configuration can be specified generally (or as the default when accessing all accounts) or can be tied to s a specific account.
|
||||
For example, an OAuth identity can be configured for use regardless of which account is accessed with the property
|
||||
"fs.azure.account.oauth2.client.id"
|
||||
or you can configure an identity to be used only for a specific storage account with
|
||||
"fs.azure.account.oauth2.client.id.\<account\_name\>.dfs.core.windows.net".
|
||||
|
||||
Note that it doesn't make sense to do this with some properties, like shared keys that are inherently account-specific.
|
||||
|
||||
## Testing ABFS
|
||||
|
||||
See the relevant section in [Testing Azure](testing_azure.html).
|
||||
|
|
|
@ -67,38 +67,39 @@ public abstract class AbstractAbfsIntegrationTest extends
|
|||
private AzureBlobFileSystem abfs;
|
||||
private String abfsScheme;
|
||||
|
||||
private Configuration configuration;
|
||||
private Configuration rawConfig;
|
||||
private AbfsConfiguration abfsConfig;
|
||||
private String fileSystemName;
|
||||
private String accountName;
|
||||
private String testUrl;
|
||||
private AuthType authType;
|
||||
|
||||
protected AbstractAbfsIntegrationTest() {
|
||||
protected AbstractAbfsIntegrationTest() throws Exception {
|
||||
fileSystemName = TEST_CONTAINER_PREFIX + UUID.randomUUID().toString();
|
||||
configuration = new Configuration();
|
||||
configuration.addResource(TEST_CONFIGURATION_FILE_NAME);
|
||||
rawConfig = new Configuration();
|
||||
rawConfig.addResource(TEST_CONFIGURATION_FILE_NAME);
|
||||
|
||||
this.accountName = this.configuration.get(FS_AZURE_ACCOUNT_NAME);
|
||||
this.accountName = rawConfig.get(FS_AZURE_ACCOUNT_NAME);
|
||||
if (accountName == null) {
|
||||
// check if accountName is set using different config key
|
||||
accountName = configuration.get(FS_AZURE_ABFS_ACCOUNT_NAME);
|
||||
accountName = rawConfig.get(FS_AZURE_ABFS_ACCOUNT_NAME);
|
||||
}
|
||||
assumeTrue("Not set: " + FS_AZURE_ABFS_ACCOUNT_NAME,
|
||||
accountName != null && !accountName.isEmpty());
|
||||
|
||||
abfsConfig = new AbfsConfiguration(rawConfig, accountName);
|
||||
|
||||
authType = configuration.getEnum(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME
|
||||
+ accountName, AuthType.SharedKey);
|
||||
authType = abfsConfig.getEnum(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME, AuthType.SharedKey);
|
||||
abfsScheme = authType == AuthType.SharedKey ? FileSystemUriSchemes.ABFS_SCHEME
|
||||
: FileSystemUriSchemes.ABFS_SECURE_SCHEME;
|
||||
|
||||
if (authType == AuthType.SharedKey) {
|
||||
String keyProperty = FS_AZURE_ACCOUNT_KEY_PREFIX + accountName;
|
||||
assumeTrue("Not set: " + keyProperty, configuration.get(keyProperty) != null);
|
||||
assumeTrue("Not set: " + FS_AZURE_ACCOUNT_KEY,
|
||||
abfsConfig.get(FS_AZURE_ACCOUNT_KEY) != null);
|
||||
// Update credentials
|
||||
} else {
|
||||
String accessTokenProviderKey = FS_AZURE_ACCOUNT_TOKEN_PROVIDER_TYPE_PROPERTY_NAME + accountName;
|
||||
assumeTrue("Not set: " + accessTokenProviderKey, configuration.get(accessTokenProviderKey) != null);
|
||||
assumeTrue("Not set: " + FS_AZURE_ACCOUNT_TOKEN_PROVIDER_TYPE_PROPERTY_NAME,
|
||||
abfsConfig.get(FS_AZURE_ACCOUNT_TOKEN_PROVIDER_TYPE_PROPERTY_NAME) != null);
|
||||
}
|
||||
|
||||
final String abfsUrl = this.getFileSystemName() + "@" + this.getAccountName();
|
||||
|
@ -111,14 +112,14 @@ public abstract class AbstractAbfsIntegrationTest extends
|
|||
}
|
||||
|
||||
this.testUrl = defaultUri.toString();
|
||||
configuration.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, defaultUri.toString());
|
||||
configuration.setBoolean(AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION, true);
|
||||
abfsConfig.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, defaultUri.toString());
|
||||
abfsConfig.setBoolean(AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION, true);
|
||||
// For testing purposes, an IP address and port may be provided to override
|
||||
// the host specified in the FileSystem URI. Also note that the format of
|
||||
// the Azure Storage Service URI changes from
|
||||
// http[s]://[account][domain-suffix]/[filesystem] to
|
||||
// http[s]://[ip]:[port]/[account]/[filesystem].
|
||||
String endPoint = configuration.get(AZURE_ABFS_ENDPOINT);
|
||||
String endPoint = abfsConfig.get(AZURE_ABFS_ENDPOINT);
|
||||
if (endPoint != null && endPoint.contains(":") && endPoint.split(":").length == 2) {
|
||||
this.isIPAddress = true;
|
||||
} else {
|
||||
|
@ -140,18 +141,18 @@ public abstract class AbstractAbfsIntegrationTest extends
|
|||
// update configuration with wasb credentials
|
||||
String accountNameWithoutDomain = accountName.split("\\.")[0];
|
||||
String wasbAccountName = accountNameWithoutDomain + WASB_ACCOUNT_NAME_DOMAIN_SUFFIX;
|
||||
String keyProperty = FS_AZURE_ACCOUNT_KEY_PREFIX + wasbAccountName;
|
||||
if (configuration.get(keyProperty) == null) {
|
||||
configuration.set(keyProperty, getAccountKey());
|
||||
String keyProperty = FS_AZURE_ACCOUNT_KEY + "." + wasbAccountName;
|
||||
if (rawConfig.get(keyProperty) == null) {
|
||||
rawConfig.set(keyProperty, getAccountKey());
|
||||
}
|
||||
|
||||
azureNativeFileSystemStore.initialize(
|
||||
wasbUri,
|
||||
configuration,
|
||||
new AzureFileSystemInstrumentation(getConfiguration()));
|
||||
rawConfig,
|
||||
new AzureFileSystemInstrumentation(rawConfig));
|
||||
|
||||
wasb = new NativeAzureFileSystem(azureNativeFileSystemStore);
|
||||
wasb.initialize(wasbUri, configuration);
|
||||
wasb.initialize(wasbUri, rawConfig);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -197,8 +198,8 @@ public abstract class AbstractAbfsIntegrationTest extends
|
|||
}
|
||||
|
||||
public AzureBlobFileSystem getFileSystem(String abfsUri) throws Exception {
|
||||
configuration.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, abfsUri);
|
||||
final AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.get(configuration);
|
||||
abfsConfig.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, abfsUri);
|
||||
final AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.get(rawConfig);
|
||||
return fs;
|
||||
}
|
||||
|
||||
|
@ -210,7 +211,7 @@ public abstract class AbstractAbfsIntegrationTest extends
|
|||
public AzureBlobFileSystem createFileSystem() throws IOException {
|
||||
Preconditions.checkState(abfs == null,
|
||||
"existing ABFS instance exists: %s", abfs);
|
||||
abfs = (AzureBlobFileSystem) FileSystem.newInstance(configuration);
|
||||
abfs = (AzureBlobFileSystem) FileSystem.newInstance(rawConfig);
|
||||
return abfs;
|
||||
}
|
||||
|
||||
|
@ -221,7 +222,7 @@ public abstract class AbstractAbfsIntegrationTest extends
|
|||
|
||||
protected String getHostName() {
|
||||
// READ FROM ENDPOINT, THIS IS CALLED ONLY WHEN TESTING AGAINST DEV-FABRIC
|
||||
String endPoint = configuration.get(AZURE_ABFS_ENDPOINT);
|
||||
String endPoint = abfsConfig.get(AZURE_ABFS_ENDPOINT);
|
||||
return endPoint.split(":")[0];
|
||||
}
|
||||
|
||||
|
@ -245,13 +246,15 @@ public abstract class AbstractAbfsIntegrationTest extends
|
|||
}
|
||||
|
||||
protected String getAccountKey() {
|
||||
return configuration.get(
|
||||
FS_AZURE_ACCOUNT_KEY_PREFIX
|
||||
+ accountName);
|
||||
return abfsConfig.get(FS_AZURE_ACCOUNT_KEY);
|
||||
}
|
||||
|
||||
protected Configuration getConfiguration() {
|
||||
return configuration;
|
||||
public AbfsConfiguration getConfiguration() {
|
||||
return abfsConfig;
|
||||
}
|
||||
|
||||
public Configuration getRawConfiguration() {
|
||||
return abfsConfig.getRawConfiguration();
|
||||
}
|
||||
|
||||
protected boolean isIPAddress() {
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.apache.hadoop.fs.azurebfs;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.azure.integration.AzureTestConstants;
|
||||
|
||||
import static org.apache.hadoop.fs.azure.integration.AzureTestUtils.assumeScaleTestsEnabled;
|
||||
|
@ -34,6 +35,10 @@ public class AbstractAbfsScaleTest extends AbstractAbfsIntegrationTest {
|
|||
protected static final Logger LOG =
|
||||
LoggerFactory.getLogger(AbstractAbfsScaleTest.class);
|
||||
|
||||
public AbstractAbfsScaleTest() throws Exception {
|
||||
super();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int getTestTimeoutMillis() {
|
||||
return AzureTestConstants.SCALE_TEST_TIMEOUT_MILLIS;
|
||||
|
@ -43,7 +48,8 @@ public class AbstractAbfsScaleTest extends AbstractAbfsIntegrationTest {
|
|||
public void setup() throws Exception {
|
||||
super.setup();
|
||||
LOG.debug("Scale test operation count = {}", getOperationCount());
|
||||
assumeScaleTestsEnabled(getConfiguration());
|
||||
Configuration rawConfiguration = getRawConfiguration();
|
||||
assumeScaleTestsEnabled(rawConfiguration);
|
||||
}
|
||||
|
||||
protected long getOperationCount() {
|
||||
|
|
|
@ -30,6 +30,10 @@ import org.junit.Test;
|
|||
public final class ITestAbfsClient extends AbstractAbfsIntegrationTest {
|
||||
private static final int LIST_MAX_RESULTS = 5000;
|
||||
|
||||
public ITestAbfsClient() throws Exception {
|
||||
super();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testContinuationTokenHavingEqualSign() throws Exception {
|
||||
final AzureBlobFileSystem fs = this.getFileSystem();
|
||||
|
@ -42,4 +46,4 @@ public final class ITestAbfsClient extends AbstractAbfsIntegrationTest {
|
|||
Assert.assertEquals("InvalidQueryParameterValue", ex.getErrorCode().getErrorCode());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -51,7 +51,7 @@ public class ITestAbfsReadWriteAndSeek extends AbstractAbfsScaleTest {
|
|||
|
||||
private final int size;
|
||||
|
||||
public ITestAbfsReadWriteAndSeek(final int size) {
|
||||
public ITestAbfsReadWriteAndSeek(final int size) throws Exception {
|
||||
this.size = size;
|
||||
}
|
||||
|
||||
|
@ -62,7 +62,7 @@ public class ITestAbfsReadWriteAndSeek extends AbstractAbfsScaleTest {
|
|||
|
||||
private void testReadWriteAndSeek(int bufferSize) throws Exception {
|
||||
final AzureBlobFileSystem fs = getFileSystem();
|
||||
final AbfsConfiguration abfsConfiguration = new AbfsConfiguration(getConfiguration());
|
||||
final AbfsConfiguration abfsConfiguration = fs.getAbfsStore().getAbfsConfiguration();
|
||||
|
||||
abfsConfiguration.setWriteBufferSize(bufferSize);
|
||||
abfsConfiguration.setReadBufferSize(bufferSize);
|
||||
|
|
|
@ -34,7 +34,8 @@ public class ITestAzureBlobFileSystemAppend extends
|
|||
AbstractAbfsIntegrationTest {
|
||||
private static final Path TEST_FILE_PATH = new Path("testfile");
|
||||
private static final Path TEST_FOLDER_PATH = new Path("testFolder");
|
||||
public ITestAzureBlobFileSystemAppend() {
|
||||
|
||||
public ITestAzureBlobFileSystemAppend() throws Exception {
|
||||
super();
|
||||
}
|
||||
|
||||
|
|
|
@ -35,7 +35,8 @@ import org.apache.hadoop.fs.Path;
|
|||
*/
|
||||
public class ITestAzureBlobFileSystemBackCompat extends
|
||||
AbstractAbfsIntegrationTest {
|
||||
public ITestAzureBlobFileSystemBackCompat() {
|
||||
|
||||
public ITestAzureBlobFileSystemBackCompat() throws Exception {
|
||||
super();
|
||||
Assume.assumeTrue(this.getAuthType() == AuthType.SharedKey);
|
||||
}
|
||||
|
|
|
@ -39,7 +39,8 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.assertIsFile;
|
|||
* Test copy operation.
|
||||
*/
|
||||
public class ITestAzureBlobFileSystemCopy extends AbstractAbfsIntegrationTest {
|
||||
public ITestAzureBlobFileSystemCopy() {
|
||||
|
||||
public ITestAzureBlobFileSystemCopy() throws Exception {
|
||||
super();
|
||||
}
|
||||
|
||||
|
|
|
@ -38,7 +38,8 @@ public class ITestAzureBlobFileSystemCreate extends
|
|||
private static final Path TEST_FILE_PATH = new Path("testfile");
|
||||
private static final Path TEST_FOLDER_PATH = new Path("testFolder");
|
||||
private static final String TEST_CHILD_FILE = "childFile";
|
||||
public ITestAzureBlobFileSystemCreate() {
|
||||
|
||||
public ITestAzureBlobFileSystemCreate() throws Exception {
|
||||
super();
|
||||
}
|
||||
|
||||
|
|
|
@ -41,7 +41,8 @@ import static org.apache.hadoop.test.LambdaTestUtils.intercept;
|
|||
*/
|
||||
public class ITestAzureBlobFileSystemDelete extends
|
||||
AbstractAbfsIntegrationTest {
|
||||
public ITestAzureBlobFileSystemDelete() {
|
||||
|
||||
public ITestAzureBlobFileSystemDelete() throws Exception {
|
||||
super();
|
||||
}
|
||||
|
||||
|
|
|
@ -28,7 +28,6 @@ import org.apache.hadoop.fs.FSDataInputStream;
|
|||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
@ -45,9 +44,9 @@ public class ITestAzureBlobFileSystemE2E extends AbstractAbfsIntegrationTest {
|
|||
private static final int TEST_DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024;
|
||||
private static final int TEST_DEFAULT_READ_BUFFER_SIZE = 1023900;
|
||||
|
||||
public ITestAzureBlobFileSystemE2E() {
|
||||
public ITestAzureBlobFileSystemE2E() throws Exception {
|
||||
super();
|
||||
Configuration configuration = this.getConfiguration();
|
||||
AbfsConfiguration configuration = this.getConfiguration();
|
||||
configuration.set(ConfigurationKeys.FS_AZURE_READ_AHEAD_QUEUE_DEPTH, "0");
|
||||
}
|
||||
|
||||
|
|
|
@ -45,7 +45,7 @@ public class ITestAzureBlobFileSystemE2EScale extends
|
|||
private static final int ONE_MB = 1024 * 1024;
|
||||
private static final int DEFAULT_WRITE_TIMES = 100;
|
||||
|
||||
public ITestAzureBlobFileSystemE2EScale() {
|
||||
public ITestAzureBlobFileSystemE2EScale() throws Exception {
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -41,7 +41,7 @@ public class ITestAzureBlobFileSystemFileStatus extends
|
|||
private static final Path TEST_FILE = new Path("testFile");
|
||||
private static final Path TEST_FOLDER = new Path("testDir");
|
||||
|
||||
public ITestAzureBlobFileSystemFileStatus() {
|
||||
public ITestAzureBlobFileSystemFileStatus() throws Exception {
|
||||
super();
|
||||
}
|
||||
|
||||
|
|
|
@ -41,12 +41,12 @@ public class ITestAzureBlobFileSystemFinalize extends AbstractAbfsScaleTest{
|
|||
@Test
|
||||
public void testFinalize() throws Exception {
|
||||
// Disable the cache for filesystem to make sure there is no reference.
|
||||
Configuration configuration = this.getConfiguration();
|
||||
configuration.setBoolean(
|
||||
Configuration rawConfig = this.getRawConfiguration();
|
||||
rawConfig.setBoolean(
|
||||
this.getAuthType() == AuthType.SharedKey ? DISABLE_ABFS_CACHE_KEY : DISABLE_ABFSSS_CACHE_KEY,
|
||||
true);
|
||||
|
||||
AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.get(configuration);
|
||||
AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.get(rawConfig);
|
||||
|
||||
WeakReference<Object> ref = new WeakReference<Object>(fs);
|
||||
fs = null;
|
||||
|
@ -61,4 +61,4 @@ public class ITestAzureBlobFileSystemFinalize extends AbstractAbfsScaleTest{
|
|||
|
||||
Assert.assertTrue("testFinalizer didn't get cleaned up within maxTries", ref.get() == null);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -60,7 +60,7 @@ public class ITestAzureBlobFileSystemFlush extends AbstractAbfsScaleTest {
|
|||
private static final int TEST_FILE_LENGTH = 1024 * 1024 * 8;
|
||||
private static final int WAITING_TIME = 1000;
|
||||
|
||||
public ITestAzureBlobFileSystemFlush() {
|
||||
public ITestAzureBlobFileSystemFlush() throws Exception {
|
||||
super();
|
||||
}
|
||||
|
||||
|
|
|
@ -31,8 +31,8 @@ import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
|
|||
*/
|
||||
public class ITestAzureBlobFileSystemInitAndCreate extends
|
||||
AbstractAbfsIntegrationTest {
|
||||
public ITestAzureBlobFileSystemInitAndCreate() {
|
||||
|
||||
public ITestAzureBlobFileSystemInitAndCreate() throws Exception {
|
||||
this.getConfiguration().unset(ConfigurationKeys.AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION);
|
||||
}
|
||||
|
||||
|
|
|
@ -42,7 +42,8 @@ import static org.apache.hadoop.test.LambdaTestUtils.intercept;
|
|||
public class ITestAzureBlobFileSystemListStatus extends
|
||||
AbstractAbfsIntegrationTest {
|
||||
private static final int TEST_FILES_NUMBER = 6000;
|
||||
public ITestAzureBlobFileSystemListStatus() {
|
||||
|
||||
public ITestAzureBlobFileSystemListStatus() throws Exception {
|
||||
super();
|
||||
}
|
||||
|
||||
|
|
|
@ -28,7 +28,8 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.assertMkdirs;
|
|||
* Test mkdir operation.
|
||||
*/
|
||||
public class ITestAzureBlobFileSystemMkDir extends AbstractAbfsIntegrationTest {
|
||||
public ITestAzureBlobFileSystemMkDir() {
|
||||
|
||||
public ITestAzureBlobFileSystemMkDir() throws Exception {
|
||||
super();
|
||||
}
|
||||
|
||||
|
|
|
@ -53,7 +53,7 @@ public class ITestAzureBlobFileSystemOauth extends AbstractAbfsIntegrationTest{
|
|||
private static final Path EXISTED_FILE_PATH = new Path("/existedFile");
|
||||
private static final Path EXISTED_FOLDER_PATH = new Path("/existedFolder");
|
||||
|
||||
public ITestAzureBlobFileSystemOauth() {
|
||||
public ITestAzureBlobFileSystemOauth() throws Exception {
|
||||
Assume.assumeTrue(this.getAuthType() == AuthType.OAuth);
|
||||
}
|
||||
/*
|
||||
|
@ -161,16 +161,18 @@ public class ITestAzureBlobFileSystemOauth extends AbstractAbfsIntegrationTest{
|
|||
}
|
||||
|
||||
private AzureBlobFileSystem getBlobConributor() throws Exception {
|
||||
Configuration configuration = this.getConfiguration();
|
||||
configuration.set(FS_AZURE_ACCOUNT_OAUTH_CLIENT_ID + this.getAccountName(), configuration.get(FS_AZURE_BLOB_DATA_CONTRIBUTOR_CLIENT_ID));
|
||||
configuration.set(FS_AZURE_ACCOUNT_OAUTH_CLIENT_SECRET + this.getAccountName(), configuration.get(FS_AZURE_BLOB_DATA_CONTRIBUTOR_CLIENT_SECRET));
|
||||
return getFileSystem(configuration);
|
||||
AbfsConfiguration abfsConfig = this.getConfiguration();
|
||||
abfsConfig.set(FS_AZURE_ACCOUNT_OAUTH_CLIENT_ID + "." + this.getAccountName(), abfsConfig.get(FS_AZURE_BLOB_DATA_CONTRIBUTOR_CLIENT_ID));
|
||||
abfsConfig.set(FS_AZURE_ACCOUNT_OAUTH_CLIENT_SECRET + "." + this.getAccountName(), abfsConfig.get(FS_AZURE_BLOB_DATA_CONTRIBUTOR_CLIENT_SECRET));
|
||||
Configuration rawConfig = abfsConfig.getRawConfiguration();
|
||||
return getFileSystem(rawConfig);
|
||||
}
|
||||
|
||||
private AzureBlobFileSystem getBlobReader() throws Exception {
|
||||
Configuration configuration = this.getConfiguration();
|
||||
configuration.set(FS_AZURE_ACCOUNT_OAUTH_CLIENT_ID + this.getAccountName(), configuration.get(FS_AZURE_BLOB_DATA_READER_CLIENT_ID));
|
||||
configuration.set(FS_AZURE_ACCOUNT_OAUTH_CLIENT_SECRET + this.getAccountName(), configuration.get(FS_AZURE_BLOB_DATA_READER_CLIENT_SECRET));
|
||||
return getFileSystem(configuration);
|
||||
AbfsConfiguration abfsConfig = this.getConfiguration();
|
||||
abfsConfig.set(FS_AZURE_ACCOUNT_OAUTH_CLIENT_ID + "." + this.getAccountName(), abfsConfig.get(FS_AZURE_BLOB_DATA_READER_CLIENT_ID));
|
||||
abfsConfig.set(FS_AZURE_ACCOUNT_OAUTH_CLIENT_SECRET + "." + this.getAccountName(), abfsConfig.get(FS_AZURE_BLOB_DATA_READER_CLIENT_SECRET));
|
||||
Configuration rawConfig = abfsConfig.getRawConfiguration();
|
||||
return getFileSystem(rawConfig);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -41,7 +41,9 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.assertIsFile;
|
|||
*/
|
||||
public class ITestAzureBlobFileSystemRename extends
|
||||
AbstractAbfsIntegrationTest {
|
||||
public ITestAzureBlobFileSystemRename() {
|
||||
|
||||
public ITestAzureBlobFileSystemRename() throws Exception {
|
||||
super();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -66,7 +66,7 @@ public class ITestAzureBlobFileSystemRenameUnicode extends
|
|||
});
|
||||
}
|
||||
|
||||
public ITestAzureBlobFileSystemRenameUnicode() {
|
||||
public ITestAzureBlobFileSystemRenameUnicode() throws Exception {
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -32,7 +32,7 @@ import org.apache.hadoop.fs.azurebfs.services.AuthType;
|
|||
* Test AzureBlobFileSystem initialization.
|
||||
*/
|
||||
public class ITestFileSystemInitialization extends AbstractAbfsIntegrationTest {
|
||||
public ITestFileSystemInitialization() {
|
||||
public ITestFileSystemInitialization() throws Exception {
|
||||
super();
|
||||
}
|
||||
|
||||
|
@ -62,10 +62,10 @@ public class ITestFileSystemInitialization extends AbstractAbfsIntegrationTest {
|
|||
null,
|
||||
null,
|
||||
null);
|
||||
Configuration conf = getConfiguration();
|
||||
conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, defaultUri.toString());
|
||||
Configuration rawConfig = getRawConfiguration();
|
||||
rawConfig.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, defaultUri.toString());
|
||||
|
||||
try(SecureAzureBlobFileSystem fs = (SecureAzureBlobFileSystem) FileSystem.newInstance(conf)) {
|
||||
try(SecureAzureBlobFileSystem fs = (SecureAzureBlobFileSystem) FileSystem.newInstance(rawConfig)) {
|
||||
assertEquals(fs.getUri(), new URI(FileSystemUriSchemes.ABFS_SECURE_SCHEME,
|
||||
filesystem + "@" + accountName,
|
||||
null,
|
||||
|
|
|
@ -33,7 +33,7 @@ import org.apache.hadoop.fs.Path;
|
|||
public class ITestFileSystemProperties extends AbstractAbfsIntegrationTest {
|
||||
private static final int TEST_DATA = 100;
|
||||
private static final Path TEST_PATH = new Path("/testfile");
|
||||
public ITestFileSystemProperties() {
|
||||
public ITestFileSystemProperties() throws Exception {
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -77,14 +77,15 @@ public class ITestFileSystemRegistration extends AbstractAbfsIntegrationTest {
|
|||
|
||||
@Test
|
||||
public void ensureAzureBlobFileSystemIsDefaultFileSystem() throws Exception {
|
||||
AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.get(getConfiguration());
|
||||
Configuration rawConfig = getRawConfiguration();
|
||||
AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.get(rawConfig);
|
||||
assertNotNull("filesystem", fs);
|
||||
|
||||
if (this.getAuthType() == AuthType.OAuth) {
|
||||
Abfss afs = (Abfss) FileContext.getFileContext(getConfiguration()).getDefaultFileSystem();
|
||||
Abfss afs = (Abfss) FileContext.getFileContext(rawConfig).getDefaultFileSystem();
|
||||
assertNotNull("filecontext", afs);
|
||||
} else {
|
||||
Abfs afs = (Abfs) FileContext.getFileContext(getConfiguration()).getDefaultFileSystem();
|
||||
Abfs afs = (Abfs) FileContext.getFileContext(rawConfig).getDefaultFileSystem();
|
||||
assertNotNull("filecontext", afs);
|
||||
}
|
||||
|
||||
|
@ -100,13 +101,13 @@ public class ITestFileSystemRegistration extends AbstractAbfsIntegrationTest {
|
|||
null,
|
||||
null,
|
||||
null);
|
||||
getConfiguration().set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
|
||||
Configuration rawConfig = getRawConfiguration();
|
||||
rawConfig.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
|
||||
defaultUri.toString());
|
||||
|
||||
SecureAzureBlobFileSystem fs = (SecureAzureBlobFileSystem) FileSystem.get(
|
||||
getConfiguration());
|
||||
SecureAzureBlobFileSystem fs = (SecureAzureBlobFileSystem) FileSystem.get(rawConfig);
|
||||
assertNotNull("filesystem", fs);
|
||||
Abfss afs = (Abfss) FileContext.getFileContext(getConfiguration()).getDefaultFileSystem();
|
||||
Abfss afs = (Abfss) FileContext.getFileContext(rawConfig).getDefaultFileSystem();
|
||||
assertNotNull("filecontext", afs);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
|
||||
package org.apache.hadoop.fs.azurebfs;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Field;
|
||||
|
||||
import org.apache.commons.codec.Charsets;
|
||||
|
@ -66,6 +67,7 @@ public class TestAbfsConfigurationFieldsValidation {
|
|||
private static final int TEST_INT = 1234565;
|
||||
private static final int TEST_LONG = 4194304;
|
||||
|
||||
private final String accountName;
|
||||
private final String encodedString;
|
||||
private final String encodedAccountKey;
|
||||
|
||||
|
@ -96,6 +98,7 @@ public class TestAbfsConfigurationFieldsValidation {
|
|||
public TestAbfsConfigurationFieldsValidation() throws Exception {
|
||||
super();
|
||||
Base64 base64 = new Base64();
|
||||
this.accountName = "testaccount1.blob.core.windows.net";
|
||||
this.encodedString = new String(base64.encode("base64Value".getBytes(Charsets.UTF_8)), Charsets.UTF_8);
|
||||
this.encodedAccountKey = new String(base64.encode("someAccountKey".getBytes(Charsets.UTF_8)), Charsets.UTF_8);
|
||||
Configuration configuration = new Configuration();
|
||||
|
@ -105,8 +108,8 @@ public class TestAbfsConfigurationFieldsValidation {
|
|||
configuration.set(STRING_KEY, "stringValue");
|
||||
configuration.set(BASE64_KEY, encodedString);
|
||||
configuration.set(BOOLEAN_KEY, "true");
|
||||
configuration.set(ConfigurationKeys.FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME + "testaccount1.blob.core.windows.net", this.encodedAccountKey);
|
||||
abfsConfiguration = new AbfsConfiguration(configuration);
|
||||
configuration.set(ConfigurationKeys.FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME + "." + accountName, this.encodedAccountKey);
|
||||
abfsConfiguration = new AbfsConfiguration(configuration, accountName);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -143,30 +146,35 @@ public class TestAbfsConfigurationFieldsValidation {
|
|||
|
||||
@Test
|
||||
public void testGetAccountKey() throws Exception {
|
||||
String accountKey = abfsConfiguration.getStorageAccountKey("testaccount1.blob.core.windows.net");
|
||||
String accountKey = abfsConfiguration.getStorageAccountKey();
|
||||
assertEquals(this.encodedAccountKey, accountKey);
|
||||
}
|
||||
|
||||
@Test(expected = ConfigurationPropertyNotFoundException.class)
|
||||
public void testGetAccountKeyWithNonExistingAccountName() throws Exception {
|
||||
abfsConfiguration.getStorageAccountKey("bogusAccountName");
|
||||
Configuration configuration = new Configuration();
|
||||
configuration.addResource(TestConfigurationKeys.TEST_CONFIGURATION_FILE_NAME);
|
||||
configuration.unset(ConfigurationKeys.FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME);
|
||||
AbfsConfiguration abfsConfig = new AbfsConfiguration(configuration, "bogusAccountName");
|
||||
abfsConfig.getStorageAccountKey();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSSLSocketFactoryConfiguration() throws InvalidConfigurationValueException, IllegalAccessException {
|
||||
public void testSSLSocketFactoryConfiguration()
|
||||
throws InvalidConfigurationValueException, IllegalAccessException, IOException {
|
||||
assertEquals(SSLSocketFactoryEx.SSLChannelMode.Default, abfsConfiguration.getPreferredSSLFactoryOption());
|
||||
assertNotEquals(SSLSocketFactoryEx.SSLChannelMode.Default_JSSE, abfsConfiguration.getPreferredSSLFactoryOption());
|
||||
assertNotEquals(SSLSocketFactoryEx.SSLChannelMode.OpenSSL, abfsConfiguration.getPreferredSSLFactoryOption());
|
||||
|
||||
Configuration configuration = new Configuration();
|
||||
configuration.setEnum(FS_AZURE_SSL_CHANNEL_MODE_KEY, SSLSocketFactoryEx.SSLChannelMode.Default_JSSE);
|
||||
AbfsConfiguration localAbfsConfiguration = new AbfsConfiguration(configuration);
|
||||
AbfsConfiguration localAbfsConfiguration = new AbfsConfiguration(configuration, accountName);
|
||||
assertEquals(SSLSocketFactoryEx.SSLChannelMode.Default_JSSE, localAbfsConfiguration.getPreferredSSLFactoryOption());
|
||||
|
||||
configuration = new Configuration();
|
||||
configuration.setEnum(FS_AZURE_SSL_CHANNEL_MODE_KEY, SSLSocketFactoryEx.SSLChannelMode.OpenSSL);
|
||||
localAbfsConfiguration = new AbfsConfiguration(configuration);
|
||||
localAbfsConfiguration = new AbfsConfiguration(configuration, accountName);
|
||||
assertEquals(SSLSocketFactoryEx.SSLChannelMode.OpenSSL, localAbfsConfiguration.getPreferredSSLFactoryOption());
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,273 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.azurebfs;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNull;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* Tests correct precedence of various configurations that might be returned.
|
||||
* Configuration can be specified with the account name as a suffix to the
|
||||
* config key, or without one. Account-specific values should be returned
|
||||
* whenever they exist. Account-agnostic values are returned if they do not.
|
||||
* Default values are returned if neither exists.
|
||||
*
|
||||
* These tests are in 2 main groups: tests of methods that allow default values
|
||||
* (such as get and getPasswordString) are of one form, while tests of methods
|
||||
* that do allow default values (all others) follow another form.
|
||||
*/
|
||||
public class TestAccountConfiguration {
|
||||
|
||||
@Test
|
||||
public void testStringPrecedence()
|
||||
throws IllegalAccessException, IOException, InvalidConfigurationValueException {
|
||||
AbfsConfiguration abfsConf;
|
||||
final Configuration conf = new Configuration();
|
||||
|
||||
final String accountName1 = "account1";
|
||||
final String accountName2 = "account2";
|
||||
final String accountName3 = "account3";
|
||||
|
||||
final String globalKey = "fs.azure.configuration";
|
||||
final String accountKey1 = globalKey + "." + accountName1;
|
||||
final String accountKey2 = globalKey + "." + accountName2;
|
||||
final String accountKey3 = globalKey + "." + accountName3;
|
||||
|
||||
final String globalValue = "global";
|
||||
final String accountValue1 = "one";
|
||||
final String accountValue2 = "two";
|
||||
|
||||
conf.set(accountKey1, accountValue1);
|
||||
conf.set(accountKey2, accountValue2);
|
||||
conf.set(globalKey, globalValue);
|
||||
|
||||
abfsConf = new AbfsConfiguration(conf, accountName1);
|
||||
assertEquals("Wrong value returned when account-specific value was requested",
|
||||
abfsConf.get(accountKey1), accountValue1);
|
||||
assertEquals("Account-specific value was not returned when one existed",
|
||||
abfsConf.get(globalKey), accountValue1);
|
||||
|
||||
abfsConf = new AbfsConfiguration(conf, accountName2);
|
||||
assertEquals("Wrong value returned when a different account-specific value was requested",
|
||||
abfsConf.get(accountKey1), accountValue1);
|
||||
assertEquals("Wrong value returned when account-specific value was requested",
|
||||
abfsConf.get(accountKey2), accountValue2);
|
||||
assertEquals("Account-agnostic value return even though account-specific value was set",
|
||||
abfsConf.get(globalKey), accountValue2);
|
||||
|
||||
abfsConf = new AbfsConfiguration(conf, accountName3);
|
||||
assertNull("Account-specific value returned when none was set",
|
||||
abfsConf.get(accountKey3));
|
||||
assertEquals("Account-agnostic value not returned when no account-specific value was set",
|
||||
abfsConf.get(globalKey), globalValue);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPasswordPrecedence()
|
||||
throws IllegalAccessException, IOException, InvalidConfigurationValueException {
|
||||
AbfsConfiguration abfsConf;
|
||||
final Configuration conf = new Configuration();
|
||||
|
||||
final String accountName1 = "account1";
|
||||
final String accountName2 = "account2";
|
||||
final String accountName3 = "account3";
|
||||
|
||||
final String globalKey = "fs.azure.password";
|
||||
final String accountKey1 = globalKey + "." + accountName1;
|
||||
final String accountKey2 = globalKey + "." + accountName2;
|
||||
final String accountKey3 = globalKey + "." + accountName3;
|
||||
|
||||
final String globalValue = "global";
|
||||
final String accountValue1 = "one";
|
||||
final String accountValue2 = "two";
|
||||
|
||||
conf.set(accountKey1, accountValue1);
|
||||
conf.set(accountKey2, accountValue2);
|
||||
conf.set(globalKey, globalValue);
|
||||
|
||||
abfsConf = new AbfsConfiguration(conf, accountName1);
|
||||
assertEquals("Wrong value returned when account-specific value was requested",
|
||||
abfsConf.getPasswordString(accountKey1), accountValue1);
|
||||
assertEquals("Account-specific value was not returned when one existed",
|
||||
abfsConf.getPasswordString(globalKey), accountValue1);
|
||||
|
||||
abfsConf = new AbfsConfiguration(conf, accountName2);
|
||||
assertEquals("Wrong value returned when a different account-specific value was requested",
|
||||
abfsConf.getPasswordString(accountKey1), accountValue1);
|
||||
assertEquals("Wrong value returned when account-specific value was requested",
|
||||
abfsConf.getPasswordString(accountKey2), accountValue2);
|
||||
assertEquals("Account-agnostic value return even though account-specific value was set",
|
||||
abfsConf.getPasswordString(globalKey), accountValue2);
|
||||
|
||||
abfsConf = new AbfsConfiguration(conf, accountName3);
|
||||
assertNull("Account-specific value returned when none was set",
|
||||
abfsConf.getPasswordString(accountKey3));
|
||||
assertEquals("Account-agnostic value not returned when no account-specific value was set",
|
||||
abfsConf.getPasswordString(globalKey), globalValue);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBooleanPrecedence()
|
||||
throws IllegalAccessException, IOException, InvalidConfigurationValueException {
|
||||
|
||||
final String accountName = "account";
|
||||
final String globalKey = "fs.azure.bool";
|
||||
final String accountKey = globalKey + "." + accountName;
|
||||
|
||||
final Configuration conf = new Configuration();
|
||||
final AbfsConfiguration abfsConf = new AbfsConfiguration(conf, accountName);
|
||||
|
||||
conf.setBoolean(globalKey, false);
|
||||
assertEquals("Default value returned even though account-agnostic config was set",
|
||||
abfsConf.getBoolean(globalKey, true), false);
|
||||
conf.unset(globalKey);
|
||||
assertEquals("Default value not returned even though config was unset",
|
||||
abfsConf.getBoolean(globalKey, true), true);
|
||||
|
||||
conf.setBoolean(accountKey, false);
|
||||
assertEquals("Default value returned even though account-specific config was set",
|
||||
abfsConf.getBoolean(globalKey, true), false);
|
||||
conf.unset(accountKey);
|
||||
assertEquals("Default value not returned even though config was unset",
|
||||
abfsConf.getBoolean(globalKey, true), true);
|
||||
|
||||
conf.setBoolean(accountKey, true);
|
||||
conf.setBoolean(globalKey, false);
|
||||
assertEquals("Account-agnostic or default value returned even though account-specific config was set",
|
||||
abfsConf.getBoolean(globalKey, false), true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLongPrecedence()
|
||||
throws IllegalAccessException, IOException, InvalidConfigurationValueException {
|
||||
|
||||
final String accountName = "account";
|
||||
final String globalKey = "fs.azure.long";
|
||||
final String accountKey = globalKey + "." + accountName;
|
||||
|
||||
final Configuration conf = new Configuration();
|
||||
final AbfsConfiguration abfsConf = new AbfsConfiguration(conf, accountName);
|
||||
|
||||
conf.setLong(globalKey, 0);
|
||||
assertEquals("Default value returned even though account-agnostic config was set",
|
||||
abfsConf.getLong(globalKey, 1), 0);
|
||||
conf.unset(globalKey);
|
||||
assertEquals("Default value not returned even though config was unset",
|
||||
abfsConf.getLong(globalKey, 1), 1);
|
||||
|
||||
conf.setLong(accountKey, 0);
|
||||
assertEquals("Default value returned even though account-specific config was set",
|
||||
abfsConf.getLong(globalKey, 1), 0);
|
||||
conf.unset(accountKey);
|
||||
assertEquals("Default value not returned even though config was unset",
|
||||
abfsConf.getLong(globalKey, 1), 1);
|
||||
|
||||
conf.setLong(accountKey, 1);
|
||||
conf.setLong(globalKey, 0);
|
||||
assertEquals("Account-agnostic or default value returned even though account-specific config was set",
|
||||
abfsConf.getLong(globalKey, 0), 1);
|
||||
}
|
||||
|
||||
public enum GetEnumType {
|
||||
TRUE, FALSE
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEnumPrecedence()
|
||||
throws IllegalAccessException, IOException, InvalidConfigurationValueException {
|
||||
|
||||
final String accountName = "account";
|
||||
final String globalKey = "fs.azure.enum";
|
||||
final String accountKey = globalKey + "." + accountName;
|
||||
|
||||
final Configuration conf = new Configuration();
|
||||
final AbfsConfiguration abfsConf = new AbfsConfiguration(conf, accountName);
|
||||
|
||||
conf.setEnum(globalKey, GetEnumType.FALSE);
|
||||
assertEquals("Default value returned even though account-agnostic config was set",
|
||||
abfsConf.getEnum(globalKey, GetEnumType.TRUE), GetEnumType.FALSE);
|
||||
conf.unset(globalKey);
|
||||
assertEquals("Default value not returned even though config was unset",
|
||||
abfsConf.getEnum(globalKey, GetEnumType.TRUE), GetEnumType.TRUE);
|
||||
|
||||
conf.setEnum(accountKey, GetEnumType.FALSE);
|
||||
assertEquals("Default value returned even though account-specific config was set",
|
||||
abfsConf.getEnum(globalKey, GetEnumType.TRUE), GetEnumType.FALSE);
|
||||
conf.unset(accountKey);
|
||||
assertEquals("Default value not returned even though config was unset",
|
||||
abfsConf.getEnum(globalKey, GetEnumType.TRUE), GetEnumType.TRUE);
|
||||
|
||||
conf.setEnum(accountKey, GetEnumType.TRUE);
|
||||
conf.setEnum(globalKey, GetEnumType.FALSE);
|
||||
assertEquals("Account-agnostic or default value returned even though account-specific config was set",
|
||||
abfsConf.getEnum(globalKey, GetEnumType.FALSE), GetEnumType.TRUE);
|
||||
}
|
||||
|
||||
interface GetClassInterface {
|
||||
}
|
||||
|
||||
private class GetClassImpl0 implements GetClassInterface {
|
||||
}
|
||||
|
||||
private class GetClassImpl1 implements GetClassInterface {
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClassPrecedence()
|
||||
throws IllegalAccessException, IOException, InvalidConfigurationValueException {
|
||||
|
||||
final String accountName = "account";
|
||||
final String globalKey = "fs.azure.class";
|
||||
final String accountKey = globalKey + "." + accountName;
|
||||
|
||||
final Configuration conf = new Configuration();
|
||||
final AbfsConfiguration abfsConf = new AbfsConfiguration(conf, accountName);
|
||||
|
||||
final Class class0 = GetClassImpl0.class;
|
||||
final Class class1 = GetClassImpl1.class;
|
||||
final Class xface = GetClassInterface.class;
|
||||
|
||||
conf.setClass(globalKey, class0, xface);
|
||||
assertEquals("Default value returned even though account-agnostic config was set",
|
||||
abfsConf.getClass(globalKey, class1, xface), class0);
|
||||
conf.unset(globalKey);
|
||||
assertEquals("Default value not returned even though config was unset",
|
||||
abfsConf.getClass(globalKey, class1, xface), class1);
|
||||
|
||||
conf.setClass(accountKey, class0, xface);
|
||||
assertEquals("Default value returned even though account-specific config was set",
|
||||
abfsConf.getClass(globalKey, class1, xface), class0);
|
||||
conf.unset(accountKey);
|
||||
assertEquals("Default value not returned even though config was unset",
|
||||
abfsConf.getClass(globalKey, class1, xface), class1);
|
||||
|
||||
conf.setClass(accountKey, class1, xface);
|
||||
conf.setClass(globalKey, class0, xface);
|
||||
assertEquals("Account-agnostic or default value returned even though account-specific config was set",
|
||||
abfsConf.getClass(globalKey, class0, xface), class1);
|
||||
}
|
||||
|
||||
}
|
|
@ -24,7 +24,7 @@ package org.apache.hadoop.fs.azurebfs.constants;
|
|||
public final class TestConfigurationKeys {
|
||||
public static final String FS_AZURE_ACCOUNT_NAME = "fs.azure.account.name";
|
||||
public static final String FS_AZURE_ABFS_ACCOUNT_NAME = "fs.azure.abfs.account.name";
|
||||
public static final String FS_AZURE_ACCOUNT_KEY_PREFIX = "fs.azure.account.key.";
|
||||
public static final String FS_AZURE_ACCOUNT_KEY = "fs.azure.account.key";
|
||||
public static final String FS_AZURE_CONTRACT_TEST_URI = "fs.contract.test.fs.abfs";
|
||||
|
||||
public static final String FS_AZURE_BLOB_DATA_CONTRIBUTOR_CLIENT_ID = "fs.azure.account.oauth2.contributor.client.id";
|
||||
|
|
|
@ -20,8 +20,8 @@ package org.apache.hadoop.fs.azurebfs.contract;
|
|||
|
||||
import java.net.URI;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
|
||||
import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
|
||||
import org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes;
|
||||
import org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys;
|
||||
|
@ -41,7 +41,7 @@ public class ABFSContractTestBinding extends AbstractAbfsIntegrationTest {
|
|||
public ABFSContractTestBinding(
|
||||
final boolean useExistingFileSystem) throws Exception{
|
||||
if (useExistingFileSystem) {
|
||||
Configuration configuration = getConfiguration();
|
||||
AbfsConfiguration configuration = getConfiguration();
|
||||
String testUrl = configuration.get(TestConfigurationKeys.FS_AZURE_CONTRACT_TEST_URI);
|
||||
Assume.assumeTrue("Contract tests are skipped because of missing config property :"
|
||||
+ TestConfigurationKeys.FS_AZURE_CONTRACT_TEST_URI, testUrl != null);
|
||||
|
@ -61,10 +61,6 @@ public class ABFSContractTestBinding extends AbstractAbfsIntegrationTest {
|
|||
}
|
||||
}
|
||||
|
||||
public Configuration getConfiguration() {
|
||||
return super.getConfiguration();
|
||||
}
|
||||
|
||||
public boolean isSecureMode() {
|
||||
return this.getAuthType() == AuthType.SharedKey ? false : true;
|
||||
}
|
||||
|
|
|
@ -45,7 +45,7 @@ public class ITestAbfsFileSystemContractAppend extends AbstractContractAppendTes
|
|||
|
||||
@Override
|
||||
protected Configuration createConfiguration() {
|
||||
return binding.getConfiguration();
|
||||
return binding.getRawConfiguration();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -41,7 +41,7 @@ public class ITestAbfsFileSystemContractConcat extends AbstractContractConcatTes
|
|||
|
||||
@Override
|
||||
protected Configuration createConfiguration() {
|
||||
return binding.getConfiguration();
|
||||
return binding.getRawConfiguration();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -42,7 +42,7 @@ public class ITestAbfsFileSystemContractCreate extends AbstractContractCreateTes
|
|||
|
||||
@Override
|
||||
protected Configuration createConfiguration() {
|
||||
return binding.getConfiguration();
|
||||
return binding.getRawConfiguration();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -42,7 +42,7 @@ public class ITestAbfsFileSystemContractDelete extends AbstractContractDeleteTes
|
|||
|
||||
@Override
|
||||
protected Configuration createConfiguration() {
|
||||
return binding.getConfiguration();
|
||||
return binding.getRawConfiguration();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -41,7 +41,7 @@ public class ITestAbfsFileSystemContractGetFileStatus extends AbstractContractGe
|
|||
|
||||
@Override
|
||||
protected Configuration createConfiguration() {
|
||||
return this.binding.getConfiguration();
|
||||
return this.binding.getRawConfiguration();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -42,7 +42,7 @@ public class ITestAbfsFileSystemContractMkdir extends AbstractContractMkdirTest
|
|||
|
||||
@Override
|
||||
protected Configuration createConfiguration() {
|
||||
return binding.getConfiguration();
|
||||
return binding.getRawConfiguration();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -42,7 +42,7 @@ public class ITestAbfsFileSystemContractOpen extends AbstractContractOpenTest {
|
|||
|
||||
@Override
|
||||
protected Configuration createConfiguration() {
|
||||
return binding.getConfiguration();
|
||||
return binding.getRawConfiguration();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -42,7 +42,7 @@ public class ITestAbfsFileSystemContractRename extends AbstractContractRenameTes
|
|||
|
||||
@Override
|
||||
protected Configuration createConfiguration() {
|
||||
return binding.getConfiguration();
|
||||
return binding.getRawConfiguration();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -42,7 +42,7 @@ public class ITestAbfsFileSystemContractRootDirectory extends AbstractContractRo
|
|||
|
||||
@Override
|
||||
protected Configuration createConfiguration() {
|
||||
return binding.getConfiguration();
|
||||
return binding.getRawConfiguration();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -42,7 +42,7 @@ public class ITestAbfsFileSystemContractSeek extends AbstractContractSeekTest{
|
|||
|
||||
@Override
|
||||
protected Configuration createConfiguration() {
|
||||
return binding.getConfiguration();
|
||||
return binding.getRawConfiguration();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -41,7 +41,7 @@ public class ITestAbfsFileSystemContractSetTimes extends AbstractContractSetTime
|
|||
|
||||
@Override
|
||||
protected Configuration createConfiguration() {
|
||||
return binding.getConfiguration();
|
||||
return binding.getRawConfiguration();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -35,6 +35,8 @@ import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
|
|||
*/
|
||||
public final class TestAbfsClient {
|
||||
|
||||
private final String accountName = "bogusAccountName";
|
||||
|
||||
private void validateUserAgent(String expectedPattern,
|
||||
URL baseUrl,
|
||||
AbfsConfiguration config,
|
||||
|
@ -55,7 +57,7 @@ public final class TestAbfsClient {
|
|||
String expectedUserAgentPattern = "Azure Blob FS\\/1.0 \\(JavaJRE ([^\\)]+)\\)";
|
||||
final Configuration configuration = new Configuration();
|
||||
configuration.unset(ConfigurationKeys.FS_AZURE_USER_AGENT_PREFIX_KEY);
|
||||
AbfsConfiguration abfsConfiguration = new AbfsConfiguration(configuration);
|
||||
AbfsConfiguration abfsConfiguration = new AbfsConfiguration(configuration, accountName);
|
||||
validateUserAgent(expectedUserAgentPattern, new URL("http://azure.com"),
|
||||
abfsConfiguration, false);
|
||||
}
|
||||
|
@ -65,7 +67,7 @@ public final class TestAbfsClient {
|
|||
String expectedUserAgentPattern = "Azure Blob FS\\/1.0 \\(JavaJRE ([^\\)]+)\\) Partner Service";
|
||||
final Configuration configuration = new Configuration();
|
||||
configuration.set(ConfigurationKeys.FS_AZURE_USER_AGENT_PREFIX_KEY, "Partner Service");
|
||||
AbfsConfiguration abfsConfiguration = new AbfsConfiguration(configuration);
|
||||
AbfsConfiguration abfsConfiguration = new AbfsConfiguration(configuration, accountName);
|
||||
validateUserAgent(expectedUserAgentPattern, new URL("http://azure.com"),
|
||||
abfsConfiguration, false);
|
||||
}
|
||||
|
@ -77,8 +79,8 @@ public final class TestAbfsClient {
|
|||
configuration.set(ConfigurationKeys.FS_AZURE_USER_AGENT_PREFIX_KEY, "Partner Service");
|
||||
configuration.set(ConfigurationKeys.FS_AZURE_SSL_CHANNEL_MODE_KEY,
|
||||
SSLSocketFactoryEx.SSLChannelMode.Default_JSSE.name());
|
||||
AbfsConfiguration abfsConfiguration = new AbfsConfiguration(configuration);
|
||||
AbfsConfiguration abfsConfiguration = new AbfsConfiguration(configuration, accountName);
|
||||
validateUserAgent(expectedUserAgentPattern, new URL("https://azure.com"),
|
||||
abfsConfiguration, true);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -54,6 +54,7 @@ public class TestShellDecryptionKeyProvider {
|
|||
String key = "key";
|
||||
|
||||
conf.set(ConfigurationKeys.FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME + account, key);
|
||||
|
||||
try {
|
||||
provider.getStorageAccountKey(account, conf);
|
||||
Assert
|
||||
|
|
|
@ -37,6 +37,11 @@ import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.TEST
|
|||
public final class AbfsTestUtils extends AbstractAbfsIntegrationTest{
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(AbfsTestUtils.class);
|
||||
|
||||
public AbfsTestUtils() throws Exception {
|
||||
super();
|
||||
}
|
||||
|
||||
/**
|
||||
* If unit tests were interrupted and crushed accidentally, the test containers won't be deleted.
|
||||
* In that case, dev can use this tool to list and delete all test containers.
|
||||
|
|
Loading…
Reference in New Issue