HADOOP-10728. Metrics system for Windows Azure Storage Filesystem. Contributed by Dexter Bradshaw, Mostafa Elhemali, Xi Fang, Johannes Klein, David Lao, Mike Liddell, Chuan Liu, Lengning Liu, Ivan Mitic, Michael Rys, Alexander Stojanovic, Brian Swan, and Min Wei.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1605187 13f79535-47bb-0310-9956-ffa450edef68
(cherry picked from commit 0d91576ec3)

Conflicts:
	hadoop-common-project/hadoop-common/CHANGES.txt
This commit is contained in:
Chris Nauroth 2014-06-24 20:52:44 +00:00 committed by cnauroth
parent 80d005c66f
commit b928b8c775
23 changed files with 2230 additions and 34 deletions

View File

@ -18,6 +18,11 @@ Release 2.7.0 - UNRELEASED
Mike Liddell, Chuan Liu, Lengning Liu, Ivan Mitic, Michael Rys, Mike Liddell, Chuan Liu, Lengning Liu, Ivan Mitic, Michael Rys,
Alexander Stojanovic, Brian Swan, and Min Wei via cnauroth) Alexander Stojanovic, Brian Swan, and Min Wei via cnauroth)
HADOOP-10728. Metrics system for Windows Azure Storage Filesystem.
(Dexter Bradshaw, Mostafa Elhemali, Xi Fang, Johannes Klein, David Lao,
Mike Liddell, Chuan Liu, Lengning Liu, Ivan Mitic, Michael Rys,
Alexander Stojanovich, Brian Swan, and Min Wei via cnauroth)
IMPROVEMENTS IMPROVEMENTS
HADOOP-11156. DelegateToFileSystem should implement HADOOP-11156. DelegateToFileSystem should implement

View File

@ -1 +1,2 @@
.checkstyle .checkstyle
bin/

View File

@ -12,8 +12,12 @@ Unit tests
============= =============
Most of the tests will run without additional configuration. Most of the tests will run without additional configuration.
For complete testing, configuration in src/test/resources is required: For complete testing, configuration in src/test/resources is required:
src/test/resources/azure-test.xml
src/test/resources/log4j.properties src/test/resources/azure-test.xml -> Defines Azure storage dependencies, including account information
The other files in src/test/resources do not normally need alteration:
log4j.properties -> Test logging setup
hadoop-metrics2-azure-file-system.properties -> used to wire up instrumentation for testing
From command-line From command-line
------------------ ------------------
@ -59,6 +63,12 @@ Enable the Azure emulator tests by setting
fs.azure.test.emulator -> true fs.azure.test.emulator -> true
in src\test\resources\azure-test.xml in src\test\resources\azure-test.xml
Known issues:
Symptom: When running tests for emulator, you see the following failure message
com.microsoft.windowsazure.storage.StorageException: The value for one of the HTTP headers is not in the correct format.
Issue: The emulator can get into a confused state.
Fix: Restart the Azure Emulator. Ensure it is v3.2 or later.
Running tests against live Azure storage Running tests against live Azure storage
------------------------------------------------------------------------- -------------------------------------------------------------------------
In order to run WASB unit tests against a live Azure Storage account, add credentials to In order to run WASB unit tests against a live Azure Storage account, add credentials to
@ -101,4 +111,8 @@ Eclipse:
NOTE: NOTE:
- After any change to the checkstyle rules xml, use window|preferences|checkstyle|{refresh}|OK - After any change to the checkstyle rules xml, use window|preferences|checkstyle|{refresh}|OK
=============
Javadoc
=============
Command-line
> mvn javadoc:javadoc

View File

@ -37,22 +37,6 @@
</properties> </properties>
<build> <build>
<testResources>
<testResource>
<directory>src/test/resources</directory>
<includes>
<include>log4j.properties</include>
</includes>
</testResource>
<testResource>
<directory>src/test/resources</directory>
<includes>
<include>azure-test.xml</include>
</includes>
</testResource>
</testResources>
<plugins> <plugins>
<plugin> <plugin>
<groupId>org.codehaus.mojo</groupId> <groupId>org.codehaus.mojo</groupId>
@ -198,5 +182,11 @@
<type>test-jar</type> <type>test-jar</type>
</dependency> </dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
</project> </project>

View File

@ -17,7 +17,6 @@
*/ */
package org.apache.hadoop.fs.azure; package org.apache.hadoop.fs.azure;
import static org.apache.hadoop.fs.azure.NativeAzureFileSystem.PATH_DELIMITER; import static org.apache.hadoop.fs.azure.NativeAzureFileSystem.PATH_DELIMITER;
import java.io.BufferedInputStream; import java.io.BufferedInputStream;
@ -46,6 +45,10 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.azure.StorageInterface.CloudBlobContainerWrapper; import org.apache.hadoop.fs.azure.StorageInterface.CloudBlobContainerWrapper;
import org.apache.hadoop.fs.azure.StorageInterface.CloudBlobDirectoryWrapper; import org.apache.hadoop.fs.azure.StorageInterface.CloudBlobDirectoryWrapper;
import org.apache.hadoop.fs.azure.StorageInterface.CloudBlockBlobWrapper; import org.apache.hadoop.fs.azure.StorageInterface.CloudBlockBlobWrapper;
import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation;
import org.apache.hadoop.fs.azure.metrics.BandwidthGaugeUpdater;
import org.apache.hadoop.fs.azure.metrics.ErrorMetricUpdater;
import org.apache.hadoop.fs.azure.metrics.ResponseReceivedMetricUpdater;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus; import org.apache.hadoop.fs.permission.PermissionStatus;
import org.mortbay.util.ajax.JSON; import org.mortbay.util.ajax.JSON;
@ -69,8 +72,15 @@ import com.microsoft.windowsazure.storage.blob.DeleteSnapshotsOption;
import com.microsoft.windowsazure.storage.blob.ListBlobItem; import com.microsoft.windowsazure.storage.blob.ListBlobItem;
import com.microsoft.windowsazure.storage.core.Utility; import com.microsoft.windowsazure.storage.core.Utility;
/**
* Core implementation of Windows Azure Filesystem for Hadoop.
* Provides the bridging logic between Hadoop's abstract filesystem and Azure Storage
*
*/
@InterfaceAudience.Private @InterfaceAudience.Private
class AzureNativeFileSystemStore implements NativeFileSystemStore { @VisibleForTesting
public class AzureNativeFileSystemStore implements NativeFileSystemStore {
/** /**
* Configuration knob on whether we do block-level MD5 validation on * Configuration knob on whether we do block-level MD5 validation on
@ -169,6 +179,8 @@ class AzureNativeFileSystemStore implements NativeFileSystemStore {
private boolean isAnonymousCredentials = false; private boolean isAnonymousCredentials = false;
// Set to true if we are connecting using shared access signatures. // Set to true if we are connecting using shared access signatures.
private boolean connectingUsingSAS = false; private boolean connectingUsingSAS = false;
private AzureFileSystemInstrumentation instrumentation;
private BandwidthGaugeUpdater bandwidthGaugeUpdater;
private static final JSON PERMISSION_JSON_SERIALIZER = createPermissionJsonSerializer(); private static final JSON PERMISSION_JSON_SERIALIZER = createPermissionJsonSerializer();
private boolean suppressRetryPolicy = false; private boolean suppressRetryPolicy = false;
@ -301,6 +313,11 @@ class AzureNativeFileSystemStore implements NativeFileSystemStore {
this.storageInteractionLayer = storageInteractionLayer; this.storageInteractionLayer = storageInteractionLayer;
} }
@VisibleForTesting
public BandwidthGaugeUpdater getBandwidthGaugeUpdater() {
return bandwidthGaugeUpdater;
}
/** /**
* Check if concurrent reads and writes on the same blob are allowed. * Check if concurrent reads and writes on the same blob are allowed.
* *
@ -325,12 +342,18 @@ class AzureNativeFileSystemStore implements NativeFileSystemStore {
* if URI or job object is null, or invalid scheme. * if URI or job object is null, or invalid scheme.
*/ */
@Override @Override
public void initialize(URI uri, Configuration conf) throws AzureException { public void initialize(URI uri, Configuration conf, AzureFileSystemInstrumentation instrumentation) throws AzureException {
if (null == this.storageInteractionLayer) { if (null == this.storageInteractionLayer) {
this.storageInteractionLayer = new StorageInterfaceImpl(); this.storageInteractionLayer = new StorageInterfaceImpl();
} }
this.instrumentation = instrumentation;
this.bandwidthGaugeUpdater = new BandwidthGaugeUpdater(instrumentation);
if (null == this.storageInteractionLayer) {
this.storageInteractionLayer = new StorageInterfaceImpl();
}
// Check that URI exists. // Check that URI exists.
// //
if (null == uri) { if (null == uri) {
@ -775,7 +798,9 @@ class AzureNativeFileSystemStore implements NativeFileSystemStore {
throw new AzureException(errMsg); throw new AzureException(errMsg);
} }
instrumentation.setAccountName(accountName);
String containerName = getContainerFromAuthority(sessionUri); String containerName = getContainerFromAuthority(sessionUri);
instrumentation.setContainerName(containerName);
// Check whether this is a storage emulator account. // Check whether this is a storage emulator account.
if (isStorageEmulatorAccount(accountName)) { if (isStorageEmulatorAccount(accountName)) {
@ -1522,6 +1547,11 @@ class AzureNativeFileSystemStore implements NativeFileSystemStore {
selfThrottlingWriteFactor); selfThrottlingWriteFactor);
} }
ResponseReceivedMetricUpdater.hook(
operationContext,
instrumentation,
bandwidthGaugeUpdater);
// Bind operation context to receive send request callbacks on this // Bind operation context to receive send request callbacks on this
// operation. // operation.
// If reads concurrent to OOB writes are allowed, the interception will // If reads concurrent to OOB writes are allowed, the interception will
@ -1536,6 +1566,8 @@ class AzureNativeFileSystemStore implements NativeFileSystemStore {
.modifyOperationContext(operationContext); .modifyOperationContext(operationContext);
} }
ErrorMetricUpdater.hook(operationContext, instrumentation);
// Return the operation context. // Return the operation context.
return operationContext; return operationContext;
} }
@ -2218,5 +2250,14 @@ class AzureNativeFileSystemStore implements NativeFileSystemStore {
@Override @Override
public void close() { public void close() {
bandwidthGaugeUpdater.close();
}
// Finalizer to ensure complete shutdown
@Override
protected void finalize() throws Throwable {
LOG.debug("finalize() called");
close();
super.finalize();
} }
} }

