HADOOP-17471. ABFS to collect IOStatistics (#2731)

The ABFS Filesystem and its input and output streams now implement
the IOStatisticSource interface and provide IOStatistics on
their interactions with Azure Storage.

This includes the min/max/mean durations of all REST API calls.

Contributed by Mehakmeet Singh <mehakmeet.singh@cloudera.com>
This commit is contained in:
Mehakmeet Singh 2021-04-23 14:58:31 +05:30 committed by GitHub
parent dff95c5eca
commit 6085f09db5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 461 additions and 160 deletions

View File

@ -316,6 +316,30 @@ public final class StoreStatisticNames {
public static final String ACTION_HTTP_GET_REQUEST public static final String ACTION_HTTP_GET_REQUEST
= "action_http_get_request"; = "action_http_get_request";
/**
* An HTTP DELETE request was made: {@value}.
*/
public static final String ACTION_HTTP_DELETE_REQUEST
= "action_http_delete_request";
/**
* An HTTP PUT request was made: {@value}.
*/
public static final String ACTION_HTTP_PUT_REQUEST
= "action_http_put_request";
/**
* An HTTP PATCH request was made: {@value}.
*/
public static final String ACTION_HTTP_PATCH_REQUEST
= "action_http_patch_request";
/**
* An HTTP POST request was made: {@value}.
*/
public static final String ACTION_HTTP_POST_REQUEST
= "action_http_post_request";
/** /**
* An HTTP HEAD request was made: {@value}. * An HTTP HEAD request was made: {@value}.
*/ */

View File

@ -19,24 +19,23 @@
package org.apache.hadoop.fs.azurebfs; package org.apache.hadoop.fs.azurebfs;
import java.net.URI; import java.net.URI;
import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.UUID; import java.util.UUID;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.fs.azurebfs.services.AbfsCounters; import org.apache.hadoop.fs.azurebfs.services.AbfsCounters;
import org.apache.hadoop.metrics2.AbstractMetric; import org.apache.hadoop.fs.statistics.DurationTracker;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStoreBuilder;
import org.apache.hadoop.metrics2.MetricStringBuilder; import org.apache.hadoop.metrics2.MetricStringBuilder;
import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsInfo;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.MetricsTag;
import org.apache.hadoop.metrics2.lib.MetricsRegistry; import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableCounterLong; import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableMetric; import org.apache.hadoop.metrics2.lib.MutableMetric;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.*; import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.*;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore;
/** /**
* Instrumentation of Abfs counters. * Instrumentation of Abfs counters.
@ -62,6 +61,8 @@ public class AbfsCountersImpl implements AbfsCounters {
private final MetricsRegistry registry = private final MetricsRegistry registry =
new MetricsRegistry("abfsMetrics").setContext(CONTEXT); new MetricsRegistry("abfsMetrics").setContext(CONTEXT);
private final IOStatisticsStore ioStatisticsStore;
private static final AbfsStatistic[] STATISTIC_LIST = { private static final AbfsStatistic[] STATISTIC_LIST = {
CALL_CREATE, CALL_CREATE,
CALL_OPEN, CALL_OPEN,
@ -85,7 +86,17 @@ public class AbfsCountersImpl implements AbfsCounters {
BYTES_SENT, BYTES_SENT,
BYTES_RECEIVED, BYTES_RECEIVED,
READ_THROTTLES, READ_THROTTLES,
WRITE_THROTTLES WRITE_THROTTLES,
SERVER_UNAVAILABLE
};
private static final AbfsStatistic[] DURATION_TRACKER_LIST = {
HTTP_HEAD_REQUEST,
HTTP_GET_REQUEST,
HTTP_DELETE_REQUEST,
HTTP_PUT_REQUEST,
HTTP_PATCH_REQUEST,
HTTP_POST_REQUEST
}; };
public AbfsCountersImpl(URI uri) { public AbfsCountersImpl(URI uri) {
@ -95,9 +106,17 @@ public class AbfsCountersImpl implements AbfsCounters {
fileSystemInstanceId.toString()); fileSystemInstanceId.toString());
registry.tag(METRIC_BUCKET, "Hostname from the FS URL", uri.getHost()); registry.tag(METRIC_BUCKET, "Hostname from the FS URL", uri.getHost());
IOStatisticsStoreBuilder ioStatisticsStoreBuilder = iostatisticsStore();
// Declaring the counters.
for (AbfsStatistic stats : STATISTIC_LIST) { for (AbfsStatistic stats : STATISTIC_LIST) {
ioStatisticsStoreBuilder.withCounters(stats.getStatName());
createCounter(stats); createCounter(stats);
} }
// Declaring the DurationTrackers.
for (AbfsStatistic durationStats : DURATION_TRACKER_LIST) {
ioStatisticsStoreBuilder.withDurationTracking(durationStats.getStatName());
}
ioStatisticsStore = ioStatisticsStoreBuilder.build();
} }
/** /**
@ -149,6 +168,7 @@ public class AbfsCountersImpl implements AbfsCounters {
*/ */
@Override @Override
public void incrementCounter(AbfsStatistic statistic, long value) { public void incrementCounter(AbfsStatistic statistic, long value) {
ioStatisticsStore.incrementCounter(statistic.getStatName(), value);
MutableCounterLong counter = lookupCounter(statistic.getStatName()); MutableCounterLong counter = lookupCounter(statistic.getStatName());
if (counter != null) { if (counter != null) {
counter.incr(value); counter.incr(value);
@ -189,98 +209,35 @@ public class AbfsCountersImpl implements AbfsCounters {
/** /**
* {@inheritDoc} * {@inheritDoc}
* *
* Creating a map of all the counters for testing. * Map of all the counters for testing.
* *
* @return a map of the metrics. * @return a map of the IOStatistics counters.
*/ */
@VisibleForTesting @VisibleForTesting
@Override @Override
public Map<String, Long> toMap() { public Map<String, Long> toMap() {
MetricsToMap metricBuilder = new MetricsToMap(null); return ioStatisticsStore.counters();
registry.snapshot(metricBuilder, true);
return metricBuilder.getMap();
}
protected static class MetricsToMap extends MetricsRecordBuilder {
private final MetricsCollector parent;
private final Map<String, Long> map =
new HashMap<>();
MetricsToMap(MetricsCollector parent) {
this.parent = parent;
}
@Override
public MetricsRecordBuilder tag(MetricsInfo info, String value) {
return this;
}
@Override
public MetricsRecordBuilder add(MetricsTag tag) {
return this;
}
@Override
public MetricsRecordBuilder add(AbstractMetric metric) {
return this;
}
@Override
public MetricsRecordBuilder setContext(String value) {
return this;
}
@Override
public MetricsRecordBuilder addCounter(MetricsInfo info, int value) {
return tuple(info, value);
}
@Override
public MetricsRecordBuilder addCounter(MetricsInfo info, long value) {
return tuple(info, value);
}
@Override
public MetricsRecordBuilder addGauge(MetricsInfo info, int value) {
return tuple(info, value);
}
@Override
public MetricsRecordBuilder addGauge(MetricsInfo info, long value) {
return tuple(info, value);
}
public MetricsToMap tuple(MetricsInfo info, long value) {
return tuple(info.name(), value);
}
public MetricsToMap tuple(String name, long value) {
map.put(name, value);
return this;
}
@Override
public MetricsRecordBuilder addGauge(MetricsInfo info, float value) {
return tuple(info, (long) value);
}
@Override
public MetricsRecordBuilder addGauge(MetricsInfo info, double value) {
return tuple(info, (long) value);
}
@Override
public MetricsCollector parent() {
return parent;
} }
/** /**
* Get the map. * Returning the instance of IOStatisticsStore used to collect the metrics
* in AbfsCounters.
* *
* @return the map of metrics. * @return instance of IOStatistics.
*/ */
public Map<String, Long> getMap() { @Override
return map; public IOStatistics getIOStatistics() {
} return ioStatisticsStore;
}
/**
* Tracks the duration of a statistic.
*
* @param key name of the statistic.
* @return DurationTracker for that statistic.
*/
@Override
public DurationTracker trackDuration(String key) {
return ioStatisticsStore.trackDuration(key);
} }
} }

View File

@ -18,7 +18,12 @@
package org.apache.hadoop.fs.azurebfs; package org.apache.hadoop.fs.azurebfs;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.fs.StorageStatistics.CommonStatisticNames; import org.apache.hadoop.fs.StorageStatistics.CommonStatisticNames;
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
import org.apache.hadoop.fs.statistics.StoreStatisticNames;
/** /**
* Statistic which are collected in Abfs. * Statistic which are collected in Abfs.
@ -73,11 +78,45 @@ public enum AbfsStatistic {
READ_THROTTLES("read_throttles", READ_THROTTLES("read_throttles",
"Total number of times a read operation is throttled."), "Total number of times a read operation is throttled."),
WRITE_THROTTLES("write_throttles", WRITE_THROTTLES("write_throttles",
"Total number of times a write operation is throttled."); "Total number of times a write operation is throttled."),
SERVER_UNAVAILABLE("server_unavailable",
"Total number of times HTTP 503 status code is received in response."),
// HTTP Duration Trackers
HTTP_HEAD_REQUEST(StoreStatisticNames.ACTION_HTTP_HEAD_REQUEST,
"Time taken to complete a HEAD request",
AbfsHttpConstants.HTTP_METHOD_HEAD),
HTTP_GET_REQUEST(StoreStatisticNames.ACTION_HTTP_GET_REQUEST,
"Time taken to complete a GET request",
AbfsHttpConstants.HTTP_METHOD_GET),
HTTP_DELETE_REQUEST(StoreStatisticNames.ACTION_HTTP_DELETE_REQUEST,
"Time taken to complete a DELETE request",
AbfsHttpConstants.HTTP_METHOD_DELETE),
HTTP_PUT_REQUEST(StoreStatisticNames.ACTION_HTTP_PUT_REQUEST,
"Time taken to complete a PUT request",
AbfsHttpConstants.HTTP_METHOD_PUT),
HTTP_PATCH_REQUEST(StoreStatisticNames.ACTION_HTTP_PATCH_REQUEST,
"Time taken to complete a PATCH request",
AbfsHttpConstants.HTTP_METHOD_PATCH),
HTTP_POST_REQUEST(StoreStatisticNames.ACTION_HTTP_POST_REQUEST,
"Time taken to complete a POST request",
AbfsHttpConstants.HTTP_METHOD_POST);
private String statName; private String statName;
private String statDescription; private String statDescription;
//For http call stats only.
private String httpCall;
private static final Map<String, String> HTTP_CALL_TO_NAME_MAP = new HashMap<>();
static {
for (AbfsStatistic statistic : values()) {
if (statistic.getHttpCall() != null) {
HTTP_CALL_TO_NAME_MAP.put(statistic.getHttpCall(), statistic.getStatName());
}
}
}
/** /**
* Constructor of AbfsStatistic to set statistic name and description. * Constructor of AbfsStatistic to set statistic name and description.
* *
@ -89,6 +128,19 @@ public enum AbfsStatistic {
this.statDescription = statDescription; this.statDescription = statDescription;
} }
/**
* Constructor for AbfsStatistic for HTTP durationTrackers.
*
* @param statName Name of the statistic.
* @param statDescription Description of the statistic.
* @param httpCall HTTP call associated with the stat name.
*/
AbfsStatistic(String statName, String statDescription, String httpCall) {
this.statName = statName;
this.statDescription = statDescription;
this.httpCall = httpCall;
}
/** /**
* Getter for statistic name. * Getter for statistic name.
* *
@ -106,4 +158,23 @@ public enum AbfsStatistic {
public String getStatDescription() { public String getStatDescription() {
return statDescription; return statDescription;
} }
/**
* Getter for http call for HTTP duration trackers.
*
* @return http call of a statistic.
*/
public String getHttpCall() {
return httpCall;
}
/**
* Get the statistic name using the http call name.
*
* @param httpCall The HTTP call used to get the statistic name.
* @return Statistic name.
*/
public static String getStatNameFromHttpCall(String httpCall) {
return HTTP_CALL_TO_NAME_MAP.get(httpCall);
}
} }

View File

@ -82,6 +82,8 @@ import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
@ -93,13 +95,15 @@ import org.apache.hadoop.util.Progressable;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.*; import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.*;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs; import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToString;
/** /**
* A {@link org.apache.hadoop.fs.FileSystem} for reading and writing files stored on <a * A {@link org.apache.hadoop.fs.FileSystem} for reading and writing files stored on <a
* href="http://store.azure.com/">Windows Azure</a> * href="http://store.azure.com/">Windows Azure</a>
*/ */
@InterfaceStability.Evolving @InterfaceStability.Evolving
public class AzureBlobFileSystem extends FileSystem { public class AzureBlobFileSystem extends FileSystem
implements IOStatisticsSource {
public static final Logger LOG = LoggerFactory.getLogger(AzureBlobFileSystem.class); public static final Logger LOG = LoggerFactory.getLogger(AzureBlobFileSystem.class);
private URI uri; private URI uri;
private Path workingDir; private Path workingDir;
@ -162,11 +166,8 @@ public class AzureBlobFileSystem extends FileSystem {
sb.append("uri=").append(uri); sb.append("uri=").append(uri);
sb.append(", user='").append(abfsStore.getUser()).append('\''); sb.append(", user='").append(abfsStore.getUser()).append('\'');
sb.append(", primaryUserGroup='").append(abfsStore.getPrimaryGroup()).append('\''); sb.append(", primaryUserGroup='").append(abfsStore.getPrimaryGroup()).append('\'');
if (abfsCounters != null) { sb.append(", \nIOStatistics: {").append(ioStatisticsToString(getIOStatistics()));
sb.append(", Statistics: {").append(abfsCounters.formString("{", "=",
"}", true));
sb.append("}"); sb.append("}");
}
sb.append('}'); sb.append('}');
return sb.toString(); return sb.toString();
} }
@ -425,8 +426,10 @@ public class AzureBlobFileSystem extends FileSystem {
* @param statistic the Statistic to be incremented. * @param statistic the Statistic to be incremented.
*/ */
private void incrementStatistic(AbfsStatistic statistic) { private void incrementStatistic(AbfsStatistic statistic) {
if (abfsCounters != null) {
abfsCounters.incrementCounter(statistic, 1); abfsCounters.incrementCounter(statistic, 1);
} }
}
/** /**
* Performs a check for (.) until root in the path to throw an exception. * Performs a check for (.) until root in the path to throw an exception.
@ -489,7 +492,9 @@ public class AzureBlobFileSystem extends FileSystem {
LOG.debug("AzureBlobFileSystem.close"); LOG.debug("AzureBlobFileSystem.close");
IOUtils.cleanupWithLogger(LOG, abfsStore, delegationTokenManager); IOUtils.cleanupWithLogger(LOG, abfsStore, delegationTokenManager);
this.isClosed = true; this.isClosed = true;
LOG.debug("Closing Abfs: " + toString()); if (LOG.isDebugEnabled()) {
LOG.debug("Closing Abfs: {}", toString());
}
} }
@Override @Override
@ -1311,6 +1316,12 @@ public class AzureBlobFileSystem extends FileSystem {
return abfsStore.getIsNamespaceEnabled(); return abfsStore.getIsNamespaceEnabled();
} }
/**
* Returns the counter() map in IOStatistics containing all the counters
* and their values.
*
* @return Map of IOStatistics counters.
*/
@VisibleForTesting @VisibleForTesting
Map<String, Long> getInstrumentationMap() { Map<String, Long> getInstrumentationMap() {
return abfsCounters.toMap(); return abfsCounters.toMap();
@ -1331,4 +1342,14 @@ public class AzureBlobFileSystem extends FileSystem {
return super.hasPathCapability(p, capability); return super.hasPathCapability(p, capability);
} }
} }
/**
* Getter for IOStatistic instance in AzureBlobFilesystem.
*
* @return the IOStatistic instance from abfsCounters.
*/
@Override
public IOStatistics getIOStatistics() {
return abfsCounters != null ? abfsCounters.getIOStatistics() : null;
}
} }

View File

@ -25,13 +25,16 @@ import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTest
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.azurebfs.AbfsStatistic; import org.apache.hadoop.fs.azurebfs.AbfsStatistic;
import org.apache.hadoop.fs.statistics.DurationTracker;
import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
/** /**
* An interface for Abfs counters. * An interface for Abfs counters.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Unstable @InterfaceStability.Unstable
public interface AbfsCounters { public interface AbfsCounters extends IOStatisticsSource, DurationTrackerFactory {
/** /**
* Increment a AbfsStatistic by a long value. * Increment a AbfsStatistic by a long value.
@ -63,4 +66,12 @@ public interface AbfsCounters {
@VisibleForTesting @VisibleForTesting
Map<String, Long> toMap(); Map<String, Long> toMap();
/**
* Start a DurationTracker for a request.
*
* @param key Name of the DurationTracker statistic.
* @return an instance of DurationTracker.
*/
@Override
DurationTracker trackDuration(String key);
} }

