diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java index ab8136a6f8d..cfdb245b090 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java @@ -1502,13 +1502,21 @@ public class ContractTestUtils extends Assert { * printing some useful results in the process. */ public static final class NanoTimer { - private final long startTime; + private long startTime; private long endTime; public NanoTimer() { startTime = now(); } + /** + * Reset the timer. Equivalent to the reset button of a stopwatch. + */ + public void reset() { + endTime = 0; + startTime = now(); + } + /** * End the operation. * @return the duration of the operation diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java index f4aff955172..4f997f2e9cd 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java @@ -158,6 +158,8 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { private static final String KEY_SELF_THROTTLE_READ_FACTOR = "fs.azure.selfthrottling.read.factor"; private static final String KEY_SELF_THROTTLE_WRITE_FACTOR = "fs.azure.selfthrottling.write.factor"; + private static final String KEY_AUTO_THROTTLE_ENABLE = "fs.azure.autothrottling.enable"; + private static final String KEY_ENABLE_STORAGE_CLIENT_LOGGING = "fs.azure.storage.client.logging"; /** @@ -239,10 +241,10 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { // Retry parameter defaults. 
// - private static final int DEFAULT_MIN_BACKOFF_INTERVAL = 1 * 1000; // 1s + private static final int DEFAULT_MIN_BACKOFF_INTERVAL = 3 * 1000; // 3s private static final int DEFAULT_MAX_BACKOFF_INTERVAL = 30 * 1000; // 30s - private static final int DEFAULT_BACKOFF_INTERVAL = 1 * 1000; // 1s - private static final int DEFAULT_MAX_RETRY_ATTEMPTS = 15; + private static final int DEFAULT_BACKOFF_INTERVAL = 3 * 1000; // 3s + private static final int DEFAULT_MAX_RETRY_ATTEMPTS = 30; private static final int DEFAULT_COPYBLOB_MIN_BACKOFF_INTERVAL = 3 * 1000; private static final int DEFAULT_COPYBLOB_MAX_BACKOFF_INTERVAL = 90 * 1000; @@ -256,6 +258,8 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { private static final float DEFAULT_SELF_THROTTLE_READ_FACTOR = 1.0f; private static final float DEFAULT_SELF_THROTTLE_WRITE_FACTOR = 1.0f; + private static final boolean DEFAULT_AUTO_THROTTLE_ENABLE = false; + private static final int STORAGE_CONNECTION_TIMEOUT_DEFAULT = 90; /** @@ -283,7 +287,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { private boolean connectingUsingSAS = false; private AzureFileSystemInstrumentation instrumentation; private BandwidthGaugeUpdater bandwidthGaugeUpdater; - private final static JSON PERMISSION_JSON_SERIALIZER = createPermissionJsonSerializer(); + private static final JSON PERMISSION_JSON_SERIALIZER = createPermissionJsonSerializer(); private boolean suppressRetryPolicy = false; private boolean canCreateOrModifyContainer = false; @@ -308,6 +312,8 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { private float selfThrottlingReadFactor; private float selfThrottlingWriteFactor; + private boolean autoThrottlingEnabled; + private TestHookOperationContext testHookOperationContext = null; // Set if we're running against a storage emulator.. @@ -481,7 +487,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { "Cannot initialize WASB file system, conf is null"); } - if(!conf.getBoolean( + if (!conf.getBoolean( NativeAzureFileSystem.SKIP_AZURE_METRICS_PROPERTY_NAME, false)) { //If not skip azure metrics, create bandwidthGaugeUpdater this.bandwidthGaugeUpdater = new BandwidthGaugeUpdater(instrumentation); @@ -664,9 +670,9 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { private String getHTTPScheme() { String sessionScheme = sessionUri.getScheme(); // Check if we're on a secure URI scheme: wasbs or the legacy asvs scheme. - if (sessionScheme != null && - (sessionScheme.equalsIgnoreCase("asvs") || - sessionScheme.equalsIgnoreCase("wasbs"))) { + if (sessionScheme != null + && (sessionScheme.equalsIgnoreCase("asvs") + || sessionScheme.equalsIgnoreCase("wasbs"))) { return HTTPS_SCHEME; } else { // At this point the scheme should be either null or asv or wasb. @@ -766,6 +772,18 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { selfThrottlingWriteFactor = sessionConfiguration.getFloat( KEY_SELF_THROTTLE_WRITE_FACTOR, DEFAULT_SELF_THROTTLE_WRITE_FACTOR); + if (!selfThrottlingEnabled) { + autoThrottlingEnabled = sessionConfiguration.getBoolean( + KEY_AUTO_THROTTLE_ENABLE, + DEFAULT_AUTO_THROTTLE_ENABLE); + if (autoThrottlingEnabled) { + ClientThrottlingIntercept.initializeSingleton(); + } + } else { + // Cannot enable both self-throttling and client-side throttling. + autoThrottlingEnabled = false; + } + OperationContext.setLoggingEnabledByDefault(sessionConfiguration.
getBoolean(KEY_ENABLE_STORAGE_CLIENT_LOGGING, false)); @@ -839,8 +857,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { CloudStorageAccount.getDevelopmentStorageAccount(); storageInteractionLayer.createBlobClient(account); } else { - blobEndPoint = new URI(getHTTPScheme() + "://" + - accountName); + blobEndPoint = new URI(getHTTPScheme() + "://" + accountName); storageInteractionLayer.createBlobClient(blobEndPoint, credentials); } suppressRetryPolicyInClientIfNeeded(); @@ -951,7 +968,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { * @throws AzureException * @throws IOException */ - private void createAzureStorageSession () + private void createAzureStorageSession() throws AzureException, IOException { // Make sure this object was properly initialized with references to @@ -1128,8 +1145,8 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { myDir = verifyAndConvertToStandardFormat(currentDir); } catch (URISyntaxException ex) { throw new AzureException(String.format( - "The directory %s specified in the configuration entry %s is not" + - " a valid URI.", + "The directory %s specified in the configuration entry %s is not" + + " a valid URI.", currentDir, configVar)); } if (myDir != null) { @@ -1159,8 +1176,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { public boolean isKeyForDirectorySet(String key, Set dirSet) { String defaultFS = FileSystem.getDefaultUri(sessionConfiguration).toString(); for (String dir : dirSet) { - if (dir.isEmpty() || - key.startsWith(dir + "/")) { + if (dir.isEmpty() || key.startsWith(dir + "/")) { return true; } @@ -1168,7 +1184,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { // system. // try { - URI uriPageBlobDir = new URI (dir); + URI uriPageBlobDir = new URI(dir); if (null == uriPageBlobDir.getAuthority()) { // Concatenate the default file system prefix with the relative // page blob directory path. @@ -1424,7 +1440,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { throws StorageException { if (blob instanceof CloudPageBlobWrapper){ return new PageBlobOutputStream( - (CloudPageBlobWrapper)blob, getInstrumentedContext(), sessionConfiguration); + (CloudPageBlobWrapper) blob, getInstrumentedContext(), sessionConfiguration); } else { // Handle both ClouldBlockBlobWrapperImpl and (only for the test code path) @@ -1739,12 +1755,13 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { private Iterable listRootBlobs(boolean includeMetadata, boolean useFlatBlobListing) throws StorageException, URISyntaxException { return rootDirectory.listBlobs( - null, useFlatBlobListing, - includeMetadata ? - EnumSet.of(BlobListingDetails.METADATA) : - EnumSet.noneOf(BlobListingDetails.class), null, - getInstrumentedContext()); + useFlatBlobListing, + includeMetadata + ? EnumSet.of(BlobListingDetails.METADATA) + : EnumSet.noneOf(BlobListingDetails.class), + null, + getInstrumentedContext()); } /** @@ -1771,11 +1788,11 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { Iterable list = rootDirectory.listBlobs(aPrefix, useFlatBlobListing, - includeMetadata ? - EnumSet.of(BlobListingDetails.METADATA) : - EnumSet.noneOf(BlobListingDetails.class), - null, - getInstrumentedContext()); + includeMetadata + ? 
EnumSet.of(BlobListingDetails.METADATA) + : EnumSet.noneOf(BlobListingDetails.class), + null, + getInstrumentedContext()); return list; } @@ -1941,9 +1958,11 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { if (selfThrottlingEnabled) { SelfThrottlingIntercept.hook(operationContext, selfThrottlingReadFactor, selfThrottlingWriteFactor); + } else if (autoThrottlingEnabled) { + ClientThrottlingIntercept.hook(operationContext); } - if(bandwidthGaugeUpdater != null) { + if (bandwidthGaugeUpdater != null) { //bandwidthGaugeUpdater is null when we config to skip azure metrics ResponseReceivedMetricUpdater.hook( operationContext, @@ -2446,10 +2465,10 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { // 1. It's a BlobNotFound exception AND // 2. It got there after one-or-more retries THEN // we swallow the exception. - if (e.getErrorCode() != null && - "BlobNotFound".equals(e.getErrorCode()) && - operationContext.getRequestResults().size() > 1 && - operationContext.getRequestResults().get(0).getException() != null) { + if (e.getErrorCode() != null + && "BlobNotFound".equals(e.getErrorCode()) + && operationContext.getRequestResults().size() > 1 + && operationContext.getRequestResults().get(0).getException() != null) { LOG.debug("Swallowing delete exception on retry: {}", e.getMessage()); return; } else { @@ -2496,7 +2515,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { return delete(key, null); } catch (IOException e) { Throwable t = e.getCause(); - if(t != null && t instanceof StorageException) { + if (t != null && t instanceof StorageException) { StorageException se = (StorageException) t; if ("LeaseIdMissing".equals(se.getErrorCode())){ SelfRenewingLease lease = null; @@ -2509,7 +2528,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { throw e3; } finally { try { - if(lease != null){ + if (lease != null){ lease.free(); } } catch (Exception e4){ @@ -2561,8 +2580,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { srcBlob = getBlobReference(srcKey); if (!srcBlob.exists(getInstrumentedContext())) { - throw new AzureException ("Source blob " + srcKey + - " does not exist."); + throw new AzureException("Source blob " + srcKey + " does not exist."); } /** @@ -2600,19 +2618,19 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { if (se.getHttpStatusCode() == HttpURLConnection.HTTP_UNAVAILABLE) { int copyBlobMinBackoff = sessionConfiguration.getInt( KEY_COPYBLOB_MIN_BACKOFF_INTERVAL, - DEFAULT_COPYBLOB_MIN_BACKOFF_INTERVAL); + DEFAULT_COPYBLOB_MIN_BACKOFF_INTERVAL); int copyBlobMaxBackoff = sessionConfiguration.getInt( KEY_COPYBLOB_MAX_BACKOFF_INTERVAL, - DEFAULT_COPYBLOB_MAX_BACKOFF_INTERVAL); + DEFAULT_COPYBLOB_MAX_BACKOFF_INTERVAL); int copyBlobDeltaBackoff = sessionConfiguration.getInt( KEY_COPYBLOB_BACKOFF_INTERVAL, - DEFAULT_COPYBLOB_BACKOFF_INTERVAL); + DEFAULT_COPYBLOB_BACKOFF_INTERVAL); int copyBlobMaxRetries = sessionConfiguration.getInt( KEY_COPYBLOB_MAX_IO_RETRIES, - DEFAULT_COPYBLOB_MAX_RETRY_ATTEMPTS); + DEFAULT_COPYBLOB_MAX_RETRY_ATTEMPTS); BlobRequestOptions options = new BlobRequestOptions(); options.setRetryPolicyFactory(new RetryExponentialRetry( @@ -2631,7 +2649,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { InputStream ipStream = null; OutputStream opStream = null; try { - if(srcBlob.getProperties().getBlobType() == BlobType.PAGE_BLOB){ + if (srcBlob.getProperties().getBlobType() == 
BlobType.PAGE_BLOB){ ipStream = openInputStream(srcBlob); opStream = openOutputStream(dstBlob); byte[] buffer = new byte[PageBlobFormatHelpers.PAGE_SIZE]; @@ -2817,7 +2835,7 @@ public class AzureNativeFileSystemStore implements NativeFileSystemStore { @Override public void close() { - if(bandwidthGaugeUpdater != null) { + if (bandwidthGaugeUpdater != null) { bandwidthGaugeUpdater.close(); bandwidthGaugeUpdater = null; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlobOperationDescriptor.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlobOperationDescriptor.java new file mode 100644 index 00000000000..6da64e124ef --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlobOperationDescriptor.java @@ -0,0 +1,222 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azure; + +import com.microsoft.azure.storage.Constants.HeaderConstants; +import org.apache.hadoop.classification.InterfaceAudience; +import java.net.HttpURLConnection; +import java.net.URL; + +/** + * Determines the operation type (PutBlock, PutPage, GetBlob, etc) of Azure + * Storage operations. This is used by the handlers of the SendingRequestEvent + * and ResponseReceivedEvent exposed by the Azure Storage SDK to identify + * operation types (since the type of operation is not exposed by the SDK). + */ +@InterfaceAudience.Private +final class BlobOperationDescriptor { + + private BlobOperationDescriptor() { + // hide default constructor + } + + /** + * Gets the content length for the Azure Storage operation from the + * 'x-ms-range' header, if set. + * @param range the value of the 'x-ms-range' header. + * @return the content length, or zero if not set. + */ + private static long getContentLengthIfKnown(String range) { + long contentLength = 0; + // Format is "bytes=%d-%d" + if (range != null && range.startsWith("bytes=")) { + String[] offsets = range.substring("bytes=".length()).split("-"); + if (offsets.length == 2) { + contentLength = Long.parseLong(offsets[1]) - Long.parseLong(offsets[0]) + + 1; + } + } + return contentLength; + } + + /** + * Gets the content length for the Azure Storage operation, or returns zero if + * unknown. + * @param conn the connection object for the Azure Storage operation. + * @param operationType the Azure Storage operation type. + * @return the content length, or zero if unknown. + */ + static long getContentLengthIfKnown(HttpURLConnection conn, + OperationType operationType) { + long contentLength = 0; + switch (operationType) { + case AppendBlock: + case PutBlock: + String lengthString = conn.getRequestProperty( + HeaderConstants.CONTENT_LENGTH); + contentLength = (lengthString != null) + ? 
Long.parseLong(lengthString) + : 0; + break; + case PutPage: + case GetBlob: + contentLength = BlobOperationDescriptor.getContentLengthIfKnown( + conn.getRequestProperty("x-ms-range")); + break; + default: + break; + } + return contentLength; + } + + /** + * Gets the operation type of an Azure Storage operation. + * + * @param conn the connection object for the Azure Storage operation. + * @return the operation type. + */ + static OperationType getOperationType(HttpURLConnection conn) { + OperationType operationType = OperationType.Unknown; + String method = conn.getRequestMethod(); + String compValue = getQueryParameter(conn.getURL(), + "comp"); + + if (method.equalsIgnoreCase("PUT")) { + if (compValue != null) { + switch (compValue) { + case "metadata": + operationType = OperationType.SetMetadata; + break; + case "properties": + operationType = OperationType.SetProperties; + break; + case "block": + operationType = OperationType.PutBlock; + break; + case "page": + String pageWrite = conn.getRequestProperty("x-ms-page-write"); + if (pageWrite != null && pageWrite.equalsIgnoreCase( + "UPDATE")) { + operationType = OperationType.PutPage; + } + break; + case "appendblock": + operationType = OperationType.AppendBlock; + break; + case "blocklist": + operationType = OperationType.PutBlockList; + break; + default: + break; + } + } else { + String blobType = conn.getRequestProperty("x-ms-blob-type"); + if (blobType != null + && (blobType.equalsIgnoreCase("PageBlob") + || blobType.equalsIgnoreCase("BlockBlob") + || blobType.equalsIgnoreCase("AppendBlob"))) { + operationType = OperationType.CreateBlob; + } else if (blobType == null) { + String resType = getQueryParameter(conn.getURL(), + "restype"); + if (resType != null + && resType.equalsIgnoreCase("container")) { + operationType = OperationType.CreateContainer; + } + } + } + } else if (method.equalsIgnoreCase("GET")) { + if (compValue != null) { + switch (compValue) { + case "list": + operationType = OperationType.ListBlobs; + break; + case "metadata": + operationType = OperationType.GetMetadata; + break; + case "blocklist": + operationType = OperationType.GetBlockList; + break; + case "pagelist": + operationType = OperationType.GetPageList; + break; + default: + break; + } + } else if (conn.getRequestProperty("x-ms-range") != null) { + operationType = OperationType.GetBlob; + } + } else if (method.equalsIgnoreCase("HEAD")) { + operationType = OperationType.GetProperties; + } else if (method.equalsIgnoreCase("DELETE")) { + String resType = getQueryParameter(conn.getURL(), + "restype"); + if (resType != null + && resType.equalsIgnoreCase("container")) { + operationType = OperationType.DeleteContainer; + } else { + operationType = OperationType.DeleteBlob; + } + } + return operationType; + } + + private static String getQueryParameter(URL url, String queryParameterName) { + String query = (url != null) ? url.getQuery() : null; + + if (query == null) { + return null; + } + + String searchValue = queryParameterName + "="; + + int offset = query.indexOf(searchValue); + String value = null; + if (offset != -1) { + int beginIndex = offset + searchValue.length(); + int endIndex = query.indexOf('&', beginIndex); + value = (endIndex == -1) + ? 
query.substring(beginIndex) + : query.substring(beginIndex, endIndex); + } + return value; + } + + @InterfaceAudience.Private + enum OperationType { + AppendBlock, + CreateBlob, + CreateContainer, + DeleteBlob, + DeleteContainer, + GetBlob, + GetBlockList, + GetMetadata, + GetPageList, + GetProperties, + ListBlobs, + PutBlock, + PutBlockList, + PutPage, + SetMetadata, + SetProperties, + Unknown + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/ClientThrottlingAnalyzer.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/ClientThrottlingAnalyzer.java new file mode 100644 index 00000000000..aa7ac2e1d7b --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/ClientThrottlingAnalyzer.java @@ -0,0 +1,284 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azure; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Throttles storage operations to minimize errors and maximize throughput. This + * improves throughput by as much as 35% when the service throttles requests due + * to exceeding account-level ingress or egress limits. + */ +@InterfaceAudience.Private +class ClientThrottlingAnalyzer { + private static final Logger LOG = LoggerFactory.getLogger( + ClientThrottlingAnalyzer.class); + private static final int DEFAULT_ANALYSIS_PERIOD_MS = 10 * 1000; + private static final int MIN_ANALYSIS_PERIOD_MS = 1000; + private static final int MAX_ANALYSIS_PERIOD_MS = 30000; + private static final double MIN_ACCEPTABLE_ERROR_PERCENTAGE = .1; + private static final double MAX_EQUILIBRIUM_ERROR_PERCENTAGE = 1; + private static final double RAPID_SLEEP_DECREASE_FACTOR = .75; + private static final double RAPID_SLEEP_DECREASE_TRANSITION_PERIOD_MS = 150 + * 1000; + private static final double SLEEP_DECREASE_FACTOR = .975; + private static final double SLEEP_INCREASE_FACTOR = 1.05; + private int analysisPeriodMs; + + private volatile int sleepDuration = 0; + private long consecutiveNoErrorCount = 0; + private String name = null; + private Timer timer = null; + private AtomicReference<BlobOperationMetrics> blobMetrics = null; + + private ClientThrottlingAnalyzer() { + // hide default constructor + } + + /** + * Creates an instance of the ClientThrottlingAnalyzer class with + * the specified name. 
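+ * Uses DEFAULT_ANALYSIS_PERIOD_MS (ten seconds) as the analysis period.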
+ * + * @param name a name used to identify this instance. + * + * @throws IllegalArgumentException if name is null or empty. + */ + ClientThrottlingAnalyzer(String name) throws IllegalArgumentException { + this(name, DEFAULT_ANALYSIS_PERIOD_MS); + } + + /** + * Creates an instance of the ClientThrottlingAnalyzer class with + * the specified name and period. + * + * @param name A name used to identify this instance. + * + * @param period The frequency, in milliseconds, at which metrics are + * analyzed. + * + * @throws IllegalArgumentException + * If name is null or empty. + * If period is less than 1000 or greater than 30000 milliseconds. + */ + ClientThrottlingAnalyzer(String name, int period) + throws IllegalArgumentException { + Preconditions.checkArgument( + StringUtils.isNotEmpty(name), + "The argument 'name' cannot be null or empty."); + Preconditions.checkArgument( + period >= MIN_ANALYSIS_PERIOD_MS && period <= MAX_ANALYSIS_PERIOD_MS, + "The argument 'period' must be between 1000 and 30000."); + this.name = name; + this.analysisPeriodMs = period; + this.blobMetrics = new AtomicReference<BlobOperationMetrics>( + new BlobOperationMetrics(System.currentTimeMillis())); + this.timer = new Timer( + String.format("wasb-timer-client-throttling-analyzer-%s", name)); + this.timer.schedule(new TimerTaskImpl(), + analysisPeriodMs, + analysisPeriodMs); + } + + /** + * Updates metrics with results from the current storage operation. + * + * @param count The count of bytes transferred. + * + * @param isFailedOperation True if the operation failed; otherwise false. + */ + public void addBytesTransferred(long count, boolean isFailedOperation) { + BlobOperationMetrics metrics = blobMetrics.get(); + if (isFailedOperation) { + metrics.bytesFailed.addAndGet(count); + metrics.operationsFailed.incrementAndGet(); + } else { + metrics.bytesSuccessful.addAndGet(count); + metrics.operationsSuccessful.incrementAndGet(); + } + } + + /** + * Suspends the current storage operation, as necessary, to reduce throughput. + */ + public void suspendIfNecessary() { + int duration = sleepDuration; + if (duration > 0) { + try { + Thread.sleep(duration); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + } + } + + @VisibleForTesting + int getSleepDuration() { + return sleepDuration; + } + + private int analyzeMetricsAndUpdateSleepDuration(BlobOperationMetrics metrics, + int sleepDuration) { + final double percentageConversionFactor = 100; + double bytesFailed = metrics.bytesFailed.get(); + double bytesSuccessful = metrics.bytesSuccessful.get(); + double operationsFailed = metrics.operationsFailed.get(); + double operationsSuccessful = metrics.operationsSuccessful.get(); + double errorPercentage = (bytesFailed <= 0) + ? 0 + : percentageConversionFactor + * bytesFailed + / (bytesFailed + bytesSuccessful); + long periodMs = metrics.endTime - metrics.startTime; + + double newSleepDuration; + + if (errorPercentage < MIN_ACCEPTABLE_ERROR_PERCENTAGE) { + ++consecutiveNoErrorCount; + // Decrease sleepDuration in order to increase throughput. + double reductionFactor = + (consecutiveNoErrorCount * analysisPeriodMs + >= RAPID_SLEEP_DECREASE_TRANSITION_PERIOD_MS) + ? RAPID_SLEEP_DECREASE_FACTOR + : SLEEP_DECREASE_FACTOR; + + newSleepDuration = sleepDuration * reductionFactor; + } else if (errorPercentage < MAX_EQUILIBRIUM_ERROR_PERCENTAGE) { + // Do not modify sleepDuration in order to stabilize throughput. + newSleepDuration = sleepDuration; + } else { + // Increase sleepDuration in order to minimize error rate. 
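+ // Worked example for the computation below (the numbers mirror + // testSuccessAndErrorsAndWaiting): one 8 MB success and one 2 MB failure + // in a 1000 ms period is a 20% error rate, so additionalDelayNeeded + // = (8 + 2) MB * 1000 ms / 8 MB - 1000 ms = 250 ms; amortized over the + // 2 operations and adjusted by the 1 ms below, the new sleep duration + // is 126 ms.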
+ consecutiveNoErrorCount = 0; + + // Increase sleep duration in order to reduce throughput and error rate. + // First, calculate target throughput: bytesSuccessful / periodMs. + // Next, calculate time required to send *all* data (assuming next period + // is similar to previous) at the target throughput: (bytesSuccessful + // + bytesFailed) * periodMs / bytesSuccessful. Next, subtract periodMs to + // get the total additional delay needed. + double additionalDelayNeeded = 5 * analysisPeriodMs; + if (bytesSuccessful > 0) { + additionalDelayNeeded = (bytesSuccessful + bytesFailed) + * periodMs + / bytesSuccessful + - periodMs; + } + + // amortize the additional delay needed across the estimated number of + // requests during the next period + newSleepDuration = additionalDelayNeeded + / (operationsFailed + operationsSuccessful); + + final double maxSleepDuration = analysisPeriodMs; + final double minSleepDuration = sleepDuration * SLEEP_INCREASE_FACTOR; + + // Add 1 ms to avoid rounding down and to decrease proximity to the server + // side ingress/egress limit. Ensure that the new sleep duration is + // larger than the current one to more quickly reduce the number of + // errors. Don't allow the sleep duration to grow unbounded, after a + // certain point throttling won't help, for example, if there are far too + // many tasks/containers/nodes no amount of throttling will help. + newSleepDuration = Math.max(newSleepDuration, minSleepDuration) + 1; + newSleepDuration = Math.min(newSleepDuration, maxSleepDuration); + } + + if (LOG.isDebugEnabled()) { + LOG.debug(String.format( + "%5.5s, %10d, %10d, %10d, %10d, %6.2f, %5d, %5d, %5d", + name, + (int) bytesFailed, + (int) bytesSuccessful, + (int) operationsFailed, + (int) operationsSuccessful, + errorPercentage, + periodMs, + (int) sleepDuration, + (int) newSleepDuration)); + } + + return (int) newSleepDuration; + } + + /** + * Timer callback implementation for periodically analyzing metrics. + */ + class TimerTaskImpl extends TimerTask { + private AtomicInteger doingWork = new AtomicInteger(0); + + /** + * Periodically analyzes a snapshot of the blob storage metrics and updates + * the sleepDuration in order to appropriately throttle storage operations. + */ + @Override + public void run() { + boolean doWork = false; + try { + doWork = doingWork.compareAndSet(0, 1); + + // prevent concurrent execution of this task + if (!doWork) { + return; + } + + long now = System.currentTimeMillis(); + if (now - blobMetrics.get().startTime >= analysisPeriodMs) { + BlobOperationMetrics oldMetrics = blobMetrics.getAndSet( + new BlobOperationMetrics(now)); + oldMetrics.endTime = now; + sleepDuration = analyzeMetricsAndUpdateSleepDuration(oldMetrics, + sleepDuration); + } + } + finally { + if (doWork) { + doingWork.set(0); + } + } + } + } + + /** + * Stores blob operation metrics during each analysis period. 
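+ * A fresh instance is swapped in atomically at the start of each period + * (see blobMetrics.getAndSet in TimerTaskImpl) and the retired instance + * is then analyzed.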
+ */ + static class BlobOperationMetrics { + private AtomicLong bytesFailed; + private AtomicLong bytesSuccessful; + private AtomicLong operationsFailed; + private AtomicLong operationsSuccessful; + private long endTime; + private long startTime; + + BlobOperationMetrics(long startTime) { + this.startTime = startTime; + this.bytesFailed = new AtomicLong(); + this.bytesSuccessful = new AtomicLong(); + this.operationsFailed = new AtomicLong(); + this.operationsSuccessful = new AtomicLong(); + } + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/ClientThrottlingIntercept.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/ClientThrottlingIntercept.java new file mode 100644 index 00000000000..9da993bd237 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/ClientThrottlingIntercept.java @@ -0,0 +1,221 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azure; + +import com.microsoft.azure.storage.ErrorReceivingResponseEvent; +import com.microsoft.azure.storage.OperationContext; +import com.microsoft.azure.storage.RequestResult; +import com.microsoft.azure.storage.ResponseReceivedEvent; +import com.microsoft.azure.storage.SendingRequestEvent; +import com.microsoft.azure.storage.StorageEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.classification.InterfaceAudience; + +import java.net.HttpURLConnection; + +/** + * Throttles Azure Storage read and write operations to achieve maximum + * throughput by minimizing errors. The errors occur when the account ingress + * or egress limits are exceeded and the server-side throttles requests. + * Server-side throttling causes the retry policy to be used, but the retry + * policy sleeps for long periods of time causing the total ingress or egress + * throughput to be as much as 35% lower than optimal. The retry policy is also + * after the fact, in that it applies after a request fails. On the other hand, + * the client-side throttling implemented here happens before requests are made + * and sleeps just enough to minimize errors, allowing optimal ingress and/or + * egress throughput. 
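+ * Client-side throttling is disabled by default; it is enabled by setting + * fs.azure.autothrottling.enable to true, and only takes effect when + * self-throttling is disabled.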
+ */ +@InterfaceAudience.Private +final class ClientThrottlingIntercept { + private static final Logger LOG = LoggerFactory.getLogger( + ClientThrottlingIntercept.class); + private static ClientThrottlingIntercept singleton = null; + private ClientThrottlingAnalyzer readThrottler = null; + private ClientThrottlingAnalyzer writeThrottler = null; + + // Hide default constructor + private ClientThrottlingIntercept() { + readThrottler = new ClientThrottlingAnalyzer("read"); + writeThrottler = new ClientThrottlingAnalyzer("write"); + LOG.debug("Client-side throttling is enabled for the WASB file system."); + } + + static synchronized void initializeSingleton() { + if (singleton == null) { + singleton = new ClientThrottlingIntercept(); + } + } + + static void hook(OperationContext context) { + context.getErrorReceivingResponseEventHandler().addListener( + new ErrorReceivingResponseEventHandler()); + context.getSendingRequestEventHandler().addListener( + new SendingRequestEventHandler()); + context.getResponseReceivedEventHandler().addListener( + new ResponseReceivedEventHandler()); + } + + private static void updateMetrics(HttpURLConnection conn, + RequestResult result) { + BlobOperationDescriptor.OperationType operationType + = BlobOperationDescriptor.getOperationType(conn); + int status = result.getStatusCode(); + long contentLength = 0; + // If the socket is terminated prior to receiving a response, the HTTP + // status may be 0 or -1. A status less than 200 or greater than or equal + // to 500 is considered an error. + boolean isFailedOperation = (status < HttpURLConnection.HTTP_OK + || status >= java.net.HttpURLConnection.HTTP_INTERNAL_ERROR); + + switch (operationType) { + case AppendBlock: + case PutBlock: + case PutPage: + contentLength = BlobOperationDescriptor.getContentLengthIfKnown(conn, + operationType); + if (contentLength > 0) { + singleton.writeThrottler.addBytesTransferred(contentLength, + isFailedOperation); + } + break; + case GetBlob: + contentLength = BlobOperationDescriptor.getContentLengthIfKnown(conn, + operationType); + if (contentLength > 0) { + singleton.readThrottler.addBytesTransferred(contentLength, + isFailedOperation); + } + break; + default: + break; + } + } + + /** + * Called when a network error occurs before the HTTP status and response + * headers are received. Client-side throttling uses this to collect metrics. + * + * @param event The connection, operation, and request state. + */ + public static void errorReceivingResponse(ErrorReceivingResponseEvent event) { + updateMetrics((HttpURLConnection) event.getConnectionObject(), + event.getRequestResult()); + } + + /** + * Called before the Azure Storage SDK sends a request. Client-side throttling + * uses this to suspend the request, if necessary, to minimize errors and + * maximize throughput. + * + * @param event The connection, operation, and request state. + */ + public static void sendingRequest(SendingRequestEvent event) { + BlobOperationDescriptor.OperationType operationType + = BlobOperationDescriptor.getOperationType( + (HttpURLConnection) event.getConnectionObject()); + switch (operationType) { + case GetBlob: + singleton.readThrottler.suspendIfNecessary(); + break; + case AppendBlock: + case PutBlock: + case PutPage: + singleton.writeThrottler.suspendIfNecessary(); + break; + default: + break; + } + } + + /** + * Called after the Azure Storage SDK receives a response. Client-side + * throttling uses this to collect metrics. + * + * @param event The connection, operation, and request state. 
+ */ + public static void responseReceived(ResponseReceivedEvent event) { + updateMetrics((HttpURLConnection) event.getConnectionObject(), + event.getRequestResult()); + } + + /** + * The ErrorReceivingResponseEvent is fired when the Azure Storage SDK + * encounters a network error before the HTTP status and response headers are + * received. + */ + @InterfaceAudience.Private + static class ErrorReceivingResponseEventHandler + extends StorageEvent<ErrorReceivingResponseEvent> { + + /** + * Called when a network error occurs before the HTTP status and response + * headers are received. Client-side throttling uses this to collect + * metrics. + * + * @param event The connection, operation, and request state. + */ + @Override + public void eventOccurred(ErrorReceivingResponseEvent event) { + singleton.errorReceivingResponse(event); + } + } + + /** + * The SendingRequestEvent is fired before the Azure Storage SDK sends a + * request. + */ + @InterfaceAudience.Private + static class SendingRequestEventHandler + extends StorageEvent<SendingRequestEvent> { + + /** + * Called before the Azure Storage SDK sends a request. Client-side + * throttling uses this to suspend the request, if necessary, to minimize + * errors and maximize throughput. + * + * @param event The connection, operation, and request state. + */ + @Override + public void eventOccurred(SendingRequestEvent event) { + singleton.sendingRequest(event); + } + } + + /** + * The ResponseReceivedEvent is fired after the Azure Storage SDK receives a + * response. + */ + @InterfaceAudience.Private + static class ResponseReceivedEventHandler + extends StorageEvent<ResponseReceivedEvent> { + + /** + * Called after the Azure Storage SDK receives a response. Client-side + * throttling uses this to collect metrics. + * + * @param event The connection, operation, and request state. + */ + @Override + public void eventOccurred(ResponseReceivedEvent event) { + singleton.responseReceived(event); + } + } +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/AbstractWasbTestBase.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/AbstractWasbTestBase.java index 51867cd728a..d04a19ca3e3 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/AbstractWasbTestBase.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/AbstractWasbTestBase.java @@ -67,4 +67,8 @@ public abstract class AbstractWasbTestBase { protected abstract AzureBlobStorageTestAccount createTestAccount() throws Exception; + + protected AzureBlobStorageTestAccount getTestAccount() { + return testAccount; + } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobOperationDescriptor.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobOperationDescriptor.java new file mode 100644 index 00000000000..07d4ebc8632 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestBlobOperationDescriptor.java @@ -0,0 +1,305 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azure; + +import com.microsoft.azure.storage.OperationContext; +import com.microsoft.azure.storage.ResponseReceivedEvent; +import com.microsoft.azure.storage.SendingRequestEvent; +import com.microsoft.azure.storage.StorageEvent; +import com.microsoft.azure.storage.blob.BlobInputStream; +import com.microsoft.azure.storage.blob.BlobOutputStream; +import com.microsoft.azure.storage.blob.CloudAppendBlob; +import com.microsoft.azure.storage.blob.CloudBlobContainer; +import com.microsoft.azure.storage.blob.CloudBlockBlob; +import com.microsoft.azure.storage.blob.CloudPageBlob; +import org.apache.hadoop.classification.InterfaceAudience; +import org.junit.Test; + +import java.net.HttpURLConnection; + +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertEquals; + +/** + * Tests for BlobOperationDescriptor. + */ +public class TestBlobOperationDescriptor extends AbstractWasbTestBase { + private BlobOperationDescriptor.OperationType lastOperationTypeReceived; + private BlobOperationDescriptor.OperationType lastOperationTypeSent; + private long lastContentLengthReceived; + + @Override + protected AzureBlobStorageTestAccount createTestAccount() throws Exception { + return AzureBlobStorageTestAccount.create(); + } + + @Test + public void testAppendBlockOperations() throws Exception { + CloudBlobContainer container = getTestAccount().getRealContainer(); + + OperationContext context = new OperationContext(); + context.getResponseReceivedEventHandler().addListener( + new ResponseReceivedEventHandler()); + context.getSendingRequestEventHandler().addListener( + new SendingRequestEventHandler()); + + CloudAppendBlob appendBlob = container.getAppendBlobReference( + "testAppendBlockOperations"); + assertNull(lastOperationTypeSent); + assertNull(lastOperationTypeReceived); + assertEquals(0, lastContentLengthReceived); + + try ( + BlobOutputStream output + = appendBlob.openWriteNew(null, null, context); + ) { + assertEquals(BlobOperationDescriptor.OperationType.CreateBlob, + lastOperationTypeReceived); + assertEquals(0, lastContentLengthReceived); + + String message = "this is a test"; + output.write(message.getBytes("UTF-8")); + output.flush(); + assertEquals(BlobOperationDescriptor.OperationType.AppendBlock, + lastOperationTypeSent); + assertEquals(BlobOperationDescriptor.OperationType.AppendBlock, + lastOperationTypeReceived); + assertEquals(message.length(), lastContentLengthReceived); + } + } + + @Test + public void testPutBlockOperations() throws Exception { + CloudBlobContainer container = getTestAccount().getRealContainer(); + + OperationContext context = new OperationContext(); + context.getResponseReceivedEventHandler().addListener( + new ResponseReceivedEventHandler()); + context.getSendingRequestEventHandler().addListener( + new SendingRequestEventHandler()); + + CloudBlockBlob blockBlob = container.getBlockBlobReference( + "testPutBlockOperations"); + assertNull(lastOperationTypeSent); + assertNull(lastOperationTypeReceived); + assertEquals(0, lastContentLengthReceived); + + try ( + BlobOutputStream output + = 
blockBlob.openOutputStream(null, + null, + context); + ) { + assertNull(lastOperationTypeReceived); + assertEquals(0, lastContentLengthReceived); + + String message = "this is a test"; + output.write(message.getBytes("UTF-8")); + output.flush(); + assertEquals(BlobOperationDescriptor.OperationType.PutBlock, + lastOperationTypeSent); + assertEquals(BlobOperationDescriptor.OperationType.PutBlock, + lastOperationTypeReceived); + assertEquals(message.length(), lastContentLengthReceived); + } + assertEquals(BlobOperationDescriptor.OperationType.PutBlockList, + lastOperationTypeSent); + assertEquals(BlobOperationDescriptor.OperationType.PutBlockList, + lastOperationTypeReceived); + assertEquals(0, lastContentLengthReceived); + } + + @Test + public void testPutPageOperations() throws Exception { + CloudBlobContainer container = getTestAccount().getRealContainer(); + + OperationContext context = new OperationContext(); + context.getResponseReceivedEventHandler().addListener( + new ResponseReceivedEventHandler()); + context.getSendingRequestEventHandler().addListener( + new SendingRequestEventHandler()); + + CloudPageBlob pageBlob = container.getPageBlobReference( + "testPutPageOperations"); + assertNull(lastOperationTypeSent); + assertNull(lastOperationTypeReceived); + assertEquals(0, lastContentLengthReceived); + + try ( + BlobOutputStream output = pageBlob.openWriteNew(1024, + null, + null, + context); + ) { + assertEquals(BlobOperationDescriptor.OperationType.CreateBlob, + lastOperationTypeReceived); + assertEquals(0, lastContentLengthReceived); + + final int pageSize = 512; + byte[] buffer = new byte[pageSize]; + output.write(buffer); + output.flush(); + assertEquals(BlobOperationDescriptor.OperationType.PutPage, + lastOperationTypeSent); + assertEquals(BlobOperationDescriptor.OperationType.PutPage, + lastOperationTypeReceived); + assertEquals(buffer.length, lastContentLengthReceived); + } + } + + @Test + public void testGetBlobOperations() throws Exception { + CloudBlobContainer container = getTestAccount().getRealContainer(); + + OperationContext context = new OperationContext(); + context.getResponseReceivedEventHandler().addListener( + new ResponseReceivedEventHandler()); + context.getSendingRequestEventHandler().addListener( + new SendingRequestEventHandler()); + + CloudBlockBlob blockBlob = container.getBlockBlobReference( + "testGetBlobOperations"); + assertNull(lastOperationTypeSent); + assertNull(lastOperationTypeReceived); + assertEquals(0, lastContentLengthReceived); + + String message = "this is a test"; + + try ( + BlobOutputStream output = blockBlob.openOutputStream(null, + null, + context); + ) { + assertNull(lastOperationTypeReceived); + assertEquals(0, lastContentLengthReceived); + + output.write(message.getBytes("UTF-8")); + output.flush(); + assertEquals(BlobOperationDescriptor.OperationType.PutBlock, + lastOperationTypeSent); + assertEquals(BlobOperationDescriptor.OperationType.PutBlock, + lastOperationTypeReceived); + assertEquals(message.length(), lastContentLengthReceived); + } + assertEquals(BlobOperationDescriptor.OperationType.PutBlockList, + lastOperationTypeSent); + assertEquals(BlobOperationDescriptor.OperationType.PutBlockList, + lastOperationTypeReceived); + assertEquals(0, lastContentLengthReceived); + + try ( + BlobInputStream input = blockBlob.openInputStream(null, + null, + context); + ) { + assertEquals(BlobOperationDescriptor.OperationType.GetProperties, + lastOperationTypeSent); + assertEquals(BlobOperationDescriptor.OperationType.GetProperties, + 
lastOperationTypeReceived); + assertEquals(0, lastContentLengthReceived); + + byte[] buffer = new byte[1024]; + int numBytesRead = input.read(buffer); + assertEquals(BlobOperationDescriptor.OperationType.GetBlob, + lastOperationTypeSent); + assertEquals(BlobOperationDescriptor.OperationType.GetBlob, + lastOperationTypeReceived); + assertEquals(message.length(), lastContentLengthReceived); + assertEquals(numBytesRead, lastContentLengthReceived); + } + } + + /** + * Called after the Azure Storage SDK receives a response. + * + * @param event The connection, operation, and request state. + */ + private void responseReceived(ResponseReceivedEvent event) { + HttpURLConnection conn = (HttpURLConnection) event.getConnectionObject(); + BlobOperationDescriptor.OperationType operationType + = BlobOperationDescriptor.getOperationType(conn); + lastOperationTypeReceived = operationType; + + switch (operationType) { + case AppendBlock: + case PutBlock: + case PutPage: + lastContentLengthReceived + = BlobOperationDescriptor.getContentLengthIfKnown(conn, + operationType); + break; + case GetBlob: + lastContentLengthReceived + = BlobOperationDescriptor.getContentLengthIfKnown(conn, + operationType); + break; + default: + lastContentLengthReceived = 0; + break; + } + } + + /** + * Called before the Azure Storage SDK sends a request. + * + * @param event The connection, operation, and request state. + */ + private void sendingRequest(SendingRequestEvent event) { + this.lastOperationTypeSent + = BlobOperationDescriptor.getOperationType( + (HttpURLConnection) event.getConnectionObject()); + } + + /** + * The ResponseReceivedEvent is fired after the Azure Storage SDK receives a + * response. + */ + @InterfaceAudience.Private + class ResponseReceivedEventHandler + extends StorageEvent<ResponseReceivedEvent> { + + /** + * Called after the Azure Storage SDK receives a response. + * + * @param event The connection, operation, and request state. + */ + @Override + public void eventOccurred(ResponseReceivedEvent event) { + responseReceived(event); + } + } + + /** + * The SendingRequestEvent is fired before the Azure Storage SDK sends a + * request. + */ + @InterfaceAudience.Private + class SendingRequestEventHandler extends StorageEvent<SendingRequestEvent> { + + /** + * Called before the Azure Storage SDK sends a request. + * + * @param event The connection, operation, and request state. + */ + @Override + public void eventOccurred(SendingRequestEvent event) { + sendingRequest(event); + } + } +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestClientThrottlingAnalyzer.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestClientThrottlingAnalyzer.java new file mode 100644 index 00000000000..307e5af5775 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestClientThrottlingAnalyzer.java @@ -0,0 +1,177 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azure; + +import org.apache.hadoop.fs.contract.ContractTestUtils.NanoTimer; +import org.junit.Test; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertEquals; + +/** + * Tests for ClientThrottlingAnalyzer. + */ +public class TestClientThrottlingAnalyzer { + private static final int ANALYSIS_PERIOD = 1000; + private static final int ANALYSIS_PERIOD_PLUS_10_PERCENT = ANALYSIS_PERIOD + + ANALYSIS_PERIOD / 10; + private static final long MEGABYTE = 1024 * 1024; + private static final int MAX_ACCEPTABLE_PERCENT_DIFFERENCE = 20; + + private void sleep(long milliseconds) { + try { + Thread.sleep(milliseconds); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + private void fuzzyValidate(long expected, long actual, double percentage) { + final double lowerBound = Math.max(expected - percentage / 100 * expected, 0); + final double upperBound = expected + percentage / 100 * expected; + + assertTrue( + String.format( + "The actual value %1$d is not within the expected range: " + + "[%2$.2f, %3$.2f].", + actual, + lowerBound, + upperBound), + actual >= lowerBound && actual <= upperBound); + } + + private void validate(long expected, long actual) { + assertEquals( + String.format("The actual value %1$d is not the expected value %2$d.", + actual, + expected), + expected, actual); + } + + private void validateLessThanOrEqual(long maxExpected, long actual) { + assertTrue( + String.format( + "The actual value %1$d is not less than or equal to the maximum" + + " expected value %2$d.", + actual, + maxExpected), + actual <= maxExpected); + } + + /** + * Ensure that there is no waiting (sleepDuration = 0) if the metrics have + * never been updated. This validates proper initialization of + * ClientThrottlingAnalyzer. + */ + @Test + public void testNoMetricUpdatesThenNoWaiting() { + ClientThrottlingAnalyzer analyzer = new ClientThrottlingAnalyzer( + "test", + ANALYSIS_PERIOD); + validate(0, analyzer.getSleepDuration()); + sleep(ANALYSIS_PERIOD_PLUS_10_PERCENT); + validate(0, analyzer.getSleepDuration()); + } + + /** + * Ensure that there is no waiting (sleepDuration = 0) if the metrics have + * only been updated with successful requests. + */ + @Test + public void testOnlySuccessThenNoWaiting() { + ClientThrottlingAnalyzer analyzer = new ClientThrottlingAnalyzer( + "test", + ANALYSIS_PERIOD); + analyzer.addBytesTransferred(8 * MEGABYTE, false); + validate(0, analyzer.getSleepDuration()); + sleep(ANALYSIS_PERIOD_PLUS_10_PERCENT); + validate(0, analyzer.getSleepDuration()); + } + + /** + * Ensure that there is waiting (sleepDuration != 0) if the metrics have + * only been updated with failed requests. Also ensure that the + * sleepDuration decreases over time. 
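+ * (Each error-free analysis period multiplies the duration by + * SLEEP_DECREASE_FACTOR, or by RAPID_SLEEP_DECREASE_FACTOR once 150 seconds + * of error-free periods have accumulated.)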
+ */ + @Test + public void testOnlyErrorsAndWaiting() { + ClientThrottlingAnalyzer analyzer = new ClientThrottlingAnalyzer( + "test", + ANALYSIS_PERIOD); + validate(0, analyzer.getSleepDuration()); + analyzer.addBytesTransferred(4 * MEGABYTE, true); + sleep(ANALYSIS_PERIOD_PLUS_10_PERCENT); + final int expectedSleepDuration1 = 1100; + validateLessThanOrEqual(expectedSleepDuration1, analyzer.getSleepDuration()); + sleep(10 * ANALYSIS_PERIOD); + final int expectedSleepDuration2 = 900; + validateLessThanOrEqual(expectedSleepDuration2, analyzer.getSleepDuration()); + } + + /** + * Ensure that there is waiting (sleepDuration != 0) if the metrics have + * only been updated with both successful and failed requests. Also ensure + * that the sleepDuration decreases over time. + */ + @Test + public void testSuccessAndErrorsAndWaiting() { + ClientThrottlingAnalyzer analyzer = new ClientThrottlingAnalyzer( + "test", + ANALYSIS_PERIOD); + validate(0, analyzer.getSleepDuration()); + analyzer.addBytesTransferred(8 * MEGABYTE, false); + analyzer.addBytesTransferred(2 * MEGABYTE, true); + sleep(ANALYSIS_PERIOD_PLUS_10_PERCENT); + NanoTimer timer = new NanoTimer(); + analyzer.suspendIfNecessary(); + final int expectedElapsedTime = 126; + fuzzyValidate(expectedElapsedTime, + timer.elapsedTimeMs(), + MAX_ACCEPTABLE_PERCENT_DIFFERENCE); + sleep(10 * ANALYSIS_PERIOD); + final int expectedSleepDuration = 110; + validateLessThanOrEqual(expectedSleepDuration, analyzer.getSleepDuration()); + } + + /** + * Ensure that there is waiting (sleepDuration != 0) if the metrics have + * only been updated with many successful and failed requests. Also ensure + * that the sleepDuration decreases to zero over time. + */ + @Test + public void testManySuccessAndErrorsAndWaiting() { + ClientThrottlingAnalyzer analyzer = new ClientThrottlingAnalyzer( + "test", + ANALYSIS_PERIOD); + validate(0, analyzer.getSleepDuration()); + final int numberOfRequests = 20; + for (int i = 0; i < numberOfRequests; i++) { + analyzer.addBytesTransferred(8 * MEGABYTE, false); + analyzer.addBytesTransferred(2 * MEGABYTE, true); + } + sleep(ANALYSIS_PERIOD_PLUS_10_PERCENT); + NanoTimer timer = new NanoTimer(); + analyzer.suspendIfNecessary(); + fuzzyValidate(7, + timer.elapsedTimeMs(), + MAX_ACCEPTABLE_PERCENT_DIFFERENCE); + sleep(10 * ANALYSIS_PERIOD); + validate(0, analyzer.getSleepDuration()); + } +}
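Usage sketch (not part of the patch): the new client-side throttling is opt-in and is only consulted when self-throttling is off. Assuming the existing self-throttling key fs.azure.selfthrottling.enable (defined elsewhere in AzureNativeFileSystemStore, not shown in this diff), a client could enable the new behavior roughly like this:

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    public class EnableWasbAutoThrottling {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Auto-throttling is ignored unless self-throttling is disabled
        // (assumed key name; see AzureNativeFileSystemStore).
        conf.setBoolean("fs.azure.selfthrottling.enable", false);
        conf.setBoolean("fs.azure.autothrottling.enable", true);
        // Hypothetical account and container names.
        FileSystem fs = FileSystem.get(
            URI.create("wasb://mycontainer@myaccount.blob.core.windows.net/"), conf);
        System.out.println("WASB initialized: " + fs.getUri());
      }
    }

Once enabled, every OperationContext produced by getInstrumentedContext() is hooked by ClientThrottlingIntercept, so reads (GetBlob) and writes (PutBlock, PutPage, AppendBlock) sleep just long enough to stay under the account ingress/egress limits.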