View File

@ -31,12 +31,13 @@ import java.util.Date;
import java.util.Set; import java.util.Set;
import java.util.TreeSet; import java.util.TreeSet;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.BufferedFSInputStream; import org.apache.hadoop.fs.BufferedFSInputStream;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
@ -45,12 +46,14 @@ import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation;
import org.apache.hadoop.fs.azure.metrics.AzureFileSystemMetricsSystem;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus; import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.Progressable;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.microsoft.windowsazure.storage.core.Utility; import com.microsoft.windowsazure.storage.core.Utility;
@ -369,7 +372,11 @@ public class NativeAzureFileSystem extends FileSystem {
private AzureNativeFileSystemStore actualStore; private AzureNativeFileSystemStore actualStore;
private Path workingDir; private Path workingDir;
private long blockSize = MAX_AZURE_BLOCK_SIZE; private long blockSize = MAX_AZURE_BLOCK_SIZE;
private AzureFileSystemInstrumentation instrumentation;
private static boolean suppressRetryPolicy = false; private static boolean suppressRetryPolicy = false;
// A counter to create unique (within-process) names for my metrics sources.
private static AtomicInteger metricsSourceNameCounter = new AtomicInteger();
public NativeAzureFileSystem() { public NativeAzureFileSystem() {
// set store in initialize() // set store in initialize()
@ -396,6 +403,20 @@ public class NativeAzureFileSystem extends FileSystem {
suppressRetryPolicy = false; suppressRetryPolicy = false;
} }
/**
* Creates a new metrics source name that's unique within this process.
*/
@VisibleForTesting
public static String newMetricsSourceName() {
int number = metricsSourceNameCounter.incrementAndGet();
final String baseName = "AzureFileSystemMetrics";
if (number == 1) { // No need for a suffix for the first one
return baseName;
} else {
return baseName + number;
}
}
/** /**
* Checks if the given URI scheme is a scheme that's affiliated with the Azure * Checks if the given URI scheme is a scheme that's affiliated with the Azure
* File System. * File System.
@ -459,7 +480,16 @@ public class NativeAzureFileSystem extends FileSystem {
store = createDefaultStore(conf); store = createDefaultStore(conf);
} }
store.initialize(uri, conf); // Make sure the metrics system is available before interacting with Azure
AzureFileSystemMetricsSystem.fileSystemStarted();
String sourceName = newMetricsSourceName(),
sourceDesc = "Azure Storage Volume File System metrics";
instrumentation = DefaultMetricsSystem.instance().register(sourceName,
sourceDesc, new AzureFileSystemInstrumentation(conf));
AzureFileSystemMetricsSystem.registerSource(sourceName, sourceDesc,
instrumentation);
store.initialize(uri, conf, instrumentation);
setConf(conf); setConf(conf);
this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority()); this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
this.workingDir = new Path("/user", UserGroupInformation.getCurrentUser() this.workingDir = new Path("/user", UserGroupInformation.getCurrentUser()
@ -535,10 +565,20 @@ public class NativeAzureFileSystem extends FileSystem {
* @return The store object. * @return The store object.
*/ */
@VisibleForTesting @VisibleForTesting
AzureNativeFileSystemStore getStore() { public AzureNativeFileSystemStore getStore() {
return actualStore; return actualStore;
} }
/**
* Gets the metrics source for this file system.
* This is mainly here for unit testing purposes.
*
* @return the metrics source.
*/
public AzureFileSystemInstrumentation getInstrumentation() {
return instrumentation;
}
/** This optional operation is not yet supported. */ /** This optional operation is not yet supported. */
@Override @Override
public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) public FSDataOutputStream append(Path f, int bufferSize, Progressable progress)
@ -622,6 +662,10 @@ public class NativeAzureFileSystem extends FileSystem {
// Construct the data output stream from the buffered output stream. // Construct the data output stream from the buffered output stream.
FSDataOutputStream fsOut = new FSDataOutputStream(bufOutStream, statistics); FSDataOutputStream fsOut = new FSDataOutputStream(bufOutStream, statistics);
// Increment the counter
instrumentation.fileCreated();
// Return data output stream to caller. // Return data output stream to caller.
return fsOut; return fsOut;
} }
@ -682,6 +726,7 @@ public class NativeAzureFileSystem extends FileSystem {
store.updateFolderLastModifiedTime(parentKey); store.updateFolderLastModifiedTime(parentKey);
} }
} }
instrumentation.fileDeleted();
store.delete(key); store.delete(key);
} else { } else {
// The path specifies a folder. Recursively delete all entries under the // The path specifies a folder. Recursively delete all entries under the
@ -724,6 +769,7 @@ public class NativeAzureFileSystem extends FileSystem {
p.getKey().lastIndexOf(PATH_DELIMITER)); p.getKey().lastIndexOf(PATH_DELIMITER));
if (!p.isDir()) { if (!p.isDir()) {
store.delete(key + suffix); store.delete(key + suffix);
instrumentation.fileDeleted();
} else { } else {
// Recursively delete contents of the sub-folders. Notice this also // Recursively delete contents of the sub-folders. Notice this also
// deletes the blob for the directory. // deletes the blob for the directory.
@ -740,6 +786,7 @@ public class NativeAzureFileSystem extends FileSystem {
String parentKey = pathToKey(parent); String parentKey = pathToKey(parent);
store.updateFolderLastModifiedTime(parentKey); store.updateFolderLastModifiedTime(parentKey);
} }
instrumentation.directoryDeleted();
} }
// File or directory was successfully deleted. // File or directory was successfully deleted.
@ -972,6 +1019,8 @@ public class NativeAzureFileSystem extends FileSystem {
store.updateFolderLastModifiedTime(key, lastModified); store.updateFolderLastModifiedTime(key, lastModified);
} }
instrumentation.directoryCreated();
// otherwise throws exception // otherwise throws exception
return true; return true;
} }
@ -1293,6 +1342,19 @@ public class NativeAzureFileSystem extends FileSystem {
super.close(); super.close();
// Close the store // Close the store
store.close(); store.close();
// Notify the metrics system that this file system is closed, which may
// trigger one final metrics push to get the accurate final file system
// metrics out.
long startTime = System.currentTimeMillis();
AzureFileSystemMetricsSystem.fileSystemClosed();
if (LOG.isDebugEnabled()) {
LOG.debug("Submitting metrics when file system closed took "
+ (System.currentTimeMillis() - startTime) + " ms.");
}
} }
/** /**

View File

@ -26,6 +26,7 @@ import java.util.Date;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation;
import org.apache.hadoop.fs.permission.PermissionStatus; import org.apache.hadoop.fs.permission.PermissionStatus;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
@ -38,7 +39,7 @@ import com.google.common.annotations.VisibleForTesting;
@InterfaceAudience.Private @InterfaceAudience.Private
interface NativeFileSystemStore { interface NativeFileSystemStore {
void initialize(URI uri, Configuration conf) throws IOException; void initialize(URI uri, Configuration conf, AzureFileSystemInstrumentation instrumentation) throws IOException;
void storeEmptyFolder(String key, PermissionStatus permissionStatus) void storeEmptyFolder(String key, PermissionStatus permissionStatus)
throws AzureException; throws AzureException;

View File

@ -0,0 +1,395 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azure.metrics;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsInfo;
import org.apache.hadoop.metrics2.MetricsSource;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
/**
* A metrics source for the WASB file system to track all the metrics we care
* about for getting a clear picture of the performance/reliability/interaction
* of the Hadoop cluster with Azure Storage.
*/
@Metrics(about="Metrics for WASB", context="azureFileSystem")
@InterfaceAudience.Public
@InterfaceStability.Evolving
public final class AzureFileSystemInstrumentation implements MetricsSource {
public static final String METRIC_TAG_FILESYSTEM_ID = "wasbFileSystemId";
public static final String METRIC_TAG_ACCOUNT_NAME = "accountName";
public static final String METRIC_TAG_CONTAINTER_NAME = "containerName";
public static final String WASB_WEB_RESPONSES = "wasb_web_responses";
public static final String WASB_BYTES_WRITTEN =
"wasb_bytes_written_last_second";
public static final String WASB_BYTES_READ =
"wasb_bytes_read_last_second";
public static final String WASB_RAW_BYTES_UPLOADED =
"wasb_raw_bytes_uploaded";
public static final String WASB_RAW_BYTES_DOWNLOADED =
"wasb_raw_bytes_downloaded";
public static final String WASB_FILES_CREATED = "wasb_files_created";
public static final String WASB_FILES_DELETED = "wasb_files_deleted";
public static final String WASB_DIRECTORIES_CREATED = "wasb_directories_created";
public static final String WASB_DIRECTORIES_DELETED = "wasb_directories_deleted";
public static final String WASB_UPLOAD_RATE =
"wasb_maximum_upload_bytes_per_second";
public static final String WASB_DOWNLOAD_RATE =
"wasb_maximum_download_bytes_per_second";
public static final String WASB_UPLOAD_LATENCY =
"wasb_average_block_upload_latency_ms";
public static final String WASB_DOWNLOAD_LATENCY =
"wasb_average_block_download_latency_ms";
public static final String WASB_CLIENT_ERRORS = "wasb_client_errors";
public static final String WASB_SERVER_ERRORS = "wasb_server_errors";
/**
* Config key for how big the rolling window size for latency metrics should
* be (in seconds).
*/
private static final String KEY_ROLLING_WINDOW_SIZE = "fs.azure.metrics.rolling.window.size";
private final MetricsRegistry registry =
new MetricsRegistry("azureFileSystem")
.setContext("azureFileSystem");
private final MutableCounterLong numberOfWebResponses =
registry.newCounter(
WASB_WEB_RESPONSES,
"Total number of web responses obtained from Azure Storage",
0L);
private AtomicLong inMemoryNumberOfWebResponses = new AtomicLong(0);
private final MutableCounterLong numberOfFilesCreated =
registry.newCounter(
WASB_FILES_CREATED,
"Total number of files created through the WASB file system.",
0L);
private final MutableCounterLong numberOfFilesDeleted =
registry.newCounter(
WASB_FILES_DELETED,
"Total number of files deleted through the WASB file system.",
0L);
private final MutableCounterLong numberOfDirectoriesCreated =
registry.newCounter(
WASB_DIRECTORIES_CREATED,
"Total number of directories created through the WASB file system.",
0L);
private final MutableCounterLong numberOfDirectoriesDeleted =
registry.newCounter(
WASB_DIRECTORIES_DELETED,
"Total number of directories deleted through the WASB file system.",
0L);
private final MutableGaugeLong bytesWrittenInLastSecond =
registry.newGauge(
WASB_BYTES_WRITTEN,
"Total number of bytes written to Azure Storage during the last second.",
0L);
private final MutableGaugeLong bytesReadInLastSecond =
registry.newGauge(
WASB_BYTES_READ,
"Total number of bytes read from Azure Storage during the last second.",
0L);
private final MutableGaugeLong maximumUploadBytesPerSecond =
registry.newGauge(
WASB_UPLOAD_RATE,
"The maximum upload rate encountered to Azure Storage in bytes/second.",
0L);
private final MutableGaugeLong maximumDownloadBytesPerSecond =
registry.newGauge(
WASB_DOWNLOAD_RATE,
"The maximum download rate encountered to Azure Storage in bytes/second.",
0L);
private final MutableCounterLong rawBytesUploaded =
registry.newCounter(
WASB_RAW_BYTES_UPLOADED,
"Total number of raw bytes (including overhead) uploaded to Azure"
+ " Storage.",
0L);
private final MutableCounterLong rawBytesDownloaded =
registry.newCounter(
WASB_RAW_BYTES_DOWNLOADED,
"Total number of raw bytes (including overhead) downloaded from Azure"
+ " Storage.",
0L);
private final MutableCounterLong clientErrors =
registry.newCounter(
WASB_CLIENT_ERRORS,
"Total number of client-side errors by WASB (excluding 404).",
0L);
private final MutableCounterLong serverErrors =
registry.newCounter(
WASB_SERVER_ERRORS,
"Total number of server-caused errors by WASB.",
0L);
private final MutableGaugeLong averageBlockUploadLatencyMs;
private final MutableGaugeLong averageBlockDownloadLatencyMs;
private long currentMaximumUploadBytesPerSecond;
private long currentMaximumDownloadBytesPerSecond;
private static final int DEFAULT_LATENCY_ROLLING_AVERAGE_WINDOW =
5; // seconds
private final RollingWindowAverage currentBlockUploadLatency;
private final RollingWindowAverage currentBlockDownloadLatency;
private UUID fileSystemInstanceId;
public AzureFileSystemInstrumentation(Configuration conf) {
fileSystemInstanceId = UUID.randomUUID();
registry.tag("wasbFileSystemId",
"A unique identifier for the file ",
fileSystemInstanceId.toString());
final int rollingWindowSizeInSeconds =
conf.getInt(KEY_ROLLING_WINDOW_SIZE,
DEFAULT_LATENCY_ROLLING_AVERAGE_WINDOW);
averageBlockUploadLatencyMs =
registry.newGauge(
WASB_UPLOAD_LATENCY,
String.format("The average latency in milliseconds of uploading a single block"
+ ". The average latency is calculated over a %d-second rolling"
+ " window.", rollingWindowSizeInSeconds),
0L);
averageBlockDownloadLatencyMs =
registry.newGauge(
WASB_DOWNLOAD_LATENCY,
String.format("The average latency in milliseconds of downloading a single block"
+ ". The average latency is calculated over a %d-second rolling"
+ " window.", rollingWindowSizeInSeconds),
0L);
currentBlockUploadLatency =
new RollingWindowAverage(rollingWindowSizeInSeconds * 1000);
currentBlockDownloadLatency =
new RollingWindowAverage(rollingWindowSizeInSeconds * 1000);
}
/**
* The unique identifier for this file system in the metrics.
*/
public UUID getFileSystemInstanceId() {
return fileSystemInstanceId;
}
/**
* Get the metrics registry information.
*/
public MetricsInfo getMetricsRegistryInfo() {
return registry.info();
}
/**
* Sets the account name to tag all the metrics with.
* @param accountName The account name.
*/
public void setAccountName(String accountName) {
registry.tag("accountName",
"Name of the Azure Storage account that these metrics are going against",
accountName);
}
/**
* Sets the container name to tag all the metrics with.
* @param containerName The container name.
*/
public void setContainerName(String containerName) {
registry.tag("containerName",
"Name of the Azure Storage container that these metrics are going against",
containerName);
}
/**
* Indicate that we just got a web response from Azure Storage. This should
* be called for every web request/response we do (to get accurate metrics
* of how we're hitting the storage service).
*/
public void webResponse() {
numberOfWebResponses.incr();
inMemoryNumberOfWebResponses.incrementAndGet();
}
/**
* Gets the current number of web responses obtained from Azure Storage.
* @return The number of web responses.
*/
public long getCurrentWebResponses() {
return inMemoryNumberOfWebResponses.get();
}
/**
* Indicate that we just created a file through WASB.
*/
public void fileCreated() {
numberOfFilesCreated.incr();
}
/**
* Indicate that we just deleted a file through WASB.
*/
public void fileDeleted() {
numberOfFilesDeleted.incr();
}
/**
* Indicate that we just created a directory through WASB.
*/
public void directoryCreated() {
numberOfDirectoriesCreated.incr();
}
/**
* Indicate that we just deleted a directory through WASB.
*/
public void directoryDeleted() {
numberOfDirectoriesDeleted.incr();
}
/**
* Sets the current gauge value for how many bytes were written in the last
* second.
* @param currentBytesWritten The number of bytes.
*/
public void updateBytesWrittenInLastSecond(long currentBytesWritten) {
bytesWrittenInLastSecond.set(currentBytesWritten);
}
/**
* Sets the current gauge value for how many bytes were read in the last
* second.
* @param currentBytesRead The number of bytes.
*/
public void updateBytesReadInLastSecond(long currentBytesRead) {
bytesReadInLastSecond.set(currentBytesRead);
}
/**
* Record the current bytes-per-second upload rate seen.
* @param bytesPerSecond The bytes per second.
*/
public synchronized void currentUploadBytesPerSecond(long bytesPerSecond) {
if (bytesPerSecond > currentMaximumUploadBytesPerSecond) {
currentMaximumUploadBytesPerSecond = bytesPerSecond;
maximumUploadBytesPerSecond.set(bytesPerSecond);
}
}
/**
* Record the current bytes-per-second download rate seen.
* @param bytesPerSecond The bytes per second.
*/
public synchronized void currentDownloadBytesPerSecond(long bytesPerSecond) {
if (bytesPerSecond > currentMaximumDownloadBytesPerSecond) {
currentMaximumDownloadBytesPerSecond = bytesPerSecond;
maximumDownloadBytesPerSecond.set(bytesPerSecond);
}
}
/**
* Indicate that we just uploaded some data to Azure storage.
* @param numberOfBytes The raw number of bytes uploaded (including overhead).
*/
public void rawBytesUploaded(long numberOfBytes) {
rawBytesUploaded.incr(numberOfBytes);
}
/**
* Indicate that we just downloaded some data to Azure storage.
* @param numberOfBytes The raw number of bytes downloaded (including overhead).
*/
public void rawBytesDownloaded(long numberOfBytes) {
rawBytesDownloaded.incr(numberOfBytes);
}
/**
* Indicate that we just uploaded a block and record its latency.
* @param latency The latency in milliseconds.
*/
public void blockUploaded(long latency) {
currentBlockUploadLatency.addPoint(latency);
}
/**
* Indicate that we just downloaded a block and record its latency.
* @param latency The latency in milliseconds.
*/
public void blockDownloaded(long latency) {
currentBlockDownloadLatency.addPoint(latency);
}
/**
* Indicate that we just encountered a client-side error.
*/
public void clientErrorEncountered() {
clientErrors.incr();
}
/**
* Indicate that we just encountered a server-caused error.
*/
public void serverErrorEncountered() {
serverErrors.incr();
}
/**
* Get the current rolling average of the upload latency.
* @return rolling average of upload latency in milliseconds.
*/
public long getBlockUploadLatency() {
return currentBlockUploadLatency.getCurrentAverage();
}
/**
* Get the current rolling average of the download latency.
* @return rolling average of download latency in milliseconds.
*/
public long getBlockDownloadLatency() {
return currentBlockDownloadLatency.getCurrentAverage();
}
/**
* Get the current maximum upload bandwidth.
* @return maximum upload bandwidth in bytes per second.
*/
public long getCurrentMaximumUploadBandwidth() {
return currentMaximumUploadBytesPerSecond;
}
/**
* Get the current maximum download bandwidth.
* @return maximum download bandwidth in bytes per second.
*/
public long getCurrentMaximumDownloadBandwidth() {
return currentMaximumDownloadBytesPerSecond;
}
@Override
public void getMetrics(MetricsCollector builder, boolean all) {
averageBlockDownloadLatencyMs.set(
currentBlockDownloadLatency.getCurrentAverage());
averageBlockUploadLatencyMs.set(
currentBlockUploadLatency.getCurrentAverage());
registry.snapshot(builder.addRecord(registry.info().name()), true);
}
}

