HADOOP-13065. Add a new interface for retrieving FS and FC Statistics (Mingliang Liu via cmccabe)

(cherry picked from commit 687233f20d)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
	hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
This commit is contained in:
Colin Patrick Mccabe 2016-05-11 13:45:39 -07:00
parent 788f80b617
commit 15bfcde238
12 changed files with 1011 additions and 18 deletions

View File

@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import java.util.Collections;
import java.util.Iterator;
/**
* EmptyStorageStatistics is a StorageStatistics implementation which has no
* data.
*/
class EmptyStorageStatistics extends StorageStatistics {
EmptyStorageStatistics(String name) {
super(name);
}
public Iterator<LongStatistic> getLongStatistics() {
return Collections.emptyIterator();
}
public Long getLong(String key) {
return null;
}
public boolean isTracked(String key) {
return false;
}
}

View File

@ -49,6 +49,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.GlobalStorageStatistics.StorageStatisticsProvider;
import org.apache.hadoop.fs.Options.ChecksumOpt;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.permission.AclEntry;
@ -3546,7 +3547,7 @@ public abstract class FileSystem extends Configured implements Closeable {
/**
* Get the Map of Statistics object indexed by URI Scheme.
* @return a Map having a key as URI scheme and value as Statistics object
* @deprecated use {@link #getAllStatistics} instead
* @deprecated use {@link #getGlobalStorageStatistics()}
*/
@Deprecated
public static synchronized Map<String, Statistics> getStatistics() {
@ -3558,8 +3559,10 @@ public abstract class FileSystem extends Configured implements Closeable {
}
/**
* Return the FileSystem classes that have Statistics
* Return the FileSystem classes that have Statistics.
* @deprecated use {@link #getGlobalStorageStatistics()}
*/
@Deprecated
public static synchronized List<Statistics> getAllStatistics() {
return new ArrayList<Statistics>(statisticsTable.values());
}
@ -3568,13 +3571,23 @@ public abstract class FileSystem extends Configured implements Closeable {
* Get the statistics for a particular file system
* @param cls the class to lookup
* @return a statistics object
* @deprecated use {@link #getGlobalStorageStatistics()}
*/
public static synchronized
Statistics getStatistics(String scheme, Class<? extends FileSystem> cls) {
@Deprecated
public static synchronized Statistics getStatistics(final String scheme,
Class<? extends FileSystem> cls) {
Statistics result = statisticsTable.get(cls);
if (result == null) {
result = new Statistics(scheme);
statisticsTable.put(cls, result);
final Statistics newStats = new Statistics(scheme);
statisticsTable.put(cls, newStats);
result = newStats;
GlobalStorageStatistics.INSTANCE.put(scheme,
new StorageStatisticsProvider() {
@Override
public StorageStatistics provide() {
return new FileSystemStorageStatistics(scheme, newStats);
}
});
}
return result;
}
@ -3614,4 +3627,26 @@ public abstract class FileSystem extends Configured implements Closeable {
public static void enableSymlinks() {
symlinksEnabled = true;
}
/**
* Get the StorageStatistics for this FileSystem object. These statistics are
* per-instance. They are not shared with any other FileSystem object.
*
* <p>This is a default method which is intended to be overridden by
* subclasses. The default implementation returns an empty storage statistics
* object.</p>
*
* @return The StorageStatistics for this FileSystem instance.
* Will never be null.
*/
public StorageStatistics getStorageStatistics() {
return new EmptyStorageStatistics(getUri().toString());
}
/**
* Get the global storage statistics.
*/
public static GlobalStorageStatistics getGlobalStorageStatistics() {
return GlobalStorageStatistics.INSTANCE;
}
}

View File

@ -0,0 +1,136 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import java.util.Iterator;
import java.util.NoSuchElementException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileSystem.Statistics.StatisticsData;
/**
* A basic StorageStatistics instance which simply returns data from
* FileSystem#Statistics.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class FileSystemStorageStatistics extends StorageStatistics {
/**
* The per-class FileSystem statistics.
*/
private final FileSystem.Statistics stats;
private static final String[] KEYS = {
"bytesRead",
"bytesWritten",
"readOps",
"largeReadOps",
"writeOps",
"bytesReadLocalHost",
"bytesReadDistanceOfOneOrTwo",
"bytesReadDistanceOfThreeOrFour",
"bytesReadDistanceOfFiveOrLarger"
};
private static class LongStatisticIterator
implements Iterator<LongStatistic> {
private final StatisticsData data;
private int keyIdx;
LongStatisticIterator(StatisticsData data) {
this.data = data;
this.keyIdx = 0;
}
@Override
public boolean hasNext() {
return (this.keyIdx < KEYS.length);
}
@Override
public LongStatistic next() {
if (this.keyIdx >= KEYS.length) {
throw new NoSuchElementException();
}
String key = KEYS[this.keyIdx++];
Long val = fetch(data, key);
return new LongStatistic(key, val.longValue());
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
}
private static Long fetch(StatisticsData data, String key) {
switch (key) {
case "bytesRead":
return data.getBytesRead();
case "bytesWritten":
return data.getBytesWritten();
case "readOps":
return Long.valueOf(data.getReadOps());
case "largeReadOps":
return Long.valueOf(data.getLargeReadOps());
case "writeOps":
return Long.valueOf(data.getWriteOps());
case "bytesReadLocalHost":
return data.getBytesReadLocalHost();
case "bytesReadDistanceOfOneOrTwo":
return data.getBytesReadDistanceOfOneOrTwo();
case "bytesReadDistanceOfThreeOrFour":
return data.getBytesReadDistanceOfThreeOrFour();
case "bytesReadDistanceOfFiveOrLarger":
return data.getBytesReadDistanceOfFiveOrLarger();
default:
return null;
}
}
FileSystemStorageStatistics(String name, FileSystem.Statistics stats) {
super(name);
this.stats = stats;
}
@Override
public Iterator<LongStatistic> getLongStatistics() {
return new LongStatisticIterator(stats.getData());
}
@Override
public Long getLong(String key) {
return fetch(stats.getData(), key);
}
/**
* Return true if a statistic is being tracked.
*
* @return True only if the statistic is being tracked.
*/
public boolean isTracked(String key) {
for (String k: KEYS) {
if (k.equals(key)) {
return true;
}
}
return false;
}
}

View File

@ -0,0 +1,127 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.NoSuchElementException;
import java.util.TreeMap;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* Stores global storage statistics objects.
*/
@InterfaceAudience.Public
public enum GlobalStorageStatistics {
/**
* The GlobalStorageStatistics singleton.
*/
INSTANCE;
/**
* A map of all global StorageStatistics objects, indexed by name.
*/
private final NavigableMap<String, StorageStatistics> map = new TreeMap<>();
/**
* A callback API for creating new StorageStatistics instances.
*/
public interface StorageStatisticsProvider {
StorageStatistics provide();
}
/**
* Get the StorageStatistics object with the given name.
*
* @param name The storage statistics object name.
* @return The StorageStatistics object with the given name, or
* null if there is none.
*/
public synchronized StorageStatistics get(String name) {
return map.get(name);
}
/**
* Create or return the StorageStatistics object with the given name.
*
* @param name The storage statistics object name.
* @param provider An object which can create a new StorageStatistics
* object if needed.
* @return The StorageStatistics object with the given name.
* @throws RuntimeException If the StorageStatisticsProvider provides a new
* StorageStatistics object with the wrong name.
*/
public synchronized StorageStatistics put(String name,
StorageStatisticsProvider provider) {
StorageStatistics stats = map.get(name);
if (stats != null) {
return stats;
}
stats = provider.provide();
if (!stats.getName().equals(name)) {
throw new RuntimeException("StorageStatisticsProvider for " + name +
" provided a StorageStatistics object for " + stats.getName() +
" instead.");
}
map.put(name, stats);
return stats;
}
/**
* Get an iterator that we can use to iterate throw all the global storage
* statistics objects.
*/
synchronized public Iterator<StorageStatistics> iterator() {
Entry<String, StorageStatistics> first = map.firstEntry();
return new StorageIterator((first == null) ? null : first.getValue());
}
private class StorageIterator implements Iterator<StorageStatistics> {
private StorageStatistics next = null;
StorageIterator(StorageStatistics first) {
this.next = first;
}
@Override
public boolean hasNext() {
return (next != null);
}
@Override
public StorageStatistics next() {
if (next == null) {
throw new NoSuchElementException();
}
synchronized (GlobalStorageStatistics.this) {
StorageStatistics cur = next;
Entry<String, StorageStatistics> nextEntry =
map.higherEntry(cur.getName());
next = (nextEntry == null) ? null : nextEntry.getValue();
return cur;
}
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
}
}

View File

@ -0,0 +1,93 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import org.apache.hadoop.classification.InterfaceAudience;
import java.util.Iterator;
/**
* StorageStatistics contains statistics data for a FileSystem or FileContext
* instance.
*/
@InterfaceAudience.Public
public abstract class StorageStatistics {
/**
* A 64-bit storage statistic.
*/
public static class LongStatistic {
private final String name;
private final long value;
public LongStatistic(String name, long value) {
this.name = name;
this.value = value;
}
/**
* @return The name of this statistic.
*/
public String getName() {
return name;
}
/**
* @return The value of this statistic.
*/
public long getValue() {
return value;
}
}
private final String name;
public StorageStatistics(String name) {
this.name = name;
}
/**
* Get the name of this StorageStatistics object.
*/
public String getName() {
return name;
}
/**
* Get an iterator over all the currently tracked long statistics.
*
* The values returned will depend on the type of FileSystem or FileContext
* object. The values do not necessarily reflect a snapshot in time.
*/
public abstract Iterator<LongStatistic> getLongStatistics();
/**
* Get the value of a statistic.
*
* @return null if the statistic is not being tracked or is not a
* long statistic.
* The value of the statistic, otherwise.
*/
public abstract Long getLong(String key);
/**
* Return true if a statistic is being tracked.
*
* @return True only if the statistic is being tracked.
*/
public abstract boolean isTracked(String key);
}

View File

@ -0,0 +1,113 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import java.util.Iterator;
import java.util.NoSuchElementException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* A StorageStatistics instance which combines the outputs of several other
* StorageStatistics instances.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class UnionStorageStatistics extends StorageStatistics {
/**
* The underlying StorageStatistics.
*/
private final StorageStatistics[] stats;
private class LongStatisticIterator implements Iterator<LongStatistic> {
private int statIdx;
private Iterator<LongStatistic> cur;
LongStatisticIterator() {
this.statIdx = 0;
this.cur = null;
}
@Override
public boolean hasNext() {
return (getIter() != null);
}
private Iterator<LongStatistic> getIter() {
while ((cur == null) || (!cur.hasNext())) {
if (stats.length >= statIdx) {
return null;
}
cur = stats[statIdx++].getLongStatistics();
}
return cur;
}
@Override
public LongStatistic next() {
Iterator<LongStatistic> iter = getIter();
if (iter == null) {
throw new NoSuchElementException();
}
return iter.next();
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
}
public UnionStorageStatistics(String name, StorageStatistics[] stats) {
super(name);
this.stats = stats;
}
@Override
public Iterator<LongStatistic> getLongStatistics() {
return new LongStatisticIterator();
}
@Override
public Long getLong(String key) {
for (int i = 0; i < stats.length; i++) {
Long val = stats[i].getLong(key);
if (val != null) {
return val;
}
}
return null;
}
/**
* Return true if a statistic is being tracked.
*
* @return True only if the statistic is being tracked.
*/
@Override
public boolean isTracked(String key) {
for (int i = 0; i < stats.length; i++) {
if (stats[i].isTracked(key)) {
return true;
}
}
return false;
}
}

View File

@ -131,6 +131,7 @@ public class TestFilterFileSystem {
public Path fixRelativePart(Path p);
public ContentSummary getContentSummary(Path f);
public QuotaUsage getQuotaUsage(Path f);
StorageStatistics getStorageStatistics();
}
@Test

View File

@ -221,6 +221,7 @@ public class TestHarFileSystem {
public Path getTrashRoot(Path path) throws IOException;
public Collection<FileStatus> getTrashRoots(boolean allUsers) throws IOException;
StorageStatistics getStorageStatistics();
}
@Test

View File

@ -0,0 +1,167 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import org.apache.hadoop.fs.StorageStatistics;
import java.util.EnumMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicLong;
/**
* This storage statistics tracks how many times each DFS operation was issued.
*
* For each tracked DFS operation, there is a respective entry in the enum
* {@link OpType}. To use, increment the value the {@link DistributedFileSystem}
* and {@link org.apache.hadoop.hdfs.web.WebHdfsFileSystem}.
*
* This class is thread safe, and is generally shared by multiple threads.
*/
public class DFSOpsCountStatistics extends StorageStatistics {
/** This is for counting file system operations. */
public enum OpType {
ALLOW_SNAPSHOT("allowSnapshot"),
APPEND("append"),
CONCAT("concat"),
COPY_FROM_LOCAL_FILE("copyFromLocalFile"),
CREATE("create"),
CREATE_NON_RECURSIVE("createNonRecursive"),
CREATE_SNAPSHOT("createSnapshot"),
CREATE_SYM_LINK("createSymlink"),
DELETE("delete"),
DELETE_SNAPSHOT("deleteSnapshot"),
DISALLOW_SNAPSHOT("disallowSnapshot"),
EXISTS("exists"),
GET_BYTES_WITH_FUTURE_GS("getBytesWithFutureGenerationStamps"),
GET_CONTENT_SUMMARY("getContentSummary"),
GET_FILE_BLOCK_LOCATIONS("getFileBlockLocations"),
GET_FILE_CHECKSUM("getFileChecksum"),
GET_FILE_LINK_STATUS("getFileLinkStatus"),
GET_FILE_STATUS("getFileStatus"),
GET_LINK_TARGET("getLinkTarget"),
GET_QUOTA_USAGE("getQuotaUsage"),
GET_STATUS("getStatus"),
GET_STORAGE_POLICIES("getStoragePolicies"),
GET_STORAGE_POLICY("getStoragePolicy"),
GET_XATTR("getXAttr"),
LIST_LOCATED_STATUS("listLocatedStatus"),
LIST_STATUS("listStatus"),
MKDIRS("mkdirs"),
MODIFY_ACL_ENTRIES("modifyAclEntries"),
OPEN("open"),
PRIMITIVE_CREATE("primitiveCreate"),
PRIMITIVE_MKDIR("primitiveMkdir"),
REMOVE_ACL("removeAcl"),
REMOVE_ACL_ENTRIES("removeAclEntries"),
REMOVE_DEFAULT_ACL("removeDefaultAcl"),
REMOVE_XATTR("removeXAttr"),
RENAME("rename"),
RENAME_SNAPSHOT("renameSnapshot"),
RESOLVE_LINK("resolveLink"),
SET_ACL("setAcl"),
SET_OWNER("setOwner"),
SET_PERMISSION("setPermission"),
SET_REPLICATION("setReplication"),
SET_STORAGE_POLICY("setStoragePolicy"),
SET_TIMES("setTimes"),
SET_XATTR("setXAttr"),
TRUNCATE("truncate"),
UNSET_STORAGE_POLICY("unsetStoragePolicy");
private final String symbol;
OpType(String symbol) {
this.symbol = symbol;
}
public String getSymbol() {
return symbol;
}
public static OpType fromSymbol(String symbol) {
if (symbol != null) {
for (OpType opType : values()) {
if (opType.getSymbol().equals(symbol)) {
return opType;
}
}
}
return null;
}
}
public static final String NAME = "DFSOpsCountStatistics";
private final Map<OpType, AtomicLong> opsCount = new EnumMap<>(OpType.class);
public DFSOpsCountStatistics() {
super(NAME);
for (OpType opType : OpType.values()) {
opsCount.put(opType, new AtomicLong(0));
}
}
public void incrementOpCounter(OpType op) {
opsCount.get(op).addAndGet(1);
}
private class LongIterator implements Iterator<LongStatistic> {
private Iterator<Entry<OpType, AtomicLong>> iterator =
opsCount.entrySet().iterator();
@Override
public boolean hasNext() {
return iterator.hasNext();
}
@Override
public LongStatistic next() {
if (!iterator.hasNext()) {
throw new NoSuchElementException();
}
final Entry<OpType, AtomicLong> entry = iterator.next();
return new LongStatistic(entry.getKey().name(), entry.getValue().get());
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
}
@Override
public Iterator<LongStatistic> getLongStatistics() {
return new LongIterator();
}
@Override
public Long getLong(String key) {
final OpType type = OpType.fromSymbol(key);
return type == null ? null : opsCount.get(type).get();
}
@Override
public boolean isTracked(String key) {
return OpType.fromSymbol(key) == null;
}
}

View File

@ -48,8 +48,11 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemLinkResolver;
import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.FsStatus;
import org.apache.hadoop.fs.GlobalStorageStatistics;
import org.apache.hadoop.fs.GlobalStorageStatistics.StorageStatisticsProvider;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.StorageStatistics;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.Options.ChecksumOpt;
import org.apache.hadoop.fs.Path;
@ -67,6 +70,7 @@ import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.client.impl.CorruptFileBlockIterator;
import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
@ -96,7 +100,6 @@ import org.apache.hadoop.crypto.key.KeyProviderDelegationTokenExtension;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
/****************************************************************
* Implementation of the abstract FileSystem for the DFS system.
* This object is the way end-user code interacts with a Hadoop
@ -114,6 +117,8 @@ public class DistributedFileSystem extends FileSystem {
DFSClient dfs;
private boolean verifyChecksum = true;
private DFSOpsCountStatistics storageStatistics;
static{
HdfsConfiguration.init();
}
@ -151,6 +156,15 @@ public class DistributedFileSystem extends FileSystem {
this.dfs = new DFSClient(uri, conf, statistics);
this.uri = URI.create(uri.getScheme()+"://"+uri.getAuthority());
this.workingDir = getHomeDirectory();
storageStatistics = (DFSOpsCountStatistics) GlobalStorageStatistics.INSTANCE
.put(DFSOpsCountStatistics.NAME,
new StorageStatisticsProvider() {
@Override
public StorageStatistics provide() {
return new DFSOpsCountStatistics();
}
});
}
@Override
@ -215,6 +229,7 @@ public class DistributedFileSystem extends FileSystem {
public BlockLocation[] getFileBlockLocations(Path p,
final long start, final long len) throws IOException {
statistics.incrementReadOps(1);
storageStatistics.incrementOpCounter(OpType.GET_FILE_BLOCK_LOCATIONS);
final Path absF = fixRelativePart(p);
return new FileSystemLinkResolver<BlockLocation[]>() {
@Override
@ -301,6 +316,7 @@ public class DistributedFileSystem extends FileSystem {
public FSDataInputStream open(Path f, final int bufferSize)
throws IOException {
statistics.incrementReadOps(1);
storageStatistics.incrementOpCounter(OpType.OPEN);
Path absF = fixRelativePart(f);
return new FileSystemLinkResolver<FSDataInputStream>() {
@Override
@ -337,6 +353,7 @@ public class DistributedFileSystem extends FileSystem {
public FSDataOutputStream append(Path f, final EnumSet<CreateFlag> flag,
final int bufferSize, final Progressable progress) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.APPEND);
Path absF = fixRelativePart(f);
return new FileSystemLinkResolver<FSDataOutputStream>() {
@Override
@ -369,6 +386,7 @@ public class DistributedFileSystem extends FileSystem {
final int bufferSize, final Progressable progress,
final InetSocketAddress[] favoredNodes) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.APPEND);
Path absF = fixRelativePart(f);
return new FileSystemLinkResolver<FSDataOutputStream>() {
@Override
@ -412,6 +430,7 @@ public class DistributedFileSystem extends FileSystem {
final Progressable progress, final InetSocketAddress[] favoredNodes)
throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.CREATE);
Path absF = fixRelativePart(f);
return new FileSystemLinkResolver<HdfsDataOutputStream>() {
@Override
@ -445,6 +464,7 @@ public class DistributedFileSystem extends FileSystem {
final Progressable progress, final ChecksumOpt checksumOpt)
throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.CREATE);
Path absF = fixRelativePart(f);
return new FileSystemLinkResolver<FSDataOutputStream>() {
@Override
@ -469,6 +489,7 @@ public class DistributedFileSystem extends FileSystem {
short replication, long blockSize, Progressable progress,
ChecksumOpt checksumOpt) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.PRIMITIVE_CREATE);
final DFSOutputStream dfsos = dfs.primitiveCreate(
getPathName(fixRelativePart(f)),
absolutePermission, flag, true, replication, blockSize,
@ -485,6 +506,7 @@ public class DistributedFileSystem extends FileSystem {
final int bufferSize, final short replication, final long blockSize,
final Progressable progress) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.CREATE_NON_RECURSIVE);
if (flag.contains(CreateFlag.OVERWRITE)) {
flag.add(CreateFlag.CREATE);
}
@ -510,6 +532,7 @@ public class DistributedFileSystem extends FileSystem {
public boolean setReplication(Path src, final short replication)
throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.SET_REPLICATION);
Path absF = fixRelativePart(src);
return new FileSystemLinkResolver<Boolean>() {
@Override
@ -534,6 +557,7 @@ public class DistributedFileSystem extends FileSystem {
public void setStoragePolicy(final Path src, final String policyName)
throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.SET_STORAGE_POLICY);
Path absF = fixRelativePart(src);
new FileSystemLinkResolver<Void>() {
@Override
@ -554,6 +578,7 @@ public class DistributedFileSystem extends FileSystem {
public void unsetStoragePolicy(final Path src)
throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.UNSET_STORAGE_POLICY);
Path absF = fixRelativePart(src);
new FileSystemLinkResolver<Void>() {
@Override
@ -578,6 +603,7 @@ public class DistributedFileSystem extends FileSystem {
@Override
public BlockStoragePolicySpi getStoragePolicy(Path path) throws IOException {
statistics.incrementReadOps(1);
storageStatistics.incrementOpCounter(OpType.GET_STORAGE_POLICY);
Path absF = fixRelativePart(path);
return new FileSystemLinkResolver<BlockStoragePolicySpi>() {
@ -608,6 +634,7 @@ public class DistributedFileSystem extends FileSystem {
*/
public long getBytesWithFutureGenerationStamps() throws IOException {
statistics.incrementReadOps(1);
storageStatistics.incrementOpCounter(OpType.GET_BYTES_WITH_FUTURE_GS);
return dfs.getBytesInFutureBlocks();
}
@ -618,6 +645,7 @@ public class DistributedFileSystem extends FileSystem {
@Deprecated
public BlockStoragePolicy[] getStoragePolicies() throws IOException {
statistics.incrementReadOps(1);
storageStatistics.incrementOpCounter(OpType.GET_STORAGE_POLICIES);
return dfs.getStoragePolicies();
}
@ -632,6 +660,7 @@ public class DistributedFileSystem extends FileSystem {
@Override
public void concat(Path trg, Path [] psrcs) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.CONCAT);
// Make target absolute
Path absF = fixRelativePart(trg);
// Make all srcs absolute
@ -676,6 +705,7 @@ public class DistributedFileSystem extends FileSystem {
@Override
public boolean rename(Path src, Path dst) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.RENAME);
final Path absSrc = fixRelativePart(src);
final Path absDst = fixRelativePart(dst);
@ -710,6 +740,7 @@ public class DistributedFileSystem extends FileSystem {
public void rename(Path src, Path dst, final Options.Rename... options)
throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.RENAME);
final Path absSrc = fixRelativePart(src);
final Path absDst = fixRelativePart(dst);
// Try the rename without resolving first
@ -738,6 +769,7 @@ public class DistributedFileSystem extends FileSystem {
@Override
public boolean truncate(Path f, final long newLength) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.TRUNCATE);
Path absF = fixRelativePart(f);
return new FileSystemLinkResolver<Boolean>() {
@Override
@ -755,6 +787,7 @@ public class DistributedFileSystem extends FileSystem {
@Override
public boolean delete(Path f, final boolean recursive) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.DELETE);
Path absF = fixRelativePart(f);
return new FileSystemLinkResolver<Boolean>() {
@Override
@ -772,6 +805,7 @@ public class DistributedFileSystem extends FileSystem {
@Override
public ContentSummary getContentSummary(Path f) throws IOException {
statistics.incrementReadOps(1);
storageStatistics.incrementOpCounter(OpType.GET_CONTENT_SUMMARY);
Path absF = fixRelativePart(f);
return new FileSystemLinkResolver<ContentSummary>() {
@Override
@ -789,6 +823,7 @@ public class DistributedFileSystem extends FileSystem {
@Override
public QuotaUsage getQuotaUsage(Path f) throws IOException {
statistics.incrementReadOps(1);
storageStatistics.incrementOpCounter(OpType.GET_QUOTA_USAGE);
Path absF = fixRelativePart(f);
return new FileSystemLinkResolver<QuotaUsage>() {
@Override
@ -873,6 +908,7 @@ public class DistributedFileSystem extends FileSystem {
stats[i] = partialListing[i].makeQualified(getUri(), p);
}
statistics.incrementReadOps(1);
storageStatistics.incrementOpCounter(OpType.LIST_STATUS);
return stats;
}
@ -887,6 +923,7 @@ public class DistributedFileSystem extends FileSystem {
listing.add(fileStatus.makeQualified(getUri(), p));
}
statistics.incrementLargeReadOps(1);
storageStatistics.incrementOpCounter(OpType.LIST_STATUS);
// now fetch more entries
do {
@ -901,6 +938,7 @@ public class DistributedFileSystem extends FileSystem {
listing.add(fileStatus.makeQualified(getUri(), p));
}
statistics.incrementLargeReadOps(1);
storageStatistics.incrementOpCounter(OpType.LIST_STATUS);
} while (thisListing.hasMore());
return listing.toArray(new FileStatus[listing.size()]);
@ -1014,6 +1052,7 @@ public class DistributedFileSystem extends FileSystem {
thisListing = dfs.listPaths(src, HdfsFileStatus.EMPTY_NAME,
needLocation);
statistics.incrementReadOps(1);
storageStatistics.incrementOpCounter(OpType.LIST_LOCATED_STATUS);
if (thisListing == null) { // the directory does not exist
throw new FileNotFoundException("File " + p + " does not exist.");
}
@ -1109,6 +1148,7 @@ public class DistributedFileSystem extends FileSystem {
private boolean mkdirsInternal(Path f, final FsPermission permission,
final boolean createParent) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.MKDIRS);
Path absF = fixRelativePart(f);
return new FileSystemLinkResolver<Boolean>() {
@Override
@ -1135,6 +1175,7 @@ public class DistributedFileSystem extends FileSystem {
protected boolean primitiveMkdir(Path f, FsPermission absolutePermission)
throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.PRIMITIVE_MKDIR);
return dfs.primitiveMkdir(getPathName(f), absolutePermission);
}
@ -1180,6 +1221,7 @@ public class DistributedFileSystem extends FileSystem {
@Override
public FsStatus getStatus(Path p) throws IOException {
statistics.incrementReadOps(1);
storageStatistics.incrementOpCounter(OpType.GET_STATUS);
return dfs.getDiskStatus();
}
@ -1387,6 +1429,7 @@ public class DistributedFileSystem extends FileSystem {
@Override
public FileStatus getFileStatus(Path f) throws IOException {
statistics.incrementReadOps(1);
storageStatistics.incrementOpCounter(OpType.GET_FILE_STATUS);
Path absF = fixRelativePart(f);
return new FileSystemLinkResolver<FileStatus>() {
@Override
@ -1414,6 +1457,7 @@ public class DistributedFileSystem extends FileSystem {
throw new UnsupportedOperationException("Symlinks not supported");
}
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.CREATE_SYM_LINK);
final Path absF = fixRelativePart(link);
new FileSystemLinkResolver<Void>() {
@Override
@ -1437,6 +1481,7 @@ public class DistributedFileSystem extends FileSystem {
@Override
public FileStatus getFileLinkStatus(final Path f) throws IOException {
statistics.incrementReadOps(1);
storageStatistics.incrementOpCounter(OpType.GET_FILE_LINK_STATUS);
final Path absF = fixRelativePart(f);
FileStatus status = new FileSystemLinkResolver<FileStatus>() {
@Override
@ -1466,6 +1511,7 @@ public class DistributedFileSystem extends FileSystem {
@Override
public Path getLinkTarget(final Path f) throws IOException {
statistics.incrementReadOps(1);
storageStatistics.incrementOpCounter(OpType.GET_LINK_TARGET);
final Path absF = fixRelativePart(f);
return new FileSystemLinkResolver<Path>() {
@Override
@ -1487,6 +1533,7 @@ public class DistributedFileSystem extends FileSystem {
@Override
protected Path resolveLink(Path f) throws IOException {
statistics.incrementReadOps(1);
storageStatistics.incrementOpCounter(OpType.RESOLVE_LINK);
String target = dfs.getLinkTarget(getPathName(fixRelativePart(f)));
if (target == null) {
throw new FileNotFoundException("File does not exist: " + f.toString());
@ -1497,6 +1544,7 @@ public class DistributedFileSystem extends FileSystem {
@Override
public FileChecksum getFileChecksum(Path f) throws IOException {
statistics.incrementReadOps(1);
storageStatistics.incrementOpCounter(OpType.GET_FILE_CHECKSUM);
Path absF = fixRelativePart(f);
return new FileSystemLinkResolver<FileChecksum>() {
@Override
@ -1516,6 +1564,7 @@ public class DistributedFileSystem extends FileSystem {
public FileChecksum getFileChecksum(Path f, final long length)
throws IOException {
statistics.incrementReadOps(1);
storageStatistics.incrementOpCounter(OpType.GET_FILE_CHECKSUM);
Path absF = fixRelativePart(f);
return new FileSystemLinkResolver<FileChecksum>() {
@Override
@ -1541,6 +1590,7 @@ public class DistributedFileSystem extends FileSystem {
public void setPermission(Path p, final FsPermission permission
) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.SET_PERMISSION);
Path absF = fixRelativePart(p);
new FileSystemLinkResolver<Void>() {
@Override
@ -1565,6 +1615,7 @@ public class DistributedFileSystem extends FileSystem {
throw new IOException("username == null && groupname == null");
}
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.SET_OWNER);
Path absF = fixRelativePart(p);
new FileSystemLinkResolver<Void>() {
@Override
@ -1586,6 +1637,7 @@ public class DistributedFileSystem extends FileSystem {
public void setTimes(Path p, final long mtime, final long atime)
throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.SET_TIMES);
Path absF = fixRelativePart(p);
new FileSystemLinkResolver<Void>() {
@Override
@ -1663,6 +1715,8 @@ public class DistributedFileSystem extends FileSystem {
/** @see HdfsAdmin#allowSnapshot(Path) */
public void allowSnapshot(final Path path) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.ALLOW_SNAPSHOT);
Path absF = fixRelativePart(path);
new FileSystemLinkResolver<Void>() {
@Override
@ -1689,6 +1743,8 @@ public class DistributedFileSystem extends FileSystem {
/** @see HdfsAdmin#disallowSnapshot(Path) */
public void disallowSnapshot(final Path path) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.DISALLOW_SNAPSHOT);
Path absF = fixRelativePart(path);
new FileSystemLinkResolver<Void>() {
@Override
@ -1716,6 +1772,8 @@ public class DistributedFileSystem extends FileSystem {
@Override
public Path createSnapshot(final Path path, final String snapshotName)
throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.CREATE_SNAPSHOT);
Path absF = fixRelativePart(path);
return new FileSystemLinkResolver<Path>() {
@Override
@ -1741,6 +1799,8 @@ public class DistributedFileSystem extends FileSystem {
@Override
public void renameSnapshot(final Path path, final String snapshotOldName,
final String snapshotNewName) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.RENAME_SNAPSHOT);
Path absF = fixRelativePart(path);
new FileSystemLinkResolver<Void>() {
@Override
@ -1777,6 +1837,8 @@ public class DistributedFileSystem extends FileSystem {
@Override
public void deleteSnapshot(final Path snapshotDir, final String snapshotName)
throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.DELETE_SNAPSHOT);
Path absF = fixRelativePart(snapshotDir);
new FileSystemLinkResolver<Void>() {
@Override
@ -2024,6 +2086,8 @@ public class DistributedFileSystem extends FileSystem {
@Override
public void modifyAclEntries(Path path, final List<AclEntry> aclSpec)
throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.MODIFY_ACL_ENTRIES);
Path absF = fixRelativePart(path);
new FileSystemLinkResolver<Void>() {
@Override
@ -2046,6 +2110,8 @@ public class DistributedFileSystem extends FileSystem {
@Override
public void removeAclEntries(Path path, final List<AclEntry> aclSpec)
throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.REMOVE_ACL_ENTRIES);
Path absF = fixRelativePart(path);
new FileSystemLinkResolver<Void>() {
@Override
@ -2067,6 +2133,8 @@ public class DistributedFileSystem extends FileSystem {
*/
@Override
public void removeDefaultAcl(Path path) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.REMOVE_DEFAULT_ACL);
final Path absF = fixRelativePart(path);
new FileSystemLinkResolver<Void>() {
@Override
@ -2087,6 +2155,8 @@ public class DistributedFileSystem extends FileSystem {
*/
@Override
public void removeAcl(Path path) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.REMOVE_ACL);
final Path absF = fixRelativePart(path);
new FileSystemLinkResolver<Void>() {
@Override
@ -2108,6 +2178,8 @@ public class DistributedFileSystem extends FileSystem {
@Override
public void setAcl(Path path, final List<AclEntry> aclSpec)
throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.SET_ACL);
Path absF = fixRelativePart(path);
new FileSystemLinkResolver<Void>() {
@Override
@ -2206,6 +2278,8 @@ public class DistributedFileSystem extends FileSystem {
@Override
public void setXAttr(Path path, final String name, final byte[] value,
final EnumSet<XAttrSetFlag> flag) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.SET_XATTR);
Path absF = fixRelativePart(path);
new FileSystemLinkResolver<Void>() {
@ -2225,6 +2299,8 @@ public class DistributedFileSystem extends FileSystem {
@Override
public byte[] getXAttr(Path path, final String name) throws IOException {
statistics.incrementReadOps(1);
storageStatistics.incrementOpCounter(OpType.GET_XATTR);
final Path absF = fixRelativePart(path);
return new FileSystemLinkResolver<byte[]>() {
@Override
@ -2290,6 +2366,8 @@ public class DistributedFileSystem extends FileSystem {
@Override
public void removeXAttr(Path path, final String name) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.REMOVE_XATTR);
Path absF = fixRelativePart(path);
new FileSystemLinkResolver<Void>() {
@Override
@ -2446,4 +2524,5 @@ public class DistributedFileSystem extends FileSystem {
Statistics getFsStatistics() {
return statistics;
}
}

View File

@ -62,6 +62,11 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.GlobalStorageStatistics;
import org.apache.hadoop.fs.GlobalStorageStatistics.StorageStatisticsProvider;
import org.apache.hadoop.fs.StorageStatistics;
import org.apache.hadoop.hdfs.DFSOpsCountStatistics;
import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType;
import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
@ -143,6 +148,8 @@ public class WebHdfsFileSystem extends FileSystem
private static final ObjectReader READER =
new ObjectMapper().reader(Map.class);
private DFSOpsCountStatistics storageStatistics;
/**
* Return the protocol scheme for the FileSystem.
* <p/>
@ -239,6 +246,15 @@ public class WebHdfsFileSystem extends FileSystem
CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
this.initializeRestCsrf(conf);
this.delegationToken = null;
storageStatistics = (DFSOpsCountStatistics) GlobalStorageStatistics.INSTANCE
.put(DFSOpsCountStatistics.NAME,
new StorageStatisticsProvider() {
@Override
public StorageStatistics provide() {
return new DFSOpsCountStatistics();
}
});
}
/**
@ -983,6 +999,7 @@ public class WebHdfsFileSystem extends FileSystem
@Override
public FileStatus getFileStatus(Path f) throws IOException {
statistics.incrementReadOps(1);
storageStatistics.incrementOpCounter(OpType.GET_FILE_STATUS);
return makeQualified(getHdfsFileStatus(f), f);
}
@ -1012,6 +1029,7 @@ public class WebHdfsFileSystem extends FileSystem
@Override
public boolean mkdirs(Path f, FsPermission permission) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.MKDIRS);
final HttpOpParam.Op op = PutOpParam.Op.MKDIRS;
return new FsPathBooleanRunner(op, f,
new PermissionParam(applyUMask(permission))
@ -1024,6 +1042,7 @@ public class WebHdfsFileSystem extends FileSystem
public void createSymlink(Path destination, Path f, boolean createParent
) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.CREATE_SYM_LINK);
final HttpOpParam.Op op = PutOpParam.Op.CREATESYMLINK;
new FsPathRunner(op, f,
new DestinationParam(makeQualified(destination).toUri().getPath()),
@ -1034,6 +1053,7 @@ public class WebHdfsFileSystem extends FileSystem
@Override
public boolean rename(final Path src, final Path dst) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.RENAME);
final HttpOpParam.Op op = PutOpParam.Op.RENAME;
return new FsPathBooleanRunner(op, src,
new DestinationParam(makeQualified(dst).toUri().getPath())
@ -1045,6 +1065,7 @@ public class WebHdfsFileSystem extends FileSystem
public void rename(final Path src, final Path dst,
final Options.Rename... options) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.RENAME);
final HttpOpParam.Op op = PutOpParam.Op.RENAME;
new FsPathRunner(op, src,
new DestinationParam(makeQualified(dst).toUri().getPath()),
@ -1056,6 +1077,7 @@ public class WebHdfsFileSystem extends FileSystem
public void setXAttr(Path p, String name, byte[] value,
EnumSet<XAttrSetFlag> flag) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.SET_XATTR);
final HttpOpParam.Op op = PutOpParam.Op.SETXATTR;
if (value != null) {
new FsPathRunner(op, p, new XAttrNameParam(name), new XAttrValueParam(
@ -1069,6 +1091,8 @@ public class WebHdfsFileSystem extends FileSystem
@Override
public byte[] getXAttr(Path p, final String name) throws IOException {
statistics.incrementReadOps(1);
storageStatistics.incrementOpCounter(OpType.GET_XATTR);
final HttpOpParam.Op op = GetOpParam.Op.GETXATTRS;
return new FsPathResponseRunner<byte[]>(op, p, new XAttrNameParam(name),
new XAttrEncodingParam(XAttrCodec.HEX)) {
@ -1125,6 +1149,7 @@ public class WebHdfsFileSystem extends FileSystem
@Override
public void removeXAttr(Path p, String name) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.REMOVE_XATTR);
final HttpOpParam.Op op = PutOpParam.Op.REMOVEXATTR;
new FsPathRunner(op, p, new XAttrNameParam(name)).run();
}
@ -1137,6 +1162,7 @@ public class WebHdfsFileSystem extends FileSystem
}
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.SET_OWNER);
final HttpOpParam.Op op = PutOpParam.Op.SETOWNER;
new FsPathRunner(op, p,
new OwnerParam(owner), new GroupParam(group)
@ -1147,6 +1173,7 @@ public class WebHdfsFileSystem extends FileSystem
public void setPermission(final Path p, final FsPermission permission
) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.SET_PERMISSION);
final HttpOpParam.Op op = PutOpParam.Op.SETPERMISSION;
new FsPathRunner(op, p,new PermissionParam(permission)).run();
}
@ -1155,6 +1182,7 @@ public class WebHdfsFileSystem extends FileSystem
public void modifyAclEntries(Path path, List<AclEntry> aclSpec)
throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.MODIFY_ACL_ENTRIES);
final HttpOpParam.Op op = PutOpParam.Op.MODIFYACLENTRIES;
new FsPathRunner(op, path, new AclPermissionParam(aclSpec)).run();
}
@ -1163,6 +1191,7 @@ public class WebHdfsFileSystem extends FileSystem
public void removeAclEntries(Path path, List<AclEntry> aclSpec)
throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.REMOVE_ACL_ENTRIES);
final HttpOpParam.Op op = PutOpParam.Op.REMOVEACLENTRIES;
new FsPathRunner(op, path, new AclPermissionParam(aclSpec)).run();
}
@ -1170,6 +1199,7 @@ public class WebHdfsFileSystem extends FileSystem
@Override
public void removeDefaultAcl(Path path) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.REMOVE_DEFAULT_ACL);
final HttpOpParam.Op op = PutOpParam.Op.REMOVEDEFAULTACL;
new FsPathRunner(op, path).run();
}
@ -1177,6 +1207,7 @@ public class WebHdfsFileSystem extends FileSystem
@Override
public void removeAcl(Path path) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.REMOVE_ACL);
final HttpOpParam.Op op = PutOpParam.Op.REMOVEACL;
new FsPathRunner(op, path).run();
}
@ -1185,6 +1216,7 @@ public class WebHdfsFileSystem extends FileSystem
public void setAcl(final Path p, final List<AclEntry> aclSpec)
throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.SET_ACL);
final HttpOpParam.Op op = PutOpParam.Op.SETACL;
new FsPathRunner(op, p, new AclPermissionParam(aclSpec)).run();
}
@ -1193,6 +1225,7 @@ public class WebHdfsFileSystem extends FileSystem
public Path createSnapshot(final Path path, final String snapshotName)
throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.CREATE_SNAPSHOT);
final HttpOpParam.Op op = PutOpParam.Op.CREATESNAPSHOT;
return new FsPathResponseRunner<Path>(op, path,
new SnapshotNameParam(snapshotName)) {
@ -1207,6 +1240,7 @@ public class WebHdfsFileSystem extends FileSystem
public void deleteSnapshot(final Path path, final String snapshotName)
throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.DELETE_SNAPSHOT);
final HttpOpParam.Op op = DeleteOpParam.Op.DELETESNAPSHOT;
new FsPathRunner(op, path, new SnapshotNameParam(snapshotName)).run();
}
@ -1215,6 +1249,7 @@ public class WebHdfsFileSystem extends FileSystem
public void renameSnapshot(final Path path, final String snapshotOldName,
final String snapshotNewName) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.RENAME_SNAPSHOT);
final HttpOpParam.Op op = PutOpParam.Op.RENAMESNAPSHOT;
new FsPathRunner(op, path, new OldSnapshotNameParam(snapshotOldName),
new SnapshotNameParam(snapshotNewName)).run();
@ -1224,6 +1259,7 @@ public class WebHdfsFileSystem extends FileSystem
public boolean setReplication(final Path p, final short replication
) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.SET_REPLICATION);
final HttpOpParam.Op op = PutOpParam.Op.SETREPLICATION;
return new FsPathBooleanRunner(op, p,
new ReplicationParam(replication)
@ -1234,6 +1270,7 @@ public class WebHdfsFileSystem extends FileSystem
public void setTimes(final Path p, final long mtime, final long atime
) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.SET_TIMES);
final HttpOpParam.Op op = PutOpParam.Op.SETTIMES;
new FsPathRunner(op, p,
new ModificationTimeParam(mtime),
@ -1256,6 +1293,7 @@ public class WebHdfsFileSystem extends FileSystem
@Override
public void concat(final Path trg, final Path [] srcs) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.CONCAT);
final HttpOpParam.Op op = PostOpParam.Op.CONCAT;
new FsPathRunner(op, trg, new ConcatSourcesParam(srcs)).run();
}
@ -1265,6 +1303,7 @@ public class WebHdfsFileSystem extends FileSystem
final boolean overwrite, final int bufferSize, final short replication,
final long blockSize, final Progressable progress) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.CREATE);
final HttpOpParam.Op op = PutOpParam.Op.CREATE;
return new FsPathOutputStreamRunner(op, f, bufferSize,
@ -1282,6 +1321,7 @@ public class WebHdfsFileSystem extends FileSystem
final int bufferSize, final short replication, final long blockSize,
final Progressable progress) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.CREATE_NON_RECURSIVE);
final HttpOpParam.Op op = PutOpParam.Op.CREATE;
return new FsPathOutputStreamRunner(op, f, bufferSize,
@ -1298,6 +1338,7 @@ public class WebHdfsFileSystem extends FileSystem
public FSDataOutputStream append(final Path f, final int bufferSize,
final Progressable progress) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.APPEND);
final HttpOpParam.Op op = PostOpParam.Op.APPEND;
return new FsPathOutputStreamRunner(op, f, bufferSize,
@ -1308,6 +1349,7 @@ public class WebHdfsFileSystem extends FileSystem
@Override
public boolean truncate(Path f, long newLength) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.TRUNCATE);
final HttpOpParam.Op op = PostOpParam.Op.TRUNCATE;
return new FsPathBooleanRunner(op, f, new NewLengthParam(newLength)).run();
@ -1315,6 +1357,8 @@ public class WebHdfsFileSystem extends FileSystem
@Override
public boolean delete(Path f, boolean recursive) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.DELETE);
final HttpOpParam.Op op = DeleteOpParam.Op.DELETE;
return new FsPathBooleanRunner(op, f,
new RecursiveParam(recursive)
@ -1325,6 +1369,7 @@ public class WebHdfsFileSystem extends FileSystem
public FSDataInputStream open(final Path f, final int bufferSize
) throws IOException {
statistics.incrementReadOps(1);
storageStatistics.incrementOpCounter(OpType.OPEN);
return new FSDataInputStream(new WebHdfsInputStream(f, bufferSize));
}
@ -1424,6 +1469,7 @@ public class WebHdfsFileSystem extends FileSystem
@Override
public FileStatus[] listStatus(final Path f) throws IOException {
statistics.incrementReadOps(1);
storageStatistics.incrementOpCounter(OpType.LIST_STATUS);
final HttpOpParam.Op op = GetOpParam.Op.LISTSTATUS;
return new FsPathResponseRunner<FileStatus[]>(op, f) {
@ -1519,6 +1565,7 @@ public class WebHdfsFileSystem extends FileSystem
public BlockLocation[] getFileBlockLocations(final Path p,
final long offset, final long length) throws IOException {
statistics.incrementReadOps(1);
storageStatistics.incrementOpCounter(OpType.GET_FILE_BLOCK_LOCATIONS);
final HttpOpParam.Op op = GetOpParam.Op.GET_BLOCK_LOCATIONS;
return new FsPathResponseRunner<BlockLocation[]>(op, p,
@ -1540,6 +1587,7 @@ public class WebHdfsFileSystem extends FileSystem
@Override
public ContentSummary getContentSummary(final Path p) throws IOException {
statistics.incrementReadOps(1);
storageStatistics.incrementOpCounter(OpType.GET_CONTENT_SUMMARY);
final HttpOpParam.Op op = GetOpParam.Op.GETCONTENTSUMMARY;
return new FsPathResponseRunner<ContentSummary>(op, p) {
@ -1554,6 +1602,7 @@ public class WebHdfsFileSystem extends FileSystem
public MD5MD5CRC32FileChecksum getFileChecksum(final Path p
) throws IOException {
statistics.incrementReadOps(1);
storageStatistics.incrementOpCounter(OpType.GET_FILE_CHECKSUM);
final HttpOpParam.Op op = GetOpParam.Op.GETFILECHECKSUM;
return new FsPathResponseRunner<MD5MD5CRC32FileChecksum>(op, p) {

View File

@ -42,10 +42,15 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.Set;
import org.apache.commons.lang.ArrayUtils;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.BlockStorageLocation;
@ -54,28 +59,34 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem.Statistics.StatisticsData;
import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.GlobalStorageStatistics;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
import org.apache.hadoop.fs.Options.ChecksumOpt;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.StorageStatistics.LongStatistic;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.VolumeId;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.impl.LeaseRenewer;
import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
import org.apache.hadoop.hdfs.web.HftpFileSystem;
import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.Op;
import org.apache.hadoop.hdfs.web.WebHdfsConstants;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
@ -83,6 +94,7 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.log4j.Level;
import org.junit.Assert;
import org.junit.Test;
@ -93,9 +105,15 @@ import org.mockito.stubbing.Answer;
import com.google.common.base.Supplier;
import com.google.common.collect.Lists;
import org.mockito.internal.util.reflection.Whitebox;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TestDistributedFileSystem {
private static final Random RAN = new Random();
private static final Logger LOG = LoggerFactory.getLogger(
TestDistributedFileSystem.class);
static {
GenericTestUtils.setLogLevel(DFSClient.LOG, Level.ALL);
@ -416,54 +434,84 @@ public class TestDistributedFileSystem {
}
@Test
public void testStatistics() throws Exception {
public void testStatistics() throws IOException {
FileSystem.getStatistics(HdfsConstants.HDFS_URI_SCHEME,
DistributedFileSystem.class).reset();
@SuppressWarnings("unchecked")
ThreadLocal<StatisticsData> data = (ThreadLocal<StatisticsData>)
Whitebox.getInternalState(
FileSystem.getStatistics(HdfsConstants.HDFS_URI_SCHEME,
DistributedFileSystem.class), "threadData");
data.set(null);
int lsLimit = 2;
final Configuration conf = getTestConfiguration();
conf.setInt(DFSConfigKeys.DFS_LIST_LIMIT, lsLimit);
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
try {
cluster.waitActive();
final FileSystem fs = cluster.getFileSystem();
Path dir = new Path("/test");
Path file = new Path(dir, "file");
int readOps = DFSTestUtil.getStatistics(fs).getReadOps();
int writeOps = DFSTestUtil.getStatistics(fs).getWriteOps();
int largeReadOps = DFSTestUtil.getStatistics(fs).getLargeReadOps();
int readOps = 0;
int writeOps = 0;
int largeReadOps = 0;
long opCount = getOpStatistics(OpType.MKDIRS);
fs.mkdirs(dir);
checkStatistics(fs, readOps, ++writeOps, largeReadOps);
checkOpStatistics(OpType.MKDIRS, opCount + 1);
opCount = getOpStatistics(OpType.CREATE);
FSDataOutputStream out = fs.create(file, (short)1);
out.close();
checkStatistics(fs, readOps, ++writeOps, largeReadOps);
checkOpStatistics(OpType.CREATE, opCount + 1);
opCount = getOpStatistics(OpType.GET_FILE_STATUS);
FileStatus status = fs.getFileStatus(file);
checkStatistics(fs, ++readOps, writeOps, largeReadOps);
checkOpStatistics(OpType.GET_FILE_STATUS, opCount + 1);
opCount = getOpStatistics(OpType.GET_FILE_BLOCK_LOCATIONS);
fs.getFileBlockLocations(file, 0, 0);
checkStatistics(fs, ++readOps, writeOps, largeReadOps);
checkOpStatistics(OpType.GET_FILE_BLOCK_LOCATIONS, opCount + 1);
fs.getFileBlockLocations(status, 0, 0);
checkStatistics(fs, ++readOps, writeOps, largeReadOps);
checkOpStatistics(OpType.GET_FILE_BLOCK_LOCATIONS, opCount + 2);
opCount = getOpStatistics(OpType.OPEN);
FSDataInputStream in = fs.open(file);
in.close();
checkStatistics(fs, ++readOps, writeOps, largeReadOps);
checkOpStatistics(OpType.OPEN, opCount + 1);
opCount = getOpStatistics(OpType.SET_REPLICATION);
fs.setReplication(file, (short)2);
checkStatistics(fs, readOps, ++writeOps, largeReadOps);
checkOpStatistics(OpType.SET_REPLICATION, opCount + 1);
opCount = getOpStatistics(OpType.RENAME);
Path file1 = new Path(dir, "file1");
fs.rename(file, file1);
checkStatistics(fs, readOps, ++writeOps, largeReadOps);
checkOpStatistics(OpType.RENAME, opCount + 1);
opCount = getOpStatistics(OpType.GET_CONTENT_SUMMARY);
fs.getContentSummary(file1);
checkStatistics(fs, ++readOps, writeOps, largeReadOps);
checkOpStatistics(OpType.GET_CONTENT_SUMMARY, opCount + 1);
// Iterative ls test
long mkdirOp = getOpStatistics(OpType.MKDIRS);
long listStatusOp = getOpStatistics(OpType.LIST_STATUS);
for (int i = 0; i < 10; i++) {
Path p = new Path(dir, Integer.toString(i));
fs.mkdirs(p);
mkdirOp++;
FileStatus[] list = fs.listStatus(dir);
if (list.length > lsLimit) {
// if large directory, then count readOps and largeReadOps by
@ -471,34 +519,50 @@ public class TestDistributedFileSystem {
int iterations = (int)Math.ceil((double)list.length/lsLimit);
largeReadOps += iterations;
readOps += iterations;
listStatusOp += iterations;
} else {
// Single iteration in listStatus - no large read operation done
readOps++;
listStatusOp++;
}
// writeOps incremented by 1 for mkdirs
// readOps and largeReadOps incremented by 1 or more
checkStatistics(fs, readOps, ++writeOps, largeReadOps);
checkOpStatistics(OpType.MKDIRS, mkdirOp);
checkOpStatistics(OpType.LIST_STATUS, listStatusOp);
}
opCount = getOpStatistics(OpType.GET_STATUS);
fs.getStatus(file1);
checkStatistics(fs, ++readOps, writeOps, largeReadOps);
checkOpStatistics(OpType.GET_STATUS, opCount + 1);
opCount = getOpStatistics(OpType.GET_FILE_CHECKSUM);
fs.getFileChecksum(file1);
checkStatistics(fs, ++readOps, writeOps, largeReadOps);
checkOpStatistics(OpType.GET_FILE_CHECKSUM, opCount + 1);
opCount = getOpStatistics(OpType.SET_PERMISSION);
fs.setPermission(file1, new FsPermission((short)0777));
checkStatistics(fs, readOps, ++writeOps, largeReadOps);
checkOpStatistics(OpType.SET_PERMISSION, opCount + 1);
opCount = getOpStatistics(OpType.SET_TIMES);
fs.setTimes(file1, 0L, 0L);
checkStatistics(fs, readOps, ++writeOps, largeReadOps);
checkOpStatistics(OpType.SET_TIMES, opCount + 1);
opCount = getOpStatistics(OpType.SET_OWNER);
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
fs.setOwner(file1, ugi.getUserName(), ugi.getGroupNames()[0]);
checkOpStatistics(OpType.SET_OWNER, opCount + 1);
checkStatistics(fs, readOps, ++writeOps, largeReadOps);
opCount = getOpStatistics(OpType.DELETE);
fs.delete(dir, true);
checkStatistics(fs, readOps, ++writeOps, largeReadOps);
checkOpStatistics(OpType.DELETE, opCount + 1);
} finally {
if (cluster != null) cluster.shutdown();
@ -506,6 +570,80 @@ public class TestDistributedFileSystem {
}
@SuppressWarnings("ThrowableResultOfMethodCallIgnored")
@Test (timeout = 180000)
public void testConcurrentStatistics()
throws IOException, InterruptedException {
FileSystem.getStatistics(HdfsConstants.HDFS_URI_SCHEME,
DistributedFileSystem.class).reset();
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(
new Configuration()).build();
cluster.waitActive();
final FileSystem fs = cluster.getFileSystem();
final int numThreads = 5;
final ExecutorService threadPool =
HadoopExecutors.newFixedThreadPool(numThreads);
try {
final CountDownLatch allExecutorThreadsReady =
new CountDownLatch(numThreads);
final CountDownLatch startBlocker = new CountDownLatch(1);
final CountDownLatch allDone = new CountDownLatch(numThreads);
final AtomicReference<Throwable> childError = new AtomicReference<>();
for (int i = 0; i < numThreads; i++) {
threadPool.submit(new Runnable() {
@Override
public void run() {
allExecutorThreadsReady.countDown();
try {
startBlocker.await();
final FileSystem fs = cluster.getFileSystem();
fs.mkdirs(new Path("/testStatisticsParallelChild"));
} catch (Throwable t) {
LOG.error("Child failed when calling mkdir", t);
childError.compareAndSet(null, t);
} finally {
allDone.countDown();
}
}
});
}
final long oldMkdirOpCount = getOpStatistics(OpType.MKDIRS);
// wait until all threads are ready
allExecutorThreadsReady.await();
// all threads start making directories
startBlocker.countDown();
// wait until all threads are done
allDone.await();
assertNull("Child failed with exception " + childError.get(),
childError.get());
checkStatistics(fs, 0, numThreads, 0);
// check the single operation count stat
checkOpStatistics(OpType.MKDIRS, numThreads + oldMkdirOpCount);
// iterate all the operation counts
for (Iterator<LongStatistic> opCountIter =
FileSystem.getGlobalStorageStatistics()
.get(DFSOpsCountStatistics.NAME).getLongStatistics();
opCountIter.hasNext();) {
final LongStatistic opCount = opCountIter.next();
if (OpType.MKDIRS.getSymbol().equals(opCount.getName())) {
assertEquals("Unexpected op count from iterator!",
numThreads + oldMkdirOpCount, opCount.getValue());
}
LOG.info(opCount.getName() + "\t" + opCount.getValue());
}
} finally {
threadPool.shutdownNow();
cluster.shutdown();
}
}
/** Checks statistics. -1 indicates do not check for the operations */
private void checkStatistics(FileSystem fs, int readOps, int writeOps,
int largeReadOps) {
@ -575,6 +713,17 @@ public class TestDistributedFileSystem {
}
}
private static void checkOpStatistics(OpType op, long count) {
assertEquals("Op " + op.getSymbol() + " has unexpected count!",
count, getOpStatistics(op));
}
private static long getOpStatistics(OpType op) {
return GlobalStorageStatistics.INSTANCE.get(
DFSOpsCountStatistics.NAME)
.getLong(op.getSymbol());
}
@Test
public void testFileChecksum() throws Exception {
GenericTestUtils.setLogLevel(HftpFileSystem.LOG, Level.ALL);