HDFS-14847. Erasure Coding: Blocks are over-replicated while EC decommissioning. Contributed by Fei Hui.
parent 19755b9b36
commit c5b4a2d115
BlockInfoStriped.java

@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -154,7 +155,8 @@ public class BlockInfoStriped extends BlockInfo {
     return -1;
   }
 
-  byte getStorageBlockIndex(DatanodeStorageInfo storage) {
+  @VisibleForTesting
+  public byte getStorageBlockIndex(DatanodeStorageInfo storage) {
     int i = this.findStorageInfo(storage);
     return i == -1 ? -1 : indices[i];
   }
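getStorageBlockIndex is made public here so the new test can map each storage back to the EC internal block index it holds. A minimal sketch of that use, assuming a BlockInfoStriped obtained from a live NameNode; the helper class and method names are hypothetical, not part of this commit:

    import java.util.BitSet;
    import java.util.Iterator;

    import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
    import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;

    class StripedIndexCheck {
      /** Sketch: is every internal index 0..totalBlocks-1 stored somewhere? */
      static boolean allIndicesPresent(BlockInfoStriped block, int totalBlocks) {
        BitSet seen = new BitSet(totalBlocks);
        Iterator<DatanodeStorageInfo> it = block.getStorageInfos();
        while (it.hasNext()) {
          // Callable from tests outside the package thanks to this commit.
          byte idx = block.getStorageBlockIndex(it.next());
          if (idx >= 0) {
            seen.set(idx);
          }
        }
        return seen.cardinality() == totalBlocks;
      }
    }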
DatanodeAdminManager.java

@@ -437,6 +437,11 @@ public class DatanodeAdminManager {
     return monitor.numNodesChecked;
   }
 
+  @VisibleForTesting
+  public Queue<DatanodeDescriptor> getPendingNodes() {
+    return pendingNodes;
+  }
+
   /**
    * Checks to see if datanodes have finished DECOMMISSION_INPROGRESS or
    * ENTERING_MAINTENANCE state.
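Exposing the pending queue lets a test control exactly when the admin monitor learns about a decommissioning node, which is how the new test below stages its failure scenario. A hedged sketch of the pattern, assuming the queue comes from getPendingNodes() on a running cluster's DatanodeAdminManager:

    import java.util.Queue;

    import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;

    class PendingNodesSketch {
      /** Sketch: flip admin state first, hand the node to the monitor later. */
      static void decommissionLater(Queue<DatanodeDescriptor> pendingNodes,
          DatanodeDescriptor dn) {
        dn.startDecommission();   // node is DECOMMISSION_INPROGRESS immediately
        pendingNodes.add(dn);     // the Monitor thread only now tracks it
      }
    }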
DatanodeDescriptor.java

@@ -643,7 +643,9 @@ public class DatanodeDescriptor extends DatanodeInfo {
   /**
    * Store block replication work.
    */
-  void addBlockToBeReplicated(Block block, DatanodeStorageInfo[] targets) {
+  @VisibleForTesting
+  public void addBlockToBeReplicated(Block block,
+      DatanodeStorageInfo[] targets) {
     assert(block != null && targets != null && targets.length > 0);
     replicateBlocks.offer(new BlockTargetPair(block, targets));
   }
@@ -701,7 +703,8 @@ public class DatanodeDescriptor extends DatanodeInfo {
     return erasurecodeBlocks.size();
   }
 
-  int getNumberOfReplicateBlocks() {
+  @VisibleForTesting
+  public int getNumberOfReplicateBlocks() {
     return replicateBlocks.size();
   }
 
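Together, the now-public addBlockToBeReplicated and getNumberOfReplicateBlocks give tests a way to inject replication work and wait for the DataNode heartbeat to drain it. A sketch under the assumption that dn, block and target come from a running MiniDFSCluster; the wrapper method is illustrative, not part of this commit:

    import org.apache.hadoop.hdfs.protocol.Block;
    import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
    import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;

    class ReplicationDrainSketch {
      /** Sketch: queue one replication, poll until heartbeats consume it. */
      static void replicateAndWait(DatanodeDescriptor dn, Block block,
          DatanodeStorageInfo target) throws InterruptedException {
        dn.addBlockToBeReplicated(block, new DatanodeStorageInfo[] {target});
        while (dn.getNumberOfReplicateBlocks() > 0) {
          Thread.sleep(100);
        }
      }
    }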
ErasureCodingWork.java

@@ -164,11 +164,23 @@ class ErasureCodingWork extends BlockReconstructionWork {
   }
 
   private List<Integer> findLeavingServiceSources() {
+    // Mark the indices of this block group that live on in-service nodes.
+    BlockInfoStriped block = (BlockInfoStriped)getBlock();
+    BitSet bitSet = new BitSet(block.getRealTotalBlockNum());
+    for (int i = 0; i < getSrcNodes().length; i++) {
+      if (getSrcNodes()[i].isInService()) {
+        bitSet.set(liveBlockIndicies[i]);
+      }
+    }
+    // If the block is on a node that is decommissioning or entering
+    // maintenance, and no in-service node holds the same internal index,
+    // add that node to the source list.
     List<Integer> srcIndices = new ArrayList<>();
     for (int i = 0; i < getSrcNodes().length; i++) {
-      if (getSrcNodes()[i].isDecommissionInProgress() ||
+      if ((getSrcNodes()[i].isDecommissionInProgress() ||
           (getSrcNodes()[i].isEnteringMaintenance() &&
-          getSrcNodes()[i].isAlive())) {
+          getSrcNodes()[i].isAlive())) &&
+          !bitSet.get(liveBlockIndicies[i])) {
         srcIndices.add(i);
       }
     }
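This hunk is the heart of the fix: a decommissioning or entering-maintenance node is only used as a replication source when no in-service node already stores the same internal block index, so the block group can no longer be over-replicated. A self-contained sketch of that idea, using hypothetical stand-in types instead of the real DatanodeDescriptor/BlockInfoStriped:

    import java.util.ArrayList;
    import java.util.BitSet;
    import java.util.List;

    class LeavingSourceSketch {
      // Hypothetical stand-in for a node holding one internal EC block.
      static class Src {
        final boolean inService;  // healthy, not leaving the cluster
        final boolean leaving;    // decommissioning or entering maintenance
        final int index;          // EC internal block index held by this node
        Src(boolean inService, boolean leaving, int index) {
          this.inService = inService;
          this.leaving = leaving;
          this.index = index;
        }
      }

      /** Positions of leaving nodes whose index has no in-service copy. */
      static List<Integer> findLeavingServiceSources(Src[] srcs,
          int totalBlocks) {
        BitSet live = new BitSet(totalBlocks);
        for (Src s : srcs) {
          if (s.inService) {
            live.set(s.index);    // index already safe on an in-service node
          }
        }
        List<Integer> result = new ArrayList<>();
        for (int i = 0; i < srcs.length; i++) {
          if (srcs[i].leaving && !live.get(srcs[i].index)) {
            result.add(i);        // replicate only when no live copy exists
          }
        }
        return result;
      }

      public static void main(String[] args) {
        // Index 3 already lives on an in-service node, so its leaving holder
        // is skipped; only the holder of index 5 is selected -> prints [2].
        Src[] srcs = { new Src(true, false, 3), new Src(false, true, 3),
            new Src(false, true, 5) };
        System.out.println(findLeavingServiceSources(srcs, 9));
      }
    }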
ProvidedStorageMap.java

@@ -396,7 +396,8 @@ public class ProvidedStorageMap {
     }
 
     @Override
-    void addBlockToBeReplicated(Block block, DatanodeStorageInfo[] targets) {
+    public void addBlockToBeReplicated(Block block,
+        DatanodeStorageInfo[] targets) {
       // pick a random datanode, delegate to it
       DatanodeDescriptor node = chooseRandom(targets);
       if (node != null) {
@ -27,9 +27,11 @@ import java.util.Collection;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.BitSet;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import com.google.common.base.Supplier;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.BlockLocation;
|
import org.apache.hadoop.fs.BlockLocation;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
|
@@ -43,14 +45,19 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.PathUtils;
 import org.junit.After;
 import org.junit.Assert;
@@ -585,4 +592,121 @@ public class TestDecommissionWithStriped {
       }
       return null;
     }
+
+  /**
+   * Simulate two nodes (dn0, dn1) in decommission: dn0 replicates its
+   * block successfully, dn1 fails to replicate, and decommissioning
+   * proceeds regardless.
+   */
+  @Test (timeout = 120000)
+  public void testDecommissionWithFailedReplicating() throws Exception {
+
+    // Write an EC file.
+    Path ecFile = new Path(ecDir, "firstReplicationFailedFile");
+    int writeBytes = cellSize * 6;
+    writeStripedFile(dfs, ecFile, writeBytes);
+
+    // Get 2 nodes of the EC block and start decommissioning them.
+    // The 2 nodes are not yet in pendingNodes of DatanodeAdminManager.
+    List<LocatedBlock> lbs = ((HdfsDataInputStream) dfs.open(ecFile))
+        .getAllBlocks();
+    LocatedStripedBlock blk = (LocatedStripedBlock) lbs.get(0);
+    DatanodeInfo[] dnList = blk.getLocations();
+    DatanodeDescriptor dn0 = bm.getDatanodeManager()
+        .getDatanode(dnList[0].getDatanodeUuid());
+    dn0.startDecommission();
+    DatanodeDescriptor dn1 = bm.getDatanodeManager()
+        .getDatanode(dnList[1].getDatanodeUuid());
+    dn1.startDecommission();
+
+    assertEquals(0, bm.getDatanodeManager().getDatanodeAdminManager()
+        .getNumPendingNodes());
+
+    // Replicate dn0's block to another datanode.
+    // Simulate that dn0 replicates successfully and dn1 fails.
+    final byte blockIndex = blk.getBlockIndices()[0];
+    final Block targetBlk = new Block(blk.getBlock().getBlockId() + blockIndex,
+        cellSize, blk.getBlock().getGenerationStamp());
+    DatanodeInfo extraDn = getDatanodeOutOfTheBlock(blk);
+    DatanodeDescriptor target = bm.getDatanodeManager()
+        .getDatanode(extraDn.getDatanodeUuid());
+    dn0.addBlockToBeReplicated(targetBlk,
+        new DatanodeStorageInfo[] {target.getStorageInfos()[0]});
+
+    // Wait until dn0 has replicated successfully.
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        return dn0.getNumberOfReplicateBlocks() == 0;
+      }
+    }, 100, 60000);
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        Iterator<DatanodeStorageInfo> it =
+            bm.getStoredBlock(targetBlk).getStorageInfos();
+        while (it.hasNext()) {
+          if (it.next().getDatanodeDescriptor().equals(target)) {
+            return true;
+          }
+        }
+        return false;
+      }
+    }, 100, 60000);
+
+    // There are now 8 live replicas.
+    BlockInfoStriped blockInfo =
+        (BlockInfoStriped)bm.getStoredBlock(
+            new Block(blk.getBlock().getBlockId()));
+    assertEquals(8, bm.countNodes(blockInfo).liveReplicas());
+
+    // Add the 2 nodes to pendingNodes of DatanodeAdminManager.
+    bm.getDatanodeManager().getDatanodeAdminManager()
+        .getPendingNodes().add(dn0);
+    bm.getDatanodeManager().getDatanodeAdminManager()
+        .getPendingNodes().add(dn1);
+
+    waitNodeState(dn0, AdminStates.DECOMMISSIONED);
+    waitNodeState(dn1, AdminStates.DECOMMISSIONED);
+
+    // There are 9 live replicas, not more.
+    assertEquals(9, bm.countNodes(blockInfo).liveReplicas());
+
+    // After dn0 and dn1 are decommissioned, all internal blocks (0-8)
+    // are still present.
+    Iterator<DatanodeStorageInfo> it = blockInfo.getStorageInfos();
+    BitSet indexBitSet = new BitSet(9);
+    while (it.hasNext()) {
+      DatanodeStorageInfo storageInfo = it.next();
+      if (storageInfo.getDatanodeDescriptor().equals(dn0)
+          || storageInfo.getDatanodeDescriptor().equals(dn1)) {
+        // Skip decommissioned nodes.
+        continue;
+      }
+      byte index = blockInfo.getStorageBlockIndex(storageInfo);
+      indexBitSet.set(index);
+    }
+    for (int i = 0; i < 9; ++i) {
+      assertEquals(true, indexBitSet.get(i));
+    }
+  }
+
+  /**
+   * Get a datanode that does not contain the block.
+   */
+  private DatanodeInfo getDatanodeOutOfTheBlock(LocatedStripedBlock blk)
+      throws Exception {
+    DatanodeInfo[] allDnInfos = client.datanodeReport(DatanodeReportType.LIVE);
+    DatanodeInfo[] blkDnInfos = blk.getLocations();
+    for (DatanodeInfo dnInfo : allDnInfos) {
+      boolean in = false;
+      for (DatanodeInfo blkDnInfo : blkDnInfos) {
+        if (blkDnInfo.equals(dnInfo)) {
+          in = true;
+        }
+      }
+      if (!in) {
+        return dnInfo;
+      }
+    }
+    return null;
+  }
 }