HDFS-8025. Addendum fix for HDFS-3087 Decomissioning on NN restart can complete without blocks being replicated. Contributed by Ming Ma.
This commit is contained in:
parent
6d2eca081d
commit
069366e1be
|
@ -84,6 +84,9 @@ Release 2.8.0 - UNRELEASED
|
||||||
HDFS-8076. Code cleanup for DFSInputStream: use offset instead of
|
HDFS-8076. Code cleanup for DFSInputStream: use offset instead of
|
||||||
LocatedBlock when possible. (Zhe Zhang via wang)
|
LocatedBlock when possible. (Zhe Zhang via wang)
|
||||||
|
|
||||||
|
HDFS-8025. Addendum fix for HDFS-3087 Decomissioning on NN restart can
|
||||||
|
complete without blocks being replicated. (Ming Ma via wang)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
||||||
|
|
|
@ -3308,6 +3308,11 @@ public class BlockManager {
|
||||||
* liveness. Dead nodes cannot always be safely decommissioned.
|
* liveness. Dead nodes cannot always be safely decommissioned.
|
||||||
*/
|
*/
|
||||||
boolean isNodeHealthyForDecommission(DatanodeDescriptor node) {
|
boolean isNodeHealthyForDecommission(DatanodeDescriptor node) {
|
||||||
|
if (!node.checkBlockReportReceived()) {
|
||||||
|
LOG.info("Node {} hasn't sent its first block report.", node);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
if (node.isAlive) {
|
if (node.isAlive) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
|
@ -882,9 +882,12 @@ public class TestDecommission {
|
||||||
int numNamenodes = 1;
|
int numNamenodes = 1;
|
||||||
int numDatanodes = 1;
|
int numDatanodes = 1;
|
||||||
int replicas = 1;
|
int replicas = 1;
|
||||||
|
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
|
||||||
|
DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT);
|
||||||
|
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY, 5);
|
||||||
|
|
||||||
startCluster(numNamenodes, numDatanodes, conf);
|
startCluster(numNamenodes, numDatanodes, conf);
|
||||||
Path file1 = new Path("testDecommission.dat");
|
Path file1 = new Path("testDecommissionWithNamenodeRestart.dat");
|
||||||
FileSystem fileSys = cluster.getFileSystem();
|
FileSystem fileSys = cluster.getFileSystem();
|
||||||
writeFile(fileSys, file1, replicas);
|
writeFile(fileSys, file1, replicas);
|
||||||
|
|
||||||
|
@ -894,37 +897,26 @@ public class TestDecommission {
|
||||||
String excludedDatanodeName = info[0].getXferAddr();
|
String excludedDatanodeName = info[0].getXferAddr();
|
||||||
|
|
||||||
writeConfigFile(excludeFile, new ArrayList<String>(Arrays.asList(excludedDatanodeName)));
|
writeConfigFile(excludeFile, new ArrayList<String>(Arrays.asList(excludedDatanodeName)));
|
||||||
|
|
||||||
//Add a new datanode to cluster
|
//Add a new datanode to cluster
|
||||||
cluster.startDataNodes(conf, 1, true, null, null, null, null);
|
cluster.startDataNodes(conf, 1, true, null, null, null, null);
|
||||||
numDatanodes+=1;
|
numDatanodes+=1;
|
||||||
|
|
||||||
assertEquals("Number of datanodes should be 2 ", 2, cluster.getDataNodes().size());
|
assertEquals("Number of datanodes should be 2 ", 2, cluster.getDataNodes().size());
|
||||||
//Restart the namenode
|
//Restart the namenode
|
||||||
cluster.restartNameNode();
|
cluster.restartNameNode();
|
||||||
DatanodeInfo datanodeInfo = NameNodeAdapter.getDatanode(
|
DatanodeInfo datanodeInfo = NameNodeAdapter.getDatanode(
|
||||||
cluster.getNamesystem(), excludedDatanodeID);
|
cluster.getNamesystem(), excludedDatanodeID);
|
||||||
waitNodeState(datanodeInfo, AdminStates.DECOMMISSIONED);
|
waitNodeState(datanodeInfo, AdminStates.DECOMMISSIONED);
|
||||||
|
|
||||||
// Ensure decommissioned datanode is not automatically shutdown
|
// Ensure decommissioned datanode is not automatically shutdown
|
||||||
assertEquals("All datanodes must be alive", numDatanodes,
|
assertEquals("All datanodes must be alive", numDatanodes,
|
||||||
client.datanodeReport(DatanodeReportType.LIVE).length);
|
client.datanodeReport(DatanodeReportType.LIVE).length);
|
||||||
// wait for the block to be replicated
|
assertTrue("Checked if block was replicated after decommission.",
|
||||||
int tries = 0;
|
checkFile(fileSys, file1, replicas, datanodeInfo.getXferAddr(),
|
||||||
while (tries++ < 20) {
|
numDatanodes) == null);
|
||||||
try {
|
|
||||||
Thread.sleep(1000);
|
|
||||||
if (checkFile(fileSys, file1, replicas, datanodeInfo.getXferAddr(),
|
|
||||||
numDatanodes) == null) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
} catch (InterruptedException ie) {
|
|
||||||
}
|
|
||||||
}
|
|
||||||
assertTrue("Checked if block was replicated after decommission, tried "
|
|
||||||
+ tries + " times.", tries < 20);
|
|
||||||
cleanupFile(fileSys, file1);
|
|
||||||
|
|
||||||
|
cleanupFile(fileSys, file1);
|
||||||
// Restart the cluster and ensure recommissioned datanodes
|
// Restart the cluster and ensure recommissioned datanodes
|
||||||
// are allowed to register with the namenode
|
// are allowed to register with the namenode
|
||||||
cluster.shutdown();
|
cluster.shutdown();
|
||||||
|
|
Loading…
Reference in New Issue