From a14baa05b0371329594501a9e54b77276e7ade1e Mon Sep 17 00:00:00 2001
From: Vinayakumar B
Date: Thu, 7 Jan 2016 11:27:42 +0530
Subject: [PATCH] HDFS-9600. do not check replication if the block is under
 construction (Contributed by Phil Yang)

(cherry picked from commit 34cd7cd76505d01ec251e30837c94ab03319a0c1)
(cherry picked from commit aa710bd461b593b0f3d7d7ac41ca68e1aa3fa9d6)
---
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt    |  3 +
 .../server/blockmanagement/BlockManager.java   |  3 +-
 .../blockmanagement/DecommissionManager.java   |  5 +-
 .../blockmanagement/TestBlockManager.java      | 63 ++++++++++++++++++-
 4 files changed, 69 insertions(+), 5 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 62962948b2f..3fdffb06082 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -2814,6 +2814,9 @@ Release 2.6.4 - UNRELEASED
 
   BUG FIXES
 
+    HDFS-9600. do not check replication if the block is under construction
+    (Phil Yang via vinayakumarb)
+
 Release 2.6.3 - 2015-12-17
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index d1d536e03b5..b477fadf5a5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -3533,7 +3533,8 @@ public class BlockManager implements BlockStatsMXBean {
    */
   boolean isNeededReplication(BlockInfo storedBlock, int current) {
     int expected = storedBlock.getReplication();
-    return current < expected || !isPlacementPolicySatisfied(storedBlock);
+    return storedBlock.isComplete()
+        && (current < expected || !isPlacementPolicySatisfied(storedBlock));
   }
 
   public short getExpectedReplicaNum(BlockInfo block) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
index c895a450fe4..ec9a71065e5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
@@ -253,8 +253,9 @@ public class DecommissionManager {
       NumberReplicas numberReplicas) {
     final int numExpected = block.getReplication();
     final int numLive = numberReplicas.liveReplicas();
-    if (!blockManager.isNeededReplication(block, numLive)) {
-      // Block doesn't need replication. Skip.
+    if (numLive >= numExpected
+        && blockManager.isPlacementPolicySatisfied(block)) {
+      // Block has enough replicas; skip.
       LOG.trace("Block {} does not need replication.", block);
       return true;
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
index 1cebac197e0..e9c9ec3fa51 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
@@ -32,6 +32,7 @@ import static org.mockito.Mockito.verify;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.EnumSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map.Entry;
@@ -48,16 +49,22 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSOutputStream;
 import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
@@ -70,8 +77,11 @@ import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.server.namenode.TestINodeFile;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
+import org.apache.hadoop.io.EnumSetWritable;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.net.NetworkTopology;
@@ -396,8 +406,57 @@ public class TestBlockManager {
     BlockInfo block = addBlockOnNodes(testIndex, origNodes);
     assertFalse(bm.isNeededReplication(block, bm.countLiveNodes(block)));
   }
-
-
+
+  @Test(timeout = 60000)
+  public void testNeededReplicationWhileAppending() throws IOException {
+    Configuration conf = new HdfsConfiguration();
+    String src = "/test-file";
+    Path file = new Path(src);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+    cluster.waitActive();
+    try {
+      BlockManager bm = cluster.getNamesystem().getBlockManager();
+      FileSystem fs = cluster.getFileSystem();
+      NamenodeProtocols namenode = cluster.getNameNodeRpc();
+      DFSOutputStream out = null;
+      try {
+        out = (DFSOutputStream) (fs.create(file).
+            getWrappedStream());
+        out.write(1);
+        out.hflush();
+        out.close();
+        FSDataInputStream in = null;
+        ExtendedBlock oldBlock = null;
+        LocatedBlock oldLocatedBlock = null;
+        try {
+          in = fs.open(file);
+          oldLocatedBlock = DFSTestUtil.getAllBlocks(in).get(0);
+          oldBlock = oldLocatedBlock.getBlock();
+        } finally {
+          IOUtils.closeStream(in);
+        }
+        String clientName =
+            ((DistributedFileSystem) fs).getClient().getClientName();
+        namenode.append(src, clientName,
+            new EnumSetWritable<>(EnumSet.of(CreateFlag.APPEND)));
+        LocatedBlock newLocatedBlock =
+            namenode.updateBlockForPipeline(oldBlock, clientName);
+        ExtendedBlock newBlock =
+            new ExtendedBlock(oldBlock.getBlockPoolId(), oldBlock.getBlockId(),
+                oldBlock.getNumBytes(),
+                newLocatedBlock.getBlock().getGenerationStamp());
+        namenode.updatePipeline(clientName, oldBlock, newBlock,
+            oldLocatedBlock.getLocations(), oldLocatedBlock.getStorageIDs());
+        BlockInfo bi = bm.getStoredBlock(newBlock.getLocalBlock());
+        assertFalse(bm.isNeededReplication(bi, bm.countLiveNodes(bi)));
+      } finally {
+        IOUtils.closeStream(out);
+      }
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
   /**
    * Tell the block manager that replication is completed for the given
    * pipeline.
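
Note (not part of the applied diff): a minimal standalone sketch of the two checks as they behave after this patch. The class and the primitive parameters below are hypothetical stand-ins for BlockInfo and NumberReplicas; only the boolean logic mirrors the patched methods.

// Hypothetical stand-alone model of the patched logic; names and primitive
// parameters are illustrative substitutes for BlockInfo/NumberReplicas.
final class Hdfs9600Sketch {

  // Mirrors BlockManager.isNeededReplication after this patch: a block that
  // is still under construction (not COMPLETE) is never reported as needing
  // replication, since its replica count is still changing in the pipeline.
  static boolean isNeededReplication(boolean isComplete, int current,
      int expected, boolean placementPolicySatisfied) {
    return isComplete
        && (current < expected || !placementPolicySatisfied);
  }

  // Mirrors the check inlined into DecommissionManager: decommissioning must
  // not inherit the under-construction exemption above, so it compares the
  // raw replica counts directly instead of calling isNeededReplication.
  static boolean hasSufficientReplicas(int numLive, int numExpected,
      boolean placementPolicySatisfied) {
    return numLive >= numExpected && placementPolicySatisfied;
  }

  public static void main(String[] args) {
    // An under-construction block with one live replica out of three
    // expected no longer triggers re-replication...
    assert !isNeededReplication(false, 1, 3, true);
    // ...but decommissioning still treats it as under-replicated,
    // which is why DecommissionManager stops calling isNeededReplication.
    assert !hasSufficientReplicas(1, 3, true);
  }
}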