HDFS-2273. Refactor BlockManager.recentInvalidateSets to a new class.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1160475 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze 2011-08-22 22:28:17 +00:00
parent fdef052d35
commit 513f17d115
3 changed files with 191 additions and 171 deletions

.../CHANGES.txt (View File)

@@ -673,6 +673,9 @@ Trunk (unreleased changes)
HDFS-2096. Mavenization of hadoop-hdfs (Alejandro Abdelnur via tomwhite)
HDFS-2273. Refactor BlockManager.recentInvalidateSets to a new class.
(szetszwo)
OPTIMIZATIONS
HDFS-1458. Improve checkpoint performance by avoiding unnecessary image

.../org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (View File)

@@ -24,7 +24,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
@@ -89,7 +88,6 @@ public class BlockManager {
private volatile long underReplicatedBlocksCount = 0L;
private volatile long scheduledReplicationBlocksCount = 0L;
private volatile long excessBlocksCount = 0L;
private volatile long pendingDeletionBlocksCount = 0L;
/** Used by metrics */
public long getPendingReplicationBlocksCount() {
@@ -109,7 +107,7 @@ public class BlockManager {
}
/** Used by metrics */
public long getPendingDeletionBlocksCount() {
return pendingDeletionBlocksCount;
return invalidateBlocks.numBlocks();
}
/** Used by metrics */
public long getExcessBlocksCount() {
@@ -131,14 +129,8 @@ public class BlockManager {
/** Store blocks -> datanodedescriptor(s) map of corrupt replicas */
final CorruptReplicasMap corruptReplicas = new CorruptReplicasMap();
//
// Keeps a Collection for every named machine containing
// blocks that have recently been invalidated and are thought to live
// on the machine in question.
// Mapping: StorageID -> ArrayList<Block>
//
private final Map<String, Collection<Block>> recentInvalidateSets =
new TreeMap<String, Collection<Block>>();
/** Blocks to be invalidated. */
private final InvalidateBlocks invalidateBlocks;
//
// Keeps a TreeSet for every named node. Each treeset contains
@@ -182,6 +174,7 @@ public class BlockManager {
namesystem = fsn;
datanodeManager = new DatanodeManager(this, fsn, conf);
heartbeatManager = datanodeManager.getHeartbeatManager();
invalidateBlocks = new InvalidateBlocks(datanodeManager);
blocksMap = new BlocksMap(DEFAULT_MAP_LOAD_FACTOR);
blockplacement = BlockPlacementPolicy.getInstance(
@@ -306,7 +299,9 @@ public class BlockManager {
this.blockplacement = newpolicy;
}
/** Dump meta data to out. */
public void metaSave(PrintWriter out) {
assert namesystem.hasWriteLock();
//
// Dump contents of neededReplication
//
@@ -357,7 +352,7 @@ public class BlockManager {
pendingReplications.metaSave(out);
// Dump blocks that are waiting to be deleted
dumpRecentInvalidateSets(out);
invalidateBlocks.dump(out);
// Dump all datanodes
getDatanodeManager().datanodeDump(out);
@@ -493,7 +488,7 @@ public class BlockManager {
// remove this block from the list of pending blocks to be deleted.
for (DatanodeDescriptor dd : targets) {
String datanodeId = dd.getStorageID();
removeFromInvalidates(datanodeId, oldBlock);
invalidateBlocks.remove(datanodeId, oldBlock);
}
long fileLength = fileINode.computeContentSummary().getLength();
@@ -510,7 +505,7 @@ public class BlockManager {
blocksMap.nodeIterator(block); it.hasNext();) {
String storageID = it.next().getStorageID();
// filter invalidate replicas
if( ! belongsToInvalidates(storageID, block)) {
if(!invalidateBlocks.contains(storageID, block)) {
machineSet.add(storageID);
}
}
@@ -754,64 +749,15 @@ public class BlockManager {
}
node.resetBlocks();
removeFromInvalidates(node.getStorageID());
}
private void removeFromInvalidates(String storageID, Block block) {
synchronized(recentInvalidateSets) {
Collection<Block> v = recentInvalidateSets.get(storageID);
if (v != null && v.remove(block)) {
pendingDeletionBlocksCount--;
if (v.isEmpty()) {
recentInvalidateSets.remove(storageID);
}
}
}
}
boolean belongsToInvalidates(String storageID, Block block) {
Collection<Block> invalidateSet;
synchronized(recentInvalidateSets) {
invalidateSet = recentInvalidateSets.get(storageID);
return invalidateSet != null && invalidateSet.contains(block);
}
}
/**
* Adds block to list of blocks which will be invalidated on specified
* datanode
*
* @param b block
* @param dn datanode
* @param log true to create an entry in the log
*/
private void addToInvalidates(Block b, DatanodeInfo dn, boolean log) {
synchronized(recentInvalidateSets) {
Collection<Block> invalidateSet = recentInvalidateSets
.get(dn.getStorageID());
if (invalidateSet == null) {
invalidateSet = new HashSet<Block>();
recentInvalidateSets.put(dn.getStorageID(), invalidateSet);
}
if (invalidateSet.add(b)) {
pendingDeletionBlocksCount++;
if (log) {
NameNode.stateChangeLog.info("BLOCK* addToInvalidates: "
+ b + " to " + dn.getName());
}
}
}
invalidateBlocks.remove(node.getStorageID());
}
/**
* Adds block to list of blocks which will be invalidated on specified
* datanode and log the operation
*
* @param b block
* @param dn datanode
*/
void addToInvalidates(Block b, DatanodeInfo dn) {
addToInvalidates(b, dn, true);
void addToInvalidates(final Block block, final DatanodeInfo datanode) {
invalidateBlocks.add(block, datanode, true);
}
/**
@@ -823,7 +769,7 @@ public class BlockManager {
for (Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(b); it
.hasNext();) {
DatanodeDescriptor node = it.next();
addToInvalidates(b, node, false);
invalidateBlocks.add(b, node, false);
datanodes.append(node.getName()).append(" ");
}
if (datanodes.length() != 0) {
@@ -832,30 +778,6 @@ public class BlockManager {
}
}
/**
* dumps the contents of recentInvalidateSets
*/
private void dumpRecentInvalidateSets(PrintWriter out) {
assert namesystem.hasWriteLock();
int size;
synchronized(recentInvalidateSets) {
size = recentInvalidateSets.values().size();
}
out.println("Metasave: Blocks " + pendingDeletionBlocksCount
+ " waiting deletion from " + size + " datanodes.");
if (size == 0) {
return;
}
synchronized(recentInvalidateSets) {
for(Map.Entry<String,Collection<Block>> entry : recentInvalidateSets.entrySet()) {
Collection<Block> blocks = entry.getValue();
if (blocks.size() > 0) {
out.println(datanodeManager.getDatanode(entry.getKey()).getName() + blocks);
}
}
}
}
/**
* Mark the block belonging to datanode as corrupt
* @param blk Block to be marked as corrupt
@@ -962,35 +884,14 @@ public class BlockManager {
* @return total number of block for deletion
*/
int computeInvalidateWork(int nodesToProcess) {
int numOfNodes;
ArrayList<String> keyArray;
final List<String> nodes = invalidateBlocks.getStorageIDs();
Collections.shuffle(nodes);
synchronized(recentInvalidateSets) {
numOfNodes = recentInvalidateSets.size();
// get an array of the keys
keyArray = new ArrayList<String>(recentInvalidateSets.keySet());
}
nodesToProcess = Math.min(numOfNodes, nodesToProcess);
// randomly pick up <i>nodesToProcess</i> nodes
// and put them at [0, nodesToProcess)
int remainingNodes = numOfNodes - nodesToProcess;
if (nodesToProcess < remainingNodes) {
for(int i=0; i<nodesToProcess; i++) {
int keyIndex = DFSUtil.getRandom().nextInt(numOfNodes-i)+i;
Collections.swap(keyArray, keyIndex, i); // swap to front
}
} else {
for(int i=0; i<remainingNodes; i++) {
int keyIndex = DFSUtil.getRandom().nextInt(numOfNodes-i);
Collections.swap(keyArray, keyIndex, numOfNodes-i-1); // swap to end
}
}
nodesToProcess = Math.min(nodes.size(), nodesToProcess);
int blockCnt = 0;
for(int nodeCnt = 0; nodeCnt < nodesToProcess; nodeCnt++ ) {
blockCnt += invalidateWorkForOneNode(keyArray.get(nodeCnt));
blockCnt += invalidateWorkForOneNode(nodes.get(nodeCnt));
}
return blockCnt;
}
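
The deleted selection logic above was a partial Fisher-Yates shuffle: it performed only as many swaps as needed, working from whichever end of the key list was cheaper (swapping the few survivors to the front, or the few rejects to the end). The replacement simply calls Collections.shuffle on the whole storage-ID list, which costs O(n) swaps but is far simpler, and cheap in practice since n is the number of datanodes with pending invalidations, not the number of blocks. For reference, a minimal standalone sketch of the retired technique (class and method names are hypothetical; java.util.Random stands in for DFSUtil.getRandom()):

import java.util.Collections;
import java.util.List;
import java.util.Random;

class PartialShuffle {
  /** Move a uniformly random k-element subset of keys into positions [0, k). */
  static <T> void pickRandomPrefix(List<T> keys, int k, Random rand) {
    final int n = keys.size();
    if (k < n - k) {
      // Few picks: choose each survivor from the unpicked tail, swap it to the front.
      for (int i = 0; i < k; i++) {
        Collections.swap(keys, i + rand.nextInt(n - i), i);
      }
    } else {
      // Few drops: choose each reject from the undropped head, swap it to the end.
      for (int i = 0; i < n - k; i++) {
        Collections.swap(keys, rand.nextInt(n - i), n - i - 1);
      }
    }
  }
}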
@@ -1592,7 +1493,7 @@ public class BlockManager {
}
// Ignore replicas already scheduled to be removed from the DN
if(belongsToInvalidates(dn.getStorageID(), block)) {
if(invalidateBlocks.contains(dn.getStorageID(), block)) {
assert storedBlock.findDatanode(dn) < 0 : "Block " + block
+ " in recentInvalidatesSet should not appear in DN " + dn;
return storedBlock;
@@ -2371,7 +2272,7 @@ public class BlockManager {
}
public int getActiveBlockCount() {
return blocksMap.size() - (int)pendingDeletionBlocksCount;
return blocksMap.size() - (int)invalidateBlocks.numBlocks();
}
public DatanodeDescriptor[] getNodes(BlockInfo block) {
@@ -2441,16 +2342,6 @@ public class BlockManager {
return fileINode.getReplication();
}
/** Remove a datanode from the invalidatesSet */
private void removeFromInvalidates(String storageID) {
Collection<Block> blocks;
synchronized(recentInvalidateSets) {
blocks = recentInvalidateSets.remove(storageID);
}
if (blocks != null) {
pendingDeletionBlocksCount -= blocks.size();
}
}
/**
* Get blocks to invalidate for <i>nodeId</i>
@@ -2466,49 +2357,7 @@ public class BlockManager {
return 0;
// get blocks to invalidate for the nodeId
assert nodeId != null;
final DatanodeDescriptor dn = datanodeManager.getDatanode(nodeId);
if (dn == null) {
removeFromInvalidates(nodeId);
return 0;
}
Collection<Block> invalidateSet;
ArrayList<Block> blocksToInvalidate;
synchronized(recentInvalidateSets) {
invalidateSet = recentInvalidateSets.get(nodeId);
if (invalidateSet == null)
return 0;
blocksToInvalidate = new ArrayList<Block>(
getDatanodeManager().blockInvalidateLimit);
// # blocks that can be sent in one message is limited
Iterator<Block> it = invalidateSet.iterator();
for (int blkCount = 0; blkCount < getDatanodeManager().blockInvalidateLimit
&& it.hasNext(); blkCount++) {
blocksToInvalidate.add(it.next());
it.remove();
}
// If we send everything in this message, remove this node entry
if (!it.hasNext()) {
removeFromInvalidates(nodeId);
}
dn.addBlocksToBeInvalidated(blocksToInvalidate);
if (NameNode.stateChangeLog.isInfoEnabled()) {
StringBuilder blockList = new StringBuilder();
for (Block blk : blocksToInvalidate) {
blockList.append(' ');
blockList.append(blk);
}
NameNode.stateChangeLog.info("BLOCK* ask " + dn.getName()
+ " to delete " + blockList);
}
pendingDeletionBlocksCount -= blocksToInvalidate.size();
return blocksToInvalidate.size();
}
return invalidateBlocks.invalidateWork(nodeId);
} finally {
namesystem.writeUnlock();
}
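
This hunk shows only the tail of invalidateWorkForOneNode; the surviving context lines ("return 0;", "assert nodeId != null;", the finally/writeUnlock pair) imply the usual lock-guard-delegate shape. A sketch of what the whole method presumably looks like after this commit (the safe-mode guard is an assumption inferred from the "return 0;" context line, not shown in the hunk):

private int invalidateWorkForOneNode(String nodeId) {
  namesystem.writeLock();
  try {
    // Blocks should not be removed if the namenode has entered safe mode
    // since the work was scheduled (assumed guard, matching the context lines).
    if (namesystem.isInSafeMode()) {
      return 0;
    }
    // get blocks to invalidate for the nodeId
    assert nodeId != null;
    return invalidateBlocks.invalidateWork(nodeId);
  } finally {
    namesystem.writeUnlock();
  }
}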

.../org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java (View File)

@@ -0,0 +1,168 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
/**
* Keeps a Collection for every named machine containing blocks
* that have recently been invalidated and are thought to live
* on the machine in question.
*/
@InterfaceAudience.Private
class InvalidateBlocks {
/** Mapping: StorageID -> Collection of Blocks */
private final Map<String, Collection<Block>> node2blocks =
new TreeMap<String, Collection<Block>>();
/** The total number of blocks in the map. */
private long numBlocks = 0L;
private final DatanodeManager datanodeManager;
InvalidateBlocks(final DatanodeManager datanodeManager) {
this.datanodeManager = datanodeManager;
}
/** @return the number of blocks to be invalidated. */
synchronized long numBlocks() {
return numBlocks;
}
/** Does this contain the block which is associated with the storage? */
synchronized boolean contains(final String storageID, final Block block) {
final Collection<Block> s = node2blocks.get(storageID);
return s != null && s.contains(block);
}
/**
* Add a block to the block collection
* which will be invalidated on the specified datanode.
*/
synchronized void add(final Block block, final DatanodeInfo datanode,
final boolean log) {
Collection<Block> set = node2blocks.get(datanode.getStorageID());
if (set == null) {
set = new HashSet<Block>();
node2blocks.put(datanode.getStorageID(), set);
}
if (set.add(block)) {
numBlocks++;
if (log) {
NameNode.stateChangeLog.info("BLOCK* " + getClass().getSimpleName()
+ ": add " + block + " to " + datanode.getName());
}
}
}
/** Remove a storage from the invalidatesSet */
synchronized void remove(final String storageID) {
final Collection<Block> blocks = node2blocks.remove(storageID);
if (blocks != null) {
numBlocks -= blocks.size();
}
}
/** Remove the block from the specified storage. */
synchronized void remove(final String storageID, final Block block) {
final Collection<Block> v = node2blocks.get(storageID);
if (v != null && v.remove(block)) {
numBlocks--;
if (v.isEmpty()) {
node2blocks.remove(storageID);
}
}
}
/** Print the contents to out. */
synchronized void dump(final PrintWriter out) {
final int size = node2blocks.values().size();
out.println("Metasave: Blocks " + numBlocks
+ " waiting deletion from " + size + " datanodes.");
if (size == 0) {
return;
}
for(Map.Entry<String,Collection<Block>> entry : node2blocks.entrySet()) {
final Collection<Block> blocks = entry.getValue();
if (blocks.size() > 0) {
out.println(datanodeManager.getDatanode(entry.getKey()).getName() + blocks);
}
}
}
/** @return a list of the storage IDs. */
synchronized List<String> getStorageIDs() {
return new ArrayList<String>(node2blocks.keySet());
}
/** Invalidate work for the storage. */
int invalidateWork(final String storageId) {
final DatanodeDescriptor dn = datanodeManager.getDatanode(storageId);
if (dn == null) {
remove(storageId);
return 0;
}
final List<Block> toInvalidate = invalidateWork(storageId, dn);
if (toInvalidate == null) {
return 0;
}
if (NameNode.stateChangeLog.isInfoEnabled()) {
NameNode.stateChangeLog.info("BLOCK* " + getClass().getSimpleName()
+ ": ask " + dn.getName() + " to delete " + toInvalidate);
}
return toInvalidate.size();
}
private synchronized List<Block> invalidateWork(
final String storageId, final DatanodeDescriptor dn) {
final Collection<Block> set = node2blocks.get(storageId);
if (set == null) {
return null;
}
// # blocks that can be sent in one message is limited
final int limit = datanodeManager.blockInvalidateLimit;
final List<Block> toInvalidate = new ArrayList<Block>(limit);
final Iterator<Block> it = set.iterator();
for(int count = 0; count < limit && it.hasNext(); count++) {
toInvalidate.add(it.next());
it.remove();
}
// If we send everything in this message, remove this node entry
if (!it.hasNext()) {
remove(storageId);
}
dn.addBlocksToBeInvalidated(toInvalidate);
numBlocks -= toInvalidate.size();
return toInvalidate;
}
}
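
The central invariant of this class is that numBlocks always equals the sum of the per-storage set sizes: the counter moves only when Set.add or Set.remove reports an actual change, and a storage's entry is dropped as soon as its set empties, so getStorageIDs() never returns drained nodes. A minimal, self-contained analog of that bookkeeping (illustration only, not HDFS code; String storage IDs and long block IDs stand in for the real types):

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class CountingNodeMap {
  /** Mapping: storage ID -> block IDs pending invalidation on that storage. */
  private final Map<String, Set<Long>> node2blocks = new HashMap<String, Set<Long>>();
  /** Always equals the sum of the set sizes in node2blocks. */
  private long numBlocks = 0;

  synchronized void add(final String storageID, final long blockId) {
    Set<Long> s = node2blocks.get(storageID);
    if (s == null) {
      s = new HashSet<Long>();
      node2blocks.put(storageID, s);
    }
    if (s.add(blockId)) {  // false for a duplicate, so re-adding never inflates the counter
      numBlocks++;
    }
  }

  synchronized void remove(final String storageID, final long blockId) {
    final Set<Long> s = node2blocks.get(storageID);
    if (s != null && s.remove(blockId)) {
      numBlocks--;
      if (s.isEmpty()) {
        node2blocks.remove(storageID);  // drained storages disappear from the key set
      }
    }
  }

  synchronized long numBlocks() {
    return numBlocks;
  }
}

For example, adding the same block to one storage twice and then removing it once leaves numBlocks() at zero and the storage absent from the map, which is exactly the behavior getPendingDeletionBlocksCount() and getActiveBlockCount() in BlockManager now rely on.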