From dcc4850ff32c6b4237910129d52e95b0b5d7f640 Mon Sep 17 00:00:00 2001 From: cnauroth Date: Wed, 11 Mar 2015 14:36:51 -0700 Subject: [PATCH] HADOOP-11693. Azure Storage FileSystem rename operations are throttled too aggressively to complete HBase WAL archiving. Contributed by Duo Xu. (cherry picked from commit 7a346bcb4fa5b56191ed00a39e72e51c9bdf1b56) --- .../hadoop-common/CHANGES.txt | 3 + .../fs/azure/AzureNativeFileSystemStore.java | 55 ++++++++++++++++++- .../hadoop/fs/azure/StorageInterface.java | 6 +- .../hadoop/fs/azure/StorageInterfaceImpl.java | 4 +- .../hadoop/fs/azure/MockStorageInterface.java | 2 +- 5 files changed, 63 insertions(+), 7 deletions(-) diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 8f196d5f0d5..4f3b404d07e 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -674,6 +674,9 @@ Release 2.7.0 - UNRELEASED HADOOP-11618. DelegateToFileSystem erroneously uses default FS's port in constructor. (Brahma Reddy Battula via gera) + HADOOP-11693. Azure Storage FileSystem rename operations are throttled too + aggressively to complete HBase WAL archiving. (Duo Xu via cnauroth) + Release 2.6.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java index b664fe760cb..c6ba84fc479 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java @@ -134,6 +134,15 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { private static final String KEY_MAX_BACKOFF_INTERVAL = "fs.azure.io.retry.max.backoff.interval"; private static final String KEY_BACKOFF_INTERVAL = "fs.azure.io.retry.backoff.interval"; private static final String KEY_MAX_IO_RETRIES = "fs.azure.io.retry.max.retries"; + + private static final String KEY_COPYBLOB_MIN_BACKOFF_INTERVAL = + "fs.azure.io.copyblob.retry.min.backoff.interval"; + private static final String KEY_COPYBLOB_MAX_BACKOFF_INTERVAL = + "fs.azure.io.copyblob.retry.max.backoff.interval"; + private static final String KEY_COPYBLOB_BACKOFF_INTERVAL = + "fs.azure.io.copyblob.retry.backoff.interval"; + private static final String KEY_COPYBLOB_MAX_IO_RETRIES = + "fs.azure.io.copyblob.retry.max.retries"; private static final String KEY_SELF_THROTTLE_ENABLE = "fs.azure.selfthrottling.enable"; private static final String KEY_SELF_THROTTLE_READ_FACTOR = "fs.azure.selfthrottling.read.factor"; @@ -199,6 +208,11 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { private static final int DEFAULT_MAX_BACKOFF_INTERVAL = 30 * 1000; // 30s private static final int DEFAULT_BACKOFF_INTERVAL = 1 * 1000; // 1s private static final int DEFAULT_MAX_RETRY_ATTEMPTS = 15; + + private static final int DEFAULT_COPYBLOB_MIN_BACKOFF_INTERVAL = 3 * 1000; + private static final int DEFAULT_COPYBLOB_MAX_BACKOFF_INTERVAL = 90 * 1000; + private static final int DEFAULT_COPYBLOB_BACKOFF_INTERVAL = 30 * 1000; + private static final int DEFAULT_COPYBLOB_MAX_RETRY_ATTEMPTS = 15; // Self-throttling defaults. Allowed range = (0,1.0] // Value of 1.0 means no self-throttling. @@ -2435,11 +2449,46 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { // Rename the source blob to the destination blob by copying it to // the destination blob then deleting it. // - dstBlob.startCopyFromBlob(srcUri, getInstrumentedContext()); - waitForCopyToComplete(dstBlob, getInstrumentedContext()); + // Copy blob operation in Azure storage is very costly. It will be highly + // likely throttled during Azure storage gc. Short term fix will be using + // a more intensive exponential retry policy when the cluster is getting + // throttled. + try { + dstBlob.startCopyFromBlob(srcUri, null, getInstrumentedContext()); + } catch (StorageException se) { + if (se.getErrorCode().equals( + StorageErrorCode.SERVER_BUSY.toString())) { + int copyBlobMinBackoff = sessionConfiguration.getInt( + KEY_COPYBLOB_MIN_BACKOFF_INTERVAL, + DEFAULT_COPYBLOB_MIN_BACKOFF_INTERVAL); + int copyBlobMaxBackoff = sessionConfiguration.getInt( + KEY_COPYBLOB_MAX_BACKOFF_INTERVAL, + DEFAULT_COPYBLOB_MAX_BACKOFF_INTERVAL); + + int copyBlobDeltaBackoff = sessionConfiguration.getInt( + KEY_COPYBLOB_BACKOFF_INTERVAL, + DEFAULT_COPYBLOB_BACKOFF_INTERVAL); + + int copyBlobMaxRetries = sessionConfiguration.getInt( + KEY_COPYBLOB_MAX_IO_RETRIES, + DEFAULT_COPYBLOB_MAX_RETRY_ATTEMPTS); + + BlobRequestOptions options = new BlobRequestOptions(); + options.setRetryPolicyFactory(new RetryExponentialRetry( + copyBlobMinBackoff, copyBlobDeltaBackoff, copyBlobMaxBackoff, + copyBlobMaxRetries)); + dstBlob.startCopyFromBlob(srcUri, options, getInstrumentedContext()); + } else { + throw se; + } + } + waitForCopyToComplete(dstBlob, getInstrumentedContext()); safeDelete(srcBlob, lease); - } catch (Exception e) { + } catch (StorageException e) { + // Re-throw exception as an Azure storage exception. + throw new AzureException(e); + } catch (URISyntaxException e) { // Re-throw exception as an Azure storage exception. throw new AzureException(e); } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java index 91928a29043..e89151d7098 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterface.java @@ -383,6 +383,10 @@ abstract class StorageInterface { * * @param source * A java.net.URI The URI of a source blob. + * @param options + * A {@link BlobRequestOptions} object that specifies any additional options for the request. Specifying + * null will use the default request options from the associated service client ( + * {@link CloudBlobClient}). * @param opContext * An {@link OperationContext} object that represents the context for the current operation. This object * is used to track requests to the storage service, and to provide additional runtime information about @@ -394,7 +398,7 @@ abstract class StorageInterface { * */ public abstract void startCopyFromBlob(URI source, - OperationContext opContext) + BlobRequestOptions options, OperationContext opContext) throws StorageException, URISyntaxException; /** diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java index 21205361098..90d4d8838ad 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/StorageInterfaceImpl.java @@ -393,11 +393,11 @@ class StorageInterfaceImpl extends StorageInterface { } @Override - public void startCopyFromBlob(URI source, + public void startCopyFromBlob(URI source, BlobRequestOptions options, OperationContext opContext) throws StorageException, URISyntaxException { getBlob().startCopyFromBlob(source, - null, null, null, opContext); + null, null, options, opContext); } @Override diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java index c51c05bd535..cde0e38ef6a 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/MockStorageInterface.java @@ -429,7 +429,7 @@ public class MockStorageInterface extends StorageInterface { } @Override - public void startCopyFromBlob(URI source, + public void startCopyFromBlob(URI source, BlobRequestOptions options, OperationContext opContext) throws StorageException, URISyntaxException { backingStore.copy(convertUriToDecodedString(source), convertUriToDecodedString(uri)); //TODO: set the backingStore.properties.CopyState and