HDFS-14192. Track missing DFS operations in Statistics and StorageStatistics. Contributed by Ayush Saxena.

(cherry picked from commit f048512bb8)
(cherry picked from commit e59ced9c60a7007551ee8a9f83ce8e266e4dbae1)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
This commit is contained in:
Inigo Goiri 2019-01-16 10:14:22 -08:00 committed by Wei-Chiu Chuang
parent a400f396a6
commit 2a3eb04996
3 changed files with 147 additions and 0 deletions

View File

@ -41,12 +41,15 @@ public class DFSOpsCountStatistics extends StorageStatistics {
/** This is for counting distributed file system operations. */
public enum OpType {
ADD_CACHE_DIRECTIVE("op_add_cache_directive"),
ADD_CACHE_POOL("op_add_cache_pool"),
ADD_EC_POLICY("op_add_ec_policy"),
ALLOW_SNAPSHOT("op_allow_snapshot"),
APPEND(CommonStatisticNames.OP_APPEND),
CONCAT("op_concat"),
COPY_FROM_LOCAL_FILE(CommonStatisticNames.OP_COPY_FROM_LOCAL_FILE),
CREATE(CommonStatisticNames.OP_CREATE),
CREATE_ENCRYPTION_ZONE("op_create_encryption_zone"),
CREATE_NON_RECURSIVE(CommonStatisticNames.OP_CREATE_NON_RECURSIVE),
CREATE_SNAPSHOT("op_create_snapshot"),
CREATE_SYM_LINK("op_create_symlink"),
@ -61,6 +64,7 @@ public class DFSOpsCountStatistics extends StorageStatistics {
GET_EC_CODECS("op_get_ec_codecs"),
GET_EC_POLICY("op_get_ec_policy"),
GET_EC_POLICIES("op_get_ec_policies"),
GET_ENCRYPTION_ZONE("op_get_encryption_zone"),
GET_FILE_BLOCK_LOCATIONS("op_get_file_block_locations"),
GET_FILE_CHECKSUM(CommonStatisticNames.OP_GET_FILE_CHECKSUM),
GET_FILE_LINK_STATUS("op_get_file_link_status"),
@ -72,8 +76,13 @@ public class DFSOpsCountStatistics extends StorageStatistics {
GET_STORAGE_POLICY("op_get_storage_policy"),
GET_TRASH_ROOT("op_get_trash_root"),
GET_XATTR("op_get_xattr"),
LIST_CACHE_DIRECTIVE("op_list_cache_directive"),
LIST_CACHE_POOL("op_list_cache_pool"),
LIST_ENCRYPTION_ZONE("op_list_encryption_zone"),
LIST_LOCATED_STATUS(CommonStatisticNames.OP_LIST_LOCATED_STATUS),
LIST_STATUS(CommonStatisticNames.OP_LIST_STATUS),
MODIFY_CACHE_POOL("op_modify_cache_pool"),
MODIFY_CACHE_DIRECTIVE("op_modify_cache_directive"),
MKDIRS(CommonStatisticNames.OP_MKDIRS),
MODIFY_ACL_ENTRIES(CommonStatisticNames.OP_MODIFY_ACL_ENTRIES),
OPEN(CommonStatisticNames.OP_OPEN),
@ -81,6 +90,8 @@ public class DFSOpsCountStatistics extends StorageStatistics {
PRIMITIVE_MKDIR("op_primitive_mkdir"),
REMOVE_ACL(CommonStatisticNames.OP_REMOVE_ACL),
REMOVE_ACL_ENTRIES(CommonStatisticNames.OP_REMOVE_ACL_ENTRIES),
REMOVE_CACHE_DIRECTIVE("op_remove_cache_directive"),
REMOVE_CACHE_POOL("op_remove_cache_pool"),
REMOVE_DEFAULT_ACL(CommonStatisticNames.OP_REMOVE_DEFAULT_ACL),
REMOVE_EC_POLICY("op_remove_ec_policy"),
REMOVE_XATTR("op_remove_xattr"),
@ -91,6 +102,8 @@ public class DFSOpsCountStatistics extends StorageStatistics {
SET_EC_POLICY("op_set_ec_policy"),
SET_OWNER(CommonStatisticNames.OP_SET_OWNER),
SET_PERMISSION(CommonStatisticNames.OP_SET_PERMISSION),
SET_QUOTA_BYTSTORAGEYPE("op_set_quota_bystoragetype"),
SET_QUOTA_USAGE("op_set_quota_usage"),
SET_REPLICATION("op_set_replication"),
SET_STORAGE_POLICY("op_set_storagePolicy"),
SET_TIMES(CommonStatisticNames.OP_SET_TIMES),

View File

@ -997,6 +997,8 @@ public class DistributedFileSystem extends FileSystem
*/
public void setQuota(Path src, final long namespaceQuota,
final long storagespaceQuota) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.SET_QUOTA_USAGE);
Path absF = fixRelativePart(src);
new FileSystemLinkResolver<Void>() {
@Override
@ -1025,6 +1027,8 @@ public class DistributedFileSystem extends FileSystem
public void setQuotaByStorageType(Path src, final StorageType type,
final long quota)
throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.SET_QUOTA_BYTSTORAGEYPE);
Path absF = fixRelativePart(src);
new FileSystemLinkResolver<Void>() {
@Override
@ -2220,6 +2224,8 @@ public class DistributedFileSystem extends FileSystem
*/
public long addCacheDirective(
CacheDirectiveInfo info, EnumSet<CacheFlag> flags) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.ADD_CACHE_DIRECTIVE);
Preconditions.checkNotNull(info.getPath());
Path path = new Path(getPathName(fixRelativePart(info.getPath()))).
makeQualified(getUri(), getWorkingDirectory());
@ -2247,6 +2253,8 @@ public class DistributedFileSystem extends FileSystem
*/
public void modifyCacheDirective(
CacheDirectiveInfo info, EnumSet<CacheFlag> flags) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.MODIFY_CACHE_DIRECTIVE);
if (info.getPath() != null) {
info = new CacheDirectiveInfo.Builder(info).
setPath(new Path(getPathName(fixRelativePart(info.getPath()))).
@ -2263,6 +2271,8 @@ public class DistributedFileSystem extends FileSystem
*/
public void removeCacheDirective(long id)
throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.REMOVE_CACHE_DIRECTIVE);
dfs.removeCacheDirective(id);
}
@ -2275,6 +2285,8 @@ public class DistributedFileSystem extends FileSystem
*/
public RemoteIterator<CacheDirectiveEntry> listCacheDirectives(
CacheDirectiveInfo filter) throws IOException {
statistics.incrementReadOps(1);
storageStatistics.incrementOpCounter(OpType.LIST_CACHE_DIRECTIVE);
if (filter == null) {
filter = new CacheDirectiveInfo.Builder().build();
}
@ -2315,6 +2327,8 @@ public class DistributedFileSystem extends FileSystem
* If the request could not be completed.
*/
public void addCachePool(CachePoolInfo info) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.ADD_CACHE_POOL);
CachePoolInfo.validate(info);
dfs.addCachePool(info);
}
@ -2328,6 +2342,8 @@ public class DistributedFileSystem extends FileSystem
* If the request could not be completed.
*/
public void modifyCachePool(CachePoolInfo info) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.MODIFY_CACHE_POOL);
CachePoolInfo.validate(info);
dfs.modifyCachePool(info);
}
@ -2341,6 +2357,8 @@ public class DistributedFileSystem extends FileSystem
* if the cache pool did not exist, or could not be removed.
*/
public void removeCachePool(String poolName) throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.REMOVE_CACHE_POOL);
CachePoolInfo.validateName(poolName);
dfs.removeCachePool(poolName);
}
@ -2354,6 +2372,8 @@ public class DistributedFileSystem extends FileSystem
* If there was an error listing cache pools.
*/
public RemoteIterator<CachePoolEntry> listCachePools() throws IOException {
statistics.incrementReadOps(1);
storageStatistics.incrementOpCounter(OpType.LIST_CACHE_POOL);
return dfs.listCachePools();
}
@ -2495,6 +2515,8 @@ public class DistributedFileSystem extends FileSystem
/* HDFS only */
public void createEncryptionZone(final Path path, final String keyName)
throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.CREATE_ENCRYPTION_ZONE);
Path absF = fixRelativePart(path);
new FileSystemLinkResolver<Void>() {
@Override
@ -2522,6 +2544,8 @@ public class DistributedFileSystem extends FileSystem
/* HDFS only */
public EncryptionZone getEZForPath(final Path path)
throws IOException {
statistics.incrementReadOps(1);
storageStatistics.incrementOpCounter(OpType.GET_ENCRYPTION_ZONE);
Preconditions.checkNotNull(path);
Path absF = fixRelativePart(path);
return new FileSystemLinkResolver<EncryptionZone>() {
@ -2549,6 +2573,8 @@ public class DistributedFileSystem extends FileSystem
/* HDFS only */
public RemoteIterator<EncryptionZone> listEncryptionZones()
throws IOException {
statistics.incrementReadOps(1);
storageStatistics.incrementOpCounter(OpType.LIST_ENCRYPTION_ZONE);
return dfs.listEncryptionZones();
}

View File

@ -37,6 +37,7 @@ import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.security.NoSuchAlgorithmException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
@ -85,6 +86,8 @@ import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.client.impl.LeaseRenewer;
import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
@ -800,6 +803,111 @@ public class TestDistributedFileSystem {
}
}
@Test
public void testStatistics2() throws IOException, NoSuchAlgorithmException {
HdfsConfiguration conf = new HdfsConfiguration();
File tmpDir = GenericTestUtils.getTestDir(UUID.randomUUID().toString());
final Path jksPath = new Path(tmpDir.toString(), "test.jks");
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
JavaKeyStoreProvider.SCHEME_NAME + "://file" + jksPath.toUri());
try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build()) {
cluster.waitActive();
final DistributedFileSystem dfs = cluster.getFileSystem();
Path dir = new Path("/testStat");
dfs.mkdirs(dir);
int readOps = 0;
int writeOps = 0;
FileSystem.clearStatistics();
// Quota Commands.
long opCount = getOpStatistics(OpType.SET_QUOTA_USAGE);
dfs.setQuota(dir, 100, 1000);
checkStatistics(dfs, readOps, ++writeOps, 0);
checkOpStatistics(OpType.SET_QUOTA_USAGE, opCount + 1);
opCount = getOpStatistics(OpType.SET_QUOTA_BYTSTORAGEYPE);
dfs.setQuotaByStorageType(dir, StorageType.DEFAULT, 2000);
checkStatistics(dfs, readOps, ++writeOps, 0);
checkOpStatistics(OpType.SET_QUOTA_BYTSTORAGEYPE, opCount + 1);
opCount = getOpStatistics(OpType.GET_QUOTA_USAGE);
dfs.getQuotaUsage(dir);
checkStatistics(dfs, ++readOps, writeOps, 0);
checkOpStatistics(OpType.GET_QUOTA_USAGE, opCount + 1);
// Cache Commands.
CachePoolInfo cacheInfo =
new CachePoolInfo("pool1").setMode(new FsPermission((short) 0));
opCount = getOpStatistics(OpType.ADD_CACHE_POOL);
dfs.addCachePool(cacheInfo);
checkStatistics(dfs, readOps, ++writeOps, 0);
checkOpStatistics(OpType.ADD_CACHE_POOL, opCount + 1);
CacheDirectiveInfo directive = new CacheDirectiveInfo.Builder()
.setPath(new Path(".")).setPool("pool1").build();
opCount = getOpStatistics(OpType.ADD_CACHE_DIRECTIVE);
long id = dfs.addCacheDirective(directive);
checkStatistics(dfs, readOps, ++writeOps, 0);
checkOpStatistics(OpType.ADD_CACHE_DIRECTIVE, opCount + 1);
opCount = getOpStatistics(OpType.LIST_CACHE_DIRECTIVE);
dfs.listCacheDirectives(null);
checkStatistics(dfs, ++readOps, writeOps, 0);
checkOpStatistics(OpType.LIST_CACHE_DIRECTIVE, opCount + 1);
opCount = getOpStatistics(OpType.MODIFY_CACHE_DIRECTIVE);
dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder().setId(id)
.setReplication((short) 2).build());
checkStatistics(dfs, readOps, ++writeOps, 0);
checkOpStatistics(OpType.MODIFY_CACHE_DIRECTIVE, opCount + 1);
opCount = getOpStatistics(OpType.REMOVE_CACHE_DIRECTIVE);
dfs.removeCacheDirective(id);
checkStatistics(dfs, readOps, ++writeOps, 0);
checkOpStatistics(OpType.REMOVE_CACHE_DIRECTIVE, opCount + 1);
opCount = getOpStatistics(OpType.MODIFY_CACHE_POOL);
dfs.modifyCachePool(cacheInfo);
checkStatistics(dfs, readOps, ++writeOps, 0);
checkOpStatistics(OpType.MODIFY_CACHE_POOL, opCount + 1);
opCount = getOpStatistics(OpType.LIST_CACHE_POOL);
dfs.listCachePools();
checkStatistics(dfs, ++readOps, writeOps, 0);
checkOpStatistics(OpType.LIST_CACHE_POOL, opCount + 1);
opCount = getOpStatistics(OpType.REMOVE_CACHE_POOL);
dfs.removeCachePool(cacheInfo.getPoolName());
checkStatistics(dfs, readOps, ++writeOps, 0);
checkOpStatistics(OpType.REMOVE_CACHE_POOL, opCount + 1);
// Crypto Commands.
final KeyProvider provider =
cluster.getNameNode().getNamesystem().getProvider();
final KeyProvider.Options options = KeyProvider.options(conf);
provider.createKey("key", options);
provider.flush();
opCount = getOpStatistics(OpType.CREATE_ENCRYPTION_ZONE);
dfs.createEncryptionZone(dir, "key");
checkStatistics(dfs, readOps, ++writeOps, 0);
checkOpStatistics(OpType.CREATE_ENCRYPTION_ZONE, opCount + 1);
opCount = getOpStatistics(OpType.LIST_ENCRYPTION_ZONE);
dfs.listEncryptionZones();
checkStatistics(dfs, ++readOps, writeOps, 0);
checkOpStatistics(OpType.LIST_ENCRYPTION_ZONE, opCount + 1);
opCount = getOpStatistics(OpType.GET_ENCRYPTION_ZONE);
dfs.getEZForPath(dir);
checkStatistics(dfs, ++readOps, writeOps, 0);
checkOpStatistics(OpType.GET_ENCRYPTION_ZONE, opCount + 1);
}
}
@Test
public void testECStatistics() throws IOException {
try (MiniDFSCluster cluster =