HADOOP-14547. [WASB] the configured retry policy is not used for all storage operations.
Contributed by Thomas.
This commit is contained in:
parent
0b77262890
commit
c6bd73c6c5
|
@ -481,6 +481,9 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Configure Azure storage session.
|
||||||
|
configureAzureStorageSession();
|
||||||
|
|
||||||
// Start an Azure storage session.
|
// Start an Azure storage session.
|
||||||
//
|
//
|
||||||
createAzureStorageSession();
|
createAzureStorageSession();
|
||||||
|
@ -792,9 +795,6 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
|
||||||
// Accessing the storage server unauthenticated using
|
// Accessing the storage server unauthenticated using
|
||||||
// anonymous credentials.
|
// anonymous credentials.
|
||||||
isAnonymousCredentials = true;
|
isAnonymousCredentials = true;
|
||||||
|
|
||||||
// Configure Azure storage session.
|
|
||||||
configureAzureStorageSession();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void connectUsingCredentials(String accountName,
|
private void connectUsingCredentials(String accountName,
|
||||||
|
@ -820,9 +820,6 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
|
||||||
|
|
||||||
// Can only create container if using account key credentials
|
// Can only create container if using account key credentials
|
||||||
canCreateOrModifyContainer = credentials instanceof StorageCredentialsAccountAndKey;
|
canCreateOrModifyContainer = credentials instanceof StorageCredentialsAccountAndKey;
|
||||||
|
|
||||||
// Configure Azure storage session.
|
|
||||||
configureAzureStorageSession();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -848,8 +845,6 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
|
||||||
rootDirectory = container.getDirectoryReference("");
|
rootDirectory = container.getDirectoryReference("");
|
||||||
|
|
||||||
canCreateOrModifyContainer = true;
|
canCreateOrModifyContainer = true;
|
||||||
|
|
||||||
configureAzureStorageSession();
|
|
||||||
tolerateOobAppends = false;
|
tolerateOobAppends = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -69,6 +69,8 @@ public class SecureStorageInterfaceImpl extends StorageInterface {
|
||||||
public static final String SAS_ERROR_CODE = "SAS Error";
|
public static final String SAS_ERROR_CODE = "SAS Error";
|
||||||
private SASKeyGeneratorInterface sasKeyGenerator;
|
private SASKeyGeneratorInterface sasKeyGenerator;
|
||||||
private String storageAccount;
|
private String storageAccount;
|
||||||
|
private RetryPolicyFactory retryPolicy;
|
||||||
|
private int timeoutIntervalInMs;
|
||||||
|
|
||||||
public SecureStorageInterfaceImpl(boolean useLocalSASKeyMode,
|
public SecureStorageInterfaceImpl(boolean useLocalSASKeyMode,
|
||||||
Configuration conf) throws SecureModeException {
|
Configuration conf) throws SecureModeException {
|
||||||
|
@ -90,10 +92,12 @@ public class SecureStorageInterfaceImpl extends StorageInterface {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setTimeoutInMs(int timeoutInMs) {
|
public void setTimeoutInMs(int timeoutInMs) {
|
||||||
|
timeoutIntervalInMs = timeoutInMs;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setRetryPolicyFactory(RetryPolicyFactory retryPolicyFactory) {
|
public void setRetryPolicyFactory(RetryPolicyFactory retryPolicyFactory) {
|
||||||
|
retryPolicy = retryPolicyFactory;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -133,9 +137,15 @@ public class SecureStorageInterfaceImpl extends StorageInterface {
|
||||||
throws URISyntaxException, StorageException {
|
throws URISyntaxException, StorageException {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
return new SASCloudBlobContainerWrapperImpl(storageAccount,
|
CloudBlobContainer container = new CloudBlobContainer(sasKeyGenerator.getContainerSASUri(
|
||||||
new CloudBlobContainer(sasKeyGenerator.getContainerSASUri(
|
storageAccount, name));
|
||||||
storageAccount, name)), sasKeyGenerator);
|
if (retryPolicy != null) {
|
||||||
|
container.getServiceClient().getDefaultRequestOptions().setRetryPolicyFactory(retryPolicy);
|
||||||
|
}
|
||||||
|
if (timeoutIntervalInMs > 0) {
|
||||||
|
container.getServiceClient().getDefaultRequestOptions().setTimeoutIntervalInMs(timeoutIntervalInMs);
|
||||||
|
}
|
||||||
|
return new SASCloudBlobContainerWrapperImpl(storageAccount, container, sasKeyGenerator);
|
||||||
} catch (SASKeyGenerationException sasEx) {
|
} catch (SASKeyGenerationException sasEx) {
|
||||||
String errorMsg = "Encountered SASKeyGeneration exception while "
|
String errorMsg = "Encountered SASKeyGeneration exception while "
|
||||||
+ "generating SAS Key for container : " + name
|
+ "generating SAS Key for container : " + name
|
||||||
|
@ -216,9 +226,12 @@ public class SecureStorageInterfaceImpl extends StorageInterface {
|
||||||
public CloudBlobWrapper getBlockBlobReference(String relativePath)
|
public CloudBlobWrapper getBlockBlobReference(String relativePath)
|
||||||
throws URISyntaxException, StorageException {
|
throws URISyntaxException, StorageException {
|
||||||
try {
|
try {
|
||||||
|
CloudBlockBlob blob = new CloudBlockBlob(sasKeyGenerator.getRelativeBlobSASUri(
|
||||||
|
storageAccount, getName(), relativePath));
|
||||||
|
blob.getServiceClient().setDefaultRequestOptions(
|
||||||
|
container.getServiceClient().getDefaultRequestOptions());
|
||||||
return new SASCloudBlockBlobWrapperImpl(
|
return new SASCloudBlockBlobWrapperImpl(
|
||||||
new CloudBlockBlob(sasKeyGenerator.getRelativeBlobSASUri(
|
blob);
|
||||||
storageAccount, getName(), relativePath)));
|
|
||||||
} catch (SASKeyGenerationException sasEx) {
|
} catch (SASKeyGenerationException sasEx) {
|
||||||
String errorMsg = "Encountered SASKeyGeneration exception while "
|
String errorMsg = "Encountered SASKeyGeneration exception while "
|
||||||
+ "generating SAS Key for relativePath : " + relativePath
|
+ "generating SAS Key for relativePath : " + relativePath
|
||||||
|
@ -232,9 +245,12 @@ public class SecureStorageInterfaceImpl extends StorageInterface {
|
||||||
public CloudBlobWrapper getPageBlobReference(String relativePath)
|
public CloudBlobWrapper getPageBlobReference(String relativePath)
|
||||||
throws URISyntaxException, StorageException {
|
throws URISyntaxException, StorageException {
|
||||||
try {
|
try {
|
||||||
|
CloudPageBlob blob = new CloudPageBlob(sasKeyGenerator.getRelativeBlobSASUri(
|
||||||
|
storageAccount, getName(), relativePath));
|
||||||
|
blob.getServiceClient().setDefaultRequestOptions(
|
||||||
|
container.getServiceClient().getDefaultRequestOptions());
|
||||||
return new SASCloudPageBlobWrapperImpl(
|
return new SASCloudPageBlobWrapperImpl(
|
||||||
new CloudPageBlob(sasKeyGenerator.getRelativeBlobSASUri(
|
blob);
|
||||||
storageAccount, getName(), relativePath)));
|
|
||||||
} catch (SASKeyGenerationException sasEx) {
|
} catch (SASKeyGenerationException sasEx) {
|
||||||
String errorMsg = "Encountered SASKeyGeneration exception while "
|
String errorMsg = "Encountered SASKeyGeneration exception while "
|
||||||
+ "generating SAS Key for relativePath : " + relativePath
|
+ "generating SAS Key for relativePath : " + relativePath
|
||||||
|
|
|
@ -60,32 +60,50 @@ import com.microsoft.azure.storage.blob.PageRange;
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
class StorageInterfaceImpl extends StorageInterface {
|
class StorageInterfaceImpl extends StorageInterface {
|
||||||
private CloudBlobClient serviceClient;
|
private CloudBlobClient serviceClient;
|
||||||
|
private RetryPolicyFactory retryPolicyFactory;
|
||||||
|
private int timeoutIntervalInMs;
|
||||||
|
|
||||||
|
private void updateRetryPolicy() {
|
||||||
|
if (serviceClient != null && retryPolicyFactory != null) {
|
||||||
|
serviceClient.getDefaultRequestOptions().setRetryPolicyFactory(retryPolicyFactory);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void updateTimeoutInMs() {
|
||||||
|
if (serviceClient != null && timeoutIntervalInMs > 0) {
|
||||||
|
serviceClient.getDefaultRequestOptions().setTimeoutIntervalInMs(timeoutIntervalInMs);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setRetryPolicyFactory(final RetryPolicyFactory retryPolicyFactory) {
|
public void setRetryPolicyFactory(final RetryPolicyFactory retryPolicyFactory) {
|
||||||
serviceClient.getDefaultRequestOptions().setRetryPolicyFactory(
|
this.retryPolicyFactory = retryPolicyFactory;
|
||||||
retryPolicyFactory);
|
updateRetryPolicy();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setTimeoutInMs(int timeoutInMs) {
|
public void setTimeoutInMs(int timeoutInMs) {
|
||||||
serviceClient.getDefaultRequestOptions().setTimeoutIntervalInMs(
|
timeoutIntervalInMs = timeoutInMs;
|
||||||
timeoutInMs);
|
updateTimeoutInMs();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void createBlobClient(CloudStorageAccount account) {
|
public void createBlobClient(CloudStorageAccount account) {
|
||||||
serviceClient = account.createCloudBlobClient();
|
serviceClient = account.createCloudBlobClient();
|
||||||
|
updateRetryPolicy();
|
||||||
|
updateTimeoutInMs();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void createBlobClient(URI baseUri) {
|
public void createBlobClient(URI baseUri) {
|
||||||
serviceClient = new CloudBlobClient(baseUri);
|
createBlobClient(baseUri, (StorageCredentials)null);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void createBlobClient(URI baseUri, StorageCredentials credentials) {
|
public void createBlobClient(URI baseUri, StorageCredentials credentials) {
|
||||||
serviceClient = new CloudBlobClient(baseUri, credentials);
|
serviceClient = new CloudBlobClient(baseUri, credentials);
|
||||||
|
updateRetryPolicy();
|
||||||
|
updateTimeoutInMs();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
Loading…
Reference in New Issue