HADOOP-18457. ABFS: Support account level throttling (#5034)

This allows  abfs request throttling to be shared across all 
abfs connections talking to containers belonging to the same abfs storage
account -as that is the level at which IO throttling is applied.

The option is enabled/disabled in the configuration option 
"fs.azure.account.throttling.enabled";
The default is "true"

Contributed by Anmol Asrani
This commit is contained in:
Anmol Asrani 2022-11-30 18:35:31 +05:30 committed by GitHub
parent 72749a4ff8
commit 7786600744
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 794 additions and 90 deletions

View File

@ -117,6 +117,10 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_OPTIMIZE_FOOTER_READ) DefaultValue = DEFAULT_OPTIMIZE_FOOTER_READ)
private boolean optimizeFooterRead; private boolean optimizeFooterRead;
@BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED,
DefaultValue = DEFAULT_FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED)
private boolean accountThrottlingEnabled;
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_READ_BUFFER_SIZE, @IntegerConfigurationValidatorAnnotation(ConfigurationKey = AZURE_READ_BUFFER_SIZE,
MinValue = MIN_BUFFER_SIZE, MinValue = MIN_BUFFER_SIZE,
MaxValue = MAX_BUFFER_SIZE, MaxValue = MAX_BUFFER_SIZE,
@ -260,6 +264,14 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_ENABLE_AUTOTHROTTLING) DefaultValue = DEFAULT_ENABLE_AUTOTHROTTLING)
private boolean enableAutoThrottling; private boolean enableAutoThrottling;
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ACCOUNT_OPERATION_IDLE_TIMEOUT,
DefaultValue = DEFAULT_ACCOUNT_OPERATION_IDLE_TIMEOUT_MS)
private int accountOperationIdleTimeout;
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ANALYSIS_PERIOD,
DefaultValue = DEFAULT_ANALYSIS_PERIOD_MS)
private int analysisPeriod;
@IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ABFS_IO_RATE_LIMIT, @IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ABFS_IO_RATE_LIMIT,
MinValue = 0, MinValue = 0,
DefaultValue = RATE_LIMIT_DEFAULT) DefaultValue = RATE_LIMIT_DEFAULT)
@ -694,6 +706,10 @@ public String getAppendBlobDirs() {
return this.azureAppendBlobDirs; return this.azureAppendBlobDirs;
} }
public boolean accountThrottlingEnabled() {
return accountThrottlingEnabled;
}
public String getAzureInfiniteLeaseDirs() { public String getAzureInfiniteLeaseDirs() {
return this.azureInfiniteLeaseDirs; return this.azureInfiniteLeaseDirs;
} }
@ -736,6 +752,14 @@ public boolean isAutoThrottlingEnabled() {
return this.enableAutoThrottling; return this.enableAutoThrottling;
} }
public int getAccountOperationIdleTimeout() {
return accountOperationIdleTimeout;
}
public int getAnalysisPeriod() {
return analysisPeriod;
}
public int getRateLimit() { public int getRateLimit() {
return rateLimit; return rateLimit;
} }

View File

@ -55,7 +55,6 @@
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.azurebfs.commit.ResilientCommitByRename; import org.apache.hadoop.fs.azurebfs.commit.ResilientCommitByRename;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient; import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsClientThrottlingIntercept;
import org.apache.hadoop.fs.azurebfs.services.AbfsListStatusRemoteIterator; import org.apache.hadoop.fs.azurebfs.services.AbfsListStatusRemoteIterator;
import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
@ -225,7 +224,6 @@ public void initialize(URI uri, Configuration configuration)
} }
} }
AbfsClientThrottlingIntercept.initializeSingleton(abfsConfiguration.isAutoThrottlingEnabled());
rateLimiting = RateLimitingFactory.create(abfsConfiguration.getRateLimit()); rateLimiting = RateLimitingFactory.create(abfsConfiguration.getRateLimit());
LOG.debug("Initializing AzureBlobFileSystem for {} complete", uri); LOG.debug("Initializing AzureBlobFileSystem for {} complete", uri);
} }

View File

