HADOOP-17065. Add Network Counters to ABFS (#2056)
Contributed by Mehakmeet Singh.
This commit is contained in:
parent
469841446f
commit
3472c3efc0
|
@ -41,7 +41,7 @@ import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.*;
|
|||
/**
|
||||
* Instrumentation of Abfs counters.
|
||||
*/
|
||||
public class AbfsInstrumentation implements AbfsCounters {
|
||||
public class AbfsCountersImpl implements AbfsCounters {
|
||||
|
||||
/**
|
||||
* Single context for all the Abfs counters to separate them from other
|
||||
|
@ -78,10 +78,17 @@ public class AbfsInstrumentation implements AbfsCounters {
|
|||
DIRECTORIES_DELETED,
|
||||
FILES_CREATED,
|
||||
FILES_DELETED,
|
||||
ERROR_IGNORED
|
||||
ERROR_IGNORED,
|
||||
CONNECTIONS_MADE,
|
||||
SEND_REQUESTS,
|
||||
GET_RESPONSES,
|
||||
BYTES_SENT,
|
||||
BYTES_RECEIVED,
|
||||
READ_THROTTLES,
|
||||
WRITE_THROTTLES
|
||||
};
|
||||
|
||||
public AbfsInstrumentation(URI uri) {
|
||||
public AbfsCountersImpl(URI uri) {
|
||||
UUID fileSystemInstanceId = UUID.randomUUID();
|
||||
registry.tag(REGISTRY_ID,
|
||||
"A unique identifier for the instance",
|
|
@ -22,7 +22,7 @@ import org.apache.hadoop.fs.StorageStatistics.CommonStatisticNames;
|
|||
|
||||
/**
|
||||
* Statistic which are collected in Abfs.
|
||||
* Available as metrics in {@link AbfsInstrumentation}.
|
||||
* Available as metrics in {@link AbfsCountersImpl}.
|
||||
*/
|
||||
public enum AbfsStatistic {
|
||||
|
||||
|
@ -57,7 +57,23 @@ public enum AbfsStatistic {
|
|||
FILES_DELETED("files_deleted",
|
||||
"Total number of files deleted from the object store."),
|
||||
ERROR_IGNORED("error_ignored",
|
||||
"Errors caught and ignored.");
|
||||
"Errors caught and ignored."),
|
||||
|
||||
//Network statistics.
|
||||
CONNECTIONS_MADE("connections_made",
|
||||
"Total number of times a connection was made with the data store."),
|
||||
SEND_REQUESTS("send_requests",
|
||||
"Total number of times http requests were sent to the data store."),
|
||||
GET_RESPONSES("get_responses",
|
||||
"Total number of times a response was received."),
|
||||
BYTES_SENT("bytes_sent",
|
||||
"Total bytes uploaded."),
|
||||
BYTES_RECEIVED("bytes_received",
|
||||
"Total bytes received."),
|
||||
READ_THROTTLES("read_throttles",
|
||||
"Total number of times a read operation is throttled."),
|
||||
WRITE_THROTTLES("write_throttles",
|
||||
"Total number of times a write operation is throttled.");
|
||||
|
||||
private String statName;
|
||||
private String statDescription;
|
||||
|
|
|
@ -97,7 +97,7 @@ public class AzureBlobFileSystem extends FileSystem {
|
|||
|
||||
private boolean delegationTokenEnabled = false;
|
||||
private AbfsDelegationTokenManager delegationTokenManager;
|
||||
private AbfsCounters instrumentation;
|
||||
private AbfsCounters abfsCounters;
|
||||
|
||||
@Override
|
||||
public void initialize(URI uri, Configuration configuration)
|
||||
|
@ -109,11 +109,12 @@ public class AzureBlobFileSystem extends FileSystem {
|
|||
LOG.debug("Initializing AzureBlobFileSystem for {}", uri);
|
||||
|
||||
this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
|
||||
this.abfsStore = new AzureBlobFileSystemStore(uri, this.isSecureScheme(), configuration);
|
||||
abfsCounters = new AbfsCountersImpl(uri);
|
||||
this.abfsStore = new AzureBlobFileSystemStore(uri, this.isSecureScheme(),
|
||||
configuration, abfsCounters);
|
||||
LOG.trace("AzureBlobFileSystemStore init complete");
|
||||
|
||||
final AbfsConfiguration abfsConfiguration = abfsStore.getAbfsConfiguration();
|
||||
instrumentation = new AbfsInstrumentation(uri);
|
||||
this.setWorkingDirectory(this.getHomeDirectory());
|
||||
|
||||
if (abfsConfiguration.getCreateRemoteFileSystemDuringInitialization()) {
|
||||
|
@ -150,8 +151,8 @@ public class AzureBlobFileSystem extends FileSystem {
|
|||
sb.append("uri=").append(uri);
|
||||
sb.append(", user='").append(abfsStore.getUser()).append('\'');
|
||||
sb.append(", primaryUserGroup='").append(abfsStore.getPrimaryGroup()).append('\'');
|
||||
if (instrumentation != null) {
|
||||
sb.append(", Statistics: {").append(instrumentation.formString("{", "=",
|
||||
if (abfsCounters != null) {
|
||||
sb.append(", Statistics: {").append(abfsCounters.formString("{", "=",
|
||||
"}", true));
|
||||
sb.append("}");
|
||||
}
|
||||
|
@ -392,7 +393,7 @@ public class AzureBlobFileSystem extends FileSystem {
|
|||
* @param statistic the Statistic to be incremented.
|
||||
*/
|
||||
private void incrementStatistic(AbfsStatistic statistic) {
|
||||
instrumentation.incrementCounter(statistic, 1);
|
||||
abfsCounters.incrementCounter(statistic, 1);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1241,7 +1242,7 @@ public class AzureBlobFileSystem extends FileSystem {
|
|||
|
||||
@VisibleForTesting
|
||||
Map<String, Long> getInstrumentationMap() {
|
||||
return instrumentation.toMap();
|
||||
return abfsCounters.toMap();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -82,6 +82,7 @@ import org.apache.hadoop.fs.azurebfs.oauth2.IdentityTransformer;
|
|||
import org.apache.hadoop.fs.azurebfs.oauth2.IdentityTransformerInterface;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsAclHelper;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsCounters;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamContext;
|
||||
|
@ -143,8 +144,9 @@ public class AzureBlobFileSystemStore implements Closeable {
|
|||
private final IdentityTransformerInterface identityTransformer;
|
||||
private final AbfsPerfTracker abfsPerfTracker;
|
||||
|
||||
public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme, Configuration configuration)
|
||||
throws IOException {
|
||||
public AzureBlobFileSystemStore(URI uri, boolean isSecureScheme,
|
||||
Configuration configuration,
|
||||
AbfsCounters abfsCounters) throws IOException {
|
||||
this.uri = uri;
|
||||
String[] authorityParts = authorityParts(uri);
|
||||
final String fileSystemName = authorityParts[0];
|
||||
|
@ -182,7 +184,7 @@ public class AzureBlobFileSystemStore implements Closeable {
|
|||
boolean usingOauth = (authType == AuthType.OAuth);
|
||||
boolean useHttps = (usingOauth || abfsConfiguration.isHttpsAlwaysUsed()) ? true : isSecureScheme;
|
||||
this.abfsPerfTracker = new AbfsPerfTracker(fileSystemName, accountName, this.abfsConfiguration);
|
||||
initializeClient(uri, fileSystemName, accountName, useHttps);
|
||||
initializeClient(uri, fileSystemName, accountName, useHttps, abfsCounters);
|
||||
final Class<? extends IdentityTransformerInterface> identityTransformerClass =
|
||||
configuration.getClass(FS_AZURE_IDENTITY_TRANSFORM_CLASS, IdentityTransformer.class,
|
||||
IdentityTransformerInterface.class);
|
||||
|
@ -1170,7 +1172,8 @@ public class AzureBlobFileSystemStore implements Closeable {
|
|||
return isKeyForDirectorySet(key, azureAtomicRenameDirSet);
|
||||
}
|
||||
|
||||
private void initializeClient(URI uri, String fileSystemName, String accountName, boolean isSecure)
|
||||
private void initializeClient(URI uri, String fileSystemName,
|
||||
String accountName, boolean isSecure, AbfsCounters abfsCounters)
|
||||
throws IOException {
|
||||
if (this.client != null) {
|
||||
return;
|
||||
|
@ -1214,11 +1217,11 @@ public class AzureBlobFileSystemStore implements Closeable {
|
|||
if (tokenProvider != null) {
|
||||
this.client = new AbfsClient(baseUrl, creds, abfsConfiguration,
|
||||
new ExponentialRetryPolicy(abfsConfiguration.getMaxIoRetries()),
|
||||
tokenProvider, abfsPerfTracker);
|
||||
tokenProvider, abfsPerfTracker, abfsCounters);
|
||||
} else {
|
||||
this.client = new AbfsClient(baseUrl, creds, abfsConfiguration,
|
||||
new ExponentialRetryPolicy(abfsConfiguration.getMaxIoRetries()),
|
||||
sasTokenProvider, abfsPerfTracker);
|
||||
sasTokenProvider, abfsPerfTracker, abfsCounters);
|
||||
}
|
||||
LOG.trace("AbfsClient init complete");
|
||||
}
|
||||
|
|
|
@ -73,11 +73,13 @@ public class AbfsClient implements Closeable {
|
|||
private final AuthType authType;
|
||||
private AccessTokenProvider tokenProvider;
|
||||
private SASTokenProvider sasTokenProvider;
|
||||
private final AbfsCounters abfsCounters;
|
||||
|
||||
private AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
|
||||
final AbfsConfiguration abfsConfiguration,
|
||||
final ExponentialRetryPolicy exponentialRetryPolicy,
|
||||
final AbfsPerfTracker abfsPerfTracker) {
|
||||
final AbfsPerfTracker abfsPerfTracker,
|
||||
final AbfsCounters abfsCounters) {
|
||||
this.baseUrl = baseUrl;
|
||||
this.sharedKeyCredentials = sharedKeyCredentials;
|
||||
String baseUrlString = baseUrl.toString();
|
||||
|
@ -104,14 +106,17 @@ public class AbfsClient implements Closeable {
|
|||
|
||||
this.userAgent = initializeUserAgent(abfsConfiguration, sslProviderName);
|
||||
this.abfsPerfTracker = abfsPerfTracker;
|
||||
this.abfsCounters = abfsCounters;
|
||||
}
|
||||
|
||||
public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
|
||||
final AbfsConfiguration abfsConfiguration,
|
||||
final ExponentialRetryPolicy exponentialRetryPolicy,
|
||||
final AccessTokenProvider tokenProvider,
|
||||
final AbfsPerfTracker abfsPerfTracker) {
|
||||
this(baseUrl, sharedKeyCredentials, abfsConfiguration, exponentialRetryPolicy, abfsPerfTracker);
|
||||
final AbfsPerfTracker abfsPerfTracker,
|
||||
final AbfsCounters abfsCounters) {
|
||||
this(baseUrl, sharedKeyCredentials, abfsConfiguration,
|
||||
exponentialRetryPolicy, abfsPerfTracker, abfsCounters);
|
||||
this.tokenProvider = tokenProvider;
|
||||
}
|
||||
|
||||
|
@ -119,8 +124,10 @@ public class AbfsClient implements Closeable {
|
|||
final AbfsConfiguration abfsConfiguration,
|
||||
final ExponentialRetryPolicy exponentialRetryPolicy,
|
||||
final SASTokenProvider sasTokenProvider,
|
||||
final AbfsPerfTracker abfsPerfTracker) {
|
||||
this(baseUrl, sharedKeyCredentials, abfsConfiguration, exponentialRetryPolicy, abfsPerfTracker);
|
||||
final AbfsPerfTracker abfsPerfTracker,
|
||||
final AbfsCounters abfsCounters) {
|
||||
this(baseUrl, sharedKeyCredentials, abfsConfiguration,
|
||||
exponentialRetryPolicy, abfsPerfTracker, abfsCounters);
|
||||
this.sasTokenProvider = sasTokenProvider;
|
||||
}
|
||||
|
||||
|
@ -892,4 +899,12 @@ public class AbfsClient implements Closeable {
|
|||
public SASTokenProvider getSasTokenProvider() {
|
||||
return this.sasTokenProvider;
|
||||
}
|
||||
|
||||
/**
|
||||
* Getter for abfsCounters from AbfsClient.
|
||||
* @return AbfsCounters instance.
|
||||
*/
|
||||
protected AbfsCounters getAbfsCounters() {
|
||||
return abfsCounters;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -114,16 +114,19 @@ class AbfsClientThrottlingAnalyzer {
|
|||
|
||||
/**
|
||||
* Suspends the current storage operation, as necessary, to reduce throughput.
|
||||
* @return true if Thread sleeps(Throttling occurs) else false.
|
||||
*/
|
||||
public void suspendIfNecessary() {
|
||||
public boolean suspendIfNecessary() {
|
||||
int duration = sleepDuration;
|
||||
if (duration > 0) {
|
||||
try {
|
||||
Thread.sleep(duration);
|
||||
return true;
|
||||
} catch (InterruptedException ie) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
|
@ -269,4 +272,4 @@ class AbfsClientThrottlingAnalyzer {
|
|||
this.operationsSuccessful = new AtomicLong();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import java.net.HttpURLConnection;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.fs.azurebfs.AbfsStatistic;
|
||||
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
|
||||
|
||||
/**
|
||||
|
@ -103,17 +104,24 @@ public final class AbfsClientThrottlingIntercept {
|
|||
* uses this to suspend the request, if necessary, to minimize errors and
|
||||
* maximize throughput.
|
||||
*/
|
||||
static void sendingRequest(AbfsRestOperationType operationType) {
|
||||
static void sendingRequest(AbfsRestOperationType operationType,
|
||||
AbfsCounters abfsCounters) {
|
||||
if (!isAutoThrottlingEnabled) {
|
||||
return;
|
||||
}
|
||||
|
||||
switch (operationType) {
|
||||
case ReadFile:
|
||||
singleton.readThrottler.suspendIfNecessary();
|
||||
if (singleton.readThrottler.suspendIfNecessary()
|
||||
&& abfsCounters != null) {
|
||||
abfsCounters.incrementCounter(AbfsStatistic.READ_THROTTLES, 1);
|
||||
}
|
||||
break;
|
||||
case Append:
|
||||
singleton.writeThrottler.suspendIfNecessary();
|
||||
if (singleton.writeThrottler.suspendIfNecessary()
|
||||
&& abfsCounters != null) {
|
||||
abfsCounters.incrementCounter(AbfsStatistic.WRITE_THROTTLES, 1);
|
||||
}
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
|
|
|
@ -27,6 +27,7 @@ import java.util.List;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.fs.azurebfs.AbfsStatistic;
|
||||
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
|
||||
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
|
||||
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
|
||||
|
@ -66,6 +67,7 @@ public class AbfsRestOperation {
|
|||
private int retryCount = 0;
|
||||
|
||||
private AbfsHttpOperation result;
|
||||
private AbfsCounters abfsCounters;
|
||||
|
||||
public AbfsHttpOperation getResult() {
|
||||
return result;
|
||||
|
@ -131,6 +133,7 @@ public class AbfsRestOperation {
|
|||
this.hasRequestBody = (AbfsHttpConstants.HTTP_METHOD_PUT.equals(method)
|
||||
|| AbfsHttpConstants.HTTP_METHOD_PATCH.equals(method));
|
||||
this.sasToken = sasToken;
|
||||
this.abfsCounters = client.getAbfsCounters();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -160,6 +163,7 @@ public class AbfsRestOperation {
|
|||
this.buffer = buffer;
|
||||
this.bufferOffset = bufferOffset;
|
||||
this.bufferLength = bufferLength;
|
||||
this.abfsCounters = client.getAbfsCounters();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -205,6 +209,7 @@ public class AbfsRestOperation {
|
|||
try {
|
||||
// initialize the HTTP request and open the connection
|
||||
httpOperation = new AbfsHttpOperation(url, method, requestHeaders);
|
||||
incrementCounter(AbfsStatistic.CONNECTIONS_MADE, 1);
|
||||
|
||||
switch(client.getAuthType()) {
|
||||
case Custom:
|
||||
|
@ -229,14 +234,19 @@ public class AbfsRestOperation {
|
|||
// dump the headers
|
||||
AbfsIoUtils.dumpHeadersToDebugLog("Request Headers",
|
||||
httpOperation.getConnection().getRequestProperties());
|
||||
AbfsClientThrottlingIntercept.sendingRequest(operationType);
|
||||
AbfsClientThrottlingIntercept.sendingRequest(operationType, abfsCounters);
|
||||
|
||||
if (hasRequestBody) {
|
||||
// HttpUrlConnection requires
|
||||
httpOperation.sendRequest(buffer, bufferOffset, bufferLength);
|
||||
incrementCounter(AbfsStatistic.SEND_REQUESTS, 1);
|
||||
incrementCounter(AbfsStatistic.BYTES_SENT, bufferLength);
|
||||
}
|
||||
|
||||
httpOperation.processResponse(buffer, bufferOffset, bufferLength);
|
||||
incrementCounter(AbfsStatistic.GET_RESPONSES, 1);
|
||||
incrementCounter(AbfsStatistic.BYTES_RECEIVED,
|
||||
httpOperation.getBytesReceived());
|
||||
} catch (IOException ex) {
|
||||
if (ex instanceof UnknownHostException) {
|
||||
LOG.warn(String.format("Unknown host name: %s. Retrying to resolve the host name...", httpOperation.getUrl().getHost()));
|
||||
|
@ -276,4 +286,16 @@ public class AbfsRestOperation {
|
|||
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Incrementing Abfs counters with a long value.
|
||||
*
|
||||
* @param statistic the Abfs statistic that needs to be incremented.
|
||||
* @param value the value to be incremented by.
|
||||
*/
|
||||
private void incrementCounter(AbfsStatistic statistic, long value) {
|
||||
if (abfsCounters != null) {
|
||||
abfsCounters.incrementCounter(statistic, value);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -436,9 +436,10 @@ public abstract class AbstractAbfsIntegrationTest extends
|
|||
* @param metricMap map of (String, Long) with statistics name as key and
|
||||
* statistics value as map value.
|
||||
*/
|
||||
protected void assertAbfsStatistics(AbfsStatistic statistic,
|
||||
protected long assertAbfsStatistics(AbfsStatistic statistic,
|
||||
long expectedValue, Map<String, Long> metricMap) {
|
||||
assertEquals("Mismatch in " + statistic.getStatName(), expectedValue,
|
||||
(long) metricMap.get(statistic.getStatName()));
|
||||
return expectedValue;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,253 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.azurebfs;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.junit.Test;
|
||||
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
|
||||
|
||||
public class ITestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest {
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(ITestAbfsNetworkStatistics.class);
|
||||
private static final int LARGE_OPERATIONS = 10;
|
||||
|
||||
public ITestAbfsNetworkStatistics() throws Exception {
|
||||
}
|
||||
|
||||
/**
|
||||
* Testing connections_made, send_request and bytes_send statistics in
|
||||
* {@link AbfsRestOperation}.
|
||||
*/
|
||||
@Test
|
||||
public void testAbfsHttpSendStatistics() throws IOException {
|
||||
describe("Test to check correct values of statistics after Abfs http send "
|
||||
+ "request is done.");
|
||||
|
||||
AzureBlobFileSystem fs = getFileSystem();
|
||||
Map<String, Long> metricMap;
|
||||
Path sendRequestPath = path(getMethodName());
|
||||
String testNetworkStatsString = "http_send";
|
||||
long connectionsMade, requestsSent, bytesSent;
|
||||
|
||||
/*
|
||||
* Creating AbfsOutputStream will result in 1 connection made and 1 send
|
||||
* request.
|
||||
*/
|
||||
try (AbfsOutputStream out = createAbfsOutputStreamWithFlushEnabled(fs,
|
||||
sendRequestPath)) {
|
||||
out.write(testNetworkStatsString.getBytes());
|
||||
|
||||
/*
|
||||
* Flushes all outstanding data (i.e. the current unfinished packet)
|
||||
* from the client into the service on all DataNode replicas.
|
||||
*/
|
||||
out.hflush();
|
||||
|
||||
metricMap = fs.getInstrumentationMap();
|
||||
|
||||
/*
|
||||
* Testing the network stats with 1 write operation.
|
||||
*
|
||||
* connections_made : 3(getFileSystem()) + 1(AbfsOutputStream) + 2(flush).
|
||||
*
|
||||
* send_requests : 1(getFileSystem()) + 1(AbfsOutputStream) + 2(flush).
|
||||
*
|
||||
* bytes_sent : bytes wrote in AbfsOutputStream.
|
||||
*/
|
||||
connectionsMade = assertAbfsStatistics(AbfsStatistic.CONNECTIONS_MADE,
|
||||
6, metricMap);
|
||||
requestsSent = assertAbfsStatistics(AbfsStatistic.SEND_REQUESTS, 4,
|
||||
metricMap);
|
||||
bytesSent = assertAbfsStatistics(AbfsStatistic.BYTES_SENT,
|
||||
testNetworkStatsString.getBytes().length, metricMap);
|
||||
|
||||
}
|
||||
|
||||
// To close the AbfsOutputStream 1 connection is made and 1 request is sent.
|
||||
connectionsMade++;
|
||||
requestsSent++;
|
||||
|
||||
try (AbfsOutputStream out = createAbfsOutputStreamWithFlushEnabled(fs,
|
||||
sendRequestPath)) {
|
||||
|
||||
for (int i = 0; i < LARGE_OPERATIONS; i++) {
|
||||
out.write(testNetworkStatsString.getBytes());
|
||||
|
||||
/*
|
||||
* 1 flush call would create 2 connections and 2 send requests.
|
||||
* when hflush() is called it will essentially trigger append() and
|
||||
* flush() inside AbfsRestOperation. Both of which calls
|
||||
* executeHttpOperation() method which creates a connection and sends
|
||||
* requests.
|
||||
*/
|
||||
out.hflush();
|
||||
}
|
||||
|
||||
metricMap = fs.getInstrumentationMap();
|
||||
|
||||
/*
|
||||
* Testing the network stats with Large amount of bytes sent.
|
||||
*
|
||||
* connections made : connections_made(Last assertion) + 1
|
||||
* (AbfsOutputStream) + LARGE_OPERATIONS * 2(flush).
|
||||
*
|
||||
* send requests : requests_sent(Last assertion) + 1(AbfsOutputStream) +
|
||||
* LARGE_OPERATIONS * 2(flush).
|
||||
*
|
||||
* bytes sent : bytes_sent(Last assertion) + LARGE_OPERATIONS * (bytes
|
||||
* wrote each time).
|
||||
*
|
||||
*/
|
||||
assertAbfsStatistics(AbfsStatistic.CONNECTIONS_MADE,
|
||||
connectionsMade + 1 + LARGE_OPERATIONS * 2, metricMap);
|
||||
assertAbfsStatistics(AbfsStatistic.SEND_REQUESTS,
|
||||
requestsSent + 1 + LARGE_OPERATIONS * 2, metricMap);
|
||||
assertAbfsStatistics(AbfsStatistic.BYTES_SENT,
|
||||
bytesSent + LARGE_OPERATIONS * (testNetworkStatsString.getBytes().length),
|
||||
metricMap);
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Testing get_response and bytes_received in {@link AbfsRestOperation}.
|
||||
*/
|
||||
@Test
|
||||
public void testAbfsHttpResponseStatistics() throws IOException {
|
||||
describe("Test to check correct values of statistics after Http "
|
||||
+ "Response is processed.");
|
||||
|
||||
AzureBlobFileSystem fs = getFileSystem();
|
||||
Path getResponsePath = path(getMethodName());
|
||||
Map<String, Long> metricMap;
|
||||
String testResponseString = "some response";
|
||||
long getResponses, bytesReceived;
|
||||
|
||||
FSDataOutputStream out = null;
|
||||
FSDataInputStream in = null;
|
||||
try {
|
||||
|
||||
/*
|
||||
* Creating a File and writing some bytes in it.
|
||||
*
|
||||
* get_response : 3(getFileSystem) + 1(OutputStream creation) + 2
|
||||
* (Writing data in Data store).
|
||||
*
|
||||
*/
|
||||
out = fs.create(getResponsePath);
|
||||
out.write(testResponseString.getBytes());
|
||||
out.hflush();
|
||||
|
||||
// open would require 1 get response.
|
||||
in = fs.open(getResponsePath);
|
||||
// read would require 1 get response and also get the bytes received.
|
||||
int result = in.read();
|
||||
|
||||
// Confirming read isn't -1.
|
||||
LOG.info("Result of read operation : {}", result);
|
||||
|
||||
metricMap = fs.getInstrumentationMap();
|
||||
|
||||
/*
|
||||
* Testing values of statistics after writing and reading a buffer.
|
||||
*
|
||||
* get_responses - 6(above operations) + 1(open()) + 1 (read()).
|
||||
*
|
||||
* bytes_received - This should be equal to bytes sent earlier.
|
||||
*/
|
||||
getResponses = assertAbfsStatistics(AbfsStatistic.GET_RESPONSES, 8,
|
||||
metricMap);
|
||||
// Testing that bytes received is equal to bytes sent.
|
||||
long bytesSend = metricMap.get(AbfsStatistic.BYTES_SENT.getStatName());
|
||||
bytesReceived = assertAbfsStatistics(AbfsStatistic.BYTES_RECEIVED,
|
||||
bytesSend,
|
||||
metricMap);
|
||||
|
||||
} finally {
|
||||
IOUtils.cleanupWithLogger(LOG, out, in);
|
||||
}
|
||||
|
||||
// To close the streams 1 response is received.
|
||||
getResponses++;
|
||||
|
||||
try {
|
||||
|
||||
/*
|
||||
* Creating a file and writing buffer into it. Also recording the
|
||||
* buffer for future read() call.
|
||||
* This creating outputStream and writing requires 2 *
|
||||
* (LARGE_OPERATIONS) get requests.
|
||||
*/
|
||||
StringBuilder largeBuffer = new StringBuilder();
|
||||
out = fs.create(getResponsePath);
|
||||
for (int i = 0; i < LARGE_OPERATIONS; i++) {
|
||||
out.write(testResponseString.getBytes());
|
||||
out.hflush();
|
||||
largeBuffer.append(testResponseString);
|
||||
}
|
||||
|
||||
// Open requires 1 get_response.
|
||||
in = fs.open(getResponsePath);
|
||||
|
||||
/*
|
||||
* Reading the file which was written above. This read() call would
|
||||
* read bytes equal to the bytes that was written above.
|
||||
* Get response would be 1 only.
|
||||
*/
|
||||
in.read(0, largeBuffer.toString().getBytes(), 0,
|
||||
largeBuffer.toString().getBytes().length);
|
||||
|
||||
metricMap = fs.getInstrumentationMap();
|
||||
|
||||
/*
|
||||
* Testing the statistics values after writing and reading a large buffer.
|
||||
*
|
||||
* get_response : get_responses(Last assertion) + 1
|
||||
* (OutputStream) + 2 * LARGE_OPERATIONS(Writing and flushing
|
||||
* LARGE_OPERATIONS times) + 1(open()) + 1(read()).
|
||||
*
|
||||
* bytes_received : bytes_received(Last assertion) + LARGE_OPERATIONS *
|
||||
* bytes wrote each time (bytes_received is equal to bytes wrote in the
|
||||
* File).
|
||||
*
|
||||
*/
|
||||
assertAbfsStatistics(AbfsStatistic.BYTES_RECEIVED,
|
||||
bytesReceived + LARGE_OPERATIONS * (testResponseString.getBytes().length),
|
||||
metricMap);
|
||||
assertAbfsStatistics(AbfsStatistic.GET_RESPONSES,
|
||||
getResponses + 3 + 2 * LARGE_OPERATIONS, metricMap);
|
||||
|
||||
} finally {
|
||||
IOUtils.cleanupWithLogger(LOG, out, in);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -45,7 +45,7 @@ public class ITestAbfsStatistics extends AbstractAbfsIntegrationTest {
|
|||
describe("Testing the initial values of Abfs counters");
|
||||
|
||||
AbfsCounters abfsCounters =
|
||||
new AbfsInstrumentation(getFileSystem().getUri());
|
||||
new AbfsCountersImpl(getFileSystem().getUri());
|
||||
Map<String, Long> metricMap = abfsCounters.toMap();
|
||||
|
||||
for (Map.Entry<String, Long> entry : metricMap.entrySet()) {
|
||||
|
|
|
@ -0,0 +1,67 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.fs.azurebfs;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import org.apache.hadoop.fs.azurebfs.services.AbfsCounters;
|
||||
|
||||
public class TestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest {
|
||||
|
||||
private static final int LARGE_OPERATIONS = 1000;
|
||||
|
||||
public TestAbfsNetworkStatistics() throws Exception {
|
||||
}
|
||||
|
||||
/**
|
||||
* Test to check correct values of read and write throttling statistics in
|
||||
* {@code AbfsClientThrottlingAnalyzer}.
|
||||
*/
|
||||
@Test
|
||||
public void testAbfsThrottlingStatistics() throws IOException {
|
||||
describe("Test to check correct values of read throttle and write "
|
||||
+ "throttle statistics in Abfs");
|
||||
|
||||
AbfsCounters statistics =
|
||||
new AbfsCountersImpl(getFileSystem().getUri());
|
||||
|
||||
/*
|
||||
* Calling the throttle methods to check correct summation and values of
|
||||
* the counters.
|
||||
*/
|
||||
for (int i = 0; i < LARGE_OPERATIONS; i++) {
|
||||
statistics.incrementCounter(AbfsStatistic.READ_THROTTLES, 1);
|
||||
statistics.incrementCounter(AbfsStatistic.WRITE_THROTTLES, 1);
|
||||
}
|
||||
|
||||
Map<String, Long> metricMap = statistics.toMap();
|
||||
|
||||
/*
|
||||
* Test to check read and write throttle statistics gave correct values for
|
||||
* 1000 calls.
|
||||
*/
|
||||
assertAbfsStatistics(AbfsStatistic.READ_THROTTLES, LARGE_OPERATIONS,
|
||||
metricMap);
|
||||
assertAbfsStatistics(AbfsStatistic.WRITE_THROTTLES, LARGE_OPERATIONS,
|
||||
metricMap);
|
||||
}
|
||||
}
|
|
@ -43,7 +43,7 @@ public class TestAbfsStatistics extends AbstractAbfsIntegrationTest {
|
|||
describe("Testing the counter values after Abfs is initialised");
|
||||
|
||||
AbfsCounters instrumentation =
|
||||
new AbfsInstrumentation(getFileSystem().getUri());
|
||||
new AbfsCountersImpl(getFileSystem().getUri());
|
||||
|
||||
//Testing summation of the counter values.
|
||||
for (int i = 0; i < LARGE_OPS; i++) {
|
||||
|
|
|
@ -100,7 +100,7 @@ public final class TestAbfsClient {
|
|||
private String getUserAgentString(AbfsConfiguration config,
|
||||
boolean includeSSLProvider) throws MalformedURLException {
|
||||
AbfsClient client = new AbfsClient(new URL("https://azure.com"), null,
|
||||
config, null, (AccessTokenProvider) null, null);
|
||||
config, null, (AccessTokenProvider) null, null, null);
|
||||
String sslProviderName = null;
|
||||
if (includeSSLProvider) {
|
||||
sslProviderName = DelegatingSSLSocketFactory.getDefaultFactory()
|
||||
|
@ -267,7 +267,7 @@ public final class TestAbfsClient {
|
|||
(currentAuthType == AuthType.OAuth
|
||||
? abfsConfig.getTokenProvider()
|
||||
: null),
|
||||
tracker);
|
||||
tracker, null);
|
||||
|
||||
return testClient;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue