HDFS-14699. Erasure Coding: Storage not considered in live replica when replication streams hard limit reached to threshold. Contributed by Zhao Yi Ming.

(cherry picked from commit d1c303a497)
(cherry picked from commit eb1ddcd04c)
Authored by Surendra Singh Lilhore on 2019-09-12 19:11:50 +05:30; committed by Wei-Chiu Chuang
parent f1fe3abac9
commit f0bdab02e7
2 changed files with 90 additions and 8 deletions

BlockManager.java

@@ -2341,6 +2341,22 @@ public class BlockManager implements BlockStatsMXBean {
           && node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams) {
         continue; // already reached replication limit
       }
+      // For EC blocks, make sure numReplicas reflects the correct replica
+      // state here, because scheduleReconstruction() relies on numReplicas to
+      // decide whether the EC internal block needs to be reconstructed.
+      byte blockIndex = -1;
+      if (isStriped) {
+        blockIndex = ((BlockInfoStriped) block)
+            .getStorageBlockIndex(storage);
+        if (!bitSet.get(blockIndex)) {
+          bitSet.set(blockIndex);
+        } else if (state == StoredReplicaState.LIVE) {
+          numReplicas.subtract(StoredReplicaState.LIVE, 1);
+          numReplicas.add(StoredReplicaState.REDUNDANT, 1);
+        }
+      }
       if (node.getNumberOfBlocksToBeReplicated() >= replicationStreamsHardLimit) {
         continue;
       }
@@ -2348,15 +2364,7 @@ public class BlockManager implements BlockStatsMXBean {
       if(isStriped || srcNodes.isEmpty()) {
         srcNodes.add(node);
         if (isStriped) {
-          byte blockIndex = ((BlockInfoStriped) block).
-              getStorageBlockIndex(storage);
           liveBlockIndices.add(blockIndex);
-          if (!bitSet.get(blockIndex)) {
-            bitSet.set(blockIndex);
-          } else if (state == StoredReplicaState.LIVE) {
-            numReplicas.subtract(StoredReplicaState.LIVE, 1);
-            numReplicas.add(StoredReplicaState.REDUNDANT, 1);
-          }
         }
         continue;
       }
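
Taken together, the two hunks above move the striped-block index bookkeeping in chooseSourceDatanodes ahead of the replicationStreamsHardLimit check: a storage whose node is too busy to act as a reconstruction source still gets its internal-block index recorded in the bitSet, and a duplicate index is still demoted from LIVE to REDUNDANT in numReplicas. The standalone sketch below (illustrative class and field names, not HDFS APIs) shows the same accounting pattern and reproduces the 4 live / 1 redundant outcome that the new test below asserts.

import java.util.BitSet;

// A minimal, self-contained sketch (not HDFS code) of the accounting pattern
// this change enforces: every reported internal-block index is recorded in a
// BitSet before any "skip this node" check, and a duplicate index is demoted
// from the live count to a redundant count.
public class DupIndexAccountingSketch {

  static final class Counts {
    int live;       // distinct live internal blocks
    int redundant;  // duplicated internal blocks
  }

  // reportedIndices: internal-block index reported by each storage.
  // busy: whether that storage's node is over its reconstruction-stream limit
  // and therefore cannot be used as a source.
  static Counts count(byte[] reportedIndices, boolean[] busy) {
    Counts c = new Counts();
    c.live = reportedIndices.length;   // assume every reported replica is LIVE
    BitSet seen = new BitSet();
    for (int i = 0; i < reportedIndices.length; i++) {
      // Do the duplicate bookkeeping for every storage, even one whose node
      // is too busy to act as a reconstruction source ...
      int idx = reportedIndices[i];
      if (!seen.get(idx)) {
        seen.set(idx);
      } else {
        c.live--;          // a duplicate of an index that was already counted
        c.redundant++;
      }
      // ... and only then decide whether the node can be used as a source.
      if (busy[i]) {
        continue;          // skipped as a source, but still counted above
      }
      // (source selection would happen here)
    }
    return c;
  }

  public static void main(String[] args) {
    // Indices 0, 1, 2, 3, 3 with the first copy of index 3 on a busy node,
    // mirroring the scenario behind HDFS-14699: expect 4 live, 1 redundant.
    Counts c = count(new byte[] {0, 1, 2, 3, 3},
        new boolean[] {false, false, false, true, false});
    System.out.println("live=" + c.live + ", redundant=" + c.redundant);
  }
}
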

TestBlockManager.java

@@ -41,6 +41,7 @@ import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.BlockType;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
@@ -68,6 +69,7 @@ import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.net.NetworkTopology;
@@ -686,6 +688,67 @@ public class TestBlockManager {
         LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY).length);
   }
 
+  @Test
+  public void testChooseSrcDatanodesWithDupEC() throws Exception {
+    bm.maxReplicationStreams = 4;
+    long blockId = -9223372036854775776L; // a realistic striped (EC) block id
+    Block aBlock = new Block(blockId, 0, 0);
+    // EC policy: RS-3-2 with a 128k cell size
+    ECSchema rsSchema = new ECSchema("rs", 3, 2);
+    String policyName = "RS-3-2-128k";
+    int cellSize = 128 * 1024;
+    ErasureCodingPolicy ecPolicy =
+        new ErasureCodingPolicy(policyName, rsSchema, cellSize, (byte) -1);
+    // striped block info
+    BlockInfoStriped aBlockInfoStriped = new BlockInfoStriped(aBlock, ecPolicy);
+
+    // storages that hold the EC internal blocks
+    DatanodeStorageInfo ds1 = DFSTestUtil.createDatanodeStorageInfo(
+        "storage1", "1.1.1.1", "rack1", "host1");
+    DatanodeStorageInfo ds2 = DFSTestUtil.createDatanodeStorageInfo(
+        "storage2", "2.2.2.2", "rack2", "host2");
+    DatanodeStorageInfo ds3 = DFSTestUtil.createDatanodeStorageInfo(
+        "storage3", "3.3.3.3", "rack3", "host3");
+    DatanodeStorageInfo ds4 = DFSTestUtil.createDatanodeStorageInfo(
+        "storage4", "4.4.4.4", "rack4", "host4");
+    DatanodeStorageInfo ds5 = DFSTestUtil.createDatanodeStorageInfo(
+        "storage5", "5.5.5.5", "rack5", "host5");
+    // attach the internal blocks (indices 0..3) to their storages
+    aBlockInfoStriped.addStorage(ds1, aBlock);
+    aBlockInfoStriped.addStorage(ds2, new Block(blockId + 1, 0, 0));
+    aBlockInfoStriped.addStorage(ds3, new Block(blockId + 2, 0, 0));
+    // duplicated internal block: index 3 is reported by both ds4 and ds5
+    aBlockInfoStriped.addStorage(ds4, new Block(blockId + 3, 0, 0));
+    aBlockInfoStriped.addStorage(ds5, new Block(blockId + 3, 0, 0));
+
+    // simulate that ds4's node has reached maxReplicationStreams
+    for (int i = 0; i < 4; i++) {
+      ds4.getDatanodeDescriptor().incrementPendingReplicationWithoutTargets();
+    }
+    addEcBlockToBM(blockId, ecPolicy);
+
+    List<DatanodeDescriptor> cntNodes = new LinkedList<DatanodeDescriptor>();
+    List<DatanodeStorageInfo> liveNodes = new LinkedList<DatanodeStorageInfo>();
+    NumberReplicas numReplicas = new NumberReplicas();
+    List<Byte> liveBlockIndices = new ArrayList<>();
+
+    bm.chooseSourceDatanodes(
+        aBlockInfoStriped,
+        cntNodes,
+        liveNodes,
+        numReplicas, liveBlockIndices,
+        LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY);
+
+    assertEquals("With one node at the maxReplicationStreams limit,"
+        + " numReplicas should still report the correct number of live"
+        + " replicas.", 4,
+        numReplicas.liveReplicas());
+    assertEquals("With one node at the maxReplicationStreams limit,"
+        + " numReplicas should report the correct number of redundant"
+        + " internal blocks.", 1,
+        numReplicas.redundantInternalBlocks());
+  }
+
   @Test
   public void testFavorDecomUntilHardLimit() throws Exception {
     bm.maxReplicationStreams = 0;
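
The expected values in the assertions above follow directly from the test setup: the five storages report internal-block indices 0, 1, 2, 3 and 3, so only four distinct indices are live, one replica is redundant, and index 4 is missing entirely. The short sketch below (illustrative names, not BlockManager internals) shows why that distinction matters for an RS-3-2 block group: counting the duplicate as live makes the group look complete, while the corrected count exposes the missing internal block so reconstruction can be scheduled.

// A sketch of the reconstruction decision the corrected counts feed into,
// under the assumption that a striped group needs all data + parity internal
// blocks to be considered complete (names here are illustrative).
public class EcReconstructionDecisionSketch {
  public static void main(String[] args) {
    int expected = 3 + 2;     // RS-3-2: 5 distinct internal blocks expected

    int liveBeforeFix = 5;    // duplicate of index 3 wrongly counted as live
    int liveAfterFix = 4;     // duplicate demoted to REDUNDANT by the fix

    // Before the fix the group looks complete and the missing internal block
    // (index 4) is never reconstructed; after the fix it is detected.
    System.out.println("schedule reconstruction before fix: "
        + (liveBeforeFix < expected));   // false
    System.out.println("schedule reconstruction after fix:  "
        + (liveAfterFix < expected));    // true
  }
}
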
@@ -980,6 +1043,17 @@ public class TestBlockManager {
     bm.setInitializedReplQueues(false);
   }
 
+  private BlockInfo addEcBlockToBM(long blkId, ErasureCodingPolicy ecPolicy) {
+    Block block = new Block(blkId);
+    BlockInfo blockInfo = new BlockInfoStriped(block, ecPolicy);
+    long inodeId = ++mockINodeId;
+    final INodeFile bc = TestINodeFile.createINodeFile(inodeId);
+    bm.blocksMap.addBlockCollection(blockInfo, bc);
+    blockInfo.setBlockCollectionId(inodeId);
+    doReturn(bc).when(fsn).getBlockCollection(inodeId);
+    return blockInfo;
+  }
+
   private BlockInfo addBlockToBM(long blkId) {
     Block block = new Block(blkId);
     BlockInfo blockInfo = new BlockInfoContiguous(block, (short) 3);