diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index cf179869e5a..2313f13c0bb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -1193,6 +1193,9 @@ Release 2.6.4 - UNRELEASED BUG FIXES + HDFS-9600. do not check replication if the block is under construction + (Phil Yang via vinayakumarb) + Release 2.6.3 - 2015-12-17 INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index db50e44469c..ef784d2d40e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -3444,7 +3444,14 @@ public class BlockManager { * or if it does not have enough racks. */ boolean isNeededReplication(Block b, int expected, int current) { - return current < expected || !isPlacementPolicySatisfied(b); + BlockInfoContiguous blockInfo; + if (b instanceof BlockInfoContiguous) { + blockInfo = (BlockInfoContiguous) b; + } else { + blockInfo = getStoredBlock(b); + } + return blockInfo.isComplete() + && (current < expected || !isPlacementPolicySatisfied(b)); } public long getMissingBlocksCount() { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java index 7f3d77802ec..c76b192c7e9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java @@ -253,8 +253,9 @@ public class DecommissionManager { NumberReplicas numberReplicas) { final int numExpected = bc.getBlockReplication(); final int numLive = numberReplicas.liveReplicas(); - if (!blockManager.isNeededReplication(block, numExpected, numLive)) { - // Block doesn't need replication. Skip. + if (numLive >= numExpected + && blockManager.isPlacementPolicySatisfied(block)) { + // Block has enough replica, skip LOG.trace("Block {} does not need replication.", block); return true; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java index 920a18dcff2..196a38b4ce0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java @@ -30,21 +30,28 @@ import static org.mockito.Mockito.verify; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.EnumSet; import java.util.LinkedList; import java.util.List; import java.util.Map.Entry; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSOutputStream; import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica; @@ -53,8 +60,11 @@ import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; +import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; +import org.apache.hadoop.io.EnumSetWritable; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.net.NetworkTopology; import org.junit.Assert; @@ -371,8 +381,58 @@ public class TestBlockManager { bm.processMisReplicatedBlocks(); assertEquals(0, bm.numOfUnderReplicatedBlocks()); } - - + + @Test(timeout = 60000) + public void testNeededReplicationWhileAppending() throws IOException { + Configuration conf = new HdfsConfiguration(); + String src = "/test-file"; + Path file = new Path(src); + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build(); + cluster.waitActive(); + try { + BlockManager bm = cluster.getNamesystem().getBlockManager(); + FileSystem fs = cluster.getFileSystem(); + NamenodeProtocols namenode = cluster.getNameNodeRpc(); + DFSOutputStream out = null; + try { + out = (DFSOutputStream) (fs.create(file). + getWrappedStream()); + out.write(1); + out.hflush(); + out.close(); + FSDataInputStream in = null; + ExtendedBlock oldBlock = null; + LocatedBlock oldLoactedBlock = null; + try { + in = fs.open(file); + oldLoactedBlock = DFSTestUtil.getAllBlocks(in).get(0); + oldBlock = oldLoactedBlock.getBlock(); + } finally { + IOUtils.closeStream(in); + } + String clientName = + ((DistributedFileSystem) fs).getClient().getClientName(); + namenode.append(src, clientName, new EnumSetWritable<>( + EnumSet.of(CreateFlag.APPEND))); + LocatedBlock newLocatedBlock = + namenode.updateBlockForPipeline(oldBlock, clientName); + ExtendedBlock newBlock = + new ExtendedBlock(oldBlock.getBlockPoolId(), oldBlock.getBlockId(), + oldBlock.getNumBytes(), + newLocatedBlock.getBlock().getGenerationStamp()); + namenode.updatePipeline(clientName, oldBlock, newBlock, + oldLoactedBlock.getLocations(), oldLoactedBlock.getStorageIDs()); + BlockInfoContiguous bi = bm.getStoredBlock(newBlock.getLocalBlock()); + assertFalse(bm.isNeededReplication(bi, + oldLoactedBlock.getLocations().length, bm.countLiveNodes(bi))); + } finally { + IOUtils.closeStream(out); + } + } finally { + cluster.shutdown(); + } + } + /** * Tell the block manager that replication is completed for the given * pipeline.