HDFS-9600. do not check replication if the block is under construction (Contributed by Phil Yang)
(cherry picked from commit34cd7cd765
) (cherry picked from commitaa710bd461
) Conflicts: hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
This commit is contained in:
parent
258ef9f9c3
commit
5dc5bb2812
|
@ -1193,6 +1193,9 @@ Release 2.6.4 - UNRELEASED
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
||||||
|
HDFS-9600. do not check replication if the block is under construction
|
||||||
|
(Phil Yang via vinayakumarb)
|
||||||
|
|
||||||
Release 2.6.3 - 2015-12-17
|
Release 2.6.3 - 2015-12-17
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -3444,7 +3444,14 @@ public class BlockManager {
|
||||||
* or if it does not have enough racks.
|
* or if it does not have enough racks.
|
||||||
*/
|
*/
|
||||||
boolean isNeededReplication(Block b, int expected, int current) {
|
boolean isNeededReplication(Block b, int expected, int current) {
|
||||||
return current < expected || !isPlacementPolicySatisfied(b);
|
BlockInfoContiguous blockInfo;
|
||||||
|
if (b instanceof BlockInfoContiguous) {
|
||||||
|
blockInfo = (BlockInfoContiguous) b;
|
||||||
|
} else {
|
||||||
|
blockInfo = getStoredBlock(b);
|
||||||
|
}
|
||||||
|
return blockInfo.isComplete()
|
||||||
|
&& (current < expected || !isPlacementPolicySatisfied(b));
|
||||||
}
|
}
|
||||||
|
|
||||||
public long getMissingBlocksCount() {
|
public long getMissingBlocksCount() {
|
||||||
|
|
|
@ -253,8 +253,9 @@ public class DecommissionManager {
|
||||||
NumberReplicas numberReplicas) {
|
NumberReplicas numberReplicas) {
|
||||||
final int numExpected = bc.getBlockReplication();
|
final int numExpected = bc.getBlockReplication();
|
||||||
final int numLive = numberReplicas.liveReplicas();
|
final int numLive = numberReplicas.liveReplicas();
|
||||||
if (!blockManager.isNeededReplication(block, numExpected, numLive)) {
|
if (numLive >= numExpected
|
||||||
// Block doesn't need replication. Skip.
|
&& blockManager.isPlacementPolicySatisfied(block)) {
|
||||||
|
// Block has enough replica, skip
|
||||||
LOG.trace("Block {} does not need replication.", block);
|
LOG.trace("Block {} does not need replication.", block);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,21 +30,28 @@ import static org.mockito.Mockito.verify;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.EnumSet;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map.Entry;
|
import java.util.Map.Entry;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.CreateFlag;
|
||||||
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.StorageType;
|
import org.apache.hadoop.fs.StorageType;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
|
import org.apache.hadoop.hdfs.DFSOutputStream;
|
||||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
|
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
|
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
|
||||||
|
@ -53,8 +60,11 @@ import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
|
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
|
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
|
||||||
|
import org.apache.hadoop.io.EnumSetWritable;
|
||||||
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
import org.apache.hadoop.net.NetworkTopology;
|
import org.apache.hadoop.net.NetworkTopology;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
@ -371,8 +381,58 @@ public class TestBlockManager {
|
||||||
bm.processMisReplicatedBlocks();
|
bm.processMisReplicatedBlocks();
|
||||||
assertEquals(0, bm.numOfUnderReplicatedBlocks());
|
assertEquals(0, bm.numOfUnderReplicatedBlocks());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testNeededReplicationWhileAppending() throws IOException {
|
||||||
|
Configuration conf = new HdfsConfiguration();
|
||||||
|
String src = "/test-file";
|
||||||
|
Path file = new Path(src);
|
||||||
|
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
|
||||||
|
cluster.waitActive();
|
||||||
|
try {
|
||||||
|
BlockManager bm = cluster.getNamesystem().getBlockManager();
|
||||||
|
FileSystem fs = cluster.getFileSystem();
|
||||||
|
NamenodeProtocols namenode = cluster.getNameNodeRpc();
|
||||||
|
DFSOutputStream out = null;
|
||||||
|
try {
|
||||||
|
out = (DFSOutputStream) (fs.create(file).
|
||||||
|
getWrappedStream());
|
||||||
|
out.write(1);
|
||||||
|
out.hflush();
|
||||||
|
out.close();
|
||||||
|
FSDataInputStream in = null;
|
||||||
|
ExtendedBlock oldBlock = null;
|
||||||
|
LocatedBlock oldLoactedBlock = null;
|
||||||
|
try {
|
||||||
|
in = fs.open(file);
|
||||||
|
oldLoactedBlock = DFSTestUtil.getAllBlocks(in).get(0);
|
||||||
|
oldBlock = oldLoactedBlock.getBlock();
|
||||||
|
} finally {
|
||||||
|
IOUtils.closeStream(in);
|
||||||
|
}
|
||||||
|
String clientName =
|
||||||
|
((DistributedFileSystem) fs).getClient().getClientName();
|
||||||
|
namenode.append(src, clientName, new EnumSetWritable<>(
|
||||||
|
EnumSet.of(CreateFlag.APPEND)));
|
||||||
|
LocatedBlock newLocatedBlock =
|
||||||
|
namenode.updateBlockForPipeline(oldBlock, clientName);
|
||||||
|
ExtendedBlock newBlock =
|
||||||
|
new ExtendedBlock(oldBlock.getBlockPoolId(), oldBlock.getBlockId(),
|
||||||
|
oldBlock.getNumBytes(),
|
||||||
|
newLocatedBlock.getBlock().getGenerationStamp());
|
||||||
|
namenode.updatePipeline(clientName, oldBlock, newBlock,
|
||||||
|
oldLoactedBlock.getLocations(), oldLoactedBlock.getStorageIDs());
|
||||||
|
BlockInfoContiguous bi = bm.getStoredBlock(newBlock.getLocalBlock());
|
||||||
|
assertFalse(bm.isNeededReplication(bi,
|
||||||
|
oldLoactedBlock.getLocations().length, bm.countLiveNodes(bi)));
|
||||||
|
} finally {
|
||||||
|
IOUtils.closeStream(out);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tell the block manager that replication is completed for the given
|
* Tell the block manager that replication is completed for the given
|
||||||
* pipeline.
|
* pipeline.
|
||||||
|
|
Loading…
Reference in New Issue