From d3fed8e653ed9e18d3a29a11c4b24a628ac770bb Mon Sep 17 00:00:00 2001 From: Benoy Antony Date: Mon, 29 Jun 2015 11:00:22 -0700 Subject: [PATCH] HDFS-7390. Provide JMX metrics per storage type. (Benoy Antony) --- .../server/blockmanagement/BlockManager.java | 15 +- .../blockmanagement/BlockStatsMXBean.java | 36 +++++ .../blockmanagement/DatanodeStatistics.java | 6 + .../blockmanagement/HeartbeatManager.java | 96 +++++++++++- .../blockmanagement/StorageTypeStats.java | 115 ++++++++++++++ .../blockmanagement/TestBlockStatsMXBean.java | 146 ++++++++++++++++++ 6 files changed, 412 insertions(+), 2 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockStatsMXBean.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/StorageTypeStats.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockStatsMXBean.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 5bd4980f1e8..0b60a979ebd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -39,6 +39,8 @@ import java.util.TreeSet; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicLong; +import javax.management.ObjectName; + import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; @@ -85,6 +87,7 @@ import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; import org.apache.hadoop.hdfs.util.LightWeightLinkedSet; +import org.apache.hadoop.metrics2.util.MBeans; import org.apache.hadoop.net.Node; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Daemon; @@ -94,6 +97,7 @@ import org.apache.hadoop.util.Time; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Sets; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -101,7 +105,7 @@ import org.slf4j.LoggerFactory; * Keeps information related to the blocks stored in the Hadoop cluster. */ @InterfaceAudience.Private -public class BlockManager { +public class BlockManager implements BlockStatsMXBean { public static final Logger LOG = LoggerFactory.getLogger(BlockManager.class); public static final Logger blockLog = NameNode.blockStateChangeLog; @@ -129,6 +133,7 @@ public class BlockManager { private final AtomicLong postponedMisreplicatedBlocksCount = new AtomicLong(0L); private final long startupDelayBlockDeletionInMs; private final BlockReportLeaseManager blockReportLeaseManager; + private ObjectName mxBeanName; /** Used by metrics */ public long getPendingReplicationBlocksCount() { @@ -468,6 +473,7 @@ public class BlockManager { pendingReplications.start(); datanodeManager.activate(conf); this.replicationThread.start(); + mxBeanName = MBeans.register("NameNode", "BlockStats", this); } public void close() { @@ -3944,6 +3950,8 @@ public class BlockManager { public void shutdown() { stopReplicationInitializer(); blocksMap.close(); + MBeans.unregister(mxBeanName); + mxBeanName = null; } public void clear() { @@ -3954,4 +3962,9 @@ public class BlockManager { public BlockReportLeaseManager getBlockReportLeaseManager() { return blockReportLeaseManager; } + + @Override // BlockStatsMXBean + public Map getStorageTypeStats() { + return datanodeManager.getDatanodeStatistics().getStorageTypeStats(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockStatsMXBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockStatsMXBean.java new file mode 100644 index 00000000000..f22c537a93d --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockStatsMXBean.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.blockmanagement; + +import java.util.Map; + +import org.apache.hadoop.fs.StorageType; + +/** + * This is an interface used to retrieve statistic information related to + * block management. + */ +public interface BlockStatsMXBean { + + /** + * The statistics of storage types. + * + * @return get storage statistics per storage type + */ + Map getStorageTypeStats(); +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java index c9bc3e5ea67..33eca2ed1fc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java @@ -17,6 +17,9 @@ */ package org.apache.hadoop.hdfs.server.blockmanagement; +import java.util.Map; + +import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.protocol.ClientProtocol; /** Datanode statistics */ @@ -71,4 +74,7 @@ public interface DatanodeStatistics { /** @return the expired heartbeats */ public int getExpiredHeartbeats(); + + /** @return Storage Tier statistics*/ + Map getStorageTypeStats(); } \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java index 9017fe1ef94..cc9365d4091 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java @@ -18,9 +18,15 @@ package org.apache.hadoop.hdfs.server.blockmanagement; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.IdentityHashMap; import java.util.List; +import java.util.Map; +import java.util.Set; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtilClient; import org.apache.hadoop.hdfs.protocol.DatanodeID; @@ -189,6 +195,11 @@ class HeartbeatManager implements DatanodeStatistics { return stats.expiredHeartbeats; } + @Override + public Map getStorageTypeStats() { + return stats.statsMap.get(); + } + synchronized void register(final DatanodeDescriptor d) { if (!d.isAlive) { addDatanode(d); @@ -393,6 +404,9 @@ class HeartbeatManager implements DatanodeStatistics { * For decommissioning/decommissioned nodes, only used capacity is counted. */ private static class Stats { + + private final StorageTypeStatsMap statsMap = new StorageTypeStatsMap(); + private long capacityTotal = 0L; private long capacityUsed = 0L; private long capacityRemaining = 0L; @@ -420,6 +434,14 @@ class HeartbeatManager implements DatanodeStatistics { } cacheCapacity += node.getCacheCapacity(); cacheUsed += node.getCacheUsed(); + Set storageTypes = new HashSet<>(); + for (DatanodeStorageInfo storageInfo : node.getStorageInfos()) { + statsMap.addStorage(storageInfo, node); + storageTypes.add(storageInfo.getStorageType()); + } + for (StorageType storageType : storageTypes) { + statsMap.addNode(storageType, node); + } } private void subtract(final DatanodeDescriptor node) { @@ -436,6 +458,14 @@ class HeartbeatManager implements DatanodeStatistics { } cacheCapacity -= node.getCacheCapacity(); cacheUsed -= node.getCacheUsed(); + Set storageTypes = new HashSet<>(); + for (DatanodeStorageInfo storageInfo : node.getStorageInfos()) { + statsMap.subtractStorage(storageInfo, node); + storageTypes.add(storageInfo.getStorageType()); + } + for (StorageType storageType : storageTypes) { + statsMap.subtractNode(storageType, node); + } } /** Increment expired heartbeat counter. */ @@ -443,5 +473,69 @@ class HeartbeatManager implements DatanodeStatistics { expiredHeartbeats++; } } -} + /** StorageType specific statistics. + * For decommissioning/decommissioned nodes, only used capacity is counted. + */ + + static final class StorageTypeStatsMap { + + private Map storageTypeStatsMap = + new IdentityHashMap<>(); + + private StorageTypeStatsMap() {} + + private StorageTypeStatsMap(StorageTypeStatsMap other) { + storageTypeStatsMap = + new IdentityHashMap<>(other.storageTypeStatsMap); + for (Map.Entry entry : + storageTypeStatsMap.entrySet()) { + entry.setValue(new StorageTypeStats(entry.getValue())); + } + } + + private Map get() { + return Collections.unmodifiableMap(storageTypeStatsMap); + } + + private void addNode(StorageType storageType, + final DatanodeDescriptor node) { + StorageTypeStats storageTypeStats = + storageTypeStatsMap.get(storageType); + if (storageTypeStats == null) { + storageTypeStats = new StorageTypeStats(); + storageTypeStatsMap.put(storageType, storageTypeStats); + } + storageTypeStats.addNode(node); + } + + private void addStorage(final DatanodeStorageInfo info, + final DatanodeDescriptor node) { + StorageTypeStats storageTypeStats = + storageTypeStatsMap.get(info.getStorageType()); + if (storageTypeStats == null) { + storageTypeStats = new StorageTypeStats(); + storageTypeStatsMap.put(info.getStorageType(), storageTypeStats); + } + storageTypeStats.addStorage(info, node); + } + + private void subtractStorage(final DatanodeStorageInfo info, + final DatanodeDescriptor node) { + StorageTypeStats storageTypeStats = + storageTypeStatsMap.get(info.getStorageType()); + if (storageTypeStats != null) { + storageTypeStats.subtractStorage(info, node); + } + } + + private void subtractNode(StorageType storageType, + final DatanodeDescriptor node) { + StorageTypeStats storageTypeStats = + storageTypeStatsMap.get(storageType); + if (storageTypeStats != null) { + storageTypeStats.subtractNode(node); + } + } + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/StorageTypeStats.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/StorageTypeStats.java new file mode 100644 index 00000000000..45dcc8d672a --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/StorageTypeStats.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.blockmanagement; + +import java.beans.ConstructorProperties; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Statistics per StorageType. + * + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class StorageTypeStats { + private long capacityTotal = 0L; + private long capacityUsed = 0L; + private long capacityRemaining = 0L; + private long blockPoolUsed = 0L; + private int nodesInService = 0; + + @ConstructorProperties({"capacityTotal", + "capacityUsed", "capacityRemaining", "blockPoolUsed", "nodesInService"}) + public StorageTypeStats(long capacityTotal, long capacityUsed, + long capacityRemaining, long blockPoolUsed, int nodesInService) { + this.capacityTotal = capacityTotal; + this.capacityUsed = capacityUsed; + this.capacityRemaining = capacityRemaining; + this.blockPoolUsed = blockPoolUsed; + this.nodesInService = nodesInService; + } + + public long getCapacityTotal() { + return capacityTotal; + } + + public long getCapacityUsed() { + return capacityUsed; + } + + public long getCapacityRemaining() { + return capacityRemaining; + } + + public long getBlockPoolUsed() { + return blockPoolUsed; + } + + public int getNodesInService() { + return nodesInService; + } + + StorageTypeStats() {} + + StorageTypeStats(StorageTypeStats other) { + capacityTotal = other.capacityTotal; + capacityUsed = other.capacityUsed; + capacityRemaining = other.capacityRemaining; + blockPoolUsed = other.blockPoolUsed; + nodesInService = other.nodesInService; + } + + void addStorage(final DatanodeStorageInfo info, + final DatanodeDescriptor node) { + capacityUsed += info.getDfsUsed(); + blockPoolUsed += info.getBlockPoolUsed(); + if (!(node.isDecommissionInProgress() || node.isDecommissioned())) { + capacityTotal += info.getCapacity(); + capacityRemaining += info.getRemaining(); + } else { + capacityTotal += info.getDfsUsed(); + } + } + + void addNode(final DatanodeDescriptor node) { + if (!(node.isDecommissionInProgress() || node.isDecommissioned())) { + nodesInService++; + } + } + + void subtractStorage(final DatanodeStorageInfo info, + final DatanodeDescriptor node) { + capacityUsed -= info.getDfsUsed(); + blockPoolUsed -= info.getBlockPoolUsed(); + if (!(node.isDecommissionInProgress() || node.isDecommissioned())) { + capacityTotal -= info.getCapacity(); + capacityRemaining -= info.getRemaining(); + } else { + capacityTotal -= info.getDfsUsed(); + } + } + + void subtractNode(final DatanodeDescriptor node) { + if (!(node.isDecommissionInProgress() || node.isDecommissioned())) { + nodesInService--; + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockStatsMXBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockStatsMXBean.java new file mode 100644 index 00000000000..43d983d7cf5 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockStatsMXBean.java @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.blockmanagement; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URL; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mortbay.util.ajax.JSON; + +/** + * Class for testing {@link BlockStatsMXBean} implementation + */ +public class TestBlockStatsMXBean { + + private MiniDFSCluster cluster; + + @Before + public void setup() throws IOException { + HdfsConfiguration conf = new HdfsConfiguration(); + cluster = null; + StorageType[][] types = new StorageType[6][]; + for (int i=0; i<3; i++) { + types[i] = new StorageType[] {StorageType.RAM_DISK, StorageType.DISK}; + } + for (int i=3; i< 5; i++) { + types[i] = new StorageType[] {StorageType.RAM_DISK, StorageType.ARCHIVE}; + } + types[5] = new StorageType[] {StorageType.RAM_DISK, StorageType.ARCHIVE, + StorageType.ARCHIVE}; + + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(6). + storageTypes(types).storagesPerDatanode(3).build(); + cluster.waitActive(); + } + + @After + public void tearDown() { + if (cluster != null) { + cluster.shutdown(); + } + } + + @Test + public void testStorageTypeStats() throws Exception { + Map storageTypeStatsMap = + cluster.getNamesystem().getBlockManager().getStorageTypeStats(); + assertTrue(storageTypeStatsMap.containsKey(StorageType.RAM_DISK)); + assertTrue(storageTypeStatsMap.containsKey(StorageType.DISK)); + assertTrue(storageTypeStatsMap.containsKey(StorageType.ARCHIVE)); + + StorageTypeStats storageTypeStats = + storageTypeStatsMap.get(StorageType.RAM_DISK); + assertEquals(6, storageTypeStats.getNodesInService()); + + storageTypeStats = storageTypeStatsMap.get(StorageType.DISK); + assertEquals(3, storageTypeStats.getNodesInService()); + + storageTypeStats = storageTypeStatsMap.get(StorageType.ARCHIVE); + assertEquals(3, storageTypeStats.getNodesInService()); + } + + protected static String readOutput(URL url) throws IOException { + StringBuilder out = new StringBuilder(); + InputStream in = url.openConnection().getInputStream(); + byte[] buffer = new byte[64 * 1024]; + int len = in.read(buffer); + while (len > 0) { + out.append(new String(buffer, 0, len)); + len = in.read(buffer); + } + return out.toString(); + } + + @Test + @SuppressWarnings("unchecked") + public void testStorageTypeStatsJMX() throws Exception { + URL baseUrl = new URL (cluster.getHttpUri(0)); + String result = readOutput(new URL(baseUrl, "/jmx")); + System.out.println(result); + + Map stat = (Map) JSON.parse(result); + Object[] beans =(Object[]) stat.get("beans"); + Map blockStats = null; + for (Object bean : beans) { + Map map = (Map) bean; + if (map.get("name").equals("Hadoop:service=NameNode,name=BlockStats")) { + blockStats = map; + } + } + assertNotNull(blockStats); + Object[] storageTypeStatsList = + (Object[])blockStats.get("StorageTypeStats"); + assertNotNull(storageTypeStatsList); + assertEquals (3, storageTypeStatsList.length); + + Set typesPresent = new HashSet<> (); + for (Object obj : storageTypeStatsList) { + Map entry = (Map)obj; + String storageType = (String)entry.get("key"); + Map storageTypeStats = (Map)entry.get("value"); + typesPresent.add(storageType); + if (storageType.equals("ARCHIVE") || storageType.equals("DISK") ) { + assertEquals(3l, storageTypeStats.get("nodesInService")); + } else if (storageType.equals("RAM_DISK")) { + assertEquals(6l, storageTypeStats.get("nodesInService")); + } + else { + fail(); + } + } + + assertTrue(typesPresent.contains("ARCHIVE")); + assertTrue(typesPresent.contains("DISK")); + assertTrue(typesPresent.contains("RAM_DISK")); + } +}