HDFS-5662. Can't decommission a DataNode due to file's replication factor larger than the rest of the cluster size. Contributed by Brandon Li

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1552131 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Brandon Li 2013-12-18 22:26:33 +00:00
parent c9d74139bc
commit fc966461e0
3 changed files with 94 additions and 5 deletions

View File

@ -839,6 +839,9 @@ Release 2.3.0 - UNRELEASED
HDFS-5592. statechangeLog of completeFile should be logged only in case of success. HDFS-5592. statechangeLog of completeFile should be logged only in case of success.
(Vinayakumar via umamahesh) (Vinayakumar via umamahesh)
HDFS-5662. Can't decommission a DataNode due to file's replication factor
larger than the rest of the cluster size. (brandonli)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -2893,6 +2893,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
*/ */
boolean isReplicationInProgress(DatanodeDescriptor srcNode) { boolean isReplicationInProgress(DatanodeDescriptor srcNode) {
boolean status = false; boolean status = false;
boolean firstReplicationLog = true;
int underReplicatedBlocks = 0; int underReplicatedBlocks = 0;
int decommissionOnlyReplicas = 0; int decommissionOnlyReplicas = 0;
int underReplicatedInOpenFiles = 0; int underReplicatedInOpenFiles = 0;
@ -2907,10 +2908,17 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
int curExpectedReplicas = getReplication(block); int curExpectedReplicas = getReplication(block);
if (isNeededReplication(block, curExpectedReplicas, curReplicas)) { if (isNeededReplication(block, curExpectedReplicas, curReplicas)) {
if (curExpectedReplicas > curReplicas) { if (curExpectedReplicas > curReplicas) {
//Log info about one block for this node which needs replication // Log info about one block for this node which needs replication
if (!status) { if (!status) {
status = true; status = true;
logBlockReplicationInfo(block, srcNode, num); if (firstReplicationLog) {
logBlockReplicationInfo(block, srcNode, num);
}
// Allowing decommission as long as default replication is met
if (curReplicas >= defaultReplication) {
status = false;
firstReplicationLog = false;
}
} }
underReplicatedBlocks++; underReplicatedBlocks++;
if ((curReplicas == 0) && (num.decommissionedReplicas() > 0)) { if ((curReplicas == 0) && (num.decommissionedReplicas() > 0)) {

View File

@ -336,6 +336,58 @@ public class TestDecommission {
testDecommission(1, 6); testDecommission(1, 6);
} }
/**
* Tests decommission with replicas on the target datanode cannot be migrated
* to other datanodes and satisfy the replication factor. Make sure the
* datanode won't get stuck in decommissioning state.
*/
@Test(timeout = 360000)
public void testDecommission2() throws IOException {
LOG.info("Starting test testDecommission");
int numNamenodes = 1;
int numDatanodes = 4;
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 3);
startCluster(numNamenodes, numDatanodes, conf);
ArrayList<ArrayList<DatanodeInfo>> namenodeDecomList = new ArrayList<ArrayList<DatanodeInfo>>(
numNamenodes);
namenodeDecomList.add(0, new ArrayList<DatanodeInfo>(numDatanodes));
Path file1 = new Path("testDecommission2.dat");
int replicas = 4;
// Start decommissioning one namenode at a time
ArrayList<DatanodeInfo> decommissionedNodes = namenodeDecomList.get(0);
FileSystem fileSys = cluster.getFileSystem(0);
FSNamesystem ns = cluster.getNamesystem(0);
writeFile(fileSys, file1, replicas);
int deadDecomissioned = ns.getNumDecomDeadDataNodes();
int liveDecomissioned = ns.getNumDecomLiveDataNodes();
// Decommission one node. Verify that node is decommissioned.
DatanodeInfo decomNode = decommissionNode(0, decommissionedNodes,
AdminStates.DECOMMISSIONED);
decommissionedNodes.add(decomNode);
assertEquals(deadDecomissioned, ns.getNumDecomDeadDataNodes());
assertEquals(liveDecomissioned + 1, ns.getNumDecomLiveDataNodes());
// Ensure decommissioned datanode is not automatically shutdown
DFSClient client = getDfsClient(cluster.getNameNode(0), conf);
assertEquals("All datanodes must be alive", numDatanodes,
client.datanodeReport(DatanodeReportType.LIVE).length);
assertNull(checkFile(fileSys, file1, replicas, decomNode.getXferAddr(),
numDatanodes));
cleanupFile(fileSys, file1);
// Restart the cluster and ensure recommissioned datanodes
// are allowed to register with the namenode
cluster.shutdown();
startCluster(1, 4, conf);
cluster.shutdown();
}
/** /**
* Tests recommission for non federated cluster * Tests recommission for non federated cluster
*/ */
@ -388,7 +440,20 @@ public class TestDecommission {
DFSClient client = getDfsClient(cluster.getNameNode(i), conf); DFSClient client = getDfsClient(cluster.getNameNode(i), conf);
assertEquals("All datanodes must be alive", numDatanodes, assertEquals("All datanodes must be alive", numDatanodes,
client.datanodeReport(DatanodeReportType.LIVE).length); client.datanodeReport(DatanodeReportType.LIVE).length);
assertNull(checkFile(fileSys, file1, replicas, decomNode.getXferAddr(), numDatanodes)); // wait for the block to be replicated
int tries = 0;
while (tries++ < 20) {
try {
Thread.sleep(1000);
if (checkFile(fileSys, file1, replicas, decomNode.getXferAddr(),
numDatanodes) == null) {
break;
}
} catch (InterruptedException ie) {
}
}
assertTrue("Checked if block was replicated after decommission, tried "
+ tries + " times.", tries < 20);
cleanupFile(fileSys, file1); cleanupFile(fileSys, file1);
} }
} }
@ -429,12 +494,25 @@ public class TestDecommission {
DFSClient client = getDfsClient(cluster.getNameNode(i), conf); DFSClient client = getDfsClient(cluster.getNameNode(i), conf);
assertEquals("All datanodes must be alive", numDatanodes, assertEquals("All datanodes must be alive", numDatanodes,
client.datanodeReport(DatanodeReportType.LIVE).length); client.datanodeReport(DatanodeReportType.LIVE).length);
assertNull(checkFile(fileSys, file1, replicas, decomNode.getXferAddr(), numDatanodes)); int tries =0;
// wait for the block to be replicated
while (tries++ < 20) {
try {
Thread.sleep(1000);
if (checkFile(fileSys, file1, replicas, decomNode.getXferAddr(),
numDatanodes) == null) {
break;
}
} catch (InterruptedException ie) {
}
}
assertTrue("Checked if block was replicated after decommission, tried "
+ tries + " times.", tries < 20);
// stop decommission and check if the new replicas are removed // stop decommission and check if the new replicas are removed
recomissionNode(decomNode); recomissionNode(decomNode);
// wait for the block to be deleted // wait for the block to be deleted
int tries = 0; tries = 0;
while (tries++ < 20) { while (tries++ < 20) {
try { try {
Thread.sleep(1000); Thread.sleep(1000);