diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index a929c4399a7..858a54f8b92 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -736,7 +736,12 @@ public class BlockManager implements BlockStatsMXBean {
       return false; // already completed (e.g. by syncBlock)
 
     final boolean b = commitBlock(lastBlock, commitBlock);
-    if (countNodes(lastBlock).liveReplicas() >= minReplication) {
+
+    // Count replicas on decommissioning nodes, as these will not be
+    // decommissioned unless recovery/completing last block has finished
+    NumberReplicas numReplicas = countNodes(lastBlock);
+    if (numReplicas.liveReplicas() + numReplicas.decommissioning() >=
+        minReplication) {
       if (b) {
         addExpectedReplicasToPending(lastBlock, bc);
       }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
index 1d5ebbfbf95..78f6221584b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
@@ -873,6 +873,54 @@ public class TestDecommission {
     fdos.close();
   }
+
+  @Test(timeout = 360000)
+  public void testDecommissionWithOpenFileAndBlockRecovery()
+      throws IOException, InterruptedException {
+    startCluster(1, 6, conf);
+    cluster.waitActive();
+
+    Path file = new Path("/testRecoveryDecommission");
+
+    // Create a file and never close the output stream to trigger recovery
+    DistributedFileSystem dfs = cluster.getFileSystem();
+    FSNamesystem ns = cluster.getNamesystem(0);
+    FSDataOutputStream out = dfs.create(file, true,
+        conf.getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096),
+        (short) 3, blockSize);
+
+    // Write data to the file
+    long writtenBytes = 0;
+    while (writtenBytes < fileSize) {
+      out.writeLong(writtenBytes);
+      writtenBytes += 8;
+    }
+    out.hsync();
+
+    DatanodeInfo[] lastBlockLocations = NameNodeAdapter.getBlockLocations(
+        cluster.getNameNode(), "/testRecoveryDecommission", 0, fileSize)
+        .getLastLocatedBlock().getLocations();
+
+    // Decommission all nodes of the last block
+    ArrayList<String> toDecom = new ArrayList<>();
+    for (DatanodeInfo dnDecom : lastBlockLocations) {
+      toDecom.add(dnDecom.getXferAddr());
+    }
+    writeConfigFile(excludeFile, toDecom);
+    refreshNodes(ns, conf);
+
+    // Make sure hard lease expires to trigger replica recovery
+    cluster.setLeasePeriod(300L, 300L);
+    Thread.sleep(2 * BLOCKREPORT_INTERVAL_MSEC);
+
+    for (DatanodeInfo dnDecom : lastBlockLocations) {
+      DatanodeInfo datanode = NameNodeAdapter.getDatanode(
+          cluster.getNamesystem(), dnDecom);
+      waitNodeState(datanode, AdminStates.DECOMMISSIONED);
+    }
+
+    assertEquals(dfs.getFileStatus(file).getLen(), writtenBytes);
+  }
 
   /**
    * Tests restart of namenode while datanode hosts are added to exclude file