diff --git a/hdfs/CHANGES.txt b/hdfs/CHANGES.txt index 5d862abcc4f..348c28f04fa 100644 --- a/hdfs/CHANGES.txt +++ b/hdfs/CHANGES.txt @@ -604,6 +604,9 @@ Trunk (unreleased changes) HDFS-2149. Move EditLogOp serialization formats into FsEditLogOp implementations. (Ivan Kelly via todd) + HDFS-2191. Move datanodeMap from FSNamesystem to DatanodeManager. + (szetszwo) + OPTIMIZATIONS HDFS-1458. Improve checkpoint performance by avoiding unnecessary image diff --git a/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 6879bb755ee..b6c1e341557 100644 --- a/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -42,9 +42,11 @@ import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; +import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportIterator; +import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.LocatedBlock; -import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportIterator; +import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException; import org.apache.hadoop.hdfs.server.blockmanagement.UnderReplicatedBlocks.BlockIterator; import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState; import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState; @@ -53,6 +55,8 @@ import org.apache.hadoop.hdfs.server.namenode.INode; import org.apache.hadoop.hdfs.server.namenode.INodeFile; import org.apache.hadoop.hdfs.server.namenode.INodeFileUnderConstruction; import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; +import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations; import org.apache.hadoop.net.Node; import org.apache.hadoop.util.Daemon; @@ -156,7 +160,7 @@ public class BlockManager { public final int defaultReplication; /** The maximum number of entries returned by getCorruptInodes() */ final int maxCorruptFilesReturned; - + /** variable to enable check for enough racks */ final boolean shouldCheckForEnoughRacks; @@ -294,15 +298,14 @@ public class BlockManager { } } - // // Dump blocks from pendingReplication - // pendingReplications.metaSave(out); - // // Dump blocks that are waiting to be deleted - // dumpRecentInvalidateSets(out); + + // Dump all datanodes + getDatanodeManager().datanodeDump(out); } /** @@ -453,7 +456,7 @@ public class BlockManager { /** * Get all valid locations of the block */ - public ArrayList getValidLocations(Block block) { + private List getValidLocations(Block block) { ArrayList machineSet = new ArrayList(blocksMap.numNodes(block)); for(Iterator it = @@ -562,6 +565,49 @@ public class BlockManager { minReplication); } + /** Get all blocks with location information from a datanode. 
*/ + public BlocksWithLocations getBlocksWithLocations(final DatanodeID datanode, + final long size) throws UnregisteredNodeException { + final DatanodeDescriptor node = getDatanodeManager().getDatanode(datanode); + if (node == null) { + NameNode.stateChangeLog.warn("BLOCK* NameSystem.getBlocks: " + + "Asking for blocks from an unrecorded node " + datanode.getName()); + throw new HadoopIllegalArgumentException( + "Datanode " + datanode.getName() + " not found."); + } + + int numBlocks = node.numBlocks(); + if(numBlocks == 0) { + return new BlocksWithLocations(new BlockWithLocations[0]); + } + Iterator iter = node.getBlockIterator(); + int startBlock = DFSUtil.getRandom().nextInt(numBlocks); // starting from a random block + // skip blocks + for(int i=0; i results = new ArrayList(); + long totalSize = 0; + BlockInfo curBlock; + while(totalSize it = node.getBlockIterator(); @@ -660,7 +706,7 @@ public class BlockManager { for(Map.Entry> entry : recentInvalidateSets.entrySet()) { Collection blocks = entry.getValue(); if (blocks.size() > 0) { - out.println(namesystem.getDatanode(entry.getKey()).getName() + blocks); + out.println(datanodeManager.getDatanode(entry.getKey()).getName() + blocks); } } } @@ -684,7 +730,7 @@ public class BlockManager { private void markBlockAsCorrupt(BlockInfo storedBlock, DatanodeInfo dn) throws IOException { assert storedBlock != null : "storedBlock should not be null"; - DatanodeDescriptor node = namesystem.getDatanode(dn); + DatanodeDescriptor node = getDatanodeManager().getDatanode(dn); if (node == null) { throw new IOException("Cannot mark block " + storedBlock.getBlockName() + @@ -723,7 +769,7 @@ public class BlockManager { throws IOException { NameNode.stateChangeLog.info("DIR* NameSystem.invalidateBlock: " + blk + " on " + dn.getName()); - DatanodeDescriptor node = namesystem.getDatanode(dn); + DatanodeDescriptor node = getDatanodeManager().getDatanode(dn); if (node == null) { throw new IOException("Cannot invalidate block " + blk + " because datanode " + dn.getName() + @@ -748,7 +794,7 @@ public class BlockManager { } } - public void updateState() { + void updateState() { pendingReplicationBlocksCount = pendingReplications.size(); underReplicatedBlocksCount = neededReplications.size(); corruptReplicaBlocksCount = corruptReplicas.size(); @@ -1134,7 +1180,7 @@ public class BlockManager { * If there were any replication requests that timed out, reap them * and put them back into the neededReplication queue */ - public void processPendingReplications() { + private void processPendingReplications() { Block[] timedOutItems = pendingReplications.getTimedOutBlocks(); if (timedOutItems != null) { namesystem.writeLock(); @@ -1700,6 +1746,7 @@ public class BlockManager { addedNode, delNodeHint, blockplacement); } + public void addToExcessReplicate(DatanodeInfo dn, Block block) { assert namesystem.hasWriteLock(); Collection excessBlocks = excessReplicateMap.get(dn.getStorageID()); @@ -1773,6 +1820,21 @@ public class BlockManager { } } + /** + * Get all valid locations of the block & add the block to results + * return the length of the added block; 0 if the block is not added + */ + private long addBlock(Block block, List results) { + final List machineSet = getValidLocations(block); + if(machineSet.size() == 0) { + return 0; + } else { + results.add(new BlockWithLocations(block, + machineSet.toArray(new String[machineSet.size()]))); + return block.getNumBytes(); + } + } + /** * The given node is reporting that it received a certain block. 
*/ @@ -1784,7 +1846,7 @@ public class BlockManager { // get the deletion hint node DatanodeDescriptor delHintNode = null; if (delHint != null && delHint.length() != 0) { - delHintNode = namesystem.getDatanode(delHint); + delHintNode = datanodeManager.getDatanode(delHint); if (delHintNode == null) { NameNode.stateChangeLog.warn("BLOCK* NameSystem.blockReceived: " + block + " is expected to be removed from an unrecorded node " @@ -2071,7 +2133,7 @@ public class BlockManager { return 0; // get blocks to invalidate for the nodeId assert nodeId != null; - DatanodeDescriptor dn = namesystem.getDatanode(nodeId); + final DatanodeDescriptor dn = datanodeManager.getDatanode(nodeId); if (dn == null) { removeFromInvalidates(nodeId); return 0; @@ -2082,11 +2144,11 @@ public class BlockManager { return 0; ArrayList blocksToInvalidate = new ArrayList( - namesystem.blockInvalidateLimit); + getDatanodeManager().blockInvalidateLimit); // # blocks that can be sent in one message is limited Iterator it = invalidateSet.iterator(); - for (int blkCount = 0; blkCount < namesystem.blockInvalidateLimit + for (int blkCount = 0; blkCount < getDatanodeManager().blockInvalidateLimit && it.hasNext(); blkCount++) { blocksToInvalidate.add(it.next()); it.remove(); diff --git a/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index 5540f1fc6ab..2258769668f 100644 --- a/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement; import java.io.IOException; +import java.io.PrintWriter; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.ArrayList; @@ -25,7 +26,9 @@ import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.NavigableMap; import java.util.Set; +import java.util.TreeMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -35,12 +38,22 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException; +import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair; +import org.apache.hadoop.hdfs.server.common.Util; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.server.protocol.BlockCommand; +import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand; +import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; +import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; +import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException; import org.apache.hadoop.ipc.Server; @@ -48,6 +61,7 @@ import 
org.apache.hadoop.net.CachedDNSToSwitchMapping; import org.apache.hadoop.net.DNSToSwitchMapping; import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.net.ScriptBasedMapping; +import org.apache.hadoop.util.CyclicIteration; import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.HostsFileReader; import org.apache.hadoop.util.ReflectionUtils; @@ -62,6 +76,30 @@ public class DatanodeManager { final FSNamesystem namesystem; + /** + * Stores the datanode -> block map. + *

+ * Done by storing a set of {@link DatanodeDescriptor} objects, sorted by
+ * storage id. In order to keep the storage map consistent it tracks
+ * all storages ever registered with the namenode.
+ * A descriptor corresponding to a specific storage id can be
+ * <ul>
+ * <li>added to the map if it is a new storage id;</li>
+ * <li>updated with a new datanode started as a replacement for the old one
+ * with the same storage id; and</li>
+ * <li>removed if and only if an existing datanode is restarted to serve a
+ * different storage id.</li>
+ * </ul> <br>
+ * The list of the {@link DatanodeDescriptor}s in the map is checkpointed
+ * in the namespace image file. Only the {@link DatanodeInfo} part is
+ * persistent, the list of blocks is restored from the datanode block
+ * reports.
+ * <p>
+ * Mapping: StorageID -> DatanodeDescriptor + */ + private final NavigableMap datanodeMap + = new TreeMap(); + /** Cluster network topology */ private final NetworkTopology networktopology = new NetworkTopology(); @@ -71,7 +109,12 @@ public class DatanodeManager { private final DNSToSwitchMapping dnsToSwitchMapping; /** Read include/exclude files*/ - private final HostsFileReader hostsReader; + private final HostsFileReader hostsReader; + + /** The period to wait for datanode heartbeat.*/ + private final long heartbeatExpireInterval; + /** Ask Datanode only up to this many blocks to delete. */ + final int blockInvalidateLimit; DatanodeManager(final FSNamesystem namesystem, final Configuration conf ) throws IOException { @@ -90,6 +133,19 @@ public class DatanodeManager { if (dnsToSwitchMapping instanceof CachedDNSToSwitchMapping) { dnsToSwitchMapping.resolve(new ArrayList(hostsReader.getHosts())); } + + final long heartbeatIntervalSeconds = conf.getLong( + DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, + DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT); + final int heartbeatRecheckInterval = conf.getInt( + DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, + DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 minutes + this.heartbeatExpireInterval = 2 * heartbeatRecheckInterval + + 10 * 1000 * heartbeatIntervalSeconds; + this.blockInvalidateLimit = Math.max(20*(int)(heartbeatIntervalSeconds), + DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_DEFAULT); + LOG.info(DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY + + "=" + this.blockInvalidateLimit); } private Daemon decommissionthread = null; @@ -124,20 +180,88 @@ public class DatanodeManager { Arrays.sort(b.getLocations(), DFSUtil.DECOM_COMPARATOR); } } - + + CyclicIteration getDatanodeCyclicIteration( + final String firstkey) { + return new CyclicIteration( + datanodeMap, firstkey); + } + /** @return the datanode descriptor for the host. */ public DatanodeDescriptor getDatanodeByHost(final String host) { return host2DatanodeMap.getDatanodeByHost(host); } + /** Get a datanode descriptor given corresponding storageID */ + DatanodeDescriptor getDatanode(final String storageID) { + return datanodeMap.get(storageID); + } + + /** + * Get data node by storage ID. + * + * @param nodeID + * @return DatanodeDescriptor or null if the node is not found. + * @throws UnregisteredNodeException + */ + public DatanodeDescriptor getDatanode(DatanodeID nodeID + ) throws UnregisteredNodeException { + final DatanodeDescriptor node = getDatanode(nodeID.getStorageID()); + if (node == null) + return null; + if (!node.getName().equals(nodeID.getName())) { + final UnregisteredNodeException e = new UnregisteredNodeException( + nodeID, node); + NameNode.stateChangeLog.fatal("BLOCK* NameSystem.getDatanode: " + + e.getLocalizedMessage()); + throw e; + } + return node; + } + + /** Prints information about all datanodes. */ + void datanodeDump(final PrintWriter out) { + synchronized (datanodeMap) { + out.println("Metasave: Number of datanodes: " + datanodeMap.size()); + for(Iterator it = datanodeMap.values().iterator(); it.hasNext();) { + DatanodeDescriptor node = it.next(); + out.println(node.dumpDatanode()); + } + } + } + + /** Remove a dead datanode. 
*/ + public void removeDeadDatanode(final DatanodeID nodeID) { + synchronized(namesystem.heartbeats) { + synchronized(datanodeMap) { + DatanodeDescriptor d; + try { + d = getDatanode(nodeID); + } catch(IOException e) { + d = null; + } + if (d != null && isDatanodeDead(d)) { + NameNode.stateChangeLog.info( + "BLOCK* removeDeadDatanode: lost heartbeat from " + d.getName()); + namesystem.removeDatanode(d); + } + } + } + } + + /** Is the datanode dead? */ + public boolean isDatanodeDead(DatanodeDescriptor node) { + return (node.getLastUpdate() < + (Util.now() - heartbeatExpireInterval)); + } + /** Add a datanode. */ private void addDatanode(final DatanodeDescriptor node) { // To keep host2DatanodeMap consistent with datanodeMap, // remove from host2DatanodeMap the datanodeDescriptor removed // from datanodeMap before adding node to host2DatanodeMap. - synchronized (namesystem.datanodeMap) { - host2DatanodeMap.remove( - namesystem.datanodeMap.put(node.getStorageID(), node)); + synchronized(datanodeMap) { + host2DatanodeMap.remove(datanodeMap.put(node.getStorageID(), node)); } host2DatanodeMap.add(node); @@ -152,8 +276,8 @@ public class DatanodeManager { /** Physically remove node from datanodeMap. */ private void wipeDatanode(final DatanodeID node) throws IOException { final String key = node.getStorageID(); - synchronized (namesystem.datanodeMap) { - host2DatanodeMap.remove(namesystem.datanodeMap.remove(key)); + synchronized (datanodeMap) { + host2DatanodeMap.remove(datanodeMap.remove(key)); } if (LOG.isDebugEnabled()) { LOG.debug(getClass().getSimpleName() + ".wipeDatanode(" @@ -315,7 +439,7 @@ public class DatanodeManager { String newID = null; while(newID == null) { newID = "DS" + Integer.toString(DFSUtil.getRandom().nextInt()); - if (namesystem.datanodeMap.get(newID) != null) + if (datanodeMap.get(newID) != null) newID = null; } return newID; @@ -350,7 +474,7 @@ public class DatanodeManager { + "node registration from " + nodeReg.getName() + " storage " + nodeReg.getStorageID()); - DatanodeDescriptor nodeS = namesystem.datanodeMap.get(nodeReg.getStorageID()); + DatanodeDescriptor nodeS = datanodeMap.get(nodeReg.getStorageID()); DatanodeDescriptor nodeN = getDatanodeByHost(nodeReg.getName()); if (nodeN != null && nodeN != nodeS) { @@ -461,7 +585,7 @@ public class DatanodeManager { * 4. Removed from exclude --> stop decommission. */ public void refreshDatanodes() throws IOException { - for(DatanodeDescriptor node : namesystem.datanodeMap.values()) { + for(DatanodeDescriptor node : datanodeMap.values()) { // Check if not include. if (!inHostsList(node, null)) { node.setDisallowed(true); // case 2. @@ -475,6 +599,45 @@ public class DatanodeManager { } } + /** @return the number of live datanodes. */ + public int getNumLiveDataNodes() { + int numLive = 0; + synchronized (datanodeMap) { + for(DatanodeDescriptor dn : datanodeMap.values()) { + if (!isDatanodeDead(dn) ) { + numLive++; + } + } + } + return numLive; + } + + /** @return the number of dead datanodes. */ + public int getNumDeadDataNodes() { + int numDead = 0; + synchronized (datanodeMap) { + for(DatanodeDescriptor dn : datanodeMap.values()) { + if (isDatanodeDead(dn) ) { + numDead++; + } + } + } + return numDead; + } + + /** Fetch live and dead datanodes. 
*/ + public void fetchDatanodess(final List live, + final List dead) { + final List results = + getDatanodeListForReport(DatanodeReportType.ALL); + for(DatanodeDescriptor node : results) { + if (isDatanodeDead(node)) + dead.add(node); + else + live.add(node); + } + } + /** For generating datanode reports */ public List getDatanodeListForReport( final DatanodeReportType type) { @@ -499,13 +662,13 @@ public class DatanodeManager { ArrayList nodes = null; - synchronized (namesystem.datanodeMap) { - nodes = new ArrayList(namesystem.datanodeMap.size() + + synchronized(datanodeMap) { + nodes = new ArrayList(datanodeMap.size() + mustList.size()); - Iterator it = namesystem.datanodeMap.values().iterator(); + Iterator it = datanodeMap.values().iterator(); while (it.hasNext()) { DatanodeDescriptor dn = it.next(); - boolean isDead = namesystem.isDatanodeDead(dn); + final boolean isDead = isDatanodeDead(dn); if ( (isDead && listDeadNodes) || (!isDead && listLiveNodes) ) { nodes.add(dn); } @@ -537,4 +700,77 @@ public class DatanodeManager { } return nodes; } + + private void setDatanodeDead(DatanodeDescriptor node) throws IOException { + node.setLastUpdate(0); + } + + /** Handle heartbeat from datanodes. */ + public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg, + final String blockPoolId, + long capacity, long dfsUsed, long remaining, long blockPoolUsed, + int xceiverCount, int maxTransfers, int failedVolumes + ) throws IOException { + synchronized (namesystem.heartbeats) { + synchronized (datanodeMap) { + DatanodeDescriptor nodeinfo = null; + try { + nodeinfo = getDatanode(nodeReg); + } catch(UnregisteredNodeException e) { + return new DatanodeCommand[]{DatanodeCommand.REGISTER}; + } + + // Check if this datanode should actually be shutdown instead. 
+ if (nodeinfo != null && nodeinfo.isDisallowed()) { + setDatanodeDead(nodeinfo); + throw new DisallowedDatanodeException(nodeinfo); + } + + if (nodeinfo == null || !nodeinfo.isAlive) { + return new DatanodeCommand[]{DatanodeCommand.REGISTER}; + } + + namesystem.updateStats(nodeinfo, false); + nodeinfo.updateHeartbeat(capacity, dfsUsed, remaining, blockPoolUsed, + xceiverCount, failedVolumes); + namesystem.updateStats(nodeinfo, true); + + //check lease recovery + BlockInfoUnderConstruction[] blocks = nodeinfo + .getLeaseRecoveryCommand(Integer.MAX_VALUE); + if (blocks != null) { + BlockRecoveryCommand brCommand = new BlockRecoveryCommand( + blocks.length); + for (BlockInfoUnderConstruction b : blocks) { + brCommand.add(new RecoveringBlock( + new ExtendedBlock(blockPoolId, b), b.getExpectedLocations(), b + .getBlockRecoveryId())); + } + return new DatanodeCommand[] { brCommand }; + } + + final List cmds = new ArrayList(3); + //check pending replication + List pendingList = nodeinfo.getReplicationCommand( + maxTransfers); + if (pendingList != null) { + cmds.add(new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId, + pendingList)); + } + //check block invalidation + Block[] blks = nodeinfo.getInvalidateBlocks(blockInvalidateLimit); + if (blks != null) { + cmds.add(new BlockCommand(DatanodeProtocol.DNA_INVALIDATE, + blockPoolId, blks)); + } + + namesystem.addKeyUpdateCommand(cmds, nodeinfo); + if (!cmds.isEmpty()) { + return cmds.toArray(new DatanodeCommand[cmds.size()]); + } + } + } + + return null; + } } diff --git a/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java b/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java index 4e88e4daddd..8275decec36 100644 --- a/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java +++ b/hdfs/src/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java @@ -83,8 +83,8 @@ class DecommissionManager { private void check() { int count = 0; for(Map.Entry entry - : new CyclicIteration( - fsnamesystem.datanodeMap, firstkey)) { + : blockManager.getDatanodeManager().getDatanodeCyclicIteration( + firstkey)) { final DatanodeDescriptor d = entry.getValue(); firstkey = entry.getKey(); diff --git a/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 71a9bccd2b6..da9c117bdf9 100644 --- a/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hdfs/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -237,35 +237,9 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, // Block pool ID used by this namenode String blockPoolId; - - /** - * Stores the datanode -> block map. - *

- * Done by storing a set of {@link DatanodeDescriptor} objects, sorted by
- * storage id. In order to keep the storage map consistent it tracks
- * all storages ever registered with the namenode.
- * A descriptor corresponding to a specific storage id can be
- * <ul>
- * <li>added to the map if it is a new storage id;</li>
- * <li>updated with a new datanode started as a replacement for the old one
- * with the same storage id; and</li>
- * <li>removed if and only if an existing datanode is restarted to serve a
- * different storage id.</li>
- * </ul> <br>
- * The list of the {@link DatanodeDescriptor}s in the map is checkpointed
- * in the namespace image file. Only the {@link DatanodeInfo} part is
- * persistent, the list of blocks is restored from the datanode block
- * reports.
- * <p>
- * Mapping: StorageID -> DatanodeDescriptor - */ - public final NavigableMap datanodeMap = - new TreeMap(); /** - * Stores a set of DatanodeDescriptor objects. - * This is a subset of {@link #datanodeMap}, containing nodes that are - * considered alive. + * Stores a subset of datanodeMap, containing nodes that are considered alive. * The HeartbeatMonitor periodically checks for out-dated entries, * and removes them from the list. */ @@ -289,9 +263,6 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, // heartbeatRecheckInterval is how often namenode checks for expired datanodes private long heartbeatRecheckInterval; - // heartbeatExpireInterval is how long namenode waits for datanode to report - // heartbeat - private long heartbeatExpireInterval; //resourceRecheckInterval is how often namenode checks for the disk space availability private long resourceRecheckInterval; @@ -314,9 +285,6 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, */ private final GenerationStamp generationStamp = new GenerationStamp(); - // Ask Datanode only up to this many blocks to delete. - public int blockInvalidateLimit = DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_DEFAULT; - // precision of access times. private long accessTimePrecision = 0; @@ -513,14 +481,9 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, this.defaultPermission = PermissionStatus.createImmutable( fsOwner.getShortUserName(), supergroup, new FsPermission(filePermission)); - long heartbeatInterval = conf.getLong( - DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, - DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT) * 1000; this.heartbeatRecheckInterval = conf.getInt( DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 minutes - this.heartbeatExpireInterval = 2 * heartbeatRecheckInterval + - 10 * heartbeatInterval; this.serverDefaults = new FsServerDefaults( conf.getLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE), @@ -531,14 +494,6 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, this.maxFsObjects = conf.getLong(DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_KEY, DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_DEFAULT); - //default limit - this.blockInvalidateLimit = Math.max(this.blockInvalidateLimit, - 20*(int)(heartbeatInterval/1000)); - //use conf value if it is set. - this.blockInvalidateLimit = conf.getInt( - DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY, this.blockInvalidateLimit); - LOG.info(DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY + "=" + this.blockInvalidateLimit); - this.accessTimePrecision = conf.getLong(DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY, 0); this.supportAppends = conf.getBoolean(DFSConfigKeys.DFS_SUPPORT_APPEND_KEY, DFSConfigKeys.DFS_SUPPORT_APPEND_DEFAULT); @@ -642,12 +597,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, out.println("Live Datanodes: "+live.size()); out.println("Dead Datanodes: "+dead.size()); blockManager.metaSave(out); - - // - // Dump all datanodes - // - datanodeDump(out); - + out.flush(); out.close(); } finally { @@ -688,45 +638,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, readLock(); try { checkSuperuserPrivilege(); - - DatanodeDescriptor node = getDatanode(datanode); - if (node == null) { - NameNode.stateChangeLog.warn("BLOCK* NameSystem.getBlocks: " - + "Asking for blocks from an unrecorded node " + datanode.getName()); - throw new IllegalArgumentException( - "Unexpected exception. 
Got getBlocks message for datanode " + - datanode.getName() + ", but there is no info for it"); - } - - int numBlocks = node.numBlocks(); - if(numBlocks == 0) { - return new BlocksWithLocations(new BlockWithLocations[0]); - } - Iterator iter = node.getBlockIterator(); - int startBlock = DFSUtil.getRandom().nextInt(numBlocks); // starting from a random block - // skip blocks - for(int i=0; i results = new ArrayList(); - long totalSize = 0; - BlockInfo curBlock; - while(totalSize results) { - assert hasReadOrWriteLock(); - ArrayList machineSet = blockManager.getValidLocations(block); - if(machineSet.size() == 0) { - return 0; - } else { - results.add(new BlockWithLocations(block, - machineSet.toArray(new String[machineSet.size()]))); - return block.getNumBytes(); - } - } - ///////////////////////////////////////////////////////// // // These methods are called by HadoopFS clients @@ -1795,7 +1691,8 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, //find datanode descriptors chosen = new ArrayList(); for(DatanodeInfo d : existings) { - final DatanodeDescriptor descriptor = getDatanode(d); + final DatanodeDescriptor descriptor = blockManager.getDatanodeManager( + ).getDatanode(d); if (descriptor != null) { chosen.add(descriptor); } @@ -2622,7 +2519,8 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, if (newtargets.length > 0) { descriptors = new DatanodeDescriptor[newtargets.length]; for(int i = 0; i < newtargets.length; i++) { - descriptors[i] = getDatanode(newtargets[i]); + descriptors[i] = blockManager.getDatanodeManager().getDatanode( + newtargets[i]); } } if (closeFile) { @@ -2766,15 +2664,6 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, return Storage.getRegistrationID(dir.fsImage.getStorage()); } - public boolean isDatanodeDead(DatanodeDescriptor node) { - return (node.getLastUpdate() < - (now() - heartbeatExpireInterval)); - } - - private void setDatanodeDead(DatanodeDescriptor node) throws IOException { - node.setLastUpdate(0); - } - /** * The given node has reported in. 
This method should: * 1) Record the heartbeat, so the datanode isn't timed out @@ -2792,91 +2681,32 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, throws IOException { readLock(); try { - return handleHeartbeatInternal(nodeReg, capacity, dfsUsed, - remaining, blockPoolUsed, xceiverCount, xmitsInProgress, - failedVolumes); + final int maxTransfer = blockManager.maxReplicationStreams - xmitsInProgress; + DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat( + nodeReg, blockPoolId, capacity, dfsUsed, remaining, blockPoolUsed, + xceiverCount, maxTransfer, failedVolumes); + if (cmds != null) { + return cmds; + } + + //check distributed upgrade + DatanodeCommand cmd = getDistributedUpgradeCommand(); + if (cmd != null) { + return new DatanodeCommand[] {cmd}; + } + return null; } finally { readUnlock(); } } - /** @see #handleHeartbeat(DatanodeRegistration, long, long, long, long, int, int, int) */ - DatanodeCommand[] handleHeartbeatInternal(DatanodeRegistration nodeReg, - long capacity, long dfsUsed, long remaining, long blockPoolUsed, - int xceiverCount, int xmitsInProgress, int failedVolumes) - throws IOException { - assert hasReadLock(); - DatanodeCommand cmd = null; - synchronized (heartbeats) { - synchronized (datanodeMap) { - DatanodeDescriptor nodeinfo = null; - try { - nodeinfo = getDatanode(nodeReg); - } catch(UnregisteredNodeException e) { - return new DatanodeCommand[]{DatanodeCommand.REGISTER}; - } - - // Check if this datanode should actually be shutdown instead. - if (nodeinfo != null && nodeinfo.isDisallowed()) { - setDatanodeDead(nodeinfo); - throw new DisallowedDatanodeException(nodeinfo); - } - - if (nodeinfo == null || !nodeinfo.isAlive) { - return new DatanodeCommand[]{DatanodeCommand.REGISTER}; - } - - updateStats(nodeinfo, false); - nodeinfo.updateHeartbeat(capacity, dfsUsed, remaining, blockPoolUsed, - xceiverCount, failedVolumes); - updateStats(nodeinfo, true); - - //check lease recovery - BlockInfoUnderConstruction[] blocks = nodeinfo - .getLeaseRecoveryCommand(Integer.MAX_VALUE); - if (blocks != null) { - BlockRecoveryCommand brCommand = new BlockRecoveryCommand( - blocks.length); - for (BlockInfoUnderConstruction b : blocks) { - brCommand.add(new RecoveringBlock( - new ExtendedBlock(blockPoolId, b), b.getExpectedLocations(), b - .getBlockRecoveryId())); - } - return new DatanodeCommand[] { brCommand }; - } - - ArrayList cmds = new ArrayList(3); - //check pending replication - List pendingList = nodeinfo.getReplicationCommand( - blockManager.maxReplicationStreams - xmitsInProgress); - if (pendingList != null) { - cmd = new BlockCommand(DatanodeProtocol.DNA_TRANSFER, blockPoolId, - pendingList); - cmds.add(cmd); - } - //check block invalidation - Block[] blks = nodeinfo.getInvalidateBlocks(blockInvalidateLimit); - if (blks != null) { - cmd = new BlockCommand(DatanodeProtocol.DNA_INVALIDATE, blockPoolId, blks); - cmds.add(cmd); - } - // check access key update - if (isBlockTokenEnabled && nodeinfo.needKeyUpdate) { - cmds.add(new KeyUpdateCommand(blockTokenSecretManager.exportKeys())); - nodeinfo.needKeyUpdate = false; - } - if (!cmds.isEmpty()) { - return cmds.toArray(new DatanodeCommand[cmds.size()]); - } - } + public void addKeyUpdateCommand(final List cmds, + final DatanodeDescriptor nodeinfo) { + // check access key update + if (isBlockTokenEnabled && nodeinfo.needKeyUpdate) { + cmds.add(new KeyUpdateCommand(blockTokenSecretManager.exportKeys())); + nodeinfo.needKeyUpdate = false; } - - //check distributed upgrade - cmd = 
getDistributedUpgradeCommand(); - if (cmd != null) { - return new DatanodeCommand[] {cmd}; - } - return null; } public void updateStats(DatanodeDescriptor node, boolean isAdded) { @@ -3017,7 +2847,8 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, ) throws UnregisteredNodeException { writeLock(); try { - DatanodeDescriptor nodeInfo = getDatanode(nodeID); + DatanodeDescriptor nodeInfo = getBlockManager().getDatanodeManager( + ).getDatanode(nodeID); if (nodeInfo != null) { removeDatanode(nodeInfo); } else { @@ -3033,7 +2864,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, * Remove a datanode descriptor. * @param nodeInfo datanode descriptor. */ - private void removeDatanode(DatanodeDescriptor nodeInfo) { + public void removeDatanode(DatanodeDescriptor nodeInfo) { assert hasWriteLock(); synchronized (heartbeats) { if (nodeInfo.isAlive) { @@ -3064,6 +2895,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, * effect causes more datanodes to be declared dead. */ void heartbeatCheck() { + final DatanodeManager datanodeManager = getBlockManager().getDatanodeManager(); // It's OK to check safe mode w/o taking the lock here, we re-check // for safe mode after taking the lock before removing a datanode. if (isInSafeMode()) { @@ -3079,7 +2911,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, for (Iterator it = heartbeats.iterator(); it.hasNext();) { DatanodeDescriptor nodeInfo = it.next(); - if (isDatanodeDead(nodeInfo)) { + if (datanodeManager.isDatanodeDead(nodeInfo)) { expiredHeartbeats.incr(); foundDead = true; nodeID = nodeInfo; @@ -3095,21 +2927,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, return; } try { - synchronized(heartbeats) { - synchronized (datanodeMap) { - DatanodeDescriptor nodeInfo = null; - try { - nodeInfo = getDatanode(nodeID); - } catch (IOException e) { - nodeInfo = null; - } - if (nodeInfo != null && isDatanodeDead(nodeInfo)) { - NameNode.stateChangeLog.info("BLOCK* NameSystem.heartbeatCheck: " - + "lost heartbeat from " + nodeInfo.getName()); - removeDatanode(nodeInfo); - } - } - } + datanodeManager.removeDeadDatanode(nodeID); } finally { writeUnlock(); } @@ -3129,7 +2947,8 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, writeLock(); startTime = now(); //after acquiring write lock try { - DatanodeDescriptor node = getDatanode(nodeID); + final DatanodeDescriptor node = blockManager.getDatanodeManager( + ).getDatanode(nodeID); if (node == null || !node.isAlive) { throw new IOException("ProcessReport from dead or unregistered node: " + nodeID.getName()); @@ -3269,7 +3088,8 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, ) throws IOException { writeLock(); try { - DatanodeDescriptor node = getDatanode(nodeID); + final DatanodeDescriptor node = blockManager.getDatanodeManager( + ).getDatanode(nodeID); if (node == null || !node.isAlive) { NameNode.stateChangeLog.warn("BLOCK* NameSystem.blockReceived: " + block + " is received from dead or unregistered node " + nodeID.getName()); @@ -3475,33 +3295,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, ArrayList dead) { readLock(); try { - final List results = getBlockManager( - ).getDatanodeManager().getDatanodeListForReport(DatanodeReportType.ALL); - for(Iterator it = results.iterator(); it.hasNext();) { - DatanodeDescriptor node = it.next(); - if (isDatanodeDead(node)) - dead.add(node); - else - live.add(node); - } - } finally { - readUnlock(); 
- } - } - - /** - * Prints information about all datanodes. - */ - private void datanodeDump(PrintWriter out) { - readLock(); - try { - synchronized (datanodeMap) { - out.println("Metasave: Number of datanodes: " + datanodeMap.size()); - for(Iterator it = datanodeMap.values().iterator(); it.hasNext();) { - DatanodeDescriptor node = it.next(); - out.println(node.dumpDatanode()); - } - } + getBlockManager().getDatanodeManager().fetchDatanodess(live, dead); } finally { readUnlock(); } @@ -3556,30 +3350,6 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, checkSuperuserPrivilege(); getFSImage().finalizeUpgrade(); } - - - /** - * Get data node by storage ID. - * - * @param nodeID - * @return DatanodeDescriptor or null if the node is not found. - * @throws IOException - */ - public DatanodeDescriptor getDatanode(DatanodeID nodeID - ) throws UnregisteredNodeException { - assert hasReadOrWriteLock(); - UnregisteredNodeException e = null; - DatanodeDescriptor node = datanodeMap.get(nodeID.getStorageID()); - if (node == null) - return null; - if (!node.getName().equals(nodeID.getName())) { - e = new UnregisteredNodeException(nodeID, node); - NameNode.stateChangeLog.fatal("BLOCK* NameSystem.getDatanode: " - + e.getLocalizedMessage()); - throw e; - } - return node; - } /** * SafeModeInfo contains information related to the safe mode. @@ -4503,43 +4273,14 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, } - /** - * Number of live data nodes - * @return Number of live data nodes - */ @Override // FSNamesystemMBean public int getNumLiveDataNodes() { - int numLive = 0; - synchronized (datanodeMap) { - for(Iterator it = datanodeMap.values().iterator(); - it.hasNext();) { - DatanodeDescriptor dn = it.next(); - if (!isDatanodeDead(dn) ) { - numLive++; - } - } - } - return numLive; + return getBlockManager().getDatanodeManager().getNumLiveDataNodes(); } - - /** - * Number of dead data nodes - * @return Number of dead data nodes - */ @Override // FSNamesystemMBean public int getNumDeadDataNodes() { - int numDead = 0; - synchronized (datanodeMap) { - for(Iterator it = datanodeMap.values().iterator(); - it.hasNext();) { - DatanodeDescriptor dn = it.next(); - if (isDatanodeDead(dn) ) { - numDead++; - } - } - } - return numDead; + return getBlockManager().getDatanodeManager().getNumDeadDataNodes(); } /** @@ -4699,11 +4440,12 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, blockinfo.setNumBytes(newBlock.getNumBytes()); // find the DatanodeDescriptor objects + final DatanodeManager dm = getBlockManager().getDatanodeManager(); DatanodeDescriptor[] descriptors = null; if (newNodes.length > 0) { descriptors = new DatanodeDescriptor[newNodes.length]; for(int i = 0; i < newNodes.length; i++) { - descriptors[i] = getDatanode(newNodes[i]); + descriptors[i] = dm.getDatanode(newNodes[i]); } } blockinfo.setExpectedLocations(descriptors); @@ -4832,12 +4574,6 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean, return blockManager.numCorruptReplicas(blk); } - /** Get a datanode descriptor given corresponding storageID */ - public DatanodeDescriptor getDatanode(String nodeID) { - assert hasReadOrWriteLock(); - return datanodeMap.get(nodeID); - } - /** * Return a range of corrupt replica block ids. 
Up to numExpectedBlocks * blocks starting at the next block after startingBlockId are returned diff --git a/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDecommission.java b/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDecommission.java index 491a0b550d6..2a88f78688a 100644 --- a/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDecommission.java +++ b/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/TestDecommission.java @@ -226,7 +226,7 @@ public class TestDecommission { writeConfigFile(excludeFile, nodes); cluster.getNamesystem(nnIndex).refreshNodes(conf); DatanodeInfo ret = NameNodeAdapter.getDatanode( - cluster.getNameNode(nnIndex), info[index]); + cluster.getNamesystem(nnIndex), info[index]); waitNodeState(ret, waitForState); return ret; } @@ -466,7 +466,7 @@ public class TestDecommission { // Stop decommissioning and verify stats writeConfigFile(excludeFile, null); fsn.refreshNodes(conf); - DatanodeInfo ret = NameNodeAdapter.getDatanode(namenode, downnode); + DatanodeInfo ret = NameNodeAdapter.getDatanode(fsn, downnode); waitNodeState(ret, AdminStates.NORMAL); verifyStats(namenode, fsn, ret, false); } diff --git a/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java b/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java index 592705afc19..292f7d817cc 100644 --- a/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java +++ b/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java @@ -25,9 +25,30 @@ import java.util.Set; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; +import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.util.Daemon; public class BlockManagerTestUtil { + + /** @return the datanode descriptor for the given the given storageID. */ + public static DatanodeDescriptor getDatanode(final FSNamesystem ns, + final String storageID) { + ns.readLock(); + try { + return ns.getBlockManager().getDatanodeManager().getDatanode(storageID); + } finally { + ns.readUnlock(); + } + } + + + /** + * Refresh block queue counts on the name-node. + */ + public static void updateState(final BlockManager blockManager) { + blockManager.updateState(); + } + /** * @return a tuple of the replica state (number racks, number live * replicas, and number needed replicas) for the given block. diff --git a/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestComputeInvalidateWork.java b/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestComputeInvalidateWork.java similarity index 67% rename from hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestComputeInvalidateWork.java rename to hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestComputeInvalidateWork.java index 0299cc55954..3e32a95cb0d 100644 --- a/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestComputeInvalidateWork.java +++ b/hdfs/src/test/hdfs/org/apache/hadoop/hdfs/server/blockmanagement/TestComputeInvalidateWork.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.hadoop.hdfs.server.namenode; +package org.apache.hadoop.hdfs.server.blockmanagement; import junit.framework.TestCase; @@ -23,8 +23,10 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; import org.apache.hadoop.hdfs.server.common.GenerationStamp; +import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; /** * Test if FSNamesystem handles heartbeat right @@ -41,6 +43,8 @@ public class TestComputeInvalidateWork extends TestCase { try { cluster.waitActive(); final FSNamesystem namesystem = cluster.getNamesystem(); + final BlockManager bm = namesystem.getBlockManager(); + final int blockInvalidateLimit = bm.getDatanodeManager().blockInvalidateLimit; DatanodeDescriptor[] nodes = namesystem.heartbeats.toArray(new DatanodeDescriptor[NUM_OF_DATANODES]); assertEquals(nodes.length, NUM_OF_DATANODES); @@ -48,26 +52,25 @@ public class TestComputeInvalidateWork extends TestCase { namesystem.writeLock(); try { for (int i=0; i