HDFS-11224. Lifeline message should be ignored for dead nodes (Contributed by Vinayakumar B)

(cherry picked from commit d1d4aba71b)
(cherry picked from commit 199d8b5594)
This commit is contained in:
Vinayakumar B 2016-12-09 14:53:50 +05:30
parent 892ee3f191
commit b1c08737d8
5 changed files with 49 additions and 8 deletions

View File

@ -270,6 +270,11 @@ public class DatanodeDescriptor extends DatanodeInfo {
this.needKeyUpdate = needKeyUpdate;
}
@VisibleForTesting
public boolean isHeartbeatedSinceRegistration() {
return heartbeatedSinceRegistration;
}
@VisibleForTesting
public DatanodeStorageInfo getStorageInfo(String storageID) {
synchronized (storageMap) {

View File

@ -1508,10 +1508,10 @@ public class DatanodeManager {
LOG.debug("Received handleLifeline from nodeReg = " + nodeReg);
}
DatanodeDescriptor nodeinfo = getDatanode(nodeReg);
if (nodeinfo == null) {
// This is null if the DataNode has not yet registered. We expect this
// will never happen, because the DataNode has logic to prevent sending
// lifeline messages until after initial registration is successful.
if (nodeinfo == null || !nodeinfo.isRegistered()) {
// This can happen if the lifeline message comes when DataNode is either
// not registered at all or its marked dead at NameNode and expectes
// re-registration. Ignore lifeline messages without registration.
// Lifeline message handling can't send commands back to the DataNode to
// tell it to register, so simply exit.
return;

View File

@ -463,7 +463,12 @@ class BPServiceActor implements Runnable {
volumeFailureSummary,
requestBlockReportLease);
}
@VisibleForTesting
void sendLifelineForTests() throws IOException {
lifelineSender.sendLifeline();
}
//This must be called only by BPOfferService
void start() {
if ((bpThread != null) && (bpThread.isAlive())) {

View File

@ -213,7 +213,9 @@ public class BlockManagerTestUtil {
* @param bm the BlockManager to manipulate
*/
public static void checkHeartbeat(BlockManager bm) {
bm.getDatanodeManager().getHeartbeatManager().heartbeatCheck();
HeartbeatManager hbm = bm.getDatanodeManager().getHeartbeatManager();
hbm.restartHeartbeatStopWatch();
hbm.heartbeatCheck();
}
/**

View File

@ -45,6 +45,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocolPB.DatanodeLifelineProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@ -90,6 +91,8 @@ public class TestDataNodeLifeline {
private DataNodeMetrics metrics;
private DatanodeProtocolClientSideTranslatorPB namenode;
private FSNamesystem namesystem;
private DataNode dn;
private BPServiceActor bpsa;
@Before
public void setup() throws Exception {
@ -106,7 +109,7 @@ public class TestDataNodeLifeline {
namesystem = cluster.getNameNode().getNamesystem();
// Set up spies on RPC proxies so that we can inject failures.
DataNode dn = cluster.getDataNodes().get(0);
dn = cluster.getDataNodes().get(0);
metrics = dn.getMetrics();
assertNotNull(metrics);
List<BPOfferService> allBpos = dn.getAllBpOs();
@ -118,7 +121,7 @@ public class TestDataNodeLifeline {
assertNotNull(allBpsa);
assertEquals(1, allBpsa.size());
final BPServiceActor bpsa = allBpsa.get(0);
bpsa = allBpsa.get(0);
assertNotNull(bpsa);
// Lifeline RPC proxy gets created on separate thread, so poll until found.
@ -257,6 +260,32 @@ public class TestDataNodeLifeline {
getLongCounter("LifelinesNumOps", getMetrics(metrics.name())));
}
@Test
public void testLifelineForDeadNode() throws Exception {
long initialCapacity = cluster.getNamesystem(0).getCapacityTotal();
assertTrue(initialCapacity > 0);
dn.setHeartbeatsDisabledForTests(true);
cluster.setDataNodesDead();
assertEquals("Capacity should be 0 after all DNs dead", 0, cluster
.getNamesystem(0).getCapacityTotal());
bpsa.sendLifelineForTests();
assertEquals("Lifeline should be ignored for dead node", 0, cluster
.getNamesystem(0).getCapacityTotal());
// Wait for re-registration and heartbeat
dn.setHeartbeatsDisabledForTests(false);
final DatanodeDescriptor dnDesc = cluster.getNamesystem(0).getBlockManager()
.getDatanodeManager().getDatanodes().iterator().next();
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return dnDesc.isAlive() && dnDesc.isHeartbeatedSinceRegistration();
}
}, 100, 5000);
assertEquals("Capacity should include only live capacity", initialCapacity,
cluster.getNamesystem(0).getCapacityTotal());
}
/**
* Waits on a {@link CountDownLatch} before calling through to the method.
*/