diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 3bb7c2af3bd..39ac94ddd79 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -3558,13 +3558,15 @@ public class BlockManager implements BlockStatsMXBean { return; } NumberReplicas repl = countNodes(block); + int pendingNum = pendingReplications.getNumReplicas(block); int curExpectedReplicas = getReplication(block); - if (isNeededReplication(block, repl.liveReplicas())) { - neededReplications.update(block, repl.liveReplicas(), repl.readOnlyReplicas(), - repl.decommissionedAndDecommissioning(), curExpectedReplicas, - curReplicasDelta, expectedReplicasDelta); + if (!hasEnoughEffectiveReplicas(block, repl, pendingNum, + curExpectedReplicas)) { + neededReplications.update(block, repl.liveReplicas() + pendingNum, + repl.readOnlyReplicas(), repl.decommissionedAndDecommissioning(), + curExpectedReplicas, curReplicasDelta, expectedReplicasDelta); } else { - int oldReplicas = repl.liveReplicas()-curReplicasDelta; + int oldReplicas = repl.liveReplicas() + pendingNum - curReplicasDelta; int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta; neededReplications.remove(block, oldReplicas, repl.readOnlyReplicas(), repl.decommissionedAndDecommissioning(), oldExpectedReplicas); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCorruption.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCorruption.java index 2437e38acc9..5477700fa07 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCorruption.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCorruption.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs; +import com.google.common.base.Supplier; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; import static org.junit.Assert.assertEquals; @@ -38,6 +39,7 @@ import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.ChecksumException; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica; @@ -230,6 +232,65 @@ public class TestFileCorruption { } + @Test + public void testSetReplicationWhenBatchIBR() throws Exception { + Configuration conf = new HdfsConfiguration(); + conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 100); + conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INCREMENTAL_INTERVAL_MSEC_KEY, + 30000); + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024); + conf.setInt(DFSConfigKeys.DFS_NAMENODE_FILE_CLOSE_NUM_COMMITTED_ALLOWED_KEY, + 1); + DistributedFileSystem dfs; + try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(3).build()) { + final int bufferSize = 1024; // 1024 Bytes each time + byte[] outBuffer = new byte[bufferSize]; + dfs = cluster.getFileSystem(); + String fileName = "/testSetRep1"; + Path filePath = new Path(fileName); + FSDataOutputStream out = dfs.create(filePath); + out.write(outBuffer, 0, bufferSize); + out.close(); + //sending the FBR to Delay next IBR + cluster.triggerBlockReports(); + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + try { + cluster.triggerBlockReports(); + if (cluster.getNamesystem().getBlocksTotal() == 1) { + return true; + } + } catch (Exception e) { + // Ignore the exception + } + return false; + } + }, 10, 3000); + fileName = "/testSetRep2"; + filePath = new Path(fileName); + out = dfs.create(filePath); + out.write(outBuffer, 0, bufferSize); + out.close(); + dfs.setReplication(filePath, (short) 10); + // underreplicated Blocks should be one after setrep + GenericTestUtils.waitFor(new Supplier() { + @Override public Boolean get() { + try { + return cluster.getNamesystem().getBlockManager() + .getUnderReplicatedBlocksCount() == 1; + } catch (Exception e) { + e.printStackTrace(); + return false; + } + } + }, 10, 3000); + assertEquals(0, + cluster.getNamesystem().getBlockManager().getMissingBlocksCount()); + } + } + private void markAllBlocksAsCorrupt(BlockManager bm, ExtendedBlock blk) throws IOException { for (DatanodeStorageInfo info : bm.getStorages(blk.getLocalBlock())) {