HDFS-2114. re-commission of a decommissioned node does not delete excess replicas. Contributed by John George.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1148981 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
ee39160ab4
commit
08928d067b
|
@ -861,6 +861,9 @@ Trunk (unreleased changes)
|
||||||
HDFS-2152. TestWriteConfigurationToDFS causing the random failures. (Uma
|
HDFS-2152. TestWriteConfigurationToDFS causing the random failures. (Uma
|
||||||
Maheswara Rao G via atm)
|
Maheswara Rao G via atm)
|
||||||
|
|
||||||
|
HDFS-2114. re-commission of a decommissioned node does not delete
|
||||||
|
excess replicas. (John George via mattf)
|
||||||
|
|
||||||
Release 0.22.0 - Unreleased
|
Release 0.22.0 - Unreleased
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -1877,6 +1877,25 @@ public class BlockManager {
|
||||||
+ srcNode.isDecommissionInProgress());
|
+ srcNode.isDecommissionInProgress());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* On stopping decommission, check if the node has excess replicas.
|
||||||
|
* If there are any excess replicas, call processOverReplicatedBlock()
|
||||||
|
*/
|
||||||
|
public void processOverReplicatedBlocksOnReCommission(DatanodeDescriptor srcNode) {
|
||||||
|
final Iterator<? extends Block> it = srcNode.getBlockIterator();
|
||||||
|
while(it.hasNext()) {
|
||||||
|
final Block block = it.next();
|
||||||
|
INodeFile fileINode = blocksMap.getINode(block);
|
||||||
|
short expectedReplication = fileINode.getReplication();
|
||||||
|
NumberReplicas num = countNodes(block);
|
||||||
|
int numCurrentReplica = num.liveReplicas();
|
||||||
|
if (numCurrentReplica > expectedReplication) {
|
||||||
|
// over-replicated block
|
||||||
|
processOverReplicatedBlock(block, expectedReplication, null, null);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return true if there are any blocks on this node that have not
|
* Return true if there are any blocks on this node that have not
|
||||||
* yet reached their replication factor. Otherwise returns false.
|
* yet reached their replication factor. Otherwise returns false.
|
||||||
|
|
|
@ -3870,6 +3870,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
|
||||||
node.stopDecommission();
|
node.stopDecommission();
|
||||||
updateStats(node, true);
|
updateStats(node, true);
|
||||||
}
|
}
|
||||||
|
blockManager.processOverReplicatedBlocksOnReCommission(node);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -51,6 +51,8 @@ public class TestDecommission {
|
||||||
static final int blockSize = 8192;
|
static final int blockSize = 8192;
|
||||||
static final int fileSize = 16384;
|
static final int fileSize = 16384;
|
||||||
static final int HEARTBEAT_INTERVAL = 1; // heartbeat interval in seconds
|
static final int HEARTBEAT_INTERVAL = 1; // heartbeat interval in seconds
|
||||||
|
static final int BLOCKREPORT_INTERVAL_MSEC = 1000; //block report in msec
|
||||||
|
static final int NAMENODE_REPLICATION_INTERVAL = 1; //replication interval
|
||||||
|
|
||||||
Random myrand = new Random();
|
Random myrand = new Random();
|
||||||
Path hostsFile;
|
Path hostsFile;
|
||||||
|
@ -74,7 +76,10 @@ public class TestDecommission {
|
||||||
conf.set(DFSConfigKeys.DFS_HOSTS_EXCLUDE, excludeFile.toUri().getPath());
|
conf.set(DFSConfigKeys.DFS_HOSTS_EXCLUDE, excludeFile.toUri().getPath());
|
||||||
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 2000);
|
conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 2000);
|
||||||
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL);
|
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL);
|
||||||
|
conf.setInt(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, BLOCKREPORT_INTERVAL_MSEC);
|
||||||
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, 4);
|
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, 4);
|
||||||
|
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, NAMENODE_REPLICATION_INTERVAL);
|
||||||
|
|
||||||
writeConfigFile(excludeFile, null);
|
writeConfigFile(excludeFile, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -120,46 +125,64 @@ public class TestDecommission {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* For blocks that reside on the nodes that are down, verify that their
|
* Verify that the number of replicas are as expected for each block in
|
||||||
* replication factor is 1 more than the specified one.
|
* the given file.
|
||||||
|
* For blocks with a decommissioned node, verify that their replication
|
||||||
|
* is 1 more than what is specified.
|
||||||
|
* For blocks without decommissioned nodes, verify their replication is
|
||||||
|
* equal to what is specified.
|
||||||
|
*
|
||||||
|
* @param downnode - if null, there is no decommissioned node for this file.
|
||||||
|
* @return - null if no failure found, else an error message string.
|
||||||
*/
|
*/
|
||||||
private void checkFile(FileSystem fileSys, Path name, int repl,
|
private String checkFile(FileSystem fileSys, Path name, int repl,
|
||||||
String downnode, int numDatanodes) throws IOException {
|
String downnode, int numDatanodes) throws IOException {
|
||||||
//
|
boolean isNodeDown = (downnode != null);
|
||||||
// sleep an additional 10 seconds for the blockreports from the datanodes
|
|
||||||
// to arrive.
|
|
||||||
//
|
|
||||||
// need a raw stream
|
// need a raw stream
|
||||||
assertTrue("Not HDFS:"+fileSys.getUri(), fileSys instanceof DistributedFileSystem);
|
assertTrue("Not HDFS:"+fileSys.getUri(),
|
||||||
|
fileSys instanceof DistributedFileSystem);
|
||||||
DFSClient.DFSDataInputStream dis = (DFSClient.DFSDataInputStream)
|
DFSClient.DFSDataInputStream dis = (DFSClient.DFSDataInputStream)
|
||||||
((DistributedFileSystem)fileSys).open(name);
|
((DistributedFileSystem)fileSys).open(name);
|
||||||
Collection<LocatedBlock> dinfo = dis.getAllBlocks();
|
Collection<LocatedBlock> dinfo = dis.getAllBlocks();
|
||||||
|
|
||||||
for (LocatedBlock blk : dinfo) { // for each block
|
for (LocatedBlock blk : dinfo) { // for each block
|
||||||
int hasdown = 0;
|
int hasdown = 0;
|
||||||
int firstDecomNodeIndex = -1;
|
|
||||||
DatanodeInfo[] nodes = blk.getLocations();
|
DatanodeInfo[] nodes = blk.getLocations();
|
||||||
for (int j = 0; j < nodes.length; j++) { // for each replica
|
for (int j = 0; j < nodes.length; j++) { // for each replica
|
||||||
if (nodes[j].getName().equals(downnode)) {
|
if (isNodeDown && nodes[j].getName().equals(downnode)) {
|
||||||
hasdown++;
|
hasdown++;
|
||||||
LOG.info("Block " + blk.getBlock() + " replica " + nodes[j].getName()
|
//Downnode must actually be decommissioned
|
||||||
+ " is decommissioned.");
|
if (!nodes[j].isDecommissioned()) {
|
||||||
|
return "For block " + blk.getBlock() + " replica on " +
|
||||||
|
nodes[j].getName() + " is given as downnode, " +
|
||||||
|
"but is not decommissioned";
|
||||||
}
|
}
|
||||||
|
//Decommissioned node (if any) should only be last node in list.
|
||||||
|
if (j != nodes.length - 1) {
|
||||||
|
return "For block " + blk.getBlock() + " decommissioned node "
|
||||||
|
+ nodes[j].getName() + " was not last node in list: "
|
||||||
|
+ (j + 1) + " of " + nodes.length;
|
||||||
|
}
|
||||||
|
LOG.info("Block " + blk.getBlock() + " replica on " +
|
||||||
|
nodes[j].getName() + " is decommissioned.");
|
||||||
|
} else {
|
||||||
|
//Non-downnodes must not be decommissioned
|
||||||
if (nodes[j].isDecommissioned()) {
|
if (nodes[j].isDecommissioned()) {
|
||||||
if (firstDecomNodeIndex == -1) {
|
return "For block " + blk.getBlock() + " replica on " +
|
||||||
firstDecomNodeIndex = j;
|
nodes[j].getName() + " is unexpectedly decommissioned";
|
||||||
}
|
}
|
||||||
continue;
|
|
||||||
}
|
}
|
||||||
assertEquals("Decom node is not at the end", firstDecomNodeIndex, -1);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG.info("Block " + blk.getBlock() + " has " + hasdown
|
LOG.info("Block " + blk.getBlock() + " has " + hasdown
|
||||||
+ " decommissioned replica.");
|
+ " decommissioned replica.");
|
||||||
assertEquals("Number of replicas for block " + blk.getBlock(),
|
if(Math.min(numDatanodes, repl+hasdown) != nodes.length) {
|
||||||
Math.min(numDatanodes, repl+hasdown), nodes.length);
|
return "Wrong number of replicas for block " + blk.getBlock() +
|
||||||
|
": " + nodes.length + ", expected " +
|
||||||
|
Math.min(numDatanodes, repl+hasdown);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
private void cleanupFile(FileSystem fileSys, Path name) throws IOException {
|
private void cleanupFile(FileSystem fileSys, Path name) throws IOException {
|
||||||
assertTrue(fileSys.exists(name));
|
assertTrue(fileSys.exists(name));
|
||||||
|
@ -208,6 +231,15 @@ public class TestDecommission {
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* stop decommission of the datanode and wait for each to reach the NORMAL state */
|
||||||
|
private void recomissionNode(DatanodeInfo decommissionedNode) throws IOException {
|
||||||
|
LOG.info("Recommissioning node: " + decommissionedNode.getName());
|
||||||
|
writeConfigFile(excludeFile, null);
|
||||||
|
cluster.getNamesystem().refreshNodes(conf);
|
||||||
|
waitNodeState(decommissionedNode, AdminStates.NORMAL);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Wait till node is fully decommissioned.
|
* Wait till node is fully decommissioned.
|
||||||
*/
|
*/
|
||||||
|
@ -286,6 +318,14 @@ public class TestDecommission {
|
||||||
testDecommission(1, 6);
|
testDecommission(1, 6);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests recommission for non federated cluster
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testRecommission() throws IOException {
|
||||||
|
testRecommission(1, 6);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test decommission for federeated cluster
|
* Test decommission for federeated cluster
|
||||||
*/
|
*/
|
||||||
|
@ -323,15 +363,68 @@ public class TestDecommission {
|
||||||
DFSClient client = getDfsClient(cluster.getNameNode(i), conf);
|
DFSClient client = getDfsClient(cluster.getNameNode(i), conf);
|
||||||
assertEquals("All datanodes must be alive", numDatanodes,
|
assertEquals("All datanodes must be alive", numDatanodes,
|
||||||
client.datanodeReport(DatanodeReportType.LIVE).length);
|
client.datanodeReport(DatanodeReportType.LIVE).length);
|
||||||
checkFile(fileSys, file1, replicas, decomNode.getName(), numDatanodes);
|
assertNull(checkFile(fileSys, file1, replicas, decomNode.getName(), numDatanodes));
|
||||||
cleanupFile(fileSys, file1);
|
cleanupFile(fileSys, file1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Restart the cluster and ensure decommissioned datanodes
|
// Restart the cluster and ensure recommissioned datanodes
|
||||||
// are allowed to register with the namenode
|
// are allowed to register with the namenode
|
||||||
cluster.shutdown();
|
cluster.shutdown();
|
||||||
startCluster(numNamenodes, numDatanodes, conf);
|
startCluster(numNamenodes, numDatanodes, conf);
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private void testRecommission(int numNamenodes, int numDatanodes)
|
||||||
|
throws IOException {
|
||||||
|
LOG.info("Starting test testRecommission");
|
||||||
|
|
||||||
|
startCluster(numNamenodes, numDatanodes, conf);
|
||||||
|
|
||||||
|
ArrayList<ArrayList<DatanodeInfo>> namenodeDecomList =
|
||||||
|
new ArrayList<ArrayList<DatanodeInfo>>(numNamenodes);
|
||||||
|
for(int i = 0; i < numNamenodes; i++) {
|
||||||
|
namenodeDecomList.add(i, new ArrayList<DatanodeInfo>(numDatanodes));
|
||||||
|
}
|
||||||
|
Path file1 = new Path("testDecommission.dat");
|
||||||
|
int replicas = numDatanodes - 1;
|
||||||
|
|
||||||
|
for (int i = 0; i < numNamenodes; i++) {
|
||||||
|
ArrayList<DatanodeInfo> decommissionedNodes = namenodeDecomList.get(i);
|
||||||
|
FileSystem fileSys = cluster.getFileSystem(i);
|
||||||
|
writeFile(fileSys, file1, replicas);
|
||||||
|
|
||||||
|
// Decommission one node. Verify that node is decommissioned.
|
||||||
|
DatanodeInfo decomNode = decommissionNode(i, decommissionedNodes,
|
||||||
|
AdminStates.DECOMMISSIONED);
|
||||||
|
decommissionedNodes.add(decomNode);
|
||||||
|
|
||||||
|
// Ensure decommissioned datanode is not automatically shutdown
|
||||||
|
DFSClient client = getDfsClient(cluster.getNameNode(i), conf);
|
||||||
|
assertEquals("All datanodes must be alive", numDatanodes,
|
||||||
|
client.datanodeReport(DatanodeReportType.LIVE).length);
|
||||||
|
assertNull(checkFile(fileSys, file1, replicas, decomNode.getName(), numDatanodes));
|
||||||
|
|
||||||
|
// stop decommission and check if the new replicas are removed
|
||||||
|
recomissionNode(decomNode);
|
||||||
|
// wait for the block to be deleted
|
||||||
|
int tries = 0;
|
||||||
|
while (tries++ < 20) {
|
||||||
|
try {
|
||||||
|
Thread.sleep(1000);
|
||||||
|
if (checkFile(fileSys, file1, replicas, null, numDatanodes) == null) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
} catch (InterruptedException ie) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
cleanupFile(fileSys, file1);
|
||||||
|
assertTrue("Checked if node was recommissioned " + tries + " times.",
|
||||||
|
tries < 20);
|
||||||
|
LOG.info("tried: " + tries + " times before recommissioned");
|
||||||
|
}
|
||||||
|
cluster.shutdown();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
Loading…
Reference in New Issue