HADOOP-11693. Azure Storage FileSystem rename operations are throttled too aggressively to complete HBase WAL archiving. Contributed by Duo Xu.
This commit is contained in:
parent
fb34f45727
commit
7a346bcb4f
|
@ -1088,6 +1088,9 @@ Release 2.7.0 - UNRELEASED
|
||||||
HADOOP-11618. DelegateToFileSystem erroneously uses default FS's port in
|
HADOOP-11618. DelegateToFileSystem erroneously uses default FS's port in
|
||||||
constructor. (Brahma Reddy Battula via gera)
|
constructor. (Brahma Reddy Battula via gera)
|
||||||
|
|
||||||
|
HADOOP-11693. Azure Storage FileSystem rename operations are throttled too
|
||||||
|
aggressively to complete HBase WAL archiving. (Duo Xu via cnauroth)
|
||||||
|
|
||||||
Release 2.6.1 - UNRELEASED
|
Release 2.6.1 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -135,6 +135,15 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
|
||||||
private static final String KEY_BACKOFF_INTERVAL = "fs.azure.io.retry.backoff.interval";
|
private static final String KEY_BACKOFF_INTERVAL = "fs.azure.io.retry.backoff.interval";
|
||||||
private static final String KEY_MAX_IO_RETRIES = "fs.azure.io.retry.max.retries";
|
private static final String KEY_MAX_IO_RETRIES = "fs.azure.io.retry.max.retries";
|
||||||
|
|
||||||
|
private static final String KEY_COPYBLOB_MIN_BACKOFF_INTERVAL =
|
||||||
|
"fs.azure.io.copyblob.retry.min.backoff.interval";
|
||||||
|
private static final String KEY_COPYBLOB_MAX_BACKOFF_INTERVAL =
|
||||||
|
"fs.azure.io.copyblob.retry.max.backoff.interval";
|
||||||
|
private static final String KEY_COPYBLOB_BACKOFF_INTERVAL =
|
||||||
|
"fs.azure.io.copyblob.retry.backoff.interval";
|
||||||
|
private static final String KEY_COPYBLOB_MAX_IO_RETRIES =
|
||||||
|
"fs.azure.io.copyblob.retry.max.retries";
|
||||||
|
|
||||||
private static final String KEY_SELF_THROTTLE_ENABLE = "fs.azure.selfthrottling.enable";
|
private static final String KEY_SELF_THROTTLE_ENABLE = "fs.azure.selfthrottling.enable";
|
||||||
private static final String KEY_SELF_THROTTLE_READ_FACTOR = "fs.azure.selfthrottling.read.factor";
|
private static final String KEY_SELF_THROTTLE_READ_FACTOR = "fs.azure.selfthrottling.read.factor";
|
||||||
private static final String KEY_SELF_THROTTLE_WRITE_FACTOR = "fs.azure.selfthrottling.write.factor";
|
private static final String KEY_SELF_THROTTLE_WRITE_FACTOR = "fs.azure.selfthrottling.write.factor";
|
||||||
|
@ -200,6 +209,11 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
|
||||||
private static final int DEFAULT_BACKOFF_INTERVAL = 1 * 1000; // 1s
|
private static final int DEFAULT_BACKOFF_INTERVAL = 1 * 1000; // 1s
|
||||||
private static final int DEFAULT_MAX_RETRY_ATTEMPTS = 15;
|
private static final int DEFAULT_MAX_RETRY_ATTEMPTS = 15;
|
||||||
|
|
||||||
|
private static final int DEFAULT_COPYBLOB_MIN_BACKOFF_INTERVAL = 3 * 1000;
|
||||||
|
private static final int DEFAULT_COPYBLOB_MAX_BACKOFF_INTERVAL = 90 * 1000;
|
||||||
|
private static final int DEFAULT_COPYBLOB_BACKOFF_INTERVAL = 30 * 1000;
|
||||||
|
private static final int DEFAULT_COPYBLOB_MAX_RETRY_ATTEMPTS = 15;
|
||||||
|
|
||||||
// Self-throttling defaults. Allowed range = (0,1.0]
|
// Self-throttling defaults. Allowed range = (0,1.0]
|
||||||
// Value of 1.0 means no self-throttling.
|
// Value of 1.0 means no self-throttling.
|
||||||
// Value of x means process data at factor x of unrestricted rate
|
// Value of x means process data at factor x of unrestricted rate
|
||||||
|
@ -2435,11 +2449,46 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
|
||||||
// Rename the source blob to the destination blob by copying it to
|
// Rename the source blob to the destination blob by copying it to
|
||||||
// the destination blob then deleting it.
|
// the destination blob then deleting it.
|
||||||
//
|
//
|
||||||
dstBlob.startCopyFromBlob(srcUri, getInstrumentedContext());
|
// Copy blob operation in Azure storage is very costly. It will be highly
|
||||||
waitForCopyToComplete(dstBlob, getInstrumentedContext());
|
// likely throttled during Azure storage gc. Short term fix will be using
|
||||||
|
// a more intensive exponential retry policy when the cluster is getting
|
||||||
|
// throttled.
|
||||||
|
try {
|
||||||
|
dstBlob.startCopyFromBlob(srcUri, null, getInstrumentedContext());
|
||||||
|
} catch (StorageException se) {
|
||||||
|
if (se.getErrorCode().equals(
|
||||||
|
StorageErrorCode.SERVER_BUSY.toString())) {
|
||||||
|
int copyBlobMinBackoff = sessionConfiguration.getInt(
|
||||||
|
KEY_COPYBLOB_MIN_BACKOFF_INTERVAL,
|
||||||
|
DEFAULT_COPYBLOB_MIN_BACKOFF_INTERVAL);
|
||||||
|
|
||||||
|
int copyBlobMaxBackoff = sessionConfiguration.getInt(
|
||||||
|
KEY_COPYBLOB_MAX_BACKOFF_INTERVAL,
|
||||||
|
DEFAULT_COPYBLOB_MAX_BACKOFF_INTERVAL);
|
||||||
|
|
||||||
|
int copyBlobDeltaBackoff = sessionConfiguration.getInt(
|
||||||
|
KEY_COPYBLOB_BACKOFF_INTERVAL,
|
||||||
|
DEFAULT_COPYBLOB_BACKOFF_INTERVAL);
|
||||||
|
|
||||||
|
int copyBlobMaxRetries = sessionConfiguration.getInt(
|
||||||
|
KEY_COPYBLOB_MAX_IO_RETRIES,
|
||||||
|
DEFAULT_COPYBLOB_MAX_RETRY_ATTEMPTS);
|
||||||
|
|
||||||
|
BlobRequestOptions options = new BlobRequestOptions();
|
||||||
|
options.setRetryPolicyFactory(new RetryExponentialRetry(
|
||||||
|
copyBlobMinBackoff, copyBlobDeltaBackoff, copyBlobMaxBackoff,
|
||||||
|
copyBlobMaxRetries));
|
||||||
|
dstBlob.startCopyFromBlob(srcUri, options, getInstrumentedContext());
|
||||||
|
} else {
|
||||||
|
throw se;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
waitForCopyToComplete(dstBlob, getInstrumentedContext());
|
||||||
safeDelete(srcBlob, lease);
|
safeDelete(srcBlob, lease);
|
||||||
} catch (Exception e) {
|
} catch (StorageException e) {
|
||||||
|
// Re-throw exception as an Azure storage exception.
|
||||||
|
throw new AzureException(e);
|
||||||
|
} catch (URISyntaxException e) {
|
||||||
// Re-throw exception as an Azure storage exception.
|
// Re-throw exception as an Azure storage exception.
|
||||||
throw new AzureException(e);
|
throw new AzureException(e);
|
||||||
}
|
}
|
||||||
|
|
|
@ -383,6 +383,10 @@ abstract class StorageInterface {
|
||||||
*
|
*
|
||||||
* @param source
|
* @param source
|
||||||
* A <code>java.net.URI</code> The URI of a source blob.
|
* A <code>java.net.URI</code> The URI of a source blob.
|
||||||
|
* @param options
|
||||||
|
* A {@link BlobRequestOptions} object that specifies any additional options for the request. Specifying
|
||||||
|
* <code>null</code> will use the default request options from the associated service client (
|
||||||
|
* {@link CloudBlobClient}).
|
||||||
* @param opContext
|
* @param opContext
|
||||||
* An {@link OperationContext} object that represents the context for the current operation. This object
|
* An {@link OperationContext} object that represents the context for the current operation. This object
|
||||||
* is used to track requests to the storage service, and to provide additional runtime information about
|
* is used to track requests to the storage service, and to provide additional runtime information about
|
||||||
|
@ -394,7 +398,7 @@ abstract class StorageInterface {
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
public abstract void startCopyFromBlob(URI source,
|
public abstract void startCopyFromBlob(URI source,
|
||||||
OperationContext opContext)
|
BlobRequestOptions options, OperationContext opContext)
|
||||||
throws StorageException, URISyntaxException;
|
throws StorageException, URISyntaxException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -393,11 +393,11 @@ class StorageInterfaceImpl extends StorageInterface {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void startCopyFromBlob(URI source,
|
public void startCopyFromBlob(URI source, BlobRequestOptions options,
|
||||||
OperationContext opContext)
|
OperationContext opContext)
|
||||||
throws StorageException, URISyntaxException {
|
throws StorageException, URISyntaxException {
|
||||||
getBlob().startCopyFromBlob(source,
|
getBlob().startCopyFromBlob(source,
|
||||||
null, null, null, opContext);
|
null, null, options, opContext);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -429,7 +429,7 @@ public class MockStorageInterface extends StorageInterface {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void startCopyFromBlob(URI source,
|
public void startCopyFromBlob(URI source, BlobRequestOptions options,
|
||||||
OperationContext opContext) throws StorageException, URISyntaxException {
|
OperationContext opContext) throws StorageException, URISyntaxException {
|
||||||
backingStore.copy(convertUriToDecodedString(source), convertUriToDecodedString(uri));
|
backingStore.copy(convertUriToDecodedString(source), convertUriToDecodedString(uri));
|
||||||
//TODO: set the backingStore.properties.CopyState and
|
//TODO: set the backingStore.properties.CopyState and
|
||||||
|
|
Loading…
Reference in New Issue