View File

@ -0,0 +1,64 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azure.metrics;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.metrics2.MetricsSource;
import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
/**
* AzureFileSystemMetricsSystem
*/
@InterfaceAudience.Private
public final class AzureFileSystemMetricsSystem {
private static MetricsSystemImpl instance;
private static int numFileSystems;
//private ctor
private AzureFileSystemMetricsSystem(){
}
public static synchronized void fileSystemStarted() {
if (numFileSystems == 0) {
instance = new MetricsSystemImpl();
instance.init("azure-file-system");
}
numFileSystems++;
}
public static synchronized void fileSystemClosed() {
if (instance != null) {
instance.publishMetricsNow();
}
if (numFileSystems == 1) {
instance.stop();
instance.shutdown();
instance = null;
}
numFileSystems--;
}
public static void registerSource(String name, String desc,
MetricsSource source) {
// Register the source with the name appended with -WasbSystem
// so that the name is globally unique.
instance.register(name + "-WasbSystem", desc, source);
}
}

View File

@ -0,0 +1,289 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azure.metrics;
import java.util.ArrayList;
import java.util.Date;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* Internal implementation class to help calculate the current bytes
* uploaded/downloaded and the maximum bandwidth gauges.
*/
@InterfaceAudience.Private
public final class BandwidthGaugeUpdater {
public static final Log LOG = LogFactory
.getLog(BandwidthGaugeUpdater.class);
public static final String THREAD_NAME = "AzureNativeFilesystemStore-UploadBandwidthUpdater";
private static final int DEFAULT_WINDOW_SIZE_MS = 1000;
private static final int PROCESS_QUEUE_INITIAL_CAPACITY = 1000;
private int windowSizeMs;
private ArrayList<BlockTransferWindow> allBlocksWritten =
createNewToProcessQueue();
private ArrayList<BlockTransferWindow> allBlocksRead =
createNewToProcessQueue();
private final Object blocksWrittenLock = new Object();
private final Object blocksReadLock = new Object();
private final AzureFileSystemInstrumentation instrumentation;
private Thread uploadBandwidthUpdater;
private volatile boolean suppressAutoUpdate = false;
/**
* Create a new updater object with default values.
* @param instrumentation The metrics source to update.
*/
public BandwidthGaugeUpdater(AzureFileSystemInstrumentation instrumentation) {
this(instrumentation, DEFAULT_WINDOW_SIZE_MS, false);
}
/**
* Create a new updater object with some overrides (used in unit tests).
* @param instrumentation The metrics source to update.
* @param windowSizeMs The window size to use for calculating bandwidth
* (in milliseconds).
* @param manualUpdateTrigger If true, then this object won't create the
* auto-update threads, and will wait for manual
* calls to triggerUpdate to occur.
*/
public BandwidthGaugeUpdater(AzureFileSystemInstrumentation instrumentation,
int windowSizeMs, boolean manualUpdateTrigger) {
this.windowSizeMs = windowSizeMs;
this.instrumentation = instrumentation;
if (!manualUpdateTrigger) {
uploadBandwidthUpdater = new Thread(new UploadBandwidthUpdater(), THREAD_NAME);
uploadBandwidthUpdater.setDaemon(true);
uploadBandwidthUpdater.start();
}
}
/**
* Indicate that a block has been uploaded.
* @param startDate The exact time the upload started.
* @param endDate The exact time the upload ended.
* @param length The number of bytes uploaded in the block.
*/
public void blockUploaded(Date startDate, Date endDate, long length) {
synchronized (blocksWrittenLock) {
allBlocksWritten.add(new BlockTransferWindow(startDate, endDate, length));
}
}
/**
* Indicate that a block has been downloaded.
* @param startDate The exact time the download started.
* @param endDate The exact time the download ended.
* @param length The number of bytes downloaded in the block.
*/
public void blockDownloaded(Date startDate, Date endDate, long length) {
synchronized (blocksReadLock) {
allBlocksRead.add(new BlockTransferWindow(startDate, endDate, length));
}
}
/**
* Creates a new ArrayList to hold incoming block transfer notifications
* before they're processed.
* @return The newly created ArrayList.
*/
private static ArrayList<BlockTransferWindow> createNewToProcessQueue() {
return new ArrayList<BlockTransferWindow>(PROCESS_QUEUE_INITIAL_CAPACITY);
}
/**
* Update the metrics source gauge for how many bytes were transferred
* during the last time window.
* @param updateWrite If true, update the write (upload) counter.
* Otherwise update the read (download) counter.
* @param bytes The number of bytes transferred.
*/
private void updateBytesTransferred(boolean updateWrite, long bytes) {
if (updateWrite) {
instrumentation.updateBytesWrittenInLastSecond(bytes);
}
else {
instrumentation.updateBytesReadInLastSecond(bytes);
}
}
/**
* Update the metrics source gauge for what the current transfer rate
* is.
* @param updateWrite If true, update the write (upload) counter.
* Otherwise update the read (download) counter.
* @param bytesPerSecond The number of bytes per second we're seeing.
*/
private void updateBytesTransferRate(boolean updateWrite, long bytesPerSecond) {
if (updateWrite) {
instrumentation.currentUploadBytesPerSecond(bytesPerSecond);
}
else {
instrumentation.currentDownloadBytesPerSecond(bytesPerSecond);
}
}
/**
* For unit test purposes, suppresses auto-update of the metrics
* from the dedicated thread.
*/
public void suppressAutoUpdate() {
suppressAutoUpdate = true;
}
/**
* Resumes auto-update (undo suppressAutoUpdate).
*/
public void resumeAutoUpdate() {
suppressAutoUpdate = false;
}
/**
* Triggers the update of the metrics gauge based on all the blocks
* uploaded/downloaded so far. This is typically done periodically in a
* dedicated update thread, but exposing as public for unit test purposes.
*
* @param updateWrite If true, we'll update the write (upload) metrics.
* Otherwise we'll update the read (download) ones.
*/
public void triggerUpdate(boolean updateWrite) {
ArrayList<BlockTransferWindow> toProcess = null;
synchronized (updateWrite ? blocksWrittenLock : blocksReadLock) {
if (updateWrite && !allBlocksWritten.isEmpty()) {
toProcess = allBlocksWritten;
allBlocksWritten = createNewToProcessQueue();
} else if (!updateWrite && !allBlocksRead.isEmpty()) {
toProcess = allBlocksRead;
allBlocksRead = createNewToProcessQueue();
}
}
// Check to see if we have any blocks to process.
if (toProcess == null) {
// Nothing to process, set the current bytes and rate to zero.
updateBytesTransferred(updateWrite, 0);
updateBytesTransferRate(updateWrite, 0);
return;
}
// The cut-off time for when we want to calculate rates is one
// window size ago from now.
long cutoffTime = new Date().getTime() - windowSizeMs;
// Go through all the blocks we're processing, and calculate the
// total number of bytes processed as well as the maximum transfer
// rate we experienced for any single block during our time window.
long maxSingleBlockTransferRate = 0;
long bytesInLastSecond = 0;
for (BlockTransferWindow currentWindow : toProcess) {
long windowDuration = currentWindow.getEndDate().getTime()
- currentWindow.getStartDate().getTime();
if (windowDuration == 0) {
// Edge case, assume it took 1 ms but we were too fast
windowDuration = 1;
}
if (currentWindow.getStartDate().getTime() > cutoffTime) {
// This block was transferred fully within our time window,
// just add its bytes to the total.
bytesInLastSecond += currentWindow.bytesTransferred;
} else if (currentWindow.getEndDate().getTime() > cutoffTime) {
// This block started its transfer before our time window,
// interpolate to estimate how many bytes from that block
// were actually transferred during our time window.
long adjustedBytes = (currentWindow.getBytesTransferred()
* (currentWindow.getEndDate().getTime() - cutoffTime))
/ windowDuration;
bytesInLastSecond += adjustedBytes;
}
// Calculate the transfer rate for this block.
long currentBlockTransferRate =
(currentWindow.getBytesTransferred() * 1000) / windowDuration;
maxSingleBlockTransferRate =
Math.max(maxSingleBlockTransferRate, currentBlockTransferRate);
}
updateBytesTransferred(updateWrite, bytesInLastSecond);
// The transfer rate we saw in the last second is a tricky concept to
// define: If we saw two blocks, one 2 MB block transferred in 0.2 seconds,
// and one 4 MB block transferred in 0.2 seconds, then the maximum rate
// is 20 MB/s (the 4 MB block), the average of the two blocks is 15 MB/s,
// and the aggregate rate is 6 MB/s (total of 6 MB transferred in one
// second). As a first cut, I'm taking the definition to be the maximum
// of aggregate or of any single block's rate (so in the example case it's
// 6 MB/s).
long aggregateTransferRate = bytesInLastSecond;
long maxObservedTransferRate =
Math.max(aggregateTransferRate, maxSingleBlockTransferRate);
updateBytesTransferRate(updateWrite, maxObservedTransferRate);
}
/**
* A single block transfer.
*/
private static final class BlockTransferWindow {
private final Date startDate;
private final Date endDate;
private final long bytesTransferred;
public BlockTransferWindow(Date startDate, Date endDate,
long bytesTransferred) {
this.startDate = startDate;
this.endDate = endDate;
this.bytesTransferred = bytesTransferred;
}
public Date getStartDate() { return startDate; }
public Date getEndDate() { return endDate; }
public long getBytesTransferred() { return bytesTransferred; }
}
/**
* The auto-update thread.
*/
private final class UploadBandwidthUpdater implements Runnable {
@Override
public void run() {
try {
while (true) {
Thread.sleep(windowSizeMs);
if (!suppressAutoUpdate) {
triggerUpdate(true);
triggerUpdate(false);
}
}
} catch (InterruptedException e) {
}
}
}
public void close() {
if (uploadBandwidthUpdater != null) {
// Interrupt and join the updater thread in death.
uploadBandwidthUpdater.interrupt();
try {
uploadBandwidthUpdater.join();
} catch (InterruptedException e) {
}
uploadBandwidthUpdater = null;
}
}
}

