HDFS-9600. do not check replication if the block is under construction (Contributed by Phil Yang)

(cherry picked from commit 34cd7cd765)
This commit is contained in:
Vinayakumar B 2016-01-07 11:27:42 +05:30
parent 2c841e6564
commit aa710bd461
4 changed files with 69 additions and 5 deletions

View File

@ -2861,6 +2861,9 @@ Release 2.6.4 - UNRELEASED
BUG FIXES BUG FIXES
HDFS-9600. do not check replication if the block is under construction
(Phil Yang via vinayakumarb)
Release 2.6.3 - 2015-12-17 Release 2.6.3 - 2015-12-17
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -3595,7 +3595,8 @@ boolean isPlacementPolicySatisfied(BlockInfo storedBlock) {
*/ */
boolean isNeededReplication(BlockInfo storedBlock, int current) { boolean isNeededReplication(BlockInfo storedBlock, int current) {
int expected = storedBlock.getReplication(); int expected = storedBlock.getReplication();
return current < expected || !isPlacementPolicySatisfied(storedBlock); return storedBlock.isComplete()
&& (current < expected || !isPlacementPolicySatisfied(storedBlock));
} }
public short getExpectedReplicaNum(BlockInfo block) { public short getExpectedReplicaNum(BlockInfo block) {

View File

@ -253,8 +253,9 @@ private boolean isSufficientlyReplicated(BlockInfo block,
NumberReplicas numberReplicas) { NumberReplicas numberReplicas) {
final int numExpected = block.getReplication(); final int numExpected = block.getReplication();
final int numLive = numberReplicas.liveReplicas(); final int numLive = numberReplicas.liveReplicas();
if (!blockManager.isNeededReplication(block, numLive)) { if (numLive >= numExpected
// Block doesn't need replication. Skip. && blockManager.isPlacementPolicySatisfied(block)) {
// Block has enough replica, skip
LOG.trace("Block {} does not need replication.", block); LOG.trace("Block {} does not need replication.", block);
return true; return true;
} }

View File

@ -32,6 +32,7 @@
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.EnumSet;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map.Entry; import java.util.Map.Entry;
@ -48,16 +49,22 @@
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSOutputStream;
import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
@ -70,8 +77,11 @@
import org.apache.hadoop.hdfs.server.namenode.TestINodeFile; import org.apache.hadoop.hdfs.server.namenode.TestINodeFile;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.net.NetworkTopology;
@ -396,8 +406,57 @@ private void doTestSingleRackClusterIsSufficientlyReplicated(int testIndex,
BlockInfo block = addBlockOnNodes(testIndex, origNodes); BlockInfo block = addBlockOnNodes(testIndex, origNodes);
assertFalse(bm.isNeededReplication(block, bm.countLiveNodes(block))); assertFalse(bm.isNeededReplication(block, bm.countLiveNodes(block)));
} }
@Test(timeout = 60000)
public void testNeededReplicationWhileAppending() throws IOException {
Configuration conf = new HdfsConfiguration();
String src = "/test-file";
Path file = new Path(src);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
cluster.waitActive();
try {
BlockManager bm = cluster.getNamesystem().getBlockManager();
FileSystem fs = cluster.getFileSystem();
NamenodeProtocols namenode = cluster.getNameNodeRpc();
DFSOutputStream out = null;
try {
out = (DFSOutputStream) (fs.create(file).
getWrappedStream());
out.write(1);
out.hflush();
out.close();
FSDataInputStream in = null;
ExtendedBlock oldBlock = null;
LocatedBlock oldLoactedBlock = null;
try {
in = fs.open(file);
oldLoactedBlock = DFSTestUtil.getAllBlocks(in).get(0);
oldBlock = oldLoactedBlock.getBlock();
} finally {
IOUtils.closeStream(in);
}
String clientName =
((DistributedFileSystem) fs).getClient().getClientName();
namenode.append(src, clientName,
new EnumSetWritable<>(EnumSet.of(CreateFlag.APPEND)));
LocatedBlock newLocatedBlock =
namenode.updateBlockForPipeline(oldBlock, clientName);
ExtendedBlock newBlock =
new ExtendedBlock(oldBlock.getBlockPoolId(), oldBlock.getBlockId(),
oldBlock.getNumBytes(),
newLocatedBlock.getBlock().getGenerationStamp());
namenode.updatePipeline(clientName, oldBlock, newBlock,
oldLoactedBlock.getLocations(), oldLoactedBlock.getStorageIDs());
BlockInfo bi = bm.getStoredBlock(newBlock.getLocalBlock());
assertFalse(bm.isNeededReplication(bi, bm.countLiveNodes(bi)));
} finally {
IOUtils.closeStream(out);
}
} finally {
cluster.shutdown();
}
}
/** /**
* Tell the block manager that replication is completed for the given * Tell the block manager that replication is completed for the given
* pipeline. * pipeline.