diff --git a/hdfs/CHANGES.txt b/hdfs/CHANGES.txt
index 83d1d85f8a7..ccd861a710a 100644
--- a/hdfs/CHANGES.txt
+++ b/hdfs/CHANGES.txt
@@ -861,6 +861,9 @@ Trunk (unreleased changes)
     HDFS-2152. TestWriteConfigurationToDFS causing the random failures.
     (Uma Maheswara Rao G via atm)
 
+    HDFS-2114. re-commission of a decommissioned node does not delete
+    excess replicas. (John George via mattf)
+
 Release 0.22.0 - Unreleased
 
   INCOMPATIBLE CHANGES
diff --git a/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index d1a59cbd199..46a66a175bd 100644
--- a/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -1877,6 +1877,25 @@ public class BlockManager {
         + srcNode.isDecommissionInProgress());
   }
 
+  /**
+   * On stopping decommission, check if the node has excess replicas.
+   * If there are any excess replicas, call processOverReplicatedBlock().
+   */
+  public void processOverReplicatedBlocksOnReCommission(DatanodeDescriptor srcNode) {
+    final Iterator<? extends Block> it = srcNode.getBlockIterator();
+    while (it.hasNext()) {
+      final Block block = it.next();
+      INodeFile fileINode = blocksMap.getINode(block);
+      short expectedReplication = fileINode.getReplication();
+      NumberReplicas num = countNodes(block);
+      int numCurrentReplica = num.liveReplicas();
+      if (numCurrentReplica > expectedReplication) {
+        // over-replicated block
+        processOverReplicatedBlock(block, expectedReplication, null, null);
+      }
+    }
+  }
+
   /**
    * Return true if there are any blocks on this node that have not
    * yet reached their replication factor. Otherwise returns false.
diff --git a/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 1ceb3d2aa9f..f607f666dfc 100644
--- a/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -3870,6 +3870,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
       updateStats(node, false);
       node.stopDecommission();
       updateStats(node, true);
     }
+    blockManager.processOverReplicatedBlocksOnReCommission(node);
   }
 }
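The fix above hinges on a single predicate: after decommission stops, a block carries excess replicas exactly when its live-replica count exceeds the owning file's replication factor, at which point processOverReplicatedBlock() prunes the extras. A minimal, self-contained sketch of that check follows; BlockStat and the sample values are illustrative stand-ins for the BlockManager internals (blocksMap, countNodes), not HDFS classes.

    import java.util.ArrayList;
    import java.util.List;

    // Illustrative sketch only -- BlockStat stands in for the BlockManager
    // state (INodeFile.getReplication(), NumberReplicas.liveReplicas())
    // that the patch consults per block.
    public class OverReplicationCheck {
      static class BlockStat {
        final String blockId;
        final int liveReplicas;          // live copies reported for the block
        final short expectedReplication; // the file's replication factor
        BlockStat(String blockId, int liveReplicas, short expectedReplication) {
          this.blockId = blockId;
          this.liveReplicas = liveReplicas;
          this.expectedReplication = expectedReplication;
        }
      }

      // Same predicate the patch applies on recommission: strictly more
      // live replicas than the file asks for means the block is handed
      // to over-replication processing.
      static List<String> findOverReplicated(List<BlockStat> blocks) {
        List<String> over = new ArrayList<String>();
        for (BlockStat b : blocks) {
          if (b.liveReplicas > b.expectedReplication) {
            over.add(b.blockId);
          }
        }
        return over;
      }

      public static void main(String[] args) {
        List<BlockStat> blocks = new ArrayList<BlockStat>();
        blocks.add(new BlockStat("blk_1", 4, (short) 3)); // extra copy left by decommission
        blocks.add(new BlockStat("blk_2", 3, (short) 3)); // already at target
        System.out.println(findOverReplicated(blocks));   // prints [blk_1]
      }
    }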
diff --git a/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDecommission.java b/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDecommission.java
index 6ded5a3d07c..491a0b550d6 100644
--- a/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDecommission.java
+++ b/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDecommission.java
@@ -51,6 +51,8 @@ public class TestDecommission {
   static final int blockSize = 8192;
   static final int fileSize = 16384;
   static final int HEARTBEAT_INTERVAL = 1; // heartbeat interval in seconds
+  static final int BLOCKREPORT_INTERVAL_MSEC = 1000; // block report in msec
+  static final int NAMENODE_REPLICATION_INTERVAL = 1; // replication interval
 
   Random myrand = new Random();
   Path hostsFile;
@@ -74,7 +76,10 @@ public class TestDecommission {
     conf.set(DFSConfigKeys.DFS_HOSTS_EXCLUDE, excludeFile.toUri().getPath());
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 2000);
     conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL);
+    conf.setInt(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, BLOCKREPORT_INTERVAL_MSEC);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, 4);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY,
+        NAMENODE_REPLICATION_INTERVAL);
 
     writeConfigFile(excludeFile, null);
   }
@@ -118,49 +123,67 @@ public class TestDecommission {
     stm.close();
     LOG.info("Created file " + name + " with " + repl + " replicas.");
   }
-  
+
   /**
-   * For blocks that reside on the nodes that are down, verify that their
-   * replication factor is 1 more than the specified one.
+   * Verify that the number of replicas is as expected for each block in
+   * the given file.
+   * For blocks with a decommissioned node, verify that their replication
+   * is 1 more than what is specified.
+   * For blocks without decommissioned nodes, verify their replication is
+   * equal to what is specified.
+   *
+   * @param downnode - if null, there is no decommissioned node for this file.
+   * @return - null if no failure found, else an error message string.
    */
-  private void checkFile(FileSystem fileSys, Path name, int repl,
-                         String downnode, int numDatanodes) throws IOException {
-    //
-    // sleep an additional 10 seconds for the blockreports from the datanodes
-    // to arrive.
-    //
+  private String checkFile(FileSystem fileSys, Path name, int repl,
+      String downnode, int numDatanodes) throws IOException {
+    boolean isNodeDown = (downnode != null);
     // need a raw stream
-    assertTrue("Not HDFS:"+fileSys.getUri(), fileSys instanceof DistributedFileSystem);
-
-    DFSClient.DFSDataInputStream dis = (DFSClient.DFSDataInputStream)
+    assertTrue("Not HDFS:"+fileSys.getUri(),
+        fileSys instanceof DistributedFileSystem);
+    DFSClient.DFSDataInputStream dis = (DFSClient.DFSDataInputStream)
       ((DistributedFileSystem)fileSys).open(name);
     Collection<LocatedBlock> dinfo = dis.getAllBlocks();
-
     for (LocatedBlock blk : dinfo) { // for each block
       int hasdown = 0;
-      int firstDecomNodeIndex = -1;
       DatanodeInfo[] nodes = blk.getLocations();
-      for (int j = 0; j < nodes.length; j++) {     // for each replica
-        if (nodes[j].getName().equals(downnode)) {
+      for (int j = 0; j < nodes.length; j++) { // for each replica
+        if (isNodeDown && nodes[j].getName().equals(downnode)) {
           hasdown++;
-          LOG.info("Block " + blk.getBlock() + " replica " + nodes[j].getName()
-              + " is decommissioned.");
-        }
-        if (nodes[j].isDecommissioned()) {
-          if (firstDecomNodeIndex == -1) {
-            firstDecomNodeIndex = j;
+          //Downnode must actually be decommissioned
+          if (!nodes[j].isDecommissioned()) {
+            return "For block " + blk.getBlock() + " replica on " +
+                nodes[j].getName() + " is given as downnode, " +
+                "but is not decommissioned";
+          }
+          //Decommissioned node (if any) should only be last node in list.
+          if (j != nodes.length - 1) {
+            return "For block " + blk.getBlock() + " decommissioned node "
+                + nodes[j].getName() + " was not last node in list: "
+                + (j + 1) + " of " + nodes.length;
+          }
+          LOG.info("Block " + blk.getBlock() + " replica on " +
+              nodes[j].getName() + " is decommissioned.");
+        } else {
+          //Non-downnodes must not be decommissioned
+          if (nodes[j].isDecommissioned()) {
+            return "For block " + blk.getBlock() + " replica on " +
+                nodes[j].getName() + " is unexpectedly decommissioned";
          }
-          continue;
         }
-        assertEquals("Decom node is not at the end", firstDecomNodeIndex, -1);
       }
       LOG.info("Block " + blk.getBlock() + " has " + hasdown
-          + " decommissioned replica.");
-      assertEquals("Number of replicas for block " + blk.getBlock(),
-                   Math.min(numDatanodes, repl+hasdown), nodes.length);
+        + " decommissioned replica.");
+      if (Math.min(numDatanodes, repl+hasdown) != nodes.length) {
+        return "Wrong number of replicas for block " + blk.getBlock() +
+            ": " + nodes.length + ", expected " +
+            Math.min(numDatanodes, repl+hasdown);
+      }
     }
+    return null;
   }
-  
+
   private void cleanupFile(FileSystem fileSys, Path name) throws IOException {
     assertTrue(fileSys.exists(name));
     fileSys.delete(name, true);
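The reworked checkFile() returns null on success and a descriptive error string on failure, so callers can either assertNull() immediately or poll until the file converges. It also enforces an ordering invariant: a decommissioned replica, if present, must be the last entry in a block's location list. Below is a self-contained sketch of just that ordering check; Replica is an illustrative stand-in for DatanodeInfo, not an HDFS type.

    // Illustrative sketch only -- mirrors checkFile's contract of
    // returning null on success and an error message on failure.
    public class ReplicaOrderCheck {
      static class Replica {
        final String name;
        final boolean decommissioned;
        Replica(String name, boolean decommissioned) {
          this.name = name;
          this.decommissioned = decommissioned;
        }
      }

      // A decommissioned replica, if present, must be the last entry in a
      // block's location list; anything else is reported as an error.
      static String checkDecommissionedLast(Replica[] locations) {
        for (int j = 0; j < locations.length; j++) {
          if (locations[j].decommissioned && j != locations.length - 1) {
            return "decommissioned replica " + locations[j].name
                + " is not last: position " + (j + 1) + " of " + locations.length;
          }
        }
        return null; // no failure found
      }

      public static void main(String[] args) {
        Replica[] ok  = { new Replica("dn1", false), new Replica("dn2", true) };
        Replica[] bad = { new Replica("dn2", true), new Replica("dn1", false) };
        System.out.println(checkDecommissionedLast(ok));  // null
        System.out.println(checkDecommissionedLast(bad)); // error message
      }
    }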
@@ -208,6 +231,15 @@ public class TestDecommission {
     return ret;
   }
 
+  /* stop decommission of the datanode and wait for it to reach the NORMAL state */
+  private void recomissionNode(DatanodeInfo decommissionedNode) throws IOException {
+    LOG.info("Recommissioning node: " + decommissionedNode.getName());
+    writeConfigFile(excludeFile, null);
+    cluster.getNamesystem().refreshNodes(conf);
+    waitNodeState(decommissionedNode, AdminStates.NORMAL);
+
+  }
+
   /*
    * Wait till node is fully decommissioned.
    */
@@ -286,6 +318,14 @@ public class TestDecommission {
     testDecommission(1, 6);
   }
 
+  /**
+   * Tests recommission for a non-federated cluster
+   */
+  @Test
+  public void testRecommission() throws IOException {
+    testRecommission(1, 6);
+  }
+
   /**
    * Test decommission for federated cluster
    */
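Recommissioning in the test is the decommission mechanism run in reverse: clear the exclude file named by dfs.hosts.exclude, ask the namenode to re-read it via refreshNodes(), and wait for the node to report NORMAL. The sketch below shows only the file-side step; ExcludeFileSketch and the sample host:port entry are hypothetical, and the exclude file is assumed (as in this test's writeConfigFile) to be a plain newline-separated list of datanode entries.

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.util.List;

    // Illustrative sketch only: recomissionNode() clears the exclude file
    // via writeConfigFile(excludeFile, null); this shows the equivalent
    // plain-file operation.
    public class ExcludeFileSketch {
      static void setExcludedNodes(Path excludeFile, List<String> hosts)
          throws IOException {
        // An empty list yields an empty file, i.e. no nodes excluded,
        // which is what recommissioning amounts to.
        Files.write(excludeFile, hosts);
      }

      public static void main(String[] args) throws IOException {
        Path exclude = Paths.get("exclude");
        setExcludedNodes(exclude, List.of("datanode1:50010")); // decommission one node
        setExcludedNodes(exclude, List.of());                  // recommission all
      }
    }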
@@ -323,15 +363,68 @@ public class TestDecommission {
         DFSClient client = getDfsClient(cluster.getNameNode(i), conf);
         assertEquals("All datanodes must be alive", numDatanodes, 
             client.datanodeReport(DatanodeReportType.LIVE).length);
-        checkFile(fileSys, file1, replicas, decomNode.getName(), numDatanodes);
+        assertNull(checkFile(fileSys, file1, replicas, decomNode.getName(), numDatanodes));
         cleanupFile(fileSys, file1);
       }
     }
-    
-    // Restart the cluster and ensure decommissioned datanodes
+
+    // Restart the cluster and ensure recommissioned datanodes
     // are allowed to register with the namenode
     cluster.shutdown();
     startCluster(numNamenodes, numDatanodes, conf);
+    cluster.shutdown();
+  }
+
+
+  private void testRecommission(int numNamenodes, int numDatanodes)
+      throws IOException {
+    LOG.info("Starting test testRecommission");
+
+    startCluster(numNamenodes, numDatanodes, conf);
+
+    ArrayList<ArrayList<DatanodeInfo>> namenodeDecomList =
+        new ArrayList<ArrayList<DatanodeInfo>>(numNamenodes);
+    for (int i = 0; i < numNamenodes; i++) {
+      namenodeDecomList.add(i, new ArrayList<DatanodeInfo>(numDatanodes));
+    }
+    Path file1 = new Path("testDecommission.dat");
+    int replicas = numDatanodes - 1;
+
+    for (int i = 0; i < numNamenodes; i++) {
+      ArrayList<DatanodeInfo> decommissionedNodes = namenodeDecomList.get(i);
+      FileSystem fileSys = cluster.getFileSystem(i);
+      writeFile(fileSys, file1, replicas);
+
+      // Decommission one node. Verify that node is decommissioned.
+      DatanodeInfo decomNode = decommissionNode(i, decommissionedNodes,
+          AdminStates.DECOMMISSIONED);
+      decommissionedNodes.add(decomNode);
+
+      // Ensure decommissioned datanode is not automatically shutdown
+      DFSClient client = getDfsClient(cluster.getNameNode(i), conf);
+      assertEquals("All datanodes must be alive", numDatanodes, 
+          client.datanodeReport(DatanodeReportType.LIVE).length);
+      assertNull(checkFile(fileSys, file1, replicas, decomNode.getName(), numDatanodes));
+
+      // stop decommission and check if the new replicas are removed
+      recomissionNode(decomNode);
+      // wait for the block to be deleted
+      int tries = 0;
+      while (tries++ < 20) {
+        try {
+          Thread.sleep(1000);
+          if (checkFile(fileSys, file1, replicas, null, numDatanodes) == null) {
+            break;
+          }
+        } catch (InterruptedException ie) {
+        }
+      }
+      cleanupFile(fileSys, file1);
+      assertTrue("Checked if node was recommissioned " + tries + " times.",
+          tries < 20);
+      LOG.info("tried: " + tries + " times before recommissioned");
+    }
+    cluster.shutdown();
   }
 
   /**
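The test's wait loop leans on the tightened intervals configured above (1 s block reports, 1 s replication checks) so the excess replica disappears well within the 20-try budget. One nit in the committed loop is the empty catch for InterruptedException; below is a sketch of the same bounded-polling pattern that preserves the thread's interrupt status instead. Poll and await are illustrative names, not part of the patch.

    import java.util.concurrent.TimeUnit;
    import java.util.function.BooleanSupplier;

    // Illustrative sketch only: bounded polling (up to maxTries sleeps),
    // equivalent to the test's while (tries++ < 20) { Thread.sleep(1000); ... }
    public class Poll {
      static boolean await(BooleanSupplier done, int maxTries, long sleepMs) {
        for (int i = 0; i < maxTries; i++) {
          if (done.getAsBoolean()) {
            return true;
          }
          try {
            TimeUnit.MILLISECONDS.sleep(sleepMs);
          } catch (InterruptedException ie) {
            Thread.currentThread().interrupt(); // don't swallow the interrupt
            return false;
          }
        }
        return done.getAsBoolean();
      }

      public static void main(String[] args) {
        final long deadline = System.currentTimeMillis() + 3000;
        boolean ok = await(() -> System.currentTimeMillis() > deadline, 20, 1000);
        System.out.println("condition met: " + ok);
      }
    }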