HADOOP-14660 wasb: improve throughput by 34% when account limit exceeded.

Contributed by Thomas Marquardt.
This commit is contained in:
Steve Loughran 2017-08-01 21:33:52 +01:00
parent 7774759830
commit 778d4edd9a
No known key found for this signature in database
GPG Key ID: 950CC3E032B79CA2
8 changed files with 1282 additions and 43 deletions

View File

@ -1524,13 +1524,21 @@ public class ContractTestUtils extends Assert {
* printing some useful results in the process.
*/
public static final class NanoTimer {
private final long startTime;
private long startTime;
private long endTime;
/**
 * Creates a timer and records the value returned by {@code now()} as its
 * start time.
 */
public NanoTimer() {
  this.startTime = now();
}
/**
 * Restarts the timer, like pressing the reset button of a stopwatch: the
 * recorded end time is cleared and a fresh start time is taken from
 * {@code now()}.
 */
public void reset() {
  this.endTime = 0;
  this.startTime = now();
}
/**
* End the operation.
* @return the duration of the operation

View File

@ -158,6 +158,8 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
private static final String KEY_SELF_THROTTLE_READ_FACTOR = "fs.azure.selfthrottling.read.factor";
private static final String KEY_SELF_THROTTLE_WRITE_FACTOR = "fs.azure.selfthrottling.write.factor";
private static final String KEY_AUTO_THROTTLE_ENABLE = "fs.azure.autothrottling.enable";
private static final String KEY_ENABLE_STORAGE_CLIENT_LOGGING = "fs.azure.storage.client.logging";
/**
@ -239,10 +241,10 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
// Retry parameter defaults.
//
private static final int DEFAULT_MIN_BACKOFF_INTERVAL = 1 * 1000; // 1s
private static final int DEFAULT_MIN_BACKOFF_INTERVAL = 3 * 1000; // 3s
private static final int DEFAULT_MAX_BACKOFF_INTERVAL = 30 * 1000; // 30s
private static final int DEFAULT_BACKOFF_INTERVAL = 1 * 1000; // 1s
private static final int DEFAULT_MAX_RETRY_ATTEMPTS = 15;
private static final int DEFAULT_BACKOFF_INTERVAL = 3 * 1000; // 3s
private static final int DEFAULT_MAX_RETRY_ATTEMPTS = 30;
private static final int DEFAULT_COPYBLOB_MIN_BACKOFF_INTERVAL = 3 * 1000;
private static final int DEFAULT_COPYBLOB_MAX_BACKOFF_INTERVAL = 90 * 1000;
@ -256,6 +258,8 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
private static final float DEFAULT_SELF_THROTTLE_READ_FACTOR = 1.0f;
private static final float DEFAULT_SELF_THROTTLE_WRITE_FACTOR = 1.0f;
private static final boolean DEFAULT_AUTO_THROTTLE_ENABLE = false;
private static final int STORAGE_CONNECTION_TIMEOUT_DEFAULT = 90;
/**
@ -283,7 +287,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
private boolean connectingUsingSAS = false;
private AzureFileSystemInstrumentation instrumentation;
private BandwidthGaugeUpdater bandwidthGaugeUpdater;
private final static JSON PERMISSION_JSON_SERIALIZER = createPermissionJsonSerializer();
private static final JSON PERMISSION_JSON_SERIALIZER = createPermissionJsonSerializer();
private boolean suppressRetryPolicy = false;
private boolean canCreateOrModifyContainer = false;
@ -308,6 +312,8 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
private float selfThrottlingReadFactor;
private float selfThrottlingWriteFactor;
private boolean autoThrottlingEnabled;
private TestHookOperationContext testHookOperationContext = null;
// Set if we're running against a storage emulator..
@ -664,9 +670,9 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
private String getHTTPScheme() {
String sessionScheme = sessionUri.getScheme();
// Check if we're on a secure URI scheme: wasbs or the legacy asvs scheme.
if (sessionScheme != null &&
(sessionScheme.equalsIgnoreCase("asvs") ||
sessionScheme.equalsIgnoreCase("wasbs"))) {
if (sessionScheme != null
&& (sessionScheme.equalsIgnoreCase("asvs")
|| sessionScheme.equalsIgnoreCase("wasbs"))) {
return HTTPS_SCHEME;
} else {
// At this point the scheme should be either null or asv or wasb.
@ -766,6 +772,18 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
selfThrottlingWriteFactor = sessionConfiguration.getFloat(
KEY_SELF_THROTTLE_WRITE_FACTOR, DEFAULT_SELF_THROTTLE_WRITE_FACTOR);
if (!selfThrottlingEnabled) {
autoThrottlingEnabled = sessionConfiguration.getBoolean(
KEY_AUTO_THROTTLE_ENABLE,
DEFAULT_AUTO_THROTTLE_ENABLE);
if (autoThrottlingEnabled) {
ClientThrottlingIntercept.initializeSingleton();
}
} else {
// cannot enable both self-throttling and client-throttling
autoThrottlingEnabled = false;
}
OperationContext.setLoggingEnabledByDefault(sessionConfiguration.
getBoolean(KEY_ENABLE_STORAGE_CLIENT_LOGGING, false));
@ -839,8 +857,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
CloudStorageAccount.getDevelopmentStorageAccount();
storageInteractionLayer.createBlobClient(account);
} else {
blobEndPoint = new URI(getHTTPScheme() + "://" +
accountName);
blobEndPoint = new URI(getHTTPScheme() + "://" + accountName);
storageInteractionLayer.createBlobClient(blobEndPoint, credentials);
}
suppressRetryPolicyInClientIfNeeded();
@ -1128,8 +1145,8 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
myDir = verifyAndConvertToStandardFormat(currentDir);
} catch (URISyntaxException ex) {
throw new AzureException(String.format(
"The directory %s specified in the configuration entry %s is not" +
" a valid URI.",
"The directory %s specified in the configuration entry %s is not"
+ " a valid URI.",
currentDir, configVar));
}
if (myDir != null) {
@ -1159,8 +1176,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
public boolean isKeyForDirectorySet(String key, Set<String> dirSet) {
String defaultFS = FileSystem.getDefaultUri(sessionConfiguration).toString();
for (String dir : dirSet) {
if (dir.isEmpty() ||
key.startsWith(dir + "/")) {
if (dir.isEmpty() || key.startsWith(dir + "/")) {
return true;
}
@ -1739,10 +1755,11 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
private Iterable<ListBlobItem> listRootBlobs(boolean includeMetadata,
boolean useFlatBlobListing) throws StorageException, URISyntaxException {
return rootDirectory.listBlobs(
null, useFlatBlobListing,
includeMetadata ?
EnumSet.of(BlobListingDetails.METADATA) :
EnumSet.noneOf(BlobListingDetails.class),
null,
useFlatBlobListing,
includeMetadata
? EnumSet.of(BlobListingDetails.METADATA)
: EnumSet.noneOf(BlobListingDetails.class),
null,
getInstrumentedContext());
}
@ -1771,9 +1788,9 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
Iterable<ListBlobItem> list = rootDirectory.listBlobs(aPrefix,
useFlatBlobListing,
includeMetadata ?
EnumSet.of(BlobListingDetails.METADATA) :
EnumSet.noneOf(BlobListingDetails.class),
includeMetadata
? EnumSet.of(BlobListingDetails.METADATA)
: EnumSet.noneOf(BlobListingDetails.class),
null,
getInstrumentedContext());
return list;
@ -1941,6 +1958,8 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
if (selfThrottlingEnabled) {
SelfThrottlingIntercept.hook(operationContext, selfThrottlingReadFactor,
selfThrottlingWriteFactor);
} else if (autoThrottlingEnabled) {
ClientThrottlingIntercept.hook(operationContext);
}
if (bandwidthGaugeUpdater != null) {
@ -2446,10 +2465,10 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
// 1. It's a BlobNotFound exception AND
// 2. It got there after one-or-more retries THEN
// we swallow the exception.
if (e.getErrorCode() != null &&
"BlobNotFound".equals(e.getErrorCode()) &&
operationContext.getRequestResults().size() > 1 &&
operationContext.getRequestResults().get(0).getException() != null) {
if (e.getErrorCode() != null
&& "BlobNotFound".equals(e.getErrorCode())
&& operationContext.getRequestResults().size() > 1
&& operationContext.getRequestResults().get(0).getException() != null) {
LOG.debug("Swallowing delete exception on retry: {}", e.getMessage());
return;
} else {
@ -2561,8 +2580,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore {
srcBlob = getBlobReference(srcKey);
if (!srcBlob.exists(getInstrumentedContext())) {
throw new AzureException ("Source blob " + srcKey +
" does not exist.");
throw new AzureException("Source blob " + srcKey + " does not exist.");
}
/**

View File

@ -0,0 +1,222 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azure;
import com.microsoft.azure.storage.Constants.HeaderConstants;
import org.apache.hadoop.classification.InterfaceAudience;
import java.net.HttpURLConnection;
import java.net.URL;
/**
 * Determines the operation type (PutBlock, PutPage, GetBlob, etc) of Azure
 * Storage operations.  This is used by the handlers of the SendingRequestEvent
 * and ResponseReceivedEvent exposed by the Azure Storage SDK to identify
 * operation types (since the type of operation is not exposed by the SDK).
 * <p>
 * Classification looks only at the request itself: the HTTP method, the
 * {@code comp} and {@code restype} query parameters, and the
 * {@code x-ms-blob-type}, {@code x-ms-page-write} and {@code x-ms-range}
 * request headers.  A request that matches none of the known shapes is
 * reported as {@link OperationType#Unknown}.
 */
@InterfaceAudience.Private
final class BlobOperationDescriptor {

  /** Request header carrying the byte range of a page write or blob read. */
  private static final String RANGE_HEADER = "x-ms-range";

  /** Prefix of a range header value; the full format is "bytes=%d-%d". */
  private static final String RANGE_PREFIX = "bytes=";

  /** Value of the 'restype' query parameter for container operations. */
  private static final String CONTAINER_RESOURCE = "container";

  private BlobOperationDescriptor() {
    // hide default constructor
  }

  /**
   * Gets the content length for the Azure Storage operation from the
   * 'x-ms-range' header, if set.
   *
   * @param range the value of the 'x-ms-range' header.
   * @return the content length, or zero if the header is not set, is not of
   * the form "bytes=start-end", or does not contain valid numbers.
   */
  private static long getContentLengthIfKnown(String range) {
    if (range == null || !range.startsWith(RANGE_PREFIX)) {
      return 0;
    }
    String[] offsets = range.substring(RANGE_PREFIX.length()).split("-");
    if (offsets.length != 2) {
      return 0;
    }
    try {
      // The range is inclusive at both ends, hence the + 1.
      return Long.parseLong(offsets[1]) - Long.parseLong(offsets[0]) + 1;
    } catch (NumberFormatException ignored) {
      // A malformed range (for example "bytes=-500") means the length is
      // unknown; metrics collection must not fail the storage request.
      return 0;
    }
  }

  /**
   * Parses a decimal header value.
   *
   * @param value the header value, possibly null.
   * @return the parsed value, or zero if the value is null or not a number.
   */
  private static long parseLengthOrZero(String value) {
    if (value == null) {
      return 0;
    }
    try {
      return Long.parseLong(value);
    } catch (NumberFormatException ignored) {
      // Treat an unparsable header the same as a missing one: unknown.
      return 0;
    }
  }

  /**
   * Gets the content length for the Azure Storage operation, or returns zero if
   * unknown.
   *
   * @param conn the connection object for the Azure Storage operation.
   * @param operationType the Azure Storage operation type.
   * @return the content length, or zero if unknown.  AppendBlock and PutBlock
   * use the Content-Length request header; PutPage and GetBlob use the
   * 'x-ms-range' request header; every other operation type yields zero.
   */
  static long getContentLengthIfKnown(HttpURLConnection conn,
      OperationType operationType) {
    switch (operationType) {
    case AppendBlock:
    case PutBlock:
      return parseLengthOrZero(
          conn.getRequestProperty(HeaderConstants.CONTENT_LENGTH));
    case PutPage:
    case GetBlob:
      return getContentLengthIfKnown(conn.getRequestProperty(RANGE_HEADER));
    default:
      return 0;
    }
  }

  /**
   * Gets the operation type of an Azure Storage operation.
   *
   * @param conn the connection object for the Azure Storage operation.
   * @return the operation type, or {@link OperationType#Unknown} if the
   * request is not recognized.
   */
  static OperationType getOperationType(HttpURLConnection conn) {
    String method = conn.getRequestMethod();
    String compValue = getQueryParameter(conn.getURL(), "comp");

    if (method.equalsIgnoreCase("PUT")) {
      return (compValue != null)
          ? getPutOperationType(conn, compValue)
          : getCreateOperationType(conn);
    }
    if (method.equalsIgnoreCase("GET")) {
      return getGetOperationType(conn, compValue);
    }
    if (method.equalsIgnoreCase("HEAD")) {
      return OperationType.GetProperties;
    }
    if (method.equalsIgnoreCase("DELETE")) {
      return isContainerRequest(conn)
          ? OperationType.DeleteContainer
          : OperationType.DeleteBlob;
    }
    return OperationType.Unknown;
  }

  /**
   * Classifies a PUT request that carries a 'comp' query parameter.
   *
   * @param conn the connection object for the Azure Storage operation.
   * @param compValue the non-null value of the 'comp' query parameter.
   * @return the operation type.
   */
  private static OperationType getPutOperationType(HttpURLConnection conn,
      String compValue) {
    switch (compValue) {
    case "metadata":
      return OperationType.SetMetadata;
    case "properties":
      return OperationType.SetProperties;
    case "block":
      return OperationType.PutBlock;
    case "page":
      // Only page writes flagged as UPDATE are reported as PutPage; any
      // other (or missing) 'x-ms-page-write' value stays Unknown.
      String pageWrite = conn.getRequestProperty("x-ms-page-write");
      return "UPDATE".equalsIgnoreCase(pageWrite)
          ? OperationType.PutPage
          : OperationType.Unknown;
    case "appendblock":
      return OperationType.AppendBlock;
    case "blocklist":
      return OperationType.PutBlockList;
    default:
      return OperationType.Unknown;
    }
  }

  /**
   * Classifies a PUT request that has no 'comp' query parameter: either a
   * blob creation (recognized 'x-ms-blob-type' header) or a container
   * creation (no blob type header and 'restype=container').
   *
   * @param conn the connection object for the Azure Storage operation.
   * @return the operation type.
   */
  private static OperationType getCreateOperationType(HttpURLConnection conn) {
    String blobType = conn.getRequestProperty("x-ms-blob-type");
    if (blobType == null) {
      return isContainerRequest(conn)
          ? OperationType.CreateContainer
          : OperationType.Unknown;
    }
    boolean isKnownBlobType = blobType.equalsIgnoreCase("PageBlob")
        || blobType.equalsIgnoreCase("BlockBlob")
        || blobType.equalsIgnoreCase("AppendBlob");
    return isKnownBlobType ? OperationType.CreateBlob : OperationType.Unknown;
  }

  /**
   * Classifies a GET request.
   *
   * @param conn the connection object for the Azure Storage operation.
   * @param compValue the value of the 'comp' query parameter, or null.
   * @return the operation type.
   */
  private static OperationType getGetOperationType(HttpURLConnection conn,
      String compValue) {
    if (compValue == null) {
      // A GET without 'comp' is reported as GetBlob only when it carries a
      // range header.
      return (conn.getRequestProperty(RANGE_HEADER) != null)
          ? OperationType.GetBlob
          : OperationType.Unknown;
    }
    switch (compValue) {
    case "list":
      return OperationType.ListBlobs;
    case "metadata":
      return OperationType.GetMetadata;
    case "blocklist":
      return OperationType.GetBlockList;
    case "pagelist":
      return OperationType.GetPageList;
    default:
      return OperationType.Unknown;
    }
  }

  /**
   * Tests whether the request URL has the query parameter
   * 'restype=container' (value compared case-insensitively).
   *
   * @param conn the connection object for the Azure Storage operation.
   * @return true if the request addresses a container.
   */
  private static boolean isContainerRequest(HttpURLConnection conn) {
    return CONTAINER_RESOURCE.equalsIgnoreCase(
        getQueryParameter(conn.getURL(), "restype"));
  }

  /**
   * Gets the raw (not URL-decoded) value of a query parameter.
   *
   * @param url the request URL, possibly null.
   * @param queryParameterName the exact name of the query parameter.
   * @return the value of the first parameter with that name, or null if the
   * URL is null, has no query string, or has no such parameter.
   */
  private static String getQueryParameter(URL url, String queryParameterName) {
    String query = (url != null) ? url.getQuery() : null;
    if (query == null) {
      return null;
    }

    String searchValue = queryParameterName + "=";
    int offset = query.indexOf(searchValue);
    // Accept a match only at the start of the query or directly after '&',
    // so that looking up "comp" does not match inside e.g. "recomp=...".
    while (offset > 0 && query.charAt(offset - 1) != '&') {
      offset = query.indexOf(searchValue, offset + 1);
    }
    if (offset == -1) {
      return null;
    }

    int beginIndex = offset + searchValue.length();
    int endIndex = query.indexOf('&', beginIndex);
    return (endIndex == -1)
        ? query.substring(beginIndex)
        : query.substring(beginIndex, endIndex);
  }

  /**
   * The Azure Storage operation types that can be identified from a request.
   */
  @InterfaceAudience.Private
  enum OperationType {
    AppendBlock,
    CreateBlob,
    CreateContainer,
    DeleteBlob,
    DeleteContainer,
    GetBlob,
    GetBlockList,
    GetMetadata,
    GetPageList,
    GetProperties,
    ListBlobs,
    PutBlock,
    PutBlockList,
    PutPage,
    SetMetadata,
    SetProperties,
    Unknown
  }
}

View File

@ -0,0 +1,284 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azure;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
/**
 * Throttles storage operations to minimize errors and maximize throughput.
 * This improves throughput by as much as 35% when the service throttles
 * requests due to exceeding account level ingress or egress limits.
 * <p>
 * Callers report the outcome of each operation through
 * {@link #addBytesTransferred(long, boolean)} and call
 * {@link #suspendIfNecessary()} before issuing a request.  A timer thread
 * periodically analyzes the collected metrics and adjusts the sleep duration
 * applied by {@link #suspendIfNecessary()}.
 */
@InterfaceAudience.Private
class ClientThrottlingAnalyzer {
  private static final Logger LOG = LoggerFactory.getLogger(
      ClientThrottlingAnalyzer.class);
  private static final int DEFAULT_ANALYSIS_PERIOD_MS = 10 * 1000;
  private static final int MIN_ANALYSIS_PERIOD_MS = 1000;
  private static final int MAX_ANALYSIS_PERIOD_MS = 30000;
  private static final double MIN_ACCEPTABLE_ERROR_PERCENTAGE = .1;
  private static final double MAX_EQUILIBRIUM_ERROR_PERCENTAGE = 1;
  private static final double RAPID_SLEEP_DECREASE_FACTOR = .75;
  private static final double RAPID_SLEEP_DECREASE_TRANSITION_PERIOD_MS = 150
      * 1000;
  private static final double SLEEP_DECREASE_FACTOR = .975;
  private static final double SLEEP_INCREASE_FACTOR = 1.05;

  private int analysisPeriodMs;

  // Written by the timer thread, read by the threads issuing requests.
  private volatile int sleepDuration = 0;
  // Only touched by the timer thread.
  private long consecutiveNoErrorCount = 0;
  private String name = null;
  private Timer timer = null;
  private AtomicReference<BlobOperationMetrics> blobMetrics = null;

  private ClientThrottlingAnalyzer() {
    // hide default constructor
  }

  /**
   * Creates an instance of the <code>ClientThrottlingAnalyzer</code> class with
   * the specified name.
   *
   * @param name a name used to identify this instance.
   *
   * @throws IllegalArgumentException if name is null or empty.
   */
  ClientThrottlingAnalyzer(String name) throws IllegalArgumentException {
    this(name, DEFAULT_ANALYSIS_PERIOD_MS);
  }

  /**
   * Creates an instance of the <code>ClientThrottlingAnalyzer</code> class with
   * the specified name and period.
   *
   * @param name   A name used to identify this instance.
   *
   * @param period The frequency, in milliseconds, at which metrics are
   *               analyzed.
   *
   * @throws IllegalArgumentException
   *           If name is null or empty.
   *           If period is less than 1000 or greater than 30000 milliseconds.
   */
  ClientThrottlingAnalyzer(String name, int period)
      throws IllegalArgumentException {
    Preconditions.checkArgument(
        StringUtils.isNotEmpty(name),
        "The argument 'name' cannot be null or empty.");
    Preconditions.checkArgument(
        period >= MIN_ANALYSIS_PERIOD_MS && period <= MAX_ANALYSIS_PERIOD_MS,
        "The argument 'period' must be between 1000 and 30000.");
    this.name = name;
    this.analysisPeriodMs = period;
    this.blobMetrics = new AtomicReference<BlobOperationMetrics>(
        new BlobOperationMetrics(System.currentTimeMillis()));
    // The timer is never cancelled, so it must run on a daemon thread;
    // otherwise its thread would keep the JVM from exiting.
    this.timer = new Timer(
        String.format("wasb-timer-client-throttling-analyzer-%s", name),
        true);
    this.timer.schedule(new TimerTaskImpl(),
        analysisPeriodMs,
        analysisPeriodMs);
  }

  /**
   * Updates metrics with results from the current storage operation.
   *
   * @param count             The count of bytes transferred.
   *
   * @param isFailedOperation True if the operation failed; otherwise false.
   */
  public void addBytesTransferred(long count, boolean isFailedOperation) {
    BlobOperationMetrics metrics = blobMetrics.get();
    if (isFailedOperation) {
      metrics.bytesFailed.addAndGet(count);
      metrics.operationsFailed.incrementAndGet();
    } else {
      metrics.bytesSuccessful.addAndGet(count);
      metrics.operationsSuccessful.incrementAndGet();
    }
  }

  /**
   * Suspends the current storage operation, as necessary, to reduce throughput.
   * If the calling thread is interrupted while sleeping, the sleep ends early
   * and the thread's interrupt status is restored.
   */
  public void suspendIfNecessary() {
    // Read the volatile field once so the check and the sleep agree.
    int duration = sleepDuration;
    if (duration > 0) {
      try {
        Thread.sleep(duration);
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
      }
    }
  }

  @VisibleForTesting
  int getSleepDuration() {
    return sleepDuration;
  }

  /**
   * Computes the next sleep duration from the metrics of the analysis period
   * that just ended.
   *
   * @param metrics              the metrics of the completed period.
   * @param currentSleepDuration the sleep duration, in milliseconds, that was
   *                             in effect during that period.
   * @return the sleep duration, in milliseconds, to use for the next period.
   */
  private int analyzeMetricsAndUpdateSleepDuration(BlobOperationMetrics metrics,
      int currentSleepDuration) {
    final double percentageConversionFactor = 100;
    double bytesFailed = metrics.bytesFailed.get();
    double bytesSuccessful = metrics.bytesSuccessful.get();
    double operationsFailed = metrics.operationsFailed.get();
    double operationsSuccessful = metrics.operationsSuccessful.get();
    double errorPercentage = (bytesFailed <= 0)
        ? 0
        : percentageConversionFactor
            * bytesFailed
            / (bytesFailed + bytesSuccessful);
    long periodMs = metrics.endTime - metrics.startTime;

    double newSleepDuration;

    if (errorPercentage < MIN_ACCEPTABLE_ERROR_PERCENTAGE) {
      ++consecutiveNoErrorCount;
      // Decrease sleepDuration in order to increase throughput.  Once the
      // error-free streak has lasted long enough, decrease it faster.
      double reductionFactor =
          (consecutiveNoErrorCount * analysisPeriodMs
              >= RAPID_SLEEP_DECREASE_TRANSITION_PERIOD_MS)
              ? RAPID_SLEEP_DECREASE_FACTOR
              : SLEEP_DECREASE_FACTOR;

      newSleepDuration = currentSleepDuration * reductionFactor;
    } else if (errorPercentage < MAX_EQUILIBRIUM_ERROR_PERCENTAGE) {
      // Do not modify sleepDuration in order to stabilize throughput.
      newSleepDuration = currentSleepDuration;
    } else {
      // Increase sleepDuration in order to minimize error rate.
      consecutiveNoErrorCount = 0;

      // Increase sleep duration in order to reduce throughput and error rate.
      // First, calculate target throughput: bytesSuccessful / periodMs.
      // Next, calculate time required to send *all* data (assuming next period
      // is similar to previous) at the target throughput: (bytesSuccessful
      // + bytesFailed) * periodMs / bytesSuccessful. Next, subtract periodMs to
      // get the total additional delay needed.
      double additionalDelayNeeded = 5 * analysisPeriodMs;
      if (bytesSuccessful > 0) {
        additionalDelayNeeded = (bytesSuccessful + bytesFailed)
            * periodMs
            / bytesSuccessful
            - periodMs;
      }

      // amortize the additional delay needed across the estimated number of
      // requests during the next period
      newSleepDuration = additionalDelayNeeded
          / (operationsFailed + operationsSuccessful);

      final double maxSleepDuration = analysisPeriodMs;
      final double minSleepDuration
          = currentSleepDuration * SLEEP_INCREASE_FACTOR;

      // Add 1 ms to avoid rounding down and to decrease proximity to the server
      // side ingress/egress limit.  Ensure that the new sleep duration is
      // larger than the current one to more quickly reduce the number of
      // errors.  Don't allow the sleep duration to grow unbounded, after a
      // certain point throttling won't help, for example, if there are far too
      // many tasks/containers/nodes no amount of throttling will help.
      newSleepDuration = Math.max(newSleepDuration, minSleepDuration) + 1;
      newSleepDuration = Math.min(newSleepDuration, maxSleepDuration);
    }

    if (LOG.isDebugEnabled()) {
      // Byte and operation counts are logged as long: an int cast would clamp
      // at Integer.MAX_VALUE once more than 2 GiB is moved in one period.
      LOG.debug(String.format(
          "%5.5s, %10d, %10d, %10d, %10d, %6.2f, %5d, %5d, %5d",
          name,
          (long) bytesFailed,
          (long) bytesSuccessful,
          (long) operationsFailed,
          (long) operationsSuccessful,
          errorPercentage,
          periodMs,
          currentSleepDuration,
          (int) newSleepDuration));
    }

    return (int) newSleepDuration;
  }

  /**
   * Timer callback implementation for periodically analyzing metrics.
   */
  class TimerTaskImpl extends TimerTask {
    private final AtomicInteger doingWork = new AtomicInteger(0);

    /**
     * Periodically analyzes a snapshot of the blob storage metrics and updates
     * the sleepDuration in order to appropriately throttle storage operations.
     */
    @Override
    public void run() {
      boolean doWork = false;
      try {
        doWork = doingWork.compareAndSet(0, 1);

        // prevent concurrent execution of this task
        if (!doWork) {
          return;
        }

        long now = System.currentTimeMillis();
        if (now - blobMetrics.get().startTime >= analysisPeriodMs) {
          // Swap in a fresh metrics object so new operations are counted
          // against the next period while the old one is analyzed.
          BlobOperationMetrics oldMetrics = blobMetrics.getAndSet(
              new BlobOperationMetrics(now));
          oldMetrics.endTime = now;
          sleepDuration = analyzeMetricsAndUpdateSleepDuration(oldMetrics,
              sleepDuration);
        }
      } finally {
        if (doWork) {
          doingWork.set(0);
        }
      }
    }
  }

  /**
   * Stores blob operation metrics during each analysis period.
   */
  static class BlobOperationMetrics {
    private final AtomicLong bytesFailed;
    private final AtomicLong bytesSuccessful;
    private final AtomicLong operationsFailed;
    private final AtomicLong operationsSuccessful;
    private long endTime;
    private final long startTime;

    BlobOperationMetrics(long startTime) {
      this.startTime = startTime;
      this.bytesFailed = new AtomicLong();
      this.bytesSuccessful = new AtomicLong();
      this.operationsFailed = new AtomicLong();
      this.operationsSuccessful = new AtomicLong();
    }
  }
}

View File

@ -0,0 +1,221 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azure;
import com.microsoft.azure.storage.ErrorReceivingResponseEvent;
import com.microsoft.azure.storage.OperationContext;
import com.microsoft.azure.storage.RequestResult;
import com.microsoft.azure.storage.ResponseReceivedEvent;
import com.microsoft.azure.storage.SendingRequestEvent;
import com.microsoft.azure.storage.StorageEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import java.net.HttpURLConnection;
/**
 * Throttles Azure Storage read and write operations to achieve maximum
 * throughput by minimizing errors.  The errors occur when the account ingress
 * or egress limits are exceeded and the server-side throttles requests.
 * Server-side throttling causes the retry policy to be used, but the retry
 * policy sleeps for long periods of time causing the total ingress or egress
 * throughput to be as much as 35% lower than optimal.  The retry policy is also
 * after the fact, in that it applies after a request fails.  On the other hand,
 * the client-side throttling implemented here happens before requests are made
 * and sleeps just enough to minimize errors, allowing optimal ingress and/or
 * egress throughput.
 * <p>
 * {@link #initializeSingleton()} must be called before any operation context
 * passed to {@link #hook(OperationContext)} fires events; the event callbacks
 * dereference the singleton without a null check.
 */
@InterfaceAudience.Private
final class ClientThrottlingIntercept {
  private static final Logger LOG = LoggerFactory.getLogger(
      ClientThrottlingIntercept.class);
  private static ClientThrottlingIntercept singleton = null;

  /** Throttles GetBlob operations. */
  private final ClientThrottlingAnalyzer readThrottler;
  /** Throttles AppendBlock, PutBlock and PutPage operations. */
  private final ClientThrottlingAnalyzer writeThrottler;

  // Hide default constructor
  private ClientThrottlingIntercept() {
    readThrottler = new ClientThrottlingAnalyzer("read");
    writeThrottler = new ClientThrottlingAnalyzer("write");
    LOG.debug("Client-side throttling is enabled for the WASB file system.");
  }

  /**
   * Creates the process-wide instance if it does not exist yet.  Subsequent
   * calls are no-ops.
   */
  static synchronized void initializeSingleton() {
    if (singleton == null) {
      singleton = new ClientThrottlingIntercept();
    }
  }

  /**
   * Registers the client-side throttling event handlers on an operation
   * context.
   *
   * @param context the operation context whose events should be observed.
   */
  static void hook(OperationContext context) {
    context.getErrorReceivingResponseEventHandler().addListener(
        new ErrorReceivingResponseEventHandler());
    context.getSendingRequestEventHandler().addListener(
        new SendingRequestEventHandler());
    context.getResponseReceivedEventHandler().addListener(
        new ResponseReceivedEventHandler());
  }

  /**
   * Selects the analyzer responsible for an operation type.
   *
   * @param operationType the Azure Storage operation type.
   * @return the read analyzer for GetBlob, the write analyzer for
   * AppendBlock, PutBlock and PutPage, or null for operations that are not
   * throttled.
   */
  private static ClientThrottlingAnalyzer getThrottler(
      BlobOperationDescriptor.OperationType operationType) {
    switch (operationType) {
    case GetBlob:
      return singleton.readThrottler;
    case AppendBlock:
    case PutBlock:
    case PutPage:
      return singleton.writeThrottler;
    default:
      return null;
    }
  }

  /**
   * Records the outcome of a finished request with the matching analyzer.
   * Requests whose content length is unknown (zero) are not recorded.
   *
   * @param conn   the connection object of the request.
   * @param result the request result carrying the HTTP status code.
   */
  private static void updateMetrics(HttpURLConnection conn,
      RequestResult result) {
    BlobOperationDescriptor.OperationType operationType
        = BlobOperationDescriptor.getOperationType(conn);
    int status = result.getStatusCode();

    // If the socket is terminated prior to receiving a response, the HTTP
    // status may be 0 or -1.  A status less than 200 or greater than or equal
    // to 500 is considered an error.
    boolean isFailedOperation = (status < HttpURLConnection.HTTP_OK
        || status >= HttpURLConnection.HTTP_INTERNAL_ERROR);

    ClientThrottlingAnalyzer throttler = getThrottler(operationType);
    if (throttler == null) {
      return;
    }

    long contentLength = BlobOperationDescriptor.getContentLengthIfKnown(conn,
        operationType);
    if (contentLength > 0) {
      throttler.addBytesTransferred(contentLength, isFailedOperation);
    }
  }

  /**
   * Called when a network error occurs before the HTTP status and response
   * headers are received.  Client-side throttling uses this to collect metrics.
   *
   * @param event The connection, operation, and request state.
   */
  public static void errorReceivingResponse(ErrorReceivingResponseEvent event) {
    updateMetrics((HttpURLConnection) event.getConnectionObject(),
        event.getRequestResult());
  }

  /**
   * Called before the Azure Storage SDK sends a request.  Client-side throttling
   * uses this to suspend the request, if necessary, to minimize errors and
   * maximize throughput.
   *
   * @param event The connection, operation, and request state.
   */
  public static void sendingRequest(SendingRequestEvent event) {
    BlobOperationDescriptor.OperationType operationType
        = BlobOperationDescriptor.getOperationType(
            (HttpURLConnection) event.getConnectionObject());
    ClientThrottlingAnalyzer throttler = getThrottler(operationType);
    if (throttler != null) {
      throttler.suspendIfNecessary();
    }
  }

  /**
   * Called after the Azure Storage SDK receives a response.  Client-side
   * throttling uses this to collect metrics.
   *
   * @param event The connection, operation, and request state.
   */
  public static void responseReceived(ResponseReceivedEvent event) {
    updateMetrics((HttpURLConnection) event.getConnectionObject(),
        event.getRequestResult());
  }

  /**
   * The ErrorReceivingResponseEvent is fired when the Azure Storage SDK
   * encounters a network error before the HTTP status and response headers are
   * received.
   */
  @InterfaceAudience.Private
  static class ErrorReceivingResponseEventHandler
      extends StorageEvent<ErrorReceivingResponseEvent> {

    /**
     * Called when a network error occurs before the HTTP status and response
     * headers are received.  Client-side throttling uses this to collect
     * metrics.
     *
     * @param event The connection, operation, and request state.
     */
    @Override
    public void eventOccurred(ErrorReceivingResponseEvent event) {
      ClientThrottlingIntercept.errorReceivingResponse(event);
    }
  }

  /**
   * The SendingRequestEvent is fired before the Azure Storage SDK sends a
   * request.
   */
  @InterfaceAudience.Private
  static class SendingRequestEventHandler
      extends StorageEvent<SendingRequestEvent> {

    /**
     * Called before the Azure Storage SDK sends a request.  Client-side
     * throttling uses this to suspend the request, if necessary, to minimize
     * errors and maximize throughput.
     *
     * @param event The connection, operation, and request state.
     */
    @Override
    public void eventOccurred(SendingRequestEvent event) {
      ClientThrottlingIntercept.sendingRequest(event);
    }
  }

  /**
   * The ResponseReceivedEvent is fired after the Azure Storage SDK receives a
   * response.
   */
  @InterfaceAudience.Private
  static class ResponseReceivedEventHandler
      extends StorageEvent<ResponseReceivedEvent> {

    /**
     * Called after the Azure Storage SDK receives a response.  Client-side
     * throttling uses this to collect metrics.
     *
     * @param event The connection, operation, and request state.
     */
    @Override
    public void eventOccurred(ResponseReceivedEvent event) {
      ClientThrottlingIntercept.responseReceived(event);
    }
  }
}

View File

@ -67,4 +67,8 @@ public abstract class AbstractWasbTestBase {
/**
 * Creates the Azure Blob Storage test account for the concrete test class.
 * NOTE(review): how and when the base class invokes this is not visible
 * here - confirm against the rest of the class.
 *
 * @return the test account supplied by the subclass.
 * @throws Exception if the subclass cannot create the account.
 */
protected abstract AzureBlobStorageTestAccount createTestAccount()
throws Exception;
/**
 * Gives subclasses access to the test account held by this base class.
 *
 * @return the current value of the {@code testAccount} field.
 */
protected AzureBlobStorageTestAccount getTestAccount() {
  return this.testAccount;
}
}

View File

@ -0,0 +1,305 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azure;
import com.microsoft.azure.storage.OperationContext;
import com.microsoft.azure.storage.ResponseReceivedEvent;
import com.microsoft.azure.storage.SendingRequestEvent;
import com.microsoft.azure.storage.StorageEvent;
import com.microsoft.azure.storage.blob.BlobInputStream;
import com.microsoft.azure.storage.blob.BlobOutputStream;
import com.microsoft.azure.storage.blob.CloudAppendBlob;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import com.microsoft.azure.storage.blob.CloudBlockBlob;
import com.microsoft.azure.storage.blob.CloudPageBlob;
import org.apache.hadoop.classification.InterfaceAudience;
import org.junit.Test;
import java.net.HttpURLConnection;
import java.nio.charset.StandardCharsets;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertEquals;
/**
 * Tests for <code>BlobOperationDescriptor</code>.
 *
 * <p>Each test performs blob operations through the Azure Storage SDK using
 * an {@link OperationContext} whose sending-request and response-received
 * events are routed back to this test instance. The handlers record the
 * operation type (and, for data transfers, the content length) that
 * <code>BlobOperationDescriptor</code> derives from the connection, and the
 * tests assert on those recorded values.
 */
public class TestBlobOperationDescriptor extends AbstractWasbTestBase {
  /** Text payload written by the append blob and block blob tests. */
  private static final String MESSAGE = "this is a test";

  /** Operation type derived from the most recent response, or null. */
  private BlobOperationDescriptor.OperationType lastOperationTypeReceived;

  /** Operation type derived from the most recent request, or null. */
  private BlobOperationDescriptor.OperationType lastOperationTypeSent;

  /**
   * Content length recorded for the most recent response; reset to zero for
   * operation types that are not data transfers.
   */
  private long lastContentLengthReceived;

  @Override
  protected AzureBlobStorageTestAccount createTestAccount() throws Exception {
    return AzureBlobStorageTestAccount.create();
  }

  /**
   * Creating an append blob must be reported as CreateBlob, and writing to
   * it as AppendBlock with the number of bytes written.
   */
  @Test
  public void testAppendBlockOperations() throws Exception {
    CloudBlobContainer container = getTestAccount().getRealContainer();
    OperationContext context = createListeningContext();
    CloudAppendBlob appendBlob = container.getAppendBlobReference(
        "testAppendBlockOperations");
    assertNothingRecorded();

    byte[] payload = MESSAGE.getBytes(StandardCharsets.UTF_8);
    try (BlobOutputStream output
        = appendBlob.openWriteNew(null, null, context)) {
      assertEquals(BlobOperationDescriptor.OperationType.CreateBlob,
          lastOperationTypeReceived);
      assertEquals(0, lastContentLengthReceived);

      output.write(payload);
      output.flush();
      assertLastRoundTrip(BlobOperationDescriptor.OperationType.AppendBlock,
          payload.length);
    }
  }

  /**
   * Writing to a block blob must be reported as PutBlock with the number of
   * bytes written, and closing the stream as PutBlockList.
   */
  @Test
  public void testPutBlockOperations() throws Exception {
    CloudBlobContainer container = getTestAccount().getRealContainer();
    OperationContext context = createListeningContext();
    CloudBlockBlob blockBlob = container.getBlockBlobReference(
        "testPutBlockOperations");
    assertNothingRecorded();

    writeAndVerifyBlockBlob(blockBlob, context,
        MESSAGE.getBytes(StandardCharsets.UTF_8));
  }

  /**
   * Creating a page blob must be reported as CreateBlob, and writing one
   * page as PutPage with the page's length.
   */
  @Test
  public void testPutPageOperations() throws Exception {
    CloudBlobContainer container = getTestAccount().getRealContainer();
    OperationContext context = createListeningContext();
    CloudPageBlob pageBlob = container.getPageBlobReference(
        "testPutPageOperations");
    assertNothingRecorded();

    try (BlobOutputStream output
        = pageBlob.openWriteNew(1024, null, null, context)) {
      assertEquals(BlobOperationDescriptor.OperationType.CreateBlob,
          lastOperationTypeReceived);
      assertEquals(0, lastContentLengthReceived);

      final int pageSize = 512;
      byte[] buffer = new byte[pageSize];
      output.write(buffer);
      output.flush();
      assertLastRoundTrip(BlobOperationDescriptor.OperationType.PutPage,
          buffer.length);
    }
  }

  /**
   * After a block blob has been written, opening it for read must be
   * reported as GetProperties and reading it as GetBlob with the number of
   * bytes that were written.
   */
  @Test
  public void testGetBlobOperations() throws Exception {
    CloudBlobContainer container = getTestAccount().getRealContainer();
    OperationContext context = createListeningContext();
    CloudBlockBlob blockBlob = container.getBlockBlobReference(
        "testGetBlobOperations");
    assertNothingRecorded();

    byte[] payload = MESSAGE.getBytes(StandardCharsets.UTF_8);
    writeAndVerifyBlockBlob(blockBlob, context, payload);

    try (BlobInputStream input
        = blockBlob.openInputStream(null, null, context)) {
      assertLastRoundTrip(BlobOperationDescriptor.OperationType.GetProperties,
          0);

      byte[] buffer = new byte[1024];
      int numBytesRead = input.read(buffer);
      assertEquals(BlobOperationDescriptor.OperationType.GetBlob,
          lastOperationTypeSent);
      assertEquals(BlobOperationDescriptor.OperationType.GetBlob,
          lastOperationTypeReceived);
      assertEquals(payload.length, lastContentLengthReceived);
      assertEquals(numBytesRead, lastContentLengthReceived);
    }
  }

  /**
   * Creates an operation context whose response-received and
   * sending-request events are delivered to this test instance.
   *
   * @return a new context with both listeners registered.
   */
  private OperationContext createListeningContext() {
    OperationContext context = new OperationContext();
    context.getResponseReceivedEventHandler().addListener(
        new ResponseReceivedEventHandler());
    context.getSendingRequestEventHandler().addListener(
        new SendingRequestEventHandler());
    return context;
  }

  /**
   * Asserts that no request or response has been recorded yet.
   */
  private void assertNothingRecorded() {
    assertNull(lastOperationTypeSent);
    assertNull(lastOperationTypeReceived);
    assertEquals(0, lastContentLengthReceived);
  }

  /**
   * Asserts that the most recently sent request and the most recently
   * received response were both classified as the given operation, and that
   * the recorded content length matches.
   *
   * @param expectedType the operation type expected for request and response.
   * @param expectedContentLength the content length expected to be recorded.
   */
  private void assertLastRoundTrip(
      BlobOperationDescriptor.OperationType expectedType,
      long expectedContentLength) {
    assertEquals(expectedType, lastOperationTypeSent);
    assertEquals(expectedType, lastOperationTypeReceived);
    assertEquals(expectedContentLength, lastContentLengthReceived);
  }

  /**
   * Writes the payload to the block blob and verifies the operations that
   * are recorded: nothing on open, PutBlock with the payload's byte count on
   * flush, and PutBlockList with no content length once the stream is closed.
   *
   * @param blockBlob the blob to write to.
   * @param context the context whose events are routed to this test.
   * @param payload the bytes to write.
   * @throws Exception if the SDK call or the write fails.
   */
  private void writeAndVerifyBlockBlob(CloudBlockBlob blockBlob,
      OperationContext context, byte[] payload) throws Exception {
    try (BlobOutputStream output
        = blockBlob.openOutputStream(null, null, context)) {
      // Opening the stream is expected not to produce any response yet.
      assertNull(lastOperationTypeReceived);
      assertEquals(0, lastContentLengthReceived);

      output.write(payload);
      output.flush();
      assertLastRoundTrip(BlobOperationDescriptor.OperationType.PutBlock,
          payload.length);
    }
    // Closing the stream is what triggers the PutBlockList assertion here.
    assertLastRoundTrip(BlobOperationDescriptor.OperationType.PutBlockList,
        0);
  }

  /**
   * Called after the Azure Storage SDK receives a response. Records the
   * operation type and, for the data transfer operations, the content length
   * reported by <code>BlobOperationDescriptor</code>; for every other
   * operation type the recorded content length is reset to zero.
   *
   * @param event The connection, operation, and request state.
   */
  private void responseReceived(ResponseReceivedEvent event) {
    HttpURLConnection conn = (HttpURLConnection) event.getConnectionObject();
    BlobOperationDescriptor.OperationType operationType
        = BlobOperationDescriptor.getOperationType(conn);
    lastOperationTypeReceived = operationType;
    switch (operationType) {
    case AppendBlock:
    case PutBlock:
    case PutPage:
    case GetBlob:
      lastContentLengthReceived
          = BlobOperationDescriptor.getContentLengthIfKnown(conn,
              operationType);
      break;
    default:
      lastContentLengthReceived = 0;
      break;
    }
  }

  /**
   * Called before the Azure Storage SDK sends a request. Records the
   * operation type derived from the connection.
   *
   * @param event The connection, operation, and request state.
   */
  private void sendingRequest(SendingRequestEvent event) {
    this.lastOperationTypeSent
        = BlobOperationDescriptor.getOperationType(
            (HttpURLConnection) event.getConnectionObject());
  }

  /**
   * The ResponseReceivedEvent is fired after the Azure Storage SDK receives a
   * response. This listener forwards it to the enclosing test instance.
   */
  @InterfaceAudience.Private
  class ResponseReceivedEventHandler
      extends StorageEvent<ResponseReceivedEvent> {

    /**
     * Called after the Azure Storage SDK receives a response.
     *
     * @param event The connection, operation, and request state.
     */
    @Override
    public void eventOccurred(ResponseReceivedEvent event) {
      responseReceived(event);
    }
  }

  /**
   * The SendingRequestEvent is fired before the Azure Storage SDK sends a
   * request. This listener forwards it to the enclosing test instance.
   */
  @InterfaceAudience.Private
  class SendingRequestEventHandler extends StorageEvent<SendingRequestEvent> {

    /**
     * Called before the Azure Storage SDK sends a request.
     *
     * @param event The connection, operation, and request state.
     */
    @Override
    public void eventOccurred(SendingRequestEvent event) {
      sendingRequest(event);
    }
  }
}

View File

@ -0,0 +1,177 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azure;
import org.apache.hadoop.fs.contract.ContractTestUtils.NanoTimer;
import org.junit.Test;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertEquals;
/**
 * Tests for <code>ClientThrottlingAnalyzer</code>.
 *
 * <p>The tests are timing based: they feed byte counts into an analyzer,
 * sleep past at least one analysis period, and then check the sleep duration
 * that the analyzer reports or imposes.
 */
public class TestClientThrottlingAnalyzer {
  /**
   * Analysis period passed to every analyzer under test. The tests also use
   * it as a sleep interval in milliseconds.
   */
  private static final int ANALYSIS_PERIOD = 1000;

  /** Sleep interval that is ten percent longer than one analysis period. */
  private static final int ANALYSIS_PERIOD_PLUS_10_PERCENT = ANALYSIS_PERIOD
      + ANALYSIS_PERIOD / 10;

  private static final long MEGABYTE = 1024 * 1024;

  /** Tolerance, in percent, applied to measured elapsed times. */
  private static final int MAX_ACCEPTABLE_PERCENT_DIFFERENCE = 20;

  /**
   * Sleeps for the given time. If the thread is interrupted the sleep ends
   * early and the interrupt flag is restored.
   *
   * @param milliseconds how long to sleep.
   */
  private void sleep(long milliseconds) {
    try {
      Thread.sleep(milliseconds);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  /**
   * Asserts that the actual value is within the given percentage of the
   * expected value. The lower bound is never below zero.
   *
   * @param expected the nominal value.
   * @param actual the measured value.
   * @param percentage the allowed deviation from expected, in percent.
   */
  private void fuzzyValidate(long expected, long actual, double percentage) {
    final double tolerance = percentage / 100 * expected;
    final double lowerBound = Math.max(expected - tolerance, 0);
    final double upperBound = expected + tolerance;

    assertTrue(
        String.format(
            "The actual value %1$d is not within the expected range: "
                + "[%2$.2f, %3$.2f].",
            actual,
            lowerBound,
            upperBound),
        actual >= lowerBound && actual <= upperBound);
  }

  /**
   * Asserts that the actual value equals the expected value.
   *
   * @param expected the required value.
   * @param actual the observed value.
   */
  private void validate(long expected, long actual) {
    assertEquals(
        String.format("The actual value %1$d is not the expected value %2$d.",
            actual,
            expected),
        expected, actual);
  }

  /**
   * Asserts that the actual value does not exceed the given maximum. A value
   * equal to the maximum is accepted, as the method name and the failure
   * message state.
   *
   * @param maxExpected the largest acceptable value (inclusive).
   * @param actual the observed value.
   */
  private void validateLessThanOrEqual(long maxExpected, long actual) {
    assertTrue(
        String.format(
            "The actual value %1$d is not less than or equal to the maximum"
                + " expected value %2$d.",
            actual,
            maxExpected),
        actual <= maxExpected);
  }

  /**
   * Ensure that there is no waiting (sleepDuration = 0) if the metrics have
   * never been updated. This validates proper initialization of
   * ClientThrottlingAnalyzer.
   */
  @Test
  public void testNoMetricUpdatesThenNoWaiting() {
    ClientThrottlingAnalyzer analyzer = new ClientThrottlingAnalyzer(
        "test",
        ANALYSIS_PERIOD);

    validate(0, analyzer.getSleepDuration());

    sleep(ANALYSIS_PERIOD_PLUS_10_PERCENT);
    validate(0, analyzer.getSleepDuration());
  }

  /**
   * Ensure that there is no waiting (sleepDuration = 0) if the metrics have
   * only been updated with successful requests.
   */
  @Test
  public void testOnlySuccessThenNoWaiting() {
    ClientThrottlingAnalyzer analyzer = new ClientThrottlingAnalyzer(
        "test",
        ANALYSIS_PERIOD);

    analyzer.addBytesTransferred(8 * MEGABYTE, false);
    validate(0, analyzer.getSleepDuration());

    sleep(ANALYSIS_PERIOD_PLUS_10_PERCENT);
    validate(0, analyzer.getSleepDuration());
  }

  /**
   * Exercise the analyzer when the metrics have only been updated with
   * failed requests. The sleep duration must not exceed 1100 after the first
   * analysis period has elapsed, and must not exceed 900 after a further ten
   * analysis periods.
   */
  @Test
  public void testOnlyErrorsAndWaiting() {
    ClientThrottlingAnalyzer analyzer = new ClientThrottlingAnalyzer(
        "test",
        ANALYSIS_PERIOD);

    validate(0, analyzer.getSleepDuration());

    analyzer.addBytesTransferred(4 * MEGABYTE, true);
    sleep(ANALYSIS_PERIOD_PLUS_10_PERCENT);
    final int expectedSleepDuration1 = 1100;
    validateLessThanOrEqual(expectedSleepDuration1,
        analyzer.getSleepDuration());

    sleep(10 * ANALYSIS_PERIOD);
    final int expectedSleepDuration2 = 900;
    validateLessThanOrEqual(expectedSleepDuration2,
        analyzer.getSleepDuration());
  }

  /**
   * Ensure that there is waiting if the metrics have been updated with both
   * successful and failed requests: suspendIfNecessary must block for about
   * 126 ms (within the accepted tolerance). After a further ten analysis
   * periods the sleep duration must not exceed 110.
   */
  @Test
  public void testSuccessAndErrorsAndWaiting() {
    ClientThrottlingAnalyzer analyzer = new ClientThrottlingAnalyzer(
        "test",
        ANALYSIS_PERIOD);

    validate(0, analyzer.getSleepDuration());

    analyzer.addBytesTransferred(8 * MEGABYTE, false);
    analyzer.addBytesTransferred(2 * MEGABYTE, true);
    sleep(ANALYSIS_PERIOD_PLUS_10_PERCENT);

    // Measure how long the analyzer actually blocks the caller.
    NanoTimer timer = new NanoTimer();
    analyzer.suspendIfNecessary();
    final int expectedElapsedTime = 126;
    fuzzyValidate(expectedElapsedTime,
        timer.elapsedTimeMs(),
        MAX_ACCEPTABLE_PERCENT_DIFFERENCE);

    sleep(10 * ANALYSIS_PERIOD);
    final int expectedSleepDuration = 110;
    validateLessThanOrEqual(expectedSleepDuration,
        analyzer.getSleepDuration());
  }

  /**
   * Ensure that there is waiting if the metrics have been updated with many
   * successful and failed requests: suspendIfNecessary must block for about
   * 7 ms (within the accepted tolerance). Also ensure that the sleepDuration
   * decreases to zero over time.
   */
  @Test
  public void testManySuccessAndErrorsAndWaiting() {
    ClientThrottlingAnalyzer analyzer = new ClientThrottlingAnalyzer(
        "test",
        ANALYSIS_PERIOD);

    validate(0, analyzer.getSleepDuration());

    final int numberOfRequests = 20;
    for (int i = 0; i < numberOfRequests; i++) {
      analyzer.addBytesTransferred(8 * MEGABYTE, false);
      analyzer.addBytesTransferred(2 * MEGABYTE, true);
    }
    sleep(ANALYSIS_PERIOD_PLUS_10_PERCENT);

    // Measure how long the analyzer actually blocks the caller.
    NanoTimer timer = new NanoTimer();
    analyzer.suspendIfNecessary();
    final int expectedElapsedTime = 7;
    fuzzyValidate(expectedElapsedTime,
        timer.elapsedTimeMs(),
        MAX_ACCEPTABLE_PERCENT_DIFFERENCE);

    sleep(10 * ANALYSIS_PERIOD);
    validate(0, analyzer.getSleepDuration());
  }
}