HDFS-3087. Decommissioning on NN restart can complete without blocks being replicated. Contributed by Rushabh S Shah.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1580886 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Kihwal Lee 2014-03-24 15:39:00 +00:00
parent 0710b5890b
commit 7a18c4a199
4 changed files with 79 additions and 1 deletions

View File

@ -265,6 +265,9 @@ Release 2.5.0 - UNRELEASED
HDFS-6112. NFS Gateway docs are incorrect for allowed hosts configuration. HDFS-6112. NFS Gateway docs are incorrect for allowed hosts configuration.
(atm) (atm)
HDFS-3087. Decommissioning on NN restart can complete without blocks being
replicated. (Rushabh S Shah via kihwal)
Release 2.4.0 - UNRELEASED Release 2.4.0 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -695,5 +695,20 @@ public class DatanodeDescriptor extends DatanodeInfo {
public void setLastCachingDirectiveSentTimeMs(long time) { public void setLastCachingDirectiveSentTimeMs(long time) {
this.lastCachingDirectiveSentTimeMs = time; this.lastCachingDirectiveSentTimeMs = time;
} }
/**
 * Checks whether at least the first block report has been received for
 * every storage of this datanode.
 *
 * @return {@code false} if this datanode has no storages, or if any of its
 *         storages has a block report count of zero; {@code true} otherwise
 */
public boolean checkBlockReportReceived() {
  // Take a single snapshot so that the emptiness check and the iteration
  // below are evaluated against the same set of storages.
  final DatanodeStorageInfo[] storageInfos = this.getStorageInfos();
  if (storageInfos.length == 0) {
    return false;
  }
  for (DatanodeStorageInfo storageInfo : storageInfos) {
    if (storageInfo.getBlockReportCount() == 0) {
      return false;
    }
  }
  return true;
}
} }

View File

@ -711,7 +711,7 @@ public class DatanodeManager {
boolean checkDecommissionState(DatanodeDescriptor node) { boolean checkDecommissionState(DatanodeDescriptor node) {
// Check to see if all blocks in this decommissioned // Check to see if all blocks in this decommissioned
// node has reached their target replication factor. // node has reached their target replication factor.
if (node.isDecommissionInProgress()) { if (node.isDecommissionInProgress() && node.checkBlockReportReceived()) {
if (!blockManager.isReplicationInProgress(node)) { if (!blockManager.isReplicationInProgress(node)) {
node.setDecommissioned(); node.setDecommissioned();
LOG.info("Decommission complete for " + node); LOG.info("Decommission complete for " + node);

View File

@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
@ -829,4 +830,63 @@ public class TestDecommission {
fdos.close(); fdos.close();
} }
/**
 * Tests restart of the namenode while a datanode host is listed in the
 * exclude file.
 *
 * The test writes a file with one replica on a single-datanode cluster,
 * adds that datanode to the exclude file, starts a second datanode and
 * restarts the namenode. It then waits for the excluded datanode to reach
 * the DECOMMISSIONED state, verifies both datanodes are still reported as
 * live, and polls until {@code checkFile} reports no problem (returns
 * {@code null}) for the file.
 *
 * @throws IOException if a cluster or file system operation fails
 * @throws InterruptedException if the test thread is interrupted while
 *         waiting for replication
 **/
@Test(timeout=360000)
public void testDecommissionWithNamenodeRestart()throws IOException, InterruptedException {
  LOG.info("Starting test testDecommissionWithNamenodeRestart");
  int numNamenodes = 1;
  int numDatanodes = 1;
  int replicas = 1;

  startCluster(numNamenodes, numDatanodes, conf);
  Path file1 = new Path("testDecommission.dat");
  FileSystem fileSys = cluster.getFileSystem();
  writeFile(fileSys, file1, replicas);

  DFSClient client = getDfsClient(cluster.getNameNode(), conf);
  DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE);
  DatanodeID excludedDatanodeID = info[0];
  String excludedDatanodeName = info[0].getXferAddr();

  writeConfigFile(excludeFile, new ArrayList<String>(Arrays.asList(excludedDatanodeName)));

  //Add a new datanode to cluster
  cluster.startDataNodes(conf, 1, true, null, null, null, null);
  numDatanodes+=1;

  assertEquals("Number of datanodes should be 2 ", 2, cluster.getDataNodes().size());
  //Restart the namenode
  cluster.restartNameNode();
  DatanodeInfo datanodeInfo = NameNodeAdapter.getDatanode(
      cluster.getNamesystem(), excludedDatanodeID);
  waitNodeState(datanodeInfo, AdminStates.DECOMMISSIONED);

  // Ensure decommissioned datanode is not automatically shutdown
  assertEquals("All datanodes must be alive", numDatanodes,
      client.datanodeReport(DatanodeReportType.LIVE).length);

  // Wait for the block to be replicated. Success is tracked explicitly so
  // that a check that passes on the very last attempt is not misreported
  // as a failure. An interrupt is propagated instead of being swallowed,
  // since this method already declares InterruptedException.
  final int maxTries = 20;
  boolean replicated = false;
  int tries = 0;
  while (!replicated && tries < maxTries) {
    tries++;
    Thread.sleep(1000);
    replicated = checkFile(fileSys, file1, replicas,
        datanodeInfo.getXferAddr(), numDatanodes) == null;
  }
  assertTrue("Checked if block was replicated after decommission, tried "
      + tries + " times.", replicated);
  cleanupFile(fileSys, file1);

  // Restart the cluster and ensure recommissioned datanodes
  // are allowed to register with the namenode
  cluster.shutdown();
  startCluster(numNamenodes, numDatanodes, conf);
  cluster.shutdown();
}
} }