@ -38,6 +38,7 @@ public final class ConfigurationKeys {
public static final String FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME = "fs.azure.account.key"; public static final String FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME = "fs.azure.account.key";
public static final String FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME_REGX = "fs\\.azure\\.account\\.key\\.(.*)"; public static final String FS_AZURE_ACCOUNT_KEY_PROPERTY_NAME_REGX = "fs\\.azure\\.account\\.key\\.(.*)";
public static final String FS_AZURE_SECURE_MODE = "fs.azure.secure.mode"; public static final String FS_AZURE_SECURE_MODE = "fs.azure.secure.mode";
public static final String FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED = "fs.azure.account.throttling.enabled";
// Retry strategy defined by the user // Retry strategy defined by the user
public static final String AZURE_MIN_BACKOFF_INTERVAL = "fs.azure.io.retry.min.backoff.interval"; public static final String AZURE_MIN_BACKOFF_INTERVAL = "fs.azure.io.retry.min.backoff.interval";
@ -116,6 +117,8 @@ public final class ConfigurationKeys {
public static final String AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION = "fs.azure.createRemoteFileSystemDuringInitialization"; public static final String AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION = "fs.azure.createRemoteFileSystemDuringInitialization";
public static final String AZURE_SKIP_USER_GROUP_METADATA_DURING_INITIALIZATION = "fs.azure.skipUserGroupMetadataDuringInitialization"; public static final String AZURE_SKIP_USER_GROUP_METADATA_DURING_INITIALIZATION = "fs.azure.skipUserGroupMetadataDuringInitialization";
public static final String FS_AZURE_ENABLE_AUTOTHROTTLING = "fs.azure.enable.autothrottling"; public static final String FS_AZURE_ENABLE_AUTOTHROTTLING = "fs.azure.enable.autothrottling";
public static final String FS_AZURE_ACCOUNT_OPERATION_IDLE_TIMEOUT = "fs.azure.account.operation.idle.timeout";
public static final String FS_AZURE_ANALYSIS_PERIOD = "fs.azure.analysis.period";
public static final String FS_AZURE_ALWAYS_USE_HTTPS = "fs.azure.always.use.https"; public static final String FS_AZURE_ALWAYS_USE_HTTPS = "fs.azure.always.use.https";
public static final String FS_AZURE_ATOMIC_RENAME_KEY = "fs.azure.atomic.rename.key"; public static final String FS_AZURE_ATOMIC_RENAME_KEY = "fs.azure.atomic.rename.key";
/** This config ensures that during create overwrite an existing file will be /** This config ensures that during create overwrite an existing file will be

View File

@ -94,6 +94,9 @@ public final class FileSystemConfigurations {
public static final boolean DEFAULT_ENABLE_FLUSH = true; public static final boolean DEFAULT_ENABLE_FLUSH = true;
public static final boolean DEFAULT_DISABLE_OUTPUTSTREAM_FLUSH = true; public static final boolean DEFAULT_DISABLE_OUTPUTSTREAM_FLUSH = true;
public static final boolean DEFAULT_ENABLE_AUTOTHROTTLING = true; public static final boolean DEFAULT_ENABLE_AUTOTHROTTLING = true;
public static final boolean DEFAULT_FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED = true;
public static final int DEFAULT_ACCOUNT_OPERATION_IDLE_TIMEOUT_MS = 60_000;
public static final int DEFAULT_ANALYSIS_PERIOD_MS = 10_000;
public static final DelegatingSSLSocketFactory.SSLChannelMode DEFAULT_FS_AZURE_SSL_CHANNEL_MODE public static final DelegatingSSLSocketFactory.SSLChannelMode DEFAULT_FS_AZURE_SSL_CHANNEL_MODE
= DelegatingSSLSocketFactory.SSLChannelMode.Default; = DelegatingSSLSocketFactory.SSLChannelMode.Default;

View File

@ -101,6 +101,7 @@ public class AbfsClient implements Closeable {
private AccessTokenProvider tokenProvider; private AccessTokenProvider tokenProvider;
private SASTokenProvider sasTokenProvider; private SASTokenProvider sasTokenProvider;
private final AbfsCounters abfsCounters; private final AbfsCounters abfsCounters;
private final AbfsThrottlingIntercept intercept;
private final ListeningScheduledExecutorService executorService; private final ListeningScheduledExecutorService executorService;
@ -120,6 +121,7 @@ private AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCreden
this.retryPolicy = abfsClientContext.getExponentialRetryPolicy(); this.retryPolicy = abfsClientContext.getExponentialRetryPolicy();
this.accountName = abfsConfiguration.getAccountName().substring(0, abfsConfiguration.getAccountName().indexOf(AbfsHttpConstants.DOT)); this.accountName = abfsConfiguration.getAccountName().substring(0, abfsConfiguration.getAccountName().indexOf(AbfsHttpConstants.DOT));
this.authType = abfsConfiguration.getAuthType(accountName); this.authType = abfsConfiguration.getAuthType(accountName);
this.intercept = AbfsThrottlingInterceptFactory.getInstance(accountName, abfsConfiguration);
String encryptionKey = this.abfsConfiguration String encryptionKey = this.abfsConfiguration
.getClientProvidedEncryptionKey(); .getClientProvidedEncryptionKey();
@ -216,6 +218,10 @@ SharedKeyCredentials getSharedKeyCredentials() {
return sharedKeyCredentials; return sharedKeyCredentials;
} }
AbfsThrottlingIntercept getIntercept() {
return intercept;
}
List<AbfsHttpHeader> createDefaultHeaders() { List<AbfsHttpHeader> createDefaultHeaders() {
final List<AbfsHttpHeader> requestHeaders = new ArrayList<AbfsHttpHeader>(); final List<AbfsHttpHeader> requestHeaders = new ArrayList<AbfsHttpHeader>();
requestHeaders.add(new AbfsHttpHeader(X_MS_VERSION, xMsVersion)); requestHeaders.add(new AbfsHttpHeader(X_MS_VERSION, xMsVersion));
@ -1277,6 +1283,14 @@ protected AbfsCounters getAbfsCounters() {
return abfsCounters; return abfsCounters;
} }
/**
* Getter for abfsConfiguration from AbfsClient.
* @return AbfsConfiguration instance
*/
protected AbfsConfiguration getAbfsConfiguration() {
return abfsConfiguration;
}
public int getNumLeaseThreads() { public int getNumLeaseThreads() {
return abfsConfiguration.getNumLeaseThreads(); return abfsConfiguration.getNumLeaseThreads();
} }

View File

@ -20,20 +20,23 @@
import java.util.Timer; import java.util.Timer;
import java.util.TimerTask; import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.util.Preconditions; import org.apache.hadoop.util.Preconditions;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import static org.apache.hadoop.util.Time.now;
class AbfsClientThrottlingAnalyzer { class AbfsClientThrottlingAnalyzer {
private static final Logger LOG = LoggerFactory.getLogger( private static final Logger LOG = LoggerFactory.getLogger(
AbfsClientThrottlingAnalyzer.class); AbfsClientThrottlingAnalyzer.class);
private static final int DEFAULT_ANALYSIS_PERIOD_MS = 10 * 1000;
private static final int MIN_ANALYSIS_PERIOD_MS = 1000; private static final int MIN_ANALYSIS_PERIOD_MS = 1000;
private static final int MAX_ANALYSIS_PERIOD_MS = 30000; private static final int MAX_ANALYSIS_PERIOD_MS = 30000;
private static final double MIN_ACCEPTABLE_ERROR_PERCENTAGE = .1; private static final double MIN_ACCEPTABLE_ERROR_PERCENTAGE = .1;
@ -50,42 +53,38 @@ class AbfsClientThrottlingAnalyzer {
private String name = null; private String name = null;
private Timer timer = null; private Timer timer = null;
private AtomicReference<AbfsOperationMetrics> blobMetrics = null; private AtomicReference<AbfsOperationMetrics> blobMetrics = null;
private AtomicLong lastExecutionTime = null;
private final AtomicBoolean isOperationOnAccountIdle = new AtomicBoolean(false);
private AbfsConfiguration abfsConfiguration = null;
private boolean accountLevelThrottlingEnabled = true;
private AbfsClientThrottlingAnalyzer() { private AbfsClientThrottlingAnalyzer() {
// hide default constructor // hide default constructor
} }
/**
* Creates an instance of the <code>AbfsClientThrottlingAnalyzer</code> class with
* the specified name.
*
* @param name a name used to identify this instance.
* @throws IllegalArgumentException if name is null or empty.
*/
AbfsClientThrottlingAnalyzer(String name) throws IllegalArgumentException {
this(name, DEFAULT_ANALYSIS_PERIOD_MS);
}
/** /**
* Creates an instance of the <code>AbfsClientThrottlingAnalyzer</code> class with * Creates an instance of the <code>AbfsClientThrottlingAnalyzer</code> class with
* the specified name and period. * the specified name and period.
* *
* @param name A name used to identify this instance. * @param name A name used to identify this instance.
* @param period The frequency, in milliseconds, at which metrics are * @param abfsConfiguration The configuration set.
* analyzed.
* @throws IllegalArgumentException If name is null or empty. * @throws IllegalArgumentException If name is null or empty.
* If period is less than 1000 or greater than 30000 milliseconds. * If period is less than 1000 or greater than 30000 milliseconds.
*/ */
AbfsClientThrottlingAnalyzer(String name, int period) AbfsClientThrottlingAnalyzer(String name, AbfsConfiguration abfsConfiguration)
throws IllegalArgumentException { throws IllegalArgumentException {
Preconditions.checkArgument( Preconditions.checkArgument(
StringUtils.isNotEmpty(name), StringUtils.isNotEmpty(name),
"The argument 'name' cannot be null or empty."); "The argument 'name' cannot be null or empty.");
int period = abfsConfiguration.getAnalysisPeriod();
Preconditions.checkArgument( Preconditions.checkArgument(
period >= MIN_ANALYSIS_PERIOD_MS && period <= MAX_ANALYSIS_PERIOD_MS, period >= MIN_ANALYSIS_PERIOD_MS && period <= MAX_ANALYSIS_PERIOD_MS,
"The argument 'period' must be between 1000 and 30000."); "The argument 'period' must be between 1000 and 30000.");
this.name = name; this.name = name;
this.analysisPeriodMs = period; this.abfsConfiguration = abfsConfiguration;
this.accountLevelThrottlingEnabled = abfsConfiguration.accountThrottlingEnabled();
this.analysisPeriodMs = abfsConfiguration.getAnalysisPeriod();
this.lastExecutionTime = new AtomicLong(now());
this.blobMetrics = new AtomicReference<AbfsOperationMetrics>( this.blobMetrics = new AtomicReference<AbfsOperationMetrics>(
new AbfsOperationMetrics(System.currentTimeMillis())); new AbfsOperationMetrics(System.currentTimeMillis()));
this.timer = new Timer( this.timer = new Timer(
@ -95,6 +94,47 @@ private AbfsClientThrottlingAnalyzer() {
analysisPeriodMs); analysisPeriodMs);
} }
/**
* Resumes the timer if it was stopped.
*/
private void resumeTimer() {
blobMetrics = new AtomicReference<AbfsOperationMetrics>(
new AbfsOperationMetrics(System.currentTimeMillis()));
timer.schedule(new TimerTaskImpl(),
analysisPeriodMs,
analysisPeriodMs);
isOperationOnAccountIdle.set(false);
}
/**
* Synchronized method to suspend or resume timer.
* @param timerFunctionality resume or suspend.
* @param timerTask The timertask object.
* @return true or false.
*/
private synchronized boolean timerOrchestrator(TimerFunctionality timerFunctionality,
TimerTask timerTask) {
switch (timerFunctionality) {
case RESUME:
if (isOperationOnAccountIdle.get()) {
resumeTimer();
}
break;
case SUSPEND:
if (accountLevelThrottlingEnabled && (System.currentTimeMillis()
- lastExecutionTime.get() >= getOperationIdleTimeout())) {
isOperationOnAccountIdle.set(true);
timerTask.cancel();
timer.purge();
return true;
}
break;
default:
break;
}
return false;
}
/** /**
* Updates metrics with results from the current storage operation. * Updates metrics with results from the current storage operation.
* *
@ -104,12 +144,13 @@ private AbfsClientThrottlingAnalyzer() {
public void addBytesTransferred(long count, boolean isFailedOperation) { public void addBytesTransferred(long count, boolean isFailedOperation) {
AbfsOperationMetrics metrics = blobMetrics.get(); AbfsOperationMetrics metrics = blobMetrics.get();
if (isFailedOperation) { if (isFailedOperation) {
metrics.bytesFailed.addAndGet(count); metrics.addBytesFailed(count);
metrics.operationsFailed.incrementAndGet(); metrics.incrementOperationsFailed();
} else { } else {
metrics.bytesSuccessful.addAndGet(count); metrics.addBytesSuccessful(count);
metrics.operationsSuccessful.incrementAndGet(); metrics.incrementOperationsSuccessful();
} }
blobMetrics.set(metrics);
} }
/** /**
@ -117,6 +158,8 @@ public void addBytesTransferred(long count, boolean isFailedOperation) {
* @return true if Thread sleeps(Throttling occurs) else false. * @return true if Thread sleeps(Throttling occurs) else false.
*/ */
public boolean suspendIfNecessary() { public boolean suspendIfNecessary() {
lastExecutionTime.set(now());
timerOrchestrator(TimerFunctionality.RESUME, null);
int duration = sleepDuration; int duration = sleepDuration;
if (duration > 0) { if (duration > 0) {
try { try {
@ -134,19 +177,27 @@ int getSleepDuration() {
return sleepDuration; return sleepDuration;
} }
int getOperationIdleTimeout() {
return abfsConfiguration.getAccountOperationIdleTimeout();
}
AtomicBoolean getIsOperationOnAccountIdle() {
return isOperationOnAccountIdle;
}
private int analyzeMetricsAndUpdateSleepDuration(AbfsOperationMetrics metrics, private int analyzeMetricsAndUpdateSleepDuration(AbfsOperationMetrics metrics,
int sleepDuration) { int sleepDuration) {
final double percentageConversionFactor = 100; final double percentageConversionFactor = 100;
double bytesFailed = metrics.bytesFailed.get(); double bytesFailed = metrics.getBytesFailed().get();
double bytesSuccessful = metrics.bytesSuccessful.get(); double bytesSuccessful = metrics.getBytesSuccessful().get();
double operationsFailed = metrics.operationsFailed.get(); double operationsFailed = metrics.getOperationsFailed().get();
double operationsSuccessful = metrics.operationsSuccessful.get(); double operationsSuccessful = metrics.getOperationsSuccessful().get();
double errorPercentage = (bytesFailed <= 0) double errorPercentage = (bytesFailed <= 0)
? 0 ? 0
: (percentageConversionFactor : (percentageConversionFactor
* bytesFailed * bytesFailed
/ (bytesFailed + bytesSuccessful)); / (bytesFailed + bytesSuccessful));
long periodMs = metrics.endTime - metrics.startTime; long periodMs = metrics.getEndTime() - metrics.getStartTime();
double newSleepDuration; double newSleepDuration;
@ -238,10 +289,13 @@ public void run() {
} }
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
if (now - blobMetrics.get().startTime >= analysisPeriodMs) { if (timerOrchestrator(TimerFunctionality.SUSPEND, this)) {
return;
}
if (now - blobMetrics.get().getStartTime() >= analysisPeriodMs) {
AbfsOperationMetrics oldMetrics = blobMetrics.getAndSet( AbfsOperationMetrics oldMetrics = blobMetrics.getAndSet(
new AbfsOperationMetrics(now)); new AbfsOperationMetrics(now));
oldMetrics.endTime = now; oldMetrics.setEndTime(now);
sleepDuration = analyzeMetricsAndUpdateSleepDuration(oldMetrics, sleepDuration = analyzeMetricsAndUpdateSleepDuration(oldMetrics,
sleepDuration); sleepDuration);
} }
@ -252,24 +306,4 @@ public void run() {
} }
} }
} }
/**
* Stores Abfs operation metrics during each analysis period.
*/
static class AbfsOperationMetrics {
private AtomicLong bytesFailed;
private AtomicLong bytesSuccessful;
private AtomicLong operationsFailed;
private AtomicLong operationsSuccessful;
private long endTime;
private long startTime;
AbfsOperationMetrics(long startTime) {
this.startTime = startTime;
this.bytesFailed = new AtomicLong();
this.bytesSuccessful = new AtomicLong();
this.operationsFailed = new AtomicLong();
this.operationsSuccessful = new AtomicLong();
}
}
} }

View File

@ -19,10 +19,12 @@
package org.apache.hadoop.fs.azurebfs.services; package org.apache.hadoop.fs.azurebfs.services;
import java.net.HttpURLConnection; import java.net.HttpURLConnection;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.fs.azurebfs.AbfsStatistic; import org.apache.hadoop.fs.azurebfs.AbfsStatistic;
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
@ -38,35 +40,89 @@
* and sleeps just enough to minimize errors, allowing optimal ingress and/or * and sleeps just enough to minimize errors, allowing optimal ingress and/or
* egress throughput. * egress throughput.
*/ */
public final class AbfsClientThrottlingIntercept { public final class AbfsClientThrottlingIntercept implements AbfsThrottlingIntercept {
private static final Logger LOG = LoggerFactory.getLogger( private static final Logger LOG = LoggerFactory.getLogger(
AbfsClientThrottlingIntercept.class); AbfsClientThrottlingIntercept.class);
private static final String RANGE_PREFIX = "bytes="; private static final String RANGE_PREFIX = "bytes=";
private static AbfsClientThrottlingIntercept singleton = null; private static AbfsClientThrottlingIntercept singleton; // singleton, initialized in static initialization block
private AbfsClientThrottlingAnalyzer readThrottler = null; private static final ReentrantLock LOCK = new ReentrantLock();
private AbfsClientThrottlingAnalyzer writeThrottler = null; private final AbfsClientThrottlingAnalyzer readThrottler;
private static boolean isAutoThrottlingEnabled = false; private final AbfsClientThrottlingAnalyzer writeThrottler;
private final String accountName;
// Hide default constructor // Hide default constructor
private AbfsClientThrottlingIntercept() { public AbfsClientThrottlingIntercept(String accountName, AbfsConfiguration abfsConfiguration) {
readThrottler = new AbfsClientThrottlingAnalyzer("read"); this.accountName = accountName;
writeThrottler = new AbfsClientThrottlingAnalyzer("write"); this.readThrottler = setAnalyzer("read " + accountName, abfsConfiguration);
this.writeThrottler = setAnalyzer("write " + accountName, abfsConfiguration);
LOG.debug("Client-side throttling is enabled for the ABFS file system for the account : {}", accountName);
} }
public static synchronized void initializeSingleton(boolean enableAutoThrottling) { // Hide default constructor
if (!enableAutoThrottling) { private AbfsClientThrottlingIntercept(AbfsConfiguration abfsConfiguration) {
return; //Account name is kept as empty as same instance is shared across all accounts
this.accountName = "";
this.readThrottler = setAnalyzer("read", abfsConfiguration);
this.writeThrottler = setAnalyzer("write", abfsConfiguration);
LOG.debug("Client-side throttling is enabled for the ABFS file system using singleton intercept");
} }
/**
* Sets the analyzer for the intercept.
* @param name Name of the analyzer.
* @param abfsConfiguration The configuration.
* @return AbfsClientThrottlingAnalyzer instance.
*/
private AbfsClientThrottlingAnalyzer setAnalyzer(String name, AbfsConfiguration abfsConfiguration) {
return new AbfsClientThrottlingAnalyzer(name, abfsConfiguration);
}
/**
* Returns the analyzer for read operations.
* @return AbfsClientThrottlingAnalyzer for read.
*/
AbfsClientThrottlingAnalyzer getReadThrottler() {
return readThrottler;
}
/**
* Returns the analyzer for write operations.
* @return AbfsClientThrottlingAnalyzer for write.
*/
AbfsClientThrottlingAnalyzer getWriteThrottler() {
return writeThrottler;
}
/**
* Creates a singleton object of the AbfsClientThrottlingIntercept.
* which is shared across all filesystem instances.
* @param abfsConfiguration configuration set.
* @return singleton object of intercept.
*/
static AbfsClientThrottlingIntercept initializeSingleton(AbfsConfiguration abfsConfiguration) {
if (singleton == null) { if (singleton == null) {
singleton = new AbfsClientThrottlingIntercept(); LOCK.lock();
isAutoThrottlingEnabled = true; try {
if (singleton == null) {
singleton = new AbfsClientThrottlingIntercept(abfsConfiguration);
LOG.debug("Client-side throttling is enabled for the ABFS file system."); LOG.debug("Client-side throttling is enabled for the ABFS file system.");
} }
} finally {
LOCK.unlock();
}
}
return singleton;
} }
static void updateMetrics(AbfsRestOperationType operationType, /**
* Updates the metrics for successful and failed read and write operations.
* @param operationType Only applicable for read and write operations.
* @param abfsHttpOperation Used for status code and data transferred.
*/
@Override
public void updateMetrics(AbfsRestOperationType operationType,
AbfsHttpOperation abfsHttpOperation) { AbfsHttpOperation abfsHttpOperation) {
if (!isAutoThrottlingEnabled || abfsHttpOperation == null) { if (abfsHttpOperation == null) {
return; return;
} }
@ -82,7 +138,7 @@ static void updateMetrics(AbfsRestOperationType operationType,
case Append: case Append:
contentLength = abfsHttpOperation.getBytesSent(); contentLength = abfsHttpOperation.getBytesSent();
if (contentLength > 0) { if (contentLength > 0) {
singleton.writeThrottler.addBytesTransferred(contentLength, writeThrottler.addBytesTransferred(contentLength,
isFailedOperation); isFailedOperation);
} }
break; break;
@ -90,7 +146,7 @@ static void updateMetrics(AbfsRestOperationType operationType,
String range = abfsHttpOperation.getConnection().getRequestProperty(HttpHeaderConfigurations.RANGE); String range = abfsHttpOperation.getConnection().getRequestProperty(HttpHeaderConfigurations.RANGE);
contentLength = getContentLengthIfKnown(range); contentLength = getContentLengthIfKnown(range);
if (contentLength > 0) { if (contentLength > 0) {
singleton.readThrottler.addBytesTransferred(contentLength, readThrottler.addBytesTransferred(contentLength,
isFailedOperation); isFailedOperation);
} }
break; break;
@ -104,21 +160,18 @@ static void updateMetrics(AbfsRestOperationType operationType,
* uses this to suspend the request, if necessary, to minimize errors and * uses this to suspend the request, if necessary, to minimize errors and
* maximize throughput. * maximize throughput.
*/ */
static void sendingRequest(AbfsRestOperationType operationType, @Override
public void sendingRequest(AbfsRestOperationType operationType,
AbfsCounters abfsCounters) { AbfsCounters abfsCounters) {
if (!isAutoThrottlingEnabled) {
return;
}
switch (operationType) { switch (operationType) {
case ReadFile: case ReadFile:
if (singleton.readThrottler.suspendIfNecessary() if (readThrottler.suspendIfNecessary()
&& abfsCounters != null) { && abfsCounters != null) {
abfsCounters.incrementCounter(AbfsStatistic.READ_THROTTLES, 1); abfsCounters.incrementCounter(AbfsStatistic.READ_THROTTLES, 1);
} }
break; break;
case Append: case Append:
if (singleton.writeThrottler.suspendIfNecessary() if (writeThrottler.suspendIfNecessary()
&& abfsCounters != null) { && abfsCounters != null) {
abfsCounters.incrementCounter(AbfsStatistic.WRITE_THROTTLES, 1); abfsCounters.incrementCounter(AbfsStatistic.WRITE_THROTTLES, 1);
} }

View File

@ -0,0 +1,37 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
final class AbfsNoOpThrottlingIntercept implements AbfsThrottlingIntercept {
public static final AbfsNoOpThrottlingIntercept INSTANCE = new AbfsNoOpThrottlingIntercept();
private AbfsNoOpThrottlingIntercept() {
}
@Override
public void updateMetrics(final AbfsRestOperationType operationType,
final AbfsHttpOperation abfsHttpOperation) {
}
@Override
public void sendingRequest(final AbfsRestOperationType operationType,
final AbfsCounters abfsCounters) {
}
}

View File

@ -0,0 +1,139 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
import java.util.concurrent.atomic.AtomicLong;
/**
* Stores Abfs operation metrics during each analysis period.
*/
class AbfsOperationMetrics {
/**
* No of bytes which could not be transferred due to a failed operation.
*/
private final AtomicLong bytesFailed;
/**
* No of bytes successfully transferred during a successful operation.
*/
private final AtomicLong bytesSuccessful;
/**
* Total no of failed operations.
*/
private final AtomicLong operationsFailed;
/**
* Total no of successful operations.
*/
private final AtomicLong operationsSuccessful;
/**
* Time when collection of metrics ended.
*/
private long endTime;
/**
* Time when the collection of metrics started.
*/
private final long startTime;
AbfsOperationMetrics(long startTime) {
this.startTime = startTime;
this.bytesFailed = new AtomicLong();
this.bytesSuccessful = new AtomicLong();
this.operationsFailed = new AtomicLong();
this.operationsSuccessful = new AtomicLong();
}
/**
*
* @return bytes failed to transfer.
*/
AtomicLong getBytesFailed() {
return bytesFailed;
}
/**
*
* @return bytes successfully transferred.
*/
AtomicLong getBytesSuccessful() {
return bytesSuccessful;
}
/**
*
* @return no of operations failed.
*/
AtomicLong getOperationsFailed() {
return operationsFailed;
}
/**
*
* @return no of successful operations.
*/
AtomicLong getOperationsSuccessful() {
return operationsSuccessful;
}
/**
*
* @return end time of metric collection.
*/
long getEndTime() {
return endTime;
}
/**
*
* @param endTime sets the end time.
*/
void setEndTime(final long endTime) {
this.endTime = endTime;
}
/**
*
* @return start time of metric collection.
*/
long getStartTime() {
return startTime;
}
void addBytesFailed(long bytes) {
this.getBytesFailed().addAndGet(bytes);
}
void addBytesSuccessful(long bytes) {
this.getBytesSuccessful().addAndGet(bytes);
}
void incrementOperationsFailed() {
this.getOperationsFailed().incrementAndGet();
}
void incrementOperationsSuccessful() {
this.getOperationsSuccessful().incrementAndGet();
}
}

View File

@ -45,6 +45,8 @@ public class AbfsRestOperation {
private final AbfsRestOperationType operationType; private final AbfsRestOperationType operationType;
// Blob FS client, which has the credentials, retry policy, and logs. // Blob FS client, which has the credentials, retry policy, and logs.
private final AbfsClient client; private final AbfsClient client;
// Return intercept instance
private final AbfsThrottlingIntercept intercept;
// the HTTP method (PUT, PATCH, POST, GET, HEAD, or DELETE) // the HTTP method (PUT, PATCH, POST, GET, HEAD, or DELETE)
private final String method; private final String method;
// full URL including query parameters // full URL including query parameters
@ -145,6 +147,7 @@ String getSasToken() {
|| AbfsHttpConstants.HTTP_METHOD_PATCH.equals(method)); || AbfsHttpConstants.HTTP_METHOD_PATCH.equals(method));
this.sasToken = sasToken; this.sasToken = sasToken;
this.abfsCounters = client.getAbfsCounters(); this.abfsCounters = client.getAbfsCounters();
this.intercept = client.getIntercept();
} }
/** /**
@ -241,7 +244,8 @@ private void completeExecute(TracingContext tracingContext)
*/ */
private boolean executeHttpOperation(final int retryCount, private boolean executeHttpOperation(final int retryCount,
TracingContext tracingContext) throws AzureBlobFileSystemException { TracingContext tracingContext) throws AzureBlobFileSystemException {
AbfsHttpOperation httpOperation = null; AbfsHttpOperation httpOperation;
try { try {
// initialize the HTTP request and open the connection // initialize the HTTP request and open the connection
httpOperation = new AbfsHttpOperation(url, method, requestHeaders); httpOperation = new AbfsHttpOperation(url, method, requestHeaders);
@ -278,8 +282,7 @@ private boolean executeHttpOperation(final int retryCount,
// dump the headers // dump the headers
AbfsIoUtils.dumpHeadersToDebugLog("Request Headers", AbfsIoUtils.dumpHeadersToDebugLog("Request Headers",
httpOperation.getConnection().getRequestProperties()); httpOperation.getConnection().getRequestProperties());
AbfsClientThrottlingIntercept.sendingRequest(operationType, abfsCounters); intercept.sendingRequest(operationType, abfsCounters);
if (hasRequestBody) { if (hasRequestBody) {
// HttpUrlConnection requires // HttpUrlConnection requires
httpOperation.sendRequest(buffer, bufferOffset, bufferLength); httpOperation.sendRequest(buffer, bufferOffset, bufferLength);
@ -317,7 +320,7 @@ private boolean executeHttpOperation(final int retryCount,
return false; return false;
} finally { } finally {
AbfsClientThrottlingIntercept.updateMetrics(operationType, httpOperation); intercept.updateMetrics(operationType, httpOperation);
} }
LOG.debug("HttpRequest: {}: {}", operationType, httpOperation); LOG.debug("HttpRequest: {}: {}", operationType, httpOperation);

View File

@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* An interface for Abfs Throttling Interface.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public interface AbfsThrottlingIntercept {
/**
* Updates the metrics for successful and failed read and write operations.
* @param operationType Only applicable for read and write operations.
* @param abfsHttpOperation Used for status code and data transferred.
*/
void updateMetrics(AbfsRestOperationType operationType,
AbfsHttpOperation abfsHttpOperation);
/**
* Called before the request is sent. Client-side throttling
* uses this to suspend the request, if necessary, to minimize errors and
* maximize throughput.
* @param operationType Only applicable for read and write operations.
* @param abfsCounters Used for counters.
*/
void sendingRequest(AbfsRestOperationType operationType,
AbfsCounters abfsCounters);
}

View File

@ -0,0 +1,102 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.util.WeakReferenceMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Class to get an instance of throttling intercept class per account.
*/
final class AbfsThrottlingInterceptFactory {
private AbfsThrottlingInterceptFactory() {
}
private static AbfsConfiguration abfsConfig;
/**
* List of references notified of loss.
*/
private static List<String> lostReferences = new ArrayList<>();
private static final Logger LOG = LoggerFactory.getLogger(
AbfsThrottlingInterceptFactory.class);
/**
* Map which stores instance of ThrottlingIntercept class per account.
*/
private static WeakReferenceMap<String, AbfsThrottlingIntercept>
interceptMap = new WeakReferenceMap<>(
AbfsThrottlingInterceptFactory::factory,
AbfsThrottlingInterceptFactory::referenceLost);
/**
* Returns instance of throttling intercept.
* @param accountName Account name.
* @return instance of throttling intercept.
*/
private static AbfsClientThrottlingIntercept factory(final String accountName) {
return new AbfsClientThrottlingIntercept(accountName, abfsConfig);
}
/**
* Reference lost callback.
* @param accountName key lost.
*/
private static void referenceLost(String accountName) {
lostReferences.add(accountName);
}
/**
* Returns an instance of AbfsThrottlingIntercept.
*
* @param accountName The account for which we need instance of throttling intercept.
@param abfsConfiguration The object of abfsconfiguration class.
* @return Instance of AbfsThrottlingIntercept.
*/
static synchronized AbfsThrottlingIntercept getInstance(String accountName,
AbfsConfiguration abfsConfiguration) {
abfsConfig = abfsConfiguration;
AbfsThrottlingIntercept intercept;
if (!abfsConfiguration.isAutoThrottlingEnabled()) {
return AbfsNoOpThrottlingIntercept.INSTANCE;
}
// If singleton is enabled use a static instance of the intercept class for all accounts
if (!abfsConfiguration.accountThrottlingEnabled()) {
intercept = AbfsClientThrottlingIntercept.initializeSingleton(
abfsConfiguration);
} else {
// Return the instance from the map
intercept = interceptMap.get(accountName);
if (intercept == null) {
intercept = new AbfsClientThrottlingIntercept(accountName,
abfsConfiguration);
interceptMap.put(accountName, intercept);
}
}
return intercept;
}
}

View File

@ -0,0 +1,26 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
public enum TimerFunctionality {
RESUME,
SUSPEND
}

View File

@ -767,6 +767,12 @@ Hflush() being the only documented API that can provide persistent data
transfer, Flush() also attempting to persist buffered data will lead to transfer, Flush() also attempting to persist buffered data will lead to
performance issues. performance issues.
### <a name="accountlevelthrottlingoptions"></a> Account level throttling Options
`fs.azure.account.operation.idle.timeout`: This value specifies the time after which the timer for the analyzer (read or
write) should be paused until no new request is made again. The default value for the same is 60 seconds.
### <a name="hnscheckconfigoptions"></a> HNS Check Options ### <a name="hnscheckconfigoptions"></a> HNS Check Options
Config `fs.azure.account.hns.enabled` provides an option to specify whether Config `fs.azure.account.hns.enabled` provides an option to specify whether
the storage account is HNS enabled or not. In case the config is not provided, the storage account is HNS enabled or not. In case the config is not provided,
@ -877,6 +883,9 @@ when there are too many writes from the same process.
tuned with this config considering each queued request holds a buffer. Set tuned with this config considering each queued request holds a buffer. Set
the value 3 or 4 times the value set for s.azure.write.max.concurrent.requests. the value 3 or 4 times the value set for s.azure.write.max.concurrent.requests.
`fs.azure.analysis.period`: The time after which sleep duration is recomputed after analyzing metrics. The default value
for the same is 10 seconds.
### <a name="securityconfigoptions"></a> Security Options ### <a name="securityconfigoptions"></a> Security Options
`fs.azure.always.use.https`: Enforces to use HTTPS instead of HTTP when the flag `fs.azure.always.use.https`: Enforces to use HTTPS instead of HTTP when the flag
is made true. Irrespective of the flag, `AbfsClient` will use HTTPS if the secure is made true. Irrespective of the flag, `AbfsClient` will use HTTPS if the secure

View File

@ -24,6 +24,9 @@
public final class TestConfigurationKeys { public final class TestConfigurationKeys {
public static final String FS_AZURE_ACCOUNT_NAME = "fs.azure.account.name"; public static final String FS_AZURE_ACCOUNT_NAME = "fs.azure.account.name";
public static final String FS_AZURE_ABFS_ACCOUNT_NAME = "fs.azure.abfs.account.name"; public static final String FS_AZURE_ABFS_ACCOUNT_NAME = "fs.azure.abfs.account.name";
public static final String FS_AZURE_ABFS_ACCOUNT1_NAME = "fs.azure.abfs.account1.name";
public static final String FS_AZURE_ENABLE_AUTOTHROTTLING = "fs.azure.enable.autothrottling";
public static final String FS_AZURE_ANALYSIS_PERIOD = "fs.azure.analysis.period";
public static final String FS_AZURE_ACCOUNT_KEY = "fs.azure.account.key"; public static final String FS_AZURE_ACCOUNT_KEY = "fs.azure.account.key";
public static final String FS_AZURE_CONTRACT_TEST_URI = "fs.contract.test.fs.abfs"; public static final String FS_AZURE_CONTRACT_TEST_URI = "fs.contract.test.fs.abfs";
public static final String FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT = "fs.azure.test.namespace.enabled"; public static final String FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT = "fs.azure.test.namespace.enabled";

View File

@ -306,6 +306,11 @@ public static AbfsClient getMockAbfsClient(AbfsClient baseAbfsClientInstance,
when(client.getAccessToken()).thenCallRealMethod(); when(client.getAccessToken()).thenCallRealMethod();
when(client.getSharedKeyCredentials()).thenCallRealMethod(); when(client.getSharedKeyCredentials()).thenCallRealMethod();
when(client.createDefaultHeaders()).thenCallRealMethod(); when(client.createDefaultHeaders()).thenCallRealMethod();
when(client.getAbfsConfiguration()).thenReturn(abfsConfig);
when(client.getIntercept()).thenReturn(
AbfsThrottlingInterceptFactory.getInstance(
abfsConfig.getAccountName().substring(0,
abfsConfig.getAccountName().indexOf(DOT)), abfsConfig));
// override baseurl // override baseurl
client = TestAbfsClient.setAbfsClientField(client, "abfsConfiguration", client = TestAbfsClient.setAbfsClientField(client, "abfsConfiguration",

View File

@ -18,9 +18,15 @@
package org.apache.hadoop.fs.azurebfs.services; package org.apache.hadoop.fs.azurebfs.services;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.fs.contract.ContractTestUtils; import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.junit.Test; import org.junit.Test;
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_ANALYSIS_PERIOD;
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.TEST_CONFIGURATION_FILE_NAME;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
@ -33,6 +39,15 @@ public class TestAbfsClientThrottlingAnalyzer {
+ ANALYSIS_PERIOD / 10; + ANALYSIS_PERIOD / 10;
private static final long MEGABYTE = 1024 * 1024; private static final long MEGABYTE = 1024 * 1024;
private static final int MAX_ACCEPTABLE_PERCENT_DIFFERENCE = 20; private static final int MAX_ACCEPTABLE_PERCENT_DIFFERENCE = 20;
private AbfsConfiguration abfsConfiguration;
public TestAbfsClientThrottlingAnalyzer() throws IOException, IllegalAccessException {
final Configuration configuration = new Configuration();
configuration.addResource(TEST_CONFIGURATION_FILE_NAME);
configuration.setInt(FS_AZURE_ANALYSIS_PERIOD, 1000);
this.abfsConfiguration = new AbfsConfiguration(configuration,
"dummy");
}
private void sleep(long milliseconds) { private void sleep(long milliseconds) {
try { try {
@ -82,8 +97,7 @@ private void validateLessThanOrEqual(long maxExpected, long actual) {
@Test @Test
public void testNoMetricUpdatesThenNoWaiting() { public void testNoMetricUpdatesThenNoWaiting() {
AbfsClientThrottlingAnalyzer analyzer = new AbfsClientThrottlingAnalyzer( AbfsClientThrottlingAnalyzer analyzer = new AbfsClientThrottlingAnalyzer(
"test", "test", abfsConfiguration);
ANALYSIS_PERIOD);
validate(0, analyzer.getSleepDuration()); validate(0, analyzer.getSleepDuration());
sleep(ANALYSIS_PERIOD_PLUS_10_PERCENT); sleep(ANALYSIS_PERIOD_PLUS_10_PERCENT);
validate(0, analyzer.getSleepDuration()); validate(0, analyzer.getSleepDuration());
@ -96,8 +110,7 @@ public void testNoMetricUpdatesThenNoWaiting() {
@Test @Test
public void testOnlySuccessThenNoWaiting() { public void testOnlySuccessThenNoWaiting() {
AbfsClientThrottlingAnalyzer analyzer = new AbfsClientThrottlingAnalyzer( AbfsClientThrottlingAnalyzer analyzer = new AbfsClientThrottlingAnalyzer(
"test", "test", abfsConfiguration);
ANALYSIS_PERIOD);
analyzer.addBytesTransferred(8 * MEGABYTE, false); analyzer.addBytesTransferred(8 * MEGABYTE, false);
validate(0, analyzer.getSleepDuration()); validate(0, analyzer.getSleepDuration());
sleep(ANALYSIS_PERIOD_PLUS_10_PERCENT); sleep(ANALYSIS_PERIOD_PLUS_10_PERCENT);
@ -112,8 +125,7 @@ public void testOnlySuccessThenNoWaiting() {
@Test @Test
public void testOnlyErrorsAndWaiting() { public void testOnlyErrorsAndWaiting() {
AbfsClientThrottlingAnalyzer analyzer = new AbfsClientThrottlingAnalyzer( AbfsClientThrottlingAnalyzer analyzer = new AbfsClientThrottlingAnalyzer(
"test", "test", abfsConfiguration);
ANALYSIS_PERIOD);
validate(0, analyzer.getSleepDuration()); validate(0, analyzer.getSleepDuration());
analyzer.addBytesTransferred(4 * MEGABYTE, true); analyzer.addBytesTransferred(4 * MEGABYTE, true);
sleep(ANALYSIS_PERIOD_PLUS_10_PERCENT); sleep(ANALYSIS_PERIOD_PLUS_10_PERCENT);
@ -132,8 +144,7 @@ public void testOnlyErrorsAndWaiting() {
@Test @Test
public void testSuccessAndErrorsAndWaiting() { public void testSuccessAndErrorsAndWaiting() {
AbfsClientThrottlingAnalyzer analyzer = new AbfsClientThrottlingAnalyzer( AbfsClientThrottlingAnalyzer analyzer = new AbfsClientThrottlingAnalyzer(
"test", "test", abfsConfiguration);
ANALYSIS_PERIOD);
validate(0, analyzer.getSleepDuration()); validate(0, analyzer.getSleepDuration());
analyzer.addBytesTransferred(8 * MEGABYTE, false); analyzer.addBytesTransferred(8 * MEGABYTE, false);
analyzer.addBytesTransferred(2 * MEGABYTE, true); analyzer.addBytesTransferred(2 * MEGABYTE, true);
@ -157,8 +168,7 @@ public void testSuccessAndErrorsAndWaiting() {
@Test @Test
public void testManySuccessAndErrorsAndWaiting() { public void testManySuccessAndErrorsAndWaiting() {
AbfsClientThrottlingAnalyzer analyzer = new AbfsClientThrottlingAnalyzer( AbfsClientThrottlingAnalyzer analyzer = new AbfsClientThrottlingAnalyzer(
"test", "test", abfsConfiguration);
ANALYSIS_PERIOD);
validate(0, analyzer.getSleepDuration()); validate(0, analyzer.getSleepDuration());
final int numberOfRequests = 20; final int numberOfRequests = 20;
for (int i = 0; i < numberOfRequests; i++) { for (int i = 0; i < numberOfRequests; i++) {

View File

@ -18,13 +18,35 @@
package org.apache.hadoop.fs.azurebfs.services; package org.apache.hadoop.fs.azurebfs.services;
import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_BACKOFF_INTERVAL; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_BACKOFF_INTERVAL;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_MAX_BACKOFF_INTERVAL; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_MAX_BACKOFF_INTERVAL;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_MAX_IO_RETRIES; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_MAX_IO_RETRIES;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_MIN_BACKOFF_INTERVAL; import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_MIN_BACKOFF_INTERVAL;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_AUTOTHROTTLING;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE;
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_ABFS_ACCOUNT1_NAME;
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_ACCOUNT_NAME;
import static org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.TEST_CONFIGURATION_FILE_NAME;
import static org.junit.Assume.assumeTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.apache.hadoop.fs.FSDataInputStream;
import org.assertj.core.api.Assertions;
import org.junit.Assume;
import org.mockito.Mockito;
import java.net.URI;
import java.util.Random; import java.util.Random;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -41,6 +63,9 @@ public class TestExponentialRetryPolicy extends AbstractAbfsIntegrationTest {
private final int noRetryCount = 0; private final int noRetryCount = 0;
private final int retryCount = new Random().nextInt(maxRetryCount); private final int retryCount = new Random().nextInt(maxRetryCount);
private final int retryCountBeyondMax = maxRetryCount + 1; private final int retryCountBeyondMax = maxRetryCount + 1;
private static final String TEST_PATH = "/testfile";
private static final double MULTIPLYING_FACTOR = 1.5;
private static final int ANALYSIS_PERIOD = 10000;
public TestExponentialRetryPolicy() throws Exception { public TestExponentialRetryPolicy() throws Exception {
@ -67,6 +92,173 @@ public void testDefaultMaxIORetryCount() throws Exception {
testMaxIOConfig(abfsConfig); testMaxIOConfig(abfsConfig);
} }
@Test
public void testThrottlingIntercept() throws Exception {
AzureBlobFileSystem fs = getFileSystem();
final Configuration configuration = new Configuration();
configuration.addResource(TEST_CONFIGURATION_FILE_NAME);
configuration.setBoolean(FS_AZURE_ENABLE_AUTOTHROTTLING, false);
// On disabling throttling AbfsNoOpThrottlingIntercept object is returned
AbfsConfiguration abfsConfiguration = new AbfsConfiguration(configuration,
"dummy.dfs.core.windows.net");
AbfsThrottlingIntercept intercept;
AbfsClient abfsClient = TestAbfsClient.createTestClientFromCurrentContext(fs.getAbfsStore().getClient(), abfsConfiguration);
intercept = abfsClient.getIntercept();
Assertions.assertThat(intercept)
.describedAs("AbfsNoOpThrottlingIntercept instance expected")
.isInstanceOf(AbfsNoOpThrottlingIntercept.class);
configuration.setBoolean(FS_AZURE_ENABLE_AUTOTHROTTLING, true);
configuration.setBoolean(FS_AZURE_ACCOUNT_LEVEL_THROTTLING_ENABLED, true);
// On disabling throttling AbfsClientThrottlingIntercept object is returned
AbfsConfiguration abfsConfiguration1 = new AbfsConfiguration(configuration,
"dummy1.dfs.core.windows.net");
AbfsClient abfsClient1 = TestAbfsClient.createTestClientFromCurrentContext(fs.getAbfsStore().getClient(), abfsConfiguration1);
intercept = abfsClient1.getIntercept();
Assertions.assertThat(intercept)
.describedAs("AbfsClientThrottlingIntercept instance expected")
.isInstanceOf(AbfsClientThrottlingIntercept.class);
}
@Test
public void testCreateMultipleAccountThrottling() throws Exception {
Configuration config = new Configuration(getRawConfiguration());
String accountName = config.get(FS_AZURE_ACCOUNT_NAME);
if (accountName == null) {
// check if accountName is set using different config key
accountName = config.get(FS_AZURE_ABFS_ACCOUNT1_NAME);
}
assumeTrue("Not set: " + FS_AZURE_ABFS_ACCOUNT1_NAME,
accountName != null && !accountName.isEmpty());
Configuration rawConfig1 = new Configuration();
rawConfig1.addResource(TEST_CONFIGURATION_FILE_NAME);
AbfsRestOperation successOp = mock(AbfsRestOperation.class);
AbfsHttpOperation http500Op = mock(AbfsHttpOperation.class);
when(http500Op.getStatusCode()).thenReturn(HTTP_INTERNAL_ERROR);
when(successOp.getResult()).thenReturn(http500Op);
AbfsConfiguration configuration = Mockito.mock(AbfsConfiguration.class);
when(configuration.getAnalysisPeriod()).thenReturn(ANALYSIS_PERIOD);
when(configuration.isAutoThrottlingEnabled()).thenReturn(true);
when(configuration.accountThrottlingEnabled()).thenReturn(false);
AbfsThrottlingIntercept instance1 = AbfsThrottlingInterceptFactory.getInstance(accountName, configuration);
String accountName1 = config.get(FS_AZURE_ABFS_ACCOUNT1_NAME);
assumeTrue("Not set: " + FS_AZURE_ABFS_ACCOUNT1_NAME,
accountName1 != null && !accountName1.isEmpty());
AbfsThrottlingIntercept instance2 = AbfsThrottlingInterceptFactory.getInstance(accountName1, configuration);
//if singleton is enabled, for different accounts both the instances should return same value
Assertions.assertThat(instance1)
.describedAs(
"if singleton is enabled, for different accounts both the instances should return same value")
.isEqualTo(instance2);
when(configuration.accountThrottlingEnabled()).thenReturn(true);
AbfsThrottlingIntercept instance3 = AbfsThrottlingInterceptFactory.getInstance(accountName, configuration);
AbfsThrottlingIntercept instance4 = AbfsThrottlingInterceptFactory.getInstance(accountName1, configuration);
AbfsThrottlingIntercept instance5 = AbfsThrottlingInterceptFactory.getInstance(accountName, configuration);
//if singleton is not enabled, for different accounts instances should return different value
Assertions.assertThat(instance3)
.describedAs(
"iff singleton is not enabled, for different accounts instances should return different value")
.isNotEqualTo(instance4);
//if singleton is not enabled, for same accounts instances should return same value
Assertions.assertThat(instance3)
.describedAs(
"if singleton is not enabled, for same accounts instances should return same value")
.isEqualTo(instance5);
}
@Test
public void testOperationOnAccountIdle() throws Exception {
//Get the filesystem.
AzureBlobFileSystem fs = getFileSystem();
AbfsClient client = fs.getAbfsStore().getClient();
AbfsConfiguration configuration1 = client.getAbfsConfiguration();
Assume.assumeTrue(configuration1.isAutoThrottlingEnabled());
Assume.assumeTrue(configuration1.accountThrottlingEnabled());
AbfsClientThrottlingIntercept accountIntercept
= (AbfsClientThrottlingIntercept) client.getIntercept();
final byte[] b = new byte[2 * MIN_BUFFER_SIZE];
new Random().nextBytes(b);
Path testPath = path(TEST_PATH);
//Do an operation on the filesystem.
try (FSDataOutputStream stream = fs.create(testPath)) {
stream.write(b);
}
//Don't perform any operation on the account.
int sleepTime = (int) ((getAbfsConfig().getAccountOperationIdleTimeout()) * MULTIPLYING_FACTOR);
Thread.sleep(sleepTime);
try (FSDataInputStream streamRead = fs.open(testPath)) {
streamRead.read(b);
}
//Perform operations on another account.
AzureBlobFileSystem fs1 = new AzureBlobFileSystem();
Configuration config = new Configuration(getRawConfiguration());
String accountName1 = config.get(FS_AZURE_ABFS_ACCOUNT1_NAME);
assumeTrue("Not set: " + FS_AZURE_ABFS_ACCOUNT1_NAME,
accountName1 != null && !accountName1.isEmpty());
final String abfsUrl1 = this.getFileSystemName() + "12" + "@" + accountName1;
URI defaultUri1 = null;
defaultUri1 = new URI("abfss", abfsUrl1, null, null, null);
fs1.initialize(defaultUri1, getRawConfiguration());
AbfsClient client1 = fs1.getAbfsStore().getClient();
AbfsClientThrottlingIntercept accountIntercept1
= (AbfsClientThrottlingIntercept) client1.getIntercept();
try (FSDataOutputStream stream1 = fs1.create(testPath)) {
stream1.write(b);
}
//Verify the write analyzer for first account is idle but the read analyzer is not idle.
Assertions.assertThat(accountIntercept.getWriteThrottler()
.getIsOperationOnAccountIdle()
.get())
.describedAs("Write analyzer for first account should be idle the first time")
.isTrue();
Assertions.assertThat(
accountIntercept.getReadThrottler()
.getIsOperationOnAccountIdle()
.get())
.describedAs("Read analyzer for first account should not be idle")
.isFalse();
//Verify the write analyzer for second account is not idle.
Assertions.assertThat(
accountIntercept1.getWriteThrottler()
.getIsOperationOnAccountIdle()
.get())
.describedAs("Write analyzer for second account should not be idle")
.isFalse();
//Again perform an operation on the first account.
try (FSDataOutputStream stream2 = fs.create(testPath)) {
stream2.write(b);
}
//Verify the write analyzer on first account is not idle.
Assertions.assertThat(
accountIntercept.getWriteThrottler()
.getIsOperationOnAccountIdle()
.get())
.describedAs(
"Write analyzer for first account should not be idle second time")
.isFalse();
}
@Test @Test
public void testAbfsConfigConstructor() throws Exception { public void testAbfsConfigConstructor() throws Exception {
// Ensure we choose expected values that are not defaults // Ensure we choose expected values that are not defaults