diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 262e0c29685..fcf702d4155 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -2004,6 +2004,15 @@ public class BlockManager implements BlockStatsMXBean { // This list includes decommissioning or corrupt nodes. final Set excludedNodes = new HashSet<>(rw.getContainingNodes()); + // Exclude all nodes which already exists as targets for the block + List targets = + pendingReconstruction.getTargets(rw.getBlock()); + if (targets != null) { + for (DatanodeStorageInfo dn : targets) { + excludedNodes.add(dn.getDatanodeDescriptor()); + } + } + // choose replication targets: NOT HOLDING THE GLOBAL LOCK final BlockPlacementPolicy placementPolicy = placementPolicies.getPolicy(rw.getBlock().getBlockType()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReconstruction.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReconstruction.java index 8fcb0fe7a89..d52207ede37 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReconstruction.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReconstruction.java @@ -32,6 +32,8 @@ import java.util.ArrayList; import java.util.concurrent.TimeoutException; import com.google.common.base.Supplier; + +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DFSConfigKeys; @@ -56,8 +58,11 @@ import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStat import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.test.GenericTestUtils.LogCapturer; +import org.apache.log4j.Level; import org.junit.Test; import org.mockito.Mockito; +import org.slf4j.LoggerFactory; /** * This class tests the internals of PendingReconstructionBlocks.java, as well @@ -561,4 +566,54 @@ public class TestPendingReconstruction { fsn.writeUnlock(); } } + + @Test + public void testPendingReConstructionBlocksForSameDN() throws Exception { + final Configuration conf = new HdfsConfiguration(); + conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1); + MiniDFSCluster cluster = + new MiniDFSCluster.Builder(conf).numDataNodes(2).build(); + cluster.waitActive(); + DFSTestUtil.setNameNodeLogLevel(Level.DEBUG); + LogCapturer logs = GenericTestUtils.LogCapturer + .captureLogs(LoggerFactory.getLogger("BlockStateChange")); + BlockManager bm = cluster.getNamesystem().getBlockManager(); + try { + DistributedFileSystem dfs = cluster.getFileSystem(); + // 1. create a file + Path filePath = new Path("/tmp.txt"); + DFSTestUtil.createFile(dfs, filePath, 1024, (short) 1, 0L); + + // 2. disable the IBR + for (DataNode dn : cluster.getDataNodes()) { + DataNodeTestUtils.pauseIBR(dn); + } + DatanodeManager datanodeManager = + cluster.getNamesystem().getBlockManager().getDatanodeManager(); + ArrayList dnList = + new ArrayList(); + datanodeManager.fetchDatanodes(dnList, dnList, false); + + LocatedBlock block = NameNodeAdapter + .getBlockLocations(cluster.getNameNode(), filePath.toString(), 0, 1) + .get(0); + + // 3. set replication as 3 + dfs.setReplication(filePath, (short) 3); + + // 4 compute replication work twice to make sure the same DN is not adding + // twice + BlockManagerTestUtil.computeAllPendingWork(bm); + BlockManagerTestUtil.computeAllPendingWork(bm); + BlockManagerTestUtil.updateState(bm); + + // 5 capture the logs and verify the reconstruction work for block for + // same DN + String blockName = + "to replicate " + block.getBlock().getLocalBlock().toString(); + assertEquals(1, StringUtils.countMatches(logs.getOutput(), blockName)); + } finally { + cluster.shutdown(); + } + } }