diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java index 4c77d2e136d..5a31b10b514 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -117,6 +117,10 @@ public class AbfsConfiguration{ DefaultValue = DEFAULT_OPTIMIZE_FOOTER_READ) private boolean optimizeFooterRead; + @BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED, + DefaultValue = DEFAULT_FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED) + private boolean accountThrottlingEnabled; + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_READ_BUFFER_SIZE, MinValue = MIN_BUFFER_SIZE, MaxValue = MAX_BUFFER_SIZE, @@ -260,6 +264,14 @@ public class AbfsConfiguration{ DefaultValue = DEFAULT_ENABLE_AUTOTHROTTLING) private boolean enableAutoThrottling; + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ACCOUNT_OPERATION_IDLE_TIMEOUT, + DefaultValue = DEFAULT_ACCOUNT_OPERATION_IDLE_TIMEOUT_MS) + private int accountOperationIdleTimeout; + + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ANALYSIS_PERIOD, + DefaultValue = DEFAULT_ANALYSIS_PERIOD_MS) + private int analysisPeriod; + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ABFS_IO_RATE_LIMIT, MinValue = 0, DefaultValue = RATE_LIMIT_DEFAULT) @@ -694,6 +706,10 @@ public class AbfsConfiguration{ return this.azureAppendBlobDirs; } + public boolean accountThrottlingEnabled() { + return accountThrottlingEnabled; + } + public String getAzureInfiniteLeaseDirs() { return this.azureInfiniteLeaseDirs; } @@ -736,6 +752,14 @@ public class AbfsConfiguration{ return this.enableAutoThrottling; } + public int getAccountOperationIdleTimeout() { + return accountOperationIdleTimeout; + } + + public int getAnalysisPeriod() { + return analysisPeriod; + } + public int getRateLimit() { return rateLimit; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java index d0bdd9818db..21501d28f42 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java @@ -55,7 +55,6 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.azurebfs.commit.ResilientCommitByRename; import org.apache.hadoop.fs.azurebfs.services.AbfsClient; -import org.apache.hadoop.fs.azurebfs.services.AbfsClientThrottlingIntercept; import org.apache.hadoop.fs.azurebfs.services.AbfsListStatusRemoteIterator; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.classification.InterfaceStability; @@ -225,7 +224,6 @@ public class AzureBlobFileSystem extends FileSystem } } - AbfsClientThrottlingIntercept.initializeSingleton(abfsConfiguration.isAutoThrottlingEnabled()); rateLimiting = RateLimitingFactory.create(abfsConfiguration.getRateLimit()); LOG.debug("Initializing AzureBlobFileSystem for {} complete", uri); } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java index 0353f3e01ff..a59f76b6d0f 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java @@ -38,6 +38,7 @@ public final class ConfigurationKeys { public static final String FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME = "fs.azure.account.key"; public static final String FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME_REGX = "fs\\.azure\\.account\\.key\\.(.*)"; public static final String FS_AZURE_SECURE_MODE = "fs.azure.secure.mode"; + public static final String FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED = "fs.azure.account.throttling.enabled"; // Retry strategy defined by the user public static final String AZURE_MIN_BACKOFF_INTERVAL = "fs.azure.io.retry.min.backoff.interval"; @@ -116,6 +117,8 @@ public final class ConfigurationKeys { public static final String AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION = "fs.azure.createRemoteFileSystemDuringInitialization"; public static final String AZURE_SKIP_USER_GROUP_METADATA_DURING_INITIALIZATION = "fs.azure.skipUserGroupMetadataDuringInitialization"; public static final String FS_AZURE_ENABLE_AUTOTHROTTLING = "fs.azure.enable.autothrottling"; + public static final String FS_AZURE_ACCOUNT_OPERATION_IDLE_TIMEOUT = "fs.azure.account.operation.idle.timeout"; + public static final String FS_AZURE_ANALYSIS_PERIOD = "fs.azure.analysis.period"; public static final String FS_AZURE_ALWAYS_USE_HTTPS = "fs.azure.always.use.https"; public static final String FS_AZURE_ATOMIC_RENAME_KEY = "fs.azure.atomic.rename.key"; /** This config ensures that during create overwrite an existing file will be diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index 097285bb48f..0ea2c929800 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -94,6 +94,9 @@ public final class FileSystemConfigurations { public static final boolean DEFAULT_ENABLE_FLUSH = true; public static final boolean DEFAULT_DISABLE_OUTPUTSTREAM_FLUSH = true; public static final boolean DEFAULT_ENABLE_AUTOTHROTTLING = true; + public static final boolean DEFAULT_FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED = true; + public static final int DEFAULT_ACCOUNT_OPERATION_IDLE_TIMEOUT_MS = 60_000; + public static final int DEFAULT_ANALYSIS_PERIOD_MS = 10_000; public static final DelegatingSSLSocketFactory.SSLChannelMode DEFAULT_FS_AZURE_SSL_CHANNEL_MODE = DelegatingSSLSocketFactory.SSLChannelMode.Default; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java index fe2ea35f1df..9974979aeba 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -101,6 +101,7 @@ public class AbfsClient implements Closeable { private AccessTokenProvider tokenProvider; private SASTokenProvider sasTokenProvider; private final AbfsCounters abfsCounters; + private final AbfsThrottlingIntercept intercept; private final ListeningScheduledExecutorService executorService; @@ -120,6 +121,7 @@ public class AbfsClient implements Closeable { this.retryPolicy = abfsClientContext.getExponentialRetryPolicy(); this.accountName = abfsConfiguration.getAccountName().substring(0, abfsConfiguration.getAccountName().indexOf(AbfsHttpConstants.DOT)); this.authType = abfsConfiguration.getAuthType(accountName); + this.intercept = AbfsThrottlingInterceptFactory.getInstance(accountName, abfsConfiguration); String encryptionKey = this.abfsConfiguration .getClientProvidedEncryptionKey(); @@ -216,6 +218,10 @@ public class AbfsClient implements Closeable { return sharedKeyCredentials; } + AbfsThrottlingIntercept getIntercept() { + return intercept; + } + List createDefaultHeaders() { final List requestHeaders = new ArrayList(); requestHeaders.add(new AbfsHttpHeader(X_MS_VERSION, xMsVersion)); @@ -1277,6 +1283,14 @@ public class AbfsClient implements Closeable { return abfsCounters; } + /** + * Getter for abfsConfiguration from AbfsClient. + * @return AbfsConfiguration instance + */ + protected AbfsConfiguration getAbfsConfiguration() { + return abfsConfiguration; + } + public int getNumLeaseThreads() { return abfsConfiguration.getNumLeaseThreads(); } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java index a55c924dd81..f1eb3a2a774 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java @@ -20,20 +20,23 @@ package org.apache.hadoop.fs.azurebfs.services; import java.util.Timer; import java.util.TimerTask; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; -import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; +import org.apache.hadoop.util.Preconditions; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.hadoop.util.Time.now; + class AbfsClientThrottlingAnalyzer { private static final Logger LOG = LoggerFactory.getLogger( AbfsClientThrottlingAnalyzer.class); - private static final int DEFAULT_ANALYSIS_PERIOD_MS = 10 * 1000; private static final int MIN_ANALYSIS_PERIOD_MS = 1000; private static final int MAX_ANALYSIS_PERIOD_MS = 30000; private static final double MIN_ACCEPTABLE_ERROR_PERCENTAGE = .1; @@ -50,42 +53,38 @@ class AbfsClientThrottlingAnalyzer { private String name = null; private Timer timer = null; private AtomicReference blobMetrics = null; + private AtomicLong lastExecutionTime = null; + private final AtomicBoolean isOperationOnAccountIdle = new AtomicBoolean(false); + private AbfsConfiguration abfsConfiguration = null; + private boolean accountLevelThrottlingEnabled = true; private AbfsClientThrottlingAnalyzer() { // hide default constructor } - /** - * Creates an instance of the AbfsClientThrottlingAnalyzer class with - * the specified name. - * - * @param name a name used to identify this instance. - * @throws IllegalArgumentException if name is null or empty. - */ - AbfsClientThrottlingAnalyzer(String name) throws IllegalArgumentException { - this(name, DEFAULT_ANALYSIS_PERIOD_MS); - } - /** * Creates an instance of the AbfsClientThrottlingAnalyzer class with * the specified name and period. * * @param name A name used to identify this instance. - * @param period The frequency, in milliseconds, at which metrics are - * analyzed. + * @param abfsConfiguration The configuration set. * @throws IllegalArgumentException If name is null or empty. * If period is less than 1000 or greater than 30000 milliseconds. */ - AbfsClientThrottlingAnalyzer(String name, int period) + AbfsClientThrottlingAnalyzer(String name, AbfsConfiguration abfsConfiguration) throws IllegalArgumentException { Preconditions.checkArgument( StringUtils.isNotEmpty(name), "The argument 'name' cannot be null or empty."); + int period = abfsConfiguration.getAnalysisPeriod(); Preconditions.checkArgument( period >= MIN_ANALYSIS_PERIOD_MS && period <= MAX_ANALYSIS_PERIOD_MS, "The argument 'period' must be between 1000 and 30000."); this.name = name; - this.analysisPeriodMs = period; + this.abfsConfiguration = abfsConfiguration; + this.accountLevelThrottlingEnabled = abfsConfiguration.accountThrottlingEnabled(); + this.analysisPeriodMs = abfsConfiguration.getAnalysisPeriod(); + this.lastExecutionTime = new AtomicLong(now()); this.blobMetrics = new AtomicReference( new AbfsOperationMetrics(System.currentTimeMillis())); this.timer = new Timer( @@ -95,6 +94,47 @@ class AbfsClientThrottlingAnalyzer { analysisPeriodMs); } + /** + * Resumes the timer if it was stopped. + */ + private void resumeTimer() { + blobMetrics = new AtomicReference( + new AbfsOperationMetrics(System.currentTimeMillis())); + timer.schedule(new TimerTaskImpl(), + analysisPeriodMs, + analysisPeriodMs); + isOperationOnAccountIdle.set(false); + } + + /** + * Synchronized method to suspend or resume timer. + * @param timerFunctionality resume or suspend. + * @param timerTask The timertask object. + * @return true or false. + */ + private synchronized boolean timerOrchestrator(TimerFunctionality timerFunctionality, + TimerTask timerTask) { + switch (timerFunctionality) { + case RESUME: + if (isOperationOnAccountIdle.get()) { + resumeTimer(); + } + break; + case SUSPEND: + if (accountLevelThrottlingEnabled && (System.currentTimeMillis() + - lastExecutionTime.get() >= getOperationIdleTimeout())) { + isOperationOnAccountIdle.set(true); + timerTask.cancel(); + timer.purge(); + return true; + } + break; + default: + break; + } + return false; + } + /** * Updates metrics with results from the current storage operation. * @@ -104,12 +144,13 @@ class AbfsClientThrottlingAnalyzer { public void addBytesTransferred(long count, boolean isFailedOperation) { AbfsOperationMetrics metrics = blobMetrics.get(); if (isFailedOperation) { - metrics.bytesFailed.addAndGet(count); - metrics.operationsFailed.incrementAndGet(); + metrics.addBytesFailed(count); + metrics.incrementOperationsFailed(); } else { - metrics.bytesSuccessful.addAndGet(count); - metrics.operationsSuccessful.incrementAndGet(); + metrics.addBytesSuccessful(count); + metrics.incrementOperationsSuccessful(); } + blobMetrics.set(metrics); } /** @@ -117,6 +158,8 @@ class AbfsClientThrottlingAnalyzer { * @return true if Thread sleeps(Throttling occurs) else false. */ public boolean suspendIfNecessary() { + lastExecutionTime.set(now()); + timerOrchestrator(TimerFunctionality.RESUME, null); int duration = sleepDuration; if (duration > 0) { try { @@ -134,19 +177,27 @@ class AbfsClientThrottlingAnalyzer { return sleepDuration; } + int getOperationIdleTimeout() { + return abfsConfiguration.getAccountOperationIdleTimeout(); + } + + AtomicBoolean getIsOperationOnAccountIdle() { + return isOperationOnAccountIdle; + } + private int analyzeMetricsAndUpdateSleepDuration(AbfsOperationMetrics metrics, int sleepDuration) { final double percentageConversionFactor = 100; - double bytesFailed = metrics.bytesFailed.get(); - double bytesSuccessful = metrics.bytesSuccessful.get(); - double operationsFailed = metrics.operationsFailed.get(); - double operationsSuccessful = metrics.operationsSuccessful.get(); + double bytesFailed = metrics.getBytesFailed().get(); + double bytesSuccessful = metrics.getBytesSuccessful().get(); + double operationsFailed = metrics.getOperationsFailed().get(); + double operationsSuccessful = metrics.getOperationsSuccessful().get(); double errorPercentage = (bytesFailed <= 0) ? 0 : (percentageConversionFactor * bytesFailed / (bytesFailed + bytesSuccessful)); - long periodMs = metrics.endTime - metrics.startTime; + long periodMs = metrics.getEndTime() - metrics.getStartTime(); double newSleepDuration; @@ -238,10 +289,13 @@ class AbfsClientThrottlingAnalyzer { } long now = System.currentTimeMillis(); - if (now - blobMetrics.get().startTime >= analysisPeriodMs) { + if (timerOrchestrator(TimerFunctionality.SUSPEND, this)) { + return; + } + if (now - blobMetrics.get().getStartTime() >= analysisPeriodMs) { AbfsOperationMetrics oldMetrics = blobMetrics.getAndSet( new AbfsOperationMetrics(now)); - oldMetrics.endTime = now; + oldMetrics.setEndTime(now); sleepDuration = analyzeMetricsAndUpdateSleepDuration(oldMetrics, sleepDuration); } @@ -252,24 +306,4 @@ class AbfsClientThrottlingAnalyzer { } } } - - /** - * Stores Abfs operation metrics during each analysis period. - */ - static class AbfsOperationMetrics { - private AtomicLong bytesFailed; - private AtomicLong bytesSuccessful; - private AtomicLong operationsFailed; - private AtomicLong operationsSuccessful; - private long endTime; - private long startTime; - - AbfsOperationMetrics(long startTime) { - this.startTime = startTime; - this.bytesFailed = new AtomicLong(); - this.bytesSuccessful = new AtomicLong(); - this.operationsFailed = new AtomicLong(); - this.operationsSuccessful = new AtomicLong(); - } - } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java index 7303e833418..52a46bc7469 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java @@ -19,10 +19,12 @@ package org.apache.hadoop.fs.azurebfs.services; import java.net.HttpURLConnection; +import java.util.concurrent.locks.ReentrantLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; import org.apache.hadoop.fs.azurebfs.AbfsStatistic; import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; @@ -38,35 +40,89 @@ import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; * and sleeps just enough to minimize errors, allowing optimal ingress and/or * egress throughput. */ -public final class AbfsClientThrottlingIntercept { +public final class AbfsClientThrottlingIntercept implements AbfsThrottlingIntercept { private static final Logger LOG = LoggerFactory.getLogger( AbfsClientThrottlingIntercept.class); private static final String RANGE_PREFIX = "bytes="; - private static AbfsClientThrottlingIntercept singleton = null; - private AbfsClientThrottlingAnalyzer readThrottler = null; - private AbfsClientThrottlingAnalyzer writeThrottler = null; - private static boolean isAutoThrottlingEnabled = false; + private static AbfsClientThrottlingIntercept singleton; // singleton, initialized in static initialization block + private static final ReentrantLock LOCK = new ReentrantLock(); + private final AbfsClientThrottlingAnalyzer readThrottler; + private final AbfsClientThrottlingAnalyzer writeThrottler; + private final String accountName; // Hide default constructor - private AbfsClientThrottlingIntercept() { - readThrottler = new AbfsClientThrottlingAnalyzer("read"); - writeThrottler = new AbfsClientThrottlingAnalyzer("write"); + public AbfsClientThrottlingIntercept(String accountName, AbfsConfiguration abfsConfiguration) { + this.accountName = accountName; + this.readThrottler = setAnalyzer("read " + accountName, abfsConfiguration); + this.writeThrottler = setAnalyzer("write " + accountName, abfsConfiguration); + LOG.debug("Client-side throttling is enabled for the ABFS file system for the account : {}", accountName); } - public static synchronized void initializeSingleton(boolean enableAutoThrottling) { - if (!enableAutoThrottling) { - return; - } + // Hide default constructor + private AbfsClientThrottlingIntercept(AbfsConfiguration abfsConfiguration) { + //Account name is kept as empty as same instance is shared across all accounts + this.accountName = ""; + this.readThrottler = setAnalyzer("read", abfsConfiguration); + this.writeThrottler = setAnalyzer("write", abfsConfiguration); + LOG.debug("Client-side throttling is enabled for the ABFS file system using singleton intercept"); + } + + /** + * Sets the analyzer for the intercept. + * @param name Name of the analyzer. + * @param abfsConfiguration The configuration. + * @return AbfsClientThrottlingAnalyzer instance. + */ + private AbfsClientThrottlingAnalyzer setAnalyzer(String name, AbfsConfiguration abfsConfiguration) { + return new AbfsClientThrottlingAnalyzer(name, abfsConfiguration); + } + + /** + * Returns the analyzer for read operations. + * @return AbfsClientThrottlingAnalyzer for read. + */ + AbfsClientThrottlingAnalyzer getReadThrottler() { + return readThrottler; + } + + /** + * Returns the analyzer for write operations. + * @return AbfsClientThrottlingAnalyzer for write. + */ + AbfsClientThrottlingAnalyzer getWriteThrottler() { + return writeThrottler; + } + + /** + * Creates a singleton object of the AbfsClientThrottlingIntercept. + * which is shared across all filesystem instances. + * @param abfsConfiguration configuration set. + * @return singleton object of intercept. + */ + static AbfsClientThrottlingIntercept initializeSingleton(AbfsConfiguration abfsConfiguration) { if (singleton == null) { - singleton = new AbfsClientThrottlingIntercept(); - isAutoThrottlingEnabled = true; - LOG.debug("Client-side throttling is enabled for the ABFS file system."); + LOCK.lock(); + try { + if (singleton == null) { + singleton = new AbfsClientThrottlingIntercept(abfsConfiguration); + LOG.debug("Client-side throttling is enabled for the ABFS file system."); + } + } finally { + LOCK.unlock(); + } } + return singleton; } - static void updateMetrics(AbfsRestOperationType operationType, - AbfsHttpOperation abfsHttpOperation) { - if (!isAutoThrottlingEnabled || abfsHttpOperation == null) { + /** + * Updates the metrics for successful and failed read and write operations. + * @param operationType Only applicable for read and write operations. + * @param abfsHttpOperation Used for status code and data transferred. + */ + @Override + public void updateMetrics(AbfsRestOperationType operationType, + AbfsHttpOperation abfsHttpOperation) { + if (abfsHttpOperation == null) { return; } @@ -82,7 +138,7 @@ public final class AbfsClientThrottlingIntercept { case Append: contentLength = abfsHttpOperation.getBytesSent(); if (contentLength > 0) { - singleton.writeThrottler.addBytesTransferred(contentLength, + writeThrottler.addBytesTransferred(contentLength, isFailedOperation); } break; @@ -90,7 +146,7 @@ public final class AbfsClientThrottlingIntercept { String range = abfsHttpOperation.getConnection().getRequestProperty(HttpHeaderConfigurations.RANGE); contentLength = getContentLengthIfKnown(range); if (contentLength > 0) { - singleton.readThrottler.addBytesTransferred(contentLength, + readThrottler.addBytesTransferred(contentLength, isFailedOperation); } break; @@ -104,21 +160,18 @@ public final class AbfsClientThrottlingIntercept { * uses this to suspend the request, if necessary, to minimize errors and * maximize throughput. */ - static void sendingRequest(AbfsRestOperationType operationType, + @Override + public void sendingRequest(AbfsRestOperationType operationType, AbfsCounters abfsCounters) { - if (!isAutoThrottlingEnabled) { - return; - } - switch (operationType) { case ReadFile: - if (singleton.readThrottler.suspendIfNecessary() + if (readThrottler.suspendIfNecessary() && abfsCounters != null) { abfsCounters.incrementCounter(AbfsStatistic.READ_THROTTLES, 1); } break; case Append: - if (singleton.writeThrottler.suspendIfNecessary() + if (writeThrottler.suspendIfNecessary() && abfsCounters != null) { abfsCounters.incrementCounter(AbfsStatistic.WRITE_THROTTLES, 1); } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsNoOpThrottlingIntercept.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsNoOpThrottlingIntercept.java new file mode 100644 index 00000000000..6b84e583c33 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsNoOpThrottlingIntercept.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +final class AbfsNoOpThrottlingIntercept implements AbfsThrottlingIntercept { + + public static final AbfsNoOpThrottlingIntercept INSTANCE = new AbfsNoOpThrottlingIntercept(); + + private AbfsNoOpThrottlingIntercept() { + } + + @Override + public void updateMetrics(final AbfsRestOperationType operationType, + final AbfsHttpOperation abfsHttpOperation) { + } + + @Override + public void sendingRequest(final AbfsRestOperationType operationType, + final AbfsCounters abfsCounters) { + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOperationMetrics.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOperationMetrics.java new file mode 100644 index 00000000000..2e53367d39f --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOperationMetrics.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +import java.util.concurrent.atomic.AtomicLong; + +/** + * Stores Abfs operation metrics during each analysis period. + */ +class AbfsOperationMetrics { + + /** + * No of bytes which could not be transferred due to a failed operation. + */ + private final AtomicLong bytesFailed; + + /** + * No of bytes successfully transferred during a successful operation. + */ + private final AtomicLong bytesSuccessful; + + /** + * Total no of failed operations. + */ + private final AtomicLong operationsFailed; + + /** + * Total no of successful operations. + */ + private final AtomicLong operationsSuccessful; + + /** + * Time when collection of metrics ended. + */ + private long endTime; + + /** + * Time when the collection of metrics started. + */ + private final long startTime; + + AbfsOperationMetrics(long startTime) { + this.startTime = startTime; + this.bytesFailed = new AtomicLong(); + this.bytesSuccessful = new AtomicLong(); + this.operationsFailed = new AtomicLong(); + this.operationsSuccessful = new AtomicLong(); + } + + /** + * + * @return bytes failed to transfer. + */ + AtomicLong getBytesFailed() { + return bytesFailed; + } + + /** + * + * @return bytes successfully transferred. + */ + AtomicLong getBytesSuccessful() { + return bytesSuccessful; + } + + /** + * + * @return no of operations failed. + */ + AtomicLong getOperationsFailed() { + return operationsFailed; + } + + /** + * + * @return no of successful operations. + */ + AtomicLong getOperationsSuccessful() { + return operationsSuccessful; + } + + /** + * + * @return end time of metric collection. + */ + long getEndTime() { + return endTime; + } + + /** + * + * @param endTime sets the end time. + */ + void setEndTime(final long endTime) { + this.endTime = endTime; + } + + /** + * + * @return start time of metric collection. + */ + long getStartTime() { + return startTime; + } + + void addBytesFailed(long bytes) { + this.getBytesFailed().addAndGet(bytes); + } + + void addBytesSuccessful(long bytes) { + this.getBytesSuccessful().addAndGet(bytes); + } + + void incrementOperationsFailed() { + this.getOperationsFailed().incrementAndGet(); + } + + void incrementOperationsSuccessful() { + this.getOperationsSuccessful().incrementAndGet(); + } + +} + diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java index c2a80f81770..2c669e9e101 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java @@ -45,6 +45,8 @@ public class AbfsRestOperation { private final AbfsRestOperationType operationType; // Blob FS client, which has the credentials, retry policy, and logs. private final AbfsClient client; + // Return intercept instance + private final AbfsThrottlingIntercept intercept; // the HTTP method (PUT, PATCH, POST, GET, HEAD, or DELETE) private final String method; // full URL including query parameters @@ -145,6 +147,7 @@ public class AbfsRestOperation { || AbfsHttpConstants.HTTP_METHOD_PATCH.equals(method)); this.sasToken = sasToken; this.abfsCounters = client.getAbfsCounters(); + this.intercept = client.getIntercept(); } /** @@ -241,7 +244,8 @@ public class AbfsRestOperation { */ private boolean executeHttpOperation(final int retryCount, TracingContext tracingContext) throws AzureBlobFileSystemException { - AbfsHttpOperation httpOperation = null; + AbfsHttpOperation httpOperation; + try { // initialize the HTTP request and open the connection httpOperation = new AbfsHttpOperation(url, method, requestHeaders); @@ -278,8 +282,7 @@ public class AbfsRestOperation { // dump the headers AbfsIoUtils.dumpHeadersToDebugLog("Request Headers", httpOperation.getConnection().getRequestProperties()); - AbfsClientThrottlingIntercept.sendingRequest(operationType, abfsCounters); - + intercept.sendingRequest(operationType, abfsCounters); if (hasRequestBody) { // HttpUrlConnection requires httpOperation.sendRequest(buffer, bufferOffset, bufferLength); @@ -317,7 +320,7 @@ public class AbfsRestOperation { return false; } finally { - AbfsClientThrottlingIntercept.updateMetrics(operationType, httpOperation); + intercept.updateMetrics(operationType, httpOperation); } LOG.debug("HttpRequest: {}: {}", operationType, httpOperation.toString()); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsThrottlingIntercept.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsThrottlingIntercept.java new file mode 100644 index 00000000000..57b5095bb32 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsThrottlingIntercept.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * An interface for Abfs Throttling Interface. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public interface AbfsThrottlingIntercept { + + /** + * Updates the metrics for successful and failed read and write operations. + * @param operationType Only applicable for read and write operations. + * @param abfsHttpOperation Used for status code and data transferred. + */ + void updateMetrics(AbfsRestOperationType operationType, + AbfsHttpOperation abfsHttpOperation); + + /** + * Called before the request is sent. Client-side throttling + * uses this to suspend the request, if necessary, to minimize errors and + * maximize throughput. + * @param operationType Only applicable for read and write operations. + * @param abfsCounters Used for counters. + */ + void sendingRequest(AbfsRestOperationType operationType, + AbfsCounters abfsCounters); + +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsThrottlingInterceptFactory.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsThrottlingInterceptFactory.java new file mode 100644 index 00000000000..279b7a318ca --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsThrottlingInterceptFactory.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; +import org.apache.hadoop.util.WeakReferenceMap; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Class to get an instance of throttling intercept class per account. + */ +final class AbfsThrottlingInterceptFactory { + + private AbfsThrottlingInterceptFactory() { + } + + private static AbfsConfiguration abfsConfig; + + /** + * List of references notified of loss. + */ + private static List lostReferences = new ArrayList<>(); + + private static final Logger LOG = LoggerFactory.getLogger( + AbfsThrottlingInterceptFactory.class); + + /** + * Map which stores instance of ThrottlingIntercept class per account. + */ + private static WeakReferenceMap + interceptMap = new WeakReferenceMap<>( + AbfsThrottlingInterceptFactory::factory, + AbfsThrottlingInterceptFactory::referenceLost); + + /** + * Returns instance of throttling intercept. + * @param accountName Account name. + * @return instance of throttling intercept. + */ + private static AbfsClientThrottlingIntercept factory(final String accountName) { + return new AbfsClientThrottlingIntercept(accountName, abfsConfig); + } + + /** + * Reference lost callback. + * @param accountName key lost. + */ + private static void referenceLost(String accountName) { + lostReferences.add(accountName); + } + + /** + * Returns an instance of AbfsThrottlingIntercept. + * + * @param accountName The account for which we need instance of throttling intercept. + @param abfsConfiguration The object of abfsconfiguration class. + * @return Instance of AbfsThrottlingIntercept. + */ + static synchronized AbfsThrottlingIntercept getInstance(String accountName, + AbfsConfiguration abfsConfiguration) { + abfsConfig = abfsConfiguration; + AbfsThrottlingIntercept intercept; + if (!abfsConfiguration.isAutoThrottlingEnabled()) { + return AbfsNoOpThrottlingIntercept.INSTANCE; + } + // If singleton is enabled use a static instance of the intercept class for all accounts + if (!abfsConfiguration.accountThrottlingEnabled()) { + intercept = AbfsClientThrottlingIntercept.initializeSingleton( + abfsConfiguration); + } else { + // Return the instance from the map + intercept = interceptMap.get(accountName); + if (intercept == null) { + intercept = new AbfsClientThrottlingIntercept(accountName, + abfsConfiguration); + interceptMap.put(accountName, intercept); + } + } + return intercept; + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/TimerFunctionality.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/TimerFunctionality.java new file mode 100644 index 00000000000..bf7da69ec49 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/TimerFunctionality.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +public enum TimerFunctionality { + RESUME, + + SUSPEND +} + diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md b/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md index 35d36055604..31498df1790 100644 --- a/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md +++ b/hadoop-tools/hadoop-azure/src/site/markdown/abfs.md @@ -767,6 +767,12 @@ Hflush() being the only documented API that can provide persistent data transfer, Flush() also attempting to persist buffered data will lead to performance issues. + +### Account level throttling Options + +`fs.azure.account.operation.idle.timeout`: This value specifies the time after which the timer for the analyzer (read or +write) should be paused until no new request is made again. The default value for the same is 60 seconds. + ### HNS Check Options Config `fs.azure.account.hns.enabled` provides an option to specify whether the storage account is HNS enabled or not. In case the config is not provided, @@ -877,6 +883,9 @@ when there are too many writes from the same process. tuned with this config considering each queued request holds a buffer. Set the value 3 or 4 times the value set for s.azure.write.max.concurrent.requests. +`fs.azure.analysis.period`: The time after which sleep duration is recomputed after analyzing metrics. The default value +for the same is 10 seconds. + ### Security Options `fs.azure.always.use.https`: Enforces to use HTTPS instead of HTTP when the flag is made true. Irrespective of the flag, `AbfsClient` will use HTTPS if the secure diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/constants/TestConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/constants/TestConfigurationKeys.java index 565eb38c4f7..9e40f22d231 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/constants/TestConfigurationKeys.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/constants/TestConfigurationKeys.java @@ -24,6 +24,9 @@ package org.apache.hadoop.fs.azurebfs.constants; public final class TestConfigurationKeys { public static final String FS_AZURE_ACCOUNT_NAME = "fs.azure.account.name"; public static final String FS_AZURE_ABFS_ACCOUNT_NAME = "fs.azure.abfs.account.name"; + public static final String FS_AZURE_ABFS_ACCOUNT1_NAME = "fs.azure.abfs.account1.name"; + public static final String FS_AZURE_ENABLE_AUTOTHROTTLING = "fs.azure.enable.autothrottling"; + public static final String FS_AZURE_ANALYSIS_PERIOD = "fs.azure.analysis.period"; public static final String FS_AZURE_ACCOUNT_KEY = "fs.azure.account.key"; public static final String FS_AZURE_CONTRACT_TEST_URI = "fs.contract.test.fs.abfs"; public static final String FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT = "fs.azure.test.namespace.enabled"; diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java index 0a1dca7e7d8..08eb3adc926 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java @@ -306,6 +306,11 @@ public final class TestAbfsClient { when(client.getAccessToken()).thenCallRealMethod(); when(client.getSharedKeyCredentials()).thenCallRealMethod(); when(client.createDefaultHeaders()).thenCallRealMethod(); + when(client.getAbfsConfiguration()).thenReturn(abfsConfig); + when(client.getIntercept()).thenReturn( + AbfsThrottlingInterceptFactory.getInstance( + abfsConfig.getAccountName().substring(0, + abfsConfig.getAccountName().indexOf(DOT)), abfsConfig)); // override baseurl client = TestAbfsClient.setAbfsClientField(client, "abfsConfiguration", diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClientThrottlingAnalyzer.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClientThrottlingAnalyzer.java index 3f680e49930..22649cd190d 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClientThrottlingAnalyzer.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClientThrottlingAnalyzer.java @@ -18,9 +18,15 @@ package org.apache.hadoop.fs.azurebfs.services; +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; import org.apache.hadoop.fs.contract.ContractTestUtils; import org.junit.Test; +import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_ANALYSIS_PERIOD; +import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.TEST_CONFIGURATION_FILE_NAME; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -33,6 +39,15 @@ public class TestAbfsClientThrottlingAnalyzer { + ANALYSIS_PERIOD / 10; private static final long MEGABYTE = 1024 * 1024; private static final int MAX_ACCEPTABLE_PERCENT_DIFFERENCE = 20; + private AbfsConfiguration abfsConfiguration; + + public TestAbfsClientThrottlingAnalyzer() throws IOException, IllegalAccessException { + final Configuration configuration = new Configuration(); + configuration.addResource(TEST_CONFIGURATION_FILE_NAME); + configuration.setInt(FS_AZURE_ANALYSIS_PERIOD, 1000); + this.abfsConfiguration = new AbfsConfiguration(configuration, + "dummy"); + } private void sleep(long milliseconds) { try { @@ -82,8 +97,7 @@ public class TestAbfsClientThrottlingAnalyzer { @Test public void testNoMetricUpdatesThenNoWaiting() { AbfsClientThrottlingAnalyzer analyzer = new AbfsClientThrottlingAnalyzer( - "test", - ANALYSIS_PERIOD); + "test", abfsConfiguration); validate(0, analyzer.getSleepDuration()); sleep(ANALYSIS_PERIOD_PLUS_10_PERCENT); validate(0, analyzer.getSleepDuration()); @@ -96,8 +110,7 @@ public class TestAbfsClientThrottlingAnalyzer { @Test public void testOnlySuccessThenNoWaiting() { AbfsClientThrottlingAnalyzer analyzer = new AbfsClientThrottlingAnalyzer( - "test", - ANALYSIS_PERIOD); + "test", abfsConfiguration); analyzer.addBytesTransferred(8 * MEGABYTE, false); validate(0, analyzer.getSleepDuration()); sleep(ANALYSIS_PERIOD_PLUS_10_PERCENT); @@ -112,8 +125,7 @@ public class TestAbfsClientThrottlingAnalyzer { @Test public void testOnlyErrorsAndWaiting() { AbfsClientThrottlingAnalyzer analyzer = new AbfsClientThrottlingAnalyzer( - "test", - ANALYSIS_PERIOD); + "test", abfsConfiguration); validate(0, analyzer.getSleepDuration()); analyzer.addBytesTransferred(4 * MEGABYTE, true); sleep(ANALYSIS_PERIOD_PLUS_10_PERCENT); @@ -132,8 +144,7 @@ public class TestAbfsClientThrottlingAnalyzer { @Test public void testSuccessAndErrorsAndWaiting() { AbfsClientThrottlingAnalyzer analyzer = new AbfsClientThrottlingAnalyzer( - "test", - ANALYSIS_PERIOD); + "test", abfsConfiguration); validate(0, analyzer.getSleepDuration()); analyzer.addBytesTransferred(8 * MEGABYTE, false); analyzer.addBytesTransferred(2 * MEGABYTE, true); @@ -157,8 +168,7 @@ public class TestAbfsClientThrottlingAnalyzer { @Test public void testManySuccessAndErrorsAndWaiting() { AbfsClientThrottlingAnalyzer analyzer = new AbfsClientThrottlingAnalyzer( - "test", - ANALYSIS_PERIOD); + "test", abfsConfiguration); validate(0, analyzer.getSleepDuration()); final int numberOfRequests = 20; for (int i = 0; i < numberOfRequests; i++) { diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestExponentialRetryPolicy.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestExponentialRetryPolicy.java index 0f8dc55aa14..a1fc4e138d6 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestExponentialRetryPolicy.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestExponentialRetryPolicy.java @@ -18,13 +18,35 @@ package org.apache.hadoop.fs.azurebfs.services; +import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR; + import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_BACKOFF_INTERVAL; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_MAX_BACKOFF_INTERVAL; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_MAX_IO_RETRIES; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_MIN_BACKOFF_INTERVAL; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_AUTOTHROTTLING; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE; +import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_ABFS_ACCOUNT1_NAME; +import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_ACCOUNT_NAME; +import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.TEST_CONFIGURATION_FILE_NAME; +import static org.junit.Assume.assumeTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.apache.hadoop.fs.FSDataInputStream; + +import org.assertj.core.api.Assertions; +import org.junit.Assume; +import org.mockito.Mockito; + +import java.net.URI; import java.util.Random; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem; import org.junit.Assert; import org.junit.Test; @@ -41,6 +63,9 @@ public class TestExponentialRetryPolicy extends AbstractAbfsIntegrationTest { private final int noRetryCount = 0; private final int retryCount = new Random().nextInt(maxRetryCount); private final int retryCountBeyondMax = maxRetryCount + 1; + private static final String TEST_PATH = "/testfile"; + private static final double MULTIPLYING_FACTOR = 1.5; + private static final int ANALYSIS_PERIOD = 10000; public TestExponentialRetryPolicy() throws Exception { @@ -67,6 +92,173 @@ public class TestExponentialRetryPolicy extends AbstractAbfsIntegrationTest { testMaxIOConfig(abfsConfig); } + @Test + public void testThrottlingIntercept() throws Exception { + AzureBlobFileSystem fs = getFileSystem(); + final Configuration configuration = new Configuration(); + configuration.addResource(TEST_CONFIGURATION_FILE_NAME); + configuration.setBoolean(FS_AZURE_ENABLE_AUTOTHROTTLING, false); + + // On disabling throttling AbfsNoOpThrottlingIntercept object is returned + AbfsConfiguration abfsConfiguration = new AbfsConfiguration(configuration, + "dummy.dfs.core.windows.net"); + AbfsThrottlingIntercept intercept; + AbfsClient abfsClient = TestAbfsClient.createTestClientFromCurrentContext(fs.getAbfsStore().getClient(), abfsConfiguration); + intercept = abfsClient.getIntercept(); + Assertions.assertThat(intercept) + .describedAs("AbfsNoOpThrottlingIntercept instance expected") + .isInstanceOf(AbfsNoOpThrottlingIntercept.class); + + configuration.setBoolean(FS_AZURE_ENABLE_AUTOTHROTTLING, true); + configuration.setBoolean(FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED, true); + // On disabling throttling AbfsClientThrottlingIntercept object is returned + AbfsConfiguration abfsConfiguration1 = new AbfsConfiguration(configuration, + "dummy1.dfs.core.windows.net"); + AbfsClient abfsClient1 = TestAbfsClient.createTestClientFromCurrentContext(fs.getAbfsStore().getClient(), abfsConfiguration1); + intercept = abfsClient1.getIntercept(); + Assertions.assertThat(intercept) + .describedAs("AbfsClientThrottlingIntercept instance expected") + .isInstanceOf(AbfsClientThrottlingIntercept.class); + } + + @Test + public void testCreateMultipleAccountThrottling() throws Exception { + Configuration config = new Configuration(getRawConfiguration()); + String accountName = config.get(FS_AZURE_ACCOUNT_NAME); + if (accountName == null) { + // check if accountName is set using different config key + accountName = config.get(FS_AZURE_ABFS_ACCOUNT1_NAME); + } + assumeTrue("Not set: " + FS_AZURE_ABFS_ACCOUNT1_NAME, + accountName != null && !accountName.isEmpty()); + + Configuration rawConfig1 = new Configuration(); + rawConfig1.addResource(TEST_CONFIGURATION_FILE_NAME); + + AbfsRestOperation successOp = mock(AbfsRestOperation.class); + AbfsHttpOperation http500Op = mock(AbfsHttpOperation.class); + when(http500Op.getStatusCode()).thenReturn(HTTP_INTERNAL_ERROR); + when(successOp.getResult()).thenReturn(http500Op); + + AbfsConfiguration configuration = Mockito.mock(AbfsConfiguration.class); + when(configuration.getAnalysisPeriod()).thenReturn(ANALYSIS_PERIOD); + when(configuration.isAutoThrottlingEnabled()).thenReturn(true); + when(configuration.accountThrottlingEnabled()).thenReturn(false); + + AbfsThrottlingIntercept instance1 = AbfsThrottlingInterceptFactory.getInstance(accountName, configuration); + String accountName1 = config.get(FS_AZURE_ABFS_ACCOUNT1_NAME); + + assumeTrue("Not set: " + FS_AZURE_ABFS_ACCOUNT1_NAME, + accountName1 != null && !accountName1.isEmpty()); + + AbfsThrottlingIntercept instance2 = AbfsThrottlingInterceptFactory.getInstance(accountName1, configuration); + //if singleton is enabled, for different accounts both the instances should return same value + Assertions.assertThat(instance1) + .describedAs( + "if singleton is enabled, for different accounts both the instances should return same value") + .isEqualTo(instance2); + + when(configuration.accountThrottlingEnabled()).thenReturn(true); + AbfsThrottlingIntercept instance3 = AbfsThrottlingInterceptFactory.getInstance(accountName, configuration); + AbfsThrottlingIntercept instance4 = AbfsThrottlingInterceptFactory.getInstance(accountName1, configuration); + AbfsThrottlingIntercept instance5 = AbfsThrottlingInterceptFactory.getInstance(accountName, configuration); + //if singleton is not enabled, for different accounts instances should return different value + Assertions.assertThat(instance3) + .describedAs( + "iff singleton is not enabled, for different accounts instances should return different value") + .isNotEqualTo(instance4); + + //if singleton is not enabled, for same accounts instances should return same value + Assertions.assertThat(instance3) + .describedAs( + "if singleton is not enabled, for same accounts instances should return same value") + .isEqualTo(instance5); + } + + @Test + public void testOperationOnAccountIdle() throws Exception { + //Get the filesystem. + AzureBlobFileSystem fs = getFileSystem(); + AbfsClient client = fs.getAbfsStore().getClient(); + AbfsConfiguration configuration1 = client.getAbfsConfiguration(); + Assume.assumeTrue(configuration1.isAutoThrottlingEnabled()); + Assume.assumeTrue(configuration1.accountThrottlingEnabled()); + + AbfsClientThrottlingIntercept accountIntercept + = (AbfsClientThrottlingIntercept) client.getIntercept(); + final byte[] b = new byte[2 * MIN_BUFFER_SIZE]; + new Random().nextBytes(b); + + Path testPath = path(TEST_PATH); + + //Do an operation on the filesystem. + try (FSDataOutputStream stream = fs.create(testPath)) { + stream.write(b); + } + + //Don't perform any operation on the account. + int sleepTime = (int) ((getAbfsConfig().getAccountOperationIdleTimeout()) * MULTIPLYING_FACTOR); + Thread.sleep(sleepTime); + + try (FSDataInputStream streamRead = fs.open(testPath)) { + streamRead.read(b); + } + + //Perform operations on another account. + AzureBlobFileSystem fs1 = new AzureBlobFileSystem(); + Configuration config = new Configuration(getRawConfiguration()); + String accountName1 = config.get(FS_AZURE_ABFS_ACCOUNT1_NAME); + assumeTrue("Not set: " + FS_AZURE_ABFS_ACCOUNT1_NAME, + accountName1 != null && !accountName1.isEmpty()); + final String abfsUrl1 = this.getFileSystemName() + "12" + "@" + accountName1; + URI defaultUri1 = null; + defaultUri1 = new URI("abfss", abfsUrl1, null, null, null); + fs1.initialize(defaultUri1, getRawConfiguration()); + AbfsClient client1 = fs1.getAbfsStore().getClient(); + AbfsClientThrottlingIntercept accountIntercept1 + = (AbfsClientThrottlingIntercept) client1.getIntercept(); + try (FSDataOutputStream stream1 = fs1.create(testPath)) { + stream1.write(b); + } + + //Verify the write analyzer for first account is idle but the read analyzer is not idle. + Assertions.assertThat(accountIntercept.getWriteThrottler() + .getIsOperationOnAccountIdle() + .get()) + .describedAs("Write analyzer for first account should be idle the first time") + .isTrue(); + + Assertions.assertThat( + accountIntercept.getReadThrottler() + .getIsOperationOnAccountIdle() + .get()) + .describedAs("Read analyzer for first account should not be idle") + .isFalse(); + + //Verify the write analyzer for second account is not idle. + Assertions.assertThat( + accountIntercept1.getWriteThrottler() + .getIsOperationOnAccountIdle() + .get()) + .describedAs("Write analyzer for second account should not be idle") + .isFalse(); + + //Again perform an operation on the first account. + try (FSDataOutputStream stream2 = fs.create(testPath)) { + stream2.write(b); + } + + //Verify the write analyzer on first account is not idle. + Assertions.assertThat( + + accountIntercept.getWriteThrottler() + .getIsOperationOnAccountIdle() + .get()) + .describedAs( + "Write analyzer for first account should not be idle second time") + .isFalse(); + } + @Test public void testAbfsConfigConstructor() throws Exception { // Ensure we choose expected values that are not defaults