HDFS-9600. do not check replication if the block is under construction (Contributed by Phil Yang)
(cherry picked from commit 34cd7cd76505d01ec251e30837c94ab03319a0c1) (cherry picked from commit aa710bd461b593b0f3d7d7ac41ca68e1aa3fa9d6) Conflicts: hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
This commit is contained in:
parent
4be18edf2f
commit
79da1283d6
@ -27,6 +27,9 @@ Release 2.6.4 - UNRELEASED
|
||||
HDFS-9445. Datanode may deadlock while handling a bad volume.
|
||||
(Wlater Su via Kihwal)
|
||||
|
||||
HDFS-9600. do not check replication if the block is under construction
|
||||
(Phil Yang via vinayakumarb)
|
||||
|
||||
Release 2.6.3 - 2015-12-17
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -3290,8 +3290,9 @@ boolean isReplicationInProgress(DatanodeDescriptor srcNode) {
|
||||
NumberReplicas num = countNodes(block);
|
||||
int curReplicas = num.liveReplicas();
|
||||
int curExpectedReplicas = getReplication(block);
|
||||
|
||||
if (isNeededReplication(block, curExpectedReplicas, curReplicas)) {
|
||||
|
||||
if (curReplicas < curExpectedReplicas
|
||||
|| !isPlacementPolicySatisfied(block)) {
|
||||
if (curExpectedReplicas > curReplicas) {
|
||||
if (bc.isUnderConstruction()) {
|
||||
if (block.equals(bc.getLastBlock()) && curReplicas > minReplication) {
|
||||
@ -3502,8 +3503,15 @@ boolean isPlacementPolicySatisfied(Block b) {
|
||||
* A block needs replication if the number of replicas is less than expected
|
||||
* or if it does not have enough racks.
|
||||
*/
|
||||
private boolean isNeededReplication(Block b, int expected, int current) {
|
||||
return current < expected || !isPlacementPolicySatisfied(b);
|
||||
boolean isNeededReplication(Block b, int expected, int current) {
|
||||
BlockInfo blockInfo;
|
||||
if (b instanceof BlockInfo) {
|
||||
blockInfo = (BlockInfo) b;
|
||||
} else {
|
||||
blockInfo = getStoredBlock(b);
|
||||
}
|
||||
return blockInfo.isComplete()
|
||||
&& (current < expected || !isPlacementPolicySatisfied(b));
|
||||
}
|
||||
|
||||
public long getMissingBlocksCount() {
|
||||
|
@ -30,21 +30,28 @@
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.EnumSet;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map.Entry;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CreateFlag;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DFSOutputStream;
|
||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.StorageType;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
||||
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
|
||||
@ -54,8 +61,11 @@
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
||||
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
|
||||
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
|
||||
import org.apache.hadoop.io.EnumSetWritable;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.hadoop.net.NetworkTopology;
|
||||
import org.junit.Assert;
|
||||
@ -373,8 +383,59 @@ private void doTestSingleRackClusterIsSufficientlyReplicated(int testIndex,
|
||||
bm.processMisReplicatedBlocks();
|
||||
assertEquals(0, bm.numOfUnderReplicatedBlocks());
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testNeededReplicationWhileAppending() throws IOException {
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
String src = "/test-file";
|
||||
Path file = new Path(src);
|
||||
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
|
||||
cluster.waitActive();
|
||||
try {
|
||||
BlockManager bm = cluster.getNamesystem().getBlockManager();
|
||||
FileSystem fs = cluster.getFileSystem();
|
||||
NamenodeProtocols namenode = cluster.getNameNodeRpc();
|
||||
DFSOutputStream out = null;
|
||||
try {
|
||||
out = (DFSOutputStream) (fs.create(file).
|
||||
getWrappedStream());
|
||||
out.write(1);
|
||||
out.hflush();
|
||||
out.close();
|
||||
FSDataInputStream in = null;
|
||||
ExtendedBlock oldBlock = null;
|
||||
LocatedBlock oldLoactedBlock = null;
|
||||
try {
|
||||
in = fs.open(file);
|
||||
oldLoactedBlock = DFSTestUtil.getAllBlocks(in).get(0);
|
||||
oldBlock = oldLoactedBlock.getBlock();
|
||||
} finally {
|
||||
IOUtils.closeStream(in);
|
||||
}
|
||||
String clientName =
|
||||
((DistributedFileSystem) fs).getClient().getClientName();
|
||||
namenode.append(src, clientName);
|
||||
LocatedBlock newLocatedBlock =
|
||||
namenode.updateBlockForPipeline(oldBlock, clientName);
|
||||
ExtendedBlock newBlock =
|
||||
new ExtendedBlock(oldBlock.getBlockPoolId(), oldBlock.getBlockId(),
|
||||
oldBlock.getNumBytes(),
|
||||
newLocatedBlock.getBlock().getGenerationStamp());
|
||||
namenode.updatePipeline(clientName, oldBlock, newBlock,
|
||||
oldLoactedBlock.getLocations(), oldLoactedBlock.getStorageIDs());
|
||||
BlockInfo bi = bm.getStoredBlock(newBlock.getLocalBlock());
|
||||
assertFalse(
|
||||
bm.isNeededReplication(bi, oldLoactedBlock.getLocations().length,
|
||||
bm.countLiveNodes(bi)));
|
||||
} finally {
|
||||
IOUtils.closeStream(out);
|
||||
}
|
||||
} finally {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Tell the block manager that replication is completed for the given
|
||||
* pipeline.
|
||||
|
Loading…
x
Reference in New Issue
Block a user