HADOOP-15703. ABFS - Implement client-side throttling.

Contributed by Sneha Varma and Thomas Marquardt.
This commit is contained in:
Thomas Marquardt 2018-09-03 01:37:10 +00:00
parent 4410eacba7
commit 97f06b3fc7
11 changed files with 633 additions and 5 deletions

View File

@ -99,7 +99,7 @@ private ClientThrottlingAnalyzer() {
this.blobMetrics = new AtomicReference<BlobOperationMetrics>(
new BlobOperationMetrics(System.currentTimeMillis()));
this.timer = new Timer(
String.format("wasb-timer-client-throttling-analyzer-%s", name));
String.format("wasb-timer-client-throttling-analyzer-%s", name), true);
this.timer.schedule(new TimerTaskImpl(),
analysisPeriodMs,
analysisPeriodMs);

View File

@ -141,6 +141,10 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_ENABLE_FLUSH)
private boolean enableFlush;
@BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ENABLE_AUTOTHROTTLING,
DefaultValue = DEFAULT_ENABLE_AUTOTHROTTLING)
private boolean enableAutoThrottling;
@StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_USER_AGENT_PREFIX_KEY,
DefaultValue = "")
private String userAgentId;
@ -279,6 +283,10 @@ public boolean isFlushEnabled() {
return this.enableFlush;
}
public boolean isAutoThrottlingEnabled() {
return this.enableAutoThrottling;
}
public String getCustomUserAgentPrefix() {
return this.userAgentId;
}

View File

@ -38,6 +38,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsClientThrottlingIntercept;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -130,6 +131,8 @@ public void initialize(URI uri, Configuration configuration)
this.delegationTokenManager = abfsStore.getAbfsConfiguration().getDelegationTokenManager();
}
}
AbfsClientThrottlingIntercept.initializeSingleton(abfsStore.getAbfsConfiguration().isAutoThrottlingEnabled());
}
@Override

View File