View File

@ -0,0 +1,82 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azure.metrics;
import static java.net.HttpURLConnection.HTTP_NOT_FOUND; //404
import static java.net.HttpURLConnection.HTTP_BAD_REQUEST; //400
import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR; //500
import org.apache.hadoop.classification.InterfaceAudience;
import com.microsoft.windowsazure.storage.OperationContext;
import com.microsoft.windowsazure.storage.RequestResult;
import com.microsoft.windowsazure.storage.ResponseReceivedEvent;
import com.microsoft.windowsazure.storage.StorageEvent;
/**
* An event listener to the ResponseReceived event from Azure Storage that will
* update error metrics appropriately when it gets that event.
*/
@InterfaceAudience.Private
public final class ErrorMetricUpdater extends StorageEvent<ResponseReceivedEvent> {
private final AzureFileSystemInstrumentation instrumentation;
private final OperationContext operationContext;
private ErrorMetricUpdater(OperationContext operationContext,
AzureFileSystemInstrumentation instrumentation) {
this.instrumentation = instrumentation;
this.operationContext = operationContext;
}
/**
* Hooks a new listener to the given operationContext that will update the
* error metrics for the WASB file system appropriately in response to
* ResponseReceived events.
*
* @param operationContext The operationContext to hook.
* @param instrumentation The metrics source to update.
*/
public static void hook(
OperationContext operationContext,
AzureFileSystemInstrumentation instrumentation) {
ErrorMetricUpdater listener =
new ErrorMetricUpdater(operationContext,
instrumentation);
operationContext.getResponseReceivedEventHandler().addListener(listener);
}
@Override
public void eventOccurred(ResponseReceivedEvent eventArg) {
RequestResult currentResult = operationContext.getLastResult();
int statusCode = currentResult.getStatusCode();
// Check if it's a client-side error: a 4xx status
// We exclude 404 because it happens frequently during the normal
// course of operation (each call to exists() would generate that
// if it's not found).
if (statusCode >= HTTP_BAD_REQUEST && statusCode < HTTP_INTERNAL_ERROR
&& statusCode != HTTP_NOT_FOUND) {
instrumentation.clientErrorEncountered();
} else if (statusCode >= HTTP_INTERNAL_ERROR) {
// It's a server error: a 5xx status. Could be an Azure Storage
// bug or (more likely) throttling.
instrumentation.serverErrorEncountered();
}
}
}

