HADOOP-16699. Add verbose TRACE logging to ABFS.
Contributed by Sneha Vijayarajan, Change-Id: Ic616a10406e6e9f11616c9cc05d8630ebbedaf65
This commit is contained in:
parent
bc366d4ea7
commit
d1f5976c00
@ -63,6 +63,9 @@
|
|||||||
import org.apache.hadoop.security.ProviderUtils;
|
import org.apache.hadoop.security.ProviderUtils;
|
||||||
import org.apache.hadoop.util.ReflectionUtils;
|
import org.apache.hadoop.util.ReflectionUtils;
|
||||||
|
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.*;
|
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.*;
|
||||||
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.*;
|
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.*;
|
||||||
|
|
||||||
@ -76,6 +79,7 @@ public class AbfsConfiguration{
|
|||||||
private final Configuration rawConfig;
|
private final Configuration rawConfig;
|
||||||
private final String accountName;
|
private final String accountName;
|
||||||
private final boolean isSecure;
|
private final boolean isSecure;
|
||||||
|
private static final Logger LOG = LoggerFactory.getLogger(AbfsConfiguration.class);
|
||||||
|
|
||||||
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_WRITE_BUFFER_SIZE,
|
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_WRITE_BUFFER_SIZE,
|
||||||
MinValue = MIN_BUFFER_SIZE,
|
MinValue = MIN_BUFFER_SIZE,
|
||||||
@ -505,11 +509,13 @@ public AccessTokenProvider getTokenProvider() throws TokenAccessProviderExceptio
|
|||||||
String clientId = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_CLIENT_ID);
|
String clientId = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_CLIENT_ID);
|
||||||
String clientSecret = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_CLIENT_SECRET);
|
String clientSecret = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_CLIENT_SECRET);
|
||||||
tokenProvider = new ClientCredsTokenProvider(authEndpoint, clientId, clientSecret);
|
tokenProvider = new ClientCredsTokenProvider(authEndpoint, clientId, clientSecret);
|
||||||
|
LOG.trace("ClientCredsTokenProvider initialized");
|
||||||
} else if (tokenProviderClass == UserPasswordTokenProvider.class) {
|
} else if (tokenProviderClass == UserPasswordTokenProvider.class) {
|
||||||
String authEndpoint = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_CLIENT_ENDPOINT);
|
String authEndpoint = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_CLIENT_ENDPOINT);
|
||||||
String username = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_USER_NAME);
|
String username = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_USER_NAME);
|
||||||
String password = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_USER_PASSWORD);
|
String password = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_USER_PASSWORD);
|
||||||
tokenProvider = new UserPasswordTokenProvider(authEndpoint, username, password);
|
tokenProvider = new UserPasswordTokenProvider(authEndpoint, username, password);
|
||||||
|
LOG.trace("UserPasswordTokenProvider initialized");
|
||||||
} else if (tokenProviderClass == MsiTokenProvider.class) {
|
} else if (tokenProviderClass == MsiTokenProvider.class) {
|
||||||
String authEndpoint = getTrimmedPasswordString(
|
String authEndpoint = getTrimmedPasswordString(
|
||||||
FS_AZURE_ACCOUNT_OAUTH_MSI_ENDPOINT,
|
FS_AZURE_ACCOUNT_OAUTH_MSI_ENDPOINT,
|
||||||
@ -522,6 +528,7 @@ public AccessTokenProvider getTokenProvider() throws TokenAccessProviderExceptio
|
|||||||
authority = appendSlashIfNeeded(authority);
|
authority = appendSlashIfNeeded(authority);
|
||||||
tokenProvider = new MsiTokenProvider(authEndpoint, tenantGuid,
|
tokenProvider = new MsiTokenProvider(authEndpoint, tenantGuid,
|
||||||
clientId, authority);
|
clientId, authority);
|
||||||
|
LOG.trace("MsiTokenProvider initialized");
|
||||||
} else if (tokenProviderClass == RefreshTokenBasedTokenProvider.class) {
|
} else if (tokenProviderClass == RefreshTokenBasedTokenProvider.class) {
|
||||||
String authEndpoint = getTrimmedPasswordString(
|
String authEndpoint = getTrimmedPasswordString(
|
||||||
FS_AZURE_ACCOUNT_OAUTH_REFRESH_TOKEN_ENDPOINT,
|
FS_AZURE_ACCOUNT_OAUTH_REFRESH_TOKEN_ENDPOINT,
|
||||||
@ -530,6 +537,7 @@ public AccessTokenProvider getTokenProvider() throws TokenAccessProviderExceptio
|
|||||||
String clientId = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_CLIENT_ID);
|
String clientId = getPasswordString(FS_AZURE_ACCOUNT_OAUTH_CLIENT_ID);
|
||||||
tokenProvider = new RefreshTokenBasedTokenProvider(authEndpoint,
|
tokenProvider = new RefreshTokenBasedTokenProvider(authEndpoint,
|
||||||
clientId, refreshToken);
|
clientId, refreshToken);
|
||||||
|
LOG.trace("RefreshTokenBasedTokenProvider initialized");
|
||||||
} else {
|
} else {
|
||||||
throw new IllegalArgumentException("Failed to initialize " + tokenProviderClass);
|
throw new IllegalArgumentException("Failed to initialize " + tokenProviderClass);
|
||||||
}
|
}
|
||||||
@ -554,7 +562,9 @@ public AccessTokenProvider getTokenProvider() throws TokenAccessProviderExceptio
|
|||||||
if (azureTokenProvider == null) {
|
if (azureTokenProvider == null) {
|
||||||
throw new IllegalArgumentException("Failed to initialize " + customTokenProviderClass);
|
throw new IllegalArgumentException("Failed to initialize " + customTokenProviderClass);
|
||||||
}
|
}
|
||||||
|
LOG.trace("Initializing {}", customTokenProviderClass.getName());
|
||||||
azureTokenProvider.initialize(rawConfig, accountName);
|
azureTokenProvider.initialize(rawConfig, accountName);
|
||||||
|
LOG.trace("{} init complete", customTokenProviderClass.getName());
|
||||||
return new CustomTokenProviderAdapter(azureTokenProvider);
|
return new CustomTokenProviderAdapter(azureTokenProvider);
|
||||||
} catch(IllegalArgumentException e) {
|
} catch(IllegalArgumentException e) {
|
||||||
throw e;
|
throw e;
|
||||||
@ -581,7 +591,9 @@ public AbfsAuthorizer getAbfsAuthorizer() throws IOException {
|
|||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
Class<AbfsAuthorizer> authClass = (Class<AbfsAuthorizer>) rawConfig.getClassByName(authClassName);
|
Class<AbfsAuthorizer> authClass = (Class<AbfsAuthorizer>) rawConfig.getClassByName(authClassName);
|
||||||
authorizer = authClass.getConstructor(new Class[] {Configuration.class}).newInstance(rawConfig);
|
authorizer = authClass.getConstructor(new Class[] {Configuration.class}).newInstance(rawConfig);
|
||||||
|
LOG.trace("Initializing {}", authClassName);
|
||||||
authorizer.init();
|
authorizer.init();
|
||||||
|
LOG.trace("{} init complete", authClassName);
|
||||||
}
|
}
|
||||||
} catch (
|
} catch (
|
||||||
IllegalAccessException
|
IllegalAccessException
|
||||||
|
@ -107,6 +107,8 @@ public void initialize(URI uri, Configuration configuration)
|
|||||||
|
|
||||||
this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
|
this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
|
||||||
this.abfsStore = new AzureBlobFileSystemStore(uri, this.isSecureScheme(), configuration);
|
this.abfsStore = new AzureBlobFileSystemStore(uri, this.isSecureScheme(), configuration);
|
||||||
|
LOG.trace("AzureBlobFileSystemStore init complete");
|
||||||
|
|
||||||
final AbfsConfiguration abfsConfiguration = abfsStore.getAbfsConfiguration();
|
final AbfsConfiguration abfsConfiguration = abfsStore.getAbfsConfiguration();
|
||||||
|
|
||||||
this.setWorkingDirectory(this.getHomeDirectory());
|
this.setWorkingDirectory(this.getHomeDirectory());
|
||||||
@ -121,6 +123,7 @@ public void initialize(URI uri, Configuration configuration)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LOG.trace("Initiate check for delegation token manager");
|
||||||
if (UserGroupInformation.isSecurityEnabled()) {
|
if (UserGroupInformation.isSecurityEnabled()) {
|
||||||
this.delegationTokenEnabled = abfsConfiguration.isDelegationTokenManagerEnabled();
|
this.delegationTokenEnabled = abfsConfiguration.isDelegationTokenManagerEnabled();
|
||||||
|
|
||||||
@ -137,6 +140,7 @@ public void initialize(URI uri, Configuration configuration)
|
|||||||
// Initialize ABFS authorizer
|
// Initialize ABFS authorizer
|
||||||
//
|
//
|
||||||
this.authorizer = abfsConfiguration.getAbfsAuthorizer();
|
this.authorizer = abfsConfiguration.getAbfsAuthorizer();
|
||||||
|
LOG.debug("Initializing AzureBlobFileSystem for {} complete", uri);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -148,8 +148,12 @@ public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme, Configuration c
|
|||||||
} catch (IllegalAccessException exception) {
|
} catch (IllegalAccessException exception) {
|
||||||
throw new FileSystemOperationUnhandledException(exception);
|
throw new FileSystemOperationUnhandledException(exception);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LOG.trace("AbfsConfiguration init complete");
|
||||||
|
|
||||||
this.userGroupInformation = UserGroupInformation.getCurrentUser();
|
this.userGroupInformation = UserGroupInformation.getCurrentUser();
|
||||||
this.userName = userGroupInformation.getShortUserName();
|
this.userName = userGroupInformation.getShortUserName();
|
||||||
|
LOG.trace("UGI init complete");
|
||||||
if (!abfsConfiguration.getSkipUserGroupMetadataDuringInitialization()) {
|
if (!abfsConfiguration.getSkipUserGroupMetadataDuringInitialization()) {
|
||||||
try {
|
try {
|
||||||
this.primaryUserGroup = userGroupInformation.getPrimaryGroupName();
|
this.primaryUserGroup = userGroupInformation.getPrimaryGroupName();
|
||||||
@ -161,6 +165,7 @@ public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme, Configuration c
|
|||||||
//Provide a default group name
|
//Provide a default group name
|
||||||
this.primaryUserGroup = userName;
|
this.primaryUserGroup = userName;
|
||||||
}
|
}
|
||||||
|
LOG.trace("primaryUserGroup is {}", this.primaryUserGroup);
|
||||||
|
|
||||||
this.azureAtomicRenameDirSet = new HashSet<>(Arrays.asList(
|
this.azureAtomicRenameDirSet = new HashSet<>(Arrays.asList(
|
||||||
abfsConfiguration.getAzureAtomicRenameDirs().split(AbfsHttpConstants.COMMA)));
|
abfsConfiguration.getAzureAtomicRenameDirs().split(AbfsHttpConstants.COMMA)));
|
||||||
@ -170,6 +175,7 @@ public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme, Configuration c
|
|||||||
this.abfsPerfTracker = new AbfsPerfTracker(fileSystemName, accountName, this.abfsConfiguration);
|
this.abfsPerfTracker = new AbfsPerfTracker(fileSystemName, accountName, this.abfsConfiguration);
|
||||||
initializeClient(uri, fileSystemName, accountName, useHttps);
|
initializeClient(uri, fileSystemName, accountName, useHttps);
|
||||||
this.identityTransformer = new IdentityTransformer(abfsConfiguration.getRawConfiguration());
|
this.identityTransformer = new IdentityTransformer(abfsConfiguration.getRawConfiguration());
|
||||||
|
LOG.trace("IdentityTransformer init complete");
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -296,6 +302,7 @@ public Hashtable<String, String> getFilesystemProperties() throws AzureBlobFileS
|
|||||||
public void setFilesystemProperties(final Hashtable<String, String> properties)
|
public void setFilesystemProperties(final Hashtable<String, String> properties)
|
||||||
throws AzureBlobFileSystemException {
|
throws AzureBlobFileSystemException {
|
||||||
if (properties == null || properties.isEmpty()) {
|
if (properties == null || properties.isEmpty()) {
|
||||||
|
LOG.trace("setFilesystemProperties no properties present");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1116,6 +1123,7 @@ private void initializeClient(URI uri, String fileSystemName, String accountName
|
|||||||
AccessTokenProvider tokenProvider = null;
|
AccessTokenProvider tokenProvider = null;
|
||||||
|
|
||||||
if (abfsConfiguration.getAuthType(accountName) == AuthType.SharedKey) {
|
if (abfsConfiguration.getAuthType(accountName) == AuthType.SharedKey) {
|
||||||
|
LOG.trace("Fetching SharedKey credentials");
|
||||||
int dotIndex = accountName.indexOf(AbfsHttpConstants.DOT);
|
int dotIndex = accountName.indexOf(AbfsHttpConstants.DOT);
|
||||||
if (dotIndex <= 0) {
|
if (dotIndex <= 0) {
|
||||||
throw new InvalidUriException(
|
throw new InvalidUriException(
|
||||||
@ -1124,14 +1132,17 @@ private void initializeClient(URI uri, String fileSystemName, String accountName
|
|||||||
creds = new SharedKeyCredentials(accountName.substring(0, dotIndex),
|
creds = new SharedKeyCredentials(accountName.substring(0, dotIndex),
|
||||||
abfsConfiguration.getStorageAccountKey());
|
abfsConfiguration.getStorageAccountKey());
|
||||||
} else {
|
} else {
|
||||||
|
LOG.trace("Fetching token provider");
|
||||||
tokenProvider = abfsConfiguration.getTokenProvider();
|
tokenProvider = abfsConfiguration.getTokenProvider();
|
||||||
ExtensionHelper.bind(tokenProvider, uri,
|
ExtensionHelper.bind(tokenProvider, uri,
|
||||||
abfsConfiguration.getRawConfiguration());
|
abfsConfiguration.getRawConfiguration());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LOG.trace("Initializing AbfsClient for {}", baseUrl);
|
||||||
this.client = new AbfsClient(baseUrl, creds, abfsConfiguration,
|
this.client = new AbfsClient(baseUrl, creds, abfsConfiguration,
|
||||||
new ExponentialRetryPolicy(abfsConfiguration.getMaxIoRetries()),
|
new ExponentialRetryPolicy(abfsConfiguration.getMaxIoRetries()),
|
||||||
tokenProvider, abfsPerfTracker);
|
tokenProvider, abfsPerfTracker);
|
||||||
|
LOG.trace("AbfsClient init complete");
|
||||||
}
|
}
|
||||||
|
|
||||||
private String getOctalNotation(FsPermission fsPermission) {
|
private String getOctalNotation(FsPermission fsPermission) {
|
||||||
|
@ -81,10 +81,14 @@ public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredent
|
|||||||
|
|
||||||
if (this.baseUrl.toString().startsWith(HTTPS_SCHEME)) {
|
if (this.baseUrl.toString().startsWith(HTTPS_SCHEME)) {
|
||||||
try {
|
try {
|
||||||
|
LOG.trace("Initializing DelegatingSSLSocketFactory with {} SSL "
|
||||||
|
+ "Channel Mode", this.abfsConfiguration.getPreferredSSLFactoryOption());
|
||||||
DelegatingSSLSocketFactory.initializeDefaultFactory(this.abfsConfiguration.getPreferredSSLFactoryOption());
|
DelegatingSSLSocketFactory.initializeDefaultFactory(this.abfsConfiguration.getPreferredSSLFactoryOption());
|
||||||
sslProviderName = DelegatingSSLSocketFactory.getDefaultFactory().getProviderName();
|
sslProviderName = DelegatingSSLSocketFactory.getDefaultFactory().getProviderName();
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
// Suppress exception. Failure to init DelegatingSSLSocketFactory would have only performance impact.
|
// Suppress exception. Failure to init DelegatingSSLSocketFactory would have only performance impact.
|
||||||
|
LOG.trace("NonCritFailure: DelegatingSSLSocketFactory Init failed : "
|
||||||
|
+ "{}", e.getMessage());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -145,6 +145,8 @@ void execute() throws AzureBlobFileSystemException {
|
|||||||
throw new AbfsRestOperationException(result.getStatusCode(), result.getStorageErrorCode(),
|
throw new AbfsRestOperationException(result.getStatusCode(), result.getStorageErrorCode(),
|
||||||
result.getStorageErrorMessage(), null, result);
|
result.getStorageErrorMessage(), null, result);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LOG.trace("{} REST operation complete", operationType);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -210,7 +212,7 @@ private boolean executeHttpOperation(final int retryCount) throws AzureBlobFileS
|
|||||||
AbfsClientThrottlingIntercept.updateMetrics(operationType, httpOperation);
|
AbfsClientThrottlingIntercept.updateMetrics(operationType, httpOperation);
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG.debug("HttpRequest: " + httpOperation.toString());
|
LOG.debug("HttpRequest: {}", httpOperation.toString());
|
||||||
|
|
||||||
if (client.getRetryPolicy().shouldRetry(retryCount, httpOperation.getStatusCode())) {
|
if (client.getRetryPolicy().shouldRetry(retryCount, httpOperation.getStatusCode())) {
|
||||||
return false;
|
return false;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user