HDFS-2191. Move datanodeMap from FSNamesystem to DatanodeManager.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1151339 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze 2011-07-27 05:46:52 +00:00
parent 438c32aaf9
commit 969a263188
12 changed files with 426 additions and 382 deletions

View File

@ -604,6 +604,9 @@ Trunk (unreleased changes)
HDFS-2149. Move EditLogOp serialization formats into FsEditLogOp
implementations. (Ivan Kelly via todd)
HDFS-2191. Move datanodeMap from FSNamesystem to DatanodeManager.
(szetszwo)
OPTIMIZATIONS
HDFS-1458. Improve checkpoint performance by avoiding unnecessary image

View File

@ -42,9 +42,11 @@
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportIterator;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportIterator;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdfs.server.blockmanagement.UnderReplicatedBlocks.BlockIterator;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
@ -53,6 +55,8 @@
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
import org.apache.hadoop.hdfs.server.namenode.INodeFileUnderConstruction;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.util.Daemon;
@ -156,7 +160,7 @@ public long getExcessBlocksCount() {
public final int defaultReplication;
/** The maximum number of entries returned by getCorruptInodes() */
final int maxCorruptFilesReturned;
/** variable to enable check for enough racks */
final boolean shouldCheckForEnoughRacks;
@ -294,15 +298,14 @@ public void metaSave(PrintWriter out) {
}
}
//
// Dump blocks from pendingReplication
//
pendingReplications.metaSave(out);
//
// Dump blocks that are waiting to be deleted
//
dumpRecentInvalidateSets(out);
// Dump all datanodes
getDatanodeManager().datanodeDump(out);
}
/**
@ -453,7 +456,7 @@ public LocatedBlock convertLastBlockToUnderConstruction(
/**
* Get all valid locations of the block
*/
public ArrayList<String> getValidLocations(Block block) {
private List<String> getValidLocations(Block block) {
ArrayList<String> machineSet =
new ArrayList<String>(blocksMap.numNodes(block));
for(Iterator<DatanodeDescriptor> it =
@ -562,6 +565,49 @@ public void verifyReplication(String src,
minReplication);
}
/** Get all blocks with location information from a datanode. */
public BlocksWithLocations getBlocksWithLocations(final DatanodeID datanode,
final long size) throws UnregisteredNodeException {
final DatanodeDescriptor node = getDatanodeManager().getDatanode(datanode);
if (node == null) {
NameNode.stateChangeLog.warn("BLOCK* NameSystem.getBlocks: "
+ "Asking for blocks from an unrecorded node " + datanode.getName());
throw new HadoopIllegalArgumentException(
"Datanode " + datanode.getName() + " not found.");
}
int numBlocks = node.numBlocks();
if(numBlocks == 0) {
return new BlocksWithLocations(new BlockWithLocations[0]);
}
Iterator<BlockInfo> iter = node.getBlockIterator();
int startBlock = DFSUtil.getRandom().nextInt(numBlocks); // starting from a random block
// skip blocks
for(int i=0; i<startBlock; i++) {
iter.next();
}
List<BlockWithLocations> results = new ArrayList<BlockWithLocations>();
long totalSize = 0;
BlockInfo curBlock;
while(totalSize<size && iter.hasNext()) {
curBlock = iter.next();
if(!curBlock.isComplete()) continue;
totalSize += addBlock(curBlock, results);
}
if(totalSize<size) {
iter = node.getBlockIterator(); // start from the beginning
for(int i=0; i<startBlock&&totalSize<size; i++) {
curBlock = iter.next();
if(!curBlock.isComplete()) continue;
totalSize += addBlock(curBlock, results);
}
}
return new BlocksWithLocations(
results.toArray(new BlockWithLocations[results.size()]));
}
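Editorial note: the new BlockManager.getBlocksWithLocations above samples a datanode's block list by starting at a random index, collecting complete blocks until the requested total size is reached, and wrapping around to the skipped prefix if the end of the list is hit first. A minimal, self-contained sketch of that sampling pattern follows; the class, method, and the plain block sizes are hypothetical stand-ins for BlockInfo objects, not the actual BlockManager code.

import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/** Standalone sketch of the random-start, wrap-around sampling used above. */
public class RandomStartSampling {
  static List<Long> sample(long[] blockSizes, long wantedSize, Random rnd) {
    final List<Long> picked = new ArrayList<Long>();
    if (blockSizes.length == 0 || wantedSize <= 0) {
      return picked;
    }
    final int start = rnd.nextInt(blockSizes.length);
    long total = 0;
    // first pass: from the random start position to the end of the list
    for (int i = start; i < blockSizes.length && total < wantedSize; i++) {
      picked.add(blockSizes[i]);
      total += blockSizes[i];
    }
    // second pass: wrap around and take from the skipped prefix if needed
    for (int i = 0; i < start && total < wantedSize; i++) {
      picked.add(blockSizes[i]);
      total += blockSizes[i];
    }
    return picked;
  }

  public static void main(String[] args) {
    long[] sizes = {64, 128, 32, 256, 64};
    System.out.println(sample(sizes, 300, new Random(0)));
  }
}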
/** Remove a datanode. */
public void removeDatanode(final DatanodeDescriptor node) {
final Iterator<? extends Block> it = node.getBlockIterator();
@ -660,7 +706,7 @@ private void dumpRecentInvalidateSets(PrintWriter out) {
for(Map.Entry<String,Collection<Block>> entry : recentInvalidateSets.entrySet()) {
Collection<Block> blocks = entry.getValue();
if (blocks.size() > 0) {
out.println(namesystem.getDatanode(entry.getKey()).getName() + blocks);
out.println(datanodeManager.getDatanode(entry.getKey()).getName() + blocks);
}
}
}
@ -684,7 +730,7 @@ public void findAndMarkBlockAsCorrupt(Block blk,
private void markBlockAsCorrupt(BlockInfo storedBlock,
DatanodeInfo dn) throws IOException {
assert storedBlock != null : "storedBlock should not be null";
DatanodeDescriptor node = namesystem.getDatanode(dn);
DatanodeDescriptor node = getDatanodeManager().getDatanode(dn);
if (node == null) {
throw new IOException("Cannot mark block " +
storedBlock.getBlockName() +
@ -723,7 +769,7 @@ private void invalidateBlock(Block blk, DatanodeInfo dn)
throws IOException {
NameNode.stateChangeLog.info("DIR* NameSystem.invalidateBlock: "
+ blk + " on " + dn.getName());
DatanodeDescriptor node = namesystem.getDatanode(dn);
DatanodeDescriptor node = getDatanodeManager().getDatanode(dn);
if (node == null) {
throw new IOException("Cannot invalidate block " + blk +
" because datanode " + dn.getName() +
@ -748,7 +794,7 @@ private void invalidateBlock(Block blk, DatanodeInfo dn)
}
}
public void updateState() {
void updateState() {
pendingReplicationBlocksCount = pendingReplications.size();
underReplicatedBlocksCount = neededReplications.size();
corruptReplicaBlocksCount = corruptReplicas.size();
@ -1134,7 +1180,7 @@ else if (excessBlocks != null && excessBlocks.contains(block)) {
* If there were any replication requests that timed out, reap them
* and put them back into the neededReplication queue
*/
public void processPendingReplications() {
private void processPendingReplications() {
Block[] timedOutItems = pendingReplications.getTimedOutBlocks();
if (timedOutItems != null) {
namesystem.writeLock();
@ -1700,6 +1746,7 @@ public void processOverReplicatedBlock(Block block, short replication,
addedNode, delNodeHint, blockplacement);
}
public void addToExcessReplicate(DatanodeInfo dn, Block block) {
assert namesystem.hasWriteLock();
Collection<Block> excessBlocks = excessReplicateMap.get(dn.getStorageID());
@ -1773,6 +1820,21 @@ private void removeStoredBlock(Block block, DatanodeDescriptor node) {
}
}
/**
* Get all valid locations of the block & add the block to results
* return the length of the added block; 0 if the block is not added
*/
private long addBlock(Block block, List<BlockWithLocations> results) {
final List<String> machineSet = getValidLocations(block);
if(machineSet.size() == 0) {
return 0;
} else {
results.add(new BlockWithLocations(block,
machineSet.toArray(new String[machineSet.size()])));
return block.getNumBytes();
}
}
/**
* The given node is reporting that it received a certain block.
*/
@ -1784,7 +1846,7 @@ public void addBlock(DatanodeDescriptor node, Block block, String delHint)
// get the deletion hint node
DatanodeDescriptor delHintNode = null;
if (delHint != null && delHint.length() != 0) {
delHintNode = namesystem.getDatanode(delHint);
delHintNode = datanodeManager.getDatanode(delHint);
if (delHintNode == null) {
NameNode.stateChangeLog.warn("BLOCK* NameSystem.blockReceived: "
+ block + " is expected to be removed from an unrecorded node "
@ -2071,7 +2133,7 @@ private int invalidateWorkForOneNode(String nodeId) {
return 0;
// get blocks to invalidate for the nodeId
assert nodeId != null;
DatanodeDescriptor dn = namesystem.getDatanode(nodeId);
final DatanodeDescriptor dn = datanodeManager.getDatanode(nodeId);
if (dn == null) {
removeFromInvalidates(nodeId);
return 0;
@ -2082,11 +2144,11 @@ private int invalidateWorkForOneNode(String nodeId) {
return 0;
ArrayList<Block> blocksToInvalidate = new ArrayList<Block>(
namesystem.blockInvalidateLimit);
getDatanodeManager().blockInvalidateLimit);
// # blocks that can be sent in one message is limited
Iterator<Block> it = invalidateSet.iterator();
for (int blkCount = 0; blkCount < namesystem.blockInvalidateLimit
for (int blkCount = 0; blkCount < getDatanodeManager().blockInvalidateLimit
&& it.hasNext(); blkCount++) {
blocksToInvalidate.add(it.next());
it.remove();

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
@ -25,7 +26,9 @@
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -35,12 +38,22 @@
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
import org.apache.hadoop.ipc.Server;
@ -48,6 +61,7 @@
import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.ScriptBasedMapping;
import org.apache.hadoop.util.CyclicIteration;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.HostsFileReader;
import org.apache.hadoop.util.ReflectionUtils;
@ -62,6 +76,30 @@ public class DatanodeManager {
final FSNamesystem namesystem;
/**
* Stores the datanode -> block map.
* <p>
* Done by storing a set of {@link DatanodeDescriptor} objects, sorted by
* storage id. In order to keep the storage map consistent it tracks
* all storages ever registered with the namenode.
* A descriptor corresponding to a specific storage id can be
* <ul>
* <li>added to the map if it is a new storage id;</li>
* <li>updated with a new datanode started as a replacement for the old one
* with the same storage id; and </li>
* <li>removed if and only if an existing datanode is restarted to serve a
* different storage id.</li>
* </ul> <br>
* The list of the {@link DatanodeDescriptor}s in the map is checkpointed
* in the namespace image file. Only the {@link DatanodeInfo} part is
* persistent, the list of blocks is restored from the datanode block
* reports.
* <p>
* Mapping: StorageID -> DatanodeDescriptor
*/
private final NavigableMap<String, DatanodeDescriptor> datanodeMap
= new TreeMap<String, DatanodeDescriptor>();
/** Cluster network topology */
private final NetworkTopology networktopology = new NetworkTopology();
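Editorial note: the datanodeMap moved into DatanodeManager is a NavigableMap keyed by storage ID, and the ordered keys are what getDatanodeCyclicIteration (added further down) relies on to resume a scan at an arbitrary key and wrap around, the traversal CyclicIteration performs for the decommission monitor. A small illustration of that idea, using made-up storage IDs and hostnames rather than the real DatanodeDescriptor type:

import java.util.NavigableMap;
import java.util.TreeMap;

/** Illustration of resuming an ordered scan from a given key and wrapping around. */
public class StorageIdMapSketch {
  public static void main(String[] args) {
    final NavigableMap<String, String> datanodeMap = new TreeMap<String, String>();
    datanodeMap.put("DS-1001", "dn-a:50010");   // hypothetical storage IDs and hosts
    datanodeMap.put("DS-1002", "dn-b:50010");
    datanodeMap.put("DS-1003", "dn-c:50010");

    final String firstKey = "DS-1002";
    // entries strictly after firstKey ...
    for (String id : datanodeMap.tailMap(firstKey, false).keySet()) {
      System.out.println(id + " -> " + datanodeMap.get(id));
    }
    // ... then wrap around to the entries at or before it
    for (String id : datanodeMap.headMap(firstKey, true).keySet()) {
      System.out.println(id + " -> " + datanodeMap.get(id));
    }
  }
}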
@ -71,7 +109,12 @@ public class DatanodeManager {
private final DNSToSwitchMapping dnsToSwitchMapping;
/** Read include/exclude files*/
private final HostsFileReader hostsReader;
/** The period to wait for datanode heartbeat.*/
private final long heartbeatExpireInterval;
/** Ask Datanode only up to this many blocks to delete. */
final int blockInvalidateLimit;
DatanodeManager(final FSNamesystem namesystem, final Configuration conf
) throws IOException {
@ -90,6 +133,19 @@ public class DatanodeManager {
if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) {
dnsToSwitchMapping.resolve(new ArrayList<String>(hostsReader.getHosts()));
}
final long heartbeatIntervalSeconds = conf.getLong(
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT);
final int heartbeatRecheckInterval = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 minutes
this.heartbeatExpireInterval = 2 * heartbeatRecheckInterval
+ 10 * 1000 * heartbeatIntervalSeconds;
this.blockInvalidateLimit = Math.max(20*(int)(heartbeatIntervalSeconds),
DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_DEFAULT);
LOG.info(DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY
+ "=" + this.blockInvalidateLimit);
}
private Daemon decommissionthread = null;
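Editorial note: a worked example of the two values derived in the constructor above, assuming the stock defaults of this era — a 3-second heartbeat interval, the 5-minute recheck interval noted in the code comment, and a block-invalidate-limit default of 100. The constants below are assumed, illustrative stand-ins, not the authoritative DFSConfigKeys values.

/** Hedged arithmetic sketch; the constants are assumed defaults, not DFSConfigKeys. */
public class HeartbeatDefaultsExample {
  public static void main(String[] args) {
    final long heartbeatIntervalSeconds = 3;              // dfs.heartbeat.interval (assumed default)
    final int heartbeatRecheckInterval = 5 * 60 * 1000;   // 5 minutes, in milliseconds
    final int invalidateLimitDefault = 100;               // dfs.block.invalidate.limit (assumed default)

    final long heartbeatExpireInterval =
        2 * heartbeatRecheckInterval + 10 * 1000 * heartbeatIntervalSeconds;
    final int blockInvalidateLimit =
        Math.max(20 * (int) heartbeatIntervalSeconds, invalidateLimitDefault);

    // 2*300000 + 10*1000*3 = 630000 ms: a datanode with no heartbeat for
    // 10.5 minutes is treated as dead; the invalidate limit stays at 100.
    System.out.println(heartbeatExpireInterval + " ms, limit=" + blockInvalidateLimit);
  }
}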
@ -124,20 +180,88 @@ public void sortLocatedBlocks(final String targethost,
Arrays.sort(b.getLocations(), DFSUtil.DECOM_COMPARATOR);
}
}
CyclicIteration<String, DatanodeDescriptor> getDatanodeCyclicIteration(
final String firstkey) {
return new CyclicIteration<String, DatanodeDescriptor>(
datanodeMap, firstkey);
}
/** @return the datanode descriptor for the host. */
public DatanodeDescriptor getDatanodeByHost(final String host) {
return host2DatanodeMap.getDatanodeByHost(host);
}
/** Get a datanode descriptor given corresponding storageID */
DatanodeDescriptor getDatanode(final String storageID) {
return datanodeMap.get(storageID);
}
/**
* Get data node by storage ID.
*
* @param nodeID
* @return DatanodeDescriptor or null if the node is not found.
* @throws UnregisteredNodeException
*/
public DatanodeDescriptor getDatanode(DatanodeID nodeID
) throws UnregisteredNodeException {
final DatanodeDescriptor node = getDatanode(nodeID.getStorageID());
if (node == null)
return null;
if (!node.getName().equals(nodeID.getName())) {
final UnregisteredNodeException e = new UnregisteredNodeException(
nodeID, node);
NameNode.stateChangeLog.fatal("BLOCK* NameSystem.getDatanode: "
+ e.getLocalizedMessage());
throw e;
}
return node;
}
/** Prints information about all datanodes. */
void datanodeDump(final PrintWriter out) {
synchronized (datanodeMap) {
out.println("Metasave: Number of datanodes: " + datanodeMap.size());
for(Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator(); it.hasNext();) {
DatanodeDescriptor node = it.next();
out.println(node.dumpDatanode());
}
}
}
/** Remove a dead datanode. */
public void removeDeadDatanode(final DatanodeID nodeID) {
synchronized(namesystem.heartbeats) {
synchronized(datanodeMap) {
DatanodeDescriptor d;
try {
d = getDatanode(nodeID);
} catch(IOException e) {
d = null;
}
if (d != null && isDatanodeDead(d)) {
NameNode.stateChangeLog.info(
"BLOCK* removeDeadDatanode: lost heartbeat from " + d.getName());
namesystem.removeDatanode(d);
}
}
}
}
/** Is the datanode dead? */
public boolean isDatanodeDead(DatanodeDescriptor node) {
return (node.getLastUpdate() <
(Util.now() - heartbeatExpireInterval));
}
/** Add a datanode. */
private void addDatanode(final DatanodeDescriptor node) {
// To keep host2DatanodeMap consistent with datanodeMap,
// remove from host2DatanodeMap the datanodeDescriptor removed
// from datanodeMap before adding node to host2DatanodeMap.
synchronized (namesystem.datanodeMap) {
host2DatanodeMap.remove(
namesystem.datanodeMap.put(node.getStorageID(), node));
synchronized(datanodeMap) {
host2DatanodeMap.remove(datanodeMap.put(node.getStorageID(), node));
}
host2DatanodeMap.add(node);
@ -152,8 +276,8 @@ private void addDatanode(final DatanodeDescriptor node) {
/** Physically remove node from datanodeMap. */
private void wipeDatanode(final DatanodeID node) throws IOException {
final String key = node.getStorageID();
synchronized (namesystem.datanodeMap) {
host2DatanodeMap.remove(namesystem.datanodeMap.remove(key));
synchronized (datanodeMap) {
host2DatanodeMap.remove(datanodeMap.remove(key));
}
if (LOG.isDebugEnabled()) {
LOG.debug(getClass().getSimpleName() + ".wipeDatanode("
@ -315,7 +439,7 @@ private String newStorageID() {
String newID = null;
while(newID == null) {
newID = "DS" + Integer.toString(DFSUtil.getRandom().nextInt());
if (namesystem.datanodeMap.get(newID) != null)
if (datanodeMap.get(newID) != null)
newID = null;
}
return newID;
@ -350,7 +474,7 @@ public void registerDatanode(DatanodeRegistration nodeReg
+ "node registration from " + nodeReg.getName()
+ " storage " + nodeReg.getStorageID());
DatanodeDescriptor nodeS = namesystem.datanodeMap.get(nodeReg.getStorageID());
DatanodeDescriptor nodeS = datanodeMap.get(nodeReg.getStorageID());
DatanodeDescriptor nodeN = getDatanodeByHost(nodeReg.getName());
if (nodeN != null && nodeN != nodeS) {
@ -461,7 +585,7 @@ public void refreshHostsReader(Configuration conf) throws IOException {
* 4. Removed from exclude --> stop decommission.
*/
public void refreshDatanodes() throws IOException {
for(DatanodeDescriptor node : namesystem.datanodeMap.values()) {
for(DatanodeDescriptor node : datanodeMap.values()) {
// Check if not include.
if (!inHostsList(node, null)) {
node.setDisallowed(true); // case 2.
@ -475,6 +599,45 @@ public void refreshDatanodes() throws IOException {
}
}
/** @return the number of live datanodes. */
public int getNumLiveDataNodes() {
int numLive = 0;
synchronized (datanodeMap) {
for(DatanodeDescriptor dn : datanodeMap.values()) {
if (!isDatanodeDead(dn) ) {
numLive++;
}
}
}
return numLive;
}
/** @return the number of dead datanodes. */
public int getNumDeadDataNodes() {
int numDead = 0;
synchronized (datanodeMap) {
for(DatanodeDescriptor dn : datanodeMap.values()) {
if (isDatanodeDead(dn) ) {
numDead++;
}
}
}
return numDead;
}
/** Fetch live and dead datanodes. */
public void fetchDatanodess(final List<DatanodeDescriptor> live,
final List<DatanodeDescriptor> dead) {
final List<DatanodeDescriptor> results =
getDatanodeListForReport(DatanodeReportType.ALL);
for(DatanodeDescriptor node : results) {
if (isDatanodeDead(node))
dead.add(node);
else
live.add(node);
}
}
/** For generating datanode reports */
public List<DatanodeDescriptor> getDatanodeListForReport(
final DatanodeReportType type) {
@ -499,13 +662,13 @@ public List<DatanodeDescriptor> getDatanodeListForReport(
ArrayList<DatanodeDescriptor> nodes = null;
synchronized (namesystem.datanodeMap) {
nodes = new ArrayList<DatanodeDescriptor>(namesystem.datanodeMap.size() +
synchronized(datanodeMap) {
nodes = new ArrayList<DatanodeDescriptor>(datanodeMap.size() +
mustList.size());
Iterator<DatanodeDescriptor> it = namesystem.datanodeMap.values().iterator();
Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator();
while (it.hasNext()) {
DatanodeDescriptor dn = it.next();
boolean isDead = namesystem.isDatanodeDead(dn);
final boolean isDead = isDatanodeDead(dn);
if ( (isDead && listDeadNodes) || (!isDead && listLiveNodes) ) {
nodes.add(dn);
}
@ -537,4 +700,77 @@ public List<DatanodeDescriptor> getDatanodeListForReport(
}
return nodes;
}
private void setDatanodeDead(DatanodeDescriptor node) throws IOException {
node.setLastUpdate(0);
}
/** Handle heartbeat from datanodes. */
public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
final String blockPoolId,
long capacity, long dfsUsed, long remaining, long blockPoolUsed,
int xceiverCount, int maxTransfers, int failedVolumes
) throws IOException {
synchronized (namesystem.heartbeats) {
synchronized (datanodeMap) {
DatanodeDescriptor nodeinfo = null;
try {
nodeinfo = getDatanode(nodeReg);
} catch(UnregisteredNodeException e) {
return new DatanodeCommand[]{DatanodeCommand.REGISTER};
}
// Check if this datanode should actually be shutdown instead.
if (nodeinfo != null && nodeinfo.isDisallowed()) {
setDatanodeDead(nodeinfo);
throw new DisallowedDatanodeException(nodeinfo);
}
if (nodeinfo == null || !nodeinfo.isAlive) {
return new DatanodeCommand[]{DatanodeCommand.REGISTER};
}
namesystem.updateStats(nodeinfo, false);
nodeinfo.updateHeartbeat(capacity, dfsUsed, remaining, blockPoolUsed,
xceiverCount, failedVolumes);
namesystem.updateStats(nodeinfo, true);
//check lease recovery
BlockInfoUnderConstruction[] blocks = nodeinfo
.getLeaseRecoveryCommand(Integer.MAX_VALUE);
if (blocks != null) {
BlockRecoveryCommand brCommand = new BlockRecoveryCommand(
blocks.length);
for (BlockInfoUnderConstruction b : blocks) {
brCommand.add(new RecoveringBlock(
new ExtendedBlock(blockPoolId, b), b.getExpectedLocations(), b
.getBlockRecoveryId()));
}
return new DatanodeCommand[] { brCommand };
}
final List<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>(3);
//check pending replication
List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand(
maxTransfers);
if (pendingList != null) {
cmds.add(new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId,
pendingList));
}
//check block invalidation
Block[] blks = nodeinfo.getInvalidateBlocks(blockInvalidateLimit);
if (blks != null) {
cmds.add(new BlockCommand(DatanodeProtocol.DNA_INVALIDATE,
blockPoolId, blks));
}
namesystem.addKeyUpdateCommand(cmds, nodeinfo);
if (!cmds.isEmpty()) {
return cmds.toArray(new DatanodeCommand[cmds.size()]);
}
}
}
return null;
}
}

View File

@ -83,8 +83,8 @@ public void run() {
private void check() {
int count = 0;
for(Map.Entry<String, DatanodeDescriptor> entry
: new CyclicIteration<String, DatanodeDescriptor>(
fsnamesystem.datanodeMap, firstkey)) {
: blockManager.getDatanodeManager().getDatanodeCyclicIteration(
firstkey)) {
final DatanodeDescriptor d = entry.getValue();
firstkey = entry.getKey();

View File

@ -237,35 +237,9 @@ private static final void logAuditEvent(UserGroupInformation ugi,
// Block pool ID used by this namenode
String blockPoolId;
/**
* Stores the datanode -> block map.
* <p>
* Done by storing a set of {@link DatanodeDescriptor} objects, sorted by
* storage id. In order to keep the storage map consistent it tracks
* all storages ever registered with the namenode.
* A descriptor corresponding to a specific storage id can be
* <ul>
* <li>added to the map if it is a new storage id;</li>
* <li>updated with a new datanode started as a replacement for the old one
* with the same storage id; and </li>
* <li>removed if and only if an existing datanode is restarted to serve a
* different storage id.</li>
* </ul> <br>
* The list of the {@link DatanodeDescriptor}s in the map is checkpointed
* in the namespace image file. Only the {@link DatanodeInfo} part is
* persistent, the list of blocks is restored from the datanode block
* reports.
* <p>
* Mapping: StorageID -> DatanodeDescriptor
*/
public final NavigableMap<String, DatanodeDescriptor> datanodeMap =
new TreeMap<String, DatanodeDescriptor>();
/**
* Stores a set of DatanodeDescriptor objects.
* This is a subset of {@link #datanodeMap}, containing nodes that are
* considered alive.
* Stores a subset of datanodeMap, containing nodes that are considered alive.
* The HeartbeatMonitor periodically checks for out-dated entries,
* and removes them from the list.
*/
@ -289,9 +263,6 @@ private static final void logAuditEvent(UserGroupInformation ugi,
// heartbeatRecheckInterval is how often namenode checks for expired datanodes
private long heartbeatRecheckInterval;
// heartbeatExpireInterval is how long namenode waits for datanode to report
// heartbeat
private long heartbeatExpireInterval;
//resourceRecheckInterval is how often namenode checks for the disk space availability
private long resourceRecheckInterval;
@ -314,9 +285,6 @@ private static final void logAuditEvent(UserGroupInformation ugi,
*/
private final GenerationStamp generationStamp = new GenerationStamp();
// Ask Datanode only up to this many blocks to delete.
public int blockInvalidateLimit = DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_DEFAULT;
// precision of access times.
private long accessTimePrecision = 0;
@ -513,14 +481,9 @@ private void setConfigurationParameters(Configuration conf)
this.defaultPermission = PermissionStatus.createImmutable(
fsOwner.getShortUserName(), supergroup, new FsPermission(filePermission));
long heartbeatInterval = conf.getLong(
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT) * 1000;
this.heartbeatRecheckInterval = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 minutes
this.heartbeatExpireInterval = 2 * heartbeatRecheckInterval +
10 * heartbeatInterval;
this.serverDefaults = new FsServerDefaults(
conf.getLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE),
@ -531,14 +494,6 @@ private void setConfigurationParameters(Configuration conf)
this.maxFsObjects = conf.getLong(DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_KEY,
DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_DEFAULT);
//default limit
this.blockInvalidateLimit = Math.max(this.blockInvalidateLimit,
20*(int)(heartbeatInterval/1000));
//use conf value if it is set.
this.blockInvalidateLimit = conf.getInt(
DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY, this.blockInvalidateLimit);
LOG.info(DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY + "=" + this.blockInvalidateLimit);
this.accessTimePrecision = conf.getLong(DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY, 0);
this.supportAppends = conf.getBoolean(DFSConfigKeys.DFS_SUPPORT_APPEND_KEY,
DFSConfigKeys.DFS_SUPPORT_APPEND_DEFAULT);
@ -642,12 +597,7 @@ void metaSave(String filename) throws IOException {
out.println("Live Datanodes: "+live.size());
out.println("Dead Datanodes: "+dead.size());
blockManager.metaSave(out);
//
// Dump all datanodes
//
datanodeDump(out);
out.flush();
out.close();
} finally {
@ -688,45 +638,7 @@ BlocksWithLocations getBlocks(DatanodeID datanode, long size)
readLock();
try {
checkSuperuserPrivilege();
DatanodeDescriptor node = getDatanode(datanode);
if (node == null) {
NameNode.stateChangeLog.warn("BLOCK* NameSystem.getBlocks: "
+ "Asking for blocks from an unrecorded node " + datanode.getName());
throw new IllegalArgumentException(
"Unexpected exception. Got getBlocks message for datanode " +
datanode.getName() + ", but there is no info for it");
}
int numBlocks = node.numBlocks();
if(numBlocks == 0) {
return new BlocksWithLocations(new BlockWithLocations[0]);
}
Iterator<BlockInfo> iter = node.getBlockIterator();
int startBlock = DFSUtil.getRandom().nextInt(numBlocks); // starting from a random block
// skip blocks
for(int i=0; i<startBlock; i++) {
iter.next();
}
List<BlockWithLocations> results = new ArrayList<BlockWithLocations>();
long totalSize = 0;
BlockInfo curBlock;
while(totalSize<size && iter.hasNext()) {
curBlock = iter.next();
if(!curBlock.isComplete()) continue;
totalSize += addBlock(curBlock, results);
}
if(totalSize<size) {
iter = node.getBlockIterator(); // start from the beginning
for(int i=0; i<startBlock&&totalSize<size; i++) {
curBlock = iter.next();
if(!curBlock.isComplete()) continue;
totalSize += addBlock(curBlock, results);
}
}
return new BlocksWithLocations(
results.toArray(new BlockWithLocations[results.size()]));
return blockManager.getBlocksWithLocations(datanode, size);
} finally {
readUnlock();
}
@ -742,22 +654,6 @@ public ExportedBlockKeys getBlockKeys() {
: ExportedBlockKeys.DUMMY_KEYS;
}
/**
* Get all valid locations of the block & add the block to results
* return the length of the added block; 0 if the block is not added
*/
private long addBlock(Block block, List<BlockWithLocations> results) {
assert hasReadOrWriteLock();
ArrayList<String> machineSet = blockManager.getValidLocations(block);
if(machineSet.size() == 0) {
return 0;
} else {
results.add(new BlockWithLocations(block,
machineSet.toArray(new String[machineSet.size()])));
return block.getNumBytes();
}
}
/////////////////////////////////////////////////////////
//
// These methods are called by HadoopFS clients
@ -1795,7 +1691,8 @@ LocatedBlock getAdditionalDatanode(final String src, final ExtendedBlock blk,
//find datanode descriptors
chosen = new ArrayList<DatanodeDescriptor>();
for(DatanodeInfo d : existings) {
final DatanodeDescriptor descriptor = getDatanode(d);
final DatanodeDescriptor descriptor = blockManager.getDatanodeManager(
).getDatanode(d);
if (descriptor != null) {
chosen.add(descriptor);
}
@ -2622,7 +2519,8 @@ void commitBlockSynchronization(ExtendedBlock lastblock,
if (newtargets.length > 0) {
descriptors = new DatanodeDescriptor[newtargets.length];
for(int i = 0; i < newtargets.length; i++) {
descriptors[i] = getDatanode(newtargets[i]);
descriptors[i] = blockManager.getDatanodeManager().getDatanode(
newtargets[i]);
}
}
if (closeFile) {
@ -2766,15 +2664,6 @@ public String getRegistrationID() {
return Storage.getRegistrationID(dir.fsImage.getStorage());
}
public boolean isDatanodeDead(DatanodeDescriptor node) {
return (node.getLastUpdate() <
(now() - heartbeatExpireInterval));
}
private void setDatanodeDead(DatanodeDescriptor node) throws IOException {
node.setLastUpdate(0);
}
/**
* The given node has reported in. This method should:
* 1) Record the heartbeat, so the datanode isn't timed out
@ -2792,91 +2681,32 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
throws IOException {
readLock();
try {
return handleHeartbeatInternal(nodeReg, capacity, dfsUsed,
remaining, blockPoolUsed, xceiverCount, xmitsInProgress,
failedVolumes);
final int maxTransfer = blockManager.maxReplicationStreams - xmitsInProgress;
DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat(
nodeReg, blockPoolId, capacity, dfsUsed, remaining, blockPoolUsed,
xceiverCount, maxTransfer, failedVolumes);
if (cmds != null) {
return cmds;
}
//check distributed upgrade
DatanodeCommand cmd = getDistributedUpgradeCommand();
if (cmd != null) {
return new DatanodeCommand[] {cmd};
}
return null;
} finally {
readUnlock();
}
}
/** @see #handleHeartbeat(DatanodeRegistration, long, long, long, long, int, int, int) */
DatanodeCommand[] handleHeartbeatInternal(DatanodeRegistration nodeReg,
long capacity, long dfsUsed, long remaining, long blockPoolUsed,
int xceiverCount, int xmitsInProgress, int failedVolumes)
throws IOException {
assert hasReadLock();
DatanodeCommand cmd = null;
synchronized (heartbeats) {
synchronized (datanodeMap) {
DatanodeDescriptor nodeinfo = null;
try {
nodeinfo = getDatanode(nodeReg);
} catch(UnregisteredNodeException e) {
return new DatanodeCommand[]{DatanodeCommand.REGISTER};
}
// Check if this datanode should actually be shutdown instead.
if (nodeinfo != null && nodeinfo.isDisallowed()) {
setDatanodeDead(nodeinfo);
throw new DisallowedDatanodeException(nodeinfo);
}
if (nodeinfo == null || !nodeinfo.isAlive) {
return new DatanodeCommand[]{DatanodeCommand.REGISTER};
}
updateStats(nodeinfo, false);
nodeinfo.updateHeartbeat(capacity, dfsUsed, remaining, blockPoolUsed,
xceiverCount, failedVolumes);
updateStats(nodeinfo, true);
//check lease recovery
BlockInfoUnderConstruction[] blocks = nodeinfo
.getLeaseRecoveryCommand(Integer.MAX_VALUE);
if (blocks != null) {
BlockRecoveryCommand brCommand = new BlockRecoveryCommand(
blocks.length);
for (BlockInfoUnderConstruction b : blocks) {
brCommand.add(new RecoveringBlock(
new ExtendedBlock(blockPoolId, b), b.getExpectedLocations(), b
.getBlockRecoveryId()));
}
return new DatanodeCommand[] { brCommand };
}
ArrayList<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>(3);
//check pending replication
List<BlockTargetPair> pendingList = nodeinfo.getReplicationCommand(
blockManager.maxReplicationStreams - xmitsInProgress);
if (pendingList != null) {
cmd = new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId,
pendingList);
cmds.add(cmd);
}
//check block invalidation
Block[] blks = nodeinfo.getInvalidateBlocks(blockInvalidateLimit);
if (blks != null) {
cmd = new BlockCommand(DatanodeProtocol.DNA_INVALIDATE, blockPoolId, blks);
cmds.add(cmd);
}
// check access key update
if (isBlockTokenEnabled && nodeinfo.needKeyUpdate) {
cmds.add(new KeyUpdateCommand(blockTokenSecretManager.exportKeys()));
nodeinfo.needKeyUpdate = false;
}
if (!cmds.isEmpty()) {
return cmds.toArray(new DatanodeCommand[cmds.size()]);
}
}
public void addKeyUpdateCommand(final List<DatanodeCommand> cmds,
final DatanodeDescriptor nodeinfo) {
// check access key update
if (isBlockTokenEnabled && nodeinfo.needKeyUpdate) {
cmds.add(new KeyUpdateCommand(blockTokenSecretManager.exportKeys()));
nodeinfo.needKeyUpdate = false;
}
//check distributed upgrade
cmd = getDistributedUpgradeCommand();
if (cmd != null) {
return new DatanodeCommand[] {cmd};
}
return null;
}
public void updateStats(DatanodeDescriptor node, boolean isAdded) {
@ -3017,7 +2847,8 @@ public void removeDatanode(final DatanodeID nodeID
) throws UnregisteredNodeException {
writeLock();
try {
DatanodeDescriptor nodeInfo = getDatanode(nodeID);
DatanodeDescriptor nodeInfo = getBlockManager().getDatanodeManager(
).getDatanode(nodeID);
if (nodeInfo != null) {
removeDatanode(nodeInfo);
} else {
@ -3033,7 +2864,7 @@ public void removeDatanode(final DatanodeID nodeID
* Remove a datanode descriptor.
* @param nodeInfo datanode descriptor.
*/
private void removeDatanode(DatanodeDescriptor nodeInfo) {
public void removeDatanode(DatanodeDescriptor nodeInfo) {
assert hasWriteLock();
synchronized (heartbeats) {
if (nodeInfo.isAlive) {
@ -3064,6 +2895,7 @@ FSEditLog getEditLog() {
* effect causes more datanodes to be declared dead.
*/
void heartbeatCheck() {
final DatanodeManager datanodeManager = getBlockManager().getDatanodeManager();
// It's OK to check safe mode w/o taking the lock here, we re-check
// for safe mode after taking the lock before removing a datanode.
if (isInSafeMode()) {
@ -3079,7 +2911,7 @@ void heartbeatCheck() {
for (Iterator<DatanodeDescriptor> it = heartbeats.iterator();
it.hasNext();) {
DatanodeDescriptor nodeInfo = it.next();
if (isDatanodeDead(nodeInfo)) {
if (datanodeManager.isDatanodeDead(nodeInfo)) {
expiredHeartbeats.incr();
foundDead = true;
nodeID = nodeInfo;
@ -3095,21 +2927,7 @@ void heartbeatCheck() {
return;
}
try {
synchronized(heartbeats) {
synchronized (datanodeMap) {
DatanodeDescriptor nodeInfo = null;
try {
nodeInfo = getDatanode(nodeID);
} catch (IOException e) {
nodeInfo = null;
}
if (nodeInfo != null && isDatanodeDead(nodeInfo)) {
NameNode.stateChangeLog.info("BLOCK* NameSystem.heartbeatCheck: "
+ "lost heartbeat from " + nodeInfo.getName());
removeDatanode(nodeInfo);
}
}
}
datanodeManager.removeDeadDatanode(nodeID);
} finally {
writeUnlock();
}
@ -3129,7 +2947,8 @@ public void processReport(DatanodeID nodeID, String poolId,
writeLock();
startTime = now(); //after acquiring write lock
try {
DatanodeDescriptor node = getDatanode(nodeID);
final DatanodeDescriptor node = blockManager.getDatanodeManager(
).getDatanode(nodeID);
if (node == null || !node.isAlive) {
throw new IOException("ProcessReport from dead or unregistered node: "
+ nodeID.getName());
@ -3269,7 +3088,8 @@ public void blockReceived(DatanodeID nodeID,
) throws IOException {
writeLock();
try {
DatanodeDescriptor node = getDatanode(nodeID);
final DatanodeDescriptor node = blockManager.getDatanodeManager(
).getDatanode(nodeID);
if (node == null || !node.isAlive) {
NameNode.stateChangeLog.warn("BLOCK* NameSystem.blockReceived: " + block
+ " is received from dead or unregistered node " + nodeID.getName());
@ -3475,33 +3295,7 @@ public void DFSNodesStatus(ArrayList<DatanodeDescriptor> live,
ArrayList<DatanodeDescriptor> dead) {
readLock();
try {
final List<DatanodeDescriptor> results = getBlockManager(
).getDatanodeManager().getDatanodeListForReport(DatanodeReportType.ALL);
for(Iterator<DatanodeDescriptor> it = results.iterator(); it.hasNext();) {
DatanodeDescriptor node = it.next();
if (isDatanodeDead(node))
dead.add(node);
else
live.add(node);
}
} finally {
readUnlock();
}
}
/**
* Prints information about all datanodes.
*/
private void datanodeDump(PrintWriter out) {
readLock();
try {
synchronized (datanodeMap) {
out.println("Metasave: Number of datanodes: " + datanodeMap.size());
for(Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator(); it.hasNext();) {
DatanodeDescriptor node = it.next();
out.println(node.dumpDatanode());
}
}
getBlockManager().getDatanodeManager().fetchDatanodess(live, dead);
} finally {
readUnlock();
}
@ -3556,30 +3350,6 @@ void finalizeUpgrade() throws IOException {
checkSuperuserPrivilege();
getFSImage().finalizeUpgrade();
}
/**
* Get data node by storage ID.
*
* @param nodeID
* @return DatanodeDescriptor or null if the node is not found.
* @throws IOException
*/
public DatanodeDescriptor getDatanode(DatanodeID nodeID
) throws UnregisteredNodeException {
assert hasReadOrWriteLock();
UnregisteredNodeException e = null;
DatanodeDescriptor node = datanodeMap.get(nodeID.getStorageID());
if (node == null)
return null;
if (!node.getName().equals(nodeID.getName())) {
e = new UnregisteredNodeException(nodeID, node);
NameNode.stateChangeLog.fatal("BLOCK* NameSystem.getDatanode: "
+ e.getLocalizedMessage());
throw e;
}
return node;
}
/**
* SafeModeInfo contains information related to the safe mode.
@ -4503,43 +4273,14 @@ public void shutdown() {
}
/**
* Number of live data nodes
* @return Number of live data nodes
*/
@Override // FSNamesystemMBean
public int getNumLiveDataNodes() {
int numLive = 0;
synchronized (datanodeMap) {
for(Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator();
it.hasNext();) {
DatanodeDescriptor dn = it.next();
if (!isDatanodeDead(dn) ) {
numLive++;
}
}
}
return numLive;
return getBlockManager().getDatanodeManager().getNumLiveDataNodes();
}
/**
* Number of dead data nodes
* @return Number of dead data nodes
*/
@Override // FSNamesystemMBean
public int getNumDeadDataNodes() {
int numDead = 0;
synchronized (datanodeMap) {
for(Iterator<DatanodeDescriptor> it = datanodeMap.values().iterator();
it.hasNext();) {
DatanodeDescriptor dn = it.next();
if (isDatanodeDead(dn) ) {
numDead++;
}
}
}
return numDead;
return getBlockManager().getDatanodeManager().getNumDeadDataNodes();
}
/**
@ -4699,11 +4440,12 @@ void updatePipelineInternal(String clientName, ExtendedBlock oldBlock,
blockinfo.setNumBytes(newBlock.getNumBytes());
// find the DatanodeDescriptor objects
final DatanodeManager dm = getBlockManager().getDatanodeManager();
DatanodeDescriptor[] descriptors = null;
if (newNodes.length > 0) {
descriptors = new DatanodeDescriptor[newNodes.length];
for(int i = 0; i < newNodes.length; i++) {
descriptors[i] = getDatanode(newNodes[i]);
descriptors[i] = dm.getDatanode(newNodes[i]);
}
}
blockinfo.setExpectedLocations(descriptors);
@ -4832,12 +4574,6 @@ public int numCorruptReplicas(Block blk) {
return blockManager.numCorruptReplicas(blk);
}
/** Get a datanode descriptor given corresponding storageID */
public DatanodeDescriptor getDatanode(String nodeID) {
assert hasReadOrWriteLock();
return datanodeMap.get(nodeID);
}
/**
* Return a range of corrupt replica block ids. Up to numExpectedBlocks
* blocks starting at the next block after startingBlockId are returned

View File

@ -226,7 +226,7 @@ private DatanodeInfo decommissionNode(int nnIndex,
writeConfigFile(excludeFile, nodes);
cluster.getNamesystem(nnIndex).refreshNodes(conf);
DatanodeInfo ret = NameNodeAdapter.getDatanode(
cluster.getNameNode(nnIndex), info[index]);
cluster.getNamesystem(nnIndex), info[index]);
waitNodeState(ret, waitForState);
return ret;
}
@ -466,7 +466,7 @@ public void testClusterStats(int numNameNodes) throws IOException,
// Stop decommissioning and verify stats
writeConfigFile(excludeFile, null);
fsn.refreshNodes(conf);
DatanodeInfo ret = NameNodeAdapter.getDatanode(namenode, downnode);
DatanodeInfo ret = NameNodeAdapter.getDatanode(fsn, downnode);
waitNodeState(ret, AdminStates.NORMAL);
verifyStats(namenode, fsn, ret, false);
}

View File

@ -25,9 +25,30 @@
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.util.Daemon;
public class BlockManagerTestUtil {
/** @return the datanode descriptor for the given storageID. */
public static DatanodeDescriptor getDatanode(final FSNamesystem ns,
final String storageID) {
ns.readLock();
try {
return ns.getBlockManager().getDatanodeManager().getDatanode(storageID);
} finally {
ns.readUnlock();
}
}
/**
* Refresh block queue counts on the name-node.
*/
public static void updateState(final BlockManager blockManager) {
blockManager.updateState();
}
/**
* @return a tuple of the replica state (number racks, number live
* replicas, and number needed replicas) for the given block.

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
package org.apache.hadoop.hdfs.server.blockmanagement;
import junit.framework.TestCase;
@ -23,8 +23,10 @@
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
/**
* Test if FSNamesystem handles heartbeat right
@ -41,6 +43,8 @@ public void testCompInvalidate() throws Exception {
try {
cluster.waitActive();
final FSNamesystem namesystem = cluster.getNamesystem();
final BlockManager bm = namesystem.getBlockManager();
final int blockInvalidateLimit = bm.getDatanodeManager().blockInvalidateLimit;
DatanodeDescriptor[] nodes =
namesystem.heartbeats.toArray(new DatanodeDescriptor[NUM_OF_DATANODES]);
assertEquals(nodes.length, NUM_OF_DATANODES);
@ -48,26 +52,25 @@ public void testCompInvalidate() throws Exception {
namesystem.writeLock();
try {
for (int i=0; i<nodes.length; i++) {
for(int j=0; j<3*namesystem.blockInvalidateLimit+1; j++) {
Block block = new Block(i*(namesystem.blockInvalidateLimit+1)+j, 0,
for(int j=0; j<3*blockInvalidateLimit+1; j++) {
Block block = new Block(i*(blockInvalidateLimit+1)+j, 0,
GenerationStamp.FIRST_VALID_STAMP);
namesystem.getBlockManager().addToInvalidates(block, nodes[i]);
bm.addToInvalidates(block, nodes[i]);
}
}
assertEquals(namesystem.blockInvalidateLimit*NUM_OF_DATANODES,
namesystem.getBlockManager().computeInvalidateWork(NUM_OF_DATANODES+1));
assertEquals(namesystem.blockInvalidateLimit*NUM_OF_DATANODES,
namesystem.getBlockManager().computeInvalidateWork(NUM_OF_DATANODES));
assertEquals(namesystem.blockInvalidateLimit*(NUM_OF_DATANODES-1),
namesystem.getBlockManager().computeInvalidateWork(NUM_OF_DATANODES-1));
int workCount = namesystem.getBlockManager().computeInvalidateWork(1);
assertEquals(blockInvalidateLimit*NUM_OF_DATANODES,
bm.computeInvalidateWork(NUM_OF_DATANODES+1));
assertEquals(blockInvalidateLimit*NUM_OF_DATANODES,
bm.computeInvalidateWork(NUM_OF_DATANODES));
assertEquals(blockInvalidateLimit*(NUM_OF_DATANODES-1),
bm.computeInvalidateWork(NUM_OF_DATANODES-1));
int workCount = bm.computeInvalidateWork(1);
if (workCount == 1) {
assertEquals(namesystem.blockInvalidateLimit+1,
namesystem.getBlockManager().computeInvalidateWork(2));
assertEquals(blockInvalidateLimit+1, bm.computeInvalidateWork(2));
} else {
assertEquals(workCount, namesystem.blockInvalidateLimit);
assertEquals(2, namesystem.getBlockManager().computeInvalidateWork(2));
assertEquals(workCount, blockInvalidateLimit);
assertEquals(2, bm.computeInvalidateWork(2));
}
} finally {
namesystem.writeUnlock();

View File

@ -30,6 +30,7 @@
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
@ -54,14 +55,9 @@ public void testHeartbeat() throws Exception {
final String poolId = namesystem.getBlockPoolId();
final DatanodeRegistration nodeReg =
DataNodeTestUtils.getDNRegistrationForBP(cluster.getDataNodes().get(0), poolId);
namesystem.readLock();
DatanodeDescriptor dd;
try {
dd = namesystem.getDatanode(nodeReg);
} finally {
namesystem.readUnlock();
}
final DatanodeDescriptor dd = NameNodeAdapter.getDatanode(namesystem, nodeReg);
final int REMAINING_BLOCKS = 1;
final int MAX_REPLICATE_LIMIT =

View File

@ -596,7 +596,7 @@ private ArrayList<Block> prepareForRide(final Path filePath,
}
private void printStats() {
NameNodeAdapter.refreshBlockCounts(cluster.getNameNode());
BlockManagerTestUtil.updateState(cluster.getNamesystem().getBlockManager());
if(LOG.isDebugEnabled()) {
LOG.debug("Missing " + cluster.getNamesystem().getMissingBlocksCount());
LOG.debug("Corrupted " + cluster.getNamesystem().getCorruptReplicaBlocks());

View File

@ -45,14 +45,6 @@ public static LocatedBlocks getBlockLocations(NameNode namenode,
src, offset, length, false, true);
}
/**
* Refresh block queue counts on the name-node.
* @param namenode to proxy the invocation to
*/
public static void refreshBlockCounts(NameNode namenode) {
namenode.getNamesystem().getBlockManager().updateState();
}
/**
* Get the internal RPC server instance.
* @return rpc server
@ -68,12 +60,11 @@ public static String getLeaseHolderForPath(NameNode namenode, String path) {
/**
* Return the datanode descriptor for the given datanode.
*/
public static DatanodeDescriptor getDatanode(NameNode namenode,
public static DatanodeDescriptor getDatanode(final FSNamesystem ns,
DatanodeID id) throws IOException {
FSNamesystem ns = namenode.getNamesystem();
ns.readLock();
try {
return ns.getDatanode(id);
return ns.getBlockManager().getDatanodeManager().getDatanode(id);
} finally {
ns.readUnlock();
}

View File

@ -29,6 +29,7 @@
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
@ -61,13 +62,8 @@ private void waitForDatanodeState(String nodeID, boolean alive, int waitTime)
FSNamesystem namesystem = cluster.getNamesystem();
String state = alive ? "alive" : "dead";
while (System.currentTimeMillis() < stopTime) {
namesystem.readLock();
DatanodeDescriptor dd;
try {
dd = namesystem.getDatanode(nodeID);
} finally {
namesystem.readUnlock();
}
final DatanodeDescriptor dd = BlockManagerTestUtil.getDatanode(
namesystem, nodeID);
if (dd.isAlive == alive) {
LOG.info("datanode " + nodeID + " is " + state);
return;