diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index ff07174d1a2..d1e84cb0639 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -18,6 +18,11 @@ Release 2.7.0 - UNRELEASED Mike Liddell, Chuan Liu, Lengning Liu, Ivan Mitic, Michael Rys, Alexander Stojanovic, Brian Swan, and Min Wei via cnauroth) + HADOOP-10728. Metrics system for Windows Azure Storage Filesystem. + (Dexter Bradshaw, Mostafa Elhemali, Xi Fang, Johannes Klein, David Lao, + Mike Liddell, Chuan Liu, Lengning Liu, Ivan Mitic, Michael Rys, + Alexander Stojanovich, Brian Swan, and Min Wei via cnauroth) + IMPROVEMENTS HADOOP-11156. DelegateToFileSystem should implement diff --git a/hadoop-tools/hadoop-azure/.gitignore b/hadoop-tools/hadoop-azure/.gitignore index 09c10b19eb6..837b481682a 100644 --- a/hadoop-tools/hadoop-azure/.gitignore +++ b/hadoop-tools/hadoop-azure/.gitignore @@ -1 +1,2 @@ -.checkstyle \ No newline at end of file +.checkstyle +bin/ \ No newline at end of file diff --git a/hadoop-tools/hadoop-azure/README.txt b/hadoop-tools/hadoop-azure/README.txt index 4a067478f9f..73306d36299 100644 --- a/hadoop-tools/hadoop-azure/README.txt +++ b/hadoop-tools/hadoop-azure/README.txt @@ -12,9 +12,13 @@ Unit tests ============= Most of the tests will run without additional configuration. For complete testing, configuration in src/test/resources is required: - src/test/resources/azure-test.xml - src/test/resources/log4j.properties + + src/test/resources/azure-test.xml -> Defines Azure storage dependencies, including account information +The other files in src/test/resources do not normally need alteration: + log4j.properties -> Test logging setup + hadoop-metrics2-azure-file-system.properties -> used to wire up instrumentation for testing + From command-line ------------------ Basic execution: @@ -59,6 +63,12 @@ Enable the Azure emulator tests by setting fs.azure.test.emulator -> true in src\test\resources\azure-test.xml +Known issues: + Symptom: When running tests for emulator, you see the following failure message + com.microsoft.windowsazure.storage.StorageException: The value for one of the HTTP headers is not in the correct format. + Issue: The emulator can get into a confused state. + Fix: Restart the Azure Emulator. Ensure it is v3.2 or later. + Running tests against live Azure storage ------------------------------------------------------------------------- In order to run WASB unit tests against a live Azure Storage account, add credentials to @@ -101,4 +111,8 @@ Eclipse: NOTE: - After any change to the checkstyle rules xml, use window|preferences|checkstyle|{refresh}|OK - \ No newline at end of file +============= +Javadoc +============= +Command-line +> mvn javadoc:javadoc \ No newline at end of file diff --git a/hadoop-tools/hadoop-azure/pom.xml b/hadoop-tools/hadoop-azure/pom.xml index 376deb1027b..97c10fc38cb 100644 --- a/hadoop-tools/hadoop-azure/pom.xml +++ b/hadoop-tools/hadoop-azure/pom.xml @@ -37,22 +37,6 @@ - - - - src/test/resources - - log4j.properties - - - - src/test/resources - - azure-test.xml - - - - org.codehaus.mojo @@ -198,5 +182,11 @@ test-jar + + org.mockito + mockito-all + test + + diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java index c5b9afe6d80..5afbbbed0d7 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/AzureNativeFileSystemStore.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.fs.azure; - import static org.apache.hadoop.fs.azure.NativeAzureFileSystem.PATH_DELIMITER; import java.io.BufferedInputStream; @@ -46,6 +45,10 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.azure.StorageInterface.CloudBlobContainerWrapper; import org.apache.hadoop.fs.azure.StorageInterface.CloudBlobDirectoryWrapper; import org.apache.hadoop.fs.azure.StorageInterface.CloudBlockBlobWrapper; +import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation; +import org.apache.hadoop.fs.azure.metrics.BandwidthGaugeUpdater; +import org.apache.hadoop.fs.azure.metrics.ErrorMetricUpdater; +import org.apache.hadoop.fs.azure.metrics.ResponseReceivedMetricUpdater; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.PermissionStatus; import org.mortbay.util.ajax.JSON; @@ -69,8 +72,15 @@ import com.microsoft.windowsazure.storage.blob.DeleteSnapshotsOption; import com.microsoft.windowsazure.storage.blob.ListBlobItem; import com.microsoft.windowsazure.storage.core.Utility; + +/** + * Core implementation of Windows Azure Filesystem for Hadoop. + * Provides the bridging logic between Hadoop's abstract filesystem and Azure Storage + * + */ @InterfaceAudience.Private -class AzureNativeFileSystemStore implements NativeFileSystemStore { +@VisibleForTesting +public class AzureNativeFileSystemStore implements NativeFileSystemStore { /** * Configuration knob on whether we do block-level MD5 validation on @@ -169,6 +179,8 @@ class AzureNativeFileSystemStore implements NativeFileSystemStore { private boolean isAnonymousCredentials = false; // Set to true if we are connecting using shared access signatures. private boolean connectingUsingSAS = false; + private AzureFileSystemInstrumentation instrumentation; + private BandwidthGaugeUpdater bandwidthGaugeUpdater; private static final JSON PERMISSION_JSON_SERIALIZER = createPermissionJsonSerializer(); private boolean suppressRetryPolicy = false; @@ -301,6 +313,11 @@ class AzureNativeFileSystemStore implements NativeFileSystemStore { this.storageInteractionLayer = storageInteractionLayer; } + @VisibleForTesting + public BandwidthGaugeUpdater getBandwidthGaugeUpdater() { + return bandwidthGaugeUpdater; + } + /** * Check if concurrent reads and writes on the same blob are allowed. * @@ -325,12 +342,18 @@ class AzureNativeFileSystemStore implements NativeFileSystemStore { * if URI or job object is null, or invalid scheme. */ @Override - public void initialize(URI uri, Configuration conf) throws AzureException { + public void initialize(URI uri, Configuration conf, AzureFileSystemInstrumentation instrumentation) throws AzureException { if (null == this.storageInteractionLayer) { this.storageInteractionLayer = new StorageInterfaceImpl(); } + this.instrumentation = instrumentation; + this.bandwidthGaugeUpdater = new BandwidthGaugeUpdater(instrumentation); + if (null == this.storageInteractionLayer) { + this.storageInteractionLayer = new StorageInterfaceImpl(); + } + // Check that URI exists. // if (null == uri) { @@ -775,8 +798,10 @@ class AzureNativeFileSystemStore implements NativeFileSystemStore { throw new AzureException(errMsg); } + instrumentation.setAccountName(accountName); String containerName = getContainerFromAuthority(sessionUri); - + instrumentation.setContainerName(containerName); + // Check whether this is a storage emulator account. if (isStorageEmulatorAccount(accountName)) { // It is an emulator account, connect to it with no credentials. @@ -1522,6 +1547,11 @@ class AzureNativeFileSystemStore implements NativeFileSystemStore { selfThrottlingWriteFactor); } + ResponseReceivedMetricUpdater.hook( + operationContext, + instrumentation, + bandwidthGaugeUpdater); + // Bind operation context to receive send request callbacks on this // operation. // If reads concurrent to OOB writes are allowed, the interception will @@ -1535,6 +1565,8 @@ class AzureNativeFileSystemStore implements NativeFileSystemStore { operationContext = testHookOperationContext .modifyOperationContext(operationContext); } + + ErrorMetricUpdater.hook(operationContext, instrumentation); // Return the operation context. return operationContext; @@ -2218,5 +2250,14 @@ class AzureNativeFileSystemStore implements NativeFileSystemStore { @Override public void close() { + bandwidthGaugeUpdater.close(); + } + + // Finalizer to ensure complete shutdown + @Override + protected void finalize() throws Throwable { + LOG.debug("finalize() called"); + close(); + super.finalize(); } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java index 30e6b3091e0..2b695732283 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeAzureFileSystem.java @@ -31,12 +31,13 @@ import java.util.Date; import java.util.Set; import java.util.TreeSet; import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.BufferedFSInputStream; import org.apache.hadoop.fs.FSDataInputStream; @@ -45,12 +46,14 @@ import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation; +import org.apache.hadoop.fs.azure.metrics.AzureFileSystemMetricsSystem; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.PermissionStatus; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Progressable; - import com.google.common.annotations.VisibleForTesting; import com.microsoft.windowsazure.storage.core.Utility; @@ -369,8 +372,12 @@ public class NativeAzureFileSystem extends FileSystem { private AzureNativeFileSystemStore actualStore; private Path workingDir; private long blockSize = MAX_AZURE_BLOCK_SIZE; + private AzureFileSystemInstrumentation instrumentation; private static boolean suppressRetryPolicy = false; + // A counter to create unique (within-process) names for my metrics sources. + private static AtomicInteger metricsSourceNameCounter = new AtomicInteger(); + public NativeAzureFileSystem() { // set store in initialize() } @@ -396,6 +403,20 @@ public class NativeAzureFileSystem extends FileSystem { suppressRetryPolicy = false; } + /** + * Creates a new metrics source name that's unique within this process. + */ + @VisibleForTesting + public static String newMetricsSourceName() { + int number = metricsSourceNameCounter.incrementAndGet(); + final String baseName = "AzureFileSystemMetrics"; + if (number == 1) { // No need for a suffix for the first one + return baseName; + } else { + return baseName + number; + } + } + /** * Checks if the given URI scheme is a scheme that's affiliated with the Azure * File System. @@ -459,7 +480,16 @@ public class NativeAzureFileSystem extends FileSystem { store = createDefaultStore(conf); } - store.initialize(uri, conf); + // Make sure the metrics system is available before interacting with Azure + AzureFileSystemMetricsSystem.fileSystemStarted(); + String sourceName = newMetricsSourceName(), + sourceDesc = "Azure Storage Volume File System metrics"; + instrumentation = DefaultMetricsSystem.instance().register(sourceName, + sourceDesc, new AzureFileSystemInstrumentation(conf)); + AzureFileSystemMetricsSystem.registerSource(sourceName, sourceDesc, + instrumentation); + + store.initialize(uri, conf, instrumentation); setConf(conf); this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority()); this.workingDir = new Path("/user", UserGroupInformation.getCurrentUser() @@ -535,9 +565,19 @@ public class NativeAzureFileSystem extends FileSystem { * @return The store object. */ @VisibleForTesting - AzureNativeFileSystemStore getStore() { + public AzureNativeFileSystemStore getStore() { return actualStore; } + + /** + * Gets the metrics source for this file system. + * This is mainly here for unit testing purposes. + * + * @return the metrics source. + */ + public AzureFileSystemInstrumentation getInstrumentation() { + return instrumentation; + } /** This optional operation is not yet supported. */ @Override @@ -622,6 +662,10 @@ public class NativeAzureFileSystem extends FileSystem { // Construct the data output stream from the buffered output stream. FSDataOutputStream fsOut = new FSDataOutputStream(bufOutStream, statistics); + + // Increment the counter + instrumentation.fileCreated(); + // Return data output stream to caller. return fsOut; } @@ -682,6 +726,7 @@ public class NativeAzureFileSystem extends FileSystem { store.updateFolderLastModifiedTime(parentKey); } } + instrumentation.fileDeleted(); store.delete(key); } else { // The path specifies a folder. Recursively delete all entries under the @@ -724,6 +769,7 @@ public class NativeAzureFileSystem extends FileSystem { p.getKey().lastIndexOf(PATH_DELIMITER)); if (!p.isDir()) { store.delete(key + suffix); + instrumentation.fileDeleted(); } else { // Recursively delete contents of the sub-folders. Notice this also // deletes the blob for the directory. @@ -740,6 +786,7 @@ public class NativeAzureFileSystem extends FileSystem { String parentKey = pathToKey(parent); store.updateFolderLastModifiedTime(parentKey); } + instrumentation.directoryDeleted(); } // File or directory was successfully deleted. @@ -972,6 +1019,8 @@ public class NativeAzureFileSystem extends FileSystem { store.updateFolderLastModifiedTime(key, lastModified); } + instrumentation.directoryCreated(); + // otherwise throws exception return true; } @@ -1293,6 +1342,19 @@ public class NativeAzureFileSystem extends FileSystem { super.close(); // Close the store store.close(); + + // Notify the metrics system that this file system is closed, which may + // trigger one final metrics push to get the accurate final file system + // metrics out. + + long startTime = System.currentTimeMillis(); + + AzureFileSystemMetricsSystem.fileSystemClosed(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Submitting metrics when file system closed took " + + (System.currentTimeMillis() - startTime) + " ms."); + } } /** diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java index 0fb3c2274a3..4e1d0b67257 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/NativeFileSystemStore.java @@ -26,6 +26,7 @@ import java.util.Date; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation; import org.apache.hadoop.fs.permission.PermissionStatus; import com.google.common.annotations.VisibleForTesting; @@ -38,7 +39,7 @@ import com.google.common.annotations.VisibleForTesting; @InterfaceAudience.Private interface NativeFileSystemStore { - void initialize(URI uri, Configuration conf) throws IOException; + void initialize(URI uri, Configuration conf, AzureFileSystemInstrumentation instrumentation) throws IOException; void storeEmptyFolder(String key, PermissionStatus permissionStatus) throws AzureException; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/AzureFileSystemInstrumentation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/AzureFileSystemInstrumentation.java new file mode 100644 index 00000000000..e389d7c227d --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/AzureFileSystemInstrumentation.java @@ -0,0 +1,395 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azure.metrics; + +import java.util.UUID; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsInfo; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableCounterLong; +import org.apache.hadoop.metrics2.lib.MutableGaugeLong; + +/** + * A metrics source for the WASB file system to track all the metrics we care + * about for getting a clear picture of the performance/reliability/interaction + * of the Hadoop cluster with Azure Storage. + */ +@Metrics(about="Metrics for WASB", context="azureFileSystem") +@InterfaceAudience.Public +@InterfaceStability.Evolving +public final class AzureFileSystemInstrumentation implements MetricsSource { + + public static final String METRIC_TAG_FILESYSTEM_ID = "wasbFileSystemId"; + public static final String METRIC_TAG_ACCOUNT_NAME = "accountName"; + public static final String METRIC_TAG_CONTAINTER_NAME = "containerName"; + + public static final String WASB_WEB_RESPONSES = "wasb_web_responses"; + public static final String WASB_BYTES_WRITTEN = + "wasb_bytes_written_last_second"; + public static final String WASB_BYTES_READ = + "wasb_bytes_read_last_second"; + public static final String WASB_RAW_BYTES_UPLOADED = + "wasb_raw_bytes_uploaded"; + public static final String WASB_RAW_BYTES_DOWNLOADED = + "wasb_raw_bytes_downloaded"; + public static final String WASB_FILES_CREATED = "wasb_files_created"; + public static final String WASB_FILES_DELETED = "wasb_files_deleted"; + public static final String WASB_DIRECTORIES_CREATED = "wasb_directories_created"; + public static final String WASB_DIRECTORIES_DELETED = "wasb_directories_deleted"; + public static final String WASB_UPLOAD_RATE = + "wasb_maximum_upload_bytes_per_second"; + public static final String WASB_DOWNLOAD_RATE = + "wasb_maximum_download_bytes_per_second"; + public static final String WASB_UPLOAD_LATENCY = + "wasb_average_block_upload_latency_ms"; + public static final String WASB_DOWNLOAD_LATENCY = + "wasb_average_block_download_latency_ms"; + public static final String WASB_CLIENT_ERRORS = "wasb_client_errors"; + public static final String WASB_SERVER_ERRORS = "wasb_server_errors"; + + /** + * Config key for how big the rolling window size for latency metrics should + * be (in seconds). + */ + private static final String KEY_ROLLING_WINDOW_SIZE = "fs.azure.metrics.rolling.window.size"; + + private final MetricsRegistry registry = + new MetricsRegistry("azureFileSystem") + .setContext("azureFileSystem"); + private final MutableCounterLong numberOfWebResponses = + registry.newCounter( + WASB_WEB_RESPONSES, + "Total number of web responses obtained from Azure Storage", + 0L); + private AtomicLong inMemoryNumberOfWebResponses = new AtomicLong(0); + private final MutableCounterLong numberOfFilesCreated = + registry.newCounter( + WASB_FILES_CREATED, + "Total number of files created through the WASB file system.", + 0L); + private final MutableCounterLong numberOfFilesDeleted = + registry.newCounter( + WASB_FILES_DELETED, + "Total number of files deleted through the WASB file system.", + 0L); + private final MutableCounterLong numberOfDirectoriesCreated = + registry.newCounter( + WASB_DIRECTORIES_CREATED, + "Total number of directories created through the WASB file system.", + 0L); + private final MutableCounterLong numberOfDirectoriesDeleted = + registry.newCounter( + WASB_DIRECTORIES_DELETED, + "Total number of directories deleted through the WASB file system.", + 0L); + private final MutableGaugeLong bytesWrittenInLastSecond = + registry.newGauge( + WASB_BYTES_WRITTEN, + "Total number of bytes written to Azure Storage during the last second.", + 0L); + private final MutableGaugeLong bytesReadInLastSecond = + registry.newGauge( + WASB_BYTES_READ, + "Total number of bytes read from Azure Storage during the last second.", + 0L); + private final MutableGaugeLong maximumUploadBytesPerSecond = + registry.newGauge( + WASB_UPLOAD_RATE, + "The maximum upload rate encountered to Azure Storage in bytes/second.", + 0L); + private final MutableGaugeLong maximumDownloadBytesPerSecond = + registry.newGauge( + WASB_DOWNLOAD_RATE, + "The maximum download rate encountered to Azure Storage in bytes/second.", + 0L); + private final MutableCounterLong rawBytesUploaded = + registry.newCounter( + WASB_RAW_BYTES_UPLOADED, + "Total number of raw bytes (including overhead) uploaded to Azure" + + " Storage.", + 0L); + private final MutableCounterLong rawBytesDownloaded = + registry.newCounter( + WASB_RAW_BYTES_DOWNLOADED, + "Total number of raw bytes (including overhead) downloaded from Azure" + + " Storage.", + 0L); + private final MutableCounterLong clientErrors = + registry.newCounter( + WASB_CLIENT_ERRORS, + "Total number of client-side errors by WASB (excluding 404).", + 0L); + private final MutableCounterLong serverErrors = + registry.newCounter( + WASB_SERVER_ERRORS, + "Total number of server-caused errors by WASB.", + 0L); + private final MutableGaugeLong averageBlockUploadLatencyMs; + private final MutableGaugeLong averageBlockDownloadLatencyMs; + private long currentMaximumUploadBytesPerSecond; + private long currentMaximumDownloadBytesPerSecond; + private static final int DEFAULT_LATENCY_ROLLING_AVERAGE_WINDOW = + 5; // seconds + private final RollingWindowAverage currentBlockUploadLatency; + private final RollingWindowAverage currentBlockDownloadLatency; + private UUID fileSystemInstanceId; + + public AzureFileSystemInstrumentation(Configuration conf) { + fileSystemInstanceId = UUID.randomUUID(); + registry.tag("wasbFileSystemId", + "A unique identifier for the file ", + fileSystemInstanceId.toString()); + final int rollingWindowSizeInSeconds = + conf.getInt(KEY_ROLLING_WINDOW_SIZE, + DEFAULT_LATENCY_ROLLING_AVERAGE_WINDOW); + averageBlockUploadLatencyMs = + registry.newGauge( + WASB_UPLOAD_LATENCY, + String.format("The average latency in milliseconds of uploading a single block" + + ". The average latency is calculated over a %d-second rolling" + + " window.", rollingWindowSizeInSeconds), + 0L); + averageBlockDownloadLatencyMs = + registry.newGauge( + WASB_DOWNLOAD_LATENCY, + String.format("The average latency in milliseconds of downloading a single block" + + ". The average latency is calculated over a %d-second rolling" + + " window.", rollingWindowSizeInSeconds), + 0L); + currentBlockUploadLatency = + new RollingWindowAverage(rollingWindowSizeInSeconds * 1000); + currentBlockDownloadLatency = + new RollingWindowAverage(rollingWindowSizeInSeconds * 1000); + } + + /** + * The unique identifier for this file system in the metrics. + */ + public UUID getFileSystemInstanceId() { + return fileSystemInstanceId; + } + + /** + * Get the metrics registry information. + */ + public MetricsInfo getMetricsRegistryInfo() { + return registry.info(); + } + + /** + * Sets the account name to tag all the metrics with. + * @param accountName The account name. + */ + public void setAccountName(String accountName) { + registry.tag("accountName", + "Name of the Azure Storage account that these metrics are going against", + accountName); + } + + /** + * Sets the container name to tag all the metrics with. + * @param containerName The container name. + */ + public void setContainerName(String containerName) { + registry.tag("containerName", + "Name of the Azure Storage container that these metrics are going against", + containerName); + } + + /** + * Indicate that we just got a web response from Azure Storage. This should + * be called for every web request/response we do (to get accurate metrics + * of how we're hitting the storage service). + */ + public void webResponse() { + numberOfWebResponses.incr(); + inMemoryNumberOfWebResponses.incrementAndGet(); + } + + /** + * Gets the current number of web responses obtained from Azure Storage. + * @return The number of web responses. + */ + public long getCurrentWebResponses() { + return inMemoryNumberOfWebResponses.get(); + } + + /** + * Indicate that we just created a file through WASB. + */ + public void fileCreated() { + numberOfFilesCreated.incr(); + } + + /** + * Indicate that we just deleted a file through WASB. + */ + public void fileDeleted() { + numberOfFilesDeleted.incr(); + } + + /** + * Indicate that we just created a directory through WASB. + */ + public void directoryCreated() { + numberOfDirectoriesCreated.incr(); + } + + /** + * Indicate that we just deleted a directory through WASB. + */ + public void directoryDeleted() { + numberOfDirectoriesDeleted.incr(); + } + + /** + * Sets the current gauge value for how many bytes were written in the last + * second. + * @param currentBytesWritten The number of bytes. + */ + public void updateBytesWrittenInLastSecond(long currentBytesWritten) { + bytesWrittenInLastSecond.set(currentBytesWritten); + } + + /** + * Sets the current gauge value for how many bytes were read in the last + * second. + * @param currentBytesRead The number of bytes. + */ + public void updateBytesReadInLastSecond(long currentBytesRead) { + bytesReadInLastSecond.set(currentBytesRead); + } + + /** + * Record the current bytes-per-second upload rate seen. + * @param bytesPerSecond The bytes per second. + */ + public synchronized void currentUploadBytesPerSecond(long bytesPerSecond) { + if (bytesPerSecond > currentMaximumUploadBytesPerSecond) { + currentMaximumUploadBytesPerSecond = bytesPerSecond; + maximumUploadBytesPerSecond.set(bytesPerSecond); + } + } + + /** + * Record the current bytes-per-second download rate seen. + * @param bytesPerSecond The bytes per second. + */ + public synchronized void currentDownloadBytesPerSecond(long bytesPerSecond) { + if (bytesPerSecond > currentMaximumDownloadBytesPerSecond) { + currentMaximumDownloadBytesPerSecond = bytesPerSecond; + maximumDownloadBytesPerSecond.set(bytesPerSecond); + } + } + + /** + * Indicate that we just uploaded some data to Azure storage. + * @param numberOfBytes The raw number of bytes uploaded (including overhead). + */ + public void rawBytesUploaded(long numberOfBytes) { + rawBytesUploaded.incr(numberOfBytes); + } + + /** + * Indicate that we just downloaded some data to Azure storage. + * @param numberOfBytes The raw number of bytes downloaded (including overhead). + */ + public void rawBytesDownloaded(long numberOfBytes) { + rawBytesDownloaded.incr(numberOfBytes); + } + + /** + * Indicate that we just uploaded a block and record its latency. + * @param latency The latency in milliseconds. + */ + public void blockUploaded(long latency) { + currentBlockUploadLatency.addPoint(latency); + } + + /** + * Indicate that we just downloaded a block and record its latency. + * @param latency The latency in milliseconds. + */ + public void blockDownloaded(long latency) { + currentBlockDownloadLatency.addPoint(latency); + } + + /** + * Indicate that we just encountered a client-side error. + */ + public void clientErrorEncountered() { + clientErrors.incr(); + } + + /** + * Indicate that we just encountered a server-caused error. + */ + public void serverErrorEncountered() { + serverErrors.incr(); + } + + /** + * Get the current rolling average of the upload latency. + * @return rolling average of upload latency in milliseconds. + */ + public long getBlockUploadLatency() { + return currentBlockUploadLatency.getCurrentAverage(); + } + + /** + * Get the current rolling average of the download latency. + * @return rolling average of download latency in milliseconds. + */ + public long getBlockDownloadLatency() { + return currentBlockDownloadLatency.getCurrentAverage(); + } + + /** + * Get the current maximum upload bandwidth. + * @return maximum upload bandwidth in bytes per second. + */ + public long getCurrentMaximumUploadBandwidth() { + return currentMaximumUploadBytesPerSecond; + } + + /** + * Get the current maximum download bandwidth. + * @return maximum download bandwidth in bytes per second. + */ + public long getCurrentMaximumDownloadBandwidth() { + return currentMaximumDownloadBytesPerSecond; + + } + + @Override + public void getMetrics(MetricsCollector builder, boolean all) { + averageBlockDownloadLatencyMs.set( + currentBlockDownloadLatency.getCurrentAverage()); + averageBlockUploadLatencyMs.set( + currentBlockUploadLatency.getCurrentAverage()); + registry.snapshot(builder.addRecord(registry.info().name()), true); + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/AzureFileSystemMetricsSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/AzureFileSystemMetricsSystem.java new file mode 100644 index 00000000000..a5f29c1f33d --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/AzureFileSystemMetricsSystem.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azure.metrics; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.impl.MetricsSystemImpl; + +/** + * AzureFileSystemMetricsSystem + */ +@InterfaceAudience.Private +public final class AzureFileSystemMetricsSystem { + private static MetricsSystemImpl instance; + private static int numFileSystems; + + //private ctor + private AzureFileSystemMetricsSystem(){ + + } + + public static synchronized void fileSystemStarted() { + if (numFileSystems == 0) { + instance = new MetricsSystemImpl(); + instance.init("azure-file-system"); + } + numFileSystems++; + } + + public static synchronized void fileSystemClosed() { + if (instance != null) { + instance.publishMetricsNow(); + } + if (numFileSystems == 1) { + instance.stop(); + instance.shutdown(); + instance = null; + } + numFileSystems--; + } + + public static void registerSource(String name, String desc, + MetricsSource source) { + // Register the source with the name appended with -WasbSystem + // so that the name is globally unique. + instance.register(name + "-WasbSystem", desc, source); + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/BandwidthGaugeUpdater.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/BandwidthGaugeUpdater.java new file mode 100644 index 00000000000..699fde7dee7 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/BandwidthGaugeUpdater.java @@ -0,0 +1,289 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azure.metrics; + +import java.util.ArrayList; +import java.util.Date; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * Internal implementation class to help calculate the current bytes + * uploaded/downloaded and the maximum bandwidth gauges. + */ +@InterfaceAudience.Private +public final class BandwidthGaugeUpdater { + public static final Log LOG = LogFactory + .getLog(BandwidthGaugeUpdater.class); + + public static final String THREAD_NAME = "AzureNativeFilesystemStore-UploadBandwidthUpdater"; + + private static final int DEFAULT_WINDOW_SIZE_MS = 1000; + private static final int PROCESS_QUEUE_INITIAL_CAPACITY = 1000; + private int windowSizeMs; + private ArrayList allBlocksWritten = + createNewToProcessQueue(); + private ArrayList allBlocksRead = + createNewToProcessQueue(); + private final Object blocksWrittenLock = new Object(); + private final Object blocksReadLock = new Object(); + private final AzureFileSystemInstrumentation instrumentation; + private Thread uploadBandwidthUpdater; + private volatile boolean suppressAutoUpdate = false; + + /** + * Create a new updater object with default values. + * @param instrumentation The metrics source to update. + */ + public BandwidthGaugeUpdater(AzureFileSystemInstrumentation instrumentation) { + this(instrumentation, DEFAULT_WINDOW_SIZE_MS, false); + } + + /** + * Create a new updater object with some overrides (used in unit tests). + * @param instrumentation The metrics source to update. + * @param windowSizeMs The window size to use for calculating bandwidth + * (in milliseconds). + * @param manualUpdateTrigger If true, then this object won't create the + * auto-update threads, and will wait for manual + * calls to triggerUpdate to occur. + */ + public BandwidthGaugeUpdater(AzureFileSystemInstrumentation instrumentation, + int windowSizeMs, boolean manualUpdateTrigger) { + this.windowSizeMs = windowSizeMs; + this.instrumentation = instrumentation; + if (!manualUpdateTrigger) { + uploadBandwidthUpdater = new Thread(new UploadBandwidthUpdater(), THREAD_NAME); + uploadBandwidthUpdater.setDaemon(true); + uploadBandwidthUpdater.start(); + } + } + + /** + * Indicate that a block has been uploaded. + * @param startDate The exact time the upload started. + * @param endDate The exact time the upload ended. + * @param length The number of bytes uploaded in the block. + */ + public void blockUploaded(Date startDate, Date endDate, long length) { + synchronized (blocksWrittenLock) { + allBlocksWritten.add(new BlockTransferWindow(startDate, endDate, length)); + } + } + + /** + * Indicate that a block has been downloaded. + * @param startDate The exact time the download started. + * @param endDate The exact time the download ended. + * @param length The number of bytes downloaded in the block. + */ + public void blockDownloaded(Date startDate, Date endDate, long length) { + synchronized (blocksReadLock) { + allBlocksRead.add(new BlockTransferWindow(startDate, endDate, length)); + } + } + + /** + * Creates a new ArrayList to hold incoming block transfer notifications + * before they're processed. + * @return The newly created ArrayList. + */ + private static ArrayList createNewToProcessQueue() { + return new ArrayList(PROCESS_QUEUE_INITIAL_CAPACITY); + } + + /** + * Update the metrics source gauge for how many bytes were transferred + * during the last time window. + * @param updateWrite If true, update the write (upload) counter. + * Otherwise update the read (download) counter. + * @param bytes The number of bytes transferred. + */ + private void updateBytesTransferred(boolean updateWrite, long bytes) { + if (updateWrite) { + instrumentation.updateBytesWrittenInLastSecond(bytes); + } + else { + instrumentation.updateBytesReadInLastSecond(bytes); + } + } + + /** + * Update the metrics source gauge for what the current transfer rate + * is. + * @param updateWrite If true, update the write (upload) counter. + * Otherwise update the read (download) counter. + * @param bytesPerSecond The number of bytes per second we're seeing. + */ + private void updateBytesTransferRate(boolean updateWrite, long bytesPerSecond) { + if (updateWrite) { + instrumentation.currentUploadBytesPerSecond(bytesPerSecond); + } + else { + instrumentation.currentDownloadBytesPerSecond(bytesPerSecond); + } + } + + /** + * For unit test purposes, suppresses auto-update of the metrics + * from the dedicated thread. + */ + public void suppressAutoUpdate() { + suppressAutoUpdate = true; + } + + /** + * Resumes auto-update (undo suppressAutoUpdate). + */ + public void resumeAutoUpdate() { + suppressAutoUpdate = false; + } + + /** + * Triggers the update of the metrics gauge based on all the blocks + * uploaded/downloaded so far. This is typically done periodically in a + * dedicated update thread, but exposing as public for unit test purposes. + * + * @param updateWrite If true, we'll update the write (upload) metrics. + * Otherwise we'll update the read (download) ones. + */ + public void triggerUpdate(boolean updateWrite) { + ArrayList toProcess = null; + synchronized (updateWrite ? blocksWrittenLock : blocksReadLock) { + if (updateWrite && !allBlocksWritten.isEmpty()) { + toProcess = allBlocksWritten; + allBlocksWritten = createNewToProcessQueue(); + } else if (!updateWrite && !allBlocksRead.isEmpty()) { + toProcess = allBlocksRead; + allBlocksRead = createNewToProcessQueue(); + } + } + + // Check to see if we have any blocks to process. + if (toProcess == null) { + // Nothing to process, set the current bytes and rate to zero. + updateBytesTransferred(updateWrite, 0); + updateBytesTransferRate(updateWrite, 0); + return; + } + + // The cut-off time for when we want to calculate rates is one + // window size ago from now. + long cutoffTime = new Date().getTime() - windowSizeMs; + + // Go through all the blocks we're processing, and calculate the + // total number of bytes processed as well as the maximum transfer + // rate we experienced for any single block during our time window. + long maxSingleBlockTransferRate = 0; + long bytesInLastSecond = 0; + for (BlockTransferWindow currentWindow : toProcess) { + long windowDuration = currentWindow.getEndDate().getTime() + - currentWindow.getStartDate().getTime(); + if (windowDuration == 0) { + // Edge case, assume it took 1 ms but we were too fast + windowDuration = 1; + } + if (currentWindow.getStartDate().getTime() > cutoffTime) { + // This block was transferred fully within our time window, + // just add its bytes to the total. + bytesInLastSecond += currentWindow.bytesTransferred; + } else if (currentWindow.getEndDate().getTime() > cutoffTime) { + // This block started its transfer before our time window, + // interpolate to estimate how many bytes from that block + // were actually transferred during our time window. + long adjustedBytes = (currentWindow.getBytesTransferred() + * (currentWindow.getEndDate().getTime() - cutoffTime)) + / windowDuration; + bytesInLastSecond += adjustedBytes; + } + // Calculate the transfer rate for this block. + long currentBlockTransferRate = + (currentWindow.getBytesTransferred() * 1000) / windowDuration; + maxSingleBlockTransferRate = + Math.max(maxSingleBlockTransferRate, currentBlockTransferRate); + } + updateBytesTransferred(updateWrite, bytesInLastSecond); + // The transfer rate we saw in the last second is a tricky concept to + // define: If we saw two blocks, one 2 MB block transferred in 0.2 seconds, + // and one 4 MB block transferred in 0.2 seconds, then the maximum rate + // is 20 MB/s (the 4 MB block), the average of the two blocks is 15 MB/s, + // and the aggregate rate is 6 MB/s (total of 6 MB transferred in one + // second). As a first cut, I'm taking the definition to be the maximum + // of aggregate or of any single block's rate (so in the example case it's + // 6 MB/s). + long aggregateTransferRate = bytesInLastSecond; + long maxObservedTransferRate = + Math.max(aggregateTransferRate, maxSingleBlockTransferRate); + updateBytesTransferRate(updateWrite, maxObservedTransferRate); + } + + /** + * A single block transfer. + */ + private static final class BlockTransferWindow { + private final Date startDate; + private final Date endDate; + private final long bytesTransferred; + + public BlockTransferWindow(Date startDate, Date endDate, + long bytesTransferred) { + this.startDate = startDate; + this.endDate = endDate; + this.bytesTransferred = bytesTransferred; + } + + public Date getStartDate() { return startDate; } + public Date getEndDate() { return endDate; } + public long getBytesTransferred() { return bytesTransferred; } + } + + /** + * The auto-update thread. + */ + private final class UploadBandwidthUpdater implements Runnable { + @Override + public void run() { + try { + while (true) { + Thread.sleep(windowSizeMs); + if (!suppressAutoUpdate) { + triggerUpdate(true); + triggerUpdate(false); + } + } + } catch (InterruptedException e) { + } + } + } + + public void close() { + if (uploadBandwidthUpdater != null) { + // Interrupt and join the updater thread in death. + uploadBandwidthUpdater.interrupt(); + try { + uploadBandwidthUpdater.join(); + } catch (InterruptedException e) { + } + uploadBandwidthUpdater = null; + } + } + +} \ No newline at end of file diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/ErrorMetricUpdater.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/ErrorMetricUpdater.java new file mode 100644 index 00000000000..d33e8c447fe --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/ErrorMetricUpdater.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azure.metrics; + +import static java.net.HttpURLConnection.HTTP_NOT_FOUND; //404 +import static java.net.HttpURLConnection.HTTP_BAD_REQUEST; //400 +import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR; //500 + +import org.apache.hadoop.classification.InterfaceAudience; + +import com.microsoft.windowsazure.storage.OperationContext; +import com.microsoft.windowsazure.storage.RequestResult; +import com.microsoft.windowsazure.storage.ResponseReceivedEvent; +import com.microsoft.windowsazure.storage.StorageEvent; + + +/** + * An event listener to the ResponseReceived event from Azure Storage that will + * update error metrics appropriately when it gets that event. + */ +@InterfaceAudience.Private +public final class ErrorMetricUpdater extends StorageEvent { + private final AzureFileSystemInstrumentation instrumentation; + private final OperationContext operationContext; + + private ErrorMetricUpdater(OperationContext operationContext, + AzureFileSystemInstrumentation instrumentation) { + this.instrumentation = instrumentation; + this.operationContext = operationContext; + } + + /** + * Hooks a new listener to the given operationContext that will update the + * error metrics for the WASB file system appropriately in response to + * ResponseReceived events. + * + * @param operationContext The operationContext to hook. + * @param instrumentation The metrics source to update. + */ + public static void hook( + OperationContext operationContext, + AzureFileSystemInstrumentation instrumentation) { + ErrorMetricUpdater listener = + new ErrorMetricUpdater(operationContext, + instrumentation); + operationContext.getResponseReceivedEventHandler().addListener(listener); + } + + @Override + public void eventOccurred(ResponseReceivedEvent eventArg) { + RequestResult currentResult = operationContext.getLastResult(); + int statusCode = currentResult.getStatusCode(); + // Check if it's a client-side error: a 4xx status + // We exclude 404 because it happens frequently during the normal + // course of operation (each call to exists() would generate that + // if it's not found). + if (statusCode >= HTTP_BAD_REQUEST && statusCode < HTTP_INTERNAL_ERROR + && statusCode != HTTP_NOT_FOUND) { + instrumentation.clientErrorEncountered(); + } else if (statusCode >= HTTP_INTERNAL_ERROR) { + // It's a server error: a 5xx status. Could be an Azure Storage + // bug or (more likely) throttling. + instrumentation.serverErrorEncountered(); + } + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/ResponseReceivedMetricUpdater.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/ResponseReceivedMetricUpdater.java new file mode 100644 index 00000000000..e3f5d4402b6 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/ResponseReceivedMetricUpdater.java @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azure.metrics; + +import java.net.HttpURLConnection; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; + +import com.microsoft.windowsazure.storage.Constants.HeaderConstants; +import com.microsoft.windowsazure.storage.OperationContext; +import com.microsoft.windowsazure.storage.RequestResult; +import com.microsoft.windowsazure.storage.ResponseReceivedEvent; +import com.microsoft.windowsazure.storage.StorageEvent; + + +/** + * An event listener to the ResponseReceived event from Azure Storage that will + * update metrics appropriately. + * + */ +@InterfaceAudience.Private +public final class ResponseReceivedMetricUpdater extends StorageEvent { + + public static final Log LOG = LogFactory.getLog(ResponseReceivedMetricUpdater.class); + + private final AzureFileSystemInstrumentation instrumentation; + private final BandwidthGaugeUpdater blockUploadGaugeUpdater; + + private ResponseReceivedMetricUpdater(OperationContext operationContext, + AzureFileSystemInstrumentation instrumentation, + BandwidthGaugeUpdater blockUploadGaugeUpdater) { + this.instrumentation = instrumentation; + this.blockUploadGaugeUpdater = blockUploadGaugeUpdater; + } + + /** + * Hooks a new listener to the given operationContext that will update the + * metrics for the WASB file system appropriately in response to + * ResponseReceived events. + * + * @param operationContext The operationContext to hook. + * @param instrumentation The metrics source to update. + * @param blockUploadGaugeUpdater The blockUploadGaugeUpdater to use. + */ + public static void hook( + OperationContext operationContext, + AzureFileSystemInstrumentation instrumentation, + BandwidthGaugeUpdater blockUploadGaugeUpdater) { + ResponseReceivedMetricUpdater listener = + new ResponseReceivedMetricUpdater(operationContext, + instrumentation, blockUploadGaugeUpdater); + operationContext.getResponseReceivedEventHandler().addListener(listener); + } + + /** + * Get the content length of the request in the given HTTP connection. + * @param connection The connection. + * @return The content length, or zero if not found. + */ + private long getRequestContentLength(HttpURLConnection connection) { + String lengthString = connection.getRequestProperty( + HeaderConstants.CONTENT_LENGTH); + if (lengthString != null){ + return Long.parseLong(lengthString); + } + else{ + return 0; + } + } + + /** + * Gets the content length of the response in the given HTTP connection. + * @param connection The connection. + * @return The content length. + */ + private long getResponseContentLength(HttpURLConnection connection) { + return connection.getContentLength(); + } + + /** + * Handle the response-received event from Azure SDK. + */ + @Override + public void eventOccurred(ResponseReceivedEvent eventArg) { + instrumentation.webResponse(); + if (!(eventArg.getConnectionObject() instanceof HttpURLConnection)) { + // Typically this shouldn't happen, but just let it pass + return; + } + HttpURLConnection connection = + (HttpURLConnection) eventArg.getConnectionObject(); + RequestResult currentResult = eventArg.getRequestResult(); + if (currentResult == null) { + // Again, typically shouldn't happen, but let it pass + return; + } + + long requestLatency = currentResult.getStopDate().getTime() + - currentResult.getStartDate().getTime(); + + if (currentResult.getStatusCode() == HttpURLConnection.HTTP_CREATED + && connection.getRequestMethod().equalsIgnoreCase("PUT")) { + // If it's a PUT with an HTTP_CREATED status then it's a successful + // block upload. + long length = getRequestContentLength(connection); + if (length > 0) { + blockUploadGaugeUpdater.blockUploaded( + currentResult.getStartDate(), + currentResult.getStopDate(), + length); + instrumentation.rawBytesUploaded(length); + instrumentation.blockUploaded(requestLatency); + } + } else if (currentResult.getStatusCode() == HttpURLConnection.HTTP_PARTIAL + && connection.getRequestMethod().equalsIgnoreCase("GET")) { + // If it's a GET with an HTTP_PARTIAL status then it's a successful + // block download. + long length = getResponseContentLength(connection); + if (length > 0) { + blockUploadGaugeUpdater.blockDownloaded( + currentResult.getStartDate(), + currentResult.getStopDate(), + length); + instrumentation.rawBytesDownloaded(length); + instrumentation.blockDownloaded(requestLatency); + } + } + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/RollingWindowAverage.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/RollingWindowAverage.java new file mode 100644 index 00000000000..184907a9e7b --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/RollingWindowAverage.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azure.metrics; + +import java.util.ArrayDeque; +import java.util.Date; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * Helper class to calculate rolling-window averages. + * Used to calculate rolling-window metrics in AzureNativeFileSystem. + */ +@InterfaceAudience.Private +final class RollingWindowAverage { + private final ArrayDeque currentPoints = + new ArrayDeque(); + private final long windowSizeMs; + + /** + * Create a new rolling-window average for the given window size. + * @param windowSizeMs The size of the window in milliseconds. + */ + public RollingWindowAverage(long windowSizeMs) { + this.windowSizeMs = windowSizeMs; + } + + /** + * Add a new data point that just happened. + * @param value The value of the data point. + */ + public synchronized void addPoint(long value) { + currentPoints.offer(new DataPoint(new Date(), value)); + cleanupOldPoints(); + } + + /** + * Get the current average. + * @return The current average. + */ + public synchronized long getCurrentAverage() { + cleanupOldPoints(); + if (currentPoints.isEmpty()) { + return 0; + } + long sum = 0; + for (DataPoint current : currentPoints) { + sum += current.getValue(); + } + return sum / currentPoints.size(); + } + + /** + * Clean up points that don't count any more (are before our + * rolling window) from our current queue of points. + */ + private void cleanupOldPoints() { + Date cutoffTime = new Date(new Date().getTime() - windowSizeMs); + while (!currentPoints.isEmpty() + && currentPoints.peekFirst().getEventTime().before(cutoffTime)) { + currentPoints.removeFirst(); + } + } + + /** + * A single data point. + */ + private static class DataPoint { + private final Date eventTime; + private final long value; + + public DataPoint(Date eventTime, long value) { + this.eventTime = eventTime; + this.value = value; + } + + public Date getEventTime() { + return eventTime; + } + + public long getValue() { + return value; + } + + + } +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/package.html b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/package.html new file mode 100644 index 00000000000..5e8d6a84693 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/metrics/package.html @@ -0,0 +1,28 @@ + + + + + + +

+Infrastructure for a Metrics2 source that provides information on Windows +Azure Filesystem for Hadoop instances. +

+ + + diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/AzureBlobStorageTestAccount.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/AzureBlobStorageTestAccount.java index 81339541bdd..02738e7efb5 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/AzureBlobStorageTestAccount.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/AzureBlobStorageTestAccount.java @@ -27,9 +27,18 @@ import java.util.Date; import java.util.EnumSet; import java.util.GregorianCalendar; import java.util.TimeZone; +import java.util.concurrent.ConcurrentLinkedQueue; +import org.apache.commons.configuration.SubsetConfiguration; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation; +import org.apache.hadoop.fs.azure.metrics.AzureFileSystemMetricsSystem; +import org.apache.hadoop.metrics2.AbstractMetric; +import org.apache.hadoop.metrics2.MetricsRecord; +import org.apache.hadoop.metrics2.MetricsSink; +import org.apache.hadoop.metrics2.MetricsTag; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import com.microsoft.windowsazure.storage.AccessCondition; import com.microsoft.windowsazure.storage.CloudStorageAccount; @@ -76,7 +85,10 @@ public final class AzureBlobStorageTestAccount { private NativeAzureFileSystem fs; private AzureNativeFileSystemStore storage; private MockStorageInterface mockStorage; - + private static final ConcurrentLinkedQueue allMetrics = + new ConcurrentLinkedQueue(); + + private AzureBlobStorageTestAccount(NativeAzureFileSystem fs, CloudStorageAccount account, CloudBlobContainer container) { this.account = account; @@ -124,6 +136,10 @@ public final class AzureBlobStorageTestAccount { this.fs = fs; this.mockStorage = mockStorage; } + + private static void addRecord(MetricsRecord record) { + allMetrics.add(record); + } public static String getMockContainerUri() { return String.format("http://%s/%s", @@ -141,6 +157,47 @@ public final class AzureBlobStorageTestAccount { // Remove the first SEPARATOR return toMockUri(path.toUri().getRawPath().substring(1)); } + + public Number getLatestMetricValue(String metricName, Number defaultValue) + throws IndexOutOfBoundsException{ + boolean found = false; + Number ret = null; + for (MetricsRecord currentRecord : allMetrics) { + // First check if this record is coming for my file system. + if (wasGeneratedByMe(currentRecord)) { + for (AbstractMetric currentMetric : currentRecord.metrics()) { + if (currentMetric.name().equalsIgnoreCase(metricName)) { + found = true; + ret = currentMetric.value(); + break; + } + } + } + } + if (!found) { + if (defaultValue != null) { + return defaultValue; + } + throw new IndexOutOfBoundsException(metricName); + } + return ret; + } + + /** + * Checks if the given record was generated by my WASB file system instance. + * @param currentRecord The metrics record to check. + * @return + */ + private boolean wasGeneratedByMe(MetricsRecord currentRecord) { + String myFsId = fs.getInstrumentation().getFileSystemInstanceId().toString(); + for (MetricsTag currentTag : currentRecord.tags()) { + if (currentTag.name().equalsIgnoreCase("wasbFileSystemId")) { + return currentTag.value().equals(myFsId); + } + } + return false; + } + /** * Gets the blob reference to the given blob key. @@ -236,7 +293,6 @@ public final class AzureBlobStorageTestAccount { public static AzureBlobStorageTestAccount createOutOfBandStore( int uploadBlockSize, int downloadBlockSize) throws Exception { - CloudBlobContainer container = null; Configuration conf = createTestConfiguration(); CloudStorageAccount account = createTestAccount(conf); @@ -262,11 +318,25 @@ public final class AzureBlobStorageTestAccount { // Set account URI and initialize Azure file system. URI accountUri = createAccountUri(accountName, containerName); + // Set up instrumentation. + // + AzureFileSystemMetricsSystem.fileSystemStarted(); + String sourceName = NativeAzureFileSystem.newMetricsSourceName(); + String sourceDesc = "Azure Storage Volume File System metrics"; + + AzureFileSystemInstrumentation instrumentation = + DefaultMetricsSystem.instance().register(sourceName, + sourceDesc, new AzureFileSystemInstrumentation(conf)); + + AzureFileSystemMetricsSystem.registerSource( + sourceName, sourceDesc, instrumentation); + + // Create a new AzureNativeFileSystemStore object. AzureNativeFileSystemStore testStorage = new AzureNativeFileSystemStore(); // Initialize the store with the throttling feedback interfaces. - testStorage.initialize(accountUri, conf); + testStorage.initialize(accountUri, conf, instrumentation); // Create test account initializing the appropriate member variables. AzureBlobStorageTestAccount testAcct = new AzureBlobStorageTestAccount( @@ -722,5 +792,20 @@ public final class AzureBlobStorageTestAccount { public MockStorageInterface getMockStorage() { return mockStorage; } + + public static class StandardCollector implements MetricsSink { + @Override + public void init(SubsetConfiguration conf) { + } + + @Override + public void putMetrics(MetricsRecord record) { + addRecord(record); + } + + @Override + public void flush() { + } + } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureFileSystemErrorConditions.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureFileSystemErrorConditions.java index 88d976ce8e9..6e89822028c 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureFileSystemErrorConditions.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/TestAzureFileSystemErrorConditions.java @@ -33,11 +33,9 @@ import java.util.HashMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.TestHookOperationContext; -import org.apache.hadoop.fs.permission.FsPermission; import org.junit.Test; import com.microsoft.windowsazure.storage.OperationContext; diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/metrics/AzureMetricsTestUtil.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/metrics/AzureMetricsTestUtil.java new file mode 100644 index 00000000000..12694173c3a --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/metrics/AzureMetricsTestUtil.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azure.metrics; + +import static org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation.WASB_BYTES_READ; +import static org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation.WASB_BYTES_WRITTEN; +import static org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation.WASB_RAW_BYTES_DOWNLOADED; +import static org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation.WASB_RAW_BYTES_UPLOADED; +import static org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation.WASB_WEB_RESPONSES; +import static org.apache.hadoop.test.MetricsAsserts.getLongCounter; +import static org.apache.hadoop.test.MetricsAsserts.getLongGauge; +import static org.apache.hadoop.test.MetricsAsserts.getMetrics; + +public final class AzureMetricsTestUtil { + public static long getLongGaugeValue(AzureFileSystemInstrumentation instrumentation, + String gaugeName) { + return getLongGauge(gaugeName, getMetrics(instrumentation)); + } + + /** + * Gets the current value of the given counter. + */ + public static long getLongCounterValue(AzureFileSystemInstrumentation instrumentation, + String counterName) { + return getLongCounter(counterName, getMetrics(instrumentation)); + } + + + /** + * Gets the current value of the wasb_bytes_written_last_second counter. + */ + public static long getCurrentBytesWritten(AzureFileSystemInstrumentation instrumentation) { + return getLongGaugeValue(instrumentation, WASB_BYTES_WRITTEN); + } + + /** + * Gets the current value of the wasb_bytes_read_last_second counter. + */ + public static long getCurrentBytesRead(AzureFileSystemInstrumentation instrumentation) { + return getLongGaugeValue(instrumentation, WASB_BYTES_READ); + } + + /** + * Gets the current value of the wasb_raw_bytes_uploaded counter. + */ + public static long getCurrentTotalBytesWritten( + AzureFileSystemInstrumentation instrumentation) { + return getLongCounterValue(instrumentation, WASB_RAW_BYTES_UPLOADED); + } + + /** + * Gets the current value of the wasb_raw_bytes_downloaded counter. + */ + public static long getCurrentTotalBytesRead( + AzureFileSystemInstrumentation instrumentation) { + return getLongCounterValue(instrumentation, WASB_RAW_BYTES_DOWNLOADED); + } + + /** + * Gets the current value of the asv_web_responses counter. + */ + public static long getCurrentWebResponses( + AzureFileSystemInstrumentation instrumentation) { + return getLongCounter(WASB_WEB_RESPONSES, getMetrics(instrumentation)); + } +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/metrics/TestAzureFileSystemInstrumentation.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/metrics/TestAzureFileSystemInstrumentation.java new file mode 100644 index 00000000000..35004d60d7a --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/metrics/TestAzureFileSystemInstrumentation.java @@ -0,0 +1,546 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azure.metrics; + +import static org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation.WASB_CLIENT_ERRORS; +import static org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation.WASB_DIRECTORIES_CREATED; +import static org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation.WASB_DOWNLOAD_LATENCY; +import static org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation.WASB_DOWNLOAD_RATE; +import static org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation.WASB_FILES_CREATED; +import static org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation.WASB_FILES_DELETED; +import static org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation.WASB_SERVER_ERRORS; +import static org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation.WASB_UPLOAD_LATENCY; +import static org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation.WASB_UPLOAD_RATE; +import static org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation.WASB_WEB_RESPONSES; +import static org.apache.hadoop.test.MetricsAsserts.assertCounter; +import static org.apache.hadoop.test.MetricsAsserts.getMetrics; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assume.assumeNotNull; +import static org.mockito.Matchers.argThat; +import static org.mockito.Mockito.verify; + +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Arrays; +import java.util.Date; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azure.AzureBlobStorageTestAccount; +import org.apache.hadoop.fs.azure.AzureException; +import org.apache.hadoop.fs.azure.AzureNativeFileSystemStore; +import org.apache.hadoop.fs.azure.NativeAzureFileSystem; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.metrics2.MetricsTag; +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class TestAzureFileSystemInstrumentation { + private FileSystem fs; + private AzureBlobStorageTestAccount testAccount; + + @Before + public void setUp() throws Exception { + testAccount = AzureBlobStorageTestAccount.create(); + if (testAccount != null) { + fs = testAccount.getFileSystem(); + } + assumeNotNull(testAccount); + } + + @After + public void tearDown() throws Exception { + if (testAccount != null) { + testAccount.cleanup(); + testAccount = null; + fs = null; + } + } + + @Test + public void testMetricTags() throws Exception { + String accountName = + testAccount.getRealAccount().getBlobEndpoint() + .getAuthority(); + String containerName = + testAccount.getRealContainer().getName(); + MetricsRecordBuilder myMetrics = getMyMetrics(); + verify(myMetrics).add(argThat( + new TagMatcher("accountName", accountName) + )); + verify(myMetrics).add(argThat( + new TagMatcher("containerName", containerName) + )); + verify(myMetrics).add(argThat( + new TagMatcher("Context", "azureFileSystem") + )); + verify(myMetrics).add(argThat( + new TagExistsMatcher("wasbFileSystemId") + )); + } + + + @Test + public void testMetricsOnMkdirList() throws Exception { + long base = getBaseWebResponses(); + + // Create a directory + assertTrue(fs.mkdirs(new Path("a"))); + // At the time of writing, it takes 1 request to create the actual directory, + // plus 2 requests per level to check that there's no blob with that name and + // 1 request per level above to create it if it doesn't exist. + // So for the path above (/user//a), it takes 2 requests each to check + // there's no blob called /user, no blob called /user/ and no blob + // called /user//a, and then 3 request for the creation of the three + // levels, and then 2 requests for checking/stamping the version of AS, + // totaling 11. + // Also, there's the initial 1 request for container check so total is 12. + base = assertWebResponsesInRange(base, 1, 12); + assertEquals(1, + AzureMetricsTestUtil.getLongCounterValue(getInstrumentation(), WASB_DIRECTORIES_CREATED)); + + // List the root contents + assertEquals(1, fs.listStatus(new Path("/")).length); + base = assertWebResponsesEquals(base, 1); + + assertNoErrors(); + } + + private BandwidthGaugeUpdater getBandwidthGaugeUpdater() { + NativeAzureFileSystem azureFs = (NativeAzureFileSystem)fs; + AzureNativeFileSystemStore azureStore = azureFs.getStore(); + return azureStore.getBandwidthGaugeUpdater(); + } + + private static byte[] nonZeroByteArray(int size) { + byte[] data = new byte[size]; + Arrays.fill(data, (byte)5); + return data; + } + + @Test + public void testMetricsOnFileCreateRead() throws Exception { + long base = getBaseWebResponses(); + + assertEquals(0, AzureMetricsTestUtil.getCurrentBytesWritten(getInstrumentation())); + + Path filePath = new Path("/metricsTest_webResponses"); + final int FILE_SIZE = 1000; + + // Suppress auto-update of bandwidth metrics so we get + // to update them exactly when we want to. + getBandwidthGaugeUpdater().suppressAutoUpdate(); + + // Create a file + Date start = new Date(); + OutputStream outputStream = fs.create(filePath); + outputStream.write(nonZeroByteArray(FILE_SIZE)); + outputStream.close(); + long uploadDurationMs = new Date().getTime() - start.getTime(); + + // The exact number of requests/responses that happen to create a file + // can vary - at the time of writing this code it takes 10 + // requests/responses for the 1000 byte file (33 for 100 MB), + // plus the initial container-check request but that + // can very easily change in the future. Just assert that we do roughly + // more than 2 but less than 15. + logOpResponseCount("Creating a 1K file", base); + base = assertWebResponsesInRange(base, 2, 15); + getBandwidthGaugeUpdater().triggerUpdate(true); + long bytesWritten = AzureMetricsTestUtil.getCurrentBytesWritten(getInstrumentation()); + assertTrue("The bytes written in the last second " + bytesWritten + + " is pretty far from the expected range of around " + FILE_SIZE + + " bytes plus a little overhead.", + bytesWritten > (FILE_SIZE / 2) && bytesWritten < (FILE_SIZE * 2)); + long totalBytesWritten = AzureMetricsTestUtil.getCurrentTotalBytesWritten(getInstrumentation()); + assertTrue("The total bytes written " + totalBytesWritten + + " is pretty far from the expected range of around " + FILE_SIZE + + " bytes plus a little overhead.", + totalBytesWritten >= FILE_SIZE && totalBytesWritten < (FILE_SIZE * 2)); + long uploadRate = AzureMetricsTestUtil.getLongGaugeValue(getInstrumentation(), WASB_UPLOAD_RATE); + System.out.println("Upload rate: " + uploadRate + " bytes/second."); + long expectedRate = (FILE_SIZE * 1000L) / uploadDurationMs; + assertTrue("The upload rate " + uploadRate + + " is below the expected range of around " + expectedRate + + " bytes/second that the unit test observed. This should never be" + + " the case since the test underestimates the rate by looking at " + + " end-to-end time instead of just block upload time.", + uploadRate >= expectedRate); + long uploadLatency = AzureMetricsTestUtil.getLongGaugeValue(getInstrumentation(), + WASB_UPLOAD_LATENCY); + System.out.println("Upload latency: " + uploadLatency); + long expectedLatency = uploadDurationMs; // We're uploading less than a block. + assertTrue("The upload latency " + uploadLatency + + " should be greater than zero now that I've just uploaded a file.", + uploadLatency > 0); + assertTrue("The upload latency " + uploadLatency + + " is more than the expected range of around " + expectedLatency + + " milliseconds that the unit test observed. This should never be" + + " the case since the test overestimates the latency by looking at " + + " end-to-end time instead of just block upload time.", + uploadLatency <= expectedLatency); + + // Read the file + start = new Date(); + InputStream inputStream = fs.open(filePath); + int count = 0; + while (inputStream.read() >= 0) { + count++; + } + inputStream.close(); + long downloadDurationMs = new Date().getTime() - start.getTime(); + assertEquals(FILE_SIZE, count); + + // Again, exact number varies. At the time of writing this code + // it takes 4 request/responses, so just assert a rough range between + // 1 and 10. + logOpResponseCount("Reading a 1K file", base); + base = assertWebResponsesInRange(base, 1, 10); + getBandwidthGaugeUpdater().triggerUpdate(false); + long totalBytesRead = AzureMetricsTestUtil.getCurrentTotalBytesRead(getInstrumentation()); + assertEquals(FILE_SIZE, totalBytesRead); + long bytesRead = AzureMetricsTestUtil.getCurrentBytesRead(getInstrumentation()); + assertTrue("The bytes read in the last second " + bytesRead + + " is pretty far from the expected range of around " + FILE_SIZE + + " bytes plus a little overhead.", + bytesRead > (FILE_SIZE / 2) && bytesRead < (FILE_SIZE * 2)); + long downloadRate = AzureMetricsTestUtil.getLongGaugeValue(getInstrumentation(), WASB_DOWNLOAD_RATE); + System.out.println("Download rate: " + downloadRate + " bytes/second."); + expectedRate = (FILE_SIZE * 1000L) / downloadDurationMs; + assertTrue("The download rate " + downloadRate + + " is below the expected range of around " + expectedRate + + " bytes/second that the unit test observed. This should never be" + + " the case since the test underestimates the rate by looking at " + + " end-to-end time instead of just block download time.", + downloadRate >= expectedRate); + long downloadLatency = AzureMetricsTestUtil.getLongGaugeValue(getInstrumentation(), + WASB_DOWNLOAD_LATENCY); + System.out.println("Download latency: " + downloadLatency); + expectedLatency = downloadDurationMs; // We're downloading less than a block. + assertTrue("The download latency " + downloadLatency + + " should be greater than zero now that I've just downloaded a file.", + downloadLatency > 0); + assertTrue("The download latency " + downloadLatency + + " is more than the expected range of around " + expectedLatency + + " milliseconds that the unit test observed. This should never be" + + " the case since the test overestimates the latency by looking at " + + " end-to-end time instead of just block download time.", + downloadLatency <= expectedLatency); + + assertNoErrors(); + } + + @Test + public void testMetricsOnBigFileCreateRead() throws Exception { + long base = getBaseWebResponses(); + + assertEquals(0, AzureMetricsTestUtil.getCurrentBytesWritten(getInstrumentation())); + + Path filePath = new Path("/metricsTest_webResponses"); + final int FILE_SIZE = 100 * 1024 * 1024; + + // Suppress auto-update of bandwidth metrics so we get + // to update them exactly when we want to. + getBandwidthGaugeUpdater().suppressAutoUpdate(); + + // Create a file + OutputStream outputStream = fs.create(filePath); + outputStream.write(new byte[FILE_SIZE]); + outputStream.close(); + + // The exact number of requests/responses that happen to create a file + // can vary - at the time of writing this code it takes 34 + // requests/responses for the 100 MB file, + // plus the initial container check request, but that + // can very easily change in the future. Just assert that we do roughly + // more than 20 but less than 50. + logOpResponseCount("Creating a 100 MB file", base); + base = assertWebResponsesInRange(base, 20, 50); + getBandwidthGaugeUpdater().triggerUpdate(true); + long totalBytesWritten = AzureMetricsTestUtil.getCurrentTotalBytesWritten(getInstrumentation()); + assertTrue("The total bytes written " + totalBytesWritten + + " is pretty far from the expected range of around " + FILE_SIZE + + " bytes plus a little overhead.", + totalBytesWritten >= FILE_SIZE && totalBytesWritten < (FILE_SIZE * 2)); + long uploadRate = AzureMetricsTestUtil.getLongGaugeValue(getInstrumentation(), WASB_UPLOAD_RATE); + System.out.println("Upload rate: " + uploadRate + " bytes/second."); + long uploadLatency = AzureMetricsTestUtil.getLongGaugeValue(getInstrumentation(), + WASB_UPLOAD_LATENCY); + System.out.println("Upload latency: " + uploadLatency); + assertTrue("The upload latency " + uploadLatency + + " should be greater than zero now that I've just uploaded a file.", + uploadLatency > 0); + + // Read the file + InputStream inputStream = fs.open(filePath); + int count = 0; + while (inputStream.read() >= 0) { + count++; + } + inputStream.close(); + assertEquals(FILE_SIZE, count); + + // Again, exact number varies. At the time of writing this code + // it takes 27 request/responses, so just assert a rough range between + // 20 and 40. + logOpResponseCount("Reading a 100 MB file", base); + base = assertWebResponsesInRange(base, 20, 40); + getBandwidthGaugeUpdater().triggerUpdate(false); + long totalBytesRead = AzureMetricsTestUtil.getCurrentTotalBytesRead(getInstrumentation()); + assertEquals(FILE_SIZE, totalBytesRead); + long downloadRate = AzureMetricsTestUtil.getLongGaugeValue(getInstrumentation(), WASB_DOWNLOAD_RATE); + System.out.println("Download rate: " + downloadRate + " bytes/second."); + long downloadLatency = AzureMetricsTestUtil.getLongGaugeValue(getInstrumentation(), + WASB_DOWNLOAD_LATENCY); + System.out.println("Download latency: " + downloadLatency); + assertTrue("The download latency " + downloadLatency + + " should be greater than zero now that I've just downloaded a file.", + downloadLatency > 0); + } + + @Test + public void testMetricsOnFileRename() throws Exception { + long base = getBaseWebResponses(); + + Path originalPath = new Path("/metricsTest_RenameStart"); + Path destinationPath = new Path("/metricsTest_RenameFinal"); + + // Create an empty file + assertEquals(0, AzureMetricsTestUtil.getLongCounterValue(getInstrumentation(), WASB_FILES_CREATED)); + assertTrue(fs.createNewFile(originalPath)); + logOpResponseCount("Creating an empty file", base); + base = assertWebResponsesInRange(base, 2, 20); + assertEquals(1, AzureMetricsTestUtil.getLongCounterValue(getInstrumentation(), WASB_FILES_CREATED)); + + // Rename the file + assertTrue(fs.rename(originalPath, destinationPath)); + // Varies: at the time of writing this code it takes 7 requests/responses. + logOpResponseCount("Renaming a file", base); + base = assertWebResponsesInRange(base, 2, 15); + + assertNoErrors(); + } + + @Test + public void testMetricsOnFileExistsDelete() throws Exception { + long base = getBaseWebResponses(); + + Path filePath = new Path("/metricsTest_delete"); + + // Check existence + assertFalse(fs.exists(filePath)); + // At the time of writing this code it takes 2 requests/responses to + // check existence, which seems excessive, plus initial request for + // container check. + logOpResponseCount("Checking file existence for non-existent file", base); + base = assertWebResponsesInRange(base, 1, 3); + + // Create an empty file + assertTrue(fs.createNewFile(filePath)); + base = getCurrentWebResponses(); + + // Check existence again + assertTrue(fs.exists(filePath)); + logOpResponseCount("Checking file existence for existent file", base); + base = assertWebResponsesInRange(base, 1, 2); + + // Delete the file + assertEquals(0, AzureMetricsTestUtil.getLongCounterValue(getInstrumentation(), WASB_FILES_DELETED)); + assertTrue(fs.delete(filePath, false)); + // At the time of writing this code it takes 4 requests/responses to + // delete, which seems excessive. Check for range 1-4 for now. + logOpResponseCount("Deleting a file", base); + base = assertWebResponsesInRange(base, 1, 4); + assertEquals(1, AzureMetricsTestUtil.getLongCounterValue(getInstrumentation(), WASB_FILES_DELETED)); + + assertNoErrors(); + } + + @Test + public void testMetricsOnDirRename() throws Exception { + long base = getBaseWebResponses(); + + Path originalDirName = new Path("/metricsTestDirectory_RenameStart"); + Path innerFileName = new Path(originalDirName, "innerFile"); + Path destDirName = new Path("/metricsTestDirectory_RenameFinal"); + + // Create an empty directory + assertTrue(fs.mkdirs(originalDirName)); + base = getCurrentWebResponses(); + + // Create an inner file + assertTrue(fs.createNewFile(innerFileName)); + base = getCurrentWebResponses(); + + // Rename the directory + assertTrue(fs.rename(originalDirName, destDirName)); + // At the time of writing this code it takes 11 requests/responses + // to rename the directory with one file. Check for range 1-20 for now. + logOpResponseCount("Renaming a directory", base); + base = assertWebResponsesInRange(base, 1, 20); + + assertNoErrors(); + } + + @Test + public void testClientErrorMetrics() throws Exception { + String directoryName = "metricsTestDirectory_ClientError"; + Path directoryPath = new Path("/" + directoryName); + assertTrue(fs.mkdirs(directoryPath)); + String leaseID = testAccount.acquireShortLease(directoryName); + try { + try { + fs.delete(directoryPath, true); + assertTrue("Should've thrown.", false); + } catch (AzureException ex) { + assertTrue("Unexpected exception: " + ex, + ex.getMessage().contains("lease")); + } + assertEquals(1, AzureMetricsTestUtil.getLongCounterValue(getInstrumentation(), WASB_CLIENT_ERRORS)); + assertEquals(0, AzureMetricsTestUtil.getLongCounterValue(getInstrumentation(), WASB_SERVER_ERRORS)); + } finally { + testAccount.releaseLease(leaseID, directoryName); + } + } + + private void logOpResponseCount(String opName, long base) { + System.out.println(opName + " took " + (getCurrentWebResponses() - base) + + " web responses to complete."); + } + + /** + * Gets (and asserts) the value of the wasb_web_responses counter just + * after the creation of the file system object. + */ + private long getBaseWebResponses() { + // The number of requests should start at 0 + return assertWebResponsesEquals(0, 0); + } + + /** + * Gets the current value of the wasb_web_responses counter. + */ + private long getCurrentWebResponses() { + return AzureMetricsTestUtil.getCurrentWebResponses(getInstrumentation()); + } + + /** + * Checks that the wasb_web_responses counter is at the given value. + * @param base The base value (before the operation of interest). + * @param expected The expected value for the operation of interest. + * @return The new base value now. + */ + private long assertWebResponsesEquals(long base, long expected) { + assertCounter(WASB_WEB_RESPONSES, base + expected, getMyMetrics()); + return base + expected; + } + + private void assertNoErrors() { + assertEquals(0, AzureMetricsTestUtil.getLongCounterValue(getInstrumentation(), WASB_CLIENT_ERRORS)); + assertEquals(0, AzureMetricsTestUtil.getLongCounterValue(getInstrumentation(), WASB_SERVER_ERRORS)); + } + + /** + * Checks that the wasb_web_responses counter is in the given range. + * @param base The base value (before the operation of interest). + * @param inclusiveLowerLimit The lower limit for what it should increase by. + * @param inclusiveUpperLimit The upper limit for what it should increase by. + * @return The new base value now. + */ + private long assertWebResponsesInRange(long base, + long inclusiveLowerLimit, + long inclusiveUpperLimit) { + long currentResponses = getCurrentWebResponses(); + long justOperation = currentResponses - base; + assertTrue(String.format( + "Web responses expected in range [%d, %d], but was %d.", + inclusiveLowerLimit, inclusiveUpperLimit, justOperation), + justOperation >= inclusiveLowerLimit && + justOperation <= inclusiveUpperLimit); + return currentResponses; + } + + /** + * Gets the metrics for the file system object. + * @return The metrics record. + */ + private MetricsRecordBuilder getMyMetrics() { + return getMetrics(getInstrumentation()); + } + + private AzureFileSystemInstrumentation getInstrumentation() { + return ((NativeAzureFileSystem)fs).getInstrumentation(); + } + + /** + * A matcher class for asserting that we got a tag with a given + * value. + */ + private static class TagMatcher extends TagExistsMatcher { + private final String tagValue; + + public TagMatcher(String tagName, String tagValue) { + super(tagName); + this.tagValue = tagValue; + } + + @Override + public boolean matches(MetricsTag toMatch) { + return toMatch.value().equals(tagValue); + } + + @Override + public void describeTo(Description desc) { + super.describeTo(desc); + desc.appendText(" with value " + tagValue); + } + } + + /** + * A matcher class for asserting that we got a tag with any value. + */ + private static class TagExistsMatcher extends BaseMatcher { + private final String tagName; + + public TagExistsMatcher(String tagName) { + this.tagName = tagName; + } + + @Override + public boolean matches(Object toMatch) { + MetricsTag asTag = (MetricsTag)toMatch; + return asTag.name().equals(tagName) && matches(asTag); + } + + protected boolean matches(MetricsTag toMatch) { + return true; + } + + @Override + public void describeTo(Description desc) { + desc.appendText("Has tag " + tagName); + } + } + +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/metrics/TestBandwidthGaugeUpdater.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/metrics/TestBandwidthGaugeUpdater.java new file mode 100644 index 00000000000..ef3442227f3 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/metrics/TestBandwidthGaugeUpdater.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azure.metrics; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assume.assumeNotNull; + +import java.util.Date; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.azure.AzureBlobStorageTestAccount; +import org.junit.Assume; +import org.junit.Test; + +public class TestBandwidthGaugeUpdater { + @Test + public void testSingleThreaded() throws Exception { + AzureFileSystemInstrumentation instrumentation = + new AzureFileSystemInstrumentation(new Configuration()); + BandwidthGaugeUpdater updater = + new BandwidthGaugeUpdater(instrumentation, 1000, true); + updater.triggerUpdate(true); + assertEquals(0, AzureMetricsTestUtil.getCurrentBytesWritten(instrumentation)); + updater.blockUploaded(new Date(), new Date(), 150); + updater.triggerUpdate(true); + assertEquals(150, AzureMetricsTestUtil.getCurrentBytesWritten(instrumentation)); + updater.blockUploaded(new Date(new Date().getTime() - 10000), + new Date(), 200); + updater.triggerUpdate(true); + long currentBytes = AzureMetricsTestUtil.getCurrentBytesWritten(instrumentation); + assertTrue( + "We expect around (200/10 = 20) bytes written as the gauge value." + + "Got " + currentBytes, + currentBytes > 18 && currentBytes < 22); + updater.close(); + } + + @Test + public void testMultiThreaded() throws Exception { + final AzureFileSystemInstrumentation instrumentation = + new AzureFileSystemInstrumentation(new Configuration()); + final BandwidthGaugeUpdater updater = + new BandwidthGaugeUpdater(instrumentation, 1000, true); + Thread[] threads = new Thread[10]; + for (int i = 0; i < threads.length; i++) { + threads[i] = new Thread(new Runnable() { + @Override + public void run() { + updater.blockDownloaded(new Date(), new Date(), 10); + updater.blockDownloaded(new Date(0), new Date(0), 10); + } + }); + } + for (Thread t : threads) { + t.start(); + } + for (Thread t : threads) { + t.join(); + } + updater.triggerUpdate(false); + assertEquals(10 * threads.length, AzureMetricsTestUtil.getCurrentBytesRead(instrumentation)); + updater.close(); + } + + @Test + public void testFinalizerThreadShutdown() throws Exception { + + // force cleanup of any existing wasb filesystems + System.gc(); + System.runFinalization(); + + int nUpdaterThreadsStart = getWasbThreadCount(); + assertTrue("Existing WASB threads have not been cleared", nUpdaterThreadsStart == 0); + + final int nFilesystemsToSpawn = 10; + AzureBlobStorageTestAccount testAccount = null; + + for(int i = 0; i < nFilesystemsToSpawn; i++){ + testAccount = AzureBlobStorageTestAccount.createMock(); + testAccount.getFileSystem(); + } + + int nUpdaterThreadsAfterSpawn = getWasbThreadCount(); + Assume.assumeTrue("Background threads should have spawned.", nUpdaterThreadsAfterSpawn == 10); + + testAccount = null; //clear the last reachable reference + + // force cleanup + System.gc(); + System.runFinalization(); + + int nUpdaterThreadsAfterCleanup = getWasbThreadCount(); + assertTrue("Finalizers should have reduced the thread count. ", nUpdaterThreadsAfterCleanup == 0 ); + } + + private int getWasbThreadCount() { + int c = 0; + Map stacksStart = Thread.getAllStackTraces(); + for (Thread t : stacksStart.keySet()){ + if(t.getName().equals(BandwidthGaugeUpdater.THREAD_NAME)) + { + c++; + } + } + return c; + } +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/metrics/TestNativeAzureFileSystemMetricsSystem.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/metrics/TestNativeAzureFileSystemMetricsSystem.java new file mode 100644 index 00000000000..f44613d91d4 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/metrics/TestNativeAzureFileSystemMetricsSystem.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azure.metrics; + +import static org.junit.Assert.*; + +import org.apache.hadoop.fs.*; +import org.apache.hadoop.fs.azure.AzureBlobStorageTestAccount; +import org.apache.hadoop.fs.azure.NativeAzureFileSystem; +import org.junit.*; + +/** + * Tests that the WASB-specific metrics system is working correctly. + */ +public class TestNativeAzureFileSystemMetricsSystem { + private static final String WASB_FILES_CREATED = "wasb_files_created"; + + private static int getFilesCreated(AzureBlobStorageTestAccount testAccount) { + return testAccount.getLatestMetricValue(WASB_FILES_CREATED, 0).intValue(); + } + + /** + * Tests that when we have multiple file systems created/destroyed + * metrics from each are published correctly. + * @throws Exception + */ + @Test + public void testMetricsAcrossFileSystems() + throws Exception { + AzureBlobStorageTestAccount a1, a2, a3; + + a1 = AzureBlobStorageTestAccount.createMock(); + assertEquals(0, getFilesCreated(a1)); + a2 = AzureBlobStorageTestAccount.createMock(); + assertEquals(0, getFilesCreated(a2)); + a1.getFileSystem().create(new Path("/foo")).close(); + a1.getFileSystem().create(new Path("/bar")).close(); + a2.getFileSystem().create(new Path("/baz")).close(); + assertEquals(0, getFilesCreated(a1)); + assertEquals(0, getFilesCreated(a2)); + a1.closeFileSystem(); // Causes the file system to close, which publishes metrics + a2.closeFileSystem(); + + assertEquals(2, getFilesCreated(a1)); + assertEquals(1, getFilesCreated(a2)); + a3 = AzureBlobStorageTestAccount.createMock(); + assertEquals(0, getFilesCreated(a3)); + a3.closeFileSystem(); + assertEquals(0, getFilesCreated(a3)); + } + + + @Test + public void testMetricsSourceNames() { + String name1 = NativeAzureFileSystem.newMetricsSourceName(); + String name2 = NativeAzureFileSystem.newMetricsSourceName(); + assertTrue(name1.startsWith("AzureFileSystemMetrics")); + assertTrue(name2.startsWith("AzureFileSystemMetrics")); + assertTrue(!name1.equals(name2)); + } +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/metrics/TestRollingWindowAverage.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/metrics/TestRollingWindowAverage.java new file mode 100644 index 00000000000..9b1fb8e60e1 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azure/metrics/TestRollingWindowAverage.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azure.metrics; + +import static org.junit.Assert.*; +import org.junit.*; + +public class TestRollingWindowAverage { + /** + * Tests the basic functionality of the class. + */ + @Test + public void testBasicFunctionality() throws Exception { + RollingWindowAverage average = new RollingWindowAverage(100); + assertEquals(0, average.getCurrentAverage()); // Nothing there yet. + average.addPoint(5); + assertEquals(5, average.getCurrentAverage()); // One point in there. + Thread.sleep(50); + average.addPoint(15); + assertEquals(10, average.getCurrentAverage()); // Two points in there. + Thread.sleep(60); + assertEquals(15, average.getCurrentAverage()); // One point retired. + Thread.sleep(50); + assertEquals(0, average.getCurrentAverage()); // Both points retired. + } +} diff --git a/hadoop-tools/hadoop-azure/src/test/resources/azure-test.xml b/hadoop-tools/hadoop-azure/src/test/resources/azure-test.xml index fb2aa20e4ab..7eeff92f33e 100644 --- a/hadoop-tools/hadoop-azure/src/test/resources/azure-test.xml +++ b/hadoop-tools/hadoop-azure/src/test/resources/azure-test.xml @@ -26,6 +26,7 @@ org.apache.hadoop.fs.azure.NativeAzureFileSystem +