HADOOP-17016. Adding Common Counters in ABFS (#1991).

Contributed by: Mehakmeet Singh.

Change-Id: Ib84e7a42f28e064df4c6204fcce33e573360bf42
This commit is contained in:
Mehakmeet Singh 2020-06-02 18:31:35 +01:00 committed by Steve Loughran
parent 8a642caca8
commit 1714589609
No known key found for this signature in database
GPG Key ID: D22CF846DBB162A0
7 changed files with 838 additions and 8 deletions

View File

@ -0,0 +1,279 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.fs.azurebfs.services.AbfsCounters;
import org.apache.hadoop.metrics2.AbstractMetric;
import org.apache.hadoop.metrics2.MetricStringBuilder;
import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsInfo;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.MetricsTag;
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableMetric;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.*;
/**
* Instrumentation of Abfs counters.
*/
public class AbfsInstrumentation implements AbfsCounters {
/**
* Single context for all the Abfs counters to separate them from other
* counters.
*/
private static final String CONTEXT = "AbfsContext";
/**
* The name of a field added to metrics records that uniquely identifies a
* specific FileSystem instance.
*/
private static final String REGISTRY_ID = "AbfsID";
/**
* The name of a field added to metrics records that indicates the hostname
* portion of the FS URL.
*/
private static final String METRIC_BUCKET = "AbfsBucket";
private final MetricsRegistry registry =
new MetricsRegistry("abfsMetrics").setContext(CONTEXT);
private static final AbfsStatistic[] STATISTIC_LIST = {
CALL_CREATE,
CALL_OPEN,
CALL_GET_FILE_STATUS,
CALL_APPEND,
CALL_CREATE_NON_RECURSIVE,
CALL_DELETE,
CALL_EXIST,
CALL_GET_DELEGATION_TOKEN,
CALL_LIST_STATUS,
CALL_MKDIRS,
CALL_RENAME,
DIRECTORIES_CREATED,
DIRECTORIES_DELETED,
FILES_CREATED,
FILES_DELETED,
ERROR_IGNORED
};
public AbfsInstrumentation(URI uri) {
UUID fileSystemInstanceId = UUID.randomUUID();
registry.tag(REGISTRY_ID,
"A unique identifier for the instance",
fileSystemInstanceId.toString());
registry.tag(METRIC_BUCKET, "Hostname from the FS URL", uri.getHost());
for (AbfsStatistic stats : STATISTIC_LIST) {
createCounter(stats);
}
}
/**
* Look up a Metric from registered set.
*
* @param name name of metric.
* @return the metric or null.
*/
private MutableMetric lookupMetric(String name) {
return getRegistry().get(name);
}
/**
* Look up counter by name.
*
* @param name name of counter.
* @return counter if found, else null.
*/
private MutableCounterLong lookupCounter(String name) {
MutableMetric metric = lookupMetric(name);
if (metric == null) {
return null;
}
if (!(metric instanceof MutableCounterLong)) {
throw new IllegalStateException("Metric " + name
+ " is not a MutableCounterLong: " + metric);
}
return (MutableCounterLong) metric;
}
/**
* Create a counter in the registry.
*
* @param stats AbfsStatistic whose counter needs to be made.
* @return counter or null.
*/
private MutableCounterLong createCounter(AbfsStatistic stats) {
return registry.newCounter(stats.getStatName(),
stats.getStatDescription(), 0L);
}
/**
* {@inheritDoc}
*
* Increment a statistic with some value.
*
* @param statistic AbfsStatistic need to be incremented.
* @param value long value to be incremented by.
*/
@Override
public void incrementCounter(AbfsStatistic statistic, long value) {
MutableCounterLong counter = lookupCounter(statistic.getStatName());
if (counter != null) {
counter.incr(value);
}
}
/**
* Getter for MetricRegistry.
*
* @return MetricRegistry or null.
*/
private MetricsRegistry getRegistry() {
return registry;
}
/**
* {@inheritDoc}
*
* Method to aggregate all the counters in the MetricRegistry and form a
* string with prefix, separator and suffix.
*
* @param prefix string that would be before metric.
* @param separator string that would be between metric name and value.
* @param suffix string that would be after metric value.
* @param all gets all the values even if unchanged.
* @return a String with all the metrics and their values.
*/
@Override
public String formString(String prefix, String separator, String suffix,
boolean all) {
MetricStringBuilder metricStringBuilder = new MetricStringBuilder(null,
prefix, separator, suffix);
registry.snapshot(metricStringBuilder, all);
return metricStringBuilder.toString();
}
/**
* {@inheritDoc}
*
* Creating a map of all the counters for testing.
*
* @return a map of the metrics.
*/
@VisibleForTesting
@Override
public Map<String, Long> toMap() {
MetricsToMap metricBuilder = new MetricsToMap(null);
registry.snapshot(metricBuilder, true);
return metricBuilder.getMap();
}
protected static class MetricsToMap extends MetricsRecordBuilder {
private final MetricsCollector parent;
private final Map<String, Long> map =
new HashMap<>();
MetricsToMap(MetricsCollector parent) {
this.parent = parent;
}
@Override
public MetricsRecordBuilder tag(MetricsInfo info, String value) {
return this;
}
@Override
public MetricsRecordBuilder add(MetricsTag tag) {
return this;
}
@Override
public MetricsRecordBuilder add(AbstractMetric metric) {
return this;
}
@Override
public MetricsRecordBuilder setContext(String value) {
return this;
}
@Override
public MetricsRecordBuilder addCounter(MetricsInfo info, int value) {
return tuple(info, value);
}
@Override
public MetricsRecordBuilder addCounter(MetricsInfo info, long value) {
return tuple(info, value);
}
@Override
public MetricsRecordBuilder addGauge(MetricsInfo info, int value) {
return tuple(info, value);
}
@Override
public MetricsRecordBuilder addGauge(MetricsInfo info, long value) {
return tuple(info, value);
}
public MetricsToMap tuple(MetricsInfo info, long value) {
return tuple(info.name(), value);
}
public MetricsToMap tuple(String name, long value) {
map.put(name, value);
return this;
}
@Override
public MetricsRecordBuilder addGauge(MetricsInfo info, float value) {
return tuple(info, (long) value);
}
@Override
public MetricsRecordBuilder addGauge(MetricsInfo info, double value) {
return tuple(info, (long) value);
}
@Override
public MetricsCollector parent() {
return parent;
}
/**
* Get the map.
*
* @return the map of metrics.
*/
public Map<String, Long> getMap() {
return map;
}
}
}

View File

@ -0,0 +1,93 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs;
import org.apache.hadoop.fs.StorageStatistics.CommonStatisticNames;
/**
* Statistic which are collected in Abfs.
* Available as metrics in {@link AbfsInstrumentation}.
*/
public enum AbfsStatistic {
CALL_CREATE(CommonStatisticNames.OP_CREATE,
"Calls of create()."),
CALL_OPEN(CommonStatisticNames.OP_OPEN,
"Calls of open()."),
CALL_GET_FILE_STATUS(CommonStatisticNames.OP_GET_FILE_STATUS,
"Calls of getFileStatus()."),
CALL_APPEND(CommonStatisticNames.OP_APPEND,
"Calls of append()."),
CALL_CREATE_NON_RECURSIVE(CommonStatisticNames.OP_CREATE_NON_RECURSIVE,
"Calls of createNonRecursive()."),
CALL_DELETE(CommonStatisticNames.OP_DELETE,
"Calls of delete()."),
CALL_EXIST(CommonStatisticNames.OP_EXISTS,
"Calls of exist()."),
CALL_GET_DELEGATION_TOKEN(CommonStatisticNames.OP_GET_DELEGATION_TOKEN,
"Calls of getDelegationToken()."),
CALL_LIST_STATUS(CommonStatisticNames.OP_LIST_STATUS,
"Calls of listStatus()."),
CALL_MKDIRS(CommonStatisticNames.OP_MKDIRS,
"Calls of mkdirs()."),
CALL_RENAME(CommonStatisticNames.OP_RENAME,
"Calls of rename()."),
DIRECTORIES_CREATED("directories_created",
"Total number of directories created through the object store."),
DIRECTORIES_DELETED("directories_deleted",
"Total number of directories deleted through the object store."),
FILES_CREATED("files_created",
"Total number of files created through the object store."),
FILES_DELETED("files_deleted",
"Total number of files deleted from the object store."),
ERROR_IGNORED("error_ignored",
"Errors caught and ignored.");
private String statName;
private String statDescription;
/**
* Constructor of AbfsStatistic to set statistic name and description.
*
* @param statName Name of the statistic.
* @param statDescription Description of the statistic.
*/
AbfsStatistic(String statName, String statDescription) {
this.statName = statName;
this.statDescription = statDescription;
}
/**
* Getter for statistic name.
*
* @return Name of statistic.
*/
public String getStatName() {
return statName;
}
/**
* Getter for statistic description.
*
* @return Description of statistic.
*/
public String getStatDescription() {
return statDescription;
}
}

View File

@ -30,6 +30,7 @@
import java.util.List; import java.util.List;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.Map;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
@ -68,6 +69,7 @@
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.SASTokenProviderException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.SASTokenProviderException;
import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode; import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
import org.apache.hadoop.fs.azurebfs.security.AbfsDelegationTokenManager; import org.apache.hadoop.fs.azurebfs.security.AbfsDelegationTokenManager;
import org.apache.hadoop.fs.azurebfs.services.AbfsCounters;
import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsAction;
@ -78,6 +80,7 @@
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.Progressable;
import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.*;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs; import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
/** /**
@ -94,6 +97,7 @@ public class AzureBlobFileSystem extends FileSystem {
private boolean delegationTokenEnabled = false; private boolean delegationTokenEnabled = false;
private AbfsDelegationTokenManager delegationTokenManager; private AbfsDelegationTokenManager delegationTokenManager;
private AbfsCounters instrumentation;
@Override @Override
public void initialize(URI uri, Configuration configuration) public void initialize(URI uri, Configuration configuration)
@ -109,7 +113,7 @@ public void initialize(URI uri, Configuration configuration)
LOG.trace("AzureBlobFileSystemStore init complete"); LOG.trace("AzureBlobFileSystemStore init complete");
final AbfsConfiguration abfsConfiguration = abfsStore.getAbfsConfiguration(); final AbfsConfiguration abfsConfiguration = abfsStore.getAbfsConfiguration();
instrumentation = new AbfsInstrumentation(uri);
this.setWorkingDirectory(this.getHomeDirectory()); this.setWorkingDirectory(this.getHomeDirectory());
if (abfsConfiguration.getCreateRemoteFileSystemDuringInitialization()) { if (abfsConfiguration.getCreateRemoteFileSystemDuringInitialization()) {
@ -146,6 +150,11 @@ public String toString() {
sb.append("uri=").append(uri); sb.append("uri=").append(uri);
sb.append(", user='").append(abfsStore.getUser()).append('\''); sb.append(", user='").append(abfsStore.getUser()).append('\'');
sb.append(", primaryUserGroup='").append(abfsStore.getPrimaryGroup()).append('\''); sb.append(", primaryUserGroup='").append(abfsStore.getPrimaryGroup()).append('\'');
if (instrumentation != null) {
sb.append(", Statistics: {").append(instrumentation.formString("{", "=",
"}", true));
sb.append("}");
}
sb.append('}'); sb.append('}');
return sb.toString(); return sb.toString();
} }
@ -162,7 +171,7 @@ public URI getUri() {
@Override @Override
public FSDataInputStream open(final Path path, final int bufferSize) throws IOException { public FSDataInputStream open(final Path path, final int bufferSize) throws IOException {
LOG.debug("AzureBlobFileSystem.open path: {} bufferSize: {}", path, bufferSize); LOG.debug("AzureBlobFileSystem.open path: {} bufferSize: {}", path, bufferSize);
statIncrement(CALL_OPEN);
Path qualifiedPath = makeQualified(path); Path qualifiedPath = makeQualified(path);
try { try {
@ -183,6 +192,7 @@ public FSDataOutputStream create(final Path f, final FsPermission permission, fi
overwrite, overwrite,
blockSize); blockSize);
statIncrement(CALL_CREATE);
trailingPeriodCheck(f); trailingPeriodCheck(f);
Path qualifiedPath = makeQualified(f); Path qualifiedPath = makeQualified(f);
@ -190,6 +200,7 @@ public FSDataOutputStream create(final Path f, final FsPermission permission, fi
try { try {
OutputStream outputStream = abfsStore.createFile(qualifiedPath, statistics, overwrite, OutputStream outputStream = abfsStore.createFile(qualifiedPath, statistics, overwrite,
permission == null ? FsPermission.getFileDefault() : permission, FsPermission.getUMask(getConf())); permission == null ? FsPermission.getFileDefault() : permission, FsPermission.getUMask(getConf()));
statIncrement(FILES_CREATED);
return new FSDataOutputStream(outputStream, statistics); return new FSDataOutputStream(outputStream, statistics);
} catch(AzureBlobFileSystemException ex) { } catch(AzureBlobFileSystemException ex) {
checkException(f, ex); checkException(f, ex);
@ -203,6 +214,7 @@ public FSDataOutputStream createNonRecursive(final Path f, final FsPermission pe
final boolean overwrite, final int bufferSize, final short replication, final long blockSize, final boolean overwrite, final int bufferSize, final short replication, final long blockSize,
final Progressable progress) throws IOException { final Progressable progress) throws IOException {
statIncrement(CALL_CREATE_NON_RECURSIVE);
final Path parent = f.getParent(); final Path parent = f.getParent();
final FileStatus parentFileStatus = tryGetFileStatus(parent); final FileStatus parentFileStatus = tryGetFileStatus(parent);
@ -246,7 +258,7 @@ public FSDataOutputStream append(final Path f, final int bufferSize, final Progr
"AzureBlobFileSystem.append path: {} bufferSize: {}", "AzureBlobFileSystem.append path: {} bufferSize: {}",
f.toString(), f.toString(),
bufferSize); bufferSize);
statIncrement(CALL_APPEND);
Path qualifiedPath = makeQualified(f); Path qualifiedPath = makeQualified(f);
try { try {
@ -261,7 +273,7 @@ public FSDataOutputStream append(final Path f, final int bufferSize, final Progr
public boolean rename(final Path src, final Path dst) throws IOException { public boolean rename(final Path src, final Path dst) throws IOException {
LOG.debug( LOG.debug(
"AzureBlobFileSystem.rename src: {} dst: {}", src.toString(), dst.toString()); "AzureBlobFileSystem.rename src: {} dst: {}", src.toString(), dst.toString());
statIncrement(CALL_RENAME);
trailingPeriodCheck(dst); trailingPeriodCheck(dst);
Path parentFolder = src.getParent(); Path parentFolder = src.getParent();
@ -328,7 +340,7 @@ public boolean rename(final Path src, final Path dst) throws IOException {
public boolean delete(final Path f, final boolean recursive) throws IOException { public boolean delete(final Path f, final boolean recursive) throws IOException {
LOG.debug( LOG.debug(
"AzureBlobFileSystem.delete path: {} recursive: {}", f.toString(), recursive); "AzureBlobFileSystem.delete path: {} recursive: {}", f.toString(), recursive);
statIncrement(CALL_DELETE);
Path qualifiedPath = makeQualified(f); Path qualifiedPath = makeQualified(f);
if (f.isRoot()) { if (f.isRoot()) {
@ -353,7 +365,7 @@ public boolean delete(final Path f, final boolean recursive) throws IOException
public FileStatus[] listStatus(final Path f) throws IOException { public FileStatus[] listStatus(final Path f) throws IOException {
LOG.debug( LOG.debug(
"AzureBlobFileSystem.listStatus path: {}", f.toString()); "AzureBlobFileSystem.listStatus path: {}", f.toString());
statIncrement(CALL_LIST_STATUS);
Path qualifiedPath = makeQualified(f); Path qualifiedPath = makeQualified(f);
try { try {
@ -365,6 +377,24 @@ public FileStatus[] listStatus(final Path f) throws IOException {
} }
} }
/**
* Increment of an Abfs statistic.
*
* @param statistic AbfsStatistic that needs increment.
*/
private void statIncrement(AbfsStatistic statistic) {
incrementStatistic(statistic);
}
/**
* Method for incrementing AbfsStatistic by a long value.
*
* @param statistic the Statistic to be incremented.
*/
private void incrementStatistic(AbfsStatistic statistic) {
instrumentation.incrementCounter(statistic, 1);
}
/** /**
* Performs a check for (.) until root in the path to throw an exception. * Performs a check for (.) until root in the path to throw an exception.
* The purpose is to differentiate between dir/dir1 and dir/dir1. * The purpose is to differentiate between dir/dir1 and dir/dir1.
@ -394,7 +424,7 @@ private void trailingPeriodCheck(Path path) throws IllegalArgumentException {
public boolean mkdirs(final Path f, final FsPermission permission) throws IOException { public boolean mkdirs(final Path f, final FsPermission permission) throws IOException {
LOG.debug( LOG.debug(
"AzureBlobFileSystem.mkdirs path: {} permissions: {}", f, permission); "AzureBlobFileSystem.mkdirs path: {} permissions: {}", f, permission);
statIncrement(CALL_MKDIRS);
trailingPeriodCheck(f); trailingPeriodCheck(f);
final Path parentFolder = f.getParent(); final Path parentFolder = f.getParent();
@ -408,6 +438,7 @@ public boolean mkdirs(final Path f, final FsPermission permission) throws IOExce
try { try {
abfsStore.createDirectory(qualifiedPath, permission == null ? FsPermission.getDirDefault() : permission, abfsStore.createDirectory(qualifiedPath, permission == null ? FsPermission.getDirDefault() : permission,
FsPermission.getUMask(getConf())); FsPermission.getUMask(getConf()));
statIncrement(DIRECTORIES_CREATED);
return true; return true;
} catch (AzureBlobFileSystemException ex) { } catch (AzureBlobFileSystemException ex) {
checkException(f, ex, AzureServiceErrorCode.PATH_ALREADY_EXISTS); checkException(f, ex, AzureServiceErrorCode.PATH_ALREADY_EXISTS);
@ -425,12 +456,13 @@ public synchronized void close() throws IOException {
LOG.debug("AzureBlobFileSystem.close"); LOG.debug("AzureBlobFileSystem.close");
IOUtils.cleanupWithLogger(LOG, abfsStore, delegationTokenManager); IOUtils.cleanupWithLogger(LOG, abfsStore, delegationTokenManager);
this.isClosed = true; this.isClosed = true;
LOG.debug("Closing Abfs: " + toString());
} }
@Override @Override
public FileStatus getFileStatus(final Path f) throws IOException { public FileStatus getFileStatus(final Path f) throws IOException {
LOG.debug("AzureBlobFileSystem.getFileStatus path: {}", f); LOG.debug("AzureBlobFileSystem.getFileStatus path: {}", f);
statIncrement(CALL_GET_FILE_STATUS);
Path qualifiedPath = makeQualified(f); Path qualifiedPath = makeQualified(f);
try { try {
@ -567,6 +599,11 @@ private boolean deleteRoot() throws IOException {
@Override @Override
public Void call() throws Exception { public Void call() throws Exception {
delete(fs.getPath(), fs.isDirectory()); delete(fs.getPath(), fs.isDirectory());
if (fs.isDirectory()) {
statIncrement(DIRECTORIES_DELETED);
} else {
statIncrement(FILES_DELETED);
}
return null; return null;
} }
}); });
@ -926,11 +963,25 @@ public void access(final Path path, final FsAction mode) throws IOException {
} }
} }
/**
* Incrementing exists() calls from superclass for statistic collection.
*
* @param f source path.
* @return true if the path exists.
* @throws IOException
*/
@Override
public boolean exists(Path f) throws IOException {
statIncrement(CALL_EXIST);
return super.exists(f);
}
private FileStatus tryGetFileStatus(final Path f) { private FileStatus tryGetFileStatus(final Path f) {
try { try {
return getFileStatus(f); return getFileStatus(f);
} catch (IOException ex) { } catch (IOException ex) {
LOG.debug("File not found {}", f); LOG.debug("File not found {}", f);
statIncrement(ERROR_IGNORED);
return null; return null;
} }
} }
@ -947,6 +998,7 @@ private boolean fileSystemExists() throws IOException {
// there is not way to get the storage error code // there is not way to get the storage error code
// workaround here is to check its status code. // workaround here is to check its status code.
} catch (FileNotFoundException e) { } catch (FileNotFoundException e) {
statIncrement(ERROR_IGNORED);
return false; return false;
} }
} }
@ -1120,6 +1172,7 @@ private Throwable getRootCause(Throwable throwable) {
*/ */
@Override @Override
public synchronized Token<?> getDelegationToken(final String renewer) throws IOException { public synchronized Token<?> getDelegationToken(final String renewer) throws IOException {
statIncrement(CALL_GET_DELEGATION_TOKEN);
return this.delegationTokenEnabled ? this.delegationTokenManager.getDelegationToken(renewer) return this.delegationTokenEnabled ? this.delegationTokenManager.getDelegationToken(renewer)
: super.getDelegationToken(renewer); : super.getDelegationToken(renewer);
} }
@ -1182,6 +1235,11 @@ boolean getIsNamespaceEnabled() throws AzureBlobFileSystemException {
return abfsStore.getIsNamespaceEnabled(); return abfsStore.getIsNamespaceEnabled();
} }
@VisibleForTesting
Map<String, Long> getInstrumentationMap() {
return instrumentation.toMap();
}
@Override @Override
public boolean hasPathCapability(final Path path, final String capability) public boolean hasPathCapability(final Path path, final String capability)
throws IOException { throws IOException {

View File

@ -0,0 +1,66 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
import java.util.Map;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.azurebfs.AbfsStatistic;
/**
* An interface for Abfs counters.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public interface AbfsCounters {
/**
* Increment a AbfsStatistic by a long value.
*
* @param statistic AbfsStatistic to be incremented.
* @param value the value to increment the statistic by.
*/
void incrementCounter(AbfsStatistic statistic, long value);
/**
* Form a String of the all the statistics and present in an organized manner.
*
* @param prefix the prefix to be set.
* @param separator the separator between the statistic name and value.
* @param suffix the suffix to be used.
* @param all enable all the statistics to be displayed or not.
* @return String of all the statistics and their values.
*/
String formString(String prefix, String separator, String suffix,
boolean all);
/**
* Convert all the statistics into a key-value pair map to be used for
* testing.
*
* @return map with statistic name as key and statistic value as the map
* value.
*/
@VisibleForTesting
Map<String, Long> toMap();
}

View File

@ -21,6 +21,7 @@
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.util.Hashtable; import java.util.Hashtable;
import java.util.Map;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
@ -409,4 +410,18 @@ protected AbfsOutputStream createAbfsOutputStreamWithFlushEnabled(
return (AbfsOutputStream) abfss.createFile(path, fs.getFsStatistics(), return (AbfsOutputStream) abfss.createFile(path, fs.getFsStatistics(),
true, FsPermission.getDefault(), FsPermission.getUMask(fs.getConf())); true, FsPermission.getDefault(), FsPermission.getUMask(fs.getConf()));
} }
/**
* Custom assertion for AbfsStatistics which have statistics, expected
* value and map of statistics and value as its parameters.
* @param statistic the AbfsStatistics which needs to be asserted.
* @param expectedValue the expected value of the statistics.
* @param metricMap map of (String, Long) with statistics name as key and
* statistics value as map value.
*/
protected void assertAbfsStatistics(AbfsStatistic statistic,
long expectedValue, Map<String, Long> metricMap) {
assertEquals("Mismatch in " + statistic.getStatName(), expectedValue,
(long) metricMap.get(statistic.getStatName()));
}
} }

