diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java index f7b09d5fc18..35ab6193142 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java @@ -207,6 +207,8 @@ class BPServiceActor implements Runnable { info.put("ActorState", getRunningState()); info.put("LastHeartbeat", String.valueOf(getScheduler().getLastHearbeatTime())); + info.put("LastHeartbeatResponseTime", + String.valueOf(getScheduler().getLastHeartbeatResponseTime())); info.put("LastBlockReport", String.valueOf(getScheduler().getLastBlockReportTime())); info.put("maxBlockReportSize", String.valueOf(getMaxBlockReportSize())); @@ -580,6 +582,8 @@ class BPServiceActor implements Runnable { slowPeers, slowDisks); + scheduler.updateLastHeartbeatResponseTime(monotonicNow()); + if (outliersReportDue) { // If the report was due and successfully sent, schedule the next one. 
scheduler.scheduleNextOutlierReport(); @@ -1202,6 +1206,9 @@ class BPServiceActor implements Runnable { @VisibleForTesting volatile long lastHeartbeatTime = monotonicNow(); + @VisibleForTesting + private volatile long lastHeartbeatResponseTime = -1; + @VisibleForTesting boolean resetBlockReportTime = true; @@ -1250,6 +1257,10 @@ class BPServiceActor implements Runnable { lastHeartbeatTime = heartbeatTime; } + void updateLastHeartbeatResponseTime(long heartbeatTime) { + this.lastHeartbeatResponseTime = heartbeatTime; + } + void updateLastBlockReportTime(long blockReportTime) { lastBlockReportTime = blockReportTime; } @@ -1262,6 +1273,10 @@ class BPServiceActor implements Runnable { return (monotonicNow() - lastHeartbeatTime)/1000; } + private long getLastHeartbeatResponseTime() { + return (monotonicNow() - lastHeartbeatResponseTime) / 1000; + } + long getLastBlockReportTime() { return (monotonicNow() - lastBlockReportTime)/1000; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index c42abda72bc..d8149b6f3e4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -3621,8 +3621,12 @@ public class DataNode extends ReconfigurableBase */ @Override // DataNodeMXBean public String getBPServiceActorInfo() { - final ArrayList<Map<String, String>> infoArray = - new ArrayList<Map<String, String>>(); + return JSON.toString(getBPServiceActorInfoMap()); + } + + @VisibleForTesting + public List<Map<String, String>> getBPServiceActorInfoMap() { + final List<Map<String, String>> infoArray = new ArrayList<>(); for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) { if (bpos != null) { for (BPServiceActor actor : bpos.getBPServiceActors()) { } } } - 
return JSON.toString(infoArray); + return infoArray; } /** @@ -3825,6 +3829,29 @@ public class DataNode extends ReconfigurableBase * @return true - if the data node is fully started */ public boolean isDatanodeFullyStarted() { + return isDatanodeFullyStarted(false); + } + + /** + * A datanode is considered to be fully started if all the BP threads are + * alive and all the block pools are initialized. If checkConnectionToActiveNamenode is true, + * the datanode is considered to be fully started if it is also heartbeating to + * active namenode in addition to the above-mentioned conditions. + * + * @param checkConnectionToActiveNamenode if true, performs additional check of whether datanode + * is heartbeating to active namenode. + * @return true if the datanode is fully started and also conditionally connected to active + * namenode, false otherwise. + */ + public boolean isDatanodeFullyStarted(boolean checkConnectionToActiveNamenode) { + if (checkConnectionToActiveNamenode) { + for (BPOfferService bp : blockPoolManager.getAllNamenodeThreads()) { + if (!bp.isInitialized() || !bp.isAlive() || bp.getActiveNN() == null) { + return false; + } + } + return true; + } for (BPOfferService bp : blockPoolManager.getAllNamenodeThreads()) { if (!bp.isInitialized() || !bp.isAlive()) { return false; @@ -3832,7 +3859,7 @@ public class DataNode extends ReconfigurableBase } return true; } - + @VisibleForTesting public DatanodeID getDatanodeId() { return id; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/datanode/datanode.html b/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/datanode/datanode.html index b491d5a04e3..28cba0153c1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/datanode/datanode.html +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/datanode/datanode.html @@ -84,7 +84,8 @@ Namenode HA State Block Pool ID Actor State - Last Heartbeat + Last Heartbeat Sent + Last Heartbeat Response Last Block Report Last Block Report Size (Max Size) @@ 
-96,6 +97,7 @@ {BlockPoolID} {ActorState} {LastHeartbeat}s + {LastHeartbeatResponseTime}s {#helper_relative_time value="{LastBlockReport}"/} {maxBlockReportSize|fmt_bytes} ({maxDataLength|fmt_bytes}) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java index dd8bb204382..f5638871d40 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java @@ -2529,6 +2529,24 @@ public class MiniDFSCluster implements AutoCloseable { return restartDataNode(dnprop, false); } + /** + * Wait for the datanode to be fully functional i.e. all the BP service threads are alive, + * all block pools initiated and also connected to active namenode. + * + * @param dn Datanode instance. + * @param timeout Timeout in millis until when we should wait for datanode to be fully + * operational. + * @throws InterruptedException If the thread wait is interrupted. + * @throws TimeoutException If times out while awaiting the fully operational capability of + * datanode. 
+ */ + public void waitDatanodeConnectedToActive(DataNode dn, int timeout) + throws InterruptedException, TimeoutException { + GenericTestUtils.waitFor(() -> dn.isDatanodeFullyStarted(true), + 100, timeout, "Datanode is not connected to active namenode even after " + + timeout + " ms of waiting"); + } + public void waitDatanodeFullyStarted(DataNode dn, int timeout) throws TimeoutException, InterruptedException { GenericTestUtils.waitFor(dn::isDatanodeFullyStarted, 100, timeout, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMXBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMXBean.java index ea43cccbb18..ad4c892b22f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMXBean.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMXBean.java @@ -38,7 +38,9 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.MiniDFSNNTopology; import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferTestCase; +import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.test.GenericTestUtils; import org.junit.Assert; @@ -294,4 +296,81 @@ public class TestDataNodeMXBean extends SaslDataTransferTestCase { if (cluster != null) {cluster.shutdown();} } } + + @Test + public void testDataNodeMXBeanLastHeartbeats() throws Exception { + Configuration conf = new Configuration(); + try (MiniDFSCluster cluster = new MiniDFSCluster + .Builder(conf) + .nnTopology(MiniDFSNNTopology.simpleHATopology(2)) + .build()) { + cluster.waitActive(); + cluster.transitionToActive(0); + cluster.transitionToStandby(1); + + DataNode datanode = 
cluster.getDataNodes().get(0); + + MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); + ObjectName mxbeanName = new ObjectName( + "Hadoop:service=DataNode,name=DataNodeInfo"); + + // Verify and wait until one of the BP service actor identifies active namenode as active + // and another as standby. + cluster.waitDatanodeConnectedToActive(datanode, 5000); + + // Verify that last heartbeat sent to both namenodes in last 5 sec. + assertLastHeartbeatSentTime(datanode, "LastHeartbeat"); + // Verify that last heartbeat response from both namenodes have been received within + // last 5 sec. + assertLastHeartbeatSentTime(datanode, "LastHeartbeatResponseTime"); + + + NameNode sbNameNode = cluster.getNameNode(1); + + // Stopping standby namenode + sbNameNode.stop(); + + // Verify that last heartbeat response time from one of the namenodes would stay much higher + // after stopping one namenode. + GenericTestUtils.waitFor(() -> { + List<Map<String, String>> bpServiceActorInfo = datanode.getBPServiceActorInfoMap(); + Map<String, String> bpServiceActorInfo1 = bpServiceActorInfo.get(0); + Map<String, String> bpServiceActorInfo2 = bpServiceActorInfo.get(1); + + long lastHeartbeatResponseTime1 = + Long.parseLong(bpServiceActorInfo1.get("LastHeartbeatResponseTime")); + long lastHeartbeatResponseTime2 = + Long.parseLong(bpServiceActorInfo2.get("LastHeartbeatResponseTime")); + + LOG.info("Last heartbeat response from namenode 1: {}", lastHeartbeatResponseTime1); + LOG.info("Last heartbeat response from namenode 2: {}", lastHeartbeatResponseTime2); + + return (lastHeartbeatResponseTime1 < 5L && lastHeartbeatResponseTime2 > 5L) || ( + lastHeartbeatResponseTime1 > 5L && lastHeartbeatResponseTime2 < 5L); + + }, 200, 15000, + "Last heartbeat response should be higher than 5s for at least one namenode"); + + // Verify that last heartbeat sent to both namenodes in last 5 sec even though + // the last heartbeat received from one of the namenodes is greater than 5 sec ago. 
+ assertLastHeartbeatSentTime(datanode, "LastHeartbeat"); + } + } + + private static void assertLastHeartbeatSentTime(DataNode datanode, String lastHeartbeat) { + List<Map<String, String>> bpServiceActorInfo = datanode.getBPServiceActorInfoMap(); + Map<String, String> bpServiceActorInfo1 = bpServiceActorInfo.get(0); + Map<String, String> bpServiceActorInfo2 = bpServiceActorInfo.get(1); + + long lastHeartbeatSent1 = + Long.parseLong(bpServiceActorInfo1.get(lastHeartbeat)); + long lastHeartbeatSent2 = + Long.parseLong(bpServiceActorInfo2.get(lastHeartbeat)); + + Assert.assertTrue(lastHeartbeat + " for first bp service actor is higher than 5s", + lastHeartbeatSent1 < 5L); + Assert.assertTrue(lastHeartbeat + " for second bp service actor is higher than 5s", + lastHeartbeatSent2 < 5L); + } + }