diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java index f7da52ae0aa..320c680cc43 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java @@ -280,6 +280,11 @@ public class DatanodeDescriptor extends DatanodeInfo { return leavingServiceStatus; } + @VisibleForTesting + public boolean isHeartbeatedSinceRegistration() { + return heartbeatedSinceRegistration; + } + @VisibleForTesting public DatanodeStorageInfo getStorageInfo(String storageID) { synchronized (storageMap) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index 6477d5cd4ba..cc64a04e45d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -1661,10 +1661,10 @@ public class DatanodeManager { LOG.debug("Received handleLifeline from nodeReg = " + nodeReg); } DatanodeDescriptor nodeinfo = getDatanode(nodeReg); - if (nodeinfo == null) { - // This is null if the DataNode has not yet registered. We expect this - // will never happen, because the DataNode has logic to prevent sending - // lifeline messages until after initial registration is successful. + if (nodeinfo == null || !nodeinfo.isRegistered()) { + // This can happen if the lifeline message comes when DataNode is either + // not registered at all or its marked dead at NameNode and expectes + // re-registration. Ignore lifeline messages without registration. // Lifeline message handling can't send commands back to the DataNode to // tell it to register, so simply exit. return; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java index f3247fca27d..bb2b792555e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java @@ -504,7 +504,12 @@ class BPServiceActor implements Runnable { volumeFailureSummary, requestBlockReportLease); } - + + @VisibleForTesting + void sendLifelineForTests() throws IOException { + lifelineSender.sendLifeline(); + } + //This must be called only by BPOfferService void start() { if ((bpThread != null) && (bpThread.isAlive())) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java index 1c7442ab1bf..9dd14471d1c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java @@ -214,7 +214,9 @@ public class BlockManagerTestUtil { * @param bm the BlockManager to manipulate */ public static void checkHeartbeat(BlockManager bm) { - bm.getDatanodeManager().getHeartbeatManager().heartbeatCheck(); + HeartbeatManager hbm = bm.getDatanodeManager().getHeartbeatManager(); + hbm.restartHeartbeatStopWatch(); + hbm.heartbeatCheck(); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java index fd661156e54..df2fe5aa75c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java @@ -45,6 +45,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocolPB.DatanodeLifelineProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; @@ -90,6 +91,8 @@ public class TestDataNodeLifeline { private DataNodeMetrics metrics; private DatanodeProtocolClientSideTranslatorPB namenode; private FSNamesystem namesystem; + private DataNode dn; + private BPServiceActor bpsa; @Before public void setup() throws Exception { @@ -106,7 +109,7 @@ public class TestDataNodeLifeline { namesystem = cluster.getNameNode().getNamesystem(); // Set up spies on RPC proxies so that we can inject failures. - DataNode dn = cluster.getDataNodes().get(0); + dn = cluster.getDataNodes().get(0); metrics = dn.getMetrics(); assertNotNull(metrics); List allBpos = dn.getAllBpOs(); @@ -118,7 +121,7 @@ public class TestDataNodeLifeline { assertNotNull(allBpsa); assertEquals(1, allBpsa.size()); - final BPServiceActor bpsa = allBpsa.get(0); + bpsa = allBpsa.get(0); assertNotNull(bpsa); // Lifeline RPC proxy gets created on separate thread, so poll until found. @@ -257,6 +260,32 @@ public class TestDataNodeLifeline { getLongCounter("LifelinesNumOps", getMetrics(metrics.name()))); } + @Test + public void testLifelineForDeadNode() throws Exception { + long initialCapacity = cluster.getNamesystem(0).getCapacityTotal(); + assertTrue(initialCapacity > 0); + dn.setHeartbeatsDisabledForTests(true); + cluster.setDataNodesDead(); + assertEquals("Capacity should be 0 after all DNs dead", 0, cluster + .getNamesystem(0).getCapacityTotal()); + bpsa.sendLifelineForTests(); + assertEquals("Lifeline should be ignored for dead node", 0, cluster + .getNamesystem(0).getCapacityTotal()); + // Wait for re-registration and heartbeat + dn.setHeartbeatsDisabledForTests(false); + final DatanodeDescriptor dnDesc = cluster.getNamesystem(0).getBlockManager() + .getDatanodeManager().getDatanodes().iterator().next(); + GenericTestUtils.waitFor(new Supplier() { + + @Override + public Boolean get() { + return dnDesc.isAlive() && dnDesc.isHeartbeatedSinceRegistration(); + } + }, 100, 5000); + assertEquals("Capacity should include only live capacity", initialCapacity, + cluster.getNamesystem(0).getCapacityTotal()); + } + /** * Waits on a {@link CountDownLatch} before calling through to the method. */