HDFS-2228. Move block and datanode code from FSNamesystem to BlockManager and DatanodeManager. (szetszwo)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1154899 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze 2011-08-08 10:06:45 +00:00
parent 8dc420ba36
commit 371f4a5905
25 changed files with 636 additions and 770 deletions

View File

@ -641,6 +641,9 @@ Trunk (unreleased changes)
HDFS-2226. Clean up counting of operations in FSEditLogLoader (todd)
HDFS-2228. Move block and datanode code from FSNamesystem to
BlockManager and DatanodeManager. (szetszwo)
OPTIMIZATIONS
HDFS-1458. Improve checkpoint performance by avoiding unnecessary image

View File

@ -259,26 +259,6 @@ public class BlockInfo extends Block implements LightWeightGSet.LinkedElement {
return head;
}
boolean listIsConsistent(DatanodeDescriptor dn) {
// going forward
int count = 0;
BlockInfo next, nextPrev;
BlockInfo cur = this;
while(cur != null) {
next = cur.getNext(cur.findDatanode(dn));
if(next != null) {
nextPrev = next.getPrevious(next.findDatanode(dn));
if(cur != nextPrev) {
System.out.println("Inconsistent list: cur->next->prev != cur");
return false;
}
}
cur = next;
count++;
}
return true;
}
/**
* BlockInfo represents a block that is not being constructed.
* In order to start modifying the block, the BlockInfo should be converted

View File

@ -46,13 +46,15 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.blockmanagement.UnderReplicatedBlocks.BlockIterator;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
@ -60,8 +62,9 @@ import org.apache.hadoop.hdfs.server.namenode.INodeFileUnderConstruction;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Daemon;
/**
@ -81,18 +84,13 @@ public class BlockManager {
private volatile long pendingReplicationBlocksCount = 0L;
private volatile long corruptReplicaBlocksCount = 0L;
private volatile long underReplicatedBlocksCount = 0L;
public volatile long scheduledReplicationBlocksCount = 0L;
private volatile long scheduledReplicationBlocksCount = 0L;
private volatile long excessBlocksCount = 0L;
private volatile long pendingDeletionBlocksCount = 0L;
private boolean isBlockTokenEnabled;
private long blockKeyUpdateInterval;
private long blockTokenLifetime;
private BlockTokenSecretManager blockTokenSecretManager;
/** @return true if block tokens are enabled, false otherwise. */
public boolean isBlockTokenEnabled() {
return isBlockTokenEnabled;
}
/** get the BlockTokenSecretManager */
public BlockTokenSecretManager getBlockTokenSecretManager() {
@ -131,7 +129,7 @@ public class BlockManager {
* Mapping: Block -> { INode, datanodes, self ref }
* Updated only in response to client-sent information.
*/
public final BlocksMap blocksMap;
final BlocksMap blocksMap;
private final DatanodeManager datanodeManager;
private final HeartbeatManager heartbeatManager;
@ -168,13 +166,13 @@ public class BlockManager {
private final PendingReplicationBlocks pendingReplications;
/** The maximum number of replicas allowed for a block */
public final int maxReplication;
public final short maxReplication;
/** The maximum number of outgoing replication streams
* a given node should have at one time
*/
int maxReplicationStreams;
/** Minimum copies needed or else write is disallowed */
public final int minReplication;
public final short minReplication;
/** Default number of replicas */
public final int defaultReplication;
/** The maximum number of entries returned by getCorruptInodes() */
@ -189,30 +187,6 @@ public class BlockManager {
/** for block replicas placement */
private BlockPlacementPolicy blockplacement;
/**
* Get access keys
*
* @return current access keys
*/
public ExportedBlockKeys getBlockKeys() {
return isBlockTokenEnabled ? blockTokenSecretManager.exportKeys()
: ExportedBlockKeys.DUMMY_KEYS;
}
/** Generate block token for a LocatedBlock. */
public void setBlockToken(LocatedBlock l) throws IOException {
Token<BlockTokenIdentifier> token = blockTokenSecretManager.generateToken(l
.getBlock(), EnumSet.of(BlockTokenSecretManager.AccessMode.READ));
l.setBlockToken(token);
}
/** Generate block tokens for the blocks to be returned. */
public void setBlockTokens(List<LocatedBlock> locatedBlocks) throws IOException {
for(LocatedBlock l : locatedBlocks) {
setBlockToken(l);
}
}
public BlockManager(FSNamesystem fsn, Configuration conf) throws IOException {
namesystem = fsn;
datanodeManager = new DatanodeManager(this, fsn, conf);
@ -249,25 +223,28 @@ public class BlockManager {
DFSConfigKeys.DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED);
this.defaultReplication = conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY,
DFSConfigKeys.DFS_REPLICATION_DEFAULT);
this.maxReplication = conf.getInt(DFSConfigKeys.DFS_REPLICATION_MAX_KEY,
DFSConfigKeys.DFS_REPLICATION_MAX_DEFAULT);
this.minReplication = conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY,
DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT);
if (minReplication <= 0)
throw new IOException(
"Unexpected configuration parameters: dfs.namenode.replication.min = "
+ minReplication
+ " must be greater than 0");
if (maxReplication >= (int)Short.MAX_VALUE)
throw new IOException(
"Unexpected configuration parameters: dfs.replication.max = "
+ maxReplication + " must be less than " + (Short.MAX_VALUE));
if (maxReplication < minReplication)
throw new IOException(
"Unexpected configuration parameters: dfs.namenode.replication.min = "
+ minReplication
+ " must be less than dfs.replication.max = "
+ maxReplication);
final int maxR = conf.getInt(DFSConfigKeys.DFS_REPLICATION_MAX_KEY,
DFSConfigKeys.DFS_REPLICATION_MAX_DEFAULT);
final int minR = conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY,
DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT);
if (minR <= 0)
throw new IOException("Unexpected configuration parameters: "
+ DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY
+ " = " + minR + " <= 0");
if (maxR > Short.MAX_VALUE)
throw new IOException("Unexpected configuration parameters: "
+ DFSConfigKeys.DFS_REPLICATION_MAX_KEY
+ " = " + maxR + " > " + Short.MAX_VALUE);
if (minR > maxR)
throw new IOException("Unexpected configuration parameters: "
+ DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY
+ " = " + minR + " > "
+ DFSConfigKeys.DFS_REPLICATION_MAX_KEY
+ " = " + maxR);
this.minReplication = (short)minR;
this.maxReplication = (short)maxR;
this.maxReplicationStreams = conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY,
DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT);
this.shouldCheckForEnoughRacks = conf.get(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY) == null ? false
@ -517,7 +494,7 @@ public class BlockManager {
}
long fileLength = fileINode.computeContentSummary().getLength();
return getBlockLocation(ucBlock, fileLength - ucBlock.getNumBytes());
return createLocatedBlock(ucBlock, fileLength - ucBlock.getNumBytes());
}
/**
@ -537,8 +514,9 @@ public class BlockManager {
return machineSet;
}
public List<LocatedBlock> getBlockLocations(BlockInfo[] blocks, long offset,
long length, int nrBlocksToReturn) throws IOException {
private List<LocatedBlock> createLocatedBlockList(final BlockInfo[] blocks,
final long offset, final long length, final int nrBlocksToReturn
) throws IOException {
int curBlk = 0;
long curPos = 0, blkSize = 0;
int nrBlocks = (blocks[0].getNumBytes() == 0) ? 0 : blocks.length;
@ -557,7 +535,7 @@ public class BlockManager {
long endOff = offset + length;
List<LocatedBlock> results = new ArrayList<LocatedBlock>(blocks.length);
do {
results.add(getBlockLocation(blocks[curBlk], curPos));
results.add(createLocatedBlock(blocks[curBlk], curPos));
curPos += blocks[curBlk].getNumBytes();
curBlk++;
} while (curPos < endOff
@ -567,7 +545,7 @@ public class BlockManager {
}
/** @return a LocatedBlock for the given block */
public LocatedBlock getBlockLocation(final BlockInfo blk, final long pos
private LocatedBlock createLocatedBlock(final BlockInfo blk, final long pos
) throws IOException {
if (blk instanceof BlockInfoUnderConstruction) {
if (blk.isComplete()) {
@ -608,6 +586,76 @@ public class BlockManager {
return new LocatedBlock(eb, machines, pos, isCorrupt);
}
/** Create a LocatedBlocks. */
public LocatedBlocks createLocatedBlocks(final BlockInfo[] blocks,
final long fileSizeExcludeBlocksUnderConstruction,
final boolean isFileUnderConstruction,
final long offset, final long length, final boolean needBlockToken
) throws IOException {
assert namesystem.hasReadOrWriteLock();
if (blocks == null) {
return null;
} else if (blocks.length == 0) {
return new LocatedBlocks(0, isFileUnderConstruction,
Collections.<LocatedBlock>emptyList(), null, false);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("blocks = " + java.util.Arrays.asList(blocks));
}
final List<LocatedBlock> locatedblocks = createLocatedBlockList(
blocks, offset, length, Integer.MAX_VALUE);
final BlockInfo last = blocks[blocks.length - 1];
final long lastPos = last.isComplete()?
fileSizeExcludeBlocksUnderConstruction - last.getNumBytes()
: fileSizeExcludeBlocksUnderConstruction;
final LocatedBlock lastlb = createLocatedBlock(last, lastPos);
if (isBlockTokenEnabled && needBlockToken) {
for(LocatedBlock lb : locatedblocks) {
setBlockToken(lb, AccessMode.READ);
}
setBlockToken(lastlb, AccessMode.READ);
}
return new LocatedBlocks(
fileSizeExcludeBlocksUnderConstruction, isFileUnderConstruction,
locatedblocks, lastlb, last.isComplete());
}
}
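
As a reading aid for createLocatedBlockList and the lastPos computation in createLocatedBlocks above, here is a minimal standalone sketch; plain longs stand in for BlockInfo and the sizes are made-up sample values, not anything taken from this patch.

public class BlockOffsetsSketch {
  public static void main(String[] args) {
    final long[] blockSizes = {128L, 128L, 64L}; // hypothetical block lengths
    final long fileSize = 320L;                  // sum of the complete blocks

    // createLocatedBlockList walks a cursor: each block starts where the
    // previous one ended.
    long curPos = 0;
    for (long size : blockSizes) {
      System.out.println("block at offset " + curPos + ", length " + size);
      curPos += size;
    }

    // createLocatedBlocks places the last block at fileSize - lastBlockSize
    // when it is complete, or at fileSize when it is still under construction.
    final long lastPos = fileSize - blockSizes[blockSizes.length - 1];
    System.out.println("last complete block starts at " + lastPos); // 256
  }
}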
/** @return current access keys. */
public ExportedBlockKeys getBlockKeys() {
return isBlockTokenEnabled? blockTokenSecretManager.exportKeys()
: ExportedBlockKeys.DUMMY_KEYS;
}
/** Generate a block token for the located block. */
public void setBlockToken(final LocatedBlock b,
final BlockTokenSecretManager.AccessMode mode) throws IOException {
if (isBlockTokenEnabled) {
b.setBlockToken(blockTokenSecretManager.generateToken(b.getBlock(),
EnumSet.of(mode)));
}
}
void addKeyUpdateCommand(final List<DatanodeCommand> cmds,
final DatanodeDescriptor nodeinfo) {
// check access key update
if (isBlockTokenEnabled && nodeinfo.needKeyUpdate) {
cmds.add(new KeyUpdateCommand(blockTokenSecretManager.exportKeys()));
nodeinfo.needKeyUpdate = false;
}
}
/**
* Clamp the specified replication between the minimum and the maximum
* replication levels.
*/
public short adjustReplication(short replication) {
return replication < minReplication? minReplication
: replication > maxReplication? maxReplication: replication;
}
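
A minimal standalone illustration of the clamping rule in adjustReplication; the MIN and MAX constants and the class name below are hypothetical stand-ins for minReplication and maxReplication.

public class AdjustReplicationSketch {
  static final short MIN = 1, MAX = 512; // example limits, not config values

  static short adjustReplication(short replication) {
    return replication < MIN ? MIN
        : replication > MAX ? MAX : replication;
  }

  public static void main(String[] args) {
    System.out.println(adjustReplication((short) 0));   // 1   (raised to MIN)
    System.out.println(adjustReplication((short) 3));   // 3   (already in range)
    System.out.println(adjustReplication((short) 999)); // 512 (capped at MAX)
  }
}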
/**
* Check whether the replication parameter is within the range
* determined by system configuration.
@ -639,7 +687,7 @@ public class BlockManager {
final long size) throws UnregisteredNodeException {
final DatanodeDescriptor node = getDatanodeManager().getDatanode(datanode);
if (node == null) {
NameNode.stateChangeLog.warn("BLOCK* NameSystem.getBlocks: "
NameNode.stateChangeLog.warn("BLOCK* getBlocks: "
+ "Asking for blocks from an unrecorded node " + datanode.getName());
throw new HadoopIllegalArgumentException(
"Datanode " + datanode.getName() + " not found.");
@ -711,7 +759,7 @@ public class BlockManager {
* @param dn datanode
* @param log true to create an entry in the log
*/
void addToInvalidates(Block b, DatanodeInfo dn, boolean log) {
private void addToInvalidates(Block b, DatanodeInfo dn, boolean log) {
Collection<Block> invalidateSet = recentInvalidateSets
.get(dn.getStorageID());
if (invalidateSet == null) {
@ -721,7 +769,7 @@ public class BlockManager {
if (invalidateSet.add(b)) {
pendingDeletionBlocksCount++;
if (log) {
NameNode.stateChangeLog.info("BLOCK* NameSystem.addToInvalidates: "
NameNode.stateChangeLog.info("BLOCK* addToInvalidates: "
+ b + " to " + dn.getName());
}
}
@ -734,7 +782,7 @@ public class BlockManager {
* @param b block
* @param dn datanode
*/
public void addToInvalidates(Block b, DatanodeInfo dn) {
void addToInvalidates(Block b, DatanodeInfo dn) {
addToInvalidates(b, dn, true);
}
@ -751,7 +799,7 @@ public class BlockManager {
datanodes.append(node.getName()).append(" ");
}
if (datanodes.length() != 0) {
NameNode.stateChangeLog.info("BLOCK* NameSystem.addToInvalidates: "
NameNode.stateChangeLog.info("BLOCK* addToInvalidates: "
+ b + " to " + datanodes.toString());
}
}
@ -775,20 +823,29 @@ public class BlockManager {
}
}
public void findAndMarkBlockAsCorrupt(Block blk,
DatanodeInfo dn) throws IOException {
BlockInfo storedBlock = getStoredBlock(blk);
if (storedBlock == null) {
// Check if the replica is in the blockMap, if not
// ignore the request for now. This could happen when BlockScanner
// thread of Datanode reports bad block before Block reports are sent
// by the Datanode on startup
NameNode.stateChangeLog.info("BLOCK* NameSystem.markBlockAsCorrupt: " +
"block " + blk + " could not be marked as " +
"corrupt as it does not exist in blocksMap");
return;
/**
* Mark the block belonging to datanode as corrupt
* @param blk Block to be marked as corrupt
* @param dn Datanode which holds the corrupt replica
*/
public void findAndMarkBlockAsCorrupt(final ExtendedBlock blk,
final DatanodeInfo dn) throws IOException {
namesystem.writeLock();
try {
final BlockInfo storedBlock = getStoredBlock(blk.getLocalBlock());
if (storedBlock == null) {
// Check if the replica is in the blockMap, if not
// ignore the request for now. This could happen when BlockScanner
// thread of Datanode reports bad block before Block reports are sent
// by the Datanode on startup
NameNode.stateChangeLog.info("BLOCK* findAndMarkBlockAsCorrupt: "
+ blk + " not found.");
return;
}
markBlockAsCorrupt(storedBlock, dn);
} finally {
namesystem.writeUnlock();
}
markBlockAsCorrupt(storedBlock, dn);
}
private void markBlockAsCorrupt(BlockInfo storedBlock,
@ -804,7 +861,7 @@ public class BlockManager {
INodeFile inode = storedBlock.getINode();
if (inode == null) {
NameNode.stateChangeLog.info("BLOCK NameSystem.markBlockAsCorrupt: " +
NameNode.stateChangeLog.info("BLOCK markBlockAsCorrupt: " +
"block " + storedBlock +
" could not be marked as corrupt as it" +
" does not belong to any file");
@ -831,13 +888,12 @@ public class BlockManager {
*/
private void invalidateBlock(Block blk, DatanodeInfo dn)
throws IOException {
NameNode.stateChangeLog.info("DIR* NameSystem.invalidateBlock: "
NameNode.stateChangeLog.info("BLOCK* invalidateBlock: "
+ blk + " on " + dn.getName());
DatanodeDescriptor node = getDatanodeManager().getDatanode(dn);
if (node == null) {
throw new IOException("Cannot invalidate block " + blk +
" because datanode " + dn.getName() +
" does not exist.");
throw new IOException("Cannot invalidate block " + blk
+ " because datanode " + dn.getName() + " does not exist.");
}
// Check how many copies we have of the block. If we have at least one
@ -847,14 +903,12 @@ public class BlockManager {
addToInvalidates(blk, dn);
removeStoredBlock(blk, node);
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("BLOCK* NameSystem.invalidateBlocks: "
+ blk + " on "
+ dn.getName() + " listed for deletion.");
NameNode.stateChangeLog.debug("BLOCK* invalidateBlocks: "
+ blk + " on " + dn.getName() + " listed for deletion.");
}
} else {
NameNode.stateChangeLog.info("BLOCK* NameSystem.invalidateBlocks: "
+ blk + " on " + dn.getName()
+ " is the only copy and was not deleted.");
NameNode.stateChangeLog.info("BLOCK* invalidateBlocks: " + blk + " on "
+ dn.getName() + " is the only copy and was not deleted.");
}
}
@ -1286,20 +1340,51 @@ public class BlockManager {
}
/**
* The given node is reporting all its blocks. Use this info to
* update the (datanode-->blocklist) and (block-->nodelist) tables.
* The given datanode is reporting all its blocks.
* Update the (machine-->blocklist) and (block-->machinelist) maps.
*/
public void processReport(DatanodeDescriptor node, BlockListAsLongs report)
throws IOException {
boolean isFirstBlockReport = (node.numBlocks() == 0);
if (isFirstBlockReport) {
// Initial block reports can be processed a lot more efficiently than
// ordinary block reports. This shortens NN restart times.
processFirstBlockReport(node, report);
return;
}
public void processReport(final DatanodeID nodeID, final String poolId,
final BlockListAsLongs newReport) throws IOException {
namesystem.writeLock();
final long startTime = Util.now(); //after acquiring write lock
final long endTime;
try {
final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID);
if (node == null || !node.isAlive) {
throw new IOException("ProcessReport from dead or unregistered node: "
+ nodeID.getName());
}
// To minimize startup time, we discard any second (or later) block reports
// that we receive while still in startup phase.
if (namesystem.isInStartupSafeMode() && node.numBlocks() > 0) {
NameNode.stateChangeLog.info("BLOCK* processReport: "
+ "discarded non-initial block report from " + nodeID.getName()
+ " because namenode still in startup phase");
return;
}
if (node.numBlocks() == 0) {
// The first block report can be processed a lot more efficiently than
// ordinary block reports. This shortens restart times.
processFirstBlockReport(node, newReport);
} else {
processReport(node, newReport);
}
} finally {
endTime = Util.now();
namesystem.writeUnlock();
}
// Log the block report processing stats from Namenode perspective
NameNode.getNameNodeMetrics().addBlockReport((int) (endTime - startTime));
NameNode.stateChangeLog.info("BLOCK* processReport: from "
+ nodeID.getName() + ", blocks: " + newReport.getNumberOfBlocks()
+ ", processing time: " + (endTime - startTime) + " msecs");
}
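
A rough, self-contained sketch of the structure of the public processReport above: take the write lock, time the critical section, dispatch on whether this is the node's first report, and log only after releasing the lock. Every type below is a simplified stand-in, not an HDFS class.

import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ProcessReportSketch {
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  private int storedBlocks = 0; // stand-in for node.numBlocks()

  void processReport(long[] newReport) {
    lock.writeLock().lock();
    final long startTime = System.currentTimeMillis(); // after acquiring the lock
    final long endTime;
    try {
      if (storedBlocks == 0) {
        // first report: can be applied wholesale (cheapest path)
        storedBlocks = newReport.length;
      } else {
        // later reports: reconciled against what is already stored
        // (simplified here to just replacing the count)
        storedBlocks = newReport.length;
      }
    } finally {
      endTime = System.currentTimeMillis();
      lock.writeLock().unlock();
    }
    System.out.println("processReport: " + newReport.length + " blocks, "
        + (endTime - startTime) + " msecs");
  }

  public static void main(String[] args) {
    new ProcessReportSketch().processReport(new long[] {1L, 2L, 3L});
  }
}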
private void processReport(final DatanodeDescriptor node,
final BlockListAsLongs report) throws IOException {
// Normal case:
// Modify the (block-->datanode) map, according to the difference
// between the old and new block report.
@ -1322,7 +1407,7 @@ public class BlockManager {
addStoredBlock(b, node, null, true);
}
for (Block b : toInvalidate) {
NameNode.stateChangeLog.info("BLOCK* NameSystem.processReport: block "
NameNode.stateChangeLog.info("BLOCK* processReport: block "
+ b + " on " + node.getName() + " size " + b.getNumBytes()
+ " does not belong to any file.");
addToInvalidates(b, node);
@ -1343,8 +1428,8 @@ public class BlockManager {
* @param report - the initial block report, to be processed
* @throws IOException
*/
void processFirstBlockReport(DatanodeDescriptor node, BlockListAsLongs report)
throws IOException {
private void processFirstBlockReport(final DatanodeDescriptor node,
final BlockListAsLongs report) throws IOException {
if (report == null) return;
assert (namesystem.hasWriteLock());
assert (node.numBlocks() == 0);
@ -1441,12 +1526,12 @@ public class BlockManager {
* @param toUC replicas of blocks currently under construction
* @return
*/
BlockInfo processReportedBlock(DatanodeDescriptor dn,
Block block, ReplicaState reportedState,
Collection<BlockInfo> toAdd,
Collection<Block> toInvalidate,
Collection<BlockInfo> toCorrupt,
Collection<StatefulBlockInfo> toUC) {
private BlockInfo processReportedBlock(final DatanodeDescriptor dn,
final Block block, final ReplicaState reportedState,
final Collection<BlockInfo> toAdd,
final Collection<Block> toInvalidate,
final Collection<BlockInfo> toCorrupt,
final Collection<StatefulBlockInfo> toUC) {
if(LOG.isDebugEnabled()) {
LOG.debug("Reported block " + block
@ -1616,11 +1701,9 @@ public class BlockManager {
}
if (storedBlock == null || storedBlock.getINode() == null) {
// If this block does not belong to anyfile, then we are done.
NameNode.stateChangeLog.info("BLOCK* NameSystem.addStoredBlock: "
+ "addStoredBlock request received for "
+ block + " on " + node.getName()
+ " size " + block.getNumBytes()
+ " But it does not belong to any file.");
NameNode.stateChangeLog.info("BLOCK* addStoredBlock: " + block + " on "
+ node.getName() + " size " + block.getNumBytes()
+ " but it does not belong to any file.");
// we could add this block to invalidate set of this datanode.
// it will happen in next block report otherwise.
return block;
@ -1636,13 +1719,13 @@ public class BlockManager {
if (added) {
curReplicaDelta = 1;
if (logEveryBlock) {
NameNode.stateChangeLog.info("BLOCK* NameSystem.addStoredBlock: "
NameNode.stateChangeLog.info("BLOCK* addStoredBlock: "
+ "blockMap updated: " + node.getName() + " is added to " +
storedBlock + " size " + storedBlock.getNumBytes());
}
} else {
curReplicaDelta = 0;
NameNode.stateChangeLog.warn("BLOCK* NameSystem.addStoredBlock: "
NameNode.stateChangeLog.warn("BLOCK* addStoredBlock: "
+ "Redundant addStoredBlock request received for " + storedBlock
+ " on " + node.getName() + " size " + storedBlock.getNumBytes());
}
@ -1778,13 +1861,39 @@ public class BlockManager {
LOG.info("Number of over-replicated blocks = " + nrOverReplicated);
}
/** Set replication for the blocks. */
public void setReplication(final short oldRepl, final short newRepl,
final String src, final Block... blocks) throws IOException {
if (newRepl == oldRepl) {
return;
}
// update needReplication priority queues
for(Block b : blocks) {
updateNeededReplications(b, 0, newRepl-oldRepl);
}
if (oldRepl > newRepl) {
// old replication > the new one; need to remove copies
LOG.info("Decreasing replication from " + oldRepl + " to " + newRepl
+ " for " + src);
for(Block b : blocks) {
processOverReplicatedBlock(b, newRepl, null, null);
}
} else { // replication factor is increased
LOG.info("Increasing replication from " + oldRepl + " to " + newRepl
+ " for " + src);
}
}
/**
* Find how many of the containing nodes are "extra", if any.
* If there are any extras, call chooseExcessReplicates() to
* mark them in the excessReplicateMap.
*/
public void processOverReplicatedBlock(Block block, short replication,
DatanodeDescriptor addedNode, DatanodeDescriptor delNodeHint) {
private void processOverReplicatedBlock(final Block block,
final short replication, final DatanodeDescriptor addedNode,
DatanodeDescriptor delNodeHint) {
assert namesystem.hasWriteLock();
if (addedNode == delNodeHint) {
delNodeHint = null;
@ -1806,12 +1915,112 @@ public class BlockManager {
}
}
}
namesystem.chooseExcessReplicates(nonExcess, block, replication,
chooseExcessReplicates(nonExcess, block, replication,
addedNode, delNodeHint, blockplacement);
}
public void addToExcessReplicate(DatanodeInfo dn, Block block) {
/**
* We want "replication" replicates for the block, but we now have too many.
* In this method, copy enough nodes from 'srcNodes' into 'dstNodes' such that:
*
* srcNodes.size() - dstNodes.size() == replication
*
* We pick node that make sure that replicas are spread across racks and
* also try hard to pick one with least free space.
* The algorithm is first to pick a node with least free space from nodes
* that are on a rack holding more than one replicas of the block.
* So removing such a replica won't remove a rack.
* If no such a node is available,
* then pick a node with least free space
*/
private void chooseExcessReplicates(Collection<DatanodeDescriptor> nonExcess,
Block b, short replication,
DatanodeDescriptor addedNode,
DatanodeDescriptor delNodeHint,
BlockPlacementPolicy replicator) {
assert namesystem.hasWriteLock();
// first form a rack to datanodes map and
INodeFile inode = getINode(b);
final Map<String, List<DatanodeDescriptor>> rackMap
= new HashMap<String, List<DatanodeDescriptor>>();
for(final Iterator<DatanodeDescriptor> iter = nonExcess.iterator();
iter.hasNext(); ) {
final DatanodeDescriptor node = iter.next();
final String rackName = node.getNetworkLocation();
List<DatanodeDescriptor> datanodeList = rackMap.get(rackName);
if (datanodeList == null) {
datanodeList = new ArrayList<DatanodeDescriptor>();
rackMap.put(rackName, datanodeList);
}
datanodeList.add(node);
}
// split nodes into two sets
// priSet contains nodes on rack with more than one replica
// remains contains the remaining nodes
final List<DatanodeDescriptor> priSet = new ArrayList<DatanodeDescriptor>();
final List<DatanodeDescriptor> remains = new ArrayList<DatanodeDescriptor>();
for(List<DatanodeDescriptor> datanodeList : rackMap.values()) {
if (datanodeList.size() == 1 ) {
remains.add(datanodeList.get(0));
} else {
priSet.addAll(datanodeList);
}
}
// pick one node to delete that favors the delete hint
// otherwise pick one with least space from priSet if it is not empty
// otherwise one node with least space from remains
boolean firstOne = true;
while (nonExcess.size() - replication > 0) {
// check if we can delete delNodeHint
final DatanodeInfo cur;
if (firstOne && delNodeHint !=null && nonExcess.contains(delNodeHint)
&& (priSet.contains(delNodeHint)
|| (addedNode != null && !priSet.contains(addedNode))) ) {
cur = delNodeHint;
} else { // regular excessive replica removal
cur = replicator.chooseReplicaToDelete(inode, b, replication,
priSet, remains);
}
firstOne = false;
// adjust rackmap, priSet, and remains
String rack = cur.getNetworkLocation();
final List<DatanodeDescriptor> datanodes = rackMap.get(rack);
datanodes.remove(cur);
if (datanodes.isEmpty()) {
rackMap.remove(rack);
}
if (priSet.remove(cur)) {
if (datanodes.size() == 1) {
priSet.remove(datanodes.get(0));
remains.add(datanodes.get(0));
}
} else {
remains.remove(cur);
}
nonExcess.remove(cur);
addToExcessReplicate(cur, b);
//
// The 'excessblocks' tracks blocks until we get confirmation
// that the datanode has deleted them; the only way we remove them
// is when we get a "removeBlock" message.
//
// The 'invalidate' list is used to inform the datanode the block
// should be deleted. Items are removed from the invalidate list
// upon giving instructions to the namenode.
//
addToInvalidates(b, cur);
NameNode.stateChangeLog.info("BLOCK* chooseExcessReplicates: "
+"("+cur.getName()+", "+b+") is added to recentInvalidateSets");
}
}
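
A hedged sketch of the rack-grouping step at the top of chooseExcessReplicates; plain strings stand in for DatanodeDescriptor and the node-to-rack assignments are invented sample data.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RackSplitSketch {
  public static void main(String[] args) {
    Map<String, String> nodeToRack = new HashMap<>();
    nodeToRack.put("dn1", "/rackA");
    nodeToRack.put("dn2", "/rackA");
    nodeToRack.put("dn3", "/rackB");

    // first form a rack -> datanodes map
    Map<String, List<String>> rackMap = new HashMap<>();
    for (Map.Entry<String, String> e : nodeToRack.entrySet()) {
      rackMap.computeIfAbsent(e.getValue(), r -> new ArrayList<>()).add(e.getKey());
    }

    // priSet: nodes on racks holding more than one replica; remains: the rest.
    // Deleting a node from priSet can never remove the last replica on a rack.
    List<String> priSet = new ArrayList<>();
    List<String> remains = new ArrayList<>();
    for (List<String> nodes : rackMap.values()) {
      (nodes.size() > 1 ? priSet : remains).addAll(nodes);
    }
    System.out.println("priSet=" + priSet + " remains=" + remains);
    // e.g. priSet=[dn1, dn2] remains=[dn3]
  }
}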
private void addToExcessReplicate(DatanodeInfo dn, Block block) {
assert namesystem.hasWriteLock();
Collection<Block> excessBlocks = excessReplicateMap.get(dn.getStorageID());
if (excessBlocks == null) {
@ -1821,7 +2030,7 @@ public class BlockManager {
if (excessBlocks.add(block)) {
excessBlocksCount++;
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("BLOCK* NameSystem.chooseExcessReplicates:"
NameNode.stateChangeLog.debug("BLOCK* addToExcessReplicate:"
+ " (" + dn.getName() + ", " + block
+ ") is added to excessReplicateMap");
}
@ -1834,14 +2043,14 @@ public class BlockManager {
*/
private void removeStoredBlock(Block block, DatanodeDescriptor node) {
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
NameNode.stateChangeLog.debug("BLOCK* removeStoredBlock: "
+ block + " from " + node.getName());
}
assert (namesystem.hasWriteLock());
{
if (!blocksMap.removeNode(block, node)) {
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
NameNode.stateChangeLog.debug("BLOCK* removeStoredBlock: "
+ block + " has already been removed from node " + node);
}
return;
@ -1869,8 +2078,7 @@ public class BlockManager {
if (excessBlocks.remove(block)) {
excessBlocksCount--;
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug(
"BLOCK* NameSystem.removeStoredBlock: "
NameNode.stateChangeLog.debug("BLOCK* removeStoredBlock: "
+ block + " is removed from excessBlocks");
}
if (excessBlocks.size() == 0) {
@ -1902,7 +2110,7 @@ public class BlockManager {
/**
* The given node is reporting that it received a certain block.
*/
public void addBlock(DatanodeDescriptor node, Block block, String delHint)
private void addBlock(DatanodeDescriptor node, Block block, String delHint)
throws IOException {
// decrement number of blocks scheduled to this datanode.
node.decBlocksScheduled();
@ -1912,9 +2120,8 @@ public class BlockManager {
if (delHint != null && delHint.length() != 0) {
delHintNode = datanodeManager.getDatanode(delHint);
if (delHintNode == null) {
NameNode.stateChangeLog.warn("BLOCK* NameSystem.blockReceived: "
+ block + " is expected to be removed from an unrecorded node "
+ delHint);
NameNode.stateChangeLog.warn("BLOCK* blockReceived: " + block
+ " is expected to be removed from an unrecorded node " + delHint);
}
}
@ -1942,7 +2149,7 @@ public class BlockManager {
addStoredBlock(b, node, delHintNode, true);
}
for (Block b : toInvalidate) {
NameNode.stateChangeLog.info("BLOCK* NameSystem.addBlock: block "
NameNode.stateChangeLog.info("BLOCK* addBlock: block "
+ b + " on " + node.getName() + " size " + b.getNumBytes()
+ " does not belong to any file.");
addToInvalidates(b, node);
@ -1952,6 +2159,30 @@ public class BlockManager {
}
}
/** The given node is reporting that it received a certain block. */
public void blockReceived(final DatanodeID nodeID, final String poolId,
final Block block, final String delHint) throws IOException {
namesystem.writeLock();
try {
final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID);
if (node == null || !node.isAlive) {
final String s = block + " is received from dead or unregistered node "
+ nodeID.getName();
NameNode.stateChangeLog.warn("BLOCK* blockReceived: " + s);
throw new IOException(s);
}
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("BLOCK* blockReceived: " + block
+ " is received from " + nodeID.getName());
}
addBlock(node, block, delHint);
} finally {
namesystem.writeUnlock();
}
}
/**
* Return the number of nodes that are live and decommissioned.
*/
@ -2142,9 +2373,9 @@ public class BlockManager {
return b;
}
/* updates a block in under replication queue */
public void updateNeededReplications(Block block, int curReplicasDelta,
int expectedReplicasDelta) {
/** updates a block in under replication queue */
private void updateNeededReplications(final Block block,
final int curReplicasDelta, int expectedReplicasDelta) {
namesystem.writeLock();
try {
NumberReplicas repl = countNodes(block);
@ -2303,8 +2534,9 @@ public class BlockManager {
return blocksMap.getINode(b);
}
public void removeFromCorruptReplicasMap(Block block) {
corruptReplicas.removeFromCorruptReplicasMap(block);
/** @return an iterator of the datanodes. */
public Iterator<DatanodeDescriptor> datanodeIterator(final Block block) {
return blocksMap.nodeIterator(block);
}
public int numCorruptReplicas(Block block) {
@ -2313,6 +2545,8 @@ public class BlockManager {
public void removeBlockFromMap(Block block) {
blocksMap.removeBlock(block);
// If block is removed from blocksMap remove it from corruptReplicasMap
corruptReplicas.removeFromCorruptReplicasMap(block);
}
public int getCapacity() {

View File

@ -29,7 +29,7 @@ import org.apache.hadoop.hdfs.util.LightWeightGSet;
* block's metadata currently includes INode it belongs to and
* the datanodes that store the block.
*/
public class BlocksMap {
class BlocksMap {
private static class NodeIterator implements Iterator<DatanodeDescriptor> {
private BlockInfo blockInfo;
private int nextIdx = 0;
@ -101,7 +101,7 @@ public class BlocksMap {
/**
* Add block b belonging to the specified file inode to the map.
*/
public BlockInfo addINode(BlockInfo b, INodeFile iNode) {
BlockInfo addINode(BlockInfo b, INodeFile iNode) {
BlockInfo info = blocks.get(b);
if (info != b) {
info = b;
@ -137,7 +137,7 @@ public class BlocksMap {
* Searches for the block in the BlocksMap and
* returns Iterator that iterates through the nodes the block belongs to.
*/
public Iterator<DatanodeDescriptor> nodeIterator(Block b) {
Iterator<DatanodeDescriptor> nodeIterator(Block b) {
return nodeIterator(blocks.get(b));
}
@ -182,27 +182,6 @@ public class BlocksMap {
Iterable<BlockInfo> getBlocks() {
return blocks;
}
/**
* Check if the block exists in map
*/
public boolean contains(Block block) {
return blocks.contains(block);
}
/**
* Check if the replica at the given datanode exists in map
*/
boolean contains(Block block, DatanodeDescriptor datanode) {
BlockInfo info = blocks.get(block);
if (info == null)
return false;
if (-1 == info.findDatanode(datanode))
return false;
return true;
}
/** Get the capacity of the HashMap that stores blocks */
int getCapacity() {

View File

@ -34,6 +34,7 @@ import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
@ -405,7 +406,7 @@ public class DatanodeManager {
* @param nodeList array list of live or dead nodes.
*/
public void removeDecomNodeFromList(final List<DatanodeDescriptor> nodeList) {
private void removeDecomNodeFromList(final List<DatanodeDescriptor> nodeList) {
// If the include list is empty, any nodes are welcomed and it does not
// make sense to exclude any nodes from the cluster. Therefore, no remove.
if (hostsReader.getHosts().isEmpty()) {
@ -563,7 +564,7 @@ public class DatanodeManager {
nodeReg.getInfoPort(),
nodeReg.getIpcPort());
nodeReg.updateRegInfo(dnReg);
nodeReg.exportedKeys = namesystem.getBlockManager().getBlockKeys();
nodeReg.exportedKeys = blockManager.getBlockKeys();
NameNode.stateChangeLog.info("BLOCK* NameSystem.registerDatanode: "
+ "node registration from " + nodeReg.getName()
@ -710,16 +711,59 @@ public class DatanodeManager {
return numDead;
}
/** @return list of datanodes where decommissioning is in progress. */
public List<DatanodeDescriptor> getDecommissioningNodes() {
namesystem.readLock();
try {
final List<DatanodeDescriptor> decommissioningNodes
= new ArrayList<DatanodeDescriptor>();
final List<DatanodeDescriptor> results = getDatanodeListForReport(
DatanodeReportType.LIVE);
for(DatanodeDescriptor node : results) {
if (node.isDecommissionInProgress()) {
decommissioningNodes.add(node);
}
}
return decommissioningNodes;
} finally {
namesystem.readUnlock();
}
}
/** Fetch live and dead datanodes. */
public void fetchDatanodess(final List<DatanodeDescriptor> live,
final List<DatanodeDescriptor> dead) {
final List<DatanodeDescriptor> results =
getDatanodeListForReport(DatanodeReportType.ALL);
for(DatanodeDescriptor node : results) {
if (isDatanodeDead(node))
dead.add(node);
else
live.add(node);
public void fetchDatanodes(final List<DatanodeDescriptor> live,
final List<DatanodeDescriptor> dead, final boolean removeDecommissionNode) {
if (live == null && dead == null) {
throw new HadoopIllegalArgumentException("Both live and dead lists are null");
}
namesystem.readLock();
try {
final List<DatanodeDescriptor> results =
getDatanodeListForReport(DatanodeReportType.ALL);
for(DatanodeDescriptor node : results) {
if (isDatanodeDead(node)) {
if (dead != null) {
dead.add(node);
}
} else {
if (live != null) {
live.add(node);
}
}
}
} finally {
namesystem.readUnlock();
}
if (removeDecommissionNode) {
if (live != null) {
removeDecomNodeFromList(live);
}
if (dead != null) {
removeDecomNodeFromList(dead);
}
}
}
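
A minimal sketch of the live/dead partitioning that fetchDatanodes performs; a plain predicate stands in for isDatanodeDead, strings stand in for DatanodeDescriptor, and, as above, callers may pass null for whichever list they do not need.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

public class FetchDatanodesSketch {
  static void fetchDatanodes(List<String> all, Predicate<String> isDead,
      List<String> live, List<String> dead) {
    if (live == null && dead == null) {
      throw new IllegalArgumentException("Both live and dead lists are null");
    }
    for (String node : all) {
      if (isDead.test(node)) {
        if (dead != null) dead.add(node);
      } else {
        if (live != null) live.add(node);
      }
    }
  }

  public static void main(String[] args) {
    List<String> live = new ArrayList<>();
    List<String> dead = new ArrayList<>();
    fetchDatanodes(Arrays.asList("dn1", "dn2", "dn3"), "dn2"::equals, live, dead);
    System.out.println("live=" + live + " dead=" + dead); // live=[dn1, dn3] dead=[dn2]
  }
}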
@ -847,7 +891,7 @@ public class DatanodeManager {
blockPoolId, blks));
}
namesystem.addKeyUpdateCommand(cmds, nodeinfo);
blockManager.addKeyUpdateCommand(cmds, nodeinfo);
// check for balancer bandwidth update
if (nodeinfo.getBalancerBandwidth() > 0) {

View File

@ -26,11 +26,11 @@ import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URL;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.TreeSet;
import javax.servlet.ServletContext;
@ -249,7 +249,7 @@ public class JspHelper {
out.print("</tbody></table>");
}
public static void sortNodeList(ArrayList<DatanodeDescriptor> nodes,
public static void sortNodeList(final List<DatanodeDescriptor> nodes,
String field, String order) {
class NodeComapare implements Comparator<DatanodeDescriptor> {

View File

@ -444,8 +444,6 @@ public class FSDirectory implements Closeable {
// modify file-> block and blocksMap
fileNode.removeLastBlock(block);
getBlockManager().removeBlockFromMap(block);
// If block is removed from blocksMap remove it from corruptReplicasMap
getBlockManager().removeFromCorruptReplicasMap(block);
// write modified block locations to log
fsImage.getEditLog().logOpenFile(path, fileNode);
@ -809,7 +807,7 @@ public class FSDirectory implements Closeable {
* @return array of file blocks
* @throws QuotaExceededException
*/
Block[] setReplication(String src, short replication, int[] oldReplication)
Block[] setReplication(String src, short replication, short[] oldReplication)
throws QuotaExceededException, UnresolvedLinkException {
waitForReady();
Block[] fileBlocks = null;
@ -826,14 +824,10 @@ public class FSDirectory implements Closeable {
Block[] unprotectedSetReplication(String src,
short replication,
int[] oldReplication
short[] oldReplication
) throws QuotaExceededException,
UnresolvedLinkException {
assert hasWriteLock();
if (oldReplication == null) {
oldReplication = new int[1];
}
oldReplication[0] = -1;
INode[] inodes = rootDir.getExistingPathINodes(src, true);
INode inode = inodes[inodes.length - 1];
@ -845,14 +839,17 @@ public class FSDirectory implements Closeable {
return null;
}
INodeFile fileNode = (INodeFile)inode;
oldReplication[0] = fileNode.getReplication();
final short oldRepl = fileNode.getReplication();
// check disk quota
long dsDelta = (replication - oldReplication[0]) *
(fileNode.diskspaceConsumed()/oldReplication[0]);
long dsDelta = (replication - oldRepl) * (fileNode.diskspaceConsumed()/oldRepl);
updateCount(inodes, inodes.length-1, 0, dsDelta, true);
fileNode.setReplication(replication);
if (oldReplication != null) {
oldReplication[0] = oldRepl;
}
return fileNode.getBlocks();
}
@ -2075,8 +2072,9 @@ public class FSDirectory implements Closeable {
size = fileNode.computeFileSize(true);
replication = fileNode.getReplication();
blocksize = fileNode.getPreferredBlockSize();
loc = getFSNamesystem().getBlockLocationsInternal(
fileNode, 0L, size, false);
loc = getFSNamesystem().getBlockManager().createLocatedBlocks(
fileNode.getBlocks(), fileNode.computeFileSize(false),
fileNode.isUnderConstruction(), 0L, size, false);
if (loc==null) {
loc = new LocatedBlocks();
}

View File

@ -19,7 +19,6 @@ package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.hdfs.server.common.Util.now;
import java.io.DataInputStream;
import java.io.File;
import java.io.FilterInputStream;
import java.io.IOException;
@ -144,8 +143,8 @@ public class FSEditLogLoader {
// versions > 0 support per file replication
// get name and replication
short replication
= fsNamesys.adjustReplication(addCloseOp.replication);
final short replication = fsNamesys.getBlockManager(
).adjustReplication(addCloseOp.replication);
long blockSize = addCloseOp.blockSize;
BlockInfo blocks[] = new BlockInfo[addCloseOp.blocks.length];
@ -218,8 +217,8 @@ public class FSEditLogLoader {
}
case OP_SET_REPLICATION: {
SetReplicationOp setReplicationOp = (SetReplicationOp)op;
short replication
= fsNamesys.adjustReplication(setReplicationOp.replication);
short replication = fsNamesys.getBlockManager().adjustReplication(
setReplicationOp.replication);
fsDir.unprotectedSetReplication(setReplicationOp.path,
replication, null);
break;

View File

@ -330,7 +330,7 @@ class FSImageFormat {
int imgVersion = getLayoutVersion();
short replication = in.readShort();
replication = namesystem.adjustReplication(replication);
replication = namesystem.getBlockManager().adjustReplication(replication);
modificationTime = in.readLong();
if (LayoutVersion.supports(Feature.FILE_ACCESS_TIME, imgVersion)) {
atime = in.readLong();

View File

@ -39,10 +39,8 @@ import java.util.Date;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -74,7 +72,6 @@ import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@ -88,12 +85,12 @@ import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStatistics;
@ -110,11 +107,9 @@ import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMBean;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
@ -394,7 +389,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
return this.fsLock.getReadHoldCount() > 0;
}
boolean hasReadOrWriteLock() {
public boolean hasReadOrWriteLock() {
return hasReadLock() || hasWriteLock();
}
@ -534,14 +529,12 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
long totalInodes = this.dir.totalInodes();
long totalBlocks = this.getBlocksTotal();
ArrayList<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
ArrayList<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
this.DFSNodesStatus(live, dead);
String str = totalInodes + " files and directories, " + totalBlocks
+ " blocks = " + (totalInodes + totalBlocks) + " total";
out.println(str);
out.println(totalInodes + " files and directories, " + totalBlocks
+ " blocks = " + (totalInodes + totalBlocks) + " total");
final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
final List<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
blockManager.getDatanodeManager().fetchDatanodes(live, dead, false);
out.println("Live Datanodes: "+live.size());
out.println("Dead Datanodes: "+dead.size());
blockManager.metaSave(out);
@ -750,7 +743,9 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
}
dir.setTimes(src, inode, -1, now, false);
}
return getBlockLocationsInternal(inode, offset, length, needBlockToken);
return blockManager.createLocatedBlocks(inode.getBlocks(),
inode.computeFileSize(false), inode.isUnderConstruction(),
offset, length, needBlockToken);
} finally {
if (attempt == 0) {
readUnlock();
@ -761,44 +756,6 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
}
return null; // can never reach here
}
LocatedBlocks getBlockLocationsInternal(INodeFile inode,
long offset, long length, boolean needBlockToken)
throws IOException {
assert hasReadOrWriteLock();
final BlockInfo[] blocks = inode.getBlocks();
if (LOG.isDebugEnabled()) {
LOG.debug("blocks = " + java.util.Arrays.asList(blocks));
}
if (blocks == null) {
return null;
}
if (blocks.length == 0) {
return new LocatedBlocks(0, inode.isUnderConstruction(),
Collections.<LocatedBlock>emptyList(), null, false);
} else {
final long n = inode.computeFileSize(false);
final List<LocatedBlock> locatedblocks = blockManager.getBlockLocations(
blocks, offset, length, Integer.MAX_VALUE);
final BlockInfo last = inode.getLastBlock();
if (LOG.isDebugEnabled()) {
LOG.debug("last = " + last);
}
LocatedBlock lastBlock = last.isComplete() ? blockManager
.getBlockLocation(last, n - last.getNumBytes()) : blockManager
.getBlockLocation(last, n);
if (blockManager.isBlockTokenEnabled() && needBlockToken) {
blockManager.setBlockTokens(locatedblocks);
blockManager.setBlockToken(lastBlock);
}
return new LocatedBlocks(n, inode.isUnderConstruction(), locatedblocks,
lastBlock, last.isComplete());
}
}
/**
* Moves all the blocks from srcs and appends them to trg
@ -960,7 +917,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
* The access time is precise up to an hour. The transaction, if needed, is
* written to the edits log but is not flushed.
*/
public void setTimes(String src, long mtime, long atime)
void setTimes(String src, long mtime, long atime)
throws IOException, UnresolvedLinkException {
if (!isAccessTimeSupported() && atime != -1) {
throw new IOException("Access time for hdfs is not configured. " +
@ -1060,60 +1017,37 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
* @return true if successful;
* false if file does not exist or is a directory
*/
public boolean setReplication(String src, short replication)
throws IOException, UnresolvedLinkException {
boolean status = false;
boolean setReplication(final String src, final short replication
) throws IOException {
blockManager.verifyReplication(src, replication, null);
final boolean isFile;
writeLock();
try {
if (isInSafeMode()) {
throw new SafeModeException("Cannot set replication for " + src, safeMode);
}
status = setReplicationInternal(src, replication);
if (isPermissionEnabled) {
checkPathAccess(src, FsAction.WRITE);
}
final short[] oldReplication = new short[1];
final Block[] blocks = dir.setReplication(src, replication, oldReplication);
isFile = blocks != null;
if (isFile) {
blockManager.setReplication(oldReplication[0], replication, src, blocks);
}
} finally {
writeUnlock();
}
getEditLog().logSync();
if (status && auditLog.isInfoEnabled() && isExternalInvocation()) {
if (isFile && auditLog.isInfoEnabled() && isExternalInvocation()) {
logAuditEvent(UserGroupInformation.getCurrentUser(),
Server.getRemoteIp(),
"setReplication", src, null, null);
}
return status;
}
private boolean setReplicationInternal(String src,
short replication) throws AccessControlException, QuotaExceededException,
SafeModeException, UnresolvedLinkException, IOException {
assert hasWriteLock();
blockManager.verifyReplication(src, replication, null);
if (isPermissionEnabled) {
checkPathAccess(src, FsAction.WRITE);
}
int[] oldReplication = new int[1];
Block[] fileBlocks;
fileBlocks = dir.setReplication(src, replication, oldReplication);
if (fileBlocks == null) // file not found or is a directory
return false;
int oldRepl = oldReplication[0];
if (oldRepl == replication) // the same replication
return true;
// update needReplication priority queues
for(int idx = 0; idx < fileBlocks.length; idx++)
blockManager.updateNeededReplications(fileBlocks[idx], 0, replication-oldRepl);
if (oldRepl > replication) {
// old replication > the new one; need to remove copies
LOG.info("Reducing replication for file " + src
+ ". New replication is " + replication);
for(int idx = 0; idx < fileBlocks.length; idx++)
blockManager.processOverReplicatedBlock(fileBlocks[idx], replication, null, null);
} else { // replication factor is increased
LOG.info("Increasing replication for file " + src
+ ". New replication is " + replication);
}
return true;
return isFile;
}
long getPreferredBlockSize(String filename)
@ -1287,9 +1221,8 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
LocatedBlock lb =
blockManager.convertLastBlockToUnderConstruction(cons);
if (lb != null && blockManager.isBlockTokenEnabled()) {
lb.setBlockToken(blockManager.getBlockTokenSecretManager().generateToken(lb.getBlock(),
EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE)));
if (lb != null) {
blockManager.setBlockToken(lb, AccessMode.WRITE);
}
return lb;
} else {
@ -1456,7 +1389,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
try {
lb = startFileInternal(src, null, holder, clientMachine,
EnumSet.of(CreateFlag.APPEND),
false, (short)blockManager.maxReplication, (long)0);
false, blockManager.maxReplication, (long)0);
} finally {
writeUnlock();
}
@ -1577,10 +1510,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
// Create next block
LocatedBlock b = new LocatedBlock(getExtendedBlock(newBlock), targets, fileLength);
if (blockManager.isBlockTokenEnabled()) {
b.setBlockToken(blockManager.getBlockTokenSecretManager().generateToken(b.getBlock(),
EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE)));
}
blockManager.setBlockToken(b, BlockTokenSecretManager.AccessMode.WRITE);
return b;
}
@ -1626,17 +1556,14 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
).chooseTarget(src, numAdditionalNodes, clientnode, chosen, true,
excludes, preferredblocksize);
final LocatedBlock lb = new LocatedBlock(blk, targets);
if (blockManager.isBlockTokenEnabled()) {
lb.setBlockToken(blockManager.getBlockTokenSecretManager().generateToken(lb.getBlock(),
EnumSet.of(BlockTokenSecretManager.AccessMode.COPY)));
}
blockManager.setBlockToken(lb, AccessMode.COPY);
return lb;
}
/**
* The client would like to let go of the given block
*/
public boolean abandonBlock(ExtendedBlock b, String src, String holder)
boolean abandonBlock(ExtendedBlock b, String src, String holder)
throws LeaseExpiredException, FileNotFoundException,
UnresolvedLinkException, IOException {
writeLock();
@ -1821,23 +1748,6 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
}
}
/**
* Mark the block belonging to datanode as corrupt
* @param blk Block to be marked as corrupt
* @param dn Datanode which holds the corrupt replica
*/
public void markBlockAsCorrupt(ExtendedBlock blk, DatanodeInfo dn)
throws IOException {
writeLock();
try {
blockManager.findAndMarkBlockAsCorrupt(blk.getLocalBlock(), dn);
} finally {
writeUnlock();
}
}
////////////////////////////////////////////////////////////////
// Here's how to handle block-copy failure during client write:
// -- As usual, the client's write should result in a streaming
@ -2621,16 +2531,6 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
}
}
public void addKeyUpdateCommand(final List<DatanodeCommand> cmds,
final DatanodeDescriptor nodeinfo) {
// check access key update
if (blockManager.isBlockTokenEnabled() && nodeinfo.needKeyUpdate) {
cmds.add(new KeyUpdateCommand(blockManager.getBlockTokenSecretManager().exportKeys()));
nodeinfo.needKeyUpdate = false;
}
}
/**
* Returns whether or not there were available resources at the last check of
* resources.
@ -2688,179 +2588,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
FSEditLog getEditLog() {
return getFSImage().getEditLog();
}
/**
* The given node is reporting all its blocks. Use this info to
* update the (machine-->blocklist) and (block-->machinelist) tables.
*/
void processReport(DatanodeID nodeID, String poolId,
BlockListAsLongs newReport) throws IOException {
long startTime, endTime;
writeLock();
startTime = now(); //after acquiring write lock
try {
final DatanodeDescriptor node = blockManager.getDatanodeManager(
).getDatanode(nodeID);
if (node == null || !node.isAlive) {
throw new IOException("ProcessReport from dead or unregistered node: "
+ nodeID.getName());
}
// To minimize startup time, we discard any second (or later) block reports
// that we receive while still in startup phase.
if (isInStartupSafeMode() && node.numBlocks() > 0) {
NameNode.stateChangeLog.info("BLOCK* NameSystem.processReport: "
+ "discarded non-initial block report from " + nodeID.getName()
+ " because namenode still in startup phase");
return;
}
blockManager.processReport(node, newReport);
} finally {
endTime = now();
writeUnlock();
}
// Log the block report processing stats from Namenode perspective
NameNode.getNameNodeMetrics().addBlockReport((int) (endTime - startTime));
NameNode.stateChangeLog.info("BLOCK* NameSystem.processReport: from "
+ nodeID.getName() + ", blocks: " + newReport.getNumberOfBlocks()
+ ", processing time: " + (endTime - startTime) + " msecs");
}
/**
* We want "replication" replicates for the block, but we now have too many.
* In this method, copy enough nodes from 'srcNodes' into 'dstNodes' such that:
*
* srcNodes.size() - dstNodes.size() == replication
*
* We pick node that make sure that replicas are spread across racks and
* also try hard to pick one with least free space.
* The algorithm is first to pick a node with least free space from nodes
* that are on a rack holding more than one replicas of the block.
* So removing such a replica won't remove a rack.
* If no such a node is available,
* then pick a node with least free space
*/
public void chooseExcessReplicates(Collection<DatanodeDescriptor> nonExcess,
Block b, short replication,
DatanodeDescriptor addedNode,
DatanodeDescriptor delNodeHint,
BlockPlacementPolicy replicator) {
assert hasWriteLock();
// first form a rack to datanodes map and
INodeFile inode = blockManager.getINode(b);
HashMap<String, ArrayList<DatanodeDescriptor>> rackMap =
new HashMap<String, ArrayList<DatanodeDescriptor>>();
for (Iterator<DatanodeDescriptor> iter = nonExcess.iterator();
iter.hasNext();) {
DatanodeDescriptor node = iter.next();
String rackName = node.getNetworkLocation();
ArrayList<DatanodeDescriptor> datanodeList = rackMap.get(rackName);
if(datanodeList==null) {
datanodeList = new ArrayList<DatanodeDescriptor>();
}
datanodeList.add(node);
rackMap.put(rackName, datanodeList);
}
// split nodes into two sets
// priSet contains nodes on rack with more than one replica
// remains contains the remaining nodes
ArrayList<DatanodeDescriptor> priSet = new ArrayList<DatanodeDescriptor>();
ArrayList<DatanodeDescriptor> remains = new ArrayList<DatanodeDescriptor>();
for( Iterator<Entry<String, ArrayList<DatanodeDescriptor>>> iter =
rackMap.entrySet().iterator(); iter.hasNext(); ) {
Entry<String, ArrayList<DatanodeDescriptor>> rackEntry = iter.next();
ArrayList<DatanodeDescriptor> datanodeList = rackEntry.getValue();
if( datanodeList.size() == 1 ) {
remains.add(datanodeList.get(0));
} else {
priSet.addAll(datanodeList);
}
}
// pick one node to delete that favors the delete hint
// otherwise pick one with least space from priSet if it is not empty
// otherwise one node with least space from remains
boolean firstOne = true;
while (nonExcess.size() - replication > 0) {
DatanodeInfo cur = null;
// check if we can del delNodeHint
if (firstOne && delNodeHint !=null && nonExcess.contains(delNodeHint) &&
(priSet.contains(delNodeHint) || (addedNode != null && !priSet.contains(addedNode))) ) {
cur = delNodeHint;
} else { // regular excessive replica removal
cur = replicator.chooseReplicaToDelete(inode, b, replication, priSet, remains);
}
firstOne = false;
// adjust rackmap, priSet, and remains
String rack = cur.getNetworkLocation();
ArrayList<DatanodeDescriptor> datanodes = rackMap.get(rack);
datanodes.remove(cur);
if(datanodes.isEmpty()) {
rackMap.remove(rack);
}
if( priSet.remove(cur) ) {
if (datanodes.size() == 1) {
priSet.remove(datanodes.get(0));
remains.add(datanodes.get(0));
}
} else {
remains.remove(cur);
}
nonExcess.remove(cur);
blockManager.addToExcessReplicate(cur, b);
//
// The 'excessblocks' tracks blocks until we get confirmation
// that the datanode has deleted them; the only way we remove them
// is when we get a "removeBlock" message.
//
// The 'invalidate' list is used to inform the datanode that the block
// should be deleted. Items are removed from the invalidate list
// upon giving instructions to the datanodes.
//
blockManager.addToInvalidates(b, cur);
NameNode.stateChangeLog.info("BLOCK* NameSystem.chooseExcessReplicates: "
+"("+cur.getName()+", "+b+") is added to recentInvalidateSets");
}
}
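A minimal, self-contained sketch of the rack-partitioning step performed by chooseExcessReplicates above, using plain strings in place of DatanodeDescriptor; the class and node names here are illustrative only and are not part of the HDFS API:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ExcessReplicaPartitionSketch {
  public static void main(String[] args) {
    // replica -> rack: three replicas of one block spread over two racks
    Map<String, String> rackOf = new HashMap<String, String>();
    rackOf.put("dn1", "/rack1");
    rackOf.put("dn2", "/rack1");
    rackOf.put("dn3", "/rack2");

    // first form a rack-to-datanodes map, as the method above does
    Map<String, List<String>> rackMap = new HashMap<String, List<String>>();
    for (Map.Entry<String, String> e : rackOf.entrySet()) {
      List<String> nodes = rackMap.get(e.getValue());
      if (nodes == null) {
        nodes = new ArrayList<String>();
        rackMap.put(e.getValue(), nodes);
      }
      nodes.add(e.getKey());
    }

    // priSet: replicas on racks holding more than one replica, so deleting
    // one of them cannot reduce the number of racks; remains: the rest
    List<String> priSet = new ArrayList<String>();
    List<String> remains = new ArrayList<String>();
    for (List<String> nodes : rackMap.values()) {
      if (nodes.size() > 1) {
        priSet.addAll(nodes);
      } else {
        remains.addAll(nodes);
      }
    }
    // Expected: priSet holds dn1 and dn2, remains holds dn3 (ordering may vary)
    System.out.println("priSet=" + priSet + " remains=" + remains);
  }
}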
/**
* The given node is reporting that it received a certain block.
*/
public void blockReceived(DatanodeID nodeID,
String poolId,
Block block,
String delHint
) throws IOException {
writeLock();
try {
final DatanodeDescriptor node = blockManager.getDatanodeManager(
).getDatanode(nodeID);
if (node == null || !node.isAlive) {
NameNode.stateChangeLog.warn("BLOCK* NameSystem.blockReceived: " + block
+ " is received from dead or unregistered node " + nodeID.getName());
throw new IOException("Got blockReceived message from unregistered or dead node "
+ nodeID.getName() + " for block " + block);
}
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("BLOCK* NameSystem.blockReceived: "
+block+" is received from " + nodeID.getName());
}
blockManager.addBlock(node, block, delHint);
} finally {
writeUnlock();
}
}
}
private void checkBlock(ExtendedBlock block) throws IOException {
if (block != null && !this.blockPoolId.equals(block.getBlockPoolId())) {
@ -3009,43 +2737,10 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
}
}
/** Fetch the lists of live and dead datanodes. */
public void DFSNodesStatus(ArrayList<DatanodeDescriptor> live,
ArrayList<DatanodeDescriptor> dead) {
readLock();
try {
getBlockManager().getDatanodeManager().fetchDatanodes(live, dead);
} finally {
readUnlock();
}
}
public Date getStartTime() {
return new Date(systemStart);
}
short getMaxReplication() { return (short)blockManager.maxReplication; }
short getMinReplication() { return (short)blockManager.minReplication; }
short getDefaultReplication() { return (short)blockManager.defaultReplication; }
/**
* Clamp the specified replication between the minimum and maximum
* replication levels for this namesystem.
*/
short adjustReplication(short replication) {
short minReplication = getMinReplication();
if (replication < minReplication) {
replication = minReplication;
}
short maxReplication = getMaxReplication();
if (replication > maxReplication) {
replication = maxReplication;
}
return replication;
}
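The clamping above can also be read as a single min/max expression; a tiny standalone illustration follows, where the bounds 1 and 512 are example values only and the real limits come from the replication settings in the configuration:

public class AdjustReplicationSketch {
  // equivalent to the two if-statements in adjustReplication()
  static short adjust(short replication, short min, short max) {
    return (short) Math.min(max, Math.max(min, replication));
  }

  public static void main(String[] args) {
    short min = 1, max = 512;                           // example bounds only
    System.out.println(adjust((short) 0, min, max));    // 1   (raised to min)
    System.out.println(adjust((short) 3, min, max));    // 3   (unchanged)
    System.out.println(adjust((short) 999, min, max));  // 512 (capped at max)
  }
}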
/**
* Rereads the config to get hosts and exclude list file names.
* Rereads the files to update the hosts and exclude lists. It
@ -3740,10 +3435,6 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
writeUnlock();
}
}
public RemoteEditLogManifest getEditLogManifest(long sinceTxId) throws IOException {
return getEditLog().getEditLogManifest(sinceTxId);
}
NamenodeCommand startCheckpoint(
NamenodeRegistration bnReg, // backup node
@ -3968,7 +3659,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
/**
* shutdown FSNamesystem
*/
public void shutdown() {
void shutdown() {
if (mbeanName != null)
MBeans.unregister(mbeanName);
}
@ -4069,10 +3760,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
// get a new generation stamp and an access token
block.setGenerationStamp(nextGenerationStamp());
locatedBlock = new LocatedBlock(block, new DatanodeInfo[0]);
if (blockManager.isBlockTokenEnabled()) {
locatedBlock.setBlockToken(blockManager.getBlockTokenSecretManager().generateToken(
block, EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE)));
}
blockManager.setBlockToken(locatedBlock, AccessMode.WRITE);
} finally {
writeUnlock();
}
@ -4273,26 +3961,6 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
return blockManager.numCorruptReplicas(blk);
}
/**
* Return a range of corrupt replica block ids. Up to numExpectedBlocks
* blocks starting at the next block after startingBlockId are returned
* (fewer if numExpectedBlocks blocks are unavailable). If startingBlockId
* is null, up to numExpectedBlocks blocks are returned from the beginning.
* If startingBlockId cannot be found, null is returned.
*
* @param numExpectedBlocks Number of block ids to return.
* 0 <= numExpectedBlocks <= 100
* @param startingBlockId Block id from which to start. If null, start at
* beginning.
* @return Up to numExpectedBlocks blocks from startingBlockId if it exists
*
*/
long[] getCorruptReplicaBlockIds(int numExpectedBlocks,
Long startingBlockId) {
return blockManager.getCorruptReplicaBlockIds(numExpectedBlocks,
startingBlockId);
}
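A hedged sketch of the paging pattern implied by the contract above: pass null to start from the beginning, then pass the last id returned to fetch the batch after it. CorruptIdSource is a stand-in interface used only for this illustration, not a real HDFS type:

import java.util.Arrays;

public class CorruptBlockIdPagingSketch {
  interface CorruptIdSource {
    long[] getCorruptReplicaBlockIds(int numExpectedBlocks, Long startingBlockId);
  }

  static void dumpAll(CorruptIdSource source) {
    Long cursor = null;                        // null => start at the beginning
    long[] batch = source.getCorruptReplicaBlockIds(100, cursor);
    while (batch != null && batch.length > 0) {
      System.out.println(Arrays.toString(batch));
      cursor = batch[batch.length - 1];        // continue after the last id seen
      batch = source.getCorruptReplicaBlockIds(100, cursor);
    }
  }

  public static void main(String[] args) {
    // toy source returning ids 0..249 in order
    final long[] all = new long[250];
    for (int i = 0; i < all.length; i++) {
      all[i] = i;
    }
    dumpAll(new CorruptIdSource() {
      public long[] getCorruptReplicaBlockIds(int numExpected, Long startingBlockId) {
        int from = (startingBlockId == null) ? 0 : (int) (startingBlockId.longValue() + 1);
        if (from >= all.length) {
          return new long[0];
        }
        return Arrays.copyOfRange(all, from, Math.min(all.length, from + numExpected));
      }
    });
  }
}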
static class CorruptFileBlockInfo {
String path;
Block block;
@ -4354,28 +4022,6 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
}
}
/**
* @return list of datanodes where decommissioning is in progress
*/
public ArrayList<DatanodeDescriptor> getDecommissioningNodes() {
readLock();
try {
ArrayList<DatanodeDescriptor> decommissioningNodes =
new ArrayList<DatanodeDescriptor>();
final List<DatanodeDescriptor> results = getBlockManager(
).getDatanodeManager().getDatanodeListForReport(DatanodeReportType.LIVE);
for (Iterator<DatanodeDescriptor> it = results.iterator(); it.hasNext();) {
DatanodeDescriptor node = it.next();
if (node.isDecommissionInProgress()) {
decommissioningNodes.add(node);
}
}
return decommissioningNodes;
} finally {
readUnlock();
}
}
/**
* Create delegation token secret manager
*/
@ -4406,7 +4052,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
* @return Token<DelegationTokenIdentifier>
* @throws IOException
*/
public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
throws IOException {
Token<DelegationTokenIdentifier> token;
writeLock();
@ -4686,13 +4332,9 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
public String getLiveNodes() {
final Map<String, Map<String,Object>> info =
new HashMap<String, Map<String,Object>>();
final ArrayList<DatanodeDescriptor> liveNodeList =
new ArrayList<DatanodeDescriptor>();
final ArrayList<DatanodeDescriptor> deadNodeList =
new ArrayList<DatanodeDescriptor>();
DFSNodesStatus(liveNodeList, deadNodeList);
removeDecomNodeFromList(liveNodeList);
for (DatanodeDescriptor node : liveNodeList) {
final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
blockManager.getDatanodeManager().fetchDatanodes(live, null, true);
for (DatanodeDescriptor node : live) {
final Map<String, Object> innerinfo = new HashMap<String, Object>();
innerinfo.put("lastContact", getLastContact(node));
innerinfo.put("usedSpace", getDfsUsed(node));
@ -4710,14 +4352,9 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
public String getDeadNodes() {
final Map<String, Map<String, Object>> info =
new HashMap<String, Map<String, Object>>();
final ArrayList<DatanodeDescriptor> liveNodeList =
new ArrayList<DatanodeDescriptor>();
final ArrayList<DatanodeDescriptor> deadNodeList =
new ArrayList<DatanodeDescriptor>();
// we need to call DFSNodesStatus to filter out the dead datanodes
DFSNodesStatus(liveNodeList, deadNodeList);
removeDecomNodeFromList(deadNodeList);
for (DatanodeDescriptor node : deadNodeList) {
final List<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
blockManager.getDatanodeManager().fetchDatanodes(null, dead, true);
for (DatanodeDescriptor node : dead) {
final Map<String, Object> innerinfo = new HashMap<String, Object>();
innerinfo.put("lastContact", getLastContact(node));
innerinfo.put("decommissioned", node.isDecommissioned());
@ -4734,8 +4371,8 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
public String getDecomNodes() {
final Map<String, Map<String, Object>> info =
new HashMap<String, Map<String, Object>>();
final ArrayList<DatanodeDescriptor> decomNodeList =
this.getDecommissioningNodes();
final List<DatanodeDescriptor> decomNodeList = blockManager.getDatanodeManager(
).getDecommissioningNodes();
for (DatanodeDescriptor node : decomNodeList) {
final Map<String, Object> innerinfo = new HashMap<String, Object>();
innerinfo.put("underReplicatedBlocks", node.decommissioningStatus
@ -4771,18 +4408,4 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
public BlockManager getBlockManager() {
return blockManager;
}
void removeDecomNodeFromList(List<DatanodeDescriptor> nodeList) {
getBlockManager().getDatanodeManager().removeDecomNodeFromList(nodeList);
}
/**
* Tell all datanodes to use a new, non-persistent bandwidth value for
* dfs.datanode.balance.bandwidthPerSec.
* @param bandwidth Balancer bandwidth in bytes per second for all datanodes.
* @throws IOException
*/
public void setBalancerBandwidth(long bandwidth) throws IOException {
getBlockManager().getDatanodeManager().setBalancerBandwidth(bandwidth);
}
}

View File

@ -30,6 +30,7 @@ import javax.servlet.http.HttpServletResponse;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.security.UserGroupInformation;
/**
@ -59,13 +60,12 @@ public class FsckServlet extends DfsServlet {
NameNode nn = NameNodeHttpServer.getNameNodeFromContext(context);
final FSNamesystem namesystem = nn.getNamesystem();
final BlockManager bm = namesystem.getBlockManager();
final int totalDatanodes =
namesystem.getNumberOfDatanodes(DatanodeReportType.LIVE);
final short minReplication = namesystem.getMinReplication();
namesystem.getNumberOfDatanodes(DatanodeReportType.LIVE);
new NamenodeFsck(conf, nn,
NamenodeJspHelper.getNetworkTopology(nn), pmap, out,
totalDatanodes, minReplication, remoteAddress).fsck();
bm.getDatanodeManager().getNetworkTopology(), pmap, out,
totalDatanodes, bm.minReplication, remoteAddress).fsck();
return null;
}

View File

@ -855,7 +855,7 @@ public class NameNode implements NamenodeProtocols, FSConstants {
DatanodeInfo[] nodes = blocks[i].getLocations();
for (int j = 0; j < nodes.length; j++) {
DatanodeInfo dn = nodes[j];
namesystem.markBlockAsCorrupt(blk, dn);
namesystem.getBlockManager().findAndMarkBlockAsCorrupt(blk, dn);
}
}
}
@ -1055,7 +1055,7 @@ public class NameNode implements NamenodeProtocols, FSConstants {
@Override
public RemoteEditLogManifest getEditLogManifest(long sinceTxId)
throws IOException {
return namesystem.getEditLogManifest(sinceTxId);
return namesystem.getEditLog().getEditLogManifest(sinceTxId);
}
@Override // ClientProtocol
@ -1096,8 +1096,9 @@ public class NameNode implements NamenodeProtocols, FSConstants {
* @param bandwidth Balancer bandwidth in bytes per second for all datanodes.
* @throws IOException
*/
@Override // ClientProtocol
public void setBalancerBandwidth(long bandwidth) throws IOException {
namesystem.setBalancerBandwidth(bandwidth);
namesystem.getBlockManager().getDatanodeManager().setBalancerBandwidth(bandwidth);
}
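For context, a hedged client-side sketch of driving this RPC. It assumes DistributedFileSystem exposes setBalancerBandwidth(long), which is the path the dfsadmin -setBalancerBandwidth command is expected to use; the namenode URI is a placeholder:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class SetBalancerBandwidthSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // placeholder namenode address for illustration
    FileSystem fs = FileSystem.get(URI.create("hdfs://namenode:8020"), conf);
    try {
      if (fs instanceof DistributedFileSystem) {
        // 10 MB/s for all datanodes; the value is not persisted across restarts
        ((DistributedFileSystem) fs).setBalancerBandwidth(10L * 1024 * 1024);
      }
    } finally {
      fs.close();
    }
  }
}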
@Override // ClientProtocol
@ -1195,7 +1196,7 @@ public class NameNode implements NamenodeProtocols, FSConstants {
+ " blocks");
}
namesystem.processReport(nodeReg, poolId, blist);
namesystem.getBlockManager().processReport(nodeReg, poolId, blist);
if (getFSImage().isUpgradeFinalized())
return new DatanodeCommand.Finalize(poolId);
return null;
@ -1210,7 +1211,8 @@ public class NameNode implements NamenodeProtocols, FSConstants {
+"from "+nodeReg.getName()+" "+blocks.length+" blocks.");
}
for (int i = 0; i < blocks.length; i++) {
namesystem.blockReceived(nodeReg, poolId, blocks[i], delHints[i]);
namesystem.getBlockManager().blockReceived(
nodeReg, poolId, blocks[i], delHints[i]);
}
}

View File

@ -42,13 +42,14 @@ import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.FSConstants.UpgradeAction;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.common.JspHelper;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
@ -229,14 +230,10 @@ class NamenodeJspHelper {
void generateHealthReport(JspWriter out, NameNode nn,
HttpServletRequest request) throws IOException {
FSNamesystem fsn = nn.getNamesystem();
ArrayList<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
ArrayList<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
fsn.DFSNodesStatus(live, dead);
// If a data node has been first included in the include list,
// then decommissioned, then removed from both include and exclude list.
// We make the web console to "forget" this node by not displaying it.
fsn.removeDecomNodeFromList(live);
fsn.removeDecomNodeFromList(dead);
final DatanodeManager dm = fsn.getBlockManager().getDatanodeManager();
final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
final List<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
dm.fetchDatanodes(live, dead, true);
int liveDecommissioned = 0;
for (DatanodeDescriptor d : live) {
@ -248,8 +245,7 @@ class NamenodeJspHelper {
deadDecommissioned += d.isDecommissioned() ? 1 : 0;
}
ArrayList<DatanodeDescriptor> decommissioning = fsn
.getDecommissioningNodes();
final List<DatanodeDescriptor> decommissioning = dm.getDecommissioningNodes();
sorterField = request.getParameter("sorter/field");
sorterOrder = request.getParameter("sorter/order");
@ -370,15 +366,10 @@ class NamenodeJspHelper {
return token == null ? null : token.encodeToUrlString();
}
/** @return the network topology. */
static NetworkTopology getNetworkTopology(final NameNode namenode) {
return namenode.getNamesystem().getBlockManager().getDatanodeManager(
).getNetworkTopology();
}
/** @return a randomly chosen datanode. */
static DatanodeDescriptor getRandomDatanode(final NameNode namenode) {
return (DatanodeDescriptor)getNetworkTopology(namenode).chooseRandom(
return (DatanodeDescriptor)namenode.getNamesystem().getBlockManager(
).getDatanodeManager().getNetworkTopology().chooseRandom(
NodeBase.ROOT);
}
@ -564,12 +555,14 @@ class NamenodeJspHelper {
void generateNodesList(ServletContext context, JspWriter out,
HttpServletRequest request) throws IOException {
ArrayList<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
ArrayList<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
final NameNode nn = NameNodeHttpServer.getNameNodeFromContext(context);
nn.getNamesystem().DFSNodesStatus(live, dead);
nn.getNamesystem().removeDecomNodeFromList(live);
nn.getNamesystem().removeDecomNodeFromList(dead);
final FSNamesystem ns = nn.getNamesystem();
final DatanodeManager dm = ns.getBlockManager().getDatanodeManager();
final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
final List<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
dm.fetchDatanodes(live, dead, true);
InetSocketAddress nnSocketAddress = (InetSocketAddress) context
.getAttribute(NameNodeHttpServer.NAMENODE_ADDRESS_ATTRIBUTE_KEY);
String nnaddr = nnSocketAddress.getAddress().getHostAddress() + ":"
@ -678,8 +671,7 @@ class NamenodeJspHelper {
}
} else if (whatNodes.equals("DECOMMISSIONING")) {
// Decommissioning Nodes
ArrayList<DatanodeDescriptor> decommissioning = nn.getNamesystem()
.getDecommissioningNodes();
final List<DatanodeDescriptor> decommissioning = dm.getDecommissioningNodes();
out.print("<br> <a name=\"DecommissioningNodes\" id=\"title\"> "
+ " Decommissioning Datanodes : " + decommissioning.size()
+ "</a><br><br>\n");
@ -715,16 +707,17 @@ class NamenodeJspHelper {
static class XMLBlockInfo {
final Block block;
final INodeFile inode;
final FSNamesystem fsn;
final BlockManager blockManager;
public XMLBlockInfo(FSNamesystem fsn, Long blockId) {
this.fsn = fsn;
XMLBlockInfo(FSNamesystem fsn, Long blockId) {
this.blockManager = fsn.getBlockManager();
if (blockId == null) {
this.block = null;
this.inode = null;
} else {
this.block = new Block(blockId);
this.inode = fsn.getBlockManager().getINode(block);
this.inode = blockManager.getINode(block);
}
}
@ -798,31 +791,25 @@ class NamenodeJspHelper {
}
doc.startTag("replicas");
if (fsn.getBlockManager().blocksMap.contains(block)) {
Iterator<DatanodeDescriptor> it =
fsn.getBlockManager().blocksMap.nodeIterator(block);
for(final Iterator<DatanodeDescriptor> it = blockManager.datanodeIterator(block);
it.hasNext(); ) {
doc.startTag("replica");
while (it.hasNext()) {
doc.startTag("replica");
DatanodeDescriptor dd = it.next();
DatanodeDescriptor dd = it.next();
doc.startTag("host_name");
doc.pcdata(dd.getHostName());
doc.endTag();
doc.startTag("host_name");
doc.pcdata(dd.getHostName());
doc.endTag();
boolean isCorrupt = fsn.getCorruptReplicaBlockIds(0,
block.getBlockId()) != null;
doc.startTag("is_corrupt");
doc.pcdata(""+isCorrupt);
doc.endTag();
doc.endTag(); // </replica>
}
}
boolean isCorrupt = blockManager.getCorruptReplicaBlockIds(0,
block.getBlockId()) != null;
doc.startTag("is_corrupt");
doc.pcdata(""+isCorrupt);
doc.endTag();
doc.endTag(); // </replica>
}
doc.endTag(); // </replicas>
}
@ -834,14 +821,14 @@ class NamenodeJspHelper {
// utility class used in corrupt_replicas_xml.jsp
static class XMLCorruptBlockInfo {
final FSNamesystem fsn;
final Configuration conf;
final Long startingBlockId;
final int numCorruptBlocks;
final BlockManager blockManager;
public XMLCorruptBlockInfo(FSNamesystem fsn, Configuration conf,
XMLCorruptBlockInfo(FSNamesystem fsn, Configuration conf,
int numCorruptBlocks, Long startingBlockId) {
this.fsn = fsn;
this.blockManager = fsn.getBlockManager();
this.conf = conf;
this.numCorruptBlocks = numCorruptBlocks;
this.startingBlockId = startingBlockId;
@ -864,17 +851,16 @@ class NamenodeJspHelper {
doc.endTag();
doc.startTag("num_missing_blocks");
doc.pcdata(""+fsn.getMissingBlocksCount());
doc.pcdata(""+blockManager.getMissingBlocksCount());
doc.endTag();
doc.startTag("num_corrupt_replica_blocks");
doc.pcdata(""+fsn.getCorruptReplicaBlocks());
doc.pcdata(""+blockManager.getCorruptReplicaBlocksCount());
doc.endTag();
doc.startTag("corrupt_replica_block_ids");
long[] corruptBlockIds
= fsn.getCorruptReplicaBlockIds(numCorruptBlocks,
startingBlockId);
final long[] corruptBlockIds = blockManager.getCorruptReplicaBlockIds(
numCorruptBlocks, startingBlockId);
if (corruptBlockIds != null) {
for (Long blockId: corruptBlockIds) {
doc.startTag("block_id");

View File

@ -61,6 +61,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseP
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.TestTransferRbw;
@ -375,10 +376,9 @@ public class DFSTestUtil {
/*
* Return the total capacity of all live DNs.
*/
public static long getLiveDatanodeCapacity(FSNamesystem ns) {
ArrayList<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
ArrayList<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
ns.DFSNodesStatus(live, dead);
public static long getLiveDatanodeCapacity(DatanodeManager dm) {
final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
dm.fetchDatanodes(live, null, false);
long capacity = 0;
for (final DatanodeDescriptor dn : live) {
capacity += dn.getCapacity();
@ -389,21 +389,20 @@ public class DFSTestUtil {
/*
* Return the capacity of the given live DN.
*/
public static long getDatanodeCapacity(FSNamesystem ns, int index) {
ArrayList<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
ArrayList<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
ns.DFSNodesStatus(live, dead);
public static long getDatanodeCapacity(DatanodeManager dm, int index) {
final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
dm.fetchDatanodes(live, null, false);
return live.get(index).getCapacity();
}
/*
* Wait for the given # live/dead DNs, total capacity, and # vol failures.
*/
public static void waitForDatanodeStatus(FSNamesystem ns, int expectedLive,
public static void waitForDatanodeStatus(DatanodeManager dm, int expectedLive,
int expectedDead, long expectedVolFails, long expectedTotalCapacity,
long timeout) throws InterruptedException, TimeoutException {
ArrayList<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
ArrayList<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
final List<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
final int ATTEMPTS = 10;
int count = 0;
long currTotalCapacity = 0;
@ -413,7 +412,7 @@ public class DFSTestUtil {
Thread.sleep(timeout);
live.clear();
dead.clear();
ns.DFSNodesStatus(live, dead);
dm.fetchDatanodes(live, dead, false);
currTotalCapacity = 0;
volFails = 0;
for (final DatanodeDescriptor dd : live) {

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
/**
* This class tests DatanodeDescriptor.getBlocksScheduled() at the
@ -50,7 +51,9 @@ public class TestBlocksScheduledCounter extends TestCase {
((DFSOutputStream)(out.getWrappedStream())).hflush();
ArrayList<DatanodeDescriptor> dnList = new ArrayList<DatanodeDescriptor>();
cluster.getNamesystem().DFSNodesStatus(dnList, dnList);
final DatanodeManager dm = cluster.getNamesystem().getBlockManager(
).getDatanodeManager();
dm.fetchDatanodes(dnList, dnList, false);
DatanodeDescriptor dn = dnList.get(0);
assertEquals(1, dn.getBlocksScheduled());

View File

@ -146,8 +146,8 @@ public class TestFileCorruption extends TestCase {
// report corrupted block by the third datanode
DatanodeRegistration dnR =
DataNodeTestUtils.getDNRegistrationForBP(dataNode, blk.getBlockPoolId());
cluster.getNamesystem().markBlockAsCorrupt(blk,
new DatanodeInfo(dnR));
cluster.getNamesystem().getBlockManager().findAndMarkBlockAsCorrupt(
blk, new DatanodeInfo(dnR));
// open the file
fs.open(FILE_PATH);

View File

@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.log4j.Level;
import org.junit.Test;
@ -133,7 +134,7 @@ public class TestBlocksWithNotEnoughRacks {
DFSTestUtil.waitForReplication(cluster, b, 1, REPLICATION_FACTOR, 0);
REPLICATION_FACTOR = 2;
ns.setReplication("/testFile", REPLICATION_FACTOR);
NameNodeAdapter.setReplication(ns, "/testFile", REPLICATION_FACTOR);
DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
} finally {
cluster.shutdown();
@ -172,7 +173,7 @@ public class TestBlocksWithNotEnoughRacks {
String newRacks[] = {"/rack2", "/rack2"};
cluster.startDataNodes(conf, 2, true, null, newRacks);
REPLICATION_FACTOR = 5;
ns.setReplication("/testFile", REPLICATION_FACTOR);
NameNodeAdapter.setReplication(ns, "/testFile", REPLICATION_FACTOR);
DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
} finally {
@ -258,7 +259,7 @@ public class TestBlocksWithNotEnoughRacks {
// was not the one that lived on the rack with only one replica,
// ie we should still have 2 racks after reducing the repl factor.
REPLICATION_FACTOR = 2;
ns.setReplication("/testFile", REPLICATION_FACTOR);
NameNodeAdapter.setReplication(ns, "/testFile", REPLICATION_FACTOR);
DFSTestUtil.waitForReplication(cluster, b, 2, REPLICATION_FACTOR, 0);
} finally {

View File

@ -33,10 +33,9 @@ import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
import org.apache.hadoop.hdfs.TestDatanodeBlockScanner;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.HeartbeatManager;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
public class TestOverReplicatedBlocks extends TestCase {
/** Test processOverReplicatedBlock can handle corrupt replicas fine.
@ -100,7 +99,7 @@ public class TestOverReplicatedBlocks extends TestCase {
}
// decrease the replication factor to 1;
namesystem.setReplication(fileName.toString(), (short)1);
NameNodeAdapter.setReplication(namesystem, fileName.toString(), (short)1);
// corrupt one won't be chosen to be excess one
// without 4910 the number of live replicas would be 0: block gets lost

View File

@ -37,7 +37,7 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Before;
@ -113,10 +113,11 @@ public class TestDataNodeVolumeFailureReporting {
* heartbeat their capacities.
*/
Thread.sleep(WAIT_FOR_HEARTBEATS);
FSNamesystem ns = cluster.getNamesystem();
final DatanodeManager dm = cluster.getNamesystem().getBlockManager(
).getDatanodeManager();
long origCapacity = DFSTestUtil.getLiveDatanodeCapacity(ns);
long dnCapacity = DFSTestUtil.getDatanodeCapacity(ns, 0);
final long origCapacity = DFSTestUtil.getLiveDatanodeCapacity(dm);
long dnCapacity = DFSTestUtil.getDatanodeCapacity(dm, 0);
File dn1Vol1 = new File(dataDir, "data"+(2*0+1));
File dn2Vol1 = new File(dataDir, "data"+(2*1+1));
@ -160,7 +161,7 @@ public class TestDataNodeVolumeFailureReporting {
assert (WAIT_FOR_HEARTBEATS * 10) > WAIT_FOR_DEATH;
// Eventually the NN should report two volume failures
DFSTestUtil.waitForDatanodeStatus(ns, 3, 0, 2,
DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 2,
origCapacity - (1*dnCapacity), WAIT_FOR_HEARTBEATS);
/*
@ -177,10 +178,10 @@ public class TestDataNodeVolumeFailureReporting {
ArrayList<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
ArrayList<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
ns.DFSNodesStatus(live, dead);
dm.fetchDatanodes(live, dead, false);
live.clear();
dead.clear();
ns.DFSNodesStatus(live, dead);
dm.fetchDatanodes(live, dead, false);
assertEquals("DN3 should have 1 failed volume",
1, live.get(2).getVolumeFailures());
@ -189,8 +190,8 @@ public class TestDataNodeVolumeFailureReporting {
* total capacity should be down by three volumes (assuming the host
* did not grow or shrink the data volume while the test was running).
*/
dnCapacity = DFSTestUtil.getDatanodeCapacity(ns, 0);
DFSTestUtil.waitForDatanodeStatus(ns, 3, 0, 3,
dnCapacity = DFSTestUtil.getDatanodeCapacity(dm, 0);
DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 3,
origCapacity - (3*dnCapacity), WAIT_FOR_HEARTBEATS);
/*
@ -212,7 +213,7 @@ public class TestDataNodeVolumeFailureReporting {
getMetrics(dns.get(2).getMetrics().name()));
// The NN considers the DN dead
DFSTestUtil.waitForDatanodeStatus(ns, 2, 1, 2,
DFSTestUtil.waitForDatanodeStatus(dm, 2, 1, 2,
origCapacity - (4*dnCapacity), WAIT_FOR_HEARTBEATS);
/*
@ -236,7 +237,7 @@ public class TestDataNodeVolumeFailureReporting {
* and that the volume failure count should be reported as zero by
* both the metrics and the NN.
*/
DFSTestUtil.waitForDatanodeStatus(ns, 3, 0, 0, origCapacity,
DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 0, origCapacity,
WAIT_FOR_HEARTBEATS);
}
@ -251,9 +252,10 @@ public class TestDataNodeVolumeFailureReporting {
cluster.startDataNodes(conf, 2, true, null, null);
cluster.waitActive();
FSNamesystem ns = cluster.getNamesystem();
long origCapacity = DFSTestUtil.getLiveDatanodeCapacity(ns);
long dnCapacity = DFSTestUtil.getDatanodeCapacity(ns, 0);
final DatanodeManager dm = cluster.getNamesystem().getBlockManager(
).getDatanodeManager();
long origCapacity = DFSTestUtil.getLiveDatanodeCapacity(dm);
long dnCapacity = DFSTestUtil.getDatanodeCapacity(dm, 0);
// Fail the first volume on both datanodes (we have to keep the
// third healthy so one node in the pipeline will not fail).
@ -267,13 +269,13 @@ public class TestDataNodeVolumeFailureReporting {
DFSTestUtil.waitReplication(fs, file1, (short)2);
// The NN reports two volumes failures
DFSTestUtil.waitForDatanodeStatus(ns, 3, 0, 2,
DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 2,
origCapacity - (1*dnCapacity), WAIT_FOR_HEARTBEATS);
// After restarting the NN it still see the two failures
cluster.restartNameNode(0);
cluster.waitActive();
DFSTestUtil.waitForDatanodeStatus(ns, 3, 0, 2,
DFSTestUtil.waitForDatanodeStatus(dm, 3, 0, 2,
origCapacity - (1*dnCapacity), WAIT_FOR_HEARTBEATS);
}
}

View File

@ -17,29 +17,30 @@
*/
package org.apache.hadoop.hdfs.server.datanode;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeTrue;
import java.io.File;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.*;
import static org.junit.Assume.assumeTrue;
/**
* Test the ability of a DN to tolerate volume failures.
@ -154,9 +155,10 @@ public class TestDataNodeVolumeFailureToleration {
conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 0);
cluster.startDataNodes(conf, 2, true, null, null);
cluster.waitActive();
FSNamesystem ns = cluster.getNamesystem();
long origCapacity = DFSTestUtil.getLiveDatanodeCapacity(ns);
long dnCapacity = DFSTestUtil.getDatanodeCapacity(ns, 0);
final DatanodeManager dm = cluster.getNamesystem().getBlockManager(
).getDatanodeManager();
long origCapacity = DFSTestUtil.getLiveDatanodeCapacity(dm);
long dnCapacity = DFSTestUtil.getDatanodeCapacity(dm, 0);
// Fail a volume on the 2nd DN
File dn2Vol1 = new File(dataDir, "data"+(2*1+1));
@ -168,7 +170,7 @@ public class TestDataNodeVolumeFailureToleration {
DFSTestUtil.waitReplication(fs, file1, (short)2);
// Check that this single failure caused a DN to die.
DFSTestUtil.waitForDatanodeStatus(ns, 2, 1, 0,
DFSTestUtil.waitForDatanodeStatus(dm, 2, 1, 0,
origCapacity - (1*dnCapacity), WAIT_FOR_HEARTBEATS);
// If we restore the volume we should still only be able to get

View File

@ -19,7 +19,6 @@ package org.apache.hadoop.hdfs.server.namenode;
import java.io.IOException;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
@ -52,6 +51,11 @@ public class NameNodeAdapter {
public static Server getRpcServer(NameNode namenode) {
return namenode.server;
}
public static boolean setReplication(final FSNamesystem ns,
final String src, final short replication) throws IOException {
return ns.setReplication(src, replication);
}
public static String getLeaseHolderForPath(NameNode namenode, String path) {
return namenode.getNamesystem().leaseManager.getLeaseByPath(path).getHolder();

View File

@ -24,6 +24,7 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
@ -37,6 +38,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@ -199,13 +201,13 @@ public class TestDecommissioningStatus {
Thread.sleep(5000);
FSNamesystem fsn = cluster.getNamesystem();
final DatanodeManager dm = fsn.getBlockManager().getDatanodeManager();
for (int iteration = 0; iteration < numDatanodes; iteration++) {
String downnode = decommissionNode(fsn, conf, client, localFileSys,
iteration);
decommissionedNodes.add(downnode);
Thread.sleep(5000);
ArrayList<DatanodeDescriptor> decommissioningNodes = fsn
.getDecommissioningNodes();
final List<DatanodeDescriptor> decommissioningNodes = dm.getDecommissioningNodes();
if (iteration == 0) {
assertEquals(decommissioningNodes.size(), 1);
DatanodeDescriptor decommNode = decommissioningNodes.get(0);

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import junit.framework.TestCase;
@ -32,6 +33,7 @@ import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
@ -59,11 +61,13 @@ public class TestNamenodeCapacityReport extends TestCase {
cluster.waitActive();
final FSNamesystem namesystem = cluster.getNamesystem();
final DatanodeManager dm = cluster.getNamesystem().getBlockManager(
).getDatanodeManager();
// Ensure the data reported for each data node is right
ArrayList<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
ArrayList<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
namesystem.DFSNodesStatus(live, dead);
final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
final List<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
dm.fetchDatanodes(live, dead, false);
assertTrue(live.size() == 1);

View File

@ -75,6 +75,7 @@ public class TestNameNodeMetrics extends TestCase {
private DistributedFileSystem fs;
private Random rand = new Random();
private FSNamesystem namesystem;
private BlockManager bm;
private static Path getTestPath(String fileName) {
return new Path(TEST_ROOT_DIR_PATH, fileName);
@ -85,6 +86,7 @@ public class TestNameNodeMetrics extends TestCase {
cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(DATANODE_COUNT).build();
cluster.waitActive();
namesystem = cluster.getNamesystem();
bm = namesystem.getBlockManager();
fs = (DistributedFileSystem) cluster.getFileSystem();
}
@ -167,7 +169,7 @@ public class TestNameNodeMetrics extends TestCase {
// Corrupt first replica of the block
LocatedBlock block = NameNodeAdapter.getBlockLocations(
cluster.getNameNode(), file.toString(), 0, 1).get(0);
namesystem.markBlockAsCorrupt(block.getBlock(), block.getLocations()[0]);
bm.findAndMarkBlockAsCorrupt(block.getBlock(), block.getLocations()[0]);
updateMetrics();
MetricsRecordBuilder rb = getMetrics(NS_METRICS);
assertGauge("CorruptBlocks", 1L, rb);
@ -188,7 +190,7 @@ public class TestNameNodeMetrics extends TestCase {
Path file = getTestPath("testExcessBlocks");
createFile(file, 100, (short)2);
long totalBlocks = 1;
namesystem.setReplication(file.toString(), (short)1);
NameNodeAdapter.setReplication(namesystem, file.toString(), (short)1);
updateMetrics();
MetricsRecordBuilder rb = getMetrics(NS_METRICS);
assertGauge("ExcessBlocks", totalBlocks, rb);
@ -204,7 +206,7 @@ public class TestNameNodeMetrics extends TestCase {
// Corrupt the only replica of the block to result in a missing block
LocatedBlock block = NameNodeAdapter.getBlockLocations(
cluster.getNameNode(), file.toString(), 0, 1).get(0);
namesystem.markBlockAsCorrupt(block.getBlock(), block.getLocations()[0]);
bm.findAndMarkBlockAsCorrupt(block.getBlock(), block.getLocations()[0]);
updateMetrics();
MetricsRecordBuilder rb = getMetrics(NS_METRICS);
assertGauge("UnderReplicatedBlocks", 1L, rb);

View File

@ -455,7 +455,7 @@ public class TestNNLeaseRecovery {
fsn.leaseManager.addLease("mock-lease", file.toString());
if (setStoredBlock) {
when(b1.getINode()).thenReturn(iNFmock);
fsn.getBlockManager().blocksMap.addINode(b1, iNFmock);
fsn.getBlockManager().addINode(b1, iNFmock);
}
when(fsDir.getFileINode(anyString())).thenReturn(iNFmock);