diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
index e4e2443caf3..0666b3feb81 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
@@ -315,6 +315,7 @@ Each metrics record contains tags such as SessionId and Hostname as additional i
 | `TotalReadTime` | Total number of milliseconds spent on read operation |
 | `RemoteBytesRead` | Number of bytes read by remote clients |
 | `RemoteBytesWritten` | Number of bytes written by remote clients |
+| `BPServiceActorInfo` | The information about a block pool service actor |
 
 yarn context
 ============
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index 69989fbbc66..4bde758bc19 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -26,10 +26,13 @@
 import java.net.SocketTimeoutException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -101,6 +104,9 @@ static enum RunningState {
   private final DataNode dn;
   private final DNConf dnConf;
   private long prevBlockReportId;
+  private final SortedSet<Integer> blockReportSizes =
+      Collections.synchronizedSortedSet(new TreeSet<>());
+  private final int maxDataLength;
 
   private final IncrementalBlockReportManager ibrManager;
 
@@ -122,6 +128,8 @@ static enum RunningState {
     prevBlockReportId = ThreadLocalRandom.current().nextLong();
     scheduler = new Scheduler(dnConf.heartBeatInterval,
         dnConf.getLifelineIntervalMs(), dnConf.blockReportInterval);
+    // Cache the configured maximum RPC message size (ipc.maximum.data.length).
+    this.maxDataLength = dnConf.getMaxDataLength();
   }
 
   public DatanodeRegistration getBpRegistration() {
@@ -166,6 +174,8 @@ Map<String, String> getActorInfoMap() {
         String.valueOf(getScheduler().getLastHearbeatTime()));
     info.put("LastBlockReport",
         String.valueOf(getScheduler().getLastBlockReportTime()));
+    info.put("maxBlockReportSize", String.valueOf(getMaxBlockReportSize()));
+    info.put("maxDataLength", String.valueOf(maxDataLength));
     return info;
   }
 
@@ -305,6 +315,14 @@ void triggerHeartbeatForTests() {
     }
   }
 
+  private int getMaxBlockReportSize() {
+    int maxBlockReportSize = 0;
+    if (!blockReportSizes.isEmpty()) {
+      maxBlockReportSize = blockReportSizes.last();
+    }
+    return maxBlockReportSize;
+  }
+
   private long generateUniqueBlockReportId() {
     // Initialize the block report ID the first time through.
     // Note that 0 is used on the NN to indicate "uninitialized", so we should
@@ -353,12 +371,18 @@ List<DatanodeCommand> blockReport(long fullBrLeaseId) throws IOException {
     boolean success = false;
     long brSendStartTime = monotonicNow();
     long reportId = generateUniqueBlockReportId();
+    boolean useBlocksBuffer =
+        bpRegistration.getNamespaceInfo().isCapabilitySupported(
+            NamespaceInfo.Capability.STORAGE_BLOCK_REPORT_BUFFERS);
+    blockReportSizes.clear();
     try {
       if (totalBlockCount < dnConf.blockReportSplitThreshold) {
         // Below split threshold, send all reports in a single message.
         DatanodeCommand cmd = bpNamenode.blockReport(
             bpRegistration, bpos.getBlockPoolId(), reports,
             new BlockReportContext(1, 0, reportId, fullBrLeaseId, true));
+        blockReportSizes.add(
+            calculateBlockReportPBSize(useBlocksBuffer, reports));
         numRPCs = 1;
         numReportsSent = reports.length;
         if (cmd != null) {
@@ -372,6 +396,8 @@ List<DatanodeCommand> blockReport(long fullBrLeaseId) throws IOException {
             bpRegistration, bpos.getBlockPoolId(), singleReport,
             new BlockReportContext(reports.length, r, reportId,
                 fullBrLeaseId, true));
+        blockReportSizes.add(
+            calculateBlockReportPBSize(useBlocksBuffer, singleReport));
         numReportsSent++;
         numRPCs++;
         if (cmd != null) {
@@ -437,7 +463,22 @@ DatanodeCommand cacheReport() throws IOException {
     }
     return cmd;
   }
-
+
+  private int calculateBlockReportPBSize(
+      boolean useBlocksBuffer, StorageBlockReport[] reports) {
+    int reportSize = 0;
+
+    for (StorageBlockReport r : reports) {
+      if (useBlocksBuffer) {
+        reportSize += r.getBlocks().getBlocksBuffer().size();
+      } else {
+        // Each long in the list is a PB uint64, which costs up to 10 bytes.
+        reportSize += 10 * r.getBlocks().getBlockListAsLongs().length;
+      }
+    }
+    return reportSize;
+  }
+
   HeartbeatResponse sendHeartBeat(boolean requestBlockReportLease)
       throws IOException {
     scheduler.scheduleNextHeartbeat();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
index 942672edaff..11197e6d17f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
@@ -117,6 +117,7 @@ public class DNConf {
 
   private final int volFailuresTolerated;
   private final int volsConfigured;
+  private final int maxDataLength;
 
   public DNConf(Configuration conf) {
     this.conf = conf;
@@ -149,6 +150,8 @@ public DNConf(Configuration conf) {
     readaheadLength = conf.getLong(
         HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_KEY,
         HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+    maxDataLength = conf.getInt(DFSConfigKeys.IPC_MAXIMUM_DATA_LENGTH,
+        DFSConfigKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT);
     dropCacheBehindWrites = conf.getBoolean(
         DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_KEY,
         DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_DEFAULT);
@@ -389,4 +392,8 @@ public int getVolFailuresTolerated() {
   public int getVolsConfigured() {
     return volsConfigured;
   }
+
+  int getMaxDataLength() {
+    return maxDataLength;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/datanode/datanode.html b/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/datanode/datanode.html
index 22a2733dff4..b35a0a71eee 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/datanode/datanode.html
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/datanode/datanode.html
@@ -80,6 +80,7 @@
       <th>Actor State</th>
       <th>Last Heartbeat</th>
       <th>Last Block Report</th>
+      <th>Last Block Report Size (Max Size)</th>
     </tr>
   </thead>
   {#dn.BPServiceActorInfo}
@@ -89,6 +90,7 @@
     <td>{ActorState}</td>
     <td>{LastHeartbeat}s</td>
     <td>{#helper_relative_time value="{LastBlockReport}"/}</td>
+    <td>{maxBlockReportSize|fmt_bytes} ({maxDataLength|fmt_bytes})</td>
   </tr>
   {/dn.BPServiceActorInfo}
 </table>
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMXBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMXBean.java
index e7b2c7cf5d3..8b0d5cb02fe 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMXBean.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMXBean.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -26,22 +27,31 @@
 import javax.management.ObjectName;
 
 import com.google.common.base.Supplier;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
 import org.junit.Assert;
 import org.junit.Test;
 import org.mortbay.util.ajax.JSON;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Class for testing {@link DataNodeMXBean} implementation
  */
 public class TestDataNodeMXBean {
+
+  public static final Log LOG = LogFactory.getLog(TestDataNodeMXBean.class);
+
   @Test
   public void testDataNodeMXBean() throws Exception {
     Configuration conf = new Configuration();
@@ -99,6 +109,48 @@ private static String replaceDigits(final String s) {
     return s.replaceAll("[0-9]+", "_DIGITS_");
   }
 
+  @Test
+  public void testDataNodeMXBeanBlockSize() throws Exception {
+    Configuration conf = new Configuration();
+
+    try (MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).build()) {
+      DataNode dn = cluster.getDataNodes().get(0);
+      for (int i = 0; i < 100; i++) {
+        DFSTestUtil.writeFile(
+            cluster.getFileSystem(),
+            new Path("/foo" + String.valueOf(i) + ".txt"), "test content");
+      }
+      DataNodeTestUtils.triggerBlockReport(dn);
+      MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
+      ObjectName mxbeanName = new ObjectName(
+          "Hadoop:service=DataNode,name=DataNodeInfo");
+      String bpActorInfo = (String) mbs.getAttribute(mxbeanName,
+          "BPServiceActorInfo");
+      Assert.assertEquals(dn.getBPServiceActorInfo(), bpActorInfo);
+      LOG.info("bpActorInfo is " + bpActorInfo);
+      TypeReference<ArrayList<Map<String, String>>> typeRef
+          = new TypeReference<ArrayList<Map<String, String>>>() {};
+      ArrayList<Map<String, String>> bpActorInfoList =
+          new ObjectMapper().readValue(bpActorInfo, typeRef);
+      int maxDataLength =
+          Integer.valueOf(bpActorInfoList.get(0).get("maxDataLength"));
+      int confMaxDataLength = dn.getConf().getInt(
+          CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH,
+          CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT);
+      int maxBlockReportSize =
+          Integer.valueOf(bpActorInfoList.get(0).get("maxBlockReportSize"));
+      LOG.info("maxDataLength is " + maxDataLength);
+      LOG.info("maxBlockReportSize is " + maxBlockReportSize);
+      assertTrue("maxBlockReportSize should be greater than zero",
+          maxBlockReportSize > 0);
+      assertEquals("maxDataLength should exactly match " +
+          "the value of ipc.maximum.data.length",
+          confMaxDataLength,
+          maxDataLength);
+    }
+  }
+
   @Test
   public void testDataNodeMXBeanBlockCount() throws Exception {
     Configuration conf = new Configuration();
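
Note: outside of a unit test, the easiest way to see the two new per-actor fields is the DataNode's JMX JSON servlet, which serves the same DataNodeInfo bean this patch extends. The sketch below is illustrative only and not part of the patch; the host, the port (9864 is the Hadoop 3.x default for dfs.datanode.http.address; 2.x releases default to 50075), and dumping the raw response instead of parsing the JSON are all assumptions.

// Illustrative sketch (not part of this patch): dump the DataNodeInfo bean,
// whose BPServiceActorInfo attribute now carries maxBlockReportSize and
// maxDataLength for each block pool service actor.
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class DumpBPServiceActorInfo {
  public static void main(String[] args) throws Exception {
    // Assumed endpoint: DataNode web UI on localhost:9864.
    URL url = new URL("http://localhost:9864/jmx"
        + "?qry=Hadoop:service=DataNode,name=DataNodeInfo");
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    try (BufferedReader in = new BufferedReader(
        new InputStreamReader(conn.getInputStream()))) {
      // Print the JSON response; BPServiceActorInfo is embedded as a string.
      String line;
      while ((line = in.readLine()) != null) {
        System.out.println(line);
      }
    }
  }
}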