HDFS-9838. Refactor the excessReplicateMap to a class.

Tsz-Wo Nicholas Sze, 2016-02-24 19:42:47 -08:00
parent c684f2b007
commit 6979cbfc1f
7 changed files with 141 additions and 87 deletions

CHANGES.txt

@@ -228,6 +228,8 @@ Trunk (Unreleased)
     HDFS-9829. Erasure Coding: Improve few exception handling logic of
     ErasureCodingWorker. (Rakesh R via jing9)

+    HDFS-9838. Refactor the excessReplicateMap to a class. (szetszwo)
+
   OPTIMIZATIONS

   BUG FIXES

BlockManager.java

@@ -155,7 +155,6 @@ public class BlockManager implements BlockStatsMXBean {
   /** flag indicating whether replication queues have been initialized */
   private boolean initializedReplQueues;
-  private final AtomicLong excessBlocksCount = new AtomicLong(0L);
   private final AtomicLong postponedMisreplicatedBlocksCount = new AtomicLong(0L);
   private final long startupDelayBlockDeletionInMs;
   private final BlockReportLeaseManager blockReportLeaseManager;
@@ -187,7 +186,7 @@ public long getStartupDelayBlockDeletionInMs() {
   }
   /** Used by metrics */
   public long getExcessBlocksCount() {
-    return excessBlocksCount.get();
+    return excessReplicas.size();
   }
   /** Used by metrics */
   public long getPostponedMisreplicatedBlocksCount() {
@@ -247,8 +246,7 @@ public int getPendingDataNodeMessageCount() {
   /**
    * Maps a StorageID to the set of blocks that are "extra" for this
    * DataNode. We'll eventually remove these extras.
    */
-  public final Map<String, LightWeightHashSet<BlockInfo>> excessReplicateMap =
-    new HashMap<>();
+  private final ExcessReplicaMap excessReplicas = new ExcessReplicaMap();
   /**
    * Store set of Blocks that need to be replicated 1 or more times.
@@ -1832,13 +1830,17 @@ DatanodeDescriptor[] chooseSourceDatanodes(BlockInfo block,
       }
       containingNodes.add(node);

-      // do not select corrupted replica as src. also do not select the block
-      // that is already in excess map
+      // do not select the replica if it is corrupt or excess
       if (state == StoredReplicaState.CORRUPT ||
           state == StoredReplicaState.EXCESS) {
         continue;
       }
+
+      // never use already decommissioned nodes or unknown state replicas
+      if (state == null || state == StoredReplicaState.DECOMMISSIONED) {
+        continue;
+      }

       if(priority != UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY
           && !node.isDecommissionInProgress()
           && node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams) {
@@ -1847,10 +1849,6 @@ DatanodeDescriptor[] chooseSourceDatanodes(BlockInfo block,
       if (node.getNumberOfBlocksToBeReplicated() >= replicationStreamsHardLimit) {
         continue;
       }
-      // never use already decommissioned nodes
-      if (node.isDecommissioned()) {
-        continue;
-      }

       if(isStriped || srcNodes.isEmpty()) {
         srcNodes.add(node);
@@ -3194,9 +3192,7 @@ private void processOverReplicatedBlock(final BlockInfo block,
         postponeBlock(block);
         return;
       }
-      LightWeightHashSet<BlockInfo> excessBlocks = excessReplicateMap.get(
-          cur.getDatanodeUuid());
-      if (excessBlocks == null || !excessBlocks.contains(block)) {
+      if (!isExcess(cur, block)) {
         if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
           // exclude corrupt replicas
           if (corruptNodes == null || !corruptNodes.contains(cur)) {
@@ -3335,7 +3331,7 @@ private void processChosenExcessReplica(
       final Collection<DatanodeStorageInfo> nonExcess,
       final DatanodeStorageInfo chosen, BlockInfo storedBlock) {
     nonExcess.remove(chosen);
-    addToExcessReplicate(chosen.getDatanodeDescriptor(), storedBlock);
+    excessReplicas.add(chosen.getDatanodeDescriptor(), storedBlock);
     //
     // The 'excessblocks' tracks blocks until we get confirmation
     // that the datanode has deleted them; the only way we remove them
@@ -3351,21 +3347,6 @@ private void processChosenExcessReplica(
         + "({}, {}) is added to invalidated blocks set", chosen, storedBlock);
   }

-  private void addToExcessReplicate(DatanodeInfo dn, BlockInfo storedBlock) {
-    assert namesystem.hasWriteLock();
-    LightWeightHashSet<BlockInfo> excessBlocks = excessReplicateMap.get(
-        dn.getDatanodeUuid());
-    if (excessBlocks == null) {
-      excessBlocks = new LightWeightHashSet<>();
-      excessReplicateMap.put(dn.getDatanodeUuid(), excessBlocks);
-    }
-    if (excessBlocks.add(storedBlock)) {
-      excessBlocksCount.incrementAndGet();
-      blockLog.debug("BLOCK* addToExcessReplicate: ({}, {}) is added to"
-          + " excessReplicateMap", dn, storedBlock);
-    }
-  }
   private void removeStoredBlock(DatanodeStorageInfo storageInfo, Block block,
       DatanodeDescriptor node) {
     if (shouldPostponeBlocksFromFuture && isGenStampInFuture(block)) {
@@ -3414,24 +3395,7 @@ public void removeStoredBlock(BlockInfo storedBlock, DatanodeDescriptor node) {
         updateNeededReplications(storedBlock, -1, 0);
       }
-      //
-      // We've removed a block from a node, so it's definitely no longer
-      // in "excess" there.
-      //
-      LightWeightHashSet<BlockInfo> excessBlocks = excessReplicateMap.get(
-          node.getDatanodeUuid());
-      if (excessBlocks != null) {
-        if (excessBlocks.remove(storedBlock)) {
-          excessBlocksCount.decrementAndGet();
-          blockLog.debug("BLOCK* removeStoredBlock: {} is removed from " +
-              "excessBlocks", storedBlock);
-          if (excessBlocks.size() == 0) {
-            excessReplicateMap.remove(node.getDatanodeUuid());
-          }
-        }
-      }
-      // Remove the replica from corruptReplicas
+      excessReplicas.remove(node, storedBlock);
       corruptReplicas.removeFromCorruptReplicasMap(storedBlock, node);
     }
   }
@@ -3745,10 +3709,13 @@ private void countReplicasForStripedBlock(NumberReplicas counters,
     }
   }

-  private boolean isExcess(DatanodeDescriptor node, BlockInfo block) {
-    LightWeightHashSet<BlockInfo> blocksExcess = excessReplicateMap.get(
-        node.getDatanodeUuid());
-    return blocksExcess != null && blocksExcess.contains(block);
+  @VisibleForTesting
+  int getExcessSize4Testing(String dnUuid) {
+    return excessReplicas.getSize4Testing(dnUuid);
   }
+
+  public boolean isExcess(DatanodeDescriptor dn, BlockInfo blk) {
+    return excessReplicas.contains(dn, blk);
+  }

   /**
@@ -4053,31 +4020,15 @@ public int numCorruptReplicas(Block block) {
   }

   public void removeBlockFromMap(BlockInfo block) {
-    removeFromExcessReplicateMap(block);
+    for(DatanodeStorageInfo info : blocksMap.getStorages(block)) {
+      excessReplicas.remove(info.getDatanodeDescriptor(), block);
+    }
     blocksMap.removeBlock(block);
     // If block is removed from blocksMap remove it from corruptReplicasMap
     corruptReplicas.removeFromCorruptReplicasMap(block);
   }

-  /**
-   * If a block is removed from blocksMap, remove it from excessReplicateMap.
-   */
-  private void removeFromExcessReplicateMap(BlockInfo block) {
-    for (DatanodeStorageInfo info : blocksMap.getStorages(block)) {
-      String uuid = info.getDatanodeDescriptor().getDatanodeUuid();
-      LightWeightHashSet<BlockInfo> excessReplicas =
-          excessReplicateMap.get(uuid);
-      if (excessReplicas != null) {
-        if (excessReplicas.remove(block)) {
-          excessBlocksCount.decrementAndGet();
-          if (excessReplicas.isEmpty()) {
-            excessReplicateMap.remove(uuid);
-          }
-        }
-      }
-    }
-  }
   public int getCapacity() {
     return blocksMap.getCapacity();
   }
@@ -4270,7 +4221,7 @@ int computeDatanodeWork() {
   public void clearQueues() {
     neededReplications.clear();
     pendingReplications.clear();
-    excessReplicateMap.clear();
+    excessReplicas.clear();
     invalidateBlocks.clear();
     datanodeManager.clearPendingQueues();
     postponedMisreplicatedBlocks.clear();

ExcessReplicaMap.java (new file)

@@ -0,0 +1,106 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.blockmanagement;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.util.LightWeightHashSet;
import org.slf4j.Logger;

import com.google.common.annotations.VisibleForTesting;

/**
 * Maps a datanode to the set of excess replicas.
 *
 * This class is thread safe.
 */
class ExcessReplicaMap {
  public static final Logger blockLog = NameNode.blockStateChangeLog;

  private final Map<String, LightWeightHashSet<BlockInfo>> map = new HashMap<>();
  private final AtomicLong size = new AtomicLong(0L);

  /** @return the number of replicas in this map. */
  long size() {
    return size.get();
  }
  /** @return the number of replicas corresponding to the given datanode. */
  @VisibleForTesting
  synchronized int getSize4Testing(String dnUuid) {
    final LightWeightHashSet<BlockInfo> set = map.get(dnUuid);
    return set == null? 0: set.size();
  }

  synchronized void clear() {
    map.clear();
    size.set(0L);
  }

  /**
   * @return does this map contain a replica corresponding to the given
   *         datanode and the given block?
   */
  synchronized boolean contains(DatanodeDescriptor dn, BlockInfo blk) {
    final LightWeightHashSet<BlockInfo> set = map.get(dn.getDatanodeUuid());
    return set != null && set.contains(blk);
  }
  /**
   * Add the replica of the given block stored in the given datanode to the map.
   * @return true if the block is added.
   */
  synchronized boolean add(DatanodeDescriptor dn, BlockInfo blk) {
    LightWeightHashSet<BlockInfo> set = map.get(dn.getDatanodeUuid());
    if (set == null) {
      set = new LightWeightHashSet<>();
      map.put(dn.getDatanodeUuid(), set);
    }

    final boolean added = set.add(blk);
    if (added) {
      size.incrementAndGet();
      blockLog.debug("BLOCK* ExcessReplicaMap.add({}, {})", dn, blk);
    }
    return added;
  }

  /**
   * Remove the replica corresponding to the given datanode and the given block.
   * @return true if the block is removed.
   */
  synchronized boolean remove(DatanodeDescriptor dn, BlockInfo blk) {
    final LightWeightHashSet<BlockInfo> set = map.get(dn.getDatanodeUuid());
    if (set == null) {
      return false;
    }

    final boolean removed = set.remove(blk);
    if (removed) {
      size.decrementAndGet();
      blockLog.debug("BLOCK* ExcessReplicaMap.remove({}, {})", dn, blk);

      if (set.isEmpty()) {
        map.remove(dn.getDatanodeUuid());
      }
    }
    return removed;
  }
}
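
To make the new class's behavior concrete, here is a minimal, self-contained sketch of the pattern it implements: a per-datanode set plus an AtomicLong that mirrors the total entry count. This sketch is an illustration, not code from the commit; ExcessMapSketch is a hypothetical name, String and Long stand in for DatanodeDescriptor and BlockInfo, and java.util.HashSet stands in for LightWeightHashSet.

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Simplified analog of ExcessReplicaMap: synchronized mutators keep the
 * per-key sets and the mirrored counter consistent, while size() stays
 * lock-free so metrics reads never contend with the mutators.
 */
class ExcessMapSketch {
  private final Map<String, Set<Long>> map = new HashMap<>();
  private final AtomicLong size = new AtomicLong(0L);

  synchronized boolean add(String dnUuid, long blockId) {
    final boolean added =
        map.computeIfAbsent(dnUuid, k -> new HashSet<>()).add(blockId);
    if (added) {
      size.incrementAndGet();
    }
    return added;
  }

  synchronized boolean remove(String dnUuid, long blockId) {
    final Set<Long> set = map.get(dnUuid);
    if (set == null || !set.remove(blockId)) {
      return false;
    }
    size.decrementAndGet();
    if (set.isEmpty()) {
      map.remove(dnUuid); // drop empty per-datanode sets, as the real class does
    }
    return true;
  }

  /** Lock-free read of the mirrored count. */
  long size() {
    return size.get();
  }

  public static void main(String[] args) {
    final ExcessMapSketch m = new ExcessMapSketch();
    m.add("dn-1", 1001L);
    m.add("dn-1", 1002L);
    m.remove("dn-1", 1001L);
    System.out.println(m.size()); // prints 1
  }
}

Keeping the count in a separately readable AtomicLong, rather than summing the per-datanode sets under the lock, is what lets BlockManager.getExcessBlocksCount() serve metrics with a lock-free excessReplicas.size() call while add, remove, and contains stay synchronized.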

NamenodeFsck.java

@@ -73,7 +73,6 @@
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
-import org.apache.hadoop.hdfs.util.LightWeightHashSet;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.NodeBase;
@@ -570,8 +569,6 @@ private String getReplicaInfo(BlockInfo storedBlock) {
           storage.getStorageType()));
     }
     if (showReplicaDetails) {
-      LightWeightHashSet<BlockInfo> blocksExcess =
-          blockManager.excessReplicateMap.get(dnDesc.getDatanodeUuid());
       Collection<DatanodeDescriptor> corruptReplicas =
           blockManager.getCorruptReplicas(storedBlock);
       sb.append("(");
@@ -582,8 +579,7 @@ private String getReplicaInfo(BlockInfo storedBlock) {
       } else if (corruptReplicas != null
           && corruptReplicas.contains(dnDesc)) {
         sb.append("CORRUPT)");
-      } else if (blocksExcess != null
-          && blocksExcess.contains(storedBlock)) {
+      } else if (blockManager.isExcess(dnDesc, storedBlock)) {
         sb.append("EXCESS)");
       } else if (dnDesc.isStale(this.staleInterval)) {
         sb.append("STALE_NODE)");

TestNodeCount.java

@@ -19,7 +19,6 @@
 import static org.junit.Assert.assertTrue;

-import java.util.Collection;
 import java.util.concurrent.TimeoutException;

 import org.apache.hadoop.conf.Configuration;
@@ -105,10 +104,12 @@ public void testNodeCount() throws Exception {
       // find out a non-excess node
       DatanodeDescriptor nonExcessDN = null;
       for(DatanodeStorageInfo storage : bm.blocksMap.getStorages(block.getLocalBlock())) {
         final DatanodeDescriptor dn = storage.getDatanodeDescriptor();
-        Collection<BlockInfo> blocks = bm.excessReplicateMap.get(dn.getDatanodeUuid());
-        if (blocks == null || !blocks.contains(block.getLocalBlock()) ) {
+        final BlockInfo info = new BlockInfoContiguous(
+            block.getLocalBlock(), (short)0);
+        if (!bm.isExcess(dn, info)) {
           nonExcessDN = dn;
           break;
         }

TestOverReplicatedBlocks.java

@@ -22,7 +22,6 @@
 import static org.junit.Assert.assertTrue;

 import java.io.File;
-import java.util.Collection;

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
@@ -153,6 +152,7 @@ public void testChooseReplicaToDelete() throws Exception {
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
     fs = cluster.getFileSystem();
     final FSNamesystem namesystem = cluster.getNamesystem();
+    final BlockManager bm = namesystem.getBlockManager();
     conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 300);
     cluster.startDataNodes(conf, 1, true, null, null, null);
@@ -171,8 +171,7 @@ public void testChooseReplicaToDelete() throws Exception {
     long waitTime = DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT * 1000 *
         (DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_DEFAULT + 1);
     do {
-      nodeInfo = namesystem.getBlockManager().getDatanodeManager()
-          .getDatanode(dnReg);
+      nodeInfo = bm.getDatanodeManager().getDatanode(dnReg);
       lastHeartbeat = nodeInfo.getLastUpdateMonotonic();
     } while (monotonicNow() - lastHeartbeat < waitTime);
     fs.setReplication(fileName, (short)3);
@@ -183,10 +182,9 @@ public void testChooseReplicaToDelete() throws Exception {
     // All replicas for deletion should be scheduled on lastDN.
     // And should not actually be deleted, because lastDN does not heartbeat.
     namesystem.readLock();
-    Collection<BlockInfo> dnBlocks =
-        namesystem.getBlockManager().excessReplicateMap.get(lastDNid);
+    final int dnBlocks = bm.getExcessSize4Testing(dnReg.getDatanodeUuid());
     assertEquals("Replicas on node " + lastDNid + " should have been deleted",
-        SMALL_FILE_LENGTH / SMALL_BLOCK_SIZE, dnBlocks.size());
+        SMALL_FILE_LENGTH / SMALL_BLOCK_SIZE, dnBlocks);
     namesystem.readUnlock();
     for(BlockLocation location : locs)
       assertEquals("Block should still have 4 replicas",

TestNameNodeMetrics.java

@@ -289,7 +289,7 @@ public void testExcessBlocks() throws Exception {
     fs.delete(file, true);
     rb = getMetrics(NS_METRICS);
     assertGauge("ExcessBlocks", 0L, rb);
-    assertTrue(bm.excessReplicateMap.isEmpty());
+    assertEquals(0L, bm.getExcessBlocksCount());
   }

   /** Test to ensure metrics reflects missing blocks */