View File

@ -0,0 +1,147 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azure.metrics;
import java.net.HttpURLConnection;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import com.microsoft.windowsazure.storage.Constants.HeaderConstants;
import com.microsoft.windowsazure.storage.OperationContext;
import com.microsoft.windowsazure.storage.RequestResult;
import com.microsoft.windowsazure.storage.ResponseReceivedEvent;
import com.microsoft.windowsazure.storage.StorageEvent;
/**
* An event listener to the ResponseReceived event from Azure Storage that will
* update metrics appropriately.
*
*/
@InterfaceAudience.Private
public final class ResponseReceivedMetricUpdater extends StorageEvent<ResponseReceivedEvent> {
public static final Log LOG = LogFactory.getLog(ResponseReceivedMetricUpdater.class);
private final AzureFileSystemInstrumentation instrumentation;
private final BandwidthGaugeUpdater blockUploadGaugeUpdater;
private ResponseReceivedMetricUpdater(OperationContext operationContext,
AzureFileSystemInstrumentation instrumentation,
BandwidthGaugeUpdater blockUploadGaugeUpdater) {
this.instrumentation = instrumentation;
this.blockUploadGaugeUpdater = blockUploadGaugeUpdater;
}
/**
* Hooks a new listener to the given operationContext that will update the
* metrics for the WASB file system appropriately in response to
* ResponseReceived events.
*
* @param operationContext The operationContext to hook.
* @param instrumentation The metrics source to update.
* @param blockUploadGaugeUpdater The blockUploadGaugeUpdater to use.
*/
public static void hook(
OperationContext operationContext,
AzureFileSystemInstrumentation instrumentation,
BandwidthGaugeUpdater blockUploadGaugeUpdater) {
ResponseReceivedMetricUpdater listener =
new ResponseReceivedMetricUpdater(operationContext,
instrumentation, blockUploadGaugeUpdater);
operationContext.getResponseReceivedEventHandler().addListener(listener);
}
/**
* Get the content length of the request in the given HTTP connection.
* @param connection The connection.
* @return The content length, or zero if not found.
*/
private long getRequestContentLength(HttpURLConnection connection) {
String lengthString = connection.getRequestProperty(
HeaderConstants.CONTENT_LENGTH);
if (lengthString != null){
return Long.parseLong(lengthString);
}
else{
return 0;
}
}
/**
* Gets the content length of the response in the given HTTP connection.
* @param connection The connection.
* @return The content length.
*/
private long getResponseContentLength(HttpURLConnection connection) {
return connection.getContentLength();
}
/**
* Handle the response-received event from Azure SDK.
*/
@Override
public void eventOccurred(ResponseReceivedEvent eventArg) {
instrumentation.webResponse();
if (!(eventArg.getConnectionObject() instanceof HttpURLConnection)) {
// Typically this shouldn't happen, but just let it pass
return;
}
HttpURLConnection connection =
(HttpURLConnection) eventArg.getConnectionObject();
RequestResult currentResult = eventArg.getRequestResult();
if (currentResult == null) {
// Again, typically shouldn't happen, but let it pass
return;
}
long requestLatency = currentResult.getStopDate().getTime()
- currentResult.getStartDate().getTime();
if (currentResult.getStatusCode() == HttpURLConnection.HTTP_CREATED
&& connection.getRequestMethod().equalsIgnoreCase("PUT")) {
// If it's a PUT with an HTTP_CREATED status then it's a successful
// block upload.
long length = getRequestContentLength(connection);
if (length > 0) {
blockUploadGaugeUpdater.blockUploaded(
currentResult.getStartDate(),
currentResult.getStopDate(),
length);
instrumentation.rawBytesUploaded(length);
instrumentation.blockUploaded(requestLatency);
}
} else if (currentResult.getStatusCode() == HttpURLConnection.HTTP_PARTIAL
&& connection.getRequestMethod().equalsIgnoreCase("GET")) {
// If it's a GET with an HTTP_PARTIAL status then it's a successful
// block download.
long length = getResponseContentLength(connection);
if (length > 0) {
blockUploadGaugeUpdater.blockDownloaded(
currentResult.getStartDate(),
currentResult.getStopDate(),
length);
instrumentation.rawBytesDownloaded(length);
instrumentation.blockDownloaded(requestLatency);
}
}
}
}

View File

@ -0,0 +1,103 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azure.metrics;
import java.util.ArrayDeque;
import java.util.Date;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* Helper class to calculate rolling-window averages.
* Used to calculate rolling-window metrics in AzureNativeFileSystem.
*/
@InterfaceAudience.Private
final class RollingWindowAverage {
private final ArrayDeque<DataPoint> currentPoints =
new ArrayDeque<DataPoint>();
private final long windowSizeMs;
/**
* Create a new rolling-window average for the given window size.
* @param windowSizeMs The size of the window in milliseconds.
*/
public RollingWindowAverage(long windowSizeMs) {
this.windowSizeMs = windowSizeMs;
}
/**
* Add a new data point that just happened.
* @param value The value of the data point.
*/
public synchronized void addPoint(long value) {
currentPoints.offer(new DataPoint(new Date(), value));
cleanupOldPoints();
}
/**
* Get the current average.
* @return The current average.
*/
public synchronized long getCurrentAverage() {
cleanupOldPoints();
if (currentPoints.isEmpty()) {
return 0;
}
long sum = 0;
for (DataPoint current : currentPoints) {
sum += current.getValue();
}
return sum / currentPoints.size();
}
/**
* Clean up points that don't count any more (are before our
* rolling window) from our current queue of points.
*/
private void cleanupOldPoints() {
Date cutoffTime = new Date(new Date().getTime() - windowSizeMs);
while (!currentPoints.isEmpty()
&& currentPoints.peekFirst().getEventTime().before(cutoffTime)) {
currentPoints.removeFirst();
}
}
/**
* A single data point.
*/
private static class DataPoint {
private final Date eventTime;
private final long value;
public DataPoint(Date eventTime, long value) {
this.eventTime = eventTime;
this.value = value;
}
public Date getEventTime() {
return eventTime;
}
public long getValue() {
return value;
}
}
}

View File

@ -0,0 +1,28 @@
<html>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<body>
<p>
Infrastructure for a Metrics2 source that provides information on Windows
Azure Filesystem for Hadoop instances.
</p>
</body>
</html>

View File

