HDFS-11224. Lifeline message should be ignored for dead nodes (Contributed by Vinayakumar B)

This commit is contained in:
Vinayakumar B 2016-12-09 14:53:50 +05:30
parent 7d8e440eee
commit d1d4aba71b
5 changed files with 49 additions and 8 deletions

View File

@ -280,6 +280,11 @@ public class DatanodeDescriptor extends DatanodeInfo {
return leavingServiceStatus; return leavingServiceStatus;
} }
@VisibleForTesting
public boolean isHeartbeatedSinceRegistration() {
return heartbeatedSinceRegistration;
}
@VisibleForTesting @VisibleForTesting
public DatanodeStorageInfo getStorageInfo(String storageID) { public DatanodeStorageInfo getStorageInfo(String storageID) {
synchronized (storageMap) { synchronized (storageMap) {

View File

@ -1661,10 +1661,10 @@ public class DatanodeManager {
LOG.debug("Received handleLifeline from nodeReg = " + nodeReg); LOG.debug("Received handleLifeline from nodeReg = " + nodeReg);
} }
DatanodeDescriptor nodeinfo = getDatanode(nodeReg); DatanodeDescriptor nodeinfo = getDatanode(nodeReg);
if (nodeinfo == null) { if (nodeinfo == null || !nodeinfo.isRegistered()) {
// This is null if the DataNode has not yet registered. We expect this // This can happen if the lifeline message comes when DataNode is either
// will never happen, because the DataNode has logic to prevent sending // not registered at all or its marked dead at NameNode and expectes
// lifeline messages until after initial registration is successful. // re-registration. Ignore lifeline messages without registration.
// Lifeline message handling can't send commands back to the DataNode to // Lifeline message handling can't send commands back to the DataNode to
// tell it to register, so simply exit. // tell it to register, so simply exit.
return; return;

View File

@ -505,6 +505,11 @@ class BPServiceActor implements Runnable {
requestBlockReportLease); requestBlockReportLease);
} }
@VisibleForTesting
void sendLifelineForTests() throws IOException {
lifelineSender.sendLifeline();
}
//This must be called only by BPOfferService //This must be called only by BPOfferService
void start() { void start() {
if ((bpThread != null) && (bpThread.isAlive())) { if ((bpThread != null) && (bpThread.isAlive())) {

View File

@ -214,7 +214,9 @@ public class BlockManagerTestUtil {
* @param bm the BlockManager to manipulate * @param bm the BlockManager to manipulate
*/ */
public static void checkHeartbeat(BlockManager bm) { public static void checkHeartbeat(BlockManager bm) {
bm.getDatanodeManager().getHeartbeatManager().heartbeatCheck(); HeartbeatManager hbm = bm.getDatanodeManager().getHeartbeatManager();
hbm.restartHeartbeatStopWatch();
hbm.heartbeatCheck();
} }
/** /**

View File

@ -45,6 +45,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocolPB.DatanodeLifelineProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.protocolPB.DatanodeLifelineProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics; import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@ -90,6 +91,8 @@ public class TestDataNodeLifeline {
private DataNodeMetrics metrics; private DataNodeMetrics metrics;
private DatanodeProtocolClientSideTranslatorPB namenode; private DatanodeProtocolClientSideTranslatorPB namenode;
private FSNamesystem namesystem; private FSNamesystem namesystem;
private DataNode dn;
private BPServiceActor bpsa;
@Before @Before
public void setup() throws Exception { public void setup() throws Exception {
@ -106,7 +109,7 @@ public class TestDataNodeLifeline {
namesystem = cluster.getNameNode().getNamesystem(); namesystem = cluster.getNameNode().getNamesystem();
// Set up spies on RPC proxies so that we can inject failures. // Set up spies on RPC proxies so that we can inject failures.
DataNode dn = cluster.getDataNodes().get(0); dn = cluster.getDataNodes().get(0);
metrics = dn.getMetrics(); metrics = dn.getMetrics();
assertNotNull(metrics); assertNotNull(metrics);
List<BPOfferService> allBpos = dn.getAllBpOs(); List<BPOfferService> allBpos = dn.getAllBpOs();
@ -118,7 +121,7 @@ public class TestDataNodeLifeline {
assertNotNull(allBpsa); assertNotNull(allBpsa);
assertEquals(1, allBpsa.size()); assertEquals(1, allBpsa.size());
final BPServiceActor bpsa = allBpsa.get(0); bpsa = allBpsa.get(0);
assertNotNull(bpsa); assertNotNull(bpsa);
// Lifeline RPC proxy gets created on separate thread, so poll until found. // Lifeline RPC proxy gets created on separate thread, so poll until found.
@ -257,6 +260,32 @@ public class TestDataNodeLifeline {
getLongCounter("LifelinesNumOps", getMetrics(metrics.name()))); getLongCounter("LifelinesNumOps", getMetrics(metrics.name())));
} }
@Test
public void testLifelineForDeadNode() throws Exception {
long initialCapacity = cluster.getNamesystem(0).getCapacityTotal();
assertTrue(initialCapacity > 0);
dn.setHeartbeatsDisabledForTests(true);
cluster.setDataNodesDead();
assertEquals("Capacity should be 0 after all DNs dead", 0, cluster
.getNamesystem(0).getCapacityTotal());
bpsa.sendLifelineForTests();
assertEquals("Lifeline should be ignored for dead node", 0, cluster
.getNamesystem(0).getCapacityTotal());
// Wait for re-registration and heartbeat
dn.setHeartbeatsDisabledForTests(false);
final DatanodeDescriptor dnDesc = cluster.getNamesystem(0).getBlockManager()
.getDatanodeManager().getDatanodes().iterator().next();
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return dnDesc.isAlive() && dnDesc.isHeartbeatedSinceRegistration();
}
}, 100, 5000);
assertEquals("Capacity should include only live capacity", initialCapacity,
cluster.getNamesystem(0).getCapacityTotal());
}
/** /**
* Waits on a {@link CountDownLatch} before calling through to the method. * Waits on a {@link CountDownLatch} before calling through to the method.
*/ */