View File

@ -39,9 +39,6 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemExc
import org.apache.hadoop.fs.azurebfs.utils.CachedSASToken; import org.apache.hadoop.fs.azurebfs.utils.CachedSASToken;
import org.apache.hadoop.fs.statistics.IOStatistics; import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource; import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.statistics.StoreStatisticNames;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
import static java.lang.Math.max; import static java.lang.Math.max;
import static java.lang.Math.min; import static java.lang.Math.min;
@ -485,10 +482,8 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
AbfsPerfTracker tracker = client.getAbfsPerfTracker(); AbfsPerfTracker tracker = client.getAbfsPerfTracker();
try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, "readRemote", "read")) { try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, "readRemote", "read")) {
LOG.trace("Trigger client.read for path={} position={} offset={} length={}", path, position, offset, length); LOG.trace("Trigger client.read for path={} position={} offset={} length={}", path, position, offset, length);
op = IOStatisticsBinding.trackDuration((IOStatisticsStore) ioStatistics, op = client.read(path, position, b, offset, length,
StoreStatisticNames.ACTION_HTTP_GET_REQUEST, tolerateOobAppends ? "*" : eTag, cachedSasToken.get());
() -> client.read(path, position, b, offset, length,
tolerateOobAppends ? "*" : eTag, cachedSasToken.get()));
cachedSasToken.update(op.getSasToken()); cachedSasToken.update(op.getSasToken());
if (streamStatistics != null) { if (streamStatistics != null) {
streamStatistics.remoteReadOperation(); streamStatistics.remoteReadOperation();

View File

@ -45,9 +45,6 @@ import org.apache.hadoop.fs.azurebfs.utils.CachedSASToken;
import org.apache.hadoop.fs.statistics.DurationTracker; import org.apache.hadoop.fs.statistics.DurationTracker;
import org.apache.hadoop.fs.statistics.IOStatistics; import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource; import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.statistics.StreamStatisticNames;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
import org.apache.hadoop.io.ElasticByteBufferPool; import org.apache.hadoop.io.ElasticByteBufferPool;
import org.apache.hadoop.fs.FileSystem.Statistics; import org.apache.hadoop.fs.FileSystem.Statistics;
import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.FSExceptionMessages;
@ -450,11 +447,9 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
} }
} }
final Future<Void> job = final Future<Void> job =
completionService.submit(IOStatisticsBinding completionService.submit(() -> {
.trackDurationOfCallable((IOStatisticsStore) ioStatistics, AbfsPerfTracker tracker =
StreamStatisticNames.TIME_SPENT_ON_PUT_REQUEST, client.getAbfsPerfTracker();
() -> {
AbfsPerfTracker tracker = client.getAbfsPerfTracker();
try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker,
"writeCurrentBufferToService", "append")) { "writeCurrentBufferToService", "append")) {
AppendRequestParameters.Mode AppendRequestParameters.Mode
@ -474,8 +469,7 @@ public class AbfsOutputStream extends OutputStream implements Syncable,
perfInfo.registerSuccess(true); perfInfo.registerSuccess(true);
return null; return null;
} }
}) });
);
if (outputStreamStatistics != null) { if (outputStreamStatistics != null) {
if (job.isCancelled()) { if (job.isCancelled()) {

View File

@ -19,12 +19,12 @@
package org.apache.hadoop.fs.azurebfs.services; package org.apache.hadoop.fs.azurebfs.services;
import java.io.IOException; import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.HttpURLConnection; import java.net.HttpURLConnection;
import java.net.URL; import java.net.URL;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.util.List; import java.util.List;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -34,6 +34,7 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationExcep
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding;
/** /**
* The AbfsRestOperation for Rest AbfsClient. * The AbfsRestOperation for Rest AbfsClient.
@ -167,12 +168,30 @@ public class AbfsRestOperation {
this.abfsCounters = client.getAbfsCounters(); this.abfsCounters = client.getAbfsCounters();
} }
/**
* Execute a AbfsRestOperation. Track the Duration of a request if
* abfsCounters isn't null.
*
*/
public void execute() throws AzureBlobFileSystemException {
try {
IOStatisticsBinding.trackDurationOfInvocation(abfsCounters,
AbfsStatistic.getStatNameFromHttpCall(method),
() -> completeExecute());
} catch (AzureBlobFileSystemException aze) {
throw aze;
} catch (IOException e) {
throw new UncheckedIOException("Error while tracking Duration of an "
+ "AbfsRestOperation call", e);
}
}
/** /**
* Executes the REST operation with retry, by issuing one or more * Executes the REST operation with retry, by issuing one or more
* HTTP operations. * HTTP operations.
*/ */
@VisibleForTesting private void completeExecute() throws AzureBlobFileSystemException {
public void execute() throws AzureBlobFileSystemException {
// see if we have latency reports from the previous requests // see if we have latency reports from the previous requests
String latencyHeader = this.client.getAbfsPerfTracker().getClientLatency(); String latencyHeader = this.client.getAbfsPerfTracker().getClientLatency();
if (latencyHeader != null && !latencyHeader.isEmpty()) { if (latencyHeader != null && !latencyHeader.isEmpty()) {
@ -259,6 +278,8 @@ public class AbfsRestOperation {
&& httpOperation.getStatusCode() <= HttpURLConnection.HTTP_PARTIAL) { && httpOperation.getStatusCode() <= HttpURLConnection.HTTP_PARTIAL) {
incrementCounter(AbfsStatistic.BYTES_RECEIVED, incrementCounter(AbfsStatistic.BYTES_RECEIVED,
httpOperation.getBytesReceived()); httpOperation.getBytesReceived());
} else if (httpOperation.getStatusCode() == HttpURLConnection.HTTP_UNAVAILABLE) {
incrementCounter(AbfsStatistic.SERVER_UNAVAILABLE, 1);
} }
} catch (UnknownHostException ex) { } catch (UnknownHostException ex) {
String hostname = null; String hostname = null;

View File

@ -0,0 +1,110 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs;
import java.io.IOException;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.StoreStatisticNames;
import org.apache.hadoop.io.IOUtils;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.HTTP_DELETE_REQUEST;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.HTTP_GET_REQUEST;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.HTTP_HEAD_REQUEST;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.HTTP_PUT_REQUEST;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.extractStatistics;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.lookupMeanStatistic;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
public class ITestAbfsDurationTrackers extends AbstractAbfsIntegrationTest {
private static final Logger LOG =
LoggerFactory.getLogger(ITestAbfsDurationTrackers.class);
private static final AbfsStatistic[] HTTP_DURATION_TRACKER_LIST = {
HTTP_HEAD_REQUEST,
HTTP_GET_REQUEST,
HTTP_DELETE_REQUEST,
HTTP_PUT_REQUEST,
};
public ITestAbfsDurationTrackers() throws Exception {
}
/**
* Test to check if DurationTrackers for Abfs HTTP calls work correctly and
* track the duration of the http calls.
*/
@Test
public void testAbfsHttpCallsDurations() throws IOException {
describe("test to verify if the DurationTrackers for abfs http calls "
+ "work as expected.");
AzureBlobFileSystem fs = getFileSystem();
Path testFilePath = path(getMethodName());
// Declaring output and input stream.
AbfsOutputStream out = null;
AbfsInputStream in = null;
try {
// PUT the file.
out = createAbfsOutputStreamWithFlushEnabled(fs, testFilePath);
out.write('a');
out.hflush();
// GET the file.
in = fs.getAbfsStore().openFileForRead(testFilePath, fs.getFsStatistics());
int res = in.read();
LOG.info("Result of Read: {}", res);
// DELETE the file.
fs.delete(testFilePath, false);
// extract the IOStatistics from the filesystem.
IOStatistics ioStatistics = extractStatistics(fs);
LOG.info(ioStatisticsToPrettyString(ioStatistics));
assertDurationTracker(ioStatistics);
} finally {
IOUtils.cleanupWithLogger(LOG, out, in);
}
}
/**
* A method to assert that all the DurationTrackers for the http calls are
* working correctly.
*
* @param ioStatistics the IOStatisticsSource in use.
*/
private void assertDurationTracker(IOStatistics ioStatistics) {
for (AbfsStatistic abfsStatistic : HTTP_DURATION_TRACKER_LIST) {
Assertions.assertThat(lookupMeanStatistic(ioStatistics,
abfsStatistic.getStatName() + StoreStatisticNames.SUFFIX_MEAN).mean())
.describedAs("The DurationTracker Named " + abfsStatistic.getStatName()
+ " Doesn't match the expected value.")
.isGreaterThan(0.0);
}
}
}

View File

@ -31,8 +31,14 @@ import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamContext;
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamStatisticsImpl; import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamStatisticsImpl;
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream; import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation; import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.StoreStatisticNames;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.extractStatistics;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.lookupMeanStatistic;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
public class ITestAbfsInputStreamStatistics public class ITestAbfsInputStreamStatistics
extends AbstractAbfsIntegrationTest { extends AbstractAbfsIntegrationTest {
private static final int OPERATIONS = 10; private static final int OPERATIONS = 10;
@ -386,12 +392,13 @@ public class ITestAbfsInputStreamStatistics
abfsInputStream = abfsInputStream =
abfss.openFileForRead(actionHttpGetRequestPath, fs.getFsStatistics()); abfss.openFileForRead(actionHttpGetRequestPath, fs.getFsStatistics());
abfsInputStream.read(); abfsInputStream.read();
AbfsInputStreamStatisticsImpl abfsInputStreamStatistics = IOStatistics ioStatistics = extractStatistics(fs);
(AbfsInputStreamStatisticsImpl) abfsInputStream.getStreamStatistics(); LOG.info("AbfsInputStreamStats info: {}",
ioStatisticsToPrettyString(ioStatistics));
LOG.info("AbfsInputStreamStats info: {}", abfsInputStreamStatistics.toString());
Assertions.assertThat( Assertions.assertThat(
abfsInputStreamStatistics.getActionHttpGetRequest()) lookupMeanStatistic(ioStatistics,
AbfsStatistic.HTTP_GET_REQUEST.getStatName()
+ StoreStatisticNames.SUFFIX_MEAN).mean())
.describedAs("Mismatch in time taken by a GET request") .describedAs("Mismatch in time taken by a GET request")
.isGreaterThan(0.0); .isGreaterThan(0.0);
} finally { } finally {

View File

@ -28,6 +28,12 @@ import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream; import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStreamStatisticsImpl; import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStreamStatisticsImpl;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.StoreStatisticNames;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.extractStatistics;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.lookupMeanStatistic;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
/** /**
* Test AbfsOutputStream statistics. * Test AbfsOutputStream statistics.
@ -241,10 +247,13 @@ public class ITestAbfsOutputStreamStatistics
outputStream.write('a'); outputStream.write('a');
outputStream.hflush(); outputStream.hflush();
AbfsOutputStreamStatisticsImpl abfsOutputStreamStatistics = IOStatistics ioStatistics = extractStatistics(fs);
getAbfsOutputStreamStatistics(outputStream); LOG.info("AbfsOutputStreamStats info: {}",
LOG.info("AbfsOutputStreamStats info: {}", abfsOutputStreamStatistics.toString()); ioStatisticsToPrettyString(ioStatistics));
Assertions.assertThat(abfsOutputStreamStatistics.getTimeSpentOnPutRequest()) Assertions.assertThat(
lookupMeanStatistic(ioStatistics,
AbfsStatistic.HTTP_PUT_REQUEST.getStatName()
+ StoreStatisticNames.SUFFIX_MEAN).mean())
.describedAs("Mismatch in timeSpentOnPutRequest DurationTracker") .describedAs("Mismatch in timeSpentOnPutRequest DurationTracker")
.isGreaterThan(0.0); .isGreaterThan(0.0);
} }

View File

@ -26,6 +26,7 @@ import org.junit.Test;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.services.AbfsCounters; import org.apache.hadoop.fs.azurebfs.services.AbfsCounters;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.statistics.IOStatistics;
/** /**
* Tests AzureBlobFileSystem Statistics. * Tests AzureBlobFileSystem Statistics.
@ -46,14 +47,21 @@ public class ITestAbfsStatistics extends AbstractAbfsIntegrationTest {
AbfsCounters abfsCounters = AbfsCounters abfsCounters =
new AbfsCountersImpl(getFileSystem().getUri()); new AbfsCountersImpl(getFileSystem().getUri());
Map<String, Long> metricMap = abfsCounters.toMap(); IOStatistics ioStatistics = abfsCounters.getIOStatistics();
for (Map.Entry<String, Long> entry : metricMap.entrySet()) { //Initial value verification for counters
String key = entry.getKey(); for (Map.Entry<String, Long> entry : ioStatistics.counters().entrySet()) {
Long value = entry.getValue(); checkInitialValue(entry.getKey(), entry.getValue(), 0);
}
//Verify if initial value of statistic is 0. //Initial value verification for gauges
checkInitialValue(key, value); for (Map.Entry<String, Long> entry : ioStatistics.gauges().entrySet()) {
checkInitialValue(entry.getKey(), entry.getValue(), 0);
}
//Initial value verifications for DurationTrackers
for (Map.Entry<String, Long> entry : ioStatistics.maximums().entrySet()) {
checkInitialValue(entry.getKey(), entry.getValue(), -1);
} }
} }
@ -251,8 +259,10 @@ public class ITestAbfsStatistics extends AbstractAbfsIntegrationTest {
* *
* @param statName name of the statistic to be checked. * @param statName name of the statistic to be checked.
* @param statValue value of the statistic. * @param statValue value of the statistic.
* @param expectedInitialValue initial value expected from this statistic.
*/ */
private void checkInitialValue(String statName, long statValue) { private void checkInitialValue(String statName, long statValue,
assertEquals("Mismatch in " + statName, 0, statValue); long expectedInitialValue) {
assertEquals("Mismatch in " + statName, expectedInitialValue, statValue);
} }
} }

View File

@ -39,13 +39,12 @@ public class ITestAbfsStreamStatistics extends AbstractAbfsIntegrationTest {
private static final Logger LOG = private static final Logger LOG =
LoggerFactory.getLogger(ITestAbfsStreamStatistics.class); LoggerFactory.getLogger(ITestAbfsStreamStatistics.class);
private static final int LARGE_NUMBER_OF_OPS = 999999; private static final int LARGE_NUMBER_OF_OPS = 99;
/*** /***
* Testing {@code incrementReadOps()} in class {@code AbfsInputStream} and * Testing {@code incrementReadOps()} in class {@code AbfsInputStream} and
* {@code incrementWriteOps()} in class {@code AbfsOutputStream}. * {@code incrementWriteOps()} in class {@code AbfsOutputStream}.
* *
* @throws Exception
*/ */
@Test @Test
public void testAbfsStreamOps() throws Exception { public void testAbfsStreamOps() throws Exception {

View File

@ -21,13 +21,31 @@ package org.apache.hadoop.fs.azurebfs;
import java.io.IOException; import java.io.IOException;
import java.util.Map; import java.util.Map;
import org.assertj.core.api.Assertions;
import org.junit.Test; import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
import org.apache.hadoop.fs.azurebfs.services.AbfsCounters; import org.apache.hadoop.fs.azurebfs.services.AbfsCounters;
import org.apache.hadoop.fs.statistics.DurationTracker;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.StoreStatisticNames;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.HTTP_PATCH_REQUEST;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.HTTP_POST_REQUEST;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.extractStatistics;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.lookupMeanStatistic;
public class TestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest { public class TestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest {
private static final Logger LOG =
LoggerFactory.getLogger(TestAbfsNetworkStatistics.class);
private static final int LARGE_OPERATIONS = 1000; private static final int LARGE_OPERATIONS = 1000;
private static final AbfsStatistic[] HTTP_DURATION_TRACKER_LIST = {
HTTP_POST_REQUEST,
HTTP_PATCH_REQUEST
};
public TestAbfsNetworkStatistics() throws Exception { public TestAbfsNetworkStatistics() throws Exception {
} }
@ -64,4 +82,58 @@ public class TestAbfsNetworkStatistics extends AbstractAbfsIntegrationTest {
assertAbfsStatistics(AbfsStatistic.WRITE_THROTTLES, LARGE_OPERATIONS, assertAbfsStatistics(AbfsStatistic.WRITE_THROTTLES, LARGE_OPERATIONS,
metricMap); metricMap);
} }
/**
* Test to check if the DurationTrackers are tracking as expected whilst
* doing some work.
*/
@Test
public void testAbfsNetworkDurationTrackers()
throws IOException, InterruptedException {
describe("Test to verify the actual values of DurationTrackers are "
+ "greater than 0.0 while tracking some work.");
AbfsCounters abfsCounters = new AbfsCountersImpl(getFileSystem().getUri());
// Start dummy work for the DurationTrackers and start tracking.
try (DurationTracker ignoredPatch =
abfsCounters.trackDuration(AbfsStatistic.getStatNameFromHttpCall(AbfsHttpConstants.HTTP_METHOD_PATCH));
DurationTracker ignoredPost =
abfsCounters.trackDuration(AbfsStatistic.getStatNameFromHttpCall(AbfsHttpConstants.HTTP_METHOD_POST))
) {
// Emulates doing some work.
Thread.sleep(10);
LOG.info("Execute some Http requests...");
}
// Extract the iostats from the abfsCounters instance.
IOStatistics ioStatistics = extractStatistics(abfsCounters);
// Asserting that the durationTrackers have mean > 0.0.
for (AbfsStatistic abfsStatistic : HTTP_DURATION_TRACKER_LIST) {
Assertions.assertThat(lookupMeanStatistic(ioStatistics,
abfsStatistic.getStatName() + StoreStatisticNames.SUFFIX_MEAN).mean())
.describedAs("The DurationTracker Named " + abfsStatistic.getStatName()
+ " Doesn't match the expected value")
.isGreaterThan(0.0);
}
}
/**
* Test to check if abfs counter for HTTP 503 statusCode works correctly
* when incremented.
*/
@Test
public void testAbfsHTTP503ErrorCounter() throws IOException {
describe("tests to verify the expected value of the HTTP 503 error "
+ "counter is equal to number of times incremented.");
AbfsCounters abfsCounters = new AbfsCountersImpl(getFileSystem().getUri());
// Incrementing the server_unavailable counter.
for (int i = 0; i < LARGE_OPERATIONS; i++) {
abfsCounters.incrementCounter(AbfsStatistic.SERVER_UNAVAILABLE, 1);
}
// Getting the IOStatistics counter map from abfsCounters.
Map<String, Long> metricsMap = abfsCounters.toMap();
assertAbfsStatistics(AbfsStatistic.SERVER_UNAVAILABLE, LARGE_OPERATIONS,
metricsMap);
}
} }