diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsInstrumentation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsCountersImpl.java similarity index 96% rename from hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsInstrumentation.java rename to hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsCountersImpl.java index 9094c4065de..57cc3eada48 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsInstrumentation.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsCountersImpl.java @@ -41,7 +41,7 @@ import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.*; /** * Instrumentation of Abfs counters. */ -public class AbfsInstrumentation implements AbfsCounters { +public class AbfsCountersImpl implements AbfsCounters { /** * Single context for all the Abfs counters to separate them from other @@ -78,10 +78,17 @@ public class AbfsInstrumentation implements AbfsCounters { DIRECTORIES_DELETED, FILES_CREATED, FILES_DELETED, - ERROR_IGNORED + ERROR_IGNORED, + CONNECTIONS_MADE, + SEND_REQUESTS, + GET_RESPONSES, + BYTES_SENT, + BYTES_RECEIVED, + READ_THROTTLES, + WRITE_THROTTLES }; - public AbfsInstrumentation(URI uri) { + public AbfsCountersImpl(URI uri) { UUID fileSystemInstanceId = UUID.randomUUID(); registry.tag(REGISTRY_ID, "A unique identifier for the instance", diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsStatistic.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsStatistic.java index a9867aa12b8..2935cd75431 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsStatistic.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsStatistic.java @@ -22,7 +22,7 @@ import org.apache.hadoop.fs.StorageStatistics.CommonStatisticNames; /** * Statistic which are collected in Abfs. - * Available as metrics in {@link AbfsInstrumentation}. + * Available as metrics in {@link AbfsCountersImpl}. */ public enum AbfsStatistic { @@ -57,7 +57,23 @@ public enum AbfsStatistic { FILES_DELETED("files_deleted", "Total number of files deleted from the object store."), ERROR_IGNORED("error_ignored", - "Errors caught and ignored."); + "Errors caught and ignored."), + + //Network statistics. + CONNECTIONS_MADE("connections_made", + "Total number of times a connection was made with the data store."), + SEND_REQUESTS("send_requests", + "Total number of times http requests were sent to the data store."), + GET_RESPONSES("get_responses", + "Total number of times a response was received."), + BYTES_SENT("bytes_sent", + "Total bytes uploaded."), + BYTES_RECEIVED("bytes_received", + "Total bytes received."), + READ_THROTTLES("read_throttles", + "Total number of times a read operation is throttled."), + WRITE_THROTTLES("write_throttles", + "Total number of times a write operation is throttled."); private String statName; private String statDescription; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java index 8fce0ffb689..84d60686480 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java @@ -97,7 +97,7 @@ public class AzureBlobFileSystem extends FileSystem { private boolean delegationTokenEnabled = false; private AbfsDelegationTokenManager delegationTokenManager; - private AbfsCounters instrumentation; + private AbfsCounters abfsCounters; @Override public void initialize(URI uri, Configuration configuration) @@ -109,11 +109,12 @@ public class AzureBlobFileSystem extends FileSystem { LOG.debug("Initializing AzureBlobFileSystem for {}", uri); this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority()); - this.abfsStore = new AzureBlobFileSystemStore(uri, this.isSecureScheme(), configuration); + abfsCounters = new AbfsCountersImpl(uri); + this.abfsStore = new AzureBlobFileSystemStore(uri, this.isSecureScheme(), + configuration, abfsCounters); LOG.trace("AzureBlobFileSystemStore init complete"); final AbfsConfiguration abfsConfiguration = abfsStore.getAbfsConfiguration(); - instrumentation = new AbfsInstrumentation(uri); this.setWorkingDirectory(this.getHomeDirectory()); if (abfsConfiguration.getCreateRemoteFileSystemDuringInitialization()) { @@ -150,8 +151,8 @@ public class AzureBlobFileSystem extends FileSystem { sb.append("uri=").append(uri); sb.append(", user='").append(abfsStore.getUser()).append('\''); sb.append(", primaryUserGroup='").append(abfsStore.getPrimaryGroup()).append('\''); - if (instrumentation != null) { - sb.append(", Statistics: {").append(instrumentation.formString("{", "=", + if (abfsCounters != null) { + sb.append(", Statistics: {").append(abfsCounters.formString("{", "=", "}", true)); sb.append("}"); } @@ -392,7 +393,7 @@ public class AzureBlobFileSystem extends FileSystem { * @param statistic the Statistic to be incremented. */ private void incrementStatistic(AbfsStatistic statistic) { - instrumentation.incrementCounter(statistic, 1); + abfsCounters.incrementCounter(statistic, 1); } /** @@ -1241,7 +1242,7 @@ public class AzureBlobFileSystem extends FileSystem { @VisibleForTesting Map getInstrumentationMap() { - return instrumentation.toMap(); + return abfsCounters.toMap(); } @Override diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index f478c4d1546..27ca20763b8 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -83,6 +83,7 @@ import org.apache.hadoop.fs.azurebfs.oauth2.IdentityTransformer; import org.apache.hadoop.fs.azurebfs.oauth2.IdentityTransformerInterface; import org.apache.hadoop.fs.azurebfs.services.AbfsAclHelper; import org.apache.hadoop.fs.azurebfs.services.AbfsClient; +import org.apache.hadoop.fs.azurebfs.services.AbfsCounters; import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation; import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream; import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamContext; @@ -144,8 +145,9 @@ public class AzureBlobFileSystemStore implements Closeable { private final IdentityTransformerInterface identityTransformer; private final AbfsPerfTracker abfsPerfTracker; - public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme, Configuration configuration) - throws IOException { + public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme, + Configuration configuration, + AbfsCounters abfsCounters) throws IOException { this.uri = uri; String[] authorityParts = authorityParts(uri); final String fileSystemName = authorityParts[0]; @@ -183,7 +185,7 @@ public class AzureBlobFileSystemStore implements Closeable { boolean usingOauth = (authType == AuthType.OAuth); boolean useHttps = (usingOauth || abfsConfiguration.isHttpsAlwaysUsed()) ? true : isSecureScheme; this.abfsPerfTracker = new AbfsPerfTracker(fileSystemName, accountName, this.abfsConfiguration); - initializeClient(uri, fileSystemName, accountName, useHttps); + initializeClient(uri, fileSystemName, accountName, useHttps, abfsCounters); final Class identityTransformerClass = configuration.getClass(FS_AZURE_IDENTITY_TRANSFORM_CLASS, IdentityTransformer.class, IdentityTransformerInterface.class); @@ -1171,7 +1173,8 @@ public class AzureBlobFileSystemStore implements Closeable { return isKeyForDirectorySet(key, azureAtomicRenameDirSet); } - private void initializeClient(URI uri, String fileSystemName, String accountName, boolean isSecure) + private void initializeClient(URI uri, String fileSystemName, + String accountName, boolean isSecure, AbfsCounters abfsCounters) throws IOException { if (this.client != null) { return; @@ -1219,11 +1222,11 @@ public class AzureBlobFileSystemStore implements Closeable { if (tokenProvider != null) { this.client = new AbfsClient(baseUrl, creds, abfsConfiguration, new ExponentialRetryPolicy(abfsConfiguration.getMaxIoRetries()), - tokenProvider, abfsPerfTracker); + tokenProvider, abfsPerfTracker, abfsCounters); } else { this.client = new AbfsClient(baseUrl, creds, abfsConfiguration, new ExponentialRetryPolicy(abfsConfiguration.getMaxIoRetries()), - sasTokenProvider, abfsPerfTracker); + sasTokenProvider, abfsPerfTracker, abfsCounters); } LOG.trace("AbfsClient init complete"); } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java index f104e7b9c4d..f614bbd41d2 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -73,11 +73,13 @@ public class AbfsClient implements Closeable { private final AuthType authType; private AccessTokenProvider tokenProvider; private SASTokenProvider sasTokenProvider; + private final AbfsCounters abfsCounters; private AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials, final AbfsConfiguration abfsConfiguration, final ExponentialRetryPolicy exponentialRetryPolicy, - final AbfsPerfTracker abfsPerfTracker) { + final AbfsPerfTracker abfsPerfTracker, + final AbfsCounters abfsCounters) { this.baseUrl = baseUrl; this.sharedKeyCredentials = sharedKeyCredentials; String baseUrlString = baseUrl.toString(); @@ -104,14 +106,17 @@ public class AbfsClient implements Closeable { this.userAgent = initializeUserAgent(abfsConfiguration, sslProviderName); this.abfsPerfTracker = abfsPerfTracker; + this.abfsCounters = abfsCounters; } public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials, final AbfsConfiguration abfsConfiguration, final ExponentialRetryPolicy exponentialRetryPolicy, final AccessTokenProvider tokenProvider, - final AbfsPerfTracker abfsPerfTracker) { - this(baseUrl, sharedKeyCredentials, abfsConfiguration, exponentialRetryPolicy, abfsPerfTracker); + final AbfsPerfTracker abfsPerfTracker, + final AbfsCounters abfsCounters) { + this(baseUrl, sharedKeyCredentials, abfsConfiguration, + exponentialRetryPolicy, abfsPerfTracker, abfsCounters); this.tokenProvider = tokenProvider; } @@ -119,8 +124,10 @@ public class AbfsClient implements Closeable { final AbfsConfiguration abfsConfiguration, final ExponentialRetryPolicy exponentialRetryPolicy, final SASTokenProvider sasTokenProvider, - final AbfsPerfTracker abfsPerfTracker) { - this(baseUrl, sharedKeyCredentials, abfsConfiguration, exponentialRetryPolicy, abfsPerfTracker); + final AbfsPerfTracker abfsPerfTracker, + final AbfsCounters abfsCounters) { + this(baseUrl, sharedKeyCredentials, abfsConfiguration, + exponentialRetryPolicy, abfsPerfTracker, abfsCounters); this.sasTokenProvider = sasTokenProvider; } @@ -892,4 +899,12 @@ public class AbfsClient implements Closeable { public SASTokenProvider getSasTokenProvider() { return this.sasTokenProvider; } + + /** + * Getter for abfsCounters from AbfsClient. + * @return AbfsCounters instance. + */ + protected AbfsCounters getAbfsCounters() { + return abfsCounters; + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java index f1e5aaae683..e1a799b7a26 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingAnalyzer.java @@ -114,16 +114,19 @@ class AbfsClientThrottlingAnalyzer { /** * Suspends the current storage operation, as necessary, to reduce throughput. + * @return true if Thread sleeps(Throttling occurs) else false. */ - public void suspendIfNecessary() { + public boolean suspendIfNecessary() { int duration = sleepDuration; if (duration > 0) { try { Thread.sleep(duration); + return true; } catch (InterruptedException ie) { Thread.currentThread().interrupt(); } } + return false; } @VisibleForTesting @@ -269,4 +272,4 @@ class AbfsClientThrottlingAnalyzer { this.operationsSuccessful = new AtomicLong(); } } -} \ No newline at end of file +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java index 1c6ce17a38c..7303e833418 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientThrottlingIntercept.java @@ -23,6 +23,7 @@ import java.net.HttpURLConnection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.fs.azurebfs.AbfsStatistic; import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; /** @@ -103,17 +104,24 @@ public final class AbfsClientThrottlingIntercept { * uses this to suspend the request, if necessary, to minimize errors and * maximize throughput. */ - static void sendingRequest(AbfsRestOperationType operationType) { + static void sendingRequest(AbfsRestOperationType operationType, + AbfsCounters abfsCounters) { if (!isAutoThrottlingEnabled) { return; } switch (operationType) { case ReadFile: - singleton.readThrottler.suspendIfNecessary(); + if (singleton.readThrottler.suspendIfNecessary() + && abfsCounters != null) { + abfsCounters.incrementCounter(AbfsStatistic.READ_THROTTLES, 1); + } break; case Append: - singleton.writeThrottler.suspendIfNecessary(); + if (singleton.writeThrottler.suspendIfNecessary() + && abfsCounters != null) { + abfsCounters.incrementCounter(AbfsStatistic.WRITE_THROTTLES, 1); + } break; default: break; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java index 521da96e960..f3986d4b1f3 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java @@ -27,6 +27,7 @@ import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.fs.azurebfs.AbfsStatistic; import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; @@ -66,6 +67,7 @@ public class AbfsRestOperation { private int retryCount = 0; private AbfsHttpOperation result; + private AbfsCounters abfsCounters; public AbfsHttpOperation getResult() { return result; @@ -131,6 +133,7 @@ public class AbfsRestOperation { this.hasRequestBody = (AbfsHttpConstants.HTTP_METHOD_PUT.equals(method) || AbfsHttpConstants.HTTP_METHOD_PATCH.equals(method)); this.sasToken = sasToken; + this.abfsCounters = client.getAbfsCounters(); } /** @@ -160,6 +163,7 @@ public class AbfsRestOperation { this.buffer = buffer; this.bufferOffset = bufferOffset; this.bufferLength = bufferLength; + this.abfsCounters = client.getAbfsCounters(); } /** @@ -205,6 +209,7 @@ public class AbfsRestOperation { try { // initialize the HTTP request and open the connection httpOperation = new AbfsHttpOperation(url, method, requestHeaders); + incrementCounter(AbfsStatistic.CONNECTIONS_MADE, 1); switch(client.getAuthType()) { case Custom: @@ -229,14 +234,19 @@ public class AbfsRestOperation { // dump the headers AbfsIoUtils.dumpHeadersToDebugLog("Request Headers", httpOperation.getConnection().getRequestProperties()); - AbfsClientThrottlingIntercept.sendingRequest(operationType); + AbfsClientThrottlingIntercept.sendingRequest(operationType, abfsCounters); if (hasRequestBody) { // HttpUrlConnection requires httpOperation.sendRequest(buffer, bufferOffset, bufferLength); + incrementCounter(AbfsStatistic.SEND_REQUESTS, 1); + incrementCounter(AbfsStatistic.BYTES_SENT, bufferLength); } httpOperation.processResponse(buffer, bufferOffset, bufferLength); + incrementCounter(AbfsStatistic.GET_RESPONSES, 1); + incrementCounter(AbfsStatistic.BYTES_RECEIVED, + httpOperation.getBytesReceived()); } catch (IOException ex) { if (ex instanceof UnknownHostException) { LOG.warn(String.format("Unknown host name: %s. Retrying to resolve the host name...", httpOperation.getUrl().getHost())); @@ -276,4 +286,16 @@ public class AbfsRestOperation { return true; } + + /** + * Incrementing Abfs counters with a long value. + * + * @param statistic the Abfs statistic that needs to be incremented. + * @param value the value to be incremented by. + */ + private void incrementCounter(AbfsStatistic statistic, long value) { + if (abfsCounters != null) { + abfsCounters.incrementCounter(statistic, value); + } + } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java index f41cbd63186..a80bee65bf4 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java @@ -436,9 +436,10 @@ public abstract class AbstractAbfsIntegrationTest extends * @param metricMap map of (String, Long) with statistics name as key and * statistics value as map value. */ - protected void assertAbfsStatistics(AbfsStatistic statistic, + protected long assertAbfsStatistics(AbfsStatistic statistic, long expectedValue, Map metricMap) { assertEquals("Mismatch in " + statistic.getStatName(), expectedValue, (long) metricMap.get(statistic.getStatName())); + return expectedValue; } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java new file mode 100644 index 00000000000..904fdf3f7c1 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsNetworkStatistics.java @@ -0,0 +1,253 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs; + +import java.io.IOException; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.junit.Test; + +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream; +import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation; + +public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest { + + private static final Logger LOG = + LoggerFactory.getLogger(ITestAbfsNetworkStatistics.class); + private static final int LARGE_OPERATIONS = 10; + + public ITestAbfsNetworkStatistics() throws Exception { + } + + /** + * Testing connections_made, send_request and bytes_send statistics in + * {@link AbfsRestOperation}. + */ + @Test + public void testAbfsHttpSendStatistics() throws IOException { + describe("Test to check correct values of statistics after Abfs http send " + + "request is done."); + + AzureBlobFileSystem fs = getFileSystem(); + Map metricMap; + Path sendRequestPath = path(getMethodName()); + String testNetworkStatsString = "http_send"; + long connectionsMade, requestsSent, bytesSent; + + /* + * Creating AbfsOutputStream will result in 1 connection made and 1 send + * request. + */ + try (AbfsOutputStream out = createAbfsOutputStreamWithFlushEnabled(fs, + sendRequestPath)) { + out.write(testNetworkStatsString.getBytes()); + + /* + * Flushes all outstanding data (i.e. the current unfinished packet) + * from the client into the service on all DataNode replicas. + */ + out.hflush(); + + metricMap = fs.getInstrumentationMap(); + + /* + * Testing the network stats with 1 write operation. + * + * connections_made : 3(getFileSystem()) + 1(AbfsOutputStream) + 2(flush). + * + * send_requests : 1(getFileSystem()) + 1(AbfsOutputStream) + 2(flush). + * + * bytes_sent : bytes wrote in AbfsOutputStream. + */ + connectionsMade = assertAbfsStatistics(AbfsStatistic.CONNECTIONS_MADE, + 6, metricMap); + requestsSent = assertAbfsStatistics(AbfsStatistic.SEND_REQUESTS, 4, + metricMap); + bytesSent = assertAbfsStatistics(AbfsStatistic.BYTES_SENT, + testNetworkStatsString.getBytes().length, metricMap); + + } + + // To close the AbfsOutputStream 1 connection is made and 1 request is sent. + connectionsMade++; + requestsSent++; + + try (AbfsOutputStream out = createAbfsOutputStreamWithFlushEnabled(fs, + sendRequestPath)) { + + for (int i = 0; i < LARGE_OPERATIONS; i++) { + out.write(testNetworkStatsString.getBytes()); + + /* + * 1 flush call would create 2 connections and 2 send requests. + * when hflush() is called it will essentially trigger append() and + * flush() inside AbfsRestOperation. Both of which calls + * executeHttpOperation() method which creates a connection and sends + * requests. + */ + out.hflush(); + } + + metricMap = fs.getInstrumentationMap(); + + /* + * Testing the network stats with Large amount of bytes sent. + * + * connections made : connections_made(Last assertion) + 1 + * (AbfsOutputStream) + LARGE_OPERATIONS * 2(flush). + * + * send requests : requests_sent(Last assertion) + 1(AbfsOutputStream) + + * LARGE_OPERATIONS * 2(flush). + * + * bytes sent : bytes_sent(Last assertion) + LARGE_OPERATIONS * (bytes + * wrote each time). + * + */ + assertAbfsStatistics(AbfsStatistic.CONNECTIONS_MADE, + connectionsMade + 1 + LARGE_OPERATIONS * 2, metricMap); + assertAbfsStatistics(AbfsStatistic.SEND_REQUESTS, + requestsSent + 1 + LARGE_OPERATIONS * 2, metricMap); + assertAbfsStatistics(AbfsStatistic.BYTES_SENT, + bytesSent + LARGE_OPERATIONS * (testNetworkStatsString.getBytes().length), + metricMap); + + } + + } + + /** + * Testing get_response and bytes_received in {@link AbfsRestOperation}. + */ + @Test + public void testAbfsHttpResponseStatistics() throws IOException { + describe("Test to check correct values of statistics after Http " + + "Response is processed."); + + AzureBlobFileSystem fs = getFileSystem(); + Path getResponsePath = path(getMethodName()); + Map metricMap; + String testResponseString = "some response"; + long getResponses, bytesReceived; + + FSDataOutputStream out = null; + FSDataInputStream in = null; + try { + + /* + * Creating a File and writing some bytes in it. + * + * get_response : 3(getFileSystem) + 1(OutputStream creation) + 2 + * (Writing data in Data store). + * + */ + out = fs.create(getResponsePath); + out.write(testResponseString.getBytes()); + out.hflush(); + + // open would require 1 get response. + in = fs.open(getResponsePath); + // read would require 1 get response and also get the bytes received. + int result = in.read(); + + // Confirming read isn't -1. + LOG.info("Result of read operation : {}", result); + + metricMap = fs.getInstrumentationMap(); + + /* + * Testing values of statistics after writing and reading a buffer. + * + * get_responses - 6(above operations) + 1(open()) + 1 (read()). + * + * bytes_received - This should be equal to bytes sent earlier. + */ + getResponses = assertAbfsStatistics(AbfsStatistic.GET_RESPONSES, 8, + metricMap); + // Testing that bytes received is equal to bytes sent. + long bytesSend = metricMap.get(AbfsStatistic.BYTES_SENT.getStatName()); + bytesReceived = assertAbfsStatistics(AbfsStatistic.BYTES_RECEIVED, + bytesSend, + metricMap); + + } finally { + IOUtils.cleanupWithLogger(LOG, out, in); + } + + // To close the streams 1 response is received. + getResponses++; + + try { + + /* + * Creating a file and writing buffer into it. Also recording the + * buffer for future read() call. + * This creating outputStream and writing requires 2 * + * (LARGE_OPERATIONS) get requests. + */ + StringBuilder largeBuffer = new StringBuilder(); + out = fs.create(getResponsePath); + for (int i = 0; i < LARGE_OPERATIONS; i++) { + out.write(testResponseString.getBytes()); + out.hflush(); + largeBuffer.append(testResponseString); + } + + // Open requires 1 get_response. + in = fs.open(getResponsePath); + + /* + * Reading the file which was written above. This read() call would + * read bytes equal to the bytes that was written above. + * Get response would be 1 only. + */ + in.read(0, largeBuffer.toString().getBytes(), 0, + largeBuffer.toString().getBytes().length); + + metricMap = fs.getInstrumentationMap(); + + /* + * Testing the statistics values after writing and reading a large buffer. + * + * get_response : get_responses(Last assertion) + 1 + * (OutputStream) + 2 * LARGE_OPERATIONS(Writing and flushing + * LARGE_OPERATIONS times) + 1(open()) + 1(read()). + * + * bytes_received : bytes_received(Last assertion) + LARGE_OPERATIONS * + * bytes wrote each time (bytes_received is equal to bytes wrote in the + * File). + * + */ + assertAbfsStatistics(AbfsStatistic.BYTES_RECEIVED, + bytesReceived + LARGE_OPERATIONS * (testResponseString.getBytes().length), + metricMap); + assertAbfsStatistics(AbfsStatistic.GET_RESPONSES, + getResponses + 3 + 2 * LARGE_OPERATIONS, metricMap); + + } finally { + IOUtils.cleanupWithLogger(LOG, out, in); + } + } + +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsStatistics.java index c88dc847a3f..42205807c1b 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsStatistics.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsStatistics.java @@ -45,7 +45,7 @@ public class ITestAbfsStatistics extends AbstractAbfsIntegrationTest { describe("Testing the initial values of Abfs counters"); AbfsCounters abfsCounters = - new AbfsInstrumentation(getFileSystem().getUri()); + new AbfsCountersImpl(getFileSystem().getUri()); Map metricMap = abfsCounters.toMap(); for (Map.Entry entry : metricMap.entrySet()) { diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsNetworkStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsNetworkStatistics.java new file mode 100644 index 00000000000..0639cf2f82b --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsNetworkStatistics.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs; + +import java.io.IOException; +import java.util.Map; + +import org.junit.Test; + +import org.apache.hadoop.fs.azurebfs.services.AbfsCounters; + +public class TestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest { + + private static final int LARGE_OPERATIONS = 1000; + + public TestAbfsNetworkStatistics() throws Exception { + } + + /** + * Test to check correct values of read and write throttling statistics in + * {@code AbfsClientThrottlingAnalyzer}. + */ + @Test + public void testAbfsThrottlingStatistics() throws IOException { + describe("Test to check correct values of read throttle and write " + + "throttle statistics in Abfs"); + + AbfsCounters statistics = + new AbfsCountersImpl(getFileSystem().getUri()); + + /* + * Calling the throttle methods to check correct summation and values of + * the counters. + */ + for (int i = 0; i < LARGE_OPERATIONS; i++) { + statistics.incrementCounter(AbfsStatistic.READ_THROTTLES, 1); + statistics.incrementCounter(AbfsStatistic.WRITE_THROTTLES, 1); + } + + Map metricMap = statistics.toMap(); + + /* + * Test to check read and write throttle statistics gave correct values for + * 1000 calls. + */ + assertAbfsStatistics(AbfsStatistic.READ_THROTTLES, LARGE_OPERATIONS, + metricMap); + assertAbfsStatistics(AbfsStatistic.WRITE_THROTTLES, LARGE_OPERATIONS, + metricMap); + } +} diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsStatistics.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsStatistics.java index 20d96fadef6..f831d2d4cd2 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsStatistics.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAbfsStatistics.java @@ -43,7 +43,7 @@ public class TestAbfsStatistics extends AbstractAbfsIntegrationTest { describe("Testing the counter values after Abfs is initialised"); AbfsCounters instrumentation = - new AbfsInstrumentation(getFileSystem().getUri()); + new AbfsCountersImpl(getFileSystem().getUri()); //Testing summation of the counter values. for (int i = 0; i < LARGE_OPS; i++) { diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java index 0fd65fb0a60..8197e7e2020 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java @@ -100,7 +100,7 @@ public final class TestAbfsClient { private String getUserAgentString(AbfsConfiguration config, boolean includeSSLProvider) throws MalformedURLException { AbfsClient client = new AbfsClient(new URL("https://azure.com"), null, - config, null, (AccessTokenProvider) null, null); + config, null, (AccessTokenProvider) null, null, null); String sslProviderName = null; if (includeSSLProvider) { sslProviderName = DelegatingSSLSocketFactory.getDefaultFactory() @@ -267,7 +267,7 @@ public final class TestAbfsClient { (currentAuthType == AuthType.OAuth ? abfsConfig.getTokenProvider() : null), - tracker); + tracker, null); return testClient; }