diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/EmptyStorageStatistics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/EmptyStorageStatistics.java new file mode 100644 index 00000000000..1bcfe23ee90 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/EmptyStorageStatistics.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; + +import java.util.Collections; +import java.util.Iterator; + +/** + * EmptyStorageStatistics is a StorageStatistics implementation which has no + * data. + */ +class EmptyStorageStatistics extends StorageStatistics { + EmptyStorageStatistics(String name) { + super(name); + } + + public Iterator getLongStatistics() { + return Collections.emptyIterator(); + } + + public Long getLong(String key) { + return null; + } + + public boolean isTracked(String key) { + return false; + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java index 99494616155..e3b3a54b633 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java @@ -49,6 +49,7 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.GlobalStorageStatistics.StorageStatisticsProvider; import org.apache.hadoop.fs.Options.ChecksumOpt; import org.apache.hadoop.fs.Options.Rename; import org.apache.hadoop.fs.permission.AclEntry; @@ -3546,7 +3547,7 @@ synchronized int getAllThreadLocalDataSize() { /** * Get the Map of Statistics object indexed by URI Scheme. * @return a Map having a key as URI scheme and value as Statistics object - * @deprecated use {@link #getAllStatistics} instead + * @deprecated use {@link #getGlobalStorageStatistics()} */ @Deprecated public static synchronized Map getStatistics() { @@ -3558,8 +3559,10 @@ public static synchronized Map getStatistics() { } /** - * Return the FileSystem classes that have Statistics + * Return the FileSystem classes that have Statistics. + * @deprecated use {@link #getGlobalStorageStatistics()} */ + @Deprecated public static synchronized List getAllStatistics() { return new ArrayList(statisticsTable.values()); } @@ -3568,13 +3571,23 @@ public static synchronized List getAllStatistics() { * Get the statistics for a particular file system * @param cls the class to lookup * @return a statistics object + * @deprecated use {@link #getGlobalStorageStatistics()} */ - public static synchronized - Statistics getStatistics(String scheme, Class cls) { + @Deprecated + public static synchronized Statistics getStatistics(final String scheme, + Class cls) { Statistics result = statisticsTable.get(cls); if (result == null) { - result = new Statistics(scheme); - statisticsTable.put(cls, result); + final Statistics newStats = new Statistics(scheme); + statisticsTable.put(cls, newStats); + result = newStats; + GlobalStorageStatistics.INSTANCE.put(scheme, + new StorageStatisticsProvider() { + @Override + public StorageStatistics provide() { + return new FileSystemStorageStatistics(scheme, newStats); + } + }); } return result; } @@ -3614,4 +3627,26 @@ public static boolean areSymlinksEnabled() { public static void enableSymlinks() { symlinksEnabled = true; } + + /** + * Get the StorageStatistics for this FileSystem object. These statistics are + * per-instance. They are not shared with any other FileSystem object. + * + *

This is a default method which is intended to be overridden by + * subclasses. The default implementation returns an empty storage statistics + * object.

+ * + * @return The StorageStatistics for this FileSystem instance. + * Will never be null. + */ + public StorageStatistics getStorageStatistics() { + return new EmptyStorageStatistics(getUri().toString()); + } + + /** + * Get the global storage statistics. + */ + public static GlobalStorageStatistics getGlobalStorageStatistics() { + return GlobalStorageStatistics.INSTANCE; + } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemStorageStatistics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemStorageStatistics.java new file mode 100644 index 00000000000..14f7cdd02e6 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemStorageStatistics.java @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; + +import java.util.Iterator; +import java.util.NoSuchElementException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.FileSystem.Statistics.StatisticsData; + +/** + * A basic StorageStatistics instance which simply returns data from + * FileSystem#Statistics. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class FileSystemStorageStatistics extends StorageStatistics { + /** + * The per-class FileSystem statistics. + */ + private final FileSystem.Statistics stats; + + private static final String[] KEYS = { + "bytesRead", + "bytesWritten", + "readOps", + "largeReadOps", + "writeOps", + "bytesReadLocalHost", + "bytesReadDistanceOfOneOrTwo", + "bytesReadDistanceOfThreeOrFour", + "bytesReadDistanceOfFiveOrLarger" + }; + + private static class LongStatisticIterator + implements Iterator { + private final StatisticsData data; + + private int keyIdx; + + LongStatisticIterator(StatisticsData data) { + this.data = data; + this.keyIdx = 0; + } + + @Override + public boolean hasNext() { + return (this.keyIdx < KEYS.length); + } + + @Override + public LongStatistic next() { + if (this.keyIdx >= KEYS.length) { + throw new NoSuchElementException(); + } + String key = KEYS[this.keyIdx++]; + Long val = fetch(data, key); + return new LongStatistic(key, val.longValue()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + } + + private static Long fetch(StatisticsData data, String key) { + switch (key) { + case "bytesRead": + return data.getBytesRead(); + case "bytesWritten": + return data.getBytesWritten(); + case "readOps": + return Long.valueOf(data.getReadOps()); + case "largeReadOps": + return Long.valueOf(data.getLargeReadOps()); + case "writeOps": + return Long.valueOf(data.getWriteOps()); + case "bytesReadLocalHost": + return data.getBytesReadLocalHost(); + case "bytesReadDistanceOfOneOrTwo": + return data.getBytesReadDistanceOfOneOrTwo(); + case "bytesReadDistanceOfThreeOrFour": + return data.getBytesReadDistanceOfThreeOrFour(); + case "bytesReadDistanceOfFiveOrLarger": + return data.getBytesReadDistanceOfFiveOrLarger(); + default: + return null; + } + } + + FileSystemStorageStatistics(String name, FileSystem.Statistics stats) { + super(name); + this.stats = stats; + } + + @Override + public Iterator getLongStatistics() { + return new LongStatisticIterator(stats.getData()); + } + + @Override + public Long getLong(String key) { + return fetch(stats.getData(), key); + } + + /** + * Return true if a statistic is being tracked. + * + * @return True only if the statistic is being tracked. + */ + public boolean isTracked(String key) { + for (String k: KEYS) { + if (k.equals(key)) { + return true; + } + } + return false; + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/GlobalStorageStatistics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/GlobalStorageStatistics.java new file mode 100644 index 00000000000..f22e78c7638 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/GlobalStorageStatistics.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; + +import java.util.Iterator; +import java.util.Map.Entry; +import java.util.NavigableMap; +import java.util.NoSuchElementException; +import java.util.TreeMap; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * Stores global storage statistics objects. + */ +@InterfaceAudience.Public +public enum GlobalStorageStatistics { + /** + * The GlobalStorageStatistics singleton. + */ + INSTANCE; + + /** + * A map of all global StorageStatistics objects, indexed by name. + */ + private final NavigableMap map = new TreeMap<>(); + + /** + * A callback API for creating new StorageStatistics instances. + */ + public interface StorageStatisticsProvider { + StorageStatistics provide(); + } + + /** + * Get the StorageStatistics object with the given name. + * + * @param name The storage statistics object name. + * @return The StorageStatistics object with the given name, or + * null if there is none. + */ + public synchronized StorageStatistics get(String name) { + return map.get(name); + } + + /** + * Create or return the StorageStatistics object with the given name. + * + * @param name The storage statistics object name. + * @param provider An object which can create a new StorageStatistics + * object if needed. + * @return The StorageStatistics object with the given name. + * @throws RuntimeException If the StorageStatisticsProvider provides a new + * StorageStatistics object with the wrong name. + */ + public synchronized StorageStatistics put(String name, + StorageStatisticsProvider provider) { + StorageStatistics stats = map.get(name); + if (stats != null) { + return stats; + } + stats = provider.provide(); + if (!stats.getName().equals(name)) { + throw new RuntimeException("StorageStatisticsProvider for " + name + + " provided a StorageStatistics object for " + stats.getName() + + " instead."); + } + map.put(name, stats); + return stats; + } + + /** + * Get an iterator that we can use to iterate throw all the global storage + * statistics objects. + */ + synchronized public Iterator iterator() { + Entry first = map.firstEntry(); + return new StorageIterator((first == null) ? null : first.getValue()); + } + + private class StorageIterator implements Iterator { + private StorageStatistics next = null; + + StorageIterator(StorageStatistics first) { + this.next = first; + } + + @Override + public boolean hasNext() { + return (next != null); + } + + @Override + public StorageStatistics next() { + if (next == null) { + throw new NoSuchElementException(); + } + synchronized (GlobalStorageStatistics.this) { + StorageStatistics cur = next; + Entry nextEntry = + map.higherEntry(cur.getName()); + next = (nextEntry == null) ? null : nextEntry.getValue(); + return cur; + } + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StorageStatistics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StorageStatistics.java new file mode 100644 index 00000000000..4bdef80d6b5 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StorageStatistics.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; + +import org.apache.hadoop.classification.InterfaceAudience; + +import java.util.Iterator; + +/** + * StorageStatistics contains statistics data for a FileSystem or FileContext + * instance. + */ +@InterfaceAudience.Public +public abstract class StorageStatistics { + /** + * A 64-bit storage statistic. + */ + public static class LongStatistic { + private final String name; + private final long value; + + public LongStatistic(String name, long value) { + this.name = name; + this.value = value; + } + + /** + * @return The name of this statistic. + */ + public String getName() { + return name; + } + + /** + * @return The value of this statistic. + */ + public long getValue() { + return value; + } + } + + private final String name; + + public StorageStatistics(String name) { + this.name = name; + } + + /** + * Get the name of this StorageStatistics object. + */ + public String getName() { + return name; + } + + /** + * Get an iterator over all the currently tracked long statistics. + * + * The values returned will depend on the type of FileSystem or FileContext + * object. The values do not necessarily reflect a snapshot in time. + */ + public abstract Iterator getLongStatistics(); + + /** + * Get the value of a statistic. + * + * @return null if the statistic is not being tracked or is not a + * long statistic. + * The value of the statistic, otherwise. + */ + public abstract Long getLong(String key); + + /** + * Return true if a statistic is being tracked. + * + * @return True only if the statistic is being tracked. + */ + public abstract boolean isTracked(String key); +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/UnionStorageStatistics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/UnionStorageStatistics.java new file mode 100644 index 00000000000..d9783e6cde9 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/UnionStorageStatistics.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; + +import java.util.Iterator; +import java.util.NoSuchElementException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * A StorageStatistics instance which combines the outputs of several other + * StorageStatistics instances. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class UnionStorageStatistics extends StorageStatistics { + /** + * The underlying StorageStatistics. + */ + private final StorageStatistics[] stats; + + private class LongStatisticIterator implements Iterator { + private int statIdx; + + private Iterator cur; + + LongStatisticIterator() { + this.statIdx = 0; + this.cur = null; + } + + @Override + public boolean hasNext() { + return (getIter() != null); + } + + private Iterator getIter() { + while ((cur == null) || (!cur.hasNext())) { + if (stats.length >= statIdx) { + return null; + } + cur = stats[statIdx++].getLongStatistics(); + } + return cur; + } + + @Override + public LongStatistic next() { + Iterator iter = getIter(); + if (iter == null) { + throw new NoSuchElementException(); + } + return iter.next(); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + } + + public UnionStorageStatistics(String name, StorageStatistics[] stats) { + super(name); + this.stats = stats; + } + + @Override + public Iterator getLongStatistics() { + return new LongStatisticIterator(); + } + + @Override + public Long getLong(String key) { + for (int i = 0; i < stats.length; i++) { + Long val = stats[i].getLong(key); + if (val != null) { + return val; + } + } + return null; + } + + /** + * Return true if a statistic is being tracked. + * + * @return True only if the statistic is being tracked. + */ + @Override + public boolean isTracked(String key) { + for (int i = 0; i < stats.length; i++) { + if (stats[i].isTracked(key)) { + return true; + } + } + return false; + } +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java index c42ebace669..1282af94adb 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java @@ -131,6 +131,7 @@ public void primitiveMkdir(Path f, FsPermission absolutePermission, public Path fixRelativePart(Path p); public ContentSummary getContentSummary(Path f); public QuotaUsage getQuotaUsage(Path f); + StorageStatistics getStorageStatistics(); } @Test diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java index a8795cc9c5c..d2020b9b72d 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestHarFileSystem.java @@ -221,6 +221,7 @@ public Collection getAllStoragePolicies() public Path getTrashRoot(Path path) throws IOException; public Collection getTrashRoots(boolean allUsers) throws IOException; + StorageStatistics getStorageStatistics(); } @Test diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOpsCountStatistics.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOpsCountStatistics.java new file mode 100644 index 00000000000..d58a59f0c0b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOpsCountStatistics.java @@ -0,0 +1,167 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import org.apache.hadoop.fs.StorageStatistics; + +import java.util.EnumMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.NoSuchElementException; +import java.util.concurrent.atomic.AtomicLong; + +/** + * This storage statistics tracks how many times each DFS operation was issued. + * + * For each tracked DFS operation, there is a respective entry in the enum + * {@link OpType}. To use, increment the value the {@link DistributedFileSystem} + * and {@link org.apache.hadoop.hdfs.web.WebHdfsFileSystem}. + * + * This class is thread safe, and is generally shared by multiple threads. + */ +public class DFSOpsCountStatistics extends StorageStatistics { + + /** This is for counting file system operations. */ + public enum OpType { + ALLOW_SNAPSHOT("allowSnapshot"), + APPEND("append"), + CONCAT("concat"), + COPY_FROM_LOCAL_FILE("copyFromLocalFile"), + CREATE("create"), + CREATE_NON_RECURSIVE("createNonRecursive"), + CREATE_SNAPSHOT("createSnapshot"), + CREATE_SYM_LINK("createSymlink"), + DELETE("delete"), + DELETE_SNAPSHOT("deleteSnapshot"), + DISALLOW_SNAPSHOT("disallowSnapshot"), + EXISTS("exists"), + GET_BYTES_WITH_FUTURE_GS("getBytesWithFutureGenerationStamps"), + GET_CONTENT_SUMMARY("getContentSummary"), + GET_FILE_BLOCK_LOCATIONS("getFileBlockLocations"), + GET_FILE_CHECKSUM("getFileChecksum"), + GET_FILE_LINK_STATUS("getFileLinkStatus"), + GET_FILE_STATUS("getFileStatus"), + GET_LINK_TARGET("getLinkTarget"), + GET_QUOTA_USAGE("getQuotaUsage"), + GET_STATUS("getStatus"), + GET_STORAGE_POLICIES("getStoragePolicies"), + GET_STORAGE_POLICY("getStoragePolicy"), + GET_XATTR("getXAttr"), + LIST_LOCATED_STATUS("listLocatedStatus"), + LIST_STATUS("listStatus"), + MKDIRS("mkdirs"), + MODIFY_ACL_ENTRIES("modifyAclEntries"), + OPEN("open"), + PRIMITIVE_CREATE("primitiveCreate"), + PRIMITIVE_MKDIR("primitiveMkdir"), + REMOVE_ACL("removeAcl"), + REMOVE_ACL_ENTRIES("removeAclEntries"), + REMOVE_DEFAULT_ACL("removeDefaultAcl"), + REMOVE_XATTR("removeXAttr"), + RENAME("rename"), + RENAME_SNAPSHOT("renameSnapshot"), + RESOLVE_LINK("resolveLink"), + SET_ACL("setAcl"), + SET_OWNER("setOwner"), + SET_PERMISSION("setPermission"), + SET_REPLICATION("setReplication"), + SET_STORAGE_POLICY("setStoragePolicy"), + SET_TIMES("setTimes"), + SET_XATTR("setXAttr"), + TRUNCATE("truncate"), + UNSET_STORAGE_POLICY("unsetStoragePolicy"); + + private final String symbol; + + OpType(String symbol) { + this.symbol = symbol; + } + + public String getSymbol() { + return symbol; + } + + public static OpType fromSymbol(String symbol) { + if (symbol != null) { + for (OpType opType : values()) { + if (opType.getSymbol().equals(symbol)) { + return opType; + } + } + } + return null; + } + } + + public static final String NAME = "DFSOpsCountStatistics"; + + private final Map opsCount = new EnumMap<>(OpType.class); + + public DFSOpsCountStatistics() { + super(NAME); + for (OpType opType : OpType.values()) { + opsCount.put(opType, new AtomicLong(0)); + } + } + + public void incrementOpCounter(OpType op) { + opsCount.get(op).addAndGet(1); + } + + private class LongIterator implements Iterator { + private Iterator> iterator = + opsCount.entrySet().iterator(); + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public LongStatistic next() { + if (!iterator.hasNext()) { + throw new NoSuchElementException(); + } + final Entry entry = iterator.next(); + return new LongStatistic(entry.getKey().name(), entry.getValue().get()); + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + } + + @Override + public Iterator getLongStatistics() { + return new LongIterator(); + } + + @Override + public Long getLong(String key) { + final OpType type = OpType.fromSymbol(key); + return type == null ? null : opsCount.get(type).get(); + } + + @Override + public boolean isTracked(String key) { + return OpType.fromSymbol(key) == null; + } + +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java index 8442a6bd120..7fc767fe7a5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java @@ -48,8 +48,11 @@ import org.apache.hadoop.fs.FileSystemLinkResolver; import org.apache.hadoop.fs.FsServerDefaults; import org.apache.hadoop.fs.FsStatus; +import org.apache.hadoop.fs.GlobalStorageStatistics; +import org.apache.hadoop.fs.GlobalStorageStatistics.StorageStatisticsProvider; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Options; +import org.apache.hadoop.fs.StorageStatistics; import org.apache.hadoop.fs.XAttrSetFlag; import org.apache.hadoop.fs.Options.ChecksumOpt; import org.apache.hadoop.fs.Path; @@ -67,6 +70,7 @@ import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; import org.apache.hadoop.hdfs.client.impl.CorruptFileBlockIterator; +import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; @@ -96,7 +100,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; - /**************************************************************** * Implementation of the abstract FileSystem for the DFS system. * This object is the way end-user code interacts with a Hadoop @@ -114,6 +117,8 @@ public class DistributedFileSystem extends FileSystem { DFSClient dfs; private boolean verifyChecksum = true; + private DFSOpsCountStatistics storageStatistics; + static{ HdfsConfiguration.init(); } @@ -151,6 +156,15 @@ public void initialize(URI uri, Configuration conf) throws IOException { this.dfs = new DFSClient(uri, conf, statistics); this.uri = URI.create(uri.getScheme()+"://"+uri.getAuthority()); this.workingDir = getHomeDirectory(); + + storageStatistics = (DFSOpsCountStatistics) GlobalStorageStatistics.INSTANCE + .put(DFSOpsCountStatistics.NAME, + new StorageStatisticsProvider() { + @Override + public StorageStatistics provide() { + return new DFSOpsCountStatistics(); + } + }); } @Override @@ -215,6 +229,7 @@ public BlockLocation[] getFileBlockLocations(FileStatus file, long start, public BlockLocation[] getFileBlockLocations(Path p, final long start, final long len) throws IOException { statistics.incrementReadOps(1); + storageStatistics.incrementOpCounter(OpType.GET_FILE_BLOCK_LOCATIONS); final Path absF = fixRelativePart(p); return new FileSystemLinkResolver() { @Override @@ -301,6 +316,7 @@ public Boolean next(final FileSystem fs, final Path p) public FSDataInputStream open(Path f, final int bufferSize) throws IOException { statistics.incrementReadOps(1); + storageStatistics.incrementOpCounter(OpType.OPEN); Path absF = fixRelativePart(f); return new FileSystemLinkResolver() { @Override @@ -337,6 +353,7 @@ public FSDataOutputStream append(Path f, final int bufferSize, public FSDataOutputStream append(Path f, final EnumSet flag, final int bufferSize, final Progressable progress) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.APPEND); Path absF = fixRelativePart(f); return new FileSystemLinkResolver() { @Override @@ -369,6 +386,7 @@ public FSDataOutputStream append(Path f, final EnumSet flag, final int bufferSize, final Progressable progress, final InetSocketAddress[] favoredNodes) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.APPEND); Path absF = fixRelativePart(f); return new FileSystemLinkResolver() { @Override @@ -412,6 +430,7 @@ public HdfsDataOutputStream create(final Path f, final Progressable progress, final InetSocketAddress[] favoredNodes) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.CREATE); Path absF = fixRelativePart(f); return new FileSystemLinkResolver() { @Override @@ -445,6 +464,7 @@ public FSDataOutputStream create(final Path f, final FsPermission permission, final Progressable progress, final ChecksumOpt checksumOpt) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.CREATE); Path absF = fixRelativePart(f); return new FileSystemLinkResolver() { @Override @@ -469,6 +489,7 @@ protected HdfsDataOutputStream primitiveCreate(Path f, short replication, long blockSize, Progressable progress, ChecksumOpt checksumOpt) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.PRIMITIVE_CREATE); final DFSOutputStream dfsos = dfs.primitiveCreate( getPathName(fixRelativePart(f)), absolutePermission, flag, true, replication, blockSize, @@ -485,6 +506,7 @@ public FSDataOutputStream createNonRecursive(final Path f, final int bufferSize, final short replication, final long blockSize, final Progressable progress) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.CREATE_NON_RECURSIVE); if (flag.contains(CreateFlag.OVERWRITE)) { flag.add(CreateFlag.CREATE); } @@ -510,6 +532,7 @@ public FSDataOutputStream next(final FileSystem fs, final Path p) public boolean setReplication(Path src, final short replication) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.SET_REPLICATION); Path absF = fixRelativePart(src); return new FileSystemLinkResolver() { @Override @@ -534,6 +557,7 @@ public Boolean next(final FileSystem fs, final Path p) public void setStoragePolicy(final Path src, final String policyName) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.SET_STORAGE_POLICY); Path absF = fixRelativePart(src); new FileSystemLinkResolver() { @Override @@ -554,6 +578,7 @@ public Void next(final FileSystem fs, final Path p) public void unsetStoragePolicy(final Path src) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.UNSET_STORAGE_POLICY); Path absF = fixRelativePart(src); new FileSystemLinkResolver() { @Override @@ -578,6 +603,7 @@ public Void next(final FileSystem fs, final Path p) throws IOException { @Override public BlockStoragePolicySpi getStoragePolicy(Path path) throws IOException { statistics.incrementReadOps(1); + storageStatistics.incrementOpCounter(OpType.GET_STORAGE_POLICY); Path absF = fixRelativePart(path); return new FileSystemLinkResolver() { @@ -608,6 +634,7 @@ public Collection getAllStoragePolicies() */ public long getBytesWithFutureGenerationStamps() throws IOException { statistics.incrementReadOps(1); + storageStatistics.incrementOpCounter(OpType.GET_BYTES_WITH_FUTURE_GS); return dfs.getBytesInFutureBlocks(); } @@ -618,6 +645,7 @@ public long getBytesWithFutureGenerationStamps() throws IOException { @Deprecated public BlockStoragePolicy[] getStoragePolicies() throws IOException { statistics.incrementReadOps(1); + storageStatistics.incrementOpCounter(OpType.GET_STORAGE_POLICIES); return dfs.getStoragePolicies(); } @@ -632,6 +660,7 @@ public BlockStoragePolicy[] getStoragePolicies() throws IOException { @Override public void concat(Path trg, Path [] psrcs) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.CONCAT); // Make target absolute Path absF = fixRelativePart(trg); // Make all srcs absolute @@ -676,6 +705,7 @@ public void concat(Path trg, Path [] psrcs) throws IOException { @Override public boolean rename(Path src, Path dst) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.RENAME); final Path absSrc = fixRelativePart(src); final Path absDst = fixRelativePart(dst); @@ -710,6 +740,7 @@ public Boolean next(final FileSystem fs, final Path p) public void rename(Path src, Path dst, final Options.Rename... options) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.RENAME); final Path absSrc = fixRelativePart(src); final Path absDst = fixRelativePart(dst); // Try the rename without resolving first @@ -738,6 +769,7 @@ public Void next(final FileSystem fs, final Path p) @Override public boolean truncate(Path f, final long newLength) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.TRUNCATE); Path absF = fixRelativePart(f); return new FileSystemLinkResolver() { @Override @@ -755,6 +787,7 @@ public Boolean next(final FileSystem fs, final Path p) @Override public boolean delete(Path f, final boolean recursive) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.DELETE); Path absF = fixRelativePart(f); return new FileSystemLinkResolver() { @Override @@ -772,6 +805,7 @@ public Boolean next(final FileSystem fs, final Path p) @Override public ContentSummary getContentSummary(Path f) throws IOException { statistics.incrementReadOps(1); + storageStatistics.incrementOpCounter(OpType.GET_CONTENT_SUMMARY); Path absF = fixRelativePart(f); return new FileSystemLinkResolver() { @Override @@ -789,6 +823,7 @@ public ContentSummary next(final FileSystem fs, final Path p) @Override public QuotaUsage getQuotaUsage(Path f) throws IOException { statistics.incrementReadOps(1); + storageStatistics.incrementOpCounter(OpType.GET_QUOTA_USAGE); Path absF = fixRelativePart(f); return new FileSystemLinkResolver() { @Override @@ -873,6 +908,7 @@ private FileStatus[] listStatusInternal(Path p) throws IOException { stats[i] = partialListing[i].makeQualified(getUri(), p); } statistics.incrementReadOps(1); + storageStatistics.incrementOpCounter(OpType.LIST_STATUS); return stats; } @@ -887,6 +923,7 @@ private FileStatus[] listStatusInternal(Path p) throws IOException { listing.add(fileStatus.makeQualified(getUri(), p)); } statistics.incrementLargeReadOps(1); + storageStatistics.incrementOpCounter(OpType.LIST_STATUS); // now fetch more entries do { @@ -901,6 +938,7 @@ private FileStatus[] listStatusInternal(Path p) throws IOException { listing.add(fileStatus.makeQualified(getUri(), p)); } statistics.incrementLargeReadOps(1); + storageStatistics.incrementOpCounter(OpType.LIST_STATUS); } while (thisListing.hasMore()); return listing.toArray(new FileStatus[listing.size()]); @@ -1014,6 +1052,7 @@ private DirListingIterator(Path p, PathFilter filter, thisListing = dfs.listPaths(src, HdfsFileStatus.EMPTY_NAME, needLocation); statistics.incrementReadOps(1); + storageStatistics.incrementOpCounter(OpType.LIST_LOCATED_STATUS); if (thisListing == null) { // the directory does not exist throw new FileNotFoundException("File " + p + " does not exist."); } @@ -1109,6 +1148,7 @@ public boolean mkdirs(Path f, FsPermission permission) throws IOException { private boolean mkdirsInternal(Path f, final FsPermission permission, final boolean createParent) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.MKDIRS); Path absF = fixRelativePart(f); return new FileSystemLinkResolver() { @Override @@ -1135,6 +1175,7 @@ public Boolean next(final FileSystem fs, final Path p) protected boolean primitiveMkdir(Path f, FsPermission absolutePermission) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.PRIMITIVE_MKDIR); return dfs.primitiveMkdir(getPathName(f), absolutePermission); } @@ -1180,6 +1221,7 @@ public long getDfsUsed() { @Override public FsStatus getStatus(Path p) throws IOException { statistics.incrementReadOps(1); + storageStatistics.incrementOpCounter(OpType.GET_STATUS); return dfs.getDiskStatus(); } @@ -1387,6 +1429,7 @@ public FsServerDefaults getServerDefaults() throws IOException { @Override public FileStatus getFileStatus(Path f) throws IOException { statistics.incrementReadOps(1); + storageStatistics.incrementOpCounter(OpType.GET_FILE_STATUS); Path absF = fixRelativePart(f); return new FileSystemLinkResolver() { @Override @@ -1414,6 +1457,7 @@ public void createSymlink(final Path target, final Path link, throw new UnsupportedOperationException("Symlinks not supported"); } statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.CREATE_SYM_LINK); final Path absF = fixRelativePart(link); new FileSystemLinkResolver() { @Override @@ -1437,6 +1481,7 @@ public boolean supportsSymlinks() { @Override public FileStatus getFileLinkStatus(final Path f) throws IOException { statistics.incrementReadOps(1); + storageStatistics.incrementOpCounter(OpType.GET_FILE_LINK_STATUS); final Path absF = fixRelativePart(f); FileStatus status = new FileSystemLinkResolver() { @Override @@ -1466,6 +1511,7 @@ public FileStatus next(final FileSystem fs, final Path p) @Override public Path getLinkTarget(final Path f) throws IOException { statistics.incrementReadOps(1); + storageStatistics.incrementOpCounter(OpType.GET_LINK_TARGET); final Path absF = fixRelativePart(f); return new FileSystemLinkResolver() { @Override @@ -1487,6 +1533,7 @@ public Path next(final FileSystem fs, final Path p) throws IOException { @Override protected Path resolveLink(Path f) throws IOException { statistics.incrementReadOps(1); + storageStatistics.incrementOpCounter(OpType.RESOLVE_LINK); String target = dfs.getLinkTarget(getPathName(fixRelativePart(f))); if (target == null) { throw new FileNotFoundException("File does not exist: " + f.toString()); @@ -1497,6 +1544,7 @@ protected Path resolveLink(Path f) throws IOException { @Override public FileChecksum getFileChecksum(Path f) throws IOException { statistics.incrementReadOps(1); + storageStatistics.incrementOpCounter(OpType.GET_FILE_CHECKSUM); Path absF = fixRelativePart(f); return new FileSystemLinkResolver() { @Override @@ -1516,6 +1564,7 @@ public FileChecksum next(final FileSystem fs, final Path p) public FileChecksum getFileChecksum(Path f, final long length) throws IOException { statistics.incrementReadOps(1); + storageStatistics.incrementOpCounter(OpType.GET_FILE_CHECKSUM); Path absF = fixRelativePart(f); return new FileSystemLinkResolver() { @Override @@ -1541,6 +1590,7 @@ public FileChecksum next(final FileSystem fs, final Path p) public void setPermission(Path p, final FsPermission permission ) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.SET_PERMISSION); Path absF = fixRelativePart(p); new FileSystemLinkResolver() { @Override @@ -1565,6 +1615,7 @@ public void setOwner(Path p, final String username, final String groupname) throw new IOException("username == null && groupname == null"); } statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.SET_OWNER); Path absF = fixRelativePart(p); new FileSystemLinkResolver() { @Override @@ -1586,6 +1637,7 @@ public Void next(final FileSystem fs, final Path p) public void setTimes(Path p, final long mtime, final long atime) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.SET_TIMES); Path absF = fixRelativePart(p); new FileSystemLinkResolver() { @Override @@ -1663,6 +1715,8 @@ public boolean isInSafeMode() throws IOException { /** @see HdfsAdmin#allowSnapshot(Path) */ public void allowSnapshot(final Path path) throws IOException { + statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.ALLOW_SNAPSHOT); Path absF = fixRelativePart(path); new FileSystemLinkResolver() { @Override @@ -1689,6 +1743,8 @@ public Void next(final FileSystem fs, final Path p) /** @see HdfsAdmin#disallowSnapshot(Path) */ public void disallowSnapshot(final Path path) throws IOException { + statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.DISALLOW_SNAPSHOT); Path absF = fixRelativePart(path); new FileSystemLinkResolver() { @Override @@ -1716,6 +1772,8 @@ public Void next(final FileSystem fs, final Path p) @Override public Path createSnapshot(final Path path, final String snapshotName) throws IOException { + statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.CREATE_SNAPSHOT); Path absF = fixRelativePart(path); return new FileSystemLinkResolver() { @Override @@ -1741,6 +1799,8 @@ public Path next(final FileSystem fs, final Path p) @Override public void renameSnapshot(final Path path, final String snapshotOldName, final String snapshotNewName) throws IOException { + statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.RENAME_SNAPSHOT); Path absF = fixRelativePart(path); new FileSystemLinkResolver() { @Override @@ -1777,6 +1837,8 @@ public SnapshottableDirectoryStatus[] getSnapshottableDirListing() @Override public void deleteSnapshot(final Path snapshotDir, final String snapshotName) throws IOException { + statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.DELETE_SNAPSHOT); Path absF = fixRelativePart(snapshotDir); new FileSystemLinkResolver() { @Override @@ -2024,6 +2086,8 @@ public RemoteIterator listCachePools() throws IOException { @Override public void modifyAclEntries(Path path, final List aclSpec) throws IOException { + statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.MODIFY_ACL_ENTRIES); Path absF = fixRelativePart(path); new FileSystemLinkResolver() { @Override @@ -2046,6 +2110,8 @@ public Void next(final FileSystem fs, final Path p) throws IOException { @Override public void removeAclEntries(Path path, final List aclSpec) throws IOException { + statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.REMOVE_ACL_ENTRIES); Path absF = fixRelativePart(path); new FileSystemLinkResolver() { @Override @@ -2067,6 +2133,8 @@ public Void next(final FileSystem fs, final Path p) throws IOException { */ @Override public void removeDefaultAcl(Path path) throws IOException { + statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.REMOVE_DEFAULT_ACL); final Path absF = fixRelativePart(path); new FileSystemLinkResolver() { @Override @@ -2087,6 +2155,8 @@ public Void next(final FileSystem fs, final Path p) throws IOException { */ @Override public void removeAcl(Path path) throws IOException { + statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.REMOVE_ACL); final Path absF = fixRelativePart(path); new FileSystemLinkResolver() { @Override @@ -2108,6 +2178,8 @@ public Void next(final FileSystem fs, final Path p) throws IOException { @Override public void setAcl(Path path, final List aclSpec) throws IOException { + statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.SET_ACL); Path absF = fixRelativePart(path); new FileSystemLinkResolver() { @Override @@ -2206,6 +2278,8 @@ public RemoteIterator listEncryptionZones() @Override public void setXAttr(Path path, final String name, final byte[] value, final EnumSet flag) throws IOException { + statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.SET_XATTR); Path absF = fixRelativePart(path); new FileSystemLinkResolver() { @@ -2225,6 +2299,8 @@ public Void next(final FileSystem fs, final Path p) throws IOException { @Override public byte[] getXAttr(Path path, final String name) throws IOException { + statistics.incrementReadOps(1); + storageStatistics.incrementOpCounter(OpType.GET_XATTR); final Path absF = fixRelativePart(path); return new FileSystemLinkResolver() { @Override @@ -2290,6 +2366,8 @@ public List next(final FileSystem fs, final Path p) @Override public void removeXAttr(Path path, final String name) throws IOException { + statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.REMOVE_XATTR); Path absF = fixRelativePart(path); new FileSystemLinkResolver() { @Override @@ -2446,4 +2524,5 @@ protected Path fixRelativePart(Path p) { Statistics getFsStatistics() { return statistics; } + } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java index bf1602b05e3..55639aee89e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java @@ -62,6 +62,11 @@ import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.GlobalStorageStatistics; +import org.apache.hadoop.fs.GlobalStorageStatistics.StorageStatisticsProvider; +import org.apache.hadoop.fs.StorageStatistics; +import org.apache.hadoop.hdfs.DFSOpsCountStatistics; +import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType; import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum; import org.apache.hadoop.fs.Options; import org.apache.hadoop.fs.Path; @@ -143,6 +148,8 @@ public class WebHdfsFileSystem extends FileSystem private static final ObjectReader READER = new ObjectMapper().reader(Map.class); + private DFSOpsCountStatistics storageStatistics; + /** * Return the protocol scheme for the FileSystem. *

@@ -239,6 +246,15 @@ public synchronized void initialize(URI uri, Configuration conf CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT); this.initializeRestCsrf(conf); this.delegationToken = null; + + storageStatistics = (DFSOpsCountStatistics) GlobalStorageStatistics.INSTANCE + .put(DFSOpsCountStatistics.NAME, + new StorageStatisticsProvider() { + @Override + public StorageStatistics provide() { + return new DFSOpsCountStatistics(); + } + }); } /** @@ -983,6 +999,7 @@ HdfsFileStatus decodeResponse(Map json) { @Override public FileStatus getFileStatus(Path f) throws IOException { statistics.incrementReadOps(1); + storageStatistics.incrementOpCounter(OpType.GET_FILE_STATUS); return makeQualified(getHdfsFileStatus(f), f); } @@ -1012,6 +1029,7 @@ AclStatus decodeResponse(Map json) { @Override public boolean mkdirs(Path f, FsPermission permission) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.MKDIRS); final HttpOpParam.Op op = PutOpParam.Op.MKDIRS; return new FsPathBooleanRunner(op, f, new PermissionParam(applyUMask(permission)) @@ -1024,6 +1042,7 @@ public boolean mkdirs(Path f, FsPermission permission) throws IOException { public void createSymlink(Path destination, Path f, boolean createParent ) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.CREATE_SYM_LINK); final HttpOpParam.Op op = PutOpParam.Op.CREATESYMLINK; new FsPathRunner(op, f, new DestinationParam(makeQualified(destination).toUri().getPath()), @@ -1034,6 +1053,7 @@ public void createSymlink(Path destination, Path f, boolean createParent @Override public boolean rename(final Path src, final Path dst) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.RENAME); final HttpOpParam.Op op = PutOpParam.Op.RENAME; return new FsPathBooleanRunner(op, src, new DestinationParam(makeQualified(dst).toUri().getPath()) @@ -1045,6 +1065,7 @@ public boolean rename(final Path src, final Path dst) throws IOException { public void rename(final Path src, final Path dst, final Options.Rename... options) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.RENAME); final HttpOpParam.Op op = PutOpParam.Op.RENAME; new FsPathRunner(op, src, new DestinationParam(makeQualified(dst).toUri().getPath()), @@ -1056,6 +1077,7 @@ public void rename(final Path src, final Path dst, public void setXAttr(Path p, String name, byte[] value, EnumSet flag) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.SET_XATTR); final HttpOpParam.Op op = PutOpParam.Op.SETXATTR; if (value != null) { new FsPathRunner(op, p, new XAttrNameParam(name), new XAttrValueParam( @@ -1069,6 +1091,8 @@ public void setXAttr(Path p, String name, byte[] value, @Override public byte[] getXAttr(Path p, final String name) throws IOException { + statistics.incrementReadOps(1); + storageStatistics.incrementOpCounter(OpType.GET_XATTR); final HttpOpParam.Op op = GetOpParam.Op.GETXATTRS; return new FsPathResponseRunner(op, p, new XAttrNameParam(name), new XAttrEncodingParam(XAttrCodec.HEX)) { @@ -1125,6 +1149,7 @@ List decodeResponse(Map json) throws IOException { @Override public void removeXAttr(Path p, String name) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.REMOVE_XATTR); final HttpOpParam.Op op = PutOpParam.Op.REMOVEXATTR; new FsPathRunner(op, p, new XAttrNameParam(name)).run(); } @@ -1137,6 +1162,7 @@ public void setOwner(final Path p, final String owner, final String group } statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.SET_OWNER); final HttpOpParam.Op op = PutOpParam.Op.SETOWNER; new FsPathRunner(op, p, new OwnerParam(owner), new GroupParam(group) @@ -1147,6 +1173,7 @@ public void setOwner(final Path p, final String owner, final String group public void setPermission(final Path p, final FsPermission permission ) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.SET_PERMISSION); final HttpOpParam.Op op = PutOpParam.Op.SETPERMISSION; new FsPathRunner(op, p,new PermissionParam(permission)).run(); } @@ -1155,6 +1182,7 @@ public void setPermission(final Path p, final FsPermission permission public void modifyAclEntries(Path path, List aclSpec) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.MODIFY_ACL_ENTRIES); final HttpOpParam.Op op = PutOpParam.Op.MODIFYACLENTRIES; new FsPathRunner(op, path, new AclPermissionParam(aclSpec)).run(); } @@ -1163,6 +1191,7 @@ public void modifyAclEntries(Path path, List aclSpec) public void removeAclEntries(Path path, List aclSpec) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.REMOVE_ACL_ENTRIES); final HttpOpParam.Op op = PutOpParam.Op.REMOVEACLENTRIES; new FsPathRunner(op, path, new AclPermissionParam(aclSpec)).run(); } @@ -1170,6 +1199,7 @@ public void removeAclEntries(Path path, List aclSpec) @Override public void removeDefaultAcl(Path path) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.REMOVE_DEFAULT_ACL); final HttpOpParam.Op op = PutOpParam.Op.REMOVEDEFAULTACL; new FsPathRunner(op, path).run(); } @@ -1177,6 +1207,7 @@ public void removeDefaultAcl(Path path) throws IOException { @Override public void removeAcl(Path path) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.REMOVE_ACL); final HttpOpParam.Op op = PutOpParam.Op.REMOVEACL; new FsPathRunner(op, path).run(); } @@ -1185,6 +1216,7 @@ public void removeAcl(Path path) throws IOException { public void setAcl(final Path p, final List aclSpec) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.SET_ACL); final HttpOpParam.Op op = PutOpParam.Op.SETACL; new FsPathRunner(op, p, new AclPermissionParam(aclSpec)).run(); } @@ -1193,6 +1225,7 @@ public void setAcl(final Path p, final List aclSpec) public Path createSnapshot(final Path path, final String snapshotName) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.CREATE_SNAPSHOT); final HttpOpParam.Op op = PutOpParam.Op.CREATESNAPSHOT; return new FsPathResponseRunner(op, path, new SnapshotNameParam(snapshotName)) { @@ -1207,6 +1240,7 @@ Path decodeResponse(Map json) { public void deleteSnapshot(final Path path, final String snapshotName) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.DELETE_SNAPSHOT); final HttpOpParam.Op op = DeleteOpParam.Op.DELETESNAPSHOT; new FsPathRunner(op, path, new SnapshotNameParam(snapshotName)).run(); } @@ -1215,6 +1249,7 @@ public void deleteSnapshot(final Path path, final String snapshotName) public void renameSnapshot(final Path path, final String snapshotOldName, final String snapshotNewName) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.RENAME_SNAPSHOT); final HttpOpParam.Op op = PutOpParam.Op.RENAMESNAPSHOT; new FsPathRunner(op, path, new OldSnapshotNameParam(snapshotOldName), new SnapshotNameParam(snapshotNewName)).run(); @@ -1224,6 +1259,7 @@ public void renameSnapshot(final Path path, final String snapshotOldName, public boolean setReplication(final Path p, final short replication ) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.SET_REPLICATION); final HttpOpParam.Op op = PutOpParam.Op.SETREPLICATION; return new FsPathBooleanRunner(op, p, new ReplicationParam(replication) @@ -1234,6 +1270,7 @@ public boolean setReplication(final Path p, final short replication public void setTimes(final Path p, final long mtime, final long atime ) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.SET_TIMES); final HttpOpParam.Op op = PutOpParam.Op.SETTIMES; new FsPathRunner(op, p, new ModificationTimeParam(mtime), @@ -1256,6 +1293,7 @@ public short getDefaultReplication() { @Override public void concat(final Path trg, final Path [] srcs) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.CONCAT); final HttpOpParam.Op op = PostOpParam.Op.CONCAT; new FsPathRunner(op, trg, new ConcatSourcesParam(srcs)).run(); } @@ -1265,6 +1303,7 @@ public FSDataOutputStream create(final Path f, final FsPermission permission, final boolean overwrite, final int bufferSize, final short replication, final long blockSize, final Progressable progress) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.CREATE); final HttpOpParam.Op op = PutOpParam.Op.CREATE; return new FsPathOutputStreamRunner(op, f, bufferSize, @@ -1282,6 +1321,7 @@ public FSDataOutputStream createNonRecursive(final Path f, final int bufferSize, final short replication, final long blockSize, final Progressable progress) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.CREATE_NON_RECURSIVE); final HttpOpParam.Op op = PutOpParam.Op.CREATE; return new FsPathOutputStreamRunner(op, f, bufferSize, @@ -1298,6 +1338,7 @@ public FSDataOutputStream createNonRecursive(final Path f, public FSDataOutputStream append(final Path f, final int bufferSize, final Progressable progress) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.APPEND); final HttpOpParam.Op op = PostOpParam.Op.APPEND; return new FsPathOutputStreamRunner(op, f, bufferSize, @@ -1308,6 +1349,7 @@ public FSDataOutputStream append(final Path f, final int bufferSize, @Override public boolean truncate(Path f, long newLength) throws IOException { statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.TRUNCATE); final HttpOpParam.Op op = PostOpParam.Op.TRUNCATE; return new FsPathBooleanRunner(op, f, new NewLengthParam(newLength)).run(); @@ -1315,6 +1357,8 @@ public boolean truncate(Path f, long newLength) throws IOException { @Override public boolean delete(Path f, boolean recursive) throws IOException { + statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.DELETE); final HttpOpParam.Op op = DeleteOpParam.Op.DELETE; return new FsPathBooleanRunner(op, f, new RecursiveParam(recursive) @@ -1325,6 +1369,7 @@ public boolean delete(Path f, boolean recursive) throws IOException { public FSDataInputStream open(final Path f, final int bufferSize ) throws IOException { statistics.incrementReadOps(1); + storageStatistics.incrementOpCounter(OpType.OPEN); return new FSDataInputStream(new WebHdfsInputStream(f, bufferSize)); } @@ -1424,6 +1469,7 @@ protected URL getResolvedUrl(final HttpURLConnection connection @Override public FileStatus[] listStatus(final Path f) throws IOException { statistics.incrementReadOps(1); + storageStatistics.incrementOpCounter(OpType.LIST_STATUS); final HttpOpParam.Op op = GetOpParam.Op.LISTSTATUS; return new FsPathResponseRunner(op, f) { @@ -1519,6 +1565,7 @@ public BlockLocation[] getFileBlockLocations(final FileStatus status, public BlockLocation[] getFileBlockLocations(final Path p, final long offset, final long length) throws IOException { statistics.incrementReadOps(1); + storageStatistics.incrementOpCounter(OpType.GET_FILE_BLOCK_LOCATIONS); final HttpOpParam.Op op = GetOpParam.Op.GET_BLOCK_LOCATIONS; return new FsPathResponseRunner(op, p, @@ -1540,6 +1587,7 @@ public void access(final Path path, final FsAction mode) throws IOException { @Override public ContentSummary getContentSummary(final Path p) throws IOException { statistics.incrementReadOps(1); + storageStatistics.incrementOpCounter(OpType.GET_CONTENT_SUMMARY); final HttpOpParam.Op op = GetOpParam.Op.GETCONTENTSUMMARY; return new FsPathResponseRunner(op, p) { @@ -1554,6 +1602,7 @@ ContentSummary decodeResponse(Map json) { public MD5MD5CRC32FileChecksum getFileChecksum(final Path p ) throws IOException { statistics.incrementReadOps(1); + storageStatistics.incrementOpCounter(OpType.GET_FILE_CHECKSUM); final HttpOpParam.Op op = GetOpParam.Op.GETFILECHECKSUM; return new FsPathResponseRunner(op, p) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java index d5a10f261ed..e4a99d2856c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java @@ -42,10 +42,15 @@ import java.util.Collections; import java.util.EnumSet; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Random; import java.util.Set; import org.apache.commons.lang.ArrayUtils; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicReference; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.BlockStorageLocation; @@ -54,28 +59,34 @@ import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem.Statistics.StatisticsData; import org.apache.hadoop.fs.FsServerDefaults; import org.apache.hadoop.fs.FileChecksum; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.GlobalStorageStatistics; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum; import org.apache.hadoop.fs.Options.ChecksumOpt; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.StorageStatistics.LongStatistic; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.fs.VolumeId; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.client.impl.LeaseRenewer; +import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType; import org.apache.hadoop.hdfs.net.Peer; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil; import org.apache.hadoop.hdfs.web.HftpFileSystem; +import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.Op; import org.apache.hadoop.hdfs.web.WebHdfsConstants; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.ipc.ProtobufRpcEngine; @@ -83,6 +94,7 @@ import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.Time; +import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.apache.log4j.Level; import org.junit.Assert; import org.junit.Test; @@ -93,9 +105,15 @@ import com.google.common.base.Supplier; import com.google.common.collect.Lists; +import org.mockito.internal.util.reflection.Whitebox; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class TestDistributedFileSystem { private static final Random RAN = new Random(); + private static final Logger LOG = LoggerFactory.getLogger( + TestDistributedFileSystem.class); static { GenericTestUtils.setLogLevel(DFSClient.LOG, Level.ALL); @@ -416,54 +434,84 @@ public void testDFSClient() throws Exception { } @Test - public void testStatistics() throws Exception { + public void testStatistics() throws IOException { + FileSystem.getStatistics(HdfsConstants.HDFS_URI_SCHEME, + DistributedFileSystem.class).reset(); + @SuppressWarnings("unchecked") + ThreadLocal data = (ThreadLocal) + Whitebox.getInternalState( + FileSystem.getStatistics(HdfsConstants.HDFS_URI_SCHEME, + DistributedFileSystem.class), "threadData"); + data.set(null); + int lsLimit = 2; final Configuration conf = getTestConfiguration(); conf.setInt(DFSConfigKeys.DFS_LIST_LIMIT, lsLimit); final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build(); try { + cluster.waitActive(); final FileSystem fs = cluster.getFileSystem(); Path dir = new Path("/test"); Path file = new Path(dir, "file"); - - int readOps = DFSTestUtil.getStatistics(fs).getReadOps(); - int writeOps = DFSTestUtil.getStatistics(fs).getWriteOps(); - int largeReadOps = DFSTestUtil.getStatistics(fs).getLargeReadOps(); + + int readOps = 0; + int writeOps = 0; + int largeReadOps = 0; + + long opCount = getOpStatistics(OpType.MKDIRS); fs.mkdirs(dir); checkStatistics(fs, readOps, ++writeOps, largeReadOps); + checkOpStatistics(OpType.MKDIRS, opCount + 1); + opCount = getOpStatistics(OpType.CREATE); FSDataOutputStream out = fs.create(file, (short)1); out.close(); checkStatistics(fs, readOps, ++writeOps, largeReadOps); - + checkOpStatistics(OpType.CREATE, opCount + 1); + + opCount = getOpStatistics(OpType.GET_FILE_STATUS); FileStatus status = fs.getFileStatus(file); checkStatistics(fs, ++readOps, writeOps, largeReadOps); + checkOpStatistics(OpType.GET_FILE_STATUS, opCount + 1); + opCount = getOpStatistics(OpType.GET_FILE_BLOCK_LOCATIONS); fs.getFileBlockLocations(file, 0, 0); checkStatistics(fs, ++readOps, writeOps, largeReadOps); - + checkOpStatistics(OpType.GET_FILE_BLOCK_LOCATIONS, opCount + 1); fs.getFileBlockLocations(status, 0, 0); checkStatistics(fs, ++readOps, writeOps, largeReadOps); + checkOpStatistics(OpType.GET_FILE_BLOCK_LOCATIONS, opCount + 2); + opCount = getOpStatistics(OpType.OPEN); FSDataInputStream in = fs.open(file); in.close(); checkStatistics(fs, ++readOps, writeOps, largeReadOps); + checkOpStatistics(OpType.OPEN, opCount + 1); + opCount = getOpStatistics(OpType.SET_REPLICATION); fs.setReplication(file, (short)2); checkStatistics(fs, readOps, ++writeOps, largeReadOps); + checkOpStatistics(OpType.SET_REPLICATION, opCount + 1); + opCount = getOpStatistics(OpType.RENAME); Path file1 = new Path(dir, "file1"); fs.rename(file, file1); checkStatistics(fs, readOps, ++writeOps, largeReadOps); + checkOpStatistics(OpType.RENAME, opCount + 1); + opCount = getOpStatistics(OpType.GET_CONTENT_SUMMARY); fs.getContentSummary(file1); checkStatistics(fs, ++readOps, writeOps, largeReadOps); + checkOpStatistics(OpType.GET_CONTENT_SUMMARY, opCount + 1); // Iterative ls test + long mkdirOp = getOpStatistics(OpType.MKDIRS); + long listStatusOp = getOpStatistics(OpType.LIST_STATUS); for (int i = 0; i < 10; i++) { Path p = new Path(dir, Integer.toString(i)); fs.mkdirs(p); + mkdirOp++; FileStatus[] list = fs.listStatus(dir); if (list.length > lsLimit) { // if large directory, then count readOps and largeReadOps by @@ -471,41 +519,131 @@ public void testStatistics() throws Exception { int iterations = (int)Math.ceil((double)list.length/lsLimit); largeReadOps += iterations; readOps += iterations; + listStatusOp += iterations; } else { // Single iteration in listStatus - no large read operation done readOps++; + listStatusOp++; } // writeOps incremented by 1 for mkdirs // readOps and largeReadOps incremented by 1 or more checkStatistics(fs, readOps, ++writeOps, largeReadOps); + checkOpStatistics(OpType.MKDIRS, mkdirOp); + checkOpStatistics(OpType.LIST_STATUS, listStatusOp); } + opCount = getOpStatistics(OpType.GET_STATUS); fs.getStatus(file1); checkStatistics(fs, ++readOps, writeOps, largeReadOps); - + checkOpStatistics(OpType.GET_STATUS, opCount + 1); + + opCount = getOpStatistics(OpType.GET_FILE_CHECKSUM); fs.getFileChecksum(file1); checkStatistics(fs, ++readOps, writeOps, largeReadOps); + checkOpStatistics(OpType.GET_FILE_CHECKSUM, opCount + 1); + opCount = getOpStatistics(OpType.SET_PERMISSION); fs.setPermission(file1, new FsPermission((short)0777)); checkStatistics(fs, readOps, ++writeOps, largeReadOps); + checkOpStatistics(OpType.SET_PERMISSION, opCount + 1); + opCount = getOpStatistics(OpType.SET_TIMES); fs.setTimes(file1, 0L, 0L); checkStatistics(fs, readOps, ++writeOps, largeReadOps); - + checkOpStatistics(OpType.SET_TIMES, opCount + 1); + + opCount = getOpStatistics(OpType.SET_OWNER); UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); fs.setOwner(file1, ugi.getUserName(), ugi.getGroupNames()[0]); + checkOpStatistics(OpType.SET_OWNER, opCount + 1); checkStatistics(fs, readOps, ++writeOps, largeReadOps); - + + opCount = getOpStatistics(OpType.DELETE); fs.delete(dir, true); checkStatistics(fs, readOps, ++writeOps, largeReadOps); + checkOpStatistics(OpType.DELETE, opCount + 1); } finally { if (cluster != null) cluster.shutdown(); } } - + + @SuppressWarnings("ThrowableResultOfMethodCallIgnored") + @Test (timeout = 180000) + public void testConcurrentStatistics() + throws IOException, InterruptedException { + FileSystem.getStatistics(HdfsConstants.HDFS_URI_SCHEME, + DistributedFileSystem.class).reset(); + + final MiniDFSCluster cluster = new MiniDFSCluster.Builder( + new Configuration()).build(); + cluster.waitActive(); + final FileSystem fs = cluster.getFileSystem(); + final int numThreads = 5; + final ExecutorService threadPool = + HadoopExecutors.newFixedThreadPool(numThreads); + + try { + final CountDownLatch allExecutorThreadsReady = + new CountDownLatch(numThreads); + final CountDownLatch startBlocker = new CountDownLatch(1); + final CountDownLatch allDone = new CountDownLatch(numThreads); + final AtomicReference childError = new AtomicReference<>(); + + for (int i = 0; i < numThreads; i++) { + threadPool.submit(new Runnable() { + @Override + public void run() { + allExecutorThreadsReady.countDown(); + try { + startBlocker.await(); + final FileSystem fs = cluster.getFileSystem(); + fs.mkdirs(new Path("/testStatisticsParallelChild")); + } catch (Throwable t) { + LOG.error("Child failed when calling mkdir", t); + childError.compareAndSet(null, t); + } finally { + allDone.countDown(); + } + } + }); + } + + final long oldMkdirOpCount = getOpStatistics(OpType.MKDIRS); + + // wait until all threads are ready + allExecutorThreadsReady.await(); + // all threads start making directories + startBlocker.countDown(); + // wait until all threads are done + allDone.await(); + + assertNull("Child failed with exception " + childError.get(), + childError.get()); + + checkStatistics(fs, 0, numThreads, 0); + // check the single operation count stat + checkOpStatistics(OpType.MKDIRS, numThreads + oldMkdirOpCount); + // iterate all the operation counts + for (Iterator opCountIter = + FileSystem.getGlobalStorageStatistics() + .get(DFSOpsCountStatistics.NAME).getLongStatistics(); + opCountIter.hasNext();) { + final LongStatistic opCount = opCountIter.next(); + if (OpType.MKDIRS.getSymbol().equals(opCount.getName())) { + assertEquals("Unexpected op count from iterator!", + numThreads + oldMkdirOpCount, opCount.getValue()); + } + LOG.info(opCount.getName() + "\t" + opCount.getValue()); + } + } finally { + threadPool.shutdownNow(); + cluster.shutdown(); + } + } + /** Checks statistics. -1 indicates do not check for the operations */ private void checkStatistics(FileSystem fs, int readOps, int writeOps, int largeReadOps) { @@ -575,6 +713,17 @@ private void testReadFileSystemStatistics(int expectedDistance) } } + private static void checkOpStatistics(OpType op, long count) { + assertEquals("Op " + op.getSymbol() + " has unexpected count!", + count, getOpStatistics(op)); + } + + private static long getOpStatistics(OpType op) { + return GlobalStorageStatistics.INSTANCE.get( + DFSOpsCountStatistics.NAME) + .getLong(op.getSymbol()); + } + @Test public void testFileChecksum() throws Exception { GenericTestUtils.setLogLevel(HftpFileSystem.LOG, Level.ALL);