diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/ClientThrottlingAnalyzer.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/ClientThrottlingAnalyzer.java
index 850e552758d..859a608a1e1 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/ClientThrottlingAnalyzer.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/ClientThrottlingAnalyzer.java
@@ -99,7 +99,7 @@ private ClientThrottlingAnalyzer() {
     this.blobMetrics = new AtomicReference<BlobOperationMetrics>(
         new BlobOperationMetrics(System.currentTimeMillis()));
     this.timer = new Timer(
-        String.format("wasb-timer-client-throttling-analyzer-%s", name));
+        String.format("wasb-timer-client-throttling-analyzer-%s", name), true);
     this.timer.schedule(new TimerTaskImpl(),
         analysisPeriodMs,
         analysisPeriodMs);
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
index 924bc3e8d9e..518fef9495f 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
@@ -141,6 +141,10 @@ public class AbfsConfiguration{
       DefaultValue = DEFAULT_ENABLE_FLUSH)
   private boolean enableFlush;
 
+  @BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ENABLE_AUTOTHROTTLING,
+      DefaultValue = DEFAULT_ENABLE_AUTOTHROTTLING)
+  private boolean enableAutoThrottling;
+
   @StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_USER_AGENT_PREFIX_KEY,
       DefaultValue = "")
   private String userAgentId;
@@ -279,6 +283,10 @@ public boolean isFlushEnabled() {
     return this.enableFlush;
   }
 
+  public boolean isAutoThrottlingEnabled() {
+    return this.enableAutoThrottling;
+  }
+
   public String getCustomUserAgentPrefix() {
     return this.userAgentId;
   }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
index b8091921079..c0ecc355007 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
@@ -38,6 +38,7 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
+import org.apache.hadoop.fs.azurebfs.services.AbfsClientThrottlingIntercept;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -130,6 +131,8 @@ public void initialize(URI uri, Configuration configuration)
         this.delegationTokenManager = abfsStore.getAbfsConfiguration().getDelegationTokenManager();
       }
     }
+
+    AbfsClientThrottlingIntercept.initializeSingleton(abfsStore.getAbfsConfiguration().isAutoThrottlingEnabled());
   }
 
   @Override
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
index ca4c9c3005d..52367193045 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -47,7 +47,7 @@ public final class ConfigurationKeys {
   public static final String AZURE_TOLERATE_CONCURRENT_APPEND = "fs.azure.io.read.tolerate.concurrent.append";
   public static final String AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION = "fs.azure.createRemoteFileSystemDuringInitialization";
   public static final String AZURE_SKIP_USER_GROUP_METADATA_DURING_INITIALIZATION = "fs.azure.skipUserGroupMetadataDuringInitialization";
-  public static final String FS_AZURE_AUTOTHROTTLING_ENABLE = "fs.azure.autothrottling.enable";
+  public static final String FS_AZURE_ENABLE_AUTOTHROTTLING = "fs.azure.enable.autothrottling";
   public static final String FS_AZURE_ATOMIC_RENAME_KEY = "fs.azure.atomic.rename.key";
   public static final String FS_AZURE_READ_AHEAD_QUEUE_DEPTH = "fs.azure.readaheadqueue.depth";
   public static final String FS_AZURE_ENABLE_FLUSH = "fs.azure.enable.flush";
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
index a921faf8d59..a9412a961c0 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
@@ -57,6 +57,7 @@ public final class FileSystemConfigurations {
   public static final int DEFAULT_READ_AHEAD_QUEUE_DEPTH = -1;
   public static final boolean DEFAULT_ENABLE_FLUSH = true;
+  public static final boolean DEFAULT_ENABLE_AUTOTHROTTLING = true;
 
   public static final SSLSocketFactoryEx.SSLChannelMode DEFAULT_FS_AZURE_SSL_CHANNEL_MODE
       = SSLSocketFactoryEx.SSLChannelMode.Default;
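With the key renamed to fs.azure.enable.autothrottling and DEFAULT_ENABLE_AUTOTHROTTLING set to true, client-side throttling is on unless a job turns it off. A minimal sketch of opting out (the account URI is a placeholder; Configuration.setBoolean and FileSystem.get are standard Hadoop APIs):

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    public class DisableAbfsAutoThrottling {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Defaults to true; false falls back to server-side throttling plus retries only.
        conf.setBoolean("fs.azure.enable.autothrottling", false);
        // Placeholder account/container URI, for illustration only.
        FileSystem fs = FileSystem.get(
            new URI("abfs://container@account.dfs.core.windows.net/"), conf);
        System.out.println("Initialized " + fs.getUri());
        fs.close();
      }
    }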
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
index 18773b6881b..258045a4738 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
@@ -125,6 +125,7 @@ public AbfsRestOperation createFilesystem() throws AzureBlobFileSystemException
 
     final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
     final AbfsRestOperation op = new AbfsRestOperation(
+            AbfsRestOperationType.CreateFileSystem,
             this,
             HTTP_METHOD_PUT,
             url,
@@ -148,6 +149,7 @@ public AbfsRestOperation setFilesystemProperties(final String properties) throws
 
     final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
     final AbfsRestOperation op = new AbfsRestOperation(
+            AbfsRestOperationType.SetFileSystemProperties,
             this,
             HTTP_METHOD_PUT,
             url,
@@ -170,6 +172,7 @@ public AbfsRestOperation listPath(final String relativePath, final boolean recur
 
     final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
     final AbfsRestOperation op = new AbfsRestOperation(
+            AbfsRestOperationType.ListPaths,
             this,
             HTTP_METHOD_GET,
             url,
@@ -186,6 +189,7 @@ public AbfsRestOperation getFilesystemProperties() throws AzureBlobFileSystemExc
 
     final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
     final AbfsRestOperation op = new AbfsRestOperation(
+            AbfsRestOperationType.GetFileSystemProperties,
             this,
             HTTP_METHOD_HEAD,
             url,
@@ -202,6 +206,7 @@ public AbfsRestOperation deleteFilesystem() throws AzureBlobFileSystemException
 
     final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
     final AbfsRestOperation op = new AbfsRestOperation(
+            AbfsRestOperationType.DeleteFileSystem,
             this,
             HTTP_METHOD_DELETE,
             url,
@@ -230,6 +235,7 @@ public AbfsRestOperation createPath(final String path, final boolean isFile, fin
 
     final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
     final AbfsRestOperation op = new AbfsRestOperation(
+            AbfsRestOperationType.CreatePath,
             this,
             HTTP_METHOD_PUT,
             url,
@@ -251,6 +257,7 @@ public AbfsRestOperation renamePath(final String source, final String destinatio
 
     final URL url = createRequestUrl(destination, abfsUriQueryBuilder.toString());
     final AbfsRestOperation op = new AbfsRestOperation(
+            AbfsRestOperationType.RenamePath,
             this,
             HTTP_METHOD_PUT,
             url,
@@ -273,6 +280,7 @@ public AbfsRestOperation append(final String path, final long position, final by
 
     final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
     final AbfsRestOperation op = new AbfsRestOperation(
+            AbfsRestOperationType.Append,
             this,
             HTTP_METHOD_PUT,
             url,
@@ -296,6 +304,7 @@ public AbfsRestOperation flush(final String path, final long position, boolean r
 
     final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
     final AbfsRestOperation op = new AbfsRestOperation(
+            AbfsRestOperationType.Flush,
             this,
             HTTP_METHOD_PUT,
             url,
@@ -319,6 +328,7 @@ public AbfsRestOperation setPathProperties(final String path, final String prope
 
     final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
     final AbfsRestOperation op = new AbfsRestOperation(
+            AbfsRestOperationType.SetPathProperties,
             this,
             HTTP_METHOD_PUT,
             url,
@@ -334,6 +344,7 @@ public AbfsRestOperation getPathProperties(final String path) throws AzureBlobFi
 
     final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
     final AbfsRestOperation op = new AbfsRestOperation(
+            AbfsRestOperationType.GetPathProperties,
             this,
             HTTP_METHOD_HEAD,
             url,
@@ -354,6 +365,7 @@ public AbfsRestOperation read(final String path, final long position, final byte
 
     final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
     final AbfsRestOperation op = new AbfsRestOperation(
+            AbfsRestOperationType.ReadFile,
             this,
             HTTP_METHOD_GET,
             url,
@@ -376,6 +388,7 @@ public AbfsRestOperation deletePath(final String path, final boolean recursive,
 
     final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
     final AbfsRestOperation op = new AbfsRestOperation(
+            AbfsRestOperationType.DeletePath,
             this,
             HTTP_METHOD_DELETE,
             url,
@@ -404,6 +417,7 @@ public AbfsRestOperation setOwner(final String path, final String owner, final S
 
     final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
     final AbfsRestOperation op = new AbfsRestOperation(
+            AbfsRestOperationType.SetOwner,
             this,
             AbfsHttpConstants.HTTP_METHOD_PUT,
             url,
@@ -427,6 +441,7 @@ public AbfsRestOperation setPermission(final String path, final String permissio
 
     final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
     final AbfsRestOperation op = new AbfsRestOperation(
+            AbfsRestOperationType.SetPermissions,
             this,
             AbfsHttpConstants.HTTP_METHOD_PUT,
             url,
@@ -458,6 +473,7 @@ public AbfsRestOperation setAcl(final String path, final String aclSpecString, f
 
     final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
     final AbfsRestOperation op = new AbfsRestOperation(
+            AbfsRestOperationType.SetAcl,
             this,
             AbfsHttpConstants.HTTP_METHOD_PUT,
             url,
@@ -474,6 +490,7 @@ public AbfsRestOperation getAclStatus(final String path) throws AzureBlobFileSys
 
     final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
     final AbfsRestOperation op = new AbfsRestOperation(
+            AbfsRestOperationType.GetAcl,
             this,
             AbfsHttpConstants.HTTP_METHOD_HEAD,
             url,
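Every AbfsRestOperation above now names its operation type. Only Append and ReadFile carry payload bytes, so they are the two types the new intercept meters (see AbfsClientThrottlingIntercept below). A toy, self-contained sketch of that mapping, not ABFS code itself:

    import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperationType;

    public class OperationTypeDemo {
      static String throttlerFor(AbfsRestOperationType type) {
        switch (type) {
          case Append:   return "write";  // bytes sent
          case ReadFile: return "read";   // bytes received
          default:       return "none";   // metadata operations are not metered
        }
      }
      public static void main(String[] args) {
        System.out.println(throttlerFor(AbfsRestOperationType.Append));    // write
        System.out.println(throttlerFor(AbfsRestOperationType.ReadFile));  // read
        System.out.println(throttlerFor(AbfsRestOperationType.ListPaths)); // none
      }
    }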
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java
new file mode 100644
index 00000000000..f1e5aaae683
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class AbfsClientThrottlingAnalyzer {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      AbfsClientThrottlingAnalyzer.class);
+  private static final int DEFAULT_ANALYSIS_PERIOD_MS = 10 * 1000;
+  private static final int MIN_ANALYSIS_PERIOD_MS = 1000;
+  private static final int MAX_ANALYSIS_PERIOD_MS = 30000;
+  private static final double MIN_ACCEPTABLE_ERROR_PERCENTAGE = .1;
+  private static final double MAX_EQUILIBRIUM_ERROR_PERCENTAGE = 1;
+  private static final double RAPID_SLEEP_DECREASE_FACTOR = .75;
+  private static final double RAPID_SLEEP_DECREASE_TRANSITION_PERIOD_MS = 150
+      * 1000;
+  private static final double SLEEP_DECREASE_FACTOR = .975;
+  private static final double SLEEP_INCREASE_FACTOR = 1.05;
+  private int analysisPeriodMs;
+
+  private volatile int sleepDuration = 0;
+  private long consecutiveNoErrorCount = 0;
+  private String name = null;
+  private Timer timer = null;
+  private AtomicReference<AbfsOperationMetrics> blobMetrics = null;
+
+  private AbfsClientThrottlingAnalyzer() {
+    // hide default constructor
+  }
+
+  /**
+   * Creates an instance of the AbfsClientThrottlingAnalyzer class with
+   * the specified name.
+   *
+   * @param name a name used to identify this instance.
+   * @throws IllegalArgumentException if name is null or empty.
+   */
+  AbfsClientThrottlingAnalyzer(String name) throws IllegalArgumentException {
+    this(name, DEFAULT_ANALYSIS_PERIOD_MS);
+  }
+
+  /**
+   * Creates an instance of the AbfsClientThrottlingAnalyzer class with
+   * the specified name and period.
+   *
+   * @param name A name used to identify this instance.
+   * @param period The frequency, in milliseconds, at which metrics are
+   *               analyzed.
+   * @throws IllegalArgumentException If name is null or empty.
+   *                                  If period is less than 1000 or greater than 30000 milliseconds.
+   */
+  AbfsClientThrottlingAnalyzer(String name, int period)
+      throws IllegalArgumentException {
+    Preconditions.checkArgument(
+        StringUtils.isNotEmpty(name),
+        "The argument 'name' cannot be null or empty.");
+    Preconditions.checkArgument(
+        period >= MIN_ANALYSIS_PERIOD_MS && period <= MAX_ANALYSIS_PERIOD_MS,
+        "The argument 'period' must be between 1000 and 30000.");
+    this.name = name;
+    this.analysisPeriodMs = period;
+    this.blobMetrics = new AtomicReference<AbfsOperationMetrics>(
+        new AbfsOperationMetrics(System.currentTimeMillis()));
+    this.timer = new Timer(
+        String.format("abfs-timer-client-throttling-analyzer-%s", name), true);
+    this.timer.schedule(new TimerTaskImpl(),
+        analysisPeriodMs,
+        analysisPeriodMs);
+  }
+
+  /**
+   * Updates metrics with results from the current storage operation.
+   *
+   * @param count The count of bytes transferred.
+   * @param isFailedOperation True if the operation failed; otherwise false.
+   */
+  public void addBytesTransferred(long count, boolean isFailedOperation) {
+    AbfsOperationMetrics metrics = blobMetrics.get();
+    if (isFailedOperation) {
+      metrics.bytesFailed.addAndGet(count);
+      metrics.operationsFailed.incrementAndGet();
+    } else {
+      metrics.bytesSuccessful.addAndGet(count);
+      metrics.operationsSuccessful.incrementAndGet();
+    }
+  }
+
+  /**
+   * Suspends the current storage operation, as necessary, to reduce throughput.
+   */
+  public void suspendIfNecessary() {
+    int duration = sleepDuration;
+    if (duration > 0) {
+      try {
+        Thread.sleep(duration);
+      } catch (InterruptedException ie) {
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  @VisibleForTesting
+  int getSleepDuration() {
+    return sleepDuration;
+  }
+
+  private int analyzeMetricsAndUpdateSleepDuration(AbfsOperationMetrics metrics,
+                                                   int sleepDuration) {
+    final double percentageConversionFactor = 100;
+    double bytesFailed = metrics.bytesFailed.get();
+    double bytesSuccessful = metrics.bytesSuccessful.get();
+    double operationsFailed = metrics.operationsFailed.get();
+    double operationsSuccessful = metrics.operationsSuccessful.get();
+    double errorPercentage = (bytesFailed <= 0)
+        ? 0
+        : (percentageConversionFactor
+        * bytesFailed
+        / (bytesFailed + bytesSuccessful));
+    long periodMs = metrics.endTime - metrics.startTime;
+
+    double newSleepDuration;
+
+    if (errorPercentage < MIN_ACCEPTABLE_ERROR_PERCENTAGE) {
+      ++consecutiveNoErrorCount;
+      // Decrease sleepDuration in order to increase throughput.
+      double reductionFactor =
+          (consecutiveNoErrorCount * analysisPeriodMs
+              >= RAPID_SLEEP_DECREASE_TRANSITION_PERIOD_MS)
+              ? RAPID_SLEEP_DECREASE_FACTOR
+              : SLEEP_DECREASE_FACTOR;
+
+      newSleepDuration = sleepDuration * reductionFactor;
+    } else if (errorPercentage < MAX_EQUILIBRIUM_ERROR_PERCENTAGE) {
+      // Do not modify sleepDuration in order to stabilize throughput.
+      newSleepDuration = sleepDuration;
+    } else {
+      // Increase sleepDuration in order to minimize error rate.
+      consecutiveNoErrorCount = 0;
+
+      // Increase sleep duration in order to reduce throughput and error rate.
+      // First, calculate target throughput: bytesSuccessful / periodMs.
+      // Next, calculate time required to send *all* data (assuming next period
+      // is similar to previous) at the target throughput: (bytesSuccessful
+      // + bytesFailed) * periodMs / bytesSuccessful. Next, subtract periodMs to
+      // get the total additional delay needed.
+      double additionalDelayNeeded = 5 * analysisPeriodMs;
+      if (bytesSuccessful > 0) {
+        additionalDelayNeeded = (bytesSuccessful + bytesFailed)
+            * periodMs
+            / bytesSuccessful
+            - periodMs;
+      }
+
+      // amortize the additional delay needed across the estimated number of
+      // requests during the next period
+      newSleepDuration = additionalDelayNeeded
+          / (operationsFailed + operationsSuccessful);
+
+      final double maxSleepDuration = analysisPeriodMs;
+      final double minSleepDuration = sleepDuration * SLEEP_INCREASE_FACTOR;
+
+      // Add 1 ms to avoid rounding down and to decrease proximity to the server
+      // side ingress/egress limit. Ensure that the new sleep duration is
+      // larger than the current one to more quickly reduce the number of
+      // errors. Don't allow the sleep duration to grow unbounded, after a
+      // certain point throttling won't help, for example, if there are far too
+      // many tasks/containers/nodes no amount of throttling will help.
+      newSleepDuration = Math.max(newSleepDuration, minSleepDuration) + 1;
+      newSleepDuration = Math.min(newSleepDuration, maxSleepDuration);
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(String.format(
+          "%5.5s, %10d, %10d, %10d, %10d, %6.2f, %5d, %5d, %5d",
+          name,
+          (int) bytesFailed,
+          (int) bytesSuccessful,
+          (int) operationsFailed,
+          (int) operationsSuccessful,
+          errorPercentage,
+          periodMs,
+          (int) sleepDuration,
+          (int) newSleepDuration));
+    }
+
+    return (int) newSleepDuration;
+  }
+
+  /**
+   * Timer callback implementation for periodically analyzing metrics.
+   */
+  class TimerTaskImpl extends TimerTask {
+    private AtomicInteger doingWork = new AtomicInteger(0);
+
+    /**
+     * Periodically analyzes a snapshot of the blob storage metrics and updates
+     * the sleepDuration in order to appropriately throttle storage operations.
+     */
+    @Override
+    public void run() {
+      boolean doWork = false;
+      try {
+        doWork = doingWork.compareAndSet(0, 1);
+
+        // prevent concurrent execution of this task
+        if (!doWork) {
+          return;
+        }
+
+        long now = System.currentTimeMillis();
+        if (now - blobMetrics.get().startTime >= analysisPeriodMs) {
+          AbfsOperationMetrics oldMetrics = blobMetrics.getAndSet(
+              new AbfsOperationMetrics(now));
+          oldMetrics.endTime = now;
+          sleepDuration = analyzeMetricsAndUpdateSleepDuration(oldMetrics,
+              sleepDuration);
+        }
+      } finally {
+        if (doWork) {
+          doingWork.set(0);
+        }
+      }
+    }
+  }
+
+  /**
+   * Stores Abfs operation metrics during each analysis period.
+   */
+  static class AbfsOperationMetrics {
+    private AtomicLong bytesFailed;
+    private AtomicLong bytesSuccessful;
+    private AtomicLong operationsFailed;
+    private AtomicLong operationsSuccessful;
+    private long endTime;
+    private long startTime;
+
+    AbfsOperationMetrics(long startTime) {
+      this.startTime = startTime;
+      this.bytesFailed = new AtomicLong();
+      this.bytesSuccessful = new AtomicLong();
+      this.operationsFailed = new AtomicLong();
+      this.operationsSuccessful = new AtomicLong();
+    }
+  }
+}
\ No newline at end of file
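A back-of-the-envelope check of the increase branch above, using the same figures as testSuccessAndErrorsAndWaiting further down (8 MB successful plus 2 MB failed across 2 operations; periodMs is assumed to be roughly the 1000 ms analysis period the test configures):

    public class SleepDurationWorkedExample {
      public static void main(String[] args) {
        double bytesSuccessful = 8 * 1024 * 1024;
        double bytesFailed = 2 * 1024 * 1024;
        double operations = 2;
        double periodMs = 1000;
        // errorPercentage = 100 * 2 MB / 10 MB = 20%, well above the 1% equilibrium ceiling.
        double additionalDelayNeeded =
            (bytesSuccessful + bytesFailed) * periodMs / bytesSuccessful - periodMs; // 250 ms
        double newSleepDuration = additionalDelayNeeded / operations;                // 125 ms
        // max(125, 0 * 1.05) + 1 = 126 ms, then capped at the 1000 ms analysis period.
        newSleepDuration = Math.min(Math.max(newSleepDuration, 0 * 1.05) + 1, periodMs);
        System.out.println((int) newSleepDuration);  // 126, the delay the test expects
      }
    }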
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java
new file mode 100644
index 00000000000..e981d76ba18
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.net.HttpURLConnection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Throttles Azure Blob File System read and write operations to achieve maximum
+ * throughput by minimizing errors. The errors occur when the account ingress
+ * or egress limits are exceeded and the server-side throttles requests.
+ * Server-side throttling causes the retry policy to be used, but the retry
+ * policy sleeps for long periods of time causing the total ingress or egress
+ * throughput to be as much as 35% lower than optimal. The retry policy is also
+ * after the fact, in that it applies after a request fails. On the other hand,
+ * the client-side throttling implemented here happens before requests are made
+ * and sleeps just enough to minimize errors, allowing optimal ingress and/or
+ * egress throughput.
+ */
+public final class AbfsClientThrottlingIntercept {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      AbfsClientThrottlingIntercept.class);
+  private static AbfsClientThrottlingIntercept singleton = null;
+  private AbfsClientThrottlingAnalyzer readThrottler = null;
+  private AbfsClientThrottlingAnalyzer writeThrottler = null;
+  private static boolean isAutoThrottlingEnabled = false;
+
+  // Hide default constructor
+  private AbfsClientThrottlingIntercept() {
+    readThrottler = new AbfsClientThrottlingAnalyzer("read");
+    writeThrottler = new AbfsClientThrottlingAnalyzer("write");
+    isAutoThrottlingEnabled = true;
+    LOG.debug("Client-side throttling is enabled for the ABFS file system.");
+  }
+
+  public static synchronized void initializeSingleton(boolean isAutoThrottlingEnabled) {
+    if (!isAutoThrottlingEnabled) {
+      return;
+    }
+    if (singleton == null) {
+      singleton = new AbfsClientThrottlingIntercept();
+    }
+  }
+
+  static void updateMetrics(AbfsRestOperationType operationType,
+                            AbfsHttpOperation abfsHttpOperation) {
+    if (!isAutoThrottlingEnabled || abfsHttpOperation == null) {
+      return;
+    }
+
+    int status = abfsHttpOperation.getStatusCode();
+    long contentLength = 0;
+    // If the socket is terminated prior to receiving a response, the HTTP
+    // status may be 0 or -1. A status less than 200 or greater than or equal
+    // to 500 is considered an error.
+    boolean isFailedOperation = (status < HttpURLConnection.HTTP_OK
+        || status >= HttpURLConnection.HTTP_INTERNAL_ERROR);
+
+    switch (operationType) {
+      case Append:
+        contentLength = abfsHttpOperation.getBytesSent();
+        if (contentLength > 0) {
+          singleton.writeThrottler.addBytesTransferred(contentLength,
+              isFailedOperation);
+        }
+        break;
+      case ReadFile:
+        contentLength = abfsHttpOperation.getBytesReceived();
+        if (contentLength > 0) {
+          singleton.readThrottler.addBytesTransferred(contentLength,
+              isFailedOperation);
+        }
+        break;
+      default:
+        break;
+    }
+  }
+
+  /**
+   * Called before the request is sent. Client-side throttling
+   * uses this to suspend the request, if necessary, to minimize errors and
+   * maximize throughput.
+   */
+  static void sendingRequest(AbfsRestOperationType operationType) {
+    if (!isAutoThrottlingEnabled) {
+      return;
+    }
+
+    switch (operationType) {
+      case ReadFile:
+        singleton.readThrottler.suspendIfNecessary();
+        break;
+      case Append:
+        singleton.writeThrottler.suspendIfNecessary();
+        break;
+      default:
+        break;
+    }
+  }
+}
\ No newline at end of file
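The failure test above treats anything below 200 (including the 0 or -1 seen when the socket drops before a response) or at/above 500 as a failed transfer; Azure Storage typically signals ingress/egress throttling with a 503. A self-contained illustration of the same check:

    import java.net.HttpURLConnection;

    public class ThrottlingStatusDemo {
      static boolean isFailed(int status) {
        return status < HttpURLConnection.HTTP_OK                // < 200
            || status >= HttpURLConnection.HTTP_INTERNAL_ERROR;  // >= 500
      }
      public static void main(String[] args) {
        System.out.println(isFailed(200)); // false: counted as successful bytes
        System.out.println(isFailed(503)); // true: server busy / throttled
        System.out.println(isFailed(-1));  // true: connection dropped before a response
      }
    }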
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
index c0407f58d29..9a7187996de 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java
@@ -36,6 +36,8 @@
  * The AbfsRestOperation for Rest AbfsClient.
  */
 public class AbfsRestOperation {
+  // The type of the REST operation (Append, ReadFile, etc)
+  private final AbfsRestOperationType operationType;
   // Blob FS client, which has the credentials, retry policy, and logs.
   private final AbfsClient client;
   // the HTTP method (PUT, PATCH, POST, GET, HEAD, or DELETE)
@@ -71,10 +73,12 @@ public AbfsHttpOperation getResult() {
    * @param url The full URL including query string parameters.
    * @param requestHeaders The HTTP request headers.
    */
-  AbfsRestOperation(final AbfsClient client,
+  AbfsRestOperation(final AbfsRestOperationType operationType,
+                    final AbfsClient client,
                     final String method,
                     final URL url,
                     final List<AbfsHttpHeader> requestHeaders) {
+    this.operationType = operationType;
     this.client = client;
     this.method = method;
     this.url = url;
@@ -86,6 +90,7 @@ public AbfsHttpOperation getResult() {
   /**
    * Initializes a new REST operation.
    *
+   * @param operationType The type of the REST operation (Append, ReadFile, etc).
    * @param client The Blob FS client.
    * @param method The HTTP method (PUT, PATCH, POST, GET, HEAD, or DELETE).
    * @param url The full URL including query string parameters.
@@ -95,14 +100,15 @@ public AbfsHttpOperation getResult() {
    * @param bufferOffset An offset into the buffer where the data begins.
    * @param bufferLength The length of the data in the buffer.
    */
-  AbfsRestOperation(AbfsClient client,
+  AbfsRestOperation(AbfsRestOperationType operationType,
+                    AbfsClient client,
                     String method,
                     URL url,
                     List<AbfsHttpHeader> requestHeaders,
                     byte[] buffer,
                     int bufferOffset,
                     int bufferLength) {
-    this(client, method, url, requestHeaders);
+    this(operationType, client, method, url, requestHeaders);
     this.buffer = buffer;
     this.bufferOffset = bufferOffset;
     this.bufferLength = bufferLength;
@@ -152,6 +158,7 @@ private boolean executeHttpOperation(final int retryCount) throws AzureBlobFileS
 
       if (hasRequestBody) {
         // HttpUrlConnection requires
+        AbfsClientThrottlingIntercept.sendingRequest(operationType);
         httpOperation.sendRequest(buffer, bufferOffset, bufferLength);
       }
 
@@ -168,6 +175,8 @@ private boolean executeHttpOperation(final int retryCount) throws AzureBlobFileS
         throw new InvalidAbfsRestOperationException(ex);
       }
       return false;
+    } finally {
+      AbfsClientThrottlingIntercept.updateMetrics(operationType, httpOperation);
     }
 
     LOG.debug("HttpRequest: " + httpOperation.toString());
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperationType.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperationType.java
new file mode 100644
index 00000000000..eeea81750e6
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperationType.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+/**
+ * The REST operation type (Read, Append, Other).
+ */
+public enum AbfsRestOperationType {
+  CreateFileSystem,
+  GetFileSystemProperties,
+  SetFileSystemProperties,
+  ListPaths,
+  DeleteFileSystem,
+  CreatePath,
+  RenamePath,
+  GetAcl,
+  GetPathProperties,
+  SetAcl,
+  SetOwner,
+  SetPathProperties,
+  SetPermissions,
+  Append,
+  Flush,
+  ReadFile,
+  DeletePath
+}
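The AbfsRestOperation hunks above put the two hooks on either side of the HTTP call: sendingRequest() runs before the body is sent, and updateMetrics() runs in a finally block so successful, throttled, and aborted attempts all feed the analyzer before the retry policy runs. A toy sketch of that ordering (not ABFS code; the interface and names are made up for illustration):

    public class InterceptOrderingSketch {
      interface HttpCall {
        int send() throws Exception;
      }

      static int execute(HttpCall call) throws Exception {
        // Stand-in for AbfsClientThrottlingIntercept.sendingRequest(operationType).
        System.out.println("1. suspendIfNecessary() before the request");
        try {
          return call.send();
        } finally {
          // Stand-in for AbfsClientThrottlingIntercept.updateMetrics(operationType, httpOperation);
          // running in finally means failed attempts are recorded too.
          System.out.println("2. addBytesTransferred() after the attempt");
        }
      }

      public static void main(String[] args) throws Exception {
        System.out.println("status " + execute(() -> 200));
      }
    }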
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClientThrottlingAnalyzer.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClientThrottlingAnalyzer.java
new file mode 100644
index 00000000000..5105b85d000
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClientThrottlingAnalyzer.java
@@ -0,0 +1,159 @@
+package org.apache.hadoop.fs.azurebfs.services;
+
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for AbfsClientThrottlingAnalyzer.
+ */
+public class TestAbfsClientThrottlingAnalyzer {
+  private static final int ANALYSIS_PERIOD = 1000;
+  private static final int ANALYSIS_PERIOD_PLUS_10_PERCENT = ANALYSIS_PERIOD
+      + ANALYSIS_PERIOD / 10;
+  private static final long MEGABYTE = 1024 * 1024;
+  private static final int MAX_ACCEPTABLE_PERCENT_DIFFERENCE = 20;
+
+  private void sleep(long milliseconds) {
+    try {
+      Thread.sleep(milliseconds);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  private void fuzzyValidate(long expected, long actual, double percentage) {
+    final double lowerBound = Math.max(expected - percentage / 100 * expected, 0);
+    final double upperBound = expected + percentage / 100 * expected;
+
+    assertTrue(
+        String.format(
+            "The actual value %1$d is not within the expected range: "
+                + "[%2$.2f, %3$.2f].",
+            actual,
+            lowerBound,
+            upperBound),
+        actual >= lowerBound && actual <= upperBound);
+  }
+
+  private void validate(long expected, long actual) {
+    assertEquals(
+        String.format("The actual value %1$d is not the expected value %2$d.",
+            actual,
+            expected),
+        expected, actual);
+  }
+
+  private void validateLessThanOrEqual(long maxExpected, long actual) {
+    assertTrue(
+        String.format(
+            "The actual value %1$d is not less than or equal to the maximum"
+                + " expected value %2$d.",
+            actual,
+            maxExpected),
+        actual < maxExpected);
+  }
+
+  /**
+   * Ensure that there is no waiting (sleepDuration = 0) if the metrics have
+   * never been updated. This validates proper initialization of
+   * ClientThrottlingAnalyzer.
+   */
+  @Test
+  public void testNoMetricUpdatesThenNoWaiting() {
+    AbfsClientThrottlingAnalyzer analyzer = new AbfsClientThrottlingAnalyzer(
+        "test",
+        ANALYSIS_PERIOD);
+    validate(0, analyzer.getSleepDuration());
+    sleep(ANALYSIS_PERIOD_PLUS_10_PERCENT);
+    validate(0, analyzer.getSleepDuration());
+  }
+
+  /**
+   * Ensure that there is no waiting (sleepDuration = 0) if the metrics have
+   * only been updated with successful requests.
+   */
+  @Test
+  public void testOnlySuccessThenNoWaiting() {
+    AbfsClientThrottlingAnalyzer analyzer = new AbfsClientThrottlingAnalyzer(
+        "test",
+        ANALYSIS_PERIOD);
+    analyzer.addBytesTransferred(8 * MEGABYTE, false);
+    validate(0, analyzer.getSleepDuration());
+    sleep(ANALYSIS_PERIOD_PLUS_10_PERCENT);
+    validate(0, analyzer.getSleepDuration());
+  }
+
+  /**
+   * Ensure that there is waiting (sleepDuration != 0) if the metrics have
+   * only been updated with failed requests. Also ensure that the
+   * sleepDuration decreases over time.
+   */
+  @Test
+  public void testOnlyErrorsAndWaiting() {
+    AbfsClientThrottlingAnalyzer analyzer = new AbfsClientThrottlingAnalyzer(
+        "test",
+        ANALYSIS_PERIOD);
+    validate(0, analyzer.getSleepDuration());
+    analyzer.addBytesTransferred(4 * MEGABYTE, true);
+    sleep(ANALYSIS_PERIOD_PLUS_10_PERCENT);
+    final int expectedSleepDuration1 = 1100;
+    validateLessThanOrEqual(expectedSleepDuration1, analyzer.getSleepDuration());
+    sleep(10 * ANALYSIS_PERIOD);
+    final int expectedSleepDuration2 = 900;
+    validateLessThanOrEqual(expectedSleepDuration2, analyzer.getSleepDuration());
+  }
+
+  /**
+   * Ensure that there is waiting (sleepDuration != 0) if the metrics have
+   * only been updated with both successful and failed requests. Also ensure
+   * that the sleepDuration decreases over time.
+   */
+  @Test
+  public void testSuccessAndErrorsAndWaiting() {
+    AbfsClientThrottlingAnalyzer analyzer = new AbfsClientThrottlingAnalyzer(
+        "test",
+        ANALYSIS_PERIOD);
+    validate(0, analyzer.getSleepDuration());
+    analyzer.addBytesTransferred(8 * MEGABYTE, false);
+    analyzer.addBytesTransferred(2 * MEGABYTE, true);
+    sleep(ANALYSIS_PERIOD_PLUS_10_PERCENT);
+    ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
+    analyzer.suspendIfNecessary();
+    final int expectedElapsedTime = 126;
+    fuzzyValidate(expectedElapsedTime,
+        timer.elapsedTimeMs(),
+        MAX_ACCEPTABLE_PERCENT_DIFFERENCE);
+    sleep(10 * ANALYSIS_PERIOD);
+    final int expectedSleepDuration = 110;
+    validateLessThanOrEqual(expectedSleepDuration, analyzer.getSleepDuration());
+  }
+
+  /**
+   * Ensure that there is waiting (sleepDuration != 0) if the metrics have
+   * only been updated with many successful and failed requests. Also ensure
+   * that the sleepDuration decreases to zero over time.
+   */
+  @Test
+  public void testManySuccessAndErrorsAndWaiting() {
+    AbfsClientThrottlingAnalyzer analyzer = new AbfsClientThrottlingAnalyzer(
+        "test",
+        ANALYSIS_PERIOD);
+    validate(0, analyzer.getSleepDuration());
+    final int numberOfRequests = 20;
+    for (int i = 0; i < numberOfRequests; i++) {
+      analyzer.addBytesTransferred(8 * MEGABYTE, false);
+      analyzer.addBytesTransferred(2 * MEGABYTE, true);
+    }
+    sleep(ANALYSIS_PERIOD_PLUS_10_PERCENT);
+    ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
+    analyzer.suspendIfNecessary();
+    fuzzyValidate(7,
+        timer.elapsedTimeMs(),
+        MAX_ACCEPTABLE_PERCENT_DIFFERENCE);
+    sleep(10 * ANALYSIS_PERIOD);
+    validate(0, analyzer.getSleepDuration());
+  }
+}
\ No newline at end of file
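A quick check of testOnlyErrorsAndWaiting's bounds against the analyzer constants, assuming the timer fires roughly on schedule with the test's 1000 ms period: a failure-only first period gives additionalDelayNeeded = 5 * 1000 ms, which the cap reduces to the 1000 ms analysis period (under the 1100 ms bound), and ten idle periods then shrink it by SLEEP_DECREASE_FACTOR each time (the 0.75 rapid factor needs 150 s of consecutive clean periods, so it does not apply yet):

    public class SleepDecayWorkedExample {
      public static void main(String[] args) {
        double sleep = Math.min(5 * 1000 / 1 + 1, 1000);  // capped at maxSleepDuration = 1000 ms
        System.out.println((int) sleep);                  // 1000, within the test's 1100 bound
        for (int i = 0; i < 10; i++) {
          sleep *= 0.975;                                 // SLEEP_DECREASE_FACTOR per idle period
        }
        System.out.println((int) sleep);                  // ~776, within the test's 900 bound
      }
    }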