View File

@ -0,0 +1,258 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs;
import java.io.IOException;
import java.util.Map;
import org.junit.Test;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.services.AbfsCounters;
import org.apache.hadoop.fs.permission.FsPermission;
/**
* Tests AzureBlobFileSystem Statistics.
*/
public class ITestAbfsStatistics extends AbstractAbfsIntegrationTest {
private static final int NUMBER_OF_OPS = 10;
public ITestAbfsStatistics() throws Exception {
}
/**
* Testing the initial value of statistics.
*/
@Test
public void testInitialStatsValues() throws IOException {
describe("Testing the initial values of Abfs counters");
AbfsCounters abfsCounters =
new AbfsInstrumentation(getFileSystem().getUri());
Map<String, Long> metricMap = abfsCounters.toMap();
for (Map.Entry<String, Long> entry : metricMap.entrySet()) {
String key = entry.getKey();
Long value = entry.getValue();
//Verify if initial value of statistic is 0.
checkInitialValue(key, value);
}
}
/**
* Testing statistics by creating files and directories.
*/
@Test
public void testCreateStatistics() throws IOException {
describe("Testing counter values got by creating directories and files in"
+ " Abfs");
AzureBlobFileSystem fs = getFileSystem();
Path createFilePath = path(getMethodName());
Path createDirectoryPath = path(getMethodName() + "Dir");
fs.mkdirs(createDirectoryPath);
fs.createNonRecursive(createFilePath, FsPermission
.getDefault(), false, 1024, (short) 1, 1024, null);
Map<String, Long> metricMap = fs.getInstrumentationMap();
/*
Test of statistic values after creating a directory and a file ;
getFileStatus is called 1 time after creating file and 1 time at time of
initialising.
*/
assertAbfsStatistics(AbfsStatistic.CALL_CREATE, 1, metricMap);
assertAbfsStatistics(AbfsStatistic.CALL_CREATE_NON_RECURSIVE, 1, metricMap);
assertAbfsStatistics(AbfsStatistic.FILES_CREATED, 1, metricMap);
assertAbfsStatistics(AbfsStatistic.DIRECTORIES_CREATED, 1, metricMap);
assertAbfsStatistics(AbfsStatistic.CALL_MKDIRS, 1, metricMap);
assertAbfsStatistics(AbfsStatistic.CALL_GET_FILE_STATUS, 2, metricMap);
//re-initialising Abfs to reset statistic values.
fs.initialize(fs.getUri(), fs.getConf());
/*
Creating 10 directories and files; Directories and files can't be created
with same name, hence <Name> + i to give unique names.
*/
for (int i = 0; i < NUMBER_OF_OPS; i++) {
fs.mkdirs(path(getMethodName() + "Dir" + i));
fs.createNonRecursive(path(getMethodName() + i),
FsPermission.getDefault(), false, 1024, (short) 1,
1024, null);
}
metricMap = fs.getInstrumentationMap();
/*
Test of statistics values after creating 10 directories and files;
getFileStatus is called 1 time at initialise() plus number of times file
is created.
*/
assertAbfsStatistics(AbfsStatistic.CALL_CREATE, NUMBER_OF_OPS, metricMap);
assertAbfsStatistics(AbfsStatistic.CALL_CREATE_NON_RECURSIVE, NUMBER_OF_OPS,
metricMap);
assertAbfsStatistics(AbfsStatistic.FILES_CREATED, NUMBER_OF_OPS, metricMap);
assertAbfsStatistics(AbfsStatistic.DIRECTORIES_CREATED, NUMBER_OF_OPS,
metricMap);
assertAbfsStatistics(AbfsStatistic.CALL_MKDIRS, NUMBER_OF_OPS, metricMap);
assertAbfsStatistics(AbfsStatistic.CALL_GET_FILE_STATUS,
1 + NUMBER_OF_OPS, metricMap);
}
/**
* Testing statistics by deleting files and directories.
*/
@Test
public void testDeleteStatistics() throws IOException {
describe("Testing counter values got by deleting directory and files "
+ "in Abfs");
AzureBlobFileSystem fs = getFileSystem();
/*
This directory path needs to be root for triggering the
directories_deleted counter.
*/
Path createDirectoryPath = path("/");
Path createFilePath = path(getMethodName());
/*
creating a directory and a file inside that directory.
The directory is root. Hence, no parent. This allows us to invoke
deleteRoot() method to see the population of directories_deleted and
files_deleted counters.
*/
fs.mkdirs(createDirectoryPath);
fs.create(path(createDirectoryPath + getMethodName()));
fs.delete(createDirectoryPath, true);
Map<String, Long> metricMap = fs.getInstrumentationMap();
/*
Test for op_delete, files_deleted, op_list_status.
since directory is delete recursively op_delete is called 2 times.
1 file is deleted, 1 listStatus() call is made.
*/
assertAbfsStatistics(AbfsStatistic.CALL_DELETE, 2, metricMap);
assertAbfsStatistics(AbfsStatistic.FILES_DELETED, 1, metricMap);
assertAbfsStatistics(AbfsStatistic.CALL_LIST_STATUS, 1, metricMap);
/*
creating a root directory and deleting it recursively to see if
directories_deleted is called or not.
*/
fs.mkdirs(createDirectoryPath);
fs.create(createFilePath);
fs.delete(createDirectoryPath, true);
metricMap = fs.getInstrumentationMap();
//Test for directories_deleted.
assertAbfsStatistics(AbfsStatistic.DIRECTORIES_DELETED, 1, metricMap);
}
/**
* Testing statistics of open, append, rename and exists method calls.
*/
@Test
public void testOpenAppendRenameExists() throws IOException {
describe("Testing counter values on calling open, append and rename and "
+ "exists methods on Abfs");
AzureBlobFileSystem fs = getFileSystem();
Path createFilePath = path(getMethodName());
Path destCreateFilePath = path(getMethodName() + "New");
fs.create(createFilePath);
fs.open(createFilePath);
fs.append(createFilePath);
assertTrue(fs.rename(createFilePath, destCreateFilePath));
Map<String, Long> metricMap = fs.getInstrumentationMap();
//Testing single method calls to open, append and rename.
assertAbfsStatistics(AbfsStatistic.CALL_OPEN, 1, metricMap);
assertAbfsStatistics(AbfsStatistic.CALL_APPEND, 1, metricMap);
assertAbfsStatistics(AbfsStatistic.CALL_RENAME, 1, metricMap);
//Testing if file exists at path.
assertTrue(String.format("File with name %s should exist",
destCreateFilePath),
fs.exists(destCreateFilePath));
assertFalse(String.format("File with name %s should not exist",
createFilePath),
fs.exists(createFilePath));
metricMap = fs.getInstrumentationMap();
//Testing exists() calls.
assertAbfsStatistics(AbfsStatistic.CALL_EXIST, 2, metricMap);
//re-initialising Abfs to reset statistic values.
fs.initialize(fs.getUri(), fs.getConf());
fs.create(destCreateFilePath);
for (int i = 0; i < NUMBER_OF_OPS; i++) {
fs.open(destCreateFilePath);
fs.append(destCreateFilePath);
}
metricMap = fs.getInstrumentationMap();
//Testing large number of method calls to open, append.
assertAbfsStatistics(AbfsStatistic.CALL_OPEN, NUMBER_OF_OPS, metricMap);
assertAbfsStatistics(AbfsStatistic.CALL_APPEND, NUMBER_OF_OPS, metricMap);
for (int i = 0; i < NUMBER_OF_OPS; i++) {
// rename and then back to earlier name for no error while looping.
assertTrue(fs.rename(destCreateFilePath, createFilePath));
assertTrue(fs.rename(createFilePath, destCreateFilePath));
//check if first name is existing and 2nd is not existing.
assertTrue(String.format("File with name %s should exist",
destCreateFilePath),
fs.exists(destCreateFilePath));
assertFalse(String.format("File with name %s should not exist",
createFilePath),
fs.exists(createFilePath));
}
metricMap = fs.getInstrumentationMap();
/*
Testing exists() calls and rename calls. Since both were called 2
times in 1 loop. 2*numberOfOps is expectedValue.
*/
assertAbfsStatistics(AbfsStatistic.CALL_RENAME, 2 * NUMBER_OF_OPS,
metricMap);
assertAbfsStatistics(AbfsStatistic.CALL_EXIST, 2 * NUMBER_OF_OPS,
metricMap);
}
/**
* Method to check initial value of the statistics which should be 0.
*
* @param statName name of the statistic to be checked.
* @param statValue value of the statistic.
*/
private void checkInitialValue(String statName, long statValue) {
assertEquals("Mismatch in " + statName, 0, statValue);
}
}