@ -47,7 +47,7 @@ public final class ConfigurationKeys {
public static final String AZURE_TOLERATE_CONCURRENT_APPEND = "fs.azure.io.read.tolerate.concurrent.append";
public static final String AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION = "fs.azure.createRemoteFileSystemDuringInitialization";
public static final String AZURE_SKIP_USER_GROUP_METADATA_DURING_INITIALIZATION = "fs.azure.skipUserGroupMetadataDuringInitialization";
public static final String FS_AZURE_AUTOTHROTTLING_ENABLE = "fs.azure.autothrottling.enable";
public static final String FS_AZURE_ENABLE_AUTOTHROTTLING = "fs.azure.enable.autothrottling";
public static final String FS_AZURE_ATOMIC_RENAME_KEY = "fs.azure.atomic.rename.key";
public static final String FS_AZURE_READ_AHEAD_QUEUE_DEPTH = "fs.azure.readaheadqueue.depth";
public static final String FS_AZURE_ENABLE_FLUSH = "fs.azure.enable.flush";

View File

@ -57,6 +57,7 @@ public final class FileSystemConfigurations {
public static final int DEFAULT_READ_AHEAD_QUEUE_DEPTH = -1;
public static final boolean DEFAULT_ENABLE_FLUSH = true;
public static final boolean DEFAULT_ENABLE_AUTOTHROTTLING = true;
public static final SSLSocketFactoryEx.SSLChannelMode DEFAULT_FS_AZURE_SSL_CHANNEL_MODE
= SSLSocketFactoryEx.SSLChannelMode.Default;

View File

@ -125,6 +125,7 @@ public AbfsRestOperation createFilesystem() throws AzureBlobFileSystemException
final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.CreateFileSystem,
this,
HTTP_METHOD_PUT,
url,
@ -148,6 +149,7 @@ public AbfsRestOperation setFilesystemProperties(final String properties) throws
final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.SetFileSystemProperties,
this,
HTTP_METHOD_PUT,
url,
@ -170,6 +172,7 @@ public AbfsRestOperation listPath(final String relativePath, final boolean recur
final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.ListPaths,
this,
HTTP_METHOD_GET,
url,
@ -186,6 +189,7 @@ public AbfsRestOperation getFilesystemProperties() throws AzureBlobFileSystemExc
final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.GetFileSystemProperties,
this,
HTTP_METHOD_HEAD,
url,
@ -202,6 +206,7 @@ public AbfsRestOperation deleteFilesystem() throws AzureBlobFileSystemException
final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.DeleteFileSystem,
this,
HTTP_METHOD_DELETE,
url,
@ -230,6 +235,7 @@ public AbfsRestOperation createPath(final String path, final boolean isFile, fin
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.CreatePath,
this,
HTTP_METHOD_PUT,
url,
@ -251,6 +257,7 @@ public AbfsRestOperation renamePath(final String source, final String destinatio
final URL url = createRequestUrl(destination, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.RenamePath,
this,
HTTP_METHOD_PUT,
url,
@ -273,6 +280,7 @@ public AbfsRestOperation append(final String path, final long position, final by
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.Append,
this,
HTTP_METHOD_PUT,
url,
@ -296,6 +304,7 @@ public AbfsRestOperation flush(final String path, final long position, boolean r
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.Flush,
this,
HTTP_METHOD_PUT,
url,
@ -319,6 +328,7 @@ public AbfsRestOperation setPathProperties(final String path, final String prope
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.SetPathProperties,
this,
HTTP_METHOD_PUT,
url,
@ -334,6 +344,7 @@ public AbfsRestOperation getPathProperties(final String path) throws AzureBlobFi
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.GetPathProperties,
this,
HTTP_METHOD_HEAD,
url,
@ -354,6 +365,7 @@ public AbfsRestOperation read(final String path, final long position, final byte
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.ReadFile,
this,
HTTP_METHOD_GET,
url,
@ -376,6 +388,7 @@ public AbfsRestOperation deletePath(final String path, final boolean recursive,
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.DeletePath,
this,
HTTP_METHOD_DELETE,
url,
@ -404,6 +417,7 @@ public AbfsRestOperation setOwner(final String path, final String owner, final S
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.SetOwner,
this,
AbfsHttpConstants.HTTP_METHOD_PUT,
url,
@ -427,6 +441,7 @@ public AbfsRestOperation setPermission(final String path, final String permissio
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.SetPermissions,
this,
AbfsHttpConstants.HTTP_METHOD_PUT,
url,
@ -458,6 +473,7 @@ public AbfsRestOperation setAcl(final String path, final String aclSpecString, f
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.SetAcl,
this,
AbfsHttpConstants.HTTP_METHOD_PUT,
url,
@ -474,6 +490,7 @@ public AbfsRestOperation getAclStatus(final String path) throws AzureBlobFileSys
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.GetAcl,
this,
AbfsHttpConstants.HTTP_METHOD_HEAD,
url,

View File

@ -0,0 +1,272 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class AbfsClientThrottlingAnalyzer {
private static final Logger LOG = LoggerFactory.getLogger(
AbfsClientThrottlingAnalyzer.class);
private static final int DEFAULT_ANALYSIS_PERIOD_MS = 10 * 1000;
private static final int MIN_ANALYSIS_PERIOD_MS = 1000;
private static final int MAX_ANALYSIS_PERIOD_MS = 30000;
private static final double MIN_ACCEPTABLE_ERROR_PERCENTAGE = .1;
private static final double MAX_EQUILIBRIUM_ERROR_PERCENTAGE = 1;
private static final double RAPID_SLEEP_DECREASE_FACTOR = .75;
private static final double RAPID_SLEEP_DECREASE_TRANSITION_PERIOD_MS = 150
* 1000;
private static final double SLEEP_DECREASE_FACTOR = .975;
private static final double SLEEP_INCREASE_FACTOR = 1.05;
private int analysisPeriodMs;
private volatile int sleepDuration = 0;
private long consecutiveNoErrorCount = 0;
private String name = null;
private Timer timer = null;
private AtomicReference<AbfsOperationMetrics> blobMetrics = null;
private AbfsClientThrottlingAnalyzer() {
// hide default constructor
}
/**
* Creates an instance of the <code>AbfsClientThrottlingAnalyzer</code> class with
* the specified name.
*
* @param name a name used to identify this instance.
* @throws IllegalArgumentException if name is null or empty.
*/
AbfsClientThrottlingAnalyzer(String name) throws IllegalArgumentException {
this(name, DEFAULT_ANALYSIS_PERIOD_MS);
}
/**
* Creates an instance of the <code>AbfsClientThrottlingAnalyzer</code> class with
* the specified name and period.
*
* @param name A name used to identify this instance.
* @param period The frequency, in milliseconds, at which metrics are
* analyzed.
* @throws IllegalArgumentException If name is null or empty.
* If period is less than 1000 or greater than 30000 milliseconds.
*/
AbfsClientThrottlingAnalyzer(String name, int period)
throws IllegalArgumentException {
Preconditions.checkArgument(
StringUtils.isNotEmpty(name),
"The argument 'name' cannot be null or empty.");
Preconditions.checkArgument(
period >= MIN_ANALYSIS_PERIOD_MS && period <= MAX_ANALYSIS_PERIOD_MS,
"The argument 'period' must be between 1000 and 30000.");
this.name = name;
this.analysisPeriodMs = period;
this.blobMetrics = new AtomicReference<AbfsOperationMetrics>(
new AbfsOperationMetrics(System.currentTimeMillis()));
this.timer = new Timer(
String.format("abfs-timer-client-throttling-analyzer-%s", name), true);
this.timer.schedule(new TimerTaskImpl(),
analysisPeriodMs,
analysisPeriodMs);
}
/**
* Updates metrics with results from the current storage operation.
*
* @param count The count of bytes transferred.
* @param isFailedOperation True if the operation failed; otherwise false.
*/
public void addBytesTransferred(long count, boolean isFailedOperation) {
AbfsOperationMetrics metrics = blobMetrics.get();
if (isFailedOperation) {
metrics.bytesFailed.addAndGet(count);
metrics.operationsFailed.incrementAndGet();
} else {
metrics.bytesSuccessful.addAndGet(count);
metrics.operationsSuccessful.incrementAndGet();
}
}
/**
* Suspends the current storage operation, as necessary, to reduce throughput.
*/
public void suspendIfNecessary() {
int duration = sleepDuration;
if (duration > 0) {
try {
Thread.sleep(duration);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
}
@VisibleForTesting
int getSleepDuration() {
return sleepDuration;
}
private int analyzeMetricsAndUpdateSleepDuration(AbfsOperationMetrics metrics,
int sleepDuration) {
final double percentageConversionFactor = 100;
double bytesFailed = metrics.bytesFailed.get();
double bytesSuccessful = metrics.bytesSuccessful.get();
double operationsFailed = metrics.operationsFailed.get();
double operationsSuccessful = metrics.operationsSuccessful.get();
double errorPercentage = (bytesFailed <= 0)
? 0
: (percentageConversionFactor
* bytesFailed
/ (bytesFailed + bytesSuccessful));
long periodMs = metrics.endTime - metrics.startTime;
double newSleepDuration;
if (errorPercentage < MIN_ACCEPTABLE_ERROR_PERCENTAGE) {
++consecutiveNoErrorCount;
// Decrease sleepDuration in order to increase throughput.
double reductionFactor =
(consecutiveNoErrorCount * analysisPeriodMs
>= RAPID_SLEEP_DECREASE_TRANSITION_PERIOD_MS)
? RAPID_SLEEP_DECREASE_FACTOR
: SLEEP_DECREASE_FACTOR;
newSleepDuration = sleepDuration * reductionFactor;
} else if (errorPercentage < MAX_EQUILIBRIUM_ERROR_PERCENTAGE) {
// Do not modify sleepDuration in order to stabilize throughput.
newSleepDuration = sleepDuration;
} else {
// Increase sleepDuration in order to minimize error rate.
consecutiveNoErrorCount = 0;
// Increase sleep duration in order to reduce throughput and error rate.
// First, calculate target throughput: bytesSuccessful / periodMs.
// Next, calculate time required to send *all* data (assuming next period
// is similar to previous) at the target throughput: (bytesSuccessful
// + bytesFailed) * periodMs / bytesSuccessful. Next, subtract periodMs to
// get the total additional delay needed.
double additionalDelayNeeded = 5 * analysisPeriodMs;
if (bytesSuccessful > 0) {
additionalDelayNeeded = (bytesSuccessful + bytesFailed)
* periodMs
/ bytesSuccessful
- periodMs;
}
// amortize the additional delay needed across the estimated number of
// requests during the next period
newSleepDuration = additionalDelayNeeded
/ (operationsFailed + operationsSuccessful);
final double maxSleepDuration = analysisPeriodMs;
final double minSleepDuration = sleepDuration * SLEEP_INCREASE_FACTOR;
// Add 1 ms to avoid rounding down and to decrease proximity to the server
// side ingress/egress limit. Ensure that the new sleep duration is
// larger than the current one to more quickly reduce the number of
// errors. Don't allow the sleep duration to grow unbounded, after a
// certain point throttling won't help, for example, if there are far too
// many tasks/containers/nodes no amount of throttling will help.
newSleepDuration = Math.max(newSleepDuration, minSleepDuration) + 1;
newSleepDuration = Math.min(newSleepDuration, maxSleepDuration);
}
if (LOG.isDebugEnabled()) {
LOG.debug(String.format(
"%5.5s, %10d, %10d, %10d, %10d, %6.2f, %5d, %5d, %5d",
name,
(int) bytesFailed,
(int) bytesSuccessful,
(int) operationsFailed,
(int) operationsSuccessful,
errorPercentage,
periodMs,
(int) sleepDuration,
(int) newSleepDuration));
}
return (int) newSleepDuration;
}
/**
* Timer callback implementation for periodically analyzing metrics.
*/
class TimerTaskImpl extends TimerTask {
private AtomicInteger doingWork = new AtomicInteger(0);
/**
* Periodically analyzes a snapshot of the blob storage metrics and updates
* the sleepDuration in order to appropriately throttle storage operations.
*/
@Override
public void run() {
boolean doWork = false;
try {
doWork = doingWork.compareAndSet(0, 1);
// prevent concurrent execution of this task
if (!doWork) {
return;
}
long now = System.currentTimeMillis();
if (now - blobMetrics.get().startTime >= analysisPeriodMs) {
AbfsOperationMetrics oldMetrics = blobMetrics.getAndSet(
new AbfsOperationMetrics(now));
oldMetrics.endTime = now;
sleepDuration = analyzeMetricsAndUpdateSleepDuration(oldMetrics,
sleepDuration);
}
} finally {
if (doWork) {
doingWork.set(0);
}
}
}
}
/**
* Stores Abfs operation metrics during each analysis period.
*/
static class AbfsOperationMetrics {
private AtomicLong bytesFailed;
private AtomicLong bytesSuccessful;
private AtomicLong operationsFailed;
private AtomicLong operationsSuccessful;
private long endTime;
private long startTime;
AbfsOperationMetrics(long startTime) {
this.startTime = startTime;
this.bytesFailed = new AtomicLong();
this.bytesSuccessful = new AtomicLong();
this.operationsFailed = new AtomicLong();
this.operationsSuccessful = new AtomicLong();
}
}
}

View File

@ -0,0 +1,117 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
import java.net.HttpURLConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Throttles Azure Blob File System read and write operations to achieve maximum
* throughput by minimizing errors. The errors occur when the account ingress
* or egress limits are exceeded and the server-side throttles requests.
* Server-side throttling causes the retry policy to be used, but the retry
* policy sleeps for long periods of time causing the total ingress or egress
* throughput to be as much as 35% lower than optimal. The retry policy is also
* after the fact, in that it applies after a request fails. On the other hand,
* the client-side throttling implemented here happens before requests are made
* and sleeps just enough to minimize errors, allowing optimal ingress and/or
* egress throughput.
*/
public final class AbfsClientThrottlingIntercept {
private static final Logger LOG = LoggerFactory.getLogger(
AbfsClientThrottlingIntercept.class);
private static AbfsClientThrottlingIntercept singleton = null;
private AbfsClientThrottlingAnalyzer readThrottler = null;
private AbfsClientThrottlingAnalyzer writeThrottler = null;
private static boolean isAutoThrottlingEnabled = false;
// Hide default constructor
private AbfsClientThrottlingIntercept() {
readThrottler = new AbfsClientThrottlingAnalyzer("read");
writeThrottler = new AbfsClientThrottlingAnalyzer("write");
isAutoThrottlingEnabled = true;
LOG.debug("Client-side throttling is enabled for the ABFS file system.");
}
public static synchronized void initializeSingleton(boolean isAutoThrottlingEnabled) {
if (!isAutoThrottlingEnabled) {
return;
}
if (singleton == null) {
singleton = new AbfsClientThrottlingIntercept();
}
}
static void updateMetrics(AbfsRestOperationType operationType,
AbfsHttpOperation abfsHttpOperation) {
if (!isAutoThrottlingEnabled || abfsHttpOperation == null) {
return;
}
int status = abfsHttpOperation.getStatusCode();
long contentLength = 0;
// If the socket is terminated prior to receiving a response, the HTTP
// status may be 0 or -1. A status less than 200 or greater than or equal
// to 500 is considered an error.
boolean isFailedOperation = (status < HttpURLConnection.HTTP_OK
|| status >= HttpURLConnection.HTTP_INTERNAL_ERROR);
switch (operationType) {
case Append:
contentLength = abfsHttpOperation.getBytesSent();
if (contentLength > 0) {
singleton.writeThrottler.addBytesTransferred(contentLength,
isFailedOperation);
}
break;
case ReadFile:
contentLength = abfsHttpOperation.getBytesReceived();
if (contentLength > 0) {
singleton.readThrottler.addBytesTransferred(contentLength,
isFailedOperation);
}
break;
default:
break;
}
}
/**
* Called before the request is sent. Client-side throttling
* uses this to suspend the request, if necessary, to minimize errors and
* maximize throughput.
*/
static void sendingRequest(AbfsRestOperationType operationType) {
if (!isAutoThrottlingEnabled) {
return;
}
switch (operationType) {
case ReadFile:
singleton.readThrottler.suspendIfNecessary();
break;
case Append:
singleton.writeThrottler.suspendIfNecessary();
break;
default:
break;
}
}
}

View File

@ -36,6 +36,8 @@
* The AbfsRestOperation for Rest AbfsClient.
*/
public class AbfsRestOperation {
// The type of the REST operation (Append, ReadFile, etc)
private final AbfsRestOperationType operationType;
// Blob FS client, which has the credentials, retry policy, and logs.
private final AbfsClient client;
// the HTTP method (PUT, PATCH, POST, GET, HEAD, or DELETE)
@ -71,10 +73,12 @@ public AbfsHttpOperation getResult() {
* @param url The full URL including query string parameters.
* @param requestHeaders The HTTP request headers.
*/
AbfsRestOperation(final AbfsClient client,
AbfsRestOperation(final AbfsRestOperationType operationType,
final AbfsClient client,
final String method,
final URL url,
final List<AbfsHttpHeader> requestHeaders) {
this.operationType = operationType;
this.client = client;
this.method = method;
this.url = url;
@ -86,6 +90,7 @@ public AbfsHttpOperation getResult() {
/**
* Initializes a new REST operation.
*
* @param operationType The type of the REST operation (Append, ReadFile, etc).
* @param client The Blob FS client.
* @param method The HTTP method (PUT, PATCH, POST, GET, HEAD, or DELETE).
* @param url The full URL including query string parameters.
@ -95,14 +100,15 @@ public AbfsHttpOperation getResult() {
* @param bufferOffset An offset into the buffer where the data beings.
* @param bufferLength The length of the data in the buffer.
*/
AbfsRestOperation(AbfsClient client,
AbfsRestOperation(AbfsRestOperationType operationType,
AbfsClient client,
String method,
URL url,
List<AbfsHttpHeader> requestHeaders,
byte[] buffer,
int bufferOffset,
int bufferLength) {
this(client, method, url, requestHeaders);
this(operationType, client, method, url, requestHeaders);
this.buffer = buffer;
this.bufferOffset = bufferOffset;
this.bufferLength = bufferLength;
@ -152,6 +158,7 @@ private boolean executeHttpOperation(final int retryCount) throws AzureBlobFileS
if (hasRequestBody) {
// HttpUrlConnection requires
AbfsClientThrottlingIntercept.sendingRequest(operationType);
httpOperation.sendRequest(buffer, bufferOffset, bufferLength);
}
@ -168,6 +175,8 @@ private boolean executeHttpOperation(final int retryCount) throws AzureBlobFileS
throw new InvalidAbfsRestOperationException(ex);
}
return false;
} finally {
AbfsClientThrottlingIntercept.updateMetrics(operationType, httpOperation);
}
LOG.debug("HttpRequest: " + httpOperation.toString());

View File

@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
/**
* The REST operation type (Read, Append, Other ).
*/
public enum AbfsRestOperationType {
CreateFileSystem,
GetFileSystemProperties,
SetFileSystemProperties,
ListPaths,
DeleteFileSystem,
CreatePath,
RenamePath,
GetAcl,
GetPathProperties,
SetAcl,
SetOwner,
SetPathProperties,
SetPermissions,
Append,
Flush,
ReadFile,
DeletePath
}

View File

@ -0,0 +1,159 @@
package org.apache.hadoop.fs.azurebfs.services;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
* Tests for <code>AbfsClientThrottlingAnalyzer</code>.
*/
public class TestAbfsClientThrottlingAnalyzer {
private static final int ANALYSIS_PERIOD = 1000;
private static final int ANALYSIS_PERIOD_PLUS_10_PERCENT = ANALYSIS_PERIOD
+ ANALYSIS_PERIOD / 10;
private static final long MEGABYTE = 1024 * 1024;
private static final int MAX_ACCEPTABLE_PERCENT_DIFFERENCE = 20;
private void sleep(long milliseconds) {
try {
Thread.sleep(milliseconds);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private void fuzzyValidate(long expected, long actual, double percentage) {
final double lowerBound = Math.max(expected - percentage / 100 * expected, 0);
final double upperBound = expected + percentage / 100 * expected;
assertTrue(
String.format(
"The actual value %1$d is not within the expected range: "
+ "[%2$.2f, %3$.2f].",
actual,
lowerBound,
upperBound),
actual >= lowerBound && actual <= upperBound);
}
private void validate(long expected, long actual) {
assertEquals(
String.format("The actual value %1$d is not the expected value %2$d.",
actual,
expected),
expected, actual);
}
private void validateLessThanOrEqual(long maxExpected, long actual) {
assertTrue(
String.format(
"The actual value %1$d is not less than or equal to the maximum"
+ " expected value %2$d.",
actual,
maxExpected),
actual < maxExpected);
}
/**
* Ensure that there is no waiting (sleepDuration = 0) if the metrics have
* never been updated. This validates proper initialization of
* ClientThrottlingAnalyzer.
*/
@Test
public void testNoMetricUpdatesThenNoWaiting() {
AbfsClientThrottlingAnalyzer analyzer = new AbfsClientThrottlingAnalyzer(
"test",
ANALYSIS_PERIOD);
validate(0, analyzer.getSleepDuration());
sleep(ANALYSIS_PERIOD_PLUS_10_PERCENT);
validate(0, analyzer.getSleepDuration());
}
/**
* Ensure that there is no waiting (sleepDuration = 0) if the metrics have
* only been updated with successful requests.
*/
@Test
public void testOnlySuccessThenNoWaiting() {
AbfsClientThrottlingAnalyzer analyzer = new AbfsClientThrottlingAnalyzer(
"test",
ANALYSIS_PERIOD);
analyzer.addBytesTransferred(8 * MEGABYTE, false);
validate(0, analyzer.getSleepDuration());
sleep(ANALYSIS_PERIOD_PLUS_10_PERCENT);
validate(0, analyzer.getSleepDuration());
}
/**
* Ensure that there is waiting (sleepDuration != 0) if the metrics have
* only been updated with failed requests. Also ensure that the
* sleepDuration decreases over time.
*/
@Test
public void testOnlyErrorsAndWaiting() {
AbfsClientThrottlingAnalyzer analyzer = new AbfsClientThrottlingAnalyzer(
"test",
ANALYSIS_PERIOD);
validate(0, analyzer.getSleepDuration());
analyzer.addBytesTransferred(4 * MEGABYTE, true);
sleep(ANALYSIS_PERIOD_PLUS_10_PERCENT);
final int expectedSleepDuration1 = 1100;
validateLessThanOrEqual(expectedSleepDuration1, analyzer.getSleepDuration());
sleep(10 * ANALYSIS_PERIOD);
final int expectedSleepDuration2 = 900;
validateLessThanOrEqual(expectedSleepDuration2, analyzer.getSleepDuration());
}
/**
* Ensure that there is waiting (sleepDuration != 0) if the metrics have
* only been updated with both successful and failed requests. Also ensure
* that the sleepDuration decreases over time.
*/
@Test
public void testSuccessAndErrorsAndWaiting() {
AbfsClientThrottlingAnalyzer analyzer = new AbfsClientThrottlingAnalyzer(
"test",
ANALYSIS_PERIOD);
validate(0, analyzer.getSleepDuration());
analyzer.addBytesTransferred(8 * MEGABYTE, false);
analyzer.addBytesTransferred(2 * MEGABYTE, true);
sleep(ANALYSIS_PERIOD_PLUS_10_PERCENT);
ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
analyzer.suspendIfNecessary();
final int expectedElapsedTime = 126;
fuzzyValidate(expectedElapsedTime,
timer.elapsedTimeMs(),
MAX_ACCEPTABLE_PERCENT_DIFFERENCE);
sleep(10 * ANALYSIS_PERIOD);
final int expectedSleepDuration = 110;
validateLessThanOrEqual(expectedSleepDuration, analyzer.getSleepDuration());
}
/**
* Ensure that there is waiting (sleepDuration != 0) if the metrics have
* only been updated with many successful and failed requests. Also ensure
* that the sleepDuration decreases to zero over time.
*/
@Test
public void testManySuccessAndErrorsAndWaiting() {
AbfsClientThrottlingAnalyzer analyzer = new AbfsClientThrottlingAnalyzer(
"test",
ANALYSIS_PERIOD);
validate(0, analyzer.getSleepDuration());
final int numberOfRequests = 20;
for (int i = 0; i < numberOfRequests; i++) {
analyzer.addBytesTransferred(8 * MEGABYTE, false);
analyzer.addBytesTransferred(2 * MEGABYTE, true);
}
sleep(ANALYSIS_PERIOD_PLUS_10_PERCENT);
ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
analyzer.suspendIfNecessary();
fuzzyValidate(7,
timer.elapsedTimeMs(),
MAX_ACCEPTABLE_PERCENT_DIFFERENCE);
sleep(10 * ANALYSIS_PERIOD);
validate(0, analyzer.getSleepDuration());
}
}