@ -27,9 +27,18 @@ import java.util.Date;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.GregorianCalendar; import java.util.GregorianCalendar;
import java.util.TimeZone; import java.util.TimeZone;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.commons.configuration.SubsetConfiguration;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation;
import org.apache.hadoop.fs.azure.metrics.AzureFileSystemMetricsSystem;
import org.apache.hadoop.metrics2.AbstractMetric;
import org.apache.hadoop.metrics2.MetricsRecord;
import org.apache.hadoop.metrics2.MetricsSink;
import org.apache.hadoop.metrics2.MetricsTag;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import com.microsoft.windowsazure.storage.AccessCondition; import com.microsoft.windowsazure.storage.AccessCondition;
import com.microsoft.windowsazure.storage.CloudStorageAccount; import com.microsoft.windowsazure.storage.CloudStorageAccount;
@ -76,6 +85,9 @@ public final class AzureBlobStorageTestAccount {
private NativeAzureFileSystem fs; private NativeAzureFileSystem fs;
private AzureNativeFileSystemStore storage; private AzureNativeFileSystemStore storage;
private MockStorageInterface mockStorage; private MockStorageInterface mockStorage;
private static final ConcurrentLinkedQueue<MetricsRecord> allMetrics =
new ConcurrentLinkedQueue<MetricsRecord>();
private AzureBlobStorageTestAccount(NativeAzureFileSystem fs, private AzureBlobStorageTestAccount(NativeAzureFileSystem fs,
CloudStorageAccount account, CloudBlobContainer container) { CloudStorageAccount account, CloudBlobContainer container) {
@ -125,6 +137,10 @@ public final class AzureBlobStorageTestAccount {
this.mockStorage = mockStorage; this.mockStorage = mockStorage;
} }
private static void addRecord(MetricsRecord record) {
allMetrics.add(record);
}
public static String getMockContainerUri() { public static String getMockContainerUri() {
return String.format("http://%s/%s", return String.format("http://%s/%s",
AzureBlobStorageTestAccount.MOCK_ACCOUNT_NAME, AzureBlobStorageTestAccount.MOCK_ACCOUNT_NAME,
@ -142,6 +158,47 @@ public final class AzureBlobStorageTestAccount {
return toMockUri(path.toUri().getRawPath().substring(1)); return toMockUri(path.toUri().getRawPath().substring(1));
} }
public Number getLatestMetricValue(String metricName, Number defaultValue)
throws IndexOutOfBoundsException{
boolean found = false;
Number ret = null;
for (MetricsRecord currentRecord : allMetrics) {
// First check if this record is coming for my file system.
if (wasGeneratedByMe(currentRecord)) {
for (AbstractMetric currentMetric : currentRecord.metrics()) {
if (currentMetric.name().equalsIgnoreCase(metricName)) {
found = true;
ret = currentMetric.value();
break;
}
}
}
}
if (!found) {
if (defaultValue != null) {
return defaultValue;
}
throw new IndexOutOfBoundsException(metricName);
}
return ret;
}
/**
* Checks if the given record was generated by my WASB file system instance.
* @param currentRecord The metrics record to check.
* @return
*/
private boolean wasGeneratedByMe(MetricsRecord currentRecord) {
String myFsId = fs.getInstrumentation().getFileSystemInstanceId().toString();
for (MetricsTag currentTag : currentRecord.tags()) {
if (currentTag.name().equalsIgnoreCase("wasbFileSystemId")) {
return currentTag.value().equals(myFsId);
}
}
return false;
}
/** /**
* Gets the blob reference to the given blob key. * Gets the blob reference to the given blob key.
* *
@ -236,7 +293,6 @@ public final class AzureBlobStorageTestAccount {
public static AzureBlobStorageTestAccount createOutOfBandStore( public static AzureBlobStorageTestAccount createOutOfBandStore(
int uploadBlockSize, int downloadBlockSize) throws Exception { int uploadBlockSize, int downloadBlockSize) throws Exception {
CloudBlobContainer container = null; CloudBlobContainer container = null;
Configuration conf = createTestConfiguration(); Configuration conf = createTestConfiguration();
CloudStorageAccount account = createTestAccount(conf); CloudStorageAccount account = createTestAccount(conf);
@ -262,11 +318,25 @@ public final class AzureBlobStorageTestAccount {
// Set account URI and initialize Azure file system. // Set account URI and initialize Azure file system.
URI accountUri = createAccountUri(accountName, containerName); URI accountUri = createAccountUri(accountName, containerName);
// Set up instrumentation.
//
AzureFileSystemMetricsSystem.fileSystemStarted();
String sourceName = NativeAzureFileSystem.newMetricsSourceName();
String sourceDesc = "Azure Storage Volume File System metrics";
AzureFileSystemInstrumentation instrumentation =
DefaultMetricsSystem.instance().register(sourceName,
sourceDesc, new AzureFileSystemInstrumentation(conf));
AzureFileSystemMetricsSystem.registerSource(
sourceName, sourceDesc, instrumentation);
// Create a new AzureNativeFileSystemStore object. // Create a new AzureNativeFileSystemStore object.
AzureNativeFileSystemStore testStorage = new AzureNativeFileSystemStore(); AzureNativeFileSystemStore testStorage = new AzureNativeFileSystemStore();
// Initialize the store with the throttling feedback interfaces. // Initialize the store with the throttling feedback interfaces.
testStorage.initialize(accountUri, conf); testStorage.initialize(accountUri, conf, instrumentation);
// Create test account initializing the appropriate member variables. // Create test account initializing the appropriate member variables.
AzureBlobStorageTestAccount testAcct = new AzureBlobStorageTestAccount( AzureBlobStorageTestAccount testAcct = new AzureBlobStorageTestAccount(
@ -723,4 +793,19 @@ public final class AzureBlobStorageTestAccount {
return mockStorage; return mockStorage;
} }
public static class StandardCollector implements MetricsSink {
@Override
public void init(SubsetConfiguration conf) {
}
@Override
public void putMetrics(MetricsRecord record) {
addRecord(record);
}
@Override
public void flush() {
}
}
} }

View File

@ -33,11 +33,9 @@ import java.util.HashMap;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.TestHookOperationContext; import org.apache.hadoop.fs.azure.AzureNativeFileSystemStore.TestHookOperationContext;
import org.apache.hadoop.fs.permission.FsPermission;
import org.junit.Test; import org.junit.Test;
import com.microsoft.windowsazure.storage.OperationContext; import com.microsoft.windowsazure.storage.OperationContext;

View File

@ -0,0 +1,82 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azure.metrics;
import static org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation.WASB_BYTES_READ;
import static org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation.WASB_BYTES_WRITTEN;
import static org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation.WASB_RAW_BYTES_DOWNLOADED;
import static org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation.WASB_RAW_BYTES_UPLOADED;
import static org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation.WASB_WEB_RESPONSES;
import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
import static org.apache.hadoop.test.MetricsAsserts.getLongGauge;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
public final class AzureMetricsTestUtil {
public static long getLongGaugeValue(AzureFileSystemInstrumentation instrumentation,
String gaugeName) {
return getLongGauge(gaugeName, getMetrics(instrumentation));
}
/**
* Gets the current value of the given counter.
*/
public static long getLongCounterValue(AzureFileSystemInstrumentation instrumentation,
String counterName) {
return getLongCounter(counterName, getMetrics(instrumentation));
}
/**
* Gets the current value of the wasb_bytes_written_last_second counter.
*/
public static long getCurrentBytesWritten(AzureFileSystemInstrumentation instrumentation) {
return getLongGaugeValue(instrumentation, WASB_BYTES_WRITTEN);
}
/**
* Gets the current value of the wasb_bytes_read_last_second counter.
*/
public static long getCurrentBytesRead(AzureFileSystemInstrumentation instrumentation) {
return getLongGaugeValue(instrumentation, WASB_BYTES_READ);
}
/**
* Gets the current value of the wasb_raw_bytes_uploaded counter.
*/
public static long getCurrentTotalBytesWritten(
AzureFileSystemInstrumentation instrumentation) {
return getLongCounterValue(instrumentation, WASB_RAW_BYTES_UPLOADED);
}
/**
* Gets the current value of the wasb_raw_bytes_downloaded counter.
*/
public static long getCurrentTotalBytesRead(
AzureFileSystemInstrumentation instrumentation) {
return getLongCounterValue(instrumentation, WASB_RAW_BYTES_DOWNLOADED);
}
/**
* Gets the current value of the asv_web_responses counter.
*/
public static long getCurrentWebResponses(
AzureFileSystemInstrumentation instrumentation) {
return getLongCounter(WASB_WEB_RESPONSES, getMetrics(instrumentation));
}
}

View File

@ -0,0 +1,546 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azure.metrics;
import static org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation.WASB_CLIENT_ERRORS;
import static org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation.WASB_DIRECTORIES_CREATED;
import static org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation.WASB_DOWNLOAD_LATENCY;
import static org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation.WASB_DOWNLOAD_RATE;
import static org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation.WASB_FILES_CREATED;
import static org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation.WASB_FILES_DELETED;
import static org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation.WASB_SERVER_ERRORS;
import static org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation.WASB_UPLOAD_LATENCY;
import static org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation.WASB_UPLOAD_RATE;
import static org.apache.hadoop.fs.azure.metrics.AzureFileSystemInstrumentation.WASB_WEB_RESPONSES;
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeNotNull;
import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.verify;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.Date;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azure.AzureBlobStorageTestAccount;
import org.apache.hadoop.fs.azure.AzureException;
import org.apache.hadoop.fs.azure.AzureNativeFileSystemStore;
import org.apache.hadoop.fs.azure.NativeAzureFileSystem;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.MetricsTag;
import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class TestAzureFileSystemInstrumentation {
private FileSystem fs;
private AzureBlobStorageTestAccount testAccount;
@Before
public void setUp() throws Exception {
testAccount = AzureBlobStorageTestAccount.create();
if (testAccount != null) {
fs = testAccount.getFileSystem();
}
assumeNotNull(testAccount);
}
@After
public void tearDown() throws Exception {
if (testAccount != null) {
testAccount.cleanup();
testAccount = null;
fs = null;
}
}
@Test
public void testMetricTags() throws Exception {
String accountName =
testAccount.getRealAccount().getBlobEndpoint()
.getAuthority();
String containerName =
testAccount.getRealContainer().getName();
MetricsRecordBuilder myMetrics = getMyMetrics();
verify(myMetrics).add(argThat(
new TagMatcher("accountName", accountName)
));
verify(myMetrics).add(argThat(
new TagMatcher("containerName", containerName)
));
verify(myMetrics).add(argThat(
new TagMatcher("Context", "azureFileSystem")
));
verify(myMetrics).add(argThat(
new TagExistsMatcher("wasbFileSystemId")
));
}
@Test
public void testMetricsOnMkdirList() throws Exception {
long base = getBaseWebResponses();
// Create a directory
assertTrue(fs.mkdirs(new Path("a")));
// At the time of writing, it takes 1 request to create the actual directory,
// plus 2 requests per level to check that there's no blob with that name and
// 1 request per level above to create it if it doesn't exist.
// So for the path above (/user/<name>/a), it takes 2 requests each to check
// there's no blob called /user, no blob called /user/<name> and no blob
// called /user/<name>/a, and then 3 request for the creation of the three
// levels, and then 2 requests for checking/stamping the version of AS,
// totaling 11.
// Also, there's the initial 1 request for container check so total is 12.
base = assertWebResponsesInRange(base, 1, 12);
assertEquals(1,
AzureMetricsTestUtil.getLongCounterValue(getInstrumentation(), WASB_DIRECTORIES_CREATED));
// List the root contents
assertEquals(1, fs.listStatus(new Path("/")).length);
base = assertWebResponsesEquals(base, 1);
assertNoErrors();
}
private BandwidthGaugeUpdater getBandwidthGaugeUpdater() {
NativeAzureFileSystem azureFs = (NativeAzureFileSystem)fs;
AzureNativeFileSystemStore azureStore = azureFs.getStore();
return azureStore.getBandwidthGaugeUpdater();
}
private static byte[] nonZeroByteArray(int size) {
byte[] data = new byte[size];
Arrays.fill(data, (byte)5);
return data;
}
@Test
public void testMetricsOnFileCreateRead() throws Exception {
long base = getBaseWebResponses();
assertEquals(0, AzureMetricsTestUtil.getCurrentBytesWritten(getInstrumentation()));
Path filePath = new Path("/metricsTest_webResponses");
final int FILE_SIZE = 1000;
// Suppress auto-update of bandwidth metrics so we get
// to update them exactly when we want to.
getBandwidthGaugeUpdater().suppressAutoUpdate();
// Create a file
Date start = new Date();
OutputStream outputStream = fs.create(filePath);
outputStream.write(nonZeroByteArray(FILE_SIZE));
outputStream.close();
long uploadDurationMs = new Date().getTime() - start.getTime();
// The exact number of requests/responses that happen to create a file
// can vary - at the time of writing this code it takes 10
// requests/responses for the 1000 byte file (33 for 100 MB),
// plus the initial container-check request but that
// can very easily change in the future. Just assert that we do roughly
// more than 2 but less than 15.
logOpResponseCount("Creating a 1K file", base);
base = assertWebResponsesInRange(base, 2, 15);
getBandwidthGaugeUpdater().triggerUpdate(true);
long bytesWritten = AzureMetricsTestUtil.getCurrentBytesWritten(getInstrumentation());
assertTrue("The bytes written in the last second " + bytesWritten +
" is pretty far from the expected range of around " + FILE_SIZE +
" bytes plus a little overhead.",
bytesWritten > (FILE_SIZE / 2) && bytesWritten < (FILE_SIZE * 2));
long totalBytesWritten = AzureMetricsTestUtil.getCurrentTotalBytesWritten(getInstrumentation());
assertTrue("The total bytes written " + totalBytesWritten +
" is pretty far from the expected range of around " + FILE_SIZE +
" bytes plus a little overhead.",
totalBytesWritten >= FILE_SIZE && totalBytesWritten < (FILE_SIZE * 2));
long uploadRate = AzureMetricsTestUtil.getLongGaugeValue(getInstrumentation(), WASB_UPLOAD_RATE);
System.out.println("Upload rate: " + uploadRate + " bytes/second.");
long expectedRate = (FILE_SIZE * 1000L) / uploadDurationMs;
assertTrue("The upload rate " + uploadRate +
" is below the expected range of around " + expectedRate +
" bytes/second that the unit test observed. This should never be" +
" the case since the test underestimates the rate by looking at " +
" end-to-end time instead of just block upload time.",
uploadRate >= expectedRate);
long uploadLatency = AzureMetricsTestUtil.getLongGaugeValue(getInstrumentation(),
WASB_UPLOAD_LATENCY);
System.out.println("Upload latency: " + uploadLatency);
long expectedLatency = uploadDurationMs; // We're uploading less than a block.
assertTrue("The upload latency " + uploadLatency +
" should be greater than zero now that I've just uploaded a file.",
uploadLatency > 0);
assertTrue("The upload latency " + uploadLatency +
" is more than the expected range of around " + expectedLatency +
" milliseconds that the unit test observed. This should never be" +
" the case since the test overestimates the latency by looking at " +
" end-to-end time instead of just block upload time.",
uploadLatency <= expectedLatency);
// Read the file
start = new Date();
InputStream inputStream = fs.open(filePath);
int count = 0;
while (inputStream.read() >= 0) {
count++;
}
inputStream.close();
long downloadDurationMs = new Date().getTime() - start.getTime();
assertEquals(FILE_SIZE, count);
// Again, exact number varies. At the time of writing this code
// it takes 4 request/responses, so just assert a rough range between
// 1 and 10.
logOpResponseCount("Reading a 1K file", base);
base = assertWebResponsesInRange(base, 1, 10);
getBandwidthGaugeUpdater().triggerUpdate(false);
long totalBytesRead = AzureMetricsTestUtil.getCurrentTotalBytesRead(getInstrumentation());
assertEquals(FILE_SIZE, totalBytesRead);
long bytesRead = AzureMetricsTestUtil.getCurrentBytesRead(getInstrumentation());
assertTrue("The bytes read in the last second " + bytesRead +
" is pretty far from the expected range of around " + FILE_SIZE +
" bytes plus a little overhead.",
bytesRead > (FILE_SIZE / 2) && bytesRead < (FILE_SIZE * 2));
long downloadRate = AzureMetricsTestUtil.getLongGaugeValue(getInstrumentation(), WASB_DOWNLOAD_RATE);
System.out.println("Download rate: " + downloadRate + " bytes/second.");
expectedRate = (FILE_SIZE * 1000L) / downloadDurationMs;
assertTrue("The download rate " + downloadRate +
" is below the expected range of around " + expectedRate +
" bytes/second that the unit test observed. This should never be" +
" the case since the test underestimates the rate by looking at " +
" end-to-end time instead of just block download time.",
downloadRate >= expectedRate);
long downloadLatency = AzureMetricsTestUtil.getLongGaugeValue(getInstrumentation(),
WASB_DOWNLOAD_LATENCY);
System.out.println("Download latency: " + downloadLatency);
expectedLatency = downloadDurationMs; // We're downloading less than a block.
assertTrue("The download latency " + downloadLatency +
" should be greater than zero now that I've just downloaded a file.",
downloadLatency > 0);
assertTrue("The download latency " + downloadLatency +
" is more than the expected range of around " + expectedLatency +
" milliseconds that the unit test observed. This should never be" +
" the case since the test overestimates the latency by looking at " +
" end-to-end time instead of just block download time.",
downloadLatency <= expectedLatency);
assertNoErrors();
}
@Test
public void testMetricsOnBigFileCreateRead() throws Exception {
long base = getBaseWebResponses();
assertEquals(0, AzureMetricsTestUtil.getCurrentBytesWritten(getInstrumentation()));
Path filePath = new Path("/metricsTest_webResponses");
final int FILE_SIZE = 100 * 1024 * 1024;
// Suppress auto-update of bandwidth metrics so we get
// to update them exactly when we want to.
getBandwidthGaugeUpdater().suppressAutoUpdate();
// Create a file
OutputStream outputStream = fs.create(filePath);
outputStream.write(new byte[FILE_SIZE]);
outputStream.close();
// The exact number of requests/responses that happen to create a file
// can vary - at the time of writing this code it takes 34
// requests/responses for the 100 MB file,
// plus the initial container check request, but that
// can very easily change in the future. Just assert that we do roughly
// more than 20 but less than 50.
logOpResponseCount("Creating a 100 MB file", base);
base = assertWebResponsesInRange(base, 20, 50);
getBandwidthGaugeUpdater().triggerUpdate(true);
long totalBytesWritten = AzureMetricsTestUtil.getCurrentTotalBytesWritten(getInstrumentation());
assertTrue("The total bytes written " + totalBytesWritten +
" is pretty far from the expected range of around " + FILE_SIZE +
" bytes plus a little overhead.",
totalBytesWritten >= FILE_SIZE && totalBytesWritten < (FILE_SIZE * 2));
long uploadRate = AzureMetricsTestUtil.getLongGaugeValue(getInstrumentation(), WASB_UPLOAD_RATE);
System.out.println("Upload rate: " + uploadRate + " bytes/second.");
long uploadLatency = AzureMetricsTestUtil.getLongGaugeValue(getInstrumentation(),
WASB_UPLOAD_LATENCY);
System.out.println("Upload latency: " + uploadLatency);
assertTrue("The upload latency " + uploadLatency +
" should be greater than zero now that I've just uploaded a file.",
uploadLatency > 0);
// Read the file
InputStream inputStream = fs.open(filePath);
int count = 0;
while (inputStream.read() >= 0) {
count++;
}
inputStream.close();
assertEquals(FILE_SIZE, count);
// Again, exact number varies. At the time of writing this code
// it takes 27 request/responses, so just assert a rough range between
// 20 and 40.
logOpResponseCount("Reading a 100 MB file", base);
base = assertWebResponsesInRange(base, 20, 40);
getBandwidthGaugeUpdater().triggerUpdate(false);
long totalBytesRead = AzureMetricsTestUtil.getCurrentTotalBytesRead(getInstrumentation());
assertEquals(FILE_SIZE, totalBytesRead);
long downloadRate = AzureMetricsTestUtil.getLongGaugeValue(getInstrumentation(), WASB_DOWNLOAD_RATE);
System.out.println("Download rate: " + downloadRate + " bytes/second.");
long downloadLatency = AzureMetricsTestUtil.getLongGaugeValue(getInstrumentation(),
WASB_DOWNLOAD_LATENCY);
System.out.println("Download latency: " + downloadLatency);
assertTrue("The download latency " + downloadLatency +
" should be greater than zero now that I've just downloaded a file.",
downloadLatency > 0);
}
@Test
public void testMetricsOnFileRename() throws Exception {
long base = getBaseWebResponses();
Path originalPath = new Path("/metricsTest_RenameStart");
Path destinationPath = new Path("/metricsTest_RenameFinal");
// Create an empty file
assertEquals(0, AzureMetricsTestUtil.getLongCounterValue(getInstrumentation(), WASB_FILES_CREATED));
assertTrue(fs.createNewFile(originalPath));
logOpResponseCount("Creating an empty file", base);
base = assertWebResponsesInRange(base, 2, 20);
assertEquals(1, AzureMetricsTestUtil.getLongCounterValue(getInstrumentation(), WASB_FILES_CREATED));
// Rename the file
assertTrue(fs.rename(originalPath, destinationPath));
// Varies: at the time of writing this code it takes 7 requests/responses.
logOpResponseCount("Renaming a file", base);
base = assertWebResponsesInRange(base, 2, 15);
assertNoErrors();
}
@Test
public void testMetricsOnFileExistsDelete() throws Exception {
long base = getBaseWebResponses();
Path filePath = new Path("/metricsTest_delete");
// Check existence
assertFalse(fs.exists(filePath));
// At the time of writing this code it takes 2 requests/responses to
// check existence, which seems excessive, plus initial request for
// container check.
logOpResponseCount("Checking file existence for non-existent file", base);
base = assertWebResponsesInRange(base, 1, 3);
// Create an empty file
assertTrue(fs.createNewFile(filePath));
base = getCurrentWebResponses();
// Check existence again
assertTrue(fs.exists(filePath));
logOpResponseCount("Checking file existence for existent file", base);
base = assertWebResponsesInRange(base, 1, 2);
// Delete the file
assertEquals(0, AzureMetricsTestUtil.getLongCounterValue(getInstrumentation(), WASB_FILES_DELETED));
assertTrue(fs.delete(filePath, false));
// At the time of writing this code it takes 4 requests/responses to
// delete, which seems excessive. Check for range 1-4 for now.
logOpResponseCount("Deleting a file", base);
base = assertWebResponsesInRange(base, 1, 4);
assertEquals(1, AzureMetricsTestUtil.getLongCounterValue(getInstrumentation(), WASB_FILES_DELETED));
assertNoErrors();
}
@Test
public void testMetricsOnDirRename() throws Exception {
long base = getBaseWebResponses();
Path originalDirName = new Path("/metricsTestDirectory_RenameStart");
Path innerFileName = new Path(originalDirName, "innerFile");
Path destDirName = new Path("/metricsTestDirectory_RenameFinal");
// Create an empty directory
assertTrue(fs.mkdirs(originalDirName));
base = getCurrentWebResponses();
// Create an inner file
assertTrue(fs.createNewFile(innerFileName));
base = getCurrentWebResponses();
// Rename the directory
assertTrue(fs.rename(originalDirName, destDirName));
// At the time of writing this code it takes 11 requests/responses
// to rename the directory with one file. Check for range 1-20 for now.
logOpResponseCount("Renaming a directory", base);
base = assertWebResponsesInRange(base, 1, 20);
assertNoErrors();
}
@Test
public void testClientErrorMetrics() throws Exception {
String directoryName = "metricsTestDirectory_ClientError";
Path directoryPath = new Path("/" + directoryName);
assertTrue(fs.mkdirs(directoryPath));
String leaseID = testAccount.acquireShortLease(directoryName);
try {
try {
fs.delete(directoryPath, true);
assertTrue("Should've thrown.", false);
} catch (AzureException ex) {
assertTrue("Unexpected exception: " + ex,
ex.getMessage().contains("lease"));
}
assertEquals(1, AzureMetricsTestUtil.getLongCounterValue(getInstrumentation(), WASB_CLIENT_ERRORS));
assertEquals(0, AzureMetricsTestUtil.getLongCounterValue(getInstrumentation(), WASB_SERVER_ERRORS));
} finally {
testAccount.releaseLease(leaseID, directoryName);
}
}
private void logOpResponseCount(String opName, long base) {
System.out.println(opName + " took " + (getCurrentWebResponses() - base) +
" web responses to complete.");
}
/**
* Gets (and asserts) the value of the wasb_web_responses counter just
* after the creation of the file system object.
*/
private long getBaseWebResponses() {
// The number of requests should start at 0
return assertWebResponsesEquals(0, 0);
}
/**
* Gets the current value of the wasb_web_responses counter.
*/
private long getCurrentWebResponses() {
return AzureMetricsTestUtil.getCurrentWebResponses(getInstrumentation());
}
/**
* Checks that the wasb_web_responses counter is at the given value.
* @param base The base value (before the operation of interest).
* @param expected The expected value for the operation of interest.
* @return The new base value now.
*/
private long assertWebResponsesEquals(long base, long expected) {
assertCounter(WASB_WEB_RESPONSES, base + expected, getMyMetrics());
return base + expected;
}
private void assertNoErrors() {
assertEquals(0, AzureMetricsTestUtil.getLongCounterValue(getInstrumentation(), WASB_CLIENT_ERRORS));
assertEquals(0, AzureMetricsTestUtil.getLongCounterValue(getInstrumentation(), WASB_SERVER_ERRORS));
}
/**
* Checks that the wasb_web_responses counter is in the given range.
* @param base The base value (before the operation of interest).
* @param inclusiveLowerLimit The lower limit for what it should increase by.
* @param inclusiveUpperLimit The upper limit for what it should increase by.
* @return The new base value now.
*/
private long assertWebResponsesInRange(long base,
long inclusiveLowerLimit,
long inclusiveUpperLimit) {
long currentResponses = getCurrentWebResponses();
long justOperation = currentResponses - base;
assertTrue(String.format(
"Web responses expected in range [%d, %d], but was %d.",
inclusiveLowerLimit, inclusiveUpperLimit, justOperation),
justOperation >= inclusiveLowerLimit &&
justOperation <= inclusiveUpperLimit);
return currentResponses;
}
/**
* Gets the metrics for the file system object.
* @return The metrics record.
*/
private MetricsRecordBuilder getMyMetrics() {
return getMetrics(getInstrumentation());
}
private AzureFileSystemInstrumentation getInstrumentation() {
return ((NativeAzureFileSystem)fs).getInstrumentation();
}
/**
* A matcher class for asserting that we got a tag with a given
* value.
*/
private static class TagMatcher extends TagExistsMatcher {
private final String tagValue;
public TagMatcher(String tagName, String tagValue) {
super(tagName);
this.tagValue = tagValue;
}
@Override
public boolean matches(MetricsTag toMatch) {
return toMatch.value().equals(tagValue);
}
@Override
public void describeTo(Description desc) {
super.describeTo(desc);
desc.appendText(" with value " + tagValue);
}
}
/**
* A matcher class for asserting that we got a tag with any value.
*/
private static class TagExistsMatcher extends BaseMatcher<MetricsTag> {
private final String tagName;
public TagExistsMatcher(String tagName) {
this.tagName = tagName;
}
@Override
public boolean matches(Object toMatch) {
MetricsTag asTag = (MetricsTag)toMatch;
return asTag.name().equals(tagName) && matches(asTag);
}
protected boolean matches(MetricsTag toMatch) {
return true;
}
@Override
public void describeTo(Description desc) {
desc.appendText("Has tag " + tagName);
}
}
}

View File

@ -0,0 +1,125 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azure.metrics;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeNotNull;
import java.util.Date;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.azure.AzureBlobStorageTestAccount;
import org.junit.Assume;
import org.junit.Test;
public class TestBandwidthGaugeUpdater {
@Test
public void testSingleThreaded() throws Exception {
AzureFileSystemInstrumentation instrumentation =
new AzureFileSystemInstrumentation(new Configuration());
BandwidthGaugeUpdater updater =
new BandwidthGaugeUpdater(instrumentation, 1000, true);
updater.triggerUpdate(true);
assertEquals(0, AzureMetricsTestUtil.getCurrentBytesWritten(instrumentation));
updater.blockUploaded(new Date(), new Date(), 150);
updater.triggerUpdate(true);
assertEquals(150, AzureMetricsTestUtil.getCurrentBytesWritten(instrumentation));
updater.blockUploaded(new Date(new Date().getTime() - 10000),
new Date(), 200);
updater.triggerUpdate(true);
long currentBytes = AzureMetricsTestUtil.getCurrentBytesWritten(instrumentation);
assertTrue(
"We expect around (200/10 = 20) bytes written as the gauge value." +
"Got " + currentBytes,
currentBytes > 18 && currentBytes < 22);
updater.close();
}
@Test
public void testMultiThreaded() throws Exception {
final AzureFileSystemInstrumentation instrumentation =
new AzureFileSystemInstrumentation(new Configuration());
final BandwidthGaugeUpdater updater =
new BandwidthGaugeUpdater(instrumentation, 1000, true);
Thread[] threads = new Thread[10];
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(new Runnable() {
@Override
public void run() {
updater.blockDownloaded(new Date(), new Date(), 10);
updater.blockDownloaded(new Date(0), new Date(0), 10);
}
});
}
for (Thread t : threads) {
t.start();
}
for (Thread t : threads) {
t.join();
}
updater.triggerUpdate(false);
assertEquals(10 * threads.length, AzureMetricsTestUtil.getCurrentBytesRead(instrumentation));
updater.close();
}
@Test
public void testFinalizerThreadShutdown() throws Exception {
// force cleanup of any existing wasb filesystems
System.gc();
System.runFinalization();
int nUpdaterThreadsStart = getWasbThreadCount();
assertTrue("Existing WASB threads have not been cleared", nUpdaterThreadsStart == 0);
final int nFilesystemsToSpawn = 10;
AzureBlobStorageTestAccount testAccount = null;
for(int i = 0; i < nFilesystemsToSpawn; i++){
testAccount = AzureBlobStorageTestAccount.createMock();
testAccount.getFileSystem();
}
int nUpdaterThreadsAfterSpawn = getWasbThreadCount();
Assume.assumeTrue("Background threads should have spawned.", nUpdaterThreadsAfterSpawn == 10);
testAccount = null; //clear the last reachable reference
// force cleanup
System.gc();
System.runFinalization();
int nUpdaterThreadsAfterCleanup = getWasbThreadCount();
assertTrue("Finalizers should have reduced the thread count. ", nUpdaterThreadsAfterCleanup == 0 );
}
private int getWasbThreadCount() {
int c = 0;
Map<Thread, StackTraceElement[]> stacksStart = Thread.getAllStackTraces();
for (Thread t : stacksStart.keySet()){
if(t.getName().equals(BandwidthGaugeUpdater.THREAD_NAME))
{
c++;
}
}
return c;
}
}

View File

@ -0,0 +1,77 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azure.metrics;
import static org.junit.Assert.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.azure.AzureBlobStorageTestAccount;
import org.apache.hadoop.fs.azure.NativeAzureFileSystem;
import org.junit.*;
/**
* Tests that the WASB-specific metrics system is working correctly.
*/
public class TestNativeAzureFileSystemMetricsSystem {
private static final String WASB_FILES_CREATED = "wasb_files_created";
private static int getFilesCreated(AzureBlobStorageTestAccount testAccount) {
return testAccount.getLatestMetricValue(WASB_FILES_CREATED, 0).intValue();
}
/**
* Tests that when we have multiple file systems created/destroyed
* metrics from each are published correctly.
* @throws Exception
*/
@Test
public void testMetricsAcrossFileSystems()
throws Exception {
AzureBlobStorageTestAccount a1, a2, a3;
a1 = AzureBlobStorageTestAccount.createMock();
assertEquals(0, getFilesCreated(a1));
a2 = AzureBlobStorageTestAccount.createMock();
assertEquals(0, getFilesCreated(a2));
a1.getFileSystem().create(new Path("/foo")).close();
a1.getFileSystem().create(new Path("/bar")).close();
a2.getFileSystem().create(new Path("/baz")).close();
assertEquals(0, getFilesCreated(a1));
assertEquals(0, getFilesCreated(a2));
a1.closeFileSystem(); // Causes the file system to close, which publishes metrics
a2.closeFileSystem();
assertEquals(2, getFilesCreated(a1));
assertEquals(1, getFilesCreated(a2));
a3 = AzureBlobStorageTestAccount.createMock();
assertEquals(0, getFilesCreated(a3));
a3.closeFileSystem();
assertEquals(0, getFilesCreated(a3));
}
@Test
public void testMetricsSourceNames() {
String name1 = NativeAzureFileSystem.newMetricsSourceName();
String name2 = NativeAzureFileSystem.newMetricsSourceName();
assertTrue(name1.startsWith("AzureFileSystemMetrics"));
assertTrue(name2.startsWith("AzureFileSystemMetrics"));
assertTrue(!name1.equals(name2));
}
}

View File

@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azure.metrics;
import static org.junit.Assert.*;
import org.junit.*;
public class TestRollingWindowAverage {
/**
* Tests the basic functionality of the class.
*/
@Test
public void testBasicFunctionality() throws Exception {
RollingWindowAverage average = new RollingWindowAverage(100);
assertEquals(0, average.getCurrentAverage()); // Nothing there yet.
average.addPoint(5);
assertEquals(5, average.getCurrentAverage()); // One point in there.
Thread.sleep(50);
average.addPoint(15);
assertEquals(10, average.getCurrentAverage()); // Two points in there.
Thread.sleep(60);
assertEquals(15, average.getCurrentAverage()); // One point retired.
Thread.sleep(50);
assertEquals(0, average.getCurrentAverage()); // Both points retired.
}
}

View File

@ -27,6 +27,7 @@
</property> </property>
<!-- For tests against live azure, provide the following account information --> <!-- For tests against live azure, provide the following account information -->
<!-- <!--
<property> <property>

View File

@ -0,0 +1,18 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
azure-file-system.sink.azuretestcollector.class = org.apache.hadoop.fs.azure.AzureBlobStorageTestAccount$StandardCollector