View File

@ -0,0 +1,61 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs;
import java.io.IOException;
import java.util.Map;
import org.junit.Test;
import org.apache.hadoop.fs.azurebfs.services.AbfsCounters;
/**
* Unit tests for Abfs common counters.
*/
public class TestAbfsStatistics extends AbstractAbfsIntegrationTest {
private static final int LARGE_OPS = 100;
public TestAbfsStatistics() throws Exception {
}
/**
* Tests for op_get_delegation_token and error_ignore counter values.
*/
@Test
public void testInitializeStats() throws IOException {
describe("Testing the counter values after Abfs is initialised");
AbfsCounters instrumentation =
new AbfsInstrumentation(getFileSystem().getUri());
//Testing summation of the counter values.
for (int i = 0; i < LARGE_OPS; i++) {
instrumentation.incrementCounter(AbfsStatistic.CALL_GET_DELEGATION_TOKEN, 1);
instrumentation.incrementCounter(AbfsStatistic.ERROR_IGNORED, 1);
}
Map<String, Long> metricMap = instrumentation.toMap();
assertAbfsStatistics(AbfsStatistic.CALL_GET_DELEGATION_TOKEN, LARGE_OPS,
metricMap);
assertAbfsStatistics(AbfsStatistic.ERROR_IGNORED, LARGE_OPS, metricMap);
}
}