HADOOP-18457. ABFS: Support account level throttling (#5034)

This allows  abfs request throttling to be shared across all
abfs connections talking to containers belonging to the same abfs storage
account -as that is the level at which IO throttling is applied.

The option is enabled/disabled in the configuration option
"fs.azure.account.throttling.enabled";
The default is "true"

Contributed by Anmol Asrani
This commit is contained in:
Anmol Asrani 2022-11-30 18:35:31 +05:30 committed by Steve Loughran
parent 19468c1232
commit 98aac2a15c
No known key found for this signature in database
GPG Key ID: D22CF846DBB162A0
18 changed files with 796 additions and 92 deletions

View File

@ -117,6 +117,10 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_OPTIMIZE_FOOTER_READ)
private boolean optimizeFooterRead;
@BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED,
DefaultValue = DEFAULT_FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED)
private boolean accountThrottlingEnabled;
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_READ_BUFFER_SIZE,
MinValue = MIN_BUFFER_SIZE,
MaxValue = MAX_BUFFER_SIZE,
@ -260,6 +264,14 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_ENABLE_AUTOTHROTTLING)
private boolean enableAutoThrottling;
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ACCOUNT_OPERATION_IDLE_TIMEOUT,
DefaultValue = DEFAULT_ACCOUNT_OPERATION_IDLE_TIMEOUT_MS)
private int accountOperationIdleTimeout;
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ANALYSIS_PERIOD,
DefaultValue = DEFAULT_ANALYSIS_PERIOD_MS)
private int analysisPeriod;
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ABFS_IO_RATE_LIMIT,
MinValue = 0,
DefaultValue = RATE_LIMIT_DEFAULT)
@ -694,6 +706,10 @@ public class AbfsConfiguration{
return this.azureAppendBlobDirs;
}
public boolean accountThrottlingEnabled() {
return accountThrottlingEnabled;
}
public String getAzureInfiniteLeaseDirs() {
return this.azureInfiniteLeaseDirs;
}
@ -736,6 +752,14 @@ public class AbfsConfiguration{
return this.enableAutoThrottling;
}
public int getAccountOperationIdleTimeout() {
return accountOperationIdleTimeout;
}
public int getAnalysisPeriod() {
return analysisPeriod;
}
public int getRateLimit() {
return rateLimit;
}

View File

@ -55,7 +55,6 @@ import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.azurebfs.commit.ResilientCommitByRename;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsClientThrottlingIntercept;
import org.apache.hadoop.fs.azurebfs.services.AbfsListStatusRemoteIterator;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.classification.InterfaceStability;
@ -225,7 +224,6 @@ public class AzureBlobFileSystem extends FileSystem
}
}
AbfsClientThrottlingIntercept.initializeSingleton(abfsConfiguration.isAutoThrottlingEnabled());
rateLimiting = RateLimitingFactory.create(abfsConfiguration.getRateLimit());
LOG.debug("Initializing AzureBlobFileSystem for {} complete", uri);
}

View File

@ -38,6 +38,7 @@ public final class ConfigurationKeys {
public static final String FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME = "fs.azure.account.key";
public static final String FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME_REGX = "fs\\.azure\\.account\\.key\\.(.*)";
public static final String FS_AZURE_SECURE_MODE = "fs.azure.secure.mode";
public static final String FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED = "fs.azure.account.throttling.enabled";
// Retry strategy defined by the user
public static final String AZURE_MIN_BACKOFF_INTERVAL = "fs.azure.io.retry.min.backoff.interval";
@ -116,6 +117,8 @@ public final class ConfigurationKeys {
public static final String AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION = "fs.azure.createRemoteFileSystemDuringInitialization";
public static final String AZURE_SKIP_USER_GROUP_METADATA_DURING_INITIALIZATION = "fs.azure.skipUserGroupMetadataDuringInitialization";
public static final String FS_AZURE_ENABLE_AUTOTHROTTLING = "fs.azure.enable.autothrottling";
public static final String FS_AZURE_ACCOUNT_OPERATION_IDLE_TIMEOUT = "fs.azure.account.operation.idle.timeout";
public static final String FS_AZURE_ANALYSIS_PERIOD = "fs.azure.analysis.period";
public static final String FS_AZURE_ALWAYS_USE_HTTPS = "fs.azure.always.use.https";
public static final String FS_AZURE_ATOMIC_RENAME_KEY = "fs.azure.atomic.rename.key";
/** This config ensures that during create overwrite an existing file will be

View File

@ -94,6 +94,9 @@ public final class FileSystemConfigurations {
public static final boolean DEFAULT_ENABLE_FLUSH = true;
public static final boolean DEFAULT_DISABLE_OUTPUTSTREAM_FLUSH = true;
public static final boolean DEFAULT_ENABLE_AUTOTHROTTLING = true;
public static final boolean DEFAULT_FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED = true;
public static final int DEFAULT_ACCOUNT_OPERATION_IDLE_TIMEOUT_MS = 60_000;
public static final int DEFAULT_ANALYSIS_PERIOD_MS = 10_000;
public static final DelegatingSSLSocketFactory.SSLChannelMode DEFAULT_FS_AZURE_SSL_CHANNEL_MODE
= DelegatingSSLSocketFactory.SSLChannelMode.Default;

View File

@ -101,6 +101,7 @@ public class AbfsClient implements Closeable {
private AccessTokenProvider tokenProvider;
private SASTokenProvider sasTokenProvider;
private final AbfsCounters abfsCounters;
private final AbfsThrottlingIntercept intercept;
private final ListeningScheduledExecutorService executorService;
@ -120,6 +121,7 @@ public class AbfsClient implements Closeable {
this.retryPolicy = abfsClientContext.getExponentialRetryPolicy();
this.accountName = abfsConfiguration.getAccountName().substring(0, abfsConfiguration.getAccountName().indexOf(AbfsHttpConstants.DOT));
this.authType = abfsConfiguration.getAuthType(accountName);
this.intercept = AbfsThrottlingInterceptFactory.getInstance(accountName, abfsConfiguration);
String encryptionKey = this.abfsConfiguration
.getClientProvidedEncryptionKey();
@ -216,6 +218,10 @@ public class AbfsClient implements Closeable {
return sharedKeyCredentials;
}
AbfsThrottlingIntercept getIntercept() {
return intercept;
}
List<AbfsHttpHeader> createDefaultHeaders() {
final List<AbfsHttpHeader> requestHeaders = new ArrayList<AbfsHttpHeader>();
requestHeaders.add(new AbfsHttpHeader(X_MS_VERSION, xMsVersion));
@ -1277,6 +1283,14 @@ public class AbfsClient implements Closeable {
return abfsCounters;
}
/**
* Getter for abfsConfiguration from AbfsClient.
* @return AbfsConfiguration instance
*/
protected AbfsConfiguration getAbfsConfiguration() {
return abfsConfiguration;
}
public int getNumLeaseThreads() {
return abfsConfiguration.getNumLeaseThreads();
}

View File

@ -20,20 +20,23 @@ package org.apache.hadoop.fs.azurebfs.services;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.util.Preconditions;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.util.Time.now;
class AbfsClientThrottlingAnalyzer {
private static final Logger LOG = LoggerFactory.getLogger(
AbfsClientThrottlingAnalyzer.class);
private static final int DEFAULT_ANALYSIS_PERIOD_MS = 10 * 1000;
private static final int MIN_ANALYSIS_PERIOD_MS = 1000;
private static final int MAX_ANALYSIS_PERIOD_MS = 30000;
private static final double MIN_ACCEPTABLE_ERROR_PERCENTAGE = .1;
@ -50,42 +53,38 @@ class AbfsClientThrottlingAnalyzer {
private String name = null;
private Timer timer = null;
private AtomicReference<AbfsOperationMetrics> blobMetrics = null;
private AtomicLong lastExecutionTime = null;
private final AtomicBoolean isOperationOnAccountIdle = new AtomicBoolean(false);
private AbfsConfiguration abfsConfiguration = null;
private boolean accountLevelThrottlingEnabled = true;
private AbfsClientThrottlingAnalyzer() {
// hide default constructor
}
/**
* Creates an instance of the <code>AbfsClientThrottlingAnalyzer</code> class with
* the specified name.
*
* @param name a name used to identify this instance.
* @throws IllegalArgumentException if name is null or empty.
*/
AbfsClientThrottlingAnalyzer(String name) throws IllegalArgumentException {
this(name, DEFAULT_ANALYSIS_PERIOD_MS);
}
/**
* Creates an instance of the <code>AbfsClientThrottlingAnalyzer</code> class with
* the specified name and period.
*
* @param name A name used to identify this instance.
* @param period The frequency, in milliseconds, at which metrics are
* analyzed.
* @param abfsConfiguration The configuration set.
* @throws IllegalArgumentException If name is null or empty.
* If period is less than 1000 or greater than 30000 milliseconds.
*/
AbfsClientThrottlingAnalyzer(String name, int period)
AbfsClientThrottlingAnalyzer(String name, AbfsConfiguration abfsConfiguration)
throws IllegalArgumentException {
Preconditions.checkArgument(
StringUtils.isNotEmpty(name),
"The argument 'name' cannot be null or empty.");
int period = abfsConfiguration.getAnalysisPeriod();
Preconditions.checkArgument(
period >= MIN_ANALYSIS_PERIOD_MS && period <= MAX_ANALYSIS_PERIOD_MS,
"The argument 'period' must be between 1000 and 30000.");
this.name = name;
this.analysisPeriodMs = period;
this.abfsConfiguration = abfsConfiguration;
this.accountLevelThrottlingEnabled = abfsConfiguration.accountThrottlingEnabled();
this.analysisPeriodMs = abfsConfiguration.getAnalysisPeriod();
this.lastExecutionTime = new AtomicLong(now());
this.blobMetrics = new AtomicReference<AbfsOperationMetrics>(
new AbfsOperationMetrics(System.currentTimeMillis()));
this.timer = new Timer(
@ -95,6 +94,47 @@ class AbfsClientThrottlingAnalyzer {
analysisPeriodMs);
}
/**
* Resumes the timer if it was stopped.
*/
private void resumeTimer() {
blobMetrics = new AtomicReference<AbfsOperationMetrics>(
new AbfsOperationMetrics(System.currentTimeMillis()));
timer.schedule(new TimerTaskImpl(),
analysisPeriodMs,
analysisPeriodMs);
isOperationOnAccountIdle.set(false);
}
/**
* Synchronized method to suspend or resume timer.
* @param timerFunctionality resume or suspend.
* @param timerTask The timertask object.
* @return true or false.
*/
private synchronized boolean timerOrchestrator(TimerFunctionality timerFunctionality,
TimerTask timerTask) {
switch (timerFunctionality) {
case RESUME:
if (isOperationOnAccountIdle.get()) {
resumeTimer();
}
break;
case SUSPEND:
if (accountLevelThrottlingEnabled && (System.currentTimeMillis()
- lastExecutionTime.get() >= getOperationIdleTimeout())) {
isOperationOnAccountIdle.set(true);
timerTask.cancel();
timer.purge();
return true;
}
break;
default:
break;
}
return false;
}
/**
* Updates metrics with results from the current storage operation.
*
@ -104,12 +144,13 @@ class AbfsClientThrottlingAnalyzer {
public void addBytesTransferred(long count, boolean isFailedOperation) {
AbfsOperationMetrics metrics = blobMetrics.get();
if (isFailedOperation) {
metrics.bytesFailed.addAndGet(count);
metrics.operationsFailed.incrementAndGet();
metrics.addBytesFailed(count);
metrics.incrementOperationsFailed();
} else {
metrics.bytesSuccessful.addAndGet(count);
metrics.operationsSuccessful.incrementAndGet();
metrics.addBytesSuccessful(count);
metrics.incrementOperationsSuccessful();
}
blobMetrics.set(metrics);
}
/**
@ -117,6 +158,8 @@ class AbfsClientThrottlingAnalyzer {
* @return true if Thread sleeps(Throttling occurs) else false.
*/
public boolean suspendIfNecessary() {
lastExecutionTime.set(now());
timerOrchestrator(TimerFunctionality.RESUME, null);
int duration = sleepDuration;
if (duration > 0) {
try {
@ -134,19 +177,27 @@ class AbfsClientThrottlingAnalyzer {
return sleepDuration;
}
int getOperationIdleTimeout() {
return abfsConfiguration.getAccountOperationIdleTimeout();
}
AtomicBoolean getIsOperationOnAccountIdle() {
return isOperationOnAccountIdle;
}
private int analyzeMetricsAndUpdateSleepDuration(AbfsOperationMetrics metrics,
int sleepDuration) {
final double percentageConversionFactor = 100;
double bytesFailed = metrics.bytesFailed.get();
double bytesSuccessful = metrics.bytesSuccessful.get();
double operationsFailed = metrics.operationsFailed.get();
double operationsSuccessful = metrics.operationsSuccessful.get();
double bytesFailed = metrics.getBytesFailed().get();
double bytesSuccessful = metrics.getBytesSuccessful().get();
double operationsFailed = metrics.getOperationsFailed().get();
double operationsSuccessful = metrics.getOperationsSuccessful().get();
double errorPercentage = (bytesFailed <= 0)
? 0
: (percentageConversionFactor
* bytesFailed
/ (bytesFailed + bytesSuccessful));
long periodMs = metrics.endTime - metrics.startTime;
long periodMs = metrics.getEndTime() - metrics.getStartTime();
double newSleepDuration;
@ -238,10 +289,13 @@ class AbfsClientThrottlingAnalyzer {
}
long now = System.currentTimeMillis();
if (now - blobMetrics.get().startTime >= analysisPeriodMs) {
if (timerOrchestrator(TimerFunctionality.SUSPEND, this)) {
return;
}
if (now - blobMetrics.get().getStartTime() >= analysisPeriodMs) {
AbfsOperationMetrics oldMetrics = blobMetrics.getAndSet(
new AbfsOperationMetrics(now));
oldMetrics.endTime = now;
oldMetrics.setEndTime(now);
sleepDuration = analyzeMetricsAndUpdateSleepDuration(oldMetrics,
sleepDuration);
}
@ -252,24 +306,4 @@ class AbfsClientThrottlingAnalyzer {
}
}
}
/**
* Stores Abfs operation metrics during each analysis period.
*/
static class AbfsOperationMetrics {
private AtomicLong bytesFailed;
private AtomicLong bytesSuccessful;
private AtomicLong operationsFailed;
private AtomicLong operationsSuccessful;
private long endTime;
private long startTime;
AbfsOperationMetrics(long startTime) {
this.startTime = startTime;
this.bytesFailed = new AtomicLong();
this.bytesSuccessful = new AtomicLong();
this.operationsFailed = new AtomicLong();
this.operationsSuccessful = new AtomicLong();
}
}
}

View File

@ -19,10 +19,12 @@
package org.apache.hadoop.fs.azurebfs.services;
import java.net.HttpURLConnection;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.fs.azurebfs.AbfsStatistic;
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
@ -38,35 +40,89 @@ import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
* and sleeps just enough to minimize errors, allowing optimal ingress and/or
* egress throughput.
*/
public final class AbfsClientThrottlingIntercept {
public final class AbfsClientThrottlingIntercept implements AbfsThrottlingIntercept {
private static final Logger LOG = LoggerFactory.getLogger(
AbfsClientThrottlingIntercept.class);
private static final String RANGE_PREFIX = "bytes=";
private static AbfsClientThrottlingIntercept singleton = null;
private AbfsClientThrottlingAnalyzer readThrottler = null;
private AbfsClientThrottlingAnalyzer writeThrottler = null;
private static boolean isAutoThrottlingEnabled = false;
private static AbfsClientThrottlingIntercept singleton; // singleton, initialized in static initialization block
private static final ReentrantLock LOCK = new ReentrantLock();
private final AbfsClientThrottlingAnalyzer readThrottler;
private final AbfsClientThrottlingAnalyzer writeThrottler;
private final String accountName;
// Hide default constructor
private AbfsClientThrottlingIntercept() {
readThrottler = new AbfsClientThrottlingAnalyzer("read");
writeThrottler = new AbfsClientThrottlingAnalyzer("write");
public AbfsClientThrottlingIntercept(String accountName, AbfsConfiguration abfsConfiguration) {
this.accountName = accountName;
this.readThrottler = setAnalyzer("read " + accountName, abfsConfiguration);
this.writeThrottler = setAnalyzer("write " + accountName, abfsConfiguration);
LOG.debug("Client-side throttling is enabled for the ABFS file system for the account : {}", accountName);
}
public static synchronized void initializeSingleton(boolean enableAutoThrottling) {
if (!enableAutoThrottling) {
return;
}
// Hide default constructor
private AbfsClientThrottlingIntercept(AbfsConfiguration abfsConfiguration) {
//Account name is kept as empty as same instance is shared across all accounts
this.accountName = "";
this.readThrottler = setAnalyzer("read", abfsConfiguration);
this.writeThrottler = setAnalyzer("write", abfsConfiguration);
LOG.debug("Client-side throttling is enabled for the ABFS file system using singleton intercept");
}
/**
* Sets the analyzer for the intercept.
* @param name Name of the analyzer.
* @param abfsConfiguration The configuration.
* @return AbfsClientThrottlingAnalyzer instance.
*/
private AbfsClientThrottlingAnalyzer setAnalyzer(String name, AbfsConfiguration abfsConfiguration) {
return new AbfsClientThrottlingAnalyzer(name, abfsConfiguration);
}
/**
* Returns the analyzer for read operations.
* @return AbfsClientThrottlingAnalyzer for read.
*/
AbfsClientThrottlingAnalyzer getReadThrottler() {
return readThrottler;
}
/**
* Returns the analyzer for write operations.
* @return AbfsClientThrottlingAnalyzer for write.
*/
AbfsClientThrottlingAnalyzer getWriteThrottler() {
return writeThrottler;
}
/**
* Creates a singleton object of the AbfsClientThrottlingIntercept.
* which is shared across all filesystem instances.
* @param abfsConfiguration configuration set.
* @return singleton object of intercept.
*/
static AbfsClientThrottlingIntercept initializeSingleton(AbfsConfiguration abfsConfiguration) {
if (singleton == null) {
singleton = new AbfsClientThrottlingIntercept();
isAutoThrottlingEnabled = true;
LOG.debug("Client-side throttling is enabled for the ABFS file system.");
LOCK.lock();
try {
if (singleton == null) {
singleton = new AbfsClientThrottlingIntercept(abfsConfiguration);
LOG.debug("Client-side throttling is enabled for the ABFS file system.");
}
} finally {
LOCK.unlock();
}
}
return singleton;
}
static void updateMetrics(AbfsRestOperationType operationType,
AbfsHttpOperation abfsHttpOperation) {
if (!isAutoThrottlingEnabled || abfsHttpOperation == null) {
/**
* Updates the metrics for successful and failed read and write operations.
* @param operationType Only applicable for read and write operations.
* @param abfsHttpOperation Used for status code and data transferred.
*/
@Override
public void updateMetrics(AbfsRestOperationType operationType,
AbfsHttpOperation abfsHttpOperation) {
if (abfsHttpOperation == null) {
return;
}
@ -82,7 +138,7 @@ public final class AbfsClientThrottlingIntercept {
case Append:
contentLength = abfsHttpOperation.getBytesSent();
if (contentLength > 0) {
singleton.writeThrottler.addBytesTransferred(contentLength,
writeThrottler.addBytesTransferred(contentLength,
isFailedOperation);
}
break;
@ -90,7 +146,7 @@ public final class AbfsClientThrottlingIntercept {
String range = abfsHttpOperation.getConnection().getRequestProperty(HttpHeaderConfigurations.RANGE);
contentLength = getContentLengthIfKnown(range);
if (contentLength > 0) {
singleton.readThrottler.addBytesTransferred(contentLength,
readThrottler.addBytesTransferred(contentLength,
isFailedOperation);
}
break;
@ -104,21 +160,18 @@ public final class AbfsClientThrottlingIntercept {
* uses this to suspend the request, if necessary, to minimize errors and
* maximize throughput.
*/
static void sendingRequest(AbfsRestOperationType operationType,
@Override
public void sendingRequest(AbfsRestOperationType operationType,
AbfsCounters abfsCounters) {
if (!isAutoThrottlingEnabled) {
return;
}
switch (operationType) {
case ReadFile:
if (singleton.readThrottler.suspendIfNecessary()
if (readThrottler.suspendIfNecessary()
&& abfsCounters != null) {
abfsCounters.incrementCounter(AbfsStatistic.READ_THROTTLES, 1);
}
break;
case Append:
if (singleton.writeThrottler.suspendIfNecessary()
if (writeThrottler.suspendIfNecessary()
&& abfsCounters != null) {
abfsCounters.incrementCounter(AbfsStatistic.WRITE_THROTTLES, 1);
}

View File

@ -0,0 +1,37 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
final class AbfsNoOpThrottlingIntercept implements AbfsThrottlingIntercept {
public static final AbfsNoOpThrottlingIntercept INSTANCE = new AbfsNoOpThrottlingIntercept();
private AbfsNoOpThrottlingIntercept() {
}
@Override
public void updateMetrics(final AbfsRestOperationType operationType,
final AbfsHttpOperation abfsHttpOperation) {
}
@Override
public void sendingRequest(final AbfsRestOperationType operationType,
final AbfsCounters abfsCounters) {
}
}

View File

@ -0,0 +1,139 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
import java.util.concurrent.atomic.AtomicLong;
/**
* Stores Abfs operation metrics during each analysis period.
*/
class AbfsOperationMetrics {
/**
* No of bytes which could not be transferred due to a failed operation.
*/
private final AtomicLong bytesFailed;
/**
* No of bytes successfully transferred during a successful operation.
*/
private final AtomicLong bytesSuccessful;
/**
* Total no of failed operations.
*/
private final AtomicLong operationsFailed;
/**
* Total no of successful operations.
*/
private final AtomicLong operationsSuccessful;
/**
* Time when collection of metrics ended.
*/
private long endTime;
/**
* Time when the collection of metrics started.
*/
private final long startTime;
AbfsOperationMetrics(long startTime) {
this.startTime = startTime;
this.bytesFailed = new AtomicLong();
this.bytesSuccessful = new AtomicLong();
this.operationsFailed = new AtomicLong();
this.operationsSuccessful = new AtomicLong();
}
/**
*
* @return bytes failed to transfer.
*/
AtomicLong getBytesFailed() {
return bytesFailed;
}
/**
*
* @return bytes successfully transferred.
*/
AtomicLong getBytesSuccessful() {
return bytesSuccessful;
}
/**
*
* @return no of operations failed.
*/
AtomicLong getOperationsFailed() {
return operationsFailed;
}
/**
*
* @return no of successful operations.
*/
AtomicLong getOperationsSuccessful() {
return operationsSuccessful;
}
/**
*
* @return end time of metric collection.
*/
long getEndTime() {
return endTime;
}
/**
*
* @param endTime sets the end time.
*/
void setEndTime(final long endTime) {
this.endTime = endTime;
}
/**
*
* @return start time of metric collection.
*/
long getStartTime() {
return startTime;
}
void addBytesFailed(long bytes) {
this.getBytesFailed().addAndGet(bytes);
}
void addBytesSuccessful(long bytes) {
this.getBytesSuccessful().addAndGet(bytes);
}
void incrementOperationsFailed() {
this.getOperationsFailed().incrementAndGet();
}
void incrementOperationsSuccessful() {
this.getOperationsSuccessful().incrementAndGet();
}
}

View File

@ -45,6 +45,8 @@ public class AbfsRestOperation {
private final AbfsRestOperationType operationType;
// Blob FS client, which has the credentials, retry policy, and logs.
private final AbfsClient client;
// Return intercept instance
private final AbfsThrottlingIntercept intercept;
// the HTTP method (PUT, PATCH, POST, GET, HEAD, or DELETE)
private final String method;
// full URL including query parameters
@ -145,6 +147,7 @@ public class AbfsRestOperation {
|| AbfsHttpConstants.HTTP_METHOD_PATCH.equals(method));
this.sasToken = sasToken;
this.abfsCounters = client.getAbfsCounters();
this.intercept = client.getIntercept();
}
/**
@ -241,7 +244,8 @@ public class AbfsRestOperation {
*/
private boolean executeHttpOperation(final int retryCount,
TracingContext tracingContext) throws AzureBlobFileSystemException {
AbfsHttpOperation httpOperation = null;
AbfsHttpOperation httpOperation;
try {
// initialize the HTTP request and open the connection
httpOperation = new AbfsHttpOperation(url, method, requestHeaders);
@ -278,8 +282,7 @@ public class AbfsRestOperation {
// dump the headers
AbfsIoUtils.dumpHeadersToDebugLog("Request Headers",
httpOperation.getConnection().getRequestProperties());
AbfsClientThrottlingIntercept.sendingRequest(operationType, abfsCounters);
intercept.sendingRequest(operationType, abfsCounters);
if (hasRequestBody) {
// HttpUrlConnection requires
httpOperation.sendRequest(buffer, bufferOffset, bufferLength);
@ -317,7 +320,7 @@ public class AbfsRestOperation {
return false;
} finally {
AbfsClientThrottlingIntercept.updateMetrics(operationType, httpOperation);
intercept.updateMetrics(operationType, httpOperation);
}
LOG.debug("HttpRequest: {}: {}", operationType, httpOperation.toString());

View File

@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* An interface for Abfs Throttling Interface.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public interface AbfsThrottlingIntercept {
/**
* Updates the metrics for successful and failed read and write operations.
* @param operationType Only applicable for read and write operations.
* @param abfsHttpOperation Used for status code and data transferred.
*/
void updateMetrics(AbfsRestOperationType operationType,
AbfsHttpOperation abfsHttpOperation);
/**
* Called before the request is sent. Client-side throttling
* uses this to suspend the request, if necessary, to minimize errors and
* maximize throughput.
* @param operationType Only applicable for read and write operations.
* @param abfsCounters Used for counters.
*/
void sendingRequest(AbfsRestOperationType operationType,
AbfsCounters abfsCounters);
}

View File

@ -0,0 +1,102 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.util.WeakReferenceMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Class to get an instance of throttling intercept class per account.
*/
final class AbfsThrottlingInterceptFactory {
private AbfsThrottlingInterceptFactory() {
}
private static AbfsConfiguration abfsConfig;
/**
* List of references notified of loss.
*/
private static List<String> lostReferences = new ArrayList<>();
private static final Logger LOG = LoggerFactory.getLogger(
AbfsThrottlingInterceptFactory.class);
/**
* Map which stores instance of ThrottlingIntercept class per account.
*/
private static WeakReferenceMap<String, AbfsThrottlingIntercept>
interceptMap = new WeakReferenceMap<>(
AbfsThrottlingInterceptFactory::factory,
AbfsThrottlingInterceptFactory::referenceLost);
/**
* Returns instance of throttling intercept.
* @param accountName Account name.
* @return instance of throttling intercept.
*/
private static AbfsClientThrottlingIntercept factory(final String accountName) {
return new AbfsClientThrottlingIntercept(accountName, abfsConfig);
}
/**
* Reference lost callback.
* @param accountName key lost.
*/
private static void referenceLost(String accountName) {
lostReferences.add(accountName);
}
/**
* Returns an instance of AbfsThrottlingIntercept.
*
* @param accountName The account for which we need instance of throttling intercept.
@param abfsConfiguration The object of abfsconfiguration class.
* @return Instance of AbfsThrottlingIntercept.
*/
static synchronized AbfsThrottlingIntercept getInstance(String accountName,
AbfsConfiguration abfsConfiguration) {
abfsConfig = abfsConfiguration;
AbfsThrottlingIntercept intercept;
if (!abfsConfiguration.isAutoThrottlingEnabled()) {
return AbfsNoOpThrottlingIntercept.INSTANCE;
}
// If singleton is enabled use a static instance of the intercept class for all accounts
if (!abfsConfiguration.accountThrottlingEnabled()) {
intercept = AbfsClientThrottlingIntercept.initializeSingleton(
abfsConfiguration);
} else {
// Return the instance from the map
intercept = interceptMap.get(accountName);
if (intercept == null) {
intercept = new AbfsClientThrottlingIntercept(accountName,
abfsConfiguration);
interceptMap.put(accountName, intercept);
}
}
return intercept;
}
}

View File

@ -0,0 +1,26 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
public enum TimerFunctionality {
RESUME,
SUSPEND
}

View File

@ -767,6 +767,12 @@ Hflush() being the only documented API that can provide persistent data
transfer, Flush() also attempting to persist buffered data will lead to
performance issues.
### <a name="accountlevelthrottlingoptions"></a> Account level throttling Options
`fs.azure.account.operation.idle.timeout`: This value specifies the time after which the timer for the analyzer (read or
write) should be paused until no new request is made again. The default value for the same is 60 seconds.
### <a name="hnscheckconfigoptions"></a> HNS Check Options
Config `fs.azure.account.hns.enabled` provides an option to specify whether
the storage account is HNS enabled or not. In case the config is not provided,
@ -877,6 +883,9 @@ when there are too many writes from the same process.
tuned with this config considering each queued request holds a buffer. Set
the value 3 or 4 times the value set for s.azure.write.max.concurrent.requests.
`fs.azure.analysis.period`: The time after which sleep duration is recomputed after analyzing metrics. The default value
for the same is 10 seconds.
### <a name="securityconfigoptions"></a> Security Options
`fs.azure.always.use.https`: Enforces to use HTTPS instead of HTTP when the flag
is made true. Irrespective of the flag, `AbfsClient` will use HTTPS if the secure

View File

@ -24,6 +24,9 @@ package org.apache.hadoop.fs.azurebfs.constants;
public final class TestConfigurationKeys {
public static final String FS_AZURE_ACCOUNT_NAME = "fs.azure.account.name";
public static final String FS_AZURE_ABFS_ACCOUNT_NAME = "fs.azure.abfs.account.name";
public static final String FS_AZURE_ABFS_ACCOUNT1_NAME = "fs.azure.abfs.account1.name";
public static final String FS_AZURE_ENABLE_AUTOTHROTTLING = "fs.azure.enable.autothrottling";
public static final String FS_AZURE_ANALYSIS_PERIOD = "fs.azure.analysis.period";
public static final String FS_AZURE_ACCOUNT_KEY = "fs.azure.account.key";
public static final String FS_AZURE_CONTRACT_TEST_URI = "fs.contract.test.fs.abfs";
public static final String FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT = "fs.azure.test.namespace.enabled";

View File

@ -306,6 +306,11 @@ public final class TestAbfsClient {
when(client.getAccessToken()).thenCallRealMethod();
when(client.getSharedKeyCredentials()).thenCallRealMethod();
when(client.createDefaultHeaders()).thenCallRealMethod();
when(client.getAbfsConfiguration()).thenReturn(abfsConfig);
when(client.getIntercept()).thenReturn(
AbfsThrottlingInterceptFactory.getInstance(
abfsConfig.getAccountName().substring(0,
abfsConfig.getAccountName().indexOf(DOT)), abfsConfig));
// override baseurl
client = TestAbfsClient.setAbfsClientField(client, "abfsConfiguration",

View File

@ -18,9 +18,15 @@
package org.apache.hadoop.fs.azurebfs.services;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.junit.Test;
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_ANALYSIS_PERIOD;
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.TEST_CONFIGURATION_FILE_NAME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@ -33,6 +39,15 @@ public class TestAbfsClientThrottlingAnalyzer {
+ ANALYSIS_PERIOD / 10;
private static final long MEGABYTE = 1024 * 1024;
private static final int MAX_ACCEPTABLE_PERCENT_DIFFERENCE = 20;
private AbfsConfiguration abfsConfiguration;
public TestAbfsClientThrottlingAnalyzer() throws IOException, IllegalAccessException {
final Configuration configuration = new Configuration();
configuration.addResource(TEST_CONFIGURATION_FILE_NAME);
configuration.setInt(FS_AZURE_ANALYSIS_PERIOD, 1000);
this.abfsConfiguration = new AbfsConfiguration(configuration,
"dummy");
}
private void sleep(long milliseconds) {
try {
@ -82,8 +97,7 @@ public class TestAbfsClientThrottlingAnalyzer {
@Test
public void testNoMetricUpdatesThenNoWaiting() {
AbfsClientThrottlingAnalyzer analyzer = new AbfsClientThrottlingAnalyzer(
"test",
ANALYSIS_PERIOD);
"test", abfsConfiguration);
validate(0, analyzer.getSleepDuration());
sleep(ANALYSIS_PERIOD_PLUS_10_PERCENT);
validate(0, analyzer.getSleepDuration());
@ -96,8 +110,7 @@ public class TestAbfsClientThrottlingAnalyzer {
@Test
public void testOnlySuccessThenNoWaiting() {
AbfsClientThrottlingAnalyzer analyzer = new AbfsClientThrottlingAnalyzer(
"test",
ANALYSIS_PERIOD);
"test", abfsConfiguration);
analyzer.addBytesTransferred(8 * MEGABYTE, false);
validate(0, analyzer.getSleepDuration());
sleep(ANALYSIS_PERIOD_PLUS_10_PERCENT);
@ -112,8 +125,7 @@ public class TestAbfsClientThrottlingAnalyzer {
@Test
public void testOnlyErrorsAndWaiting() {
AbfsClientThrottlingAnalyzer analyzer = new AbfsClientThrottlingAnalyzer(
"test",
ANALYSIS_PERIOD);
"test", abfsConfiguration);
validate(0, analyzer.getSleepDuration());
analyzer.addBytesTransferred(4 * MEGABYTE, true);
sleep(ANALYSIS_PERIOD_PLUS_10_PERCENT);
@ -132,8 +144,7 @@ public class TestAbfsClientThrottlingAnalyzer {
@Test
public void testSuccessAndErrorsAndWaiting() {
AbfsClientThrottlingAnalyzer analyzer = new AbfsClientThrottlingAnalyzer(
"test",
ANALYSIS_PERIOD);
"test", abfsConfiguration);
validate(0, analyzer.getSleepDuration());
analyzer.addBytesTransferred(8 * MEGABYTE, false);
analyzer.addBytesTransferred(2 * MEGABYTE, true);
@ -157,8 +168,7 @@ public class TestAbfsClientThrottlingAnalyzer {
@Test
public void testManySuccessAndErrorsAndWaiting() {
AbfsClientThrottlingAnalyzer analyzer = new AbfsClientThrottlingAnalyzer(
"test",
ANALYSIS_PERIOD);
"test", abfsConfiguration);
validate(0, analyzer.getSleepDuration());
final int numberOfRequests = 20;
for (int i = 0; i < numberOfRequests; i++) {

View File

@ -18,13 +18,35 @@
package org.apache.hadoop.fs.azurebfs.services;
import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_BACKOFF_INTERVAL;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_MAX_BACKOFF_INTERVAL;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_MAX_IO_RETRIES;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_MIN_BACKOFF_INTERVAL;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_AUTOTHROTTLING;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_ABFS_ACCOUNT1_NAME;
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_ACCOUNT_NAME;
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.TEST_CONFIGURATION_FILE_NAME;
import static org.junit.Assume.assumeTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.apache.hadoop.fs.FSDataInputStream;
import org.assertj.core.api.Assertions;
import org.junit.Assume;
import org.mockito.Mockito;
import java.net.URI;
import java.util.Random;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
import org.junit.Assert;
import org.junit.Test;
@ -41,6 +63,9 @@ public class TestExponentialRetryPolicy extends AbstractAbfsIntegrationTest {
private final int noRetryCount = 0;
private final int retryCount = new Random().nextInt(maxRetryCount);
private final int retryCountBeyondMax = maxRetryCount + 1;
private static final String TEST_PATH = "/testfile";
private static final double MULTIPLYING_FACTOR = 1.5;
private static final int ANALYSIS_PERIOD = 10000;
public TestExponentialRetryPolicy() throws Exception {
@ -67,6 +92,173 @@ public class TestExponentialRetryPolicy extends AbstractAbfsIntegrationTest {
testMaxIOConfig(abfsConfig);
}
@Test
public void testThrottlingIntercept() throws Exception {
AzureBlobFileSystem fs = getFileSystem();
final Configuration configuration = new Configuration();
configuration.addResource(TEST_CONFIGURATION_FILE_NAME);
configuration.setBoolean(FS_AZURE_ENABLE_AUTOTHROTTLING, false);
// On disabling throttling AbfsNoOpThrottlingIntercept object is returned
AbfsConfiguration abfsConfiguration = new AbfsConfiguration(configuration,
"dummy.dfs.core.windows.net");
AbfsThrottlingIntercept intercept;
AbfsClient abfsClient = TestAbfsClient.createTestClientFromCurrentContext(fs.getAbfsStore().getClient(), abfsConfiguration);
intercept = abfsClient.getIntercept();
Assertions.assertThat(intercept)
.describedAs("AbfsNoOpThrottlingIntercept instance expected")
.isInstanceOf(AbfsNoOpThrottlingIntercept.class);
configuration.setBoolean(FS_AZURE_ENABLE_AUTOTHROTTLING, true);
configuration.setBoolean(FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED, true);
// On disabling throttling AbfsClientThrottlingIntercept object is returned
AbfsConfiguration abfsConfiguration1 = new AbfsConfiguration(configuration,
"dummy1.dfs.core.windows.net");
AbfsClient abfsClient1 = TestAbfsClient.createTestClientFromCurrentContext(fs.getAbfsStore().getClient(), abfsConfiguration1);
intercept = abfsClient1.getIntercept();
Assertions.assertThat(intercept)
.describedAs("AbfsClientThrottlingIntercept instance expected")
.isInstanceOf(AbfsClientThrottlingIntercept.class);
}
@Test
public void testCreateMultipleAccountThrottling() throws Exception {
Configuration config = new Configuration(getRawConfiguration());
String accountName = config.get(FS_AZURE_ACCOUNT_NAME);
if (accountName == null) {
// check if accountName is set using different config key
accountName = config.get(FS_AZURE_ABFS_ACCOUNT1_NAME);
}
assumeTrue("Not set: " + FS_AZURE_ABFS_ACCOUNT1_NAME,
accountName != null && !accountName.isEmpty());
Configuration rawConfig1 = new Configuration();
rawConfig1.addResource(TEST_CONFIGURATION_FILE_NAME);
AbfsRestOperation successOp = mock(AbfsRestOperation.class);
AbfsHttpOperation http500Op = mock(AbfsHttpOperation.class);
when(http500Op.getStatusCode()).thenReturn(HTTP_INTERNAL_ERROR);
when(successOp.getResult()).thenReturn(http500Op);
AbfsConfiguration configuration = Mockito.mock(AbfsConfiguration.class);
when(configuration.getAnalysisPeriod()).thenReturn(ANALYSIS_PERIOD);
when(configuration.isAutoThrottlingEnabled()).thenReturn(true);
when(configuration.accountThrottlingEnabled()).thenReturn(false);
AbfsThrottlingIntercept instance1 = AbfsThrottlingInterceptFactory.getInstance(accountName, configuration);
String accountName1 = config.get(FS_AZURE_ABFS_ACCOUNT1_NAME);
assumeTrue("Not set: " + FS_AZURE_ABFS_ACCOUNT1_NAME,
accountName1 != null && !accountName1.isEmpty());
AbfsThrottlingIntercept instance2 = AbfsThrottlingInterceptFactory.getInstance(accountName1, configuration);
//if singleton is enabled, for different accounts both the instances should return same value
Assertions.assertThat(instance1)
.describedAs(
"if singleton is enabled, for different accounts both the instances should return same value")
.isEqualTo(instance2);
when(configuration.accountThrottlingEnabled()).thenReturn(true);
AbfsThrottlingIntercept instance3 = AbfsThrottlingInterceptFactory.getInstance(accountName, configuration);
AbfsThrottlingIntercept instance4 = AbfsThrottlingInterceptFactory.getInstance(accountName1, configuration);
AbfsThrottlingIntercept instance5 = AbfsThrottlingInterceptFactory.getInstance(accountName, configuration);
//if singleton is not enabled, for different accounts instances should return different value
Assertions.assertThat(instance3)
.describedAs(
"iff singleton is not enabled, for different accounts instances should return different value")
.isNotEqualTo(instance4);
//if singleton is not enabled, for same accounts instances should return same value
Assertions.assertThat(instance3)
.describedAs(
"if singleton is not enabled, for same accounts instances should return same value")
.isEqualTo(instance5);
}
@Test
public void testOperationOnAccountIdle() throws Exception {
//Get the filesystem.
AzureBlobFileSystem fs = getFileSystem();
AbfsClient client = fs.getAbfsStore().getClient();
AbfsConfiguration configuration1 = client.getAbfsConfiguration();
Assume.assumeTrue(configuration1.isAutoThrottlingEnabled());
Assume.assumeTrue(configuration1.accountThrottlingEnabled());
AbfsClientThrottlingIntercept accountIntercept
= (AbfsClientThrottlingIntercept) client.getIntercept();
final byte[] b = new byte[2 * MIN_BUFFER_SIZE];
new Random().nextBytes(b);
Path testPath = path(TEST_PATH);
//Do an operation on the filesystem.
try (FSDataOutputStream stream = fs.create(testPath)) {
stream.write(b);
}
//Don't perform any operation on the account.
int sleepTime = (int) ((getAbfsConfig().getAccountOperationIdleTimeout()) * MULTIPLYING_FACTOR);
Thread.sleep(sleepTime);
try (FSDataInputStream streamRead = fs.open(testPath)) {
streamRead.read(b);
}
//Perform operations on another account.
AzureBlobFileSystem fs1 = new AzureBlobFileSystem();
Configuration config = new Configuration(getRawConfiguration());
String accountName1 = config.get(FS_AZURE_ABFS_ACCOUNT1_NAME);
assumeTrue("Not set: " + FS_AZURE_ABFS_ACCOUNT1_NAME,
accountName1 != null && !accountName1.isEmpty());
final String abfsUrl1 = this.getFileSystemName() + "12" + "@" + accountName1;
URI defaultUri1 = null;
defaultUri1 = new URI("abfss", abfsUrl1, null, null, null);
fs1.initialize(defaultUri1, getRawConfiguration());
AbfsClient client1 = fs1.getAbfsStore().getClient();
AbfsClientThrottlingIntercept accountIntercept1
= (AbfsClientThrottlingIntercept) client1.getIntercept();
try (FSDataOutputStream stream1 = fs1.create(testPath)) {
stream1.write(b);
}
//Verify the write analyzer for first account is idle but the read analyzer is not idle.
Assertions.assertThat(accountIntercept.getWriteThrottler()
.getIsOperationOnAccountIdle()
.get())
.describedAs("Write analyzer for first account should be idle the first time")
.isTrue();
Assertions.assertThat(
accountIntercept.getReadThrottler()
.getIsOperationOnAccountIdle()
.get())
.describedAs("Read analyzer for first account should not be idle")
.isFalse();
//Verify the write analyzer for second account is not idle.
Assertions.assertThat(
accountIntercept1.getWriteThrottler()
.getIsOperationOnAccountIdle()
.get())
.describedAs("Write analyzer for second account should not be idle")
.isFalse();
//Again perform an operation on the first account.
try (FSDataOutputStream stream2 = fs.create(testPath)) {
stream2.write(b);
}
//Verify the write analyzer on first account is not idle.
Assertions.assertThat(
accountIntercept.getWriteThrottler()
.getIsOperationOnAccountIdle()
.get())
.describedAs(
"Write analyzer for first account should not be idle second time")
.isFalse();
}
@Test
public void testAbfsConfigConstructor() throws Exception {
// Ensure we choose expected values that are not defaults