From 6979cbfc1f4c28440816b56f5624765872b0be49 Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Wed, 24 Feb 2016 19:42:47 -0800 Subject: [PATCH] HDFS-9838. Refactor the excessReplicateMap to a class. --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 + .../server/blockmanagement/BlockManager.java | 95 ++++------------ .../blockmanagement/ExcessReplicaMap.java | 106 ++++++++++++++++++ .../hdfs/server/namenode/NamenodeFsck.java | 6 +- .../server/blockmanagement/TestNodeCount.java | 7 +- .../TestOverReplicatedBlocks.java | 10 +- .../namenode/metrics/TestNameNodeMetrics.java | 2 +- 7 files changed, 141 insertions(+), 87 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ExcessReplicaMap.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 08a270dfa74..5186cc259b8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -228,6 +228,8 @@ Trunk (Unreleased) HDFS-9829. Erasure Coding: Improve few exception handling logic of ErasureCodingWorker. (Rakesh R via jing9) + HDFS-9838. Refactor the excessReplicateMap to a class. (szetszwo) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index c0c1ada6dbc..f483d8e8d29 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -155,7 +155,6 @@ public class BlockManager implements BlockStatsMXBean { /** flag indicating whether replication queues have been initialized */ private boolean initializedReplQueues; - private final AtomicLong excessBlocksCount = new AtomicLong(0L); private final AtomicLong postponedMisreplicatedBlocksCount = new AtomicLong(0L); private final long startupDelayBlockDeletionInMs; private final BlockReportLeaseManager blockReportLeaseManager; @@ -187,7 +186,7 @@ public long getStartupDelayBlockDeletionInMs() { } /** Used by metrics */ public long getExcessBlocksCount() { - return excessBlocksCount.get(); + return excessReplicas.size(); } /** Used by metrics */ public long getPostponedMisreplicatedBlocksCount() { @@ -247,8 +246,7 @@ public int getPendingDataNodeMessageCount() { * Maps a StorageID to the set of blocks that are "extra" for this * DataNode. We'll eventually remove these extras. */ - public final Map> excessReplicateMap = - new HashMap<>(); + private final ExcessReplicaMap excessReplicas = new ExcessReplicaMap(); /** * Store set of Blocks that need to be replicated 1 or more times. @@ -1832,13 +1830,17 @@ DatanodeDescriptor[] chooseSourceDatanodes(BlockInfo block, } containingNodes.add(node); - // do not select corrupted replica as src. also do not select the block - // that is already in excess map + // do not select the replica if it is corrupt or excess if (state == StoredReplicaState.CORRUPT || state == StoredReplicaState.EXCESS) { continue; } + // never use already decommissioned nodes or unknown state replicas + if (state == null || state == StoredReplicaState.DECOMMISSIONED) { + continue; + } + if(priority != UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY && !node.isDecommissionInProgress() && node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams) { @@ -1847,10 +1849,6 @@ DatanodeDescriptor[] chooseSourceDatanodes(BlockInfo block, if (node.getNumberOfBlocksToBeReplicated() >= replicationStreamsHardLimit) { continue; } - // never use already decommissioned nodes - if (node.isDecommissioned()) { - continue; - } if(isStriped || srcNodes.isEmpty()) { srcNodes.add(node); @@ -3194,9 +3192,7 @@ private void processOverReplicatedBlock(final BlockInfo block, postponeBlock(block); return; } - LightWeightHashSet excessBlocks = excessReplicateMap.get( - cur.getDatanodeUuid()); - if (excessBlocks == null || !excessBlocks.contains(block)) { + if (!isExcess(cur, block)) { if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) { // exclude corrupt replicas if (corruptNodes == null || !corruptNodes.contains(cur)) { @@ -3335,7 +3331,7 @@ private void processChosenExcessReplica( final Collection nonExcess, final DatanodeStorageInfo chosen, BlockInfo storedBlock) { nonExcess.remove(chosen); - addToExcessReplicate(chosen.getDatanodeDescriptor(), storedBlock); + excessReplicas.add(chosen.getDatanodeDescriptor(), storedBlock); // // The 'excessblocks' tracks blocks until we get confirmation // that the datanode has deleted them; the only way we remove them @@ -3351,21 +3347,6 @@ private void processChosenExcessReplica( + "({}, {}) is added to invalidated blocks set", chosen, storedBlock); } - private void addToExcessReplicate(DatanodeInfo dn, BlockInfo storedBlock) { - assert namesystem.hasWriteLock(); - LightWeightHashSet excessBlocks = excessReplicateMap.get( - dn.getDatanodeUuid()); - if (excessBlocks == null) { - excessBlocks = new LightWeightHashSet<>(); - excessReplicateMap.put(dn.getDatanodeUuid(), excessBlocks); - } - if (excessBlocks.add(storedBlock)) { - excessBlocksCount.incrementAndGet(); - blockLog.debug("BLOCK* addToExcessReplicate: ({}, {}) is added to" - + " excessReplicateMap", dn, storedBlock); - } - } - private void removeStoredBlock(DatanodeStorageInfo storageInfo, Block block, DatanodeDescriptor node) { if (shouldPostponeBlocksFromFuture && isGenStampInFuture(block)) { @@ -3414,24 +3395,7 @@ public void removeStoredBlock(BlockInfo storedBlock, DatanodeDescriptor node) { updateNeededReplications(storedBlock, -1, 0); } - // - // We've removed a block from a node, so it's definitely no longer - // in "excess" there. - // - LightWeightHashSet excessBlocks = excessReplicateMap.get( - node.getDatanodeUuid()); - if (excessBlocks != null) { - if (excessBlocks.remove(storedBlock)) { - excessBlocksCount.decrementAndGet(); - blockLog.debug("BLOCK* removeStoredBlock: {} is removed from " + - "excessBlocks", storedBlock); - if (excessBlocks.size() == 0) { - excessReplicateMap.remove(node.getDatanodeUuid()); - } - } - } - - // Remove the replica from corruptReplicas + excessReplicas.remove(node, storedBlock); corruptReplicas.removeFromCorruptReplicasMap(storedBlock, node); } } @@ -3745,10 +3709,13 @@ private void countReplicasForStripedBlock(NumberReplicas counters, } } - private boolean isExcess(DatanodeDescriptor node, BlockInfo block) { - LightWeightHashSet blocksExcess = excessReplicateMap.get( - node.getDatanodeUuid()); - return blocksExcess != null && blocksExcess.contains(block); + @VisibleForTesting + int getExcessSize4Testing(String dnUuid) { + return excessReplicas.getSize4Testing(dnUuid); + } + + public boolean isExcess(DatanodeDescriptor dn, BlockInfo blk) { + return excessReplicas.contains(dn, blk); } /** @@ -4053,31 +4020,15 @@ public int numCorruptReplicas(Block block) { } public void removeBlockFromMap(BlockInfo block) { - removeFromExcessReplicateMap(block); + for(DatanodeStorageInfo info : blocksMap.getStorages(block)) { + excessReplicas.remove(info.getDatanodeDescriptor(), block); + } + blocksMap.removeBlock(block); // If block is removed from blocksMap remove it from corruptReplicasMap corruptReplicas.removeFromCorruptReplicasMap(block); } - /** - * If a block is removed from blocksMap, remove it from excessReplicateMap. - */ - private void removeFromExcessReplicateMap(BlockInfo block) { - for (DatanodeStorageInfo info : blocksMap.getStorages(block)) { - String uuid = info.getDatanodeDescriptor().getDatanodeUuid(); - LightWeightHashSet excessReplicas = - excessReplicateMap.get(uuid); - if (excessReplicas != null) { - if (excessReplicas.remove(block)) { - excessBlocksCount.decrementAndGet(); - if (excessReplicas.isEmpty()) { - excessReplicateMap.remove(uuid); - } - } - } - } - } - public int getCapacity() { return blocksMap.getCapacity(); } @@ -4270,7 +4221,7 @@ int computeDatanodeWork() { public void clearQueues() { neededReplications.clear(); pendingReplications.clear(); - excessReplicateMap.clear(); + excessReplicas.clear(); invalidateBlocks.clear(); datanodeManager.clearPendingQueues(); postponedMisreplicatedBlocks.clear(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ExcessReplicaMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ExcessReplicaMap.java new file mode 100644 index 00000000000..00aa902ae11 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ExcessReplicaMap.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.blockmanagement; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.util.LightWeightHashSet; +import org.slf4j.Logger; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Maps a datnode to the set of excess replicas. + * + * This class is thread safe. + */ +class ExcessReplicaMap { + public static final Logger blockLog = NameNode.blockStateChangeLog; + + private final Map> map =new HashMap<>(); + private final AtomicLong size = new AtomicLong(0L); + + /** @return the number of replicas in this map. */ + long size() { + return size.get(); + } + + /** @return the number of replicas corresponding to the given datanode. */ + @VisibleForTesting + synchronized int getSize4Testing(String dnUuid) { + final LightWeightHashSet set = map.get(dnUuid); + return set == null? 0: set.size(); + } + + synchronized void clear() { + map.clear(); + size.set(0L); + } + + /** + * @return does this map contains a replica corresponding to the given + * datanode and the given block? + */ + synchronized boolean contains(DatanodeDescriptor dn, BlockInfo blk) { + final LightWeightHashSet set = map.get(dn.getDatanodeUuid()); + return set != null && set.contains(blk); + } + + /** + * Add the replica of the given block stored in the given datanode to the map. + * @return true if the block is added. + */ + synchronized boolean add(DatanodeDescriptor dn, BlockInfo blk) { + LightWeightHashSet set = map.get(dn.getDatanodeUuid()); + if (set == null) { + set = new LightWeightHashSet<>(); + map.put(dn.getDatanodeUuid(), set); + } + final boolean added = set.add(blk); + if (added) { + size.incrementAndGet(); + blockLog.debug("BLOCK* ExcessReplicaMap.add({}, {})", dn, blk); + } + return added; + } + + /** + * Remove the replica corresponding to the given datanode and the given block. + * @return true if the block is removed. + */ + synchronized boolean remove(DatanodeDescriptor dn, BlockInfo blk) { + final LightWeightHashSet set = map.get(dn.getDatanodeUuid()); + if (set == null) { + return false; + } + + final boolean removed = set.remove(blk); + if (removed) { + size.decrementAndGet(); + blockLog.debug("BLOCK* ExcessReplicaMap.remove({}, {})", dn, blk); + + if (set.isEmpty()) { + map.remove(dn.getDatanodeUuid()); + } + } + return removed; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java index 41819965be9..647dd83933b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java @@ -73,7 +73,6 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas; import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; -import org.apache.hadoop.hdfs.util.LightWeightHashSet; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.net.NodeBase; @@ -570,8 +569,6 @@ private String getReplicaInfo(BlockInfo storedBlock) { storage.getStorageType())); } if (showReplicaDetails) { - LightWeightHashSet blocksExcess = - blockManager.excessReplicateMap.get(dnDesc.getDatanodeUuid()); Collection corruptReplicas = blockManager.getCorruptReplicas(storedBlock); sb.append("("); @@ -582,8 +579,7 @@ private String getReplicaInfo(BlockInfo storedBlock) { } else if (corruptReplicas != null && corruptReplicas.contains(dnDesc)) { sb.append("CORRUPT)"); - } else if (blocksExcess != null - && blocksExcess.contains(storedBlock)) { + } else if (blockManager.isExcess(dnDesc, storedBlock)) { sb.append("EXCESS)"); } else if (dnDesc.isStale(this.staleInterval)) { sb.append("STALE_NODE)"); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java index 83d7305ac65..60587278061 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNodeCount.java @@ -19,7 +19,6 @@ import static org.junit.Assert.assertTrue; -import java.util.Collection; import java.util.concurrent.TimeoutException; import org.apache.hadoop.conf.Configuration; @@ -105,10 +104,12 @@ public void testNodeCount() throws Exception { // find out a non-excess node DatanodeDescriptor nonExcessDN = null; + for(DatanodeStorageInfo storage : bm.blocksMap.getStorages(block.getLocalBlock())) { final DatanodeDescriptor dn = storage.getDatanodeDescriptor(); - Collection blocks = bm.excessReplicateMap.get(dn.getDatanodeUuid()); - if (blocks == null || !blocks.contains(block.getLocalBlock()) ) { + final BlockInfo info = new BlockInfoContiguous( + block.getLocalBlock(), (short)0); + if (!bm.isExcess(dn, info)) { nonExcessDN = dn; break; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java index 6f709404465..5b442cb799b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java @@ -22,7 +22,6 @@ import static org.junit.Assert.assertTrue; import java.io.File; -import java.util.Collection; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; @@ -153,6 +152,7 @@ public void testChooseReplicaToDelete() throws Exception { cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build(); fs = cluster.getFileSystem(); final FSNamesystem namesystem = cluster.getNamesystem(); + final BlockManager bm = namesystem.getBlockManager(); conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 300); cluster.startDataNodes(conf, 1, true, null, null, null); @@ -171,8 +171,7 @@ public void testChooseReplicaToDelete() throws Exception { long waitTime = DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT * 1000 * (DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_DEFAULT + 1); do { - nodeInfo = namesystem.getBlockManager().getDatanodeManager() - .getDatanode(dnReg); + nodeInfo = bm.getDatanodeManager().getDatanode(dnReg); lastHeartbeat = nodeInfo.getLastUpdateMonotonic(); } while (monotonicNow() - lastHeartbeat < waitTime); fs.setReplication(fileName, (short)3); @@ -183,10 +182,9 @@ public void testChooseReplicaToDelete() throws Exception { // All replicas for deletion should be scheduled on lastDN. // And should not actually be deleted, because lastDN does not heartbeat. namesystem.readLock(); - Collection dnBlocks = - namesystem.getBlockManager().excessReplicateMap.get(lastDNid); + final int dnBlocks = bm.getExcessSize4Testing(dnReg.getDatanodeUuid()); assertEquals("Replicas on node " + lastDNid + " should have been deleted", - SMALL_FILE_LENGTH / SMALL_BLOCK_SIZE, dnBlocks.size()); + SMALL_FILE_LENGTH / SMALL_BLOCK_SIZE, dnBlocks); namesystem.readUnlock(); for(BlockLocation location : locs) assertEquals("Block should still have 4 replicas", diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java index a9eeaac0f58..87f9adbe6ff 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java @@ -289,7 +289,7 @@ public void testExcessBlocks() throws Exception { fs.delete(file, true); rb = getMetrics(NS_METRICS); assertGauge("ExcessBlocks", 0L, rb); - assertTrue(bm.excessReplicateMap.isEmpty()); + assertEquals(0L, bm.getExcessBlocksCount()); } /** Test to ensure metrics reflects missing blocks */