HDFS-14108. Performance improvement in BlockManager Data Structures. Contributed by Beluga Behr.

This commit is contained in:
Giovanni Matteo Fumarola 2018-11-28 11:25:47 -08:00
parent 64a4b6b08b
commit 4ca3a6b21a
1 changed file with 12 additions and 18 deletions

View File

@@ -34,12 +34,10 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
@@ -844,7 +842,7 @@ public class BlockManager implements BlockStatsMXBean {
// source node returned is not used
chooseSourceDatanodes(getStoredBlock(block), containingNodes,
containingLiveReplicasNodes, numReplicas,
new LinkedList<Byte>(), LowRedundancyBlocks.LEVEL);
new ArrayList<Byte>(), LowRedundancyBlocks.LEVEL);
// containingLiveReplicasNodes can include READ_ONLY_SHARED replicas which are
// not included in the numReplicas.liveReplicas() count
@@ -1842,7 +1840,7 @@ public class BlockManager implements BlockStatsMXBean {
int computeReconstructionWorkForBlocks(
List<List<BlockInfo>> blocksToReconstruct) {
int scheduledWork = 0;
List<BlockReconstructionWork> reconWork = new LinkedList<>();
List<BlockReconstructionWork> reconWork = new ArrayList<>();
// Step 1: categorize at-risk blocks into replication and EC tasks
namesystem.writeLock();
@@ -1864,14 +1862,10 @@ public class BlockManager implements BlockStatsMXBean {
}
// Step 2: choose target nodes for each reconstruction task
final Set<Node> excludedNodes = new HashSet<>();
for(BlockReconstructionWork rw : reconWork){
for (BlockReconstructionWork rw : reconWork) {
// Exclude all of the containing nodes from being targets.
// This list includes decommissioning or corrupt nodes.
excludedNodes.clear();
for (DatanodeDescriptor dn : rw.getContainingNodes()) {
excludedNodes.add(dn);
}
final Set<Node> excludedNodes = new HashSet<>(rw.getContainingNodes());
// choose replication targets: NOT HOLDING THE GLOBAL LOCK
final BlockPlacementPolicy placementPolicy =
@@ -1882,9 +1876,9 @@ public class BlockManager implements BlockStatsMXBean {
// Step 3: add tasks to the DN
namesystem.writeLock();
try {
for(BlockReconstructionWork rw : reconWork){
for (BlockReconstructionWork rw : reconWork) {
final DatanodeStorageInfo[] targets = rw.getTargets();
if(targets == null || targets.length == 0){
if (targets == null || targets.length == 0) {
rw.resetTargets();
continue;
}
@@ -1901,7 +1895,7 @@ public class BlockManager implements BlockStatsMXBean {
if (blockLog.isDebugEnabled()) {
// log which blocks have been scheduled for reconstruction
for(BlockReconstructionWork rw : reconWork){
for (BlockReconstructionWork rw : reconWork) {
DatanodeStorageInfo[] targets = rw.getTargets();
if (targets != null && targets.length != 0) {
StringBuilder targetList = new StringBuilder("datanode(s)");
@@ -2666,11 +2660,11 @@ public class BlockManager implements BlockStatsMXBean {
// Modify the (block-->datanode) map, according to the difference
// between the old and new block report.
//
Collection<BlockInfoToAdd> toAdd = new LinkedList<>();
Collection<BlockInfo> toRemove = new TreeSet<>();
Collection<Block> toInvalidate = new LinkedList<>();
Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<>();
Collection<StatefulBlockInfo> toUC = new LinkedList<>();
Collection<BlockInfoToAdd> toAdd = new ArrayList<>();
Collection<BlockInfo> toRemove = new HashSet<>();
Collection<Block> toInvalidate = new ArrayList<>();
Collection<BlockToMarkCorrupt> toCorrupt = new ArrayList<>();
Collection<StatefulBlockInfo> toUC = new ArrayList<>();
boolean sorted = false;
String strBlockReportId = "";