HDFS-11224. Lifeline message should be ignored for dead nodes (Contributed by Vinayakumar B)
(cherry picked from commit d1d4aba71b)
parent e51f32f74c
commit 199d8b5594
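In short: the NameNode now drops lifeline messages from DataNodes that are not (or no longer) registered, instead of letting a lifeline refresh stats for a node it has marked dead; such a node must first re-register through the normal heartbeat path. A minimal standalone sketch of that guard follows. Node and LifelineHandler are simplified stand-ins invented for illustration only; they are not the real DatanodeDescriptor or DatanodeManager classes.

// Minimal sketch of the lifeline guard introduced by this patch.
// NOTE: Node and LifelineHandler are hypothetical stand-ins for illustration,
// not the actual Hadoop HDFS classes.
public class LifelineGuardSketch {

  static class Node {
    volatile boolean registered;   // roughly DatanodeDescriptor#isRegistered()
    volatile long lastUpdate;      // stats a lifeline would normally refresh

    Node(boolean registered) {
      this.registered = registered;
    }
  }

  static class LifelineHandler {
    // Mirrors the new check: a lifeline from an unregistered (or dead and
    // therefore de-registered) node is ignored rather than applied.
    void handleLifeline(Node node, long now) {
      if (node == null || !node.registered) {
        // The lifeline path cannot send a re-register command, so simply exit.
        return;
      }
      node.lastUpdate = now;
    }
  }

  public static void main(String[] args) {
    LifelineHandler handler = new LifelineHandler();

    Node dead = new Node(false);
    handler.handleLifeline(dead, System.currentTimeMillis());
    System.out.println("dead node stays untouched, lastUpdate = " + dead.lastUpdate);

    Node live = new Node(true);
    handler.handleLifeline(live, System.currentTimeMillis());
    System.out.println("live node refreshed, lastUpdate = " + live.lastUpdate);
  }
}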
@@ -274,6 +274,11 @@ public class DatanodeDescriptor extends DatanodeInfo {
     return leavingServiceStatus;
   }
 
+  @VisibleForTesting
+  public boolean isHeartbeatedSinceRegistration() {
+    return heartbeatedSinceRegistration;
+  }
+
   @VisibleForTesting
   public DatanodeStorageInfo getStorageInfo(String storageID) {
     synchronized (storageMap) {

@@ -1578,10 +1578,10 @@ public class DatanodeManager {
       LOG.debug("Received handleLifeline from nodeReg = " + nodeReg);
     }
     DatanodeDescriptor nodeinfo = getDatanode(nodeReg);
-    if (nodeinfo == null) {
-      // This is null if the DataNode has not yet registered. We expect this
-      // will never happen, because the DataNode has logic to prevent sending
-      // lifeline messages until after initial registration is successful.
+    if (nodeinfo == null || !nodeinfo.isRegistered()) {
+      // This can happen if the lifeline message comes when the DataNode is
+      // either not registered at all or is marked dead at the NameNode and
+      // expects re-registration. Ignore lifeline messages without registration.
       // Lifeline message handling can't send commands back to the DataNode to
       // tell it to register, so simply exit.
       return;

@@ -504,7 +504,12 @@ class BPServiceActor implements Runnable {
         volumeFailureSummary,
         requestBlockReportLease);
   }
 
+  @VisibleForTesting
+  void sendLifelineForTests() throws IOException {
+    lifelineSender.sendLifeline();
+  }
+
   //This must be called only by BPOfferService
   void start() {
     if ((bpThread != null) && (bpThread.isAlive())) {

@@ -215,7 +215,9 @@ public class BlockManagerTestUtil {
    * @param bm the BlockManager to manipulate
    */
   public static void checkHeartbeat(BlockManager bm) {
-    bm.getDatanodeManager().getHeartbeatManager().heartbeatCheck();
+    HeartbeatManager hbm = bm.getDatanodeManager().getHeartbeatManager();
+    hbm.restartHeartbeatStopWatch();
+    hbm.heartbeatCheck();
   }
 
   /**

@@ -45,6 +45,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeLifelineProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;

@@ -90,6 +91,8 @@ public class TestDataNodeLifeline {
   private DataNodeMetrics metrics;
   private DatanodeProtocolClientSideTranslatorPB namenode;
   private FSNamesystem namesystem;
+  private DataNode dn;
+  private BPServiceActor bpsa;
 
   @Before
   public void setup() throws Exception {

@@ -106,7 +109,7 @@
     namesystem = cluster.getNameNode().getNamesystem();
 
     // Set up spies on RPC proxies so that we can inject failures.
-    DataNode dn = cluster.getDataNodes().get(0);
+    dn = cluster.getDataNodes().get(0);
     metrics = dn.getMetrics();
     assertNotNull(metrics);
     List<BPOfferService> allBpos = dn.getAllBpOs();

@@ -118,7 +121,7 @@
     assertNotNull(allBpsa);
     assertEquals(1, allBpsa.size());
 
-    final BPServiceActor bpsa = allBpsa.get(0);
+    bpsa = allBpsa.get(0);
     assertNotNull(bpsa);
 
     // Lifeline RPC proxy gets created on separate thread, so poll until found.

@@ -257,6 +260,32 @@
         getLongCounter("LifelinesNumOps", getMetrics(metrics.name())));
   }
 
+  @Test
+  public void testLifelineForDeadNode() throws Exception {
+    long initialCapacity = cluster.getNamesystem(0).getCapacityTotal();
+    assertTrue(initialCapacity > 0);
+    dn.setHeartbeatsDisabledForTests(true);
+    cluster.setDataNodesDead();
+    assertEquals("Capacity should be 0 after all DNs dead", 0, cluster
+        .getNamesystem(0).getCapacityTotal());
+    bpsa.sendLifelineForTests();
+    assertEquals("Lifeline should be ignored for dead node", 0, cluster
+        .getNamesystem(0).getCapacityTotal());
+    // Wait for re-registration and heartbeat
+    dn.setHeartbeatsDisabledForTests(false);
+    final DatanodeDescriptor dnDesc = cluster.getNamesystem(0).getBlockManager()
+        .getDatanodeManager().getDatanodes().iterator().next();
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+
+      @Override
+      public Boolean get() {
+        return dnDesc.isAlive() && dnDesc.isHeartbeatedSinceRegistration();
+      }
+    }, 100, 5000);
+    assertEquals("Capacity should include only live capacity", initialCapacity,
+        cluster.getNamesystem(0).getCapacityTotal());
+  }
+
   /**
    * Waits on a {@link CountDownLatch} before calling through to the method.
    */