HADOOP-14547. [WASB] the configured retry policy is not used for all storage operations.
Contributed by Thomas.
(cherry picked from commit c6bd73c6c5
)
This commit is contained in:
parent
4c59b446e2
commit
093eb0d361
|
@ -481,6 +481,9 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
|
|||
}
|
||||
}
|
||||
|
||||
// Configure Azure storage session.
|
||||
configureAzureStorageSession();
|
||||
|
||||
// Start an Azure storage session.
|
||||
//
|
||||
createAzureStorageSession();
|
||||
|
@ -792,9 +795,6 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
|
|||
// Accessing the storage server unauthenticated using
|
||||
// anonymous credentials.
|
||||
isAnonymousCredentials = true;
|
||||
|
||||
// Configure Azure storage session.
|
||||
configureAzureStorageSession();
|
||||
}
|
||||
|
||||
private void connectUsingCredentials(String accountName,
|
||||
|
@ -820,9 +820,6 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
|
|||
|
||||
// Can only create container if using account key credentials
|
||||
canCreateOrModifyContainer = credentials instanceof StorageCredentialsAccountAndKey;
|
||||
|
||||
// Configure Azure storage session.
|
||||
configureAzureStorageSession();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -848,8 +845,6 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
|
|||
rootDirectory = container.getDirectoryReference("");
|
||||
|
||||
canCreateOrModifyContainer = true;
|
||||
|
||||
configureAzureStorageSession();
|
||||
tolerateOobAppends = false;
|
||||
}
|
||||
|
||||
|
|
|
@ -69,6 +69,8 @@ public class SecureStorageInterfaceImpl extends StorageInterface {
|
|||
public static final String SAS_ERROR_CODE = "SAS Error";
|
||||
private SASKeyGeneratorInterface sasKeyGenerator;
|
||||
private String storageAccount;
|
||||
private RetryPolicyFactory retryPolicy;
|
||||
private int timeoutIntervalInMs;
|
||||
|
||||
public SecureStorageInterfaceImpl(boolean useLocalSASKeyMode,
|
||||
Configuration conf) throws SecureModeException {
|
||||
|
@ -90,10 +92,12 @@ public class SecureStorageInterfaceImpl extends StorageInterface {
|
|||
|
||||
@Override
|
||||
public void setTimeoutInMs(int timeoutInMs) {
|
||||
timeoutIntervalInMs = timeoutInMs;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRetryPolicyFactory(RetryPolicyFactory retryPolicyFactory) {
|
||||
retryPolicy = retryPolicyFactory;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -133,9 +137,15 @@ public class SecureStorageInterfaceImpl extends StorageInterface {
|
|||
throws URISyntaxException, StorageException {
|
||||
|
||||
try {
|
||||
return new SASCloudBlobContainerWrapperImpl(storageAccount,
|
||||
new CloudBlobContainer(sasKeyGenerator.getContainerSASUri(
|
||||
storageAccount, name)), sasKeyGenerator);
|
||||
CloudBlobContainer container = new CloudBlobContainer(sasKeyGenerator.getContainerSASUri(
|
||||
storageAccount, name));
|
||||
if (retryPolicy != null) {
|
||||
container.getServiceClient().getDefaultRequestOptions().setRetryPolicyFactory(retryPolicy);
|
||||
}
|
||||
if (timeoutIntervalInMs > 0) {
|
||||
container.getServiceClient().getDefaultRequestOptions().setTimeoutIntervalInMs(timeoutIntervalInMs);
|
||||
}
|
||||
return new SASCloudBlobContainerWrapperImpl(storageAccount, container, sasKeyGenerator);
|
||||
} catch (SASKeyGenerationException sasEx) {
|
||||
String errorMsg = "Encountered SASKeyGeneration exception while "
|
||||
+ "generating SAS Key for container : " + name
|
||||
|
@ -216,9 +226,12 @@ public class SecureStorageInterfaceImpl extends StorageInterface {
|
|||
public CloudBlobWrapper getBlockBlobReference(String relativePath)
|
||||
throws URISyntaxException, StorageException {
|
||||
try {
|
||||
CloudBlockBlob blob = new CloudBlockBlob(sasKeyGenerator.getRelativeBlobSASUri(
|
||||
storageAccount, getName(), relativePath));
|
||||
blob.getServiceClient().setDefaultRequestOptions(
|
||||
container.getServiceClient().getDefaultRequestOptions());
|
||||
return new SASCloudBlockBlobWrapperImpl(
|
||||
new CloudBlockBlob(sasKeyGenerator.getRelativeBlobSASUri(
|
||||
storageAccount, getName(), relativePath)));
|
||||
blob);
|
||||
} catch (SASKeyGenerationException sasEx) {
|
||||
String errorMsg = "Encountered SASKeyGeneration exception while "
|
||||
+ "generating SAS Key for relativePath : " + relativePath
|
||||
|
@ -232,9 +245,12 @@ public class SecureStorageInterfaceImpl extends StorageInterface {
|
|||
public CloudBlobWrapper getPageBlobReference(String relativePath)
|
||||
throws URISyntaxException, StorageException {
|
||||
try {
|
||||
CloudPageBlob blob = new CloudPageBlob(sasKeyGenerator.getRelativeBlobSASUri(
|
||||
storageAccount, getName(), relativePath));
|
||||
blob.getServiceClient().setDefaultRequestOptions(
|
||||
container.getServiceClient().getDefaultRequestOptions());
|
||||
return new SASCloudPageBlobWrapperImpl(
|
||||
new CloudPageBlob(sasKeyGenerator.getRelativeBlobSASUri(
|
||||
storageAccount, getName(), relativePath)));
|
||||
blob);
|
||||
} catch (SASKeyGenerationException sasEx) {
|
||||
String errorMsg = "Encountered SASKeyGeneration exception while "
|
||||
+ "generating SAS Key for relativePath : " + relativePath
|
||||
|
|
|
@ -60,32 +60,50 @@ import com.microsoft.azure.storage.blob.PageRange;
|
|||
@InterfaceAudience.Private
|
||||
class StorageInterfaceImpl extends StorageInterface {
|
||||
private CloudBlobClient serviceClient;
|
||||
private RetryPolicyFactory retryPolicyFactory;
|
||||
private int timeoutIntervalInMs;
|
||||
|
||||
private void updateRetryPolicy() {
|
||||
if (serviceClient != null && retryPolicyFactory != null) {
|
||||
serviceClient.getDefaultRequestOptions().setRetryPolicyFactory(retryPolicyFactory);
|
||||
}
|
||||
}
|
||||
|
||||
private void updateTimeoutInMs() {
|
||||
if (serviceClient != null && timeoutIntervalInMs > 0) {
|
||||
serviceClient.getDefaultRequestOptions().setTimeoutIntervalInMs(timeoutIntervalInMs);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setRetryPolicyFactory(final RetryPolicyFactory retryPolicyFactory) {
|
||||
serviceClient.getDefaultRequestOptions().setRetryPolicyFactory(
|
||||
retryPolicyFactory);
|
||||
this.retryPolicyFactory = retryPolicyFactory;
|
||||
updateRetryPolicy();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setTimeoutInMs(int timeoutInMs) {
|
||||
serviceClient.getDefaultRequestOptions().setTimeoutIntervalInMs(
|
||||
timeoutInMs);
|
||||
timeoutIntervalInMs = timeoutInMs;
|
||||
updateTimeoutInMs();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void createBlobClient(CloudStorageAccount account) {
|
||||
serviceClient = account.createCloudBlobClient();
|
||||
updateRetryPolicy();
|
||||
updateTimeoutInMs();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void createBlobClient(URI baseUri) {
|
||||
serviceClient = new CloudBlobClient(baseUri);
|
||||
createBlobClient(baseUri, (StorageCredentials)null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void createBlobClient(URI baseUri, StorageCredentials credentials) {
|
||||
serviceClient = new CloudBlobClient(baseUri, credentials);
|
||||
updateRetryPolicy();
|
||||
updateTimeoutInMs();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue