HDFS-8674. Improve performance of postponed block scans. Contributed by Daryn Sharp.
(cherry picked from commit 0d8a35bd6d)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
parent 948d0ac72d
commit cafee1109c
@@ -114,6 +114,8 @@ Release 2.7.4 - UNRELEASED
     HDFS-9710. DN can be configured to send block receipt IBRs in batches.
     (Tsz-Wo Nicholas Sze. Backport HDFS-11837 by Vinitha Reddy Gankidi)
 
+    HDFS-8674. Improve performance of postponed block scans. (Daryn Sharp)
+
   BUG FIXES
 
     HDFS-8307. Spurious DNS Queries from hdfs shell. (Andres Perez via aengineer)
@@ -27,6 +27,7 @@ import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -127,7 +128,6 @@ public class BlockManager {
   private volatile long underReplicatedBlocksCount = 0L;
   private volatile long scheduledReplicationBlocksCount = 0L;
   private final AtomicLong excessBlocksCount = new AtomicLong(0L);
-  private final AtomicLong postponedMisreplicatedBlocksCount = new AtomicLong(0L);
   private final long startupDelayBlockDeletionInMs;
 
   /** Used by metrics */
@@ -160,7 +160,7 @@ public class BlockManager {
   }
   /** Used by metrics */
   public long getPostponedMisreplicatedBlocksCount() {
-    return postponedMisreplicatedBlocksCount.get();
+    return postponedMisreplicatedBlocks.size();
   }
   /** Used by metrics */
   public int getPendingDataNodeMessageCount() {
@@ -196,7 +196,10 @@ public class BlockManager {
    * notified of all block deletions that might have been pending
    * when the failover happened.
    */
-  private final Set<Block> postponedMisreplicatedBlocks = Sets.newHashSet();
+  private final LinkedHashSet<Block> postponedMisreplicatedBlocks =
+      new LinkedHashSet<Block>();
+  private final int blocksPerPostpondedRescan;
+  private final ArrayList<Block> rescannedMisreplicatedBlocks;
 
   /**
    * Maps a StorageID to the set of blocks that are "extra" for this
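Note (illustrative, not part of the patch): replacing the plain HashSet plus a separately maintained AtomicLong with a LinkedHashSet drops the counter, since size() is exact while the namesystem lock is held, and the predictable insertion order is what lets the rescan loop below walk the backlog round-robin instead of picking a random start index. A minimal sketch of that behaviour, using made-up block IDs:

import java.util.Iterator;
import java.util.LinkedHashSet;

public class LinkedHashSetOrderDemo {
  public static void main(String[] args) {
    // Insertion order is preserved, so the oldest postponed entry is always
    // visited first, and a re-added entry moves to the tail of the queue.
    LinkedHashSet<Long> postponed = new LinkedHashSet<>();
    postponed.add(101L);
    postponed.add(102L);
    postponed.add(103L);

    Iterator<Long> it = postponed.iterator();
    Long oldest = it.next();   // 101
    it.remove();               // consume it from the head
    postponed.add(oldest);     // still postponed: goes back at the tail

    System.out.println(postponed);        // [102, 103, 101]
    System.out.println(postponed.size()); // 3 -- no separate counter needed
  }
}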
@@ -286,7 +289,10 @@ public class BlockManager {
     this.namesystem = namesystem;
     datanodeManager = new DatanodeManager(this, namesystem, conf);
     heartbeatManager = datanodeManager.getHeartbeatManager();
 
+    blocksPerPostpondedRescan = (int)Math.min(Integer.MAX_VALUE,
+        datanodeManager.getBlocksPerPostponedMisreplicatedBlocksRescan());
+    rescannedMisreplicatedBlocks =
+        new ArrayList<Block>(blocksPerPostpondedRescan);
     startupDelayBlockDeletionInMs = conf.getLong(
         DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_SEC_KEY,
         DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_SEC_DEFAULT) * 1000L;
@@ -1319,9 +1325,7 @@ public class BlockManager {
 
 
   private void postponeBlock(Block blk) {
-    if (postponedMisreplicatedBlocks.add(blk)) {
-      postponedMisreplicatedBlocksCount.incrementAndGet();
-    }
+    postponedMisreplicatedBlocks.add(blk);
   }
 
 
@@ -1941,39 +1945,14 @@ public class BlockManager {
     if (getPostponedMisreplicatedBlocksCount() == 0) {
       return;
     }
-    long startTimeRescanPostponedMisReplicatedBlocks = Time.monotonicNow();
-    long startPostponedMisReplicatedBlocksCount =
-        getPostponedMisreplicatedBlocksCount();
     namesystem.writeLock();
+    long startTime = Time.monotonicNow();
+    long startSize = postponedMisreplicatedBlocks.size();
     try {
-      // blocksPerRescan is the configured number of blocks per rescan.
-      // Randomly select blocksPerRescan consecutive blocks from the HashSet
-      // when the number of blocks remaining is larger than blocksPerRescan.
-      // The reason we don't always pick the first blocksPerRescan blocks is to
-      // handle the case if for some reason some datanodes remain in
-      // content stale state for a long time and only impact the first
-      // blocksPerRescan blocks.
-      int i = 0;
-      long startIndex = 0;
-      long blocksPerRescan =
-          datanodeManager.getBlocksPerPostponedMisreplicatedBlocksRescan();
-      long base = getPostponedMisreplicatedBlocksCount() - blocksPerRescan;
-      if (base > 0) {
-        startIndex = DFSUtil.getRandom().nextLong() % (base+1);
-        if (startIndex < 0) {
-          startIndex += (base+1);
-        }
-      }
       Iterator<Block> it = postponedMisreplicatedBlocks.iterator();
-      for (int tmp = 0; tmp < startIndex; tmp++) {
-        it.next();
-      }
-
-      for (;it.hasNext(); i++) {
+      for (int i=0; i < blocksPerPostpondedRescan && it.hasNext(); i++) {
         Block b = it.next();
-        if (i >= blocksPerRescan) {
-          break;
-        }
+        it.remove();
 
         BlockInfoContiguous bi = blocksMap.getStoredBlock(b);
         if (bi == null) {
@@ -1982,8 +1961,6 @@ public class BlockManager {
                 "Postponed mis-replicated block " + b + " no longer found " +
                 "in block map.");
           }
-          it.remove();
-          postponedMisreplicatedBlocksCount.decrementAndGet();
           continue;
         }
         MisReplicationResult res = processMisReplicatedBlock(bi);
@@ -1991,20 +1968,19 @@ public class BlockManager {
           LOG.debug("BLOCK* rescanPostponedMisreplicatedBlocks: " +
               "Re-scanned block " + b + ", result is " + res);
         }
-        if (res != MisReplicationResult.POSTPONE) {
-          it.remove();
-          postponedMisreplicatedBlocksCount.decrementAndGet();
+        if (res == MisReplicationResult.POSTPONE) {
+          rescannedMisreplicatedBlocks.add(b);
         }
       }
     } finally {
+      postponedMisreplicatedBlocks.addAll(rescannedMisreplicatedBlocks);
+      rescannedMisreplicatedBlocks.clear();
+      long endSize = postponedMisreplicatedBlocks.size();
       namesystem.writeUnlock();
-      long endPostponedMisReplicatedBlocksCount =
-          getPostponedMisreplicatedBlocksCount();
       LOG.info("Rescan of postponedMisreplicatedBlocks completed in " +
-          (Time.monotonicNow() - startTimeRescanPostponedMisReplicatedBlocks) +
-          " msecs. " + endPostponedMisReplicatedBlocksCount +
-          " blocks are left. " + (startPostponedMisReplicatedBlocksCount -
-          endPostponedMisReplicatedBlocksCount) + " blocks are removed.");
+          (Time.monotonicNow() - startTime) + " msecs. " +
+          endSize + " blocks are left. " +
+          (startSize - endSize) + " blocks were removed.");
     }
   }
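Note (illustrative, not part of the patch): after these hunks, each rescan pops at most blocksPerPostpondedRescan entries from the head of the LinkedHashSet and re-appends only the ones that are still postponed, so successive scans cycle through the whole backlog without the random-start-index bookkeeping the old code needed. A standalone sketch of that loop shape, with a hypothetical stillPostponed() check standing in for processMisReplicatedBlock():

import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;

public class RoundRobinRescanSketch {
  static final int BLOCKS_PER_RESCAN = 2;   // stands in for blocksPerPostpondedRescan
  static final LinkedHashSet<Long> postponed = new LinkedHashSet<>();
  static final List<Long> rescanned = new ArrayList<>(BLOCKS_PER_RESCAN);

  // Hypothetical stand-in for processMisReplicatedBlock(): odd ids stay postponed.
  static boolean stillPostponed(long blockId) {
    return blockId % 2 != 0;
  }

  static void rescan() {
    Iterator<Long> it = postponed.iterator();
    for (int i = 0; i < BLOCKS_PER_RESCAN && it.hasNext(); i++) {
      long b = it.next();
      it.remove();                 // always pop from the head
      if (stillPostponed(b)) {
        rescanned.add(b);          // collect, re-append after the loop
      }
    }
    postponed.addAll(rescanned);   // survivors go to the tail, preserving rotation
    rescanned.clear();
  }

  public static void main(String[] args) {
    for (long id = 1; id <= 5; id++) {
      postponed.add(id);
    }
    rescan();
    // 1 survives (odd), 2 is resolved: head is now 3, tail is 1.
    System.out.println(postponed);   // [3, 4, 5, 1]
    rescan();
    System.out.println(postponed);   // [5, 1, 3]
  }
}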
@@ -3400,9 +3376,7 @@ public class BlockManager {
     // Remove the block from pendingReplications and neededReplications
     pendingReplications.remove(block);
     neededReplications.remove(block, UnderReplicatedBlocks.LEVEL);
-    if (postponedMisreplicatedBlocks.remove(block)) {
-      postponedMisreplicatedBlocksCount.decrementAndGet();
-    }
+    postponedMisreplicatedBlocks.remove(block);
   }
 
   public BlockInfoContiguous getStoredBlock(Block block) {
@@ -3730,7 +3704,6 @@ public class BlockManager {
     invalidateBlocks.clear();
     datanodeManager.clearPendingQueues();
     postponedMisreplicatedBlocks.clear();
-    postponedMisreplicatedBlocksCount.set(0);
   };
 
 