HDFS-2112. Move ReplicationMonitor to block management. Contributed by Uma Maheswara Rao G

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1149771 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze 2011-07-23 01:01:58 +00:00
parent ad3f694261
commit 89537b7710
17 changed files with 162 additions and 117 deletions

@@ -590,6 +590,9 @@ Trunk (unreleased changes)
HDFS-2116. Use Mokito in TestStreamFile and TestByteRangeInputStream.
(Plamen Jeliazkov via shv)
HDFS-2112. Move ReplicationMonitor to block management. (Uma Maheswara
Rao G via szetszwo)
OPTIMIZATIONS
HDFS-1458. Improve checkpoint performance by avoiding unnecessary image

@@ -42,9 +42,9 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportIterator;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportIterator;
import org.apache.hadoop.hdfs.server.blockmanagement.UnderReplicatedBlocks.BlockIterator;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
@@ -54,6 +54,7 @@ import org.apache.hadoop.hdfs.server.namenode.INodeFile;
import org.apache.hadoop.hdfs.server.namenode.INodeFileUnderConstruction;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.util.Daemon;
/**
* Keeps information related to the blocks stored in the Hadoop cluster.
@@ -101,6 +102,9 @@ public class BlockManager {
return excessBlocksCount;
}
/**replicationRecheckInterval is how often namenode checks for new replication work*/
private final long replicationRecheckInterval;
/**
* Mapping: Block -> { INode, datanodes, self ref }
* Updated only in response to client-sent information.
@@ -108,7 +112,10 @@ public class BlockManager {
public final BlocksMap blocksMap;
private final DatanodeManager datanodeManager;
/** Replication thread. */
final Daemon replicationThread = new Daemon(new ReplicationMonitor());
/** Store blocks -> datanodedescriptor(s) map of corrupt replicas */
final CorruptReplicasMap corruptReplicas = new CorruptReplicasMap();
@@ -162,7 +169,6 @@ public class BlockManager {
public BlockManager(FSNamesystem fsn, Configuration conf) throws IOException {
namesystem = fsn;
datanodeManager = new DatanodeManager(fsn, conf);
blocksMap = new BlocksMap(DEFAULT_MAP_LOAD_FACTOR);
blockplacement = BlockPlacementPolicy.getInstance(
conf, namesystem, datanodeManager.getNetworkTopology());
@@ -198,22 +204,29 @@ public class BlockManager {
DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT);
this.shouldCheckForEnoughRacks = conf.get(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY) == null ? false
: true;
this.replicationRecheckInterval =
conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY,
DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT) * 1000L;
FSNamesystem.LOG.info("defaultReplication = " + defaultReplication); FSNamesystem.LOG.info("defaultReplication = " + defaultReplication);
FSNamesystem.LOG.info("maxReplication = " + maxReplication); FSNamesystem.LOG.info("maxReplication = " + maxReplication);
FSNamesystem.LOG.info("minReplication = " + minReplication); FSNamesystem.LOG.info("minReplication = " + minReplication);
FSNamesystem.LOG.info("maxReplicationStreams = " + maxReplicationStreams); FSNamesystem.LOG.info("maxReplicationStreams = " + maxReplicationStreams);
FSNamesystem.LOG.info("shouldCheckForEnoughRacks = " + shouldCheckForEnoughRacks); FSNamesystem.LOG.info("shouldCheckForEnoughRacks = " + shouldCheckForEnoughRacks);
FSNamesystem.LOG.info("replicationRecheckInterval = " + replicationRecheckInterval);
}
public void activate(Configuration conf) {
pendingReplications.start();
datanodeManager.activate(conf);
this.replicationThread.start();
}
public void close() {
if (pendingReplications != null) pendingReplications.stop();
blocksMap.close();
datanodeManager.close();
if (replicationThread != null) replicationThread.interrupt();
}
/** @return the datanodeManager */
@@ -2248,4 +2261,72 @@ public class BlockManager {
processOverReplicatedBlocksOnReCommission(node);
}
}
/**
* Periodically calls computeReplicationWork().
*/
private class ReplicationMonitor implements Runnable {
static final int INVALIDATE_WORK_PCT_PER_ITERATION = 32;
static final float REPLICATION_WORK_MULTIPLIER_PER_ITERATION = 2;
@Override
public void run() {
while (namesystem.isRunning()) {
try {
computeDatanodeWork();
processPendingReplications();
Thread.sleep(replicationRecheckInterval);
} catch (InterruptedException ie) {
LOG.warn("ReplicationMonitor thread received InterruptedException.", ie);
break;
} catch (IOException ie) {
LOG.warn("ReplicationMonitor thread received exception. " , ie);
} catch (Throwable t) {
LOG.warn("ReplicationMonitor thread received Runtime exception. ", t);
Runtime.getRuntime().exit(-1);
}
}
}
}
/**
* Compute block replication and block invalidation work that can be scheduled
* on data-nodes. The datanode will be informed of this work at the next
* heartbeat.
*
* @return number of blocks scheduled for replication or removal.
* @throws IOException
*/
int computeDatanodeWork() throws IOException {
int workFound = 0;
int blocksToProcess = 0;
int nodesToProcess = 0;
// Blocks should not be replicated or removed if in safe mode.
// It's OK to check safe mode here w/o holding lock, in the worst
// case extra replications will be scheduled, and these will get
// fixed up later.
if (namesystem.isInSafeMode())
return workFound;
synchronized (namesystem.heartbeats) {
blocksToProcess = (int) (namesystem.heartbeats.size() * ReplicationMonitor.REPLICATION_WORK_MULTIPLIER_PER_ITERATION);
nodesToProcess = (int) Math.ceil((double) namesystem.heartbeats.size()
* ReplicationMonitor.INVALIDATE_WORK_PCT_PER_ITERATION / 100);
}
workFound = this.computeReplicationWork(blocksToProcess);
// Update FSNamesystemMetrics counters
namesystem.writeLock();
try {
this.updateState();
this.scheduledReplicationBlocksCount = workFound;
} finally {
namesystem.writeUnlock();
}
workFound += this.computeInvalidateWork(nodesToProcess);
return workFound;
}
}
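Editorial note on the hunk above: BlockManager now owns the replication monitor's full lifecycle, with activate() starting the daemon and close() interrupting it. Below is a minimal, self-contained sketch of that start/sleep/interrupt pattern; the class and method names are illustrative only and are not part of this patch.

// Illustrative sketch only (not HDFS code): a background monitor started by
// activate() and stopped by interrupting it from close().
public class MonitorLifecycleSketch {
  private final long recheckIntervalMs;
  private final Thread monitor = new Thread(new Runnable() {
    @Override
    public void run() {
      while (!Thread.currentThread().isInterrupted()) {
        try {
          computeWork();                    // stands in for computeDatanodeWork()
          Thread.sleep(recheckIntervalMs);  // wait for the next recheck cycle
        } catch (InterruptedException ie) {
          break;                            // close() interrupts to end the loop
        }
      }
    }
  }, "ReplicationMonitorSketch");

  public MonitorLifecycleSketch(long recheckIntervalMs) {
    this.recheckIntervalMs = recheckIntervalMs;
  }

  public void activate() { monitor.start(); }   // mirrors BlockManager.activate()

  public void close() { monitor.interrupt(); }  // mirrors BlockManager.close()

  private void computeWork() { /* placeholder for real scheduling work */ }
}

In the actual patch the run loop additionally catches IOException and Throwable, so an unexpected error exits the NameNode process instead of silently killing the monitor.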

@@ -35,11 +35,11 @@ class DecommissionManager {
static final Log LOG = LogFactory.getLog(DecommissionManager.class);
private final FSNamesystem fsnamesystem;
private final BlockManager blockmanager;
private final BlockManager blockManager;
DecommissionManager(FSNamesystem namesystem) {
this.fsnamesystem = namesystem;
this.blockmanager = fsnamesystem.getBlockManager();
this.blockManager = fsnamesystem.getBlockManager();
}
/** Periodically check decommission status. */
@@ -90,7 +90,7 @@ class DecommissionManager {
if (d.isDecommissionInProgress()) {
try {
blockmanager.checkDecommissionStateInternal(d);
blockManager.checkDecommissionStateInternal(d);
} catch(Exception e) {
LOG.warn("entry=" + entry, e);
}

@@ -154,7 +154,7 @@ public class FSDirectory implements Closeable {
}
private BlockManager getBlockManager() {
return getFSNamesystem().blockManager;
return getFSNamesystem().getBlockManager();
}
void loadFSImage(Collection<URI> dataDirs,

@@ -233,7 +233,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
// Stores the correct file name hierarchy
//
public FSDirectory dir;
BlockManager blockManager;
private BlockManager blockManager;
// Block pool ID used by this namenode
String blockPoolId;
@@ -280,7 +280,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
Daemon hbthread = null; // HeartbeatMonitor thread
public Daemon lmthread = null; // LeaseMonitor thread
Daemon smmthread = null; // SafeModeMonitor thread
public Daemon replthread = null; // Replication thread
Daemon nnrmthread = null; // NamenodeResourceMonitor thread
private volatile boolean hasResourcesAvailable = false;
@@ -292,8 +292,6 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
// heartbeatExpireInterval is how long namenode waits for datanode to report
// heartbeat
private long heartbeatExpireInterval;
//replicationRecheckInterval is how often namenode checks for new replication work
private long replicationRecheckInterval;
//resourceRecheckInterval is how often namenode checks for the disk space availability
private long resourceRecheckInterval;
@@ -387,10 +385,9 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
blockManager.activate(conf);
this.hbthread = new Daemon(new HeartbeatMonitor());
this.lmthread = new Daemon(leaseManager.new Monitor());
this.replthread = new Daemon(new ReplicationMonitor());
hbthread.start();
lmthread.start();
replthread.start();
this.nnrmthread = new Daemon(new NameNodeResourceMonitor());
nnrmthread.start();
@@ -524,9 +521,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 minutes
this.heartbeatExpireInterval = 2 * heartbeatRecheckInterval +
10 * heartbeatInterval;
this.replicationRecheckInterval =
conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY,
DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT) * 1000L;
this.serverDefaults = new FsServerDefaults(
conf.getLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE),
conf.getInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BYTES_PER_CHECKSUM),
@@ -595,7 +590,6 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
try {
if (blockManager != null) blockManager.close();
if (hbthread != null) hbthread.interrupt();
if (replthread != null) replthread.interrupt();
if (smmthread != null) smmthread.interrupt();
if (dtSecretManager != null) dtSecretManager.stopThreads();
if (nnrmthread != null) nnrmthread.interrupt();
@@ -3009,77 +3003,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
}
}
/**
* Periodically calls computeReplicationWork().
*/
class ReplicationMonitor implements Runnable {
static final int INVALIDATE_WORK_PCT_PER_ITERATION = 32;
static final float REPLICATION_WORK_MULTIPLIER_PER_ITERATION = 2;
public void run() {
while (fsRunning) {
try {
computeDatanodeWork();
blockManager.processPendingReplications();
Thread.sleep(replicationRecheckInterval);
} catch (InterruptedException ie) {
LOG.warn("ReplicationMonitor thread received InterruptedException." + ie);
break;
} catch (IOException ie) {
LOG.warn("ReplicationMonitor thread received exception. " + ie);
} catch (Throwable t) {
LOG.warn("ReplicationMonitor thread received Runtime exception. " + t);
Runtime.getRuntime().exit(-1);
}
}
}
}
/////////////////////////////////////////////////////////
//
// These methods are called by the Namenode system, to see
// if there is any work for registered datanodes.
//
/////////////////////////////////////////////////////////
/**
* Compute block replication and block invalidation work
* that can be scheduled on data-nodes.
* The datanode will be informed of this work at the next heartbeat.
*
* @return number of blocks scheduled for replication or removal.
* @throws IOException
*/
public int computeDatanodeWork() throws IOException {
int workFound = 0;
int blocksToProcess = 0;
int nodesToProcess = 0;
// Blocks should not be replicated or removed if in safe mode.
// It's OK to check safe mode here w/o holding lock, in the worst
// case extra replications will be scheduled, and these will get
// fixed up later.
if (isInSafeMode())
return workFound;
synchronized (heartbeats) {
blocksToProcess = (int)(heartbeats.size()
* ReplicationMonitor.REPLICATION_WORK_MULTIPLIER_PER_ITERATION);
nodesToProcess = (int)Math.ceil((double)heartbeats.size()
* ReplicationMonitor.INVALIDATE_WORK_PCT_PER_ITERATION / 100);
}
workFound = blockManager.computeReplicationWork(blocksToProcess);
// Update FSNamesystemMetrics counters
writeLock();
try {
blockManager.updateState();
blockManager.scheduledReplicationBlocksCount = workFound;
} finally {
writeUnlock();
}
workFound += blockManager.computeInvalidateWork(nodesToProcess);
return workFound;
}
public void setNodeReplicationLimit(int limit) {
blockManager.maxReplicationStreams = limit;
}
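Editorial note: the replicationRecheckInterval setup removed from FSNamesystem above now lives in the BlockManager constructor, read in seconds from the configuration and stored in milliseconds. A hedged sketch of that conversion follows; the literal key string and default shown are assumptions, since the diff itself only names the DFSConfigKeys constants.

// Sketch: converting the configured recheck interval (seconds) to the
// millisecond value the monitor sleeps on. Key name and default are assumed,
// not taken from this diff.
import org.apache.hadoop.conf.Configuration;

class RecheckIntervalSketch {
  static long replicationRecheckIntervalMs(Configuration conf) {
    int seconds = conf.getInt("dfs.namenode.replication.interval", 3);
    return seconds * 1000L;
  }
}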

@@ -724,7 +724,7 @@ class NamenodeJspHelper {
this.inode = null;
} else {
this.block = new Block(blockId);
this.inode = fsn.blockManager.getINode(block);
this.inode = fsn.getBlockManager().getINode(block);
}
}
@@ -799,9 +799,9 @@ class NamenodeJspHelper {
doc.startTag("replicas");
if (fsn.blockManager.blocksMap.contains(block)) {
if (fsn.getBlockManager().blocksMap.contains(block)) {
Iterator<DatanodeDescriptor> it =
fsn.blockManager.blocksMap.nodeIterator(block);
fsn.getBlockManager().blocksMap.nodeIterator(block);
while (it.hasNext()) {
doc.startTag("replica");

@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
@@ -24,7 +25,7 @@ import java.util.Set;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.util.Daemon;
public class BlockManagerTestUtil {
/**
@@ -48,12 +49,12 @@ public class BlockManagerTestUtil {
* decommissioning/decommissioned nodes are not counted. corrupt replicas
* are also ignored
*/
private static int getNumberOfRacks(final BlockManager blockmanager,
private static int getNumberOfRacks(final BlockManager blockManager,
final Block b) {
final Set<String> rackSet = new HashSet<String>(0);
final Collection<DatanodeDescriptor> corruptNodes =
blockmanager.corruptReplicas.getNodes(b);
getCorruptReplicas(blockManager).getNodes(b);
for (Iterator<DatanodeDescriptor> it = blockmanager.blocksMap.nodeIterator(b);
for (Iterator<DatanodeDescriptor> it = blockManager.blocksMap.nodeIterator(b);
it.hasNext();) {
DatanodeDescriptor cur = it.next();
if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
@@ -68,4 +69,33 @@ public class BlockManagerTestUtil {
return rackSet.size();
}
/**
* @param blockManager
* @return replication monitor thread instance from block manager.
*/
public static Daemon getReplicationThread(final BlockManager blockManager)
{
return blockManager.replicationThread;
}
/**
* @param blockManager
* @return corruptReplicas from block manager
*/
public static CorruptReplicasMap getCorruptReplicas(final BlockManager blockManager){
return blockManager.corruptReplicas;
}
/**
* @param blockManager
* @return computed block replication and block invalidation work that can be
* scheduled on data-nodes.
* @throws IOException
*/
public static int getComputedDatanodeWork(final BlockManager blockManager) throws IOException
{
return blockManager.computeDatanodeWork();
}
}
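Editorial note: with these accessors, tests and benchmarks reach the replication machinery through BlockManagerTestUtil instead of package-private FSNamesystem fields. A hypothetical usage fragment follows (cluster setup elided; any names not in the diff are illustrative).

// Hypothetical helper: trigger one scheduling pass and stop the replication
// monitor via the test utility rather than FSNamesystem internals.
import java.io.IOException;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.util.Daemon;

class ReplicationTestHelper {
  static int runOneSchedulingPass(BlockManager bm) throws IOException {
    // Same call the ReplicationMonitor makes on each wake-up.
    return BlockManagerTestUtil.getComputedDatanodeWork(bm);
  }

  static void stopReplicationMonitor(BlockManager bm) throws InterruptedException {
    Daemon monitor = BlockManagerTestUtil.getReplicationThread(bm);
    monitor.interrupt();
    monitor.join();  // wait for the monitor thread to exit
  }
}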

@@ -30,6 +30,7 @@ import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
@@ -220,7 +221,8 @@ public class TestBlockReport {
cluster.getNameNode().blockReport(dnR, poolId,
new BlockListAsLongs(blocks, null).getBlockListAsLongs());
cluster.getNamesystem().computeDatanodeWork();
BlockManagerTestUtil.getComputedDatanodeWork(cluster.getNamesystem()
.getBlockManager());
printStats();

@@ -36,6 +36,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.common.HdfsConstants;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
@@ -191,7 +192,7 @@ public class TestDataNodeVolumeFailure {
// now check the number of under-replicated blocks
FSNamesystem fsn = cluster.getNamesystem();
// force update of all the metric counts by calling computeDatanodeWork
fsn.computeDatanodeWork();
BlockManagerTestUtil.getComputedDatanodeWork(fsn.getBlockManager());
// get all the counts
long underRepl = fsn.getUnderReplicatedBlocks();
long pendRepl = fsn.getPendingReplicationBlocks();

@@ -39,6 +39,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
@@ -1111,9 +1112,11 @@ public class NNThroughputBenchmark {
// start data-nodes; create a bunch of files; generate block reports.
blockReportObject.generateInputs(ignore);
// stop replication monitor
namesystem.replthread.interrupt();
BlockManagerTestUtil.getReplicationThread(namesystem.getBlockManager())
.interrupt();
try {
namesystem.replthread.join();
BlockManagerTestUtil.getReplicationThread(namesystem.getBlockManager())
.join();
} catch(InterruptedException ei) {
return;
}
@@ -1156,7 +1159,8 @@ public class NNThroughputBenchmark {
assert daemonId < numThreads : "Wrong daemonId.";
long start = System.currentTimeMillis();
// compute data-node work
int work = nameNode.getNamesystem().computeDatanodeWork();
int work = BlockManagerTestUtil.getComputedDatanodeWork(nameNode
.getNamesystem().getBlockManager());
long end = System.currentTimeMillis();
numPendingBlocks += work;
if(work == 0)

@@ -50,7 +50,7 @@ public class NameNodeAdapter {
* @param namenode to proxy the invocation to
*/
public static void refreshBlockCounts(NameNode namenode) {
namenode.getNamesystem().blockManager.updateState();
namenode.getNamesystem().getBlockManager().updateState();
}
/**

@@ -100,7 +100,7 @@ public class TestBlockUnderConstruction {
assertTrue("Block is not complete: " + curBlock,
curBlock.isComplete());
assertTrue("Block is not in BlocksMap: " + curBlock,
ns.blockManager.getStoredBlock(curBlock) == curBlock);
ns.getBlockManager().getStoredBlock(curBlock) == curBlock);
}
// the penultimate block is either complete or
@@ -115,7 +115,7 @@ public class TestBlockUnderConstruction {
(curBlock.getBlockUCState() ==
BlockUCState.COMMITTED)));
assertTrue("Block is not in BlocksMap: " + curBlock,
ns.blockManager.getStoredBlock(curBlock) == curBlock);
ns.getBlockManager().getStoredBlock(curBlock) == curBlock);
}
// The last block is complete if the file is closed.
@@ -126,7 +126,7 @@ public class TestBlockUnderConstruction {
curBlock.isComplete());
}
assertTrue("Block is not in BlocksMap: " + curBlock,
ns.blockManager.getStoredBlock(curBlock) == curBlock);
ns.getBlockManager().getStoredBlock(curBlock) == curBlock);
}
@Test

@@ -51,23 +51,23 @@ public class TestComputeInvalidateWork extends TestCase {
for(int j=0; j<3*namesystem.blockInvalidateLimit+1; j++) {
Block block = new Block(i*(namesystem.blockInvalidateLimit+1)+j, 0,
GenerationStamp.FIRST_VALID_STAMP);
namesystem.blockManager.addToInvalidates(block, nodes[i]);
namesystem.getBlockManager().addToInvalidates(block, nodes[i]);
}
}
assertEquals(namesystem.blockInvalidateLimit*NUM_OF_DATANODES,
namesystem.blockManager.computeInvalidateWork(NUM_OF_DATANODES+1));
namesystem.getBlockManager().computeInvalidateWork(NUM_OF_DATANODES+1));
assertEquals(namesystem.blockInvalidateLimit*NUM_OF_DATANODES,
namesystem.blockManager.computeInvalidateWork(NUM_OF_DATANODES));
namesystem.getBlockManager().computeInvalidateWork(NUM_OF_DATANODES));
assertEquals(namesystem.blockInvalidateLimit*(NUM_OF_DATANODES-1),
namesystem.blockManager.computeInvalidateWork(NUM_OF_DATANODES-1));
namesystem.getBlockManager().computeInvalidateWork(NUM_OF_DATANODES-1));
int workCount = namesystem.blockManager.computeInvalidateWork(1);
int workCount = namesystem.getBlockManager().computeInvalidateWork(1);
if (workCount == 1) {
assertEquals(namesystem.blockInvalidateLimit+1,
namesystem.blockManager.computeInvalidateWork(2));
namesystem.getBlockManager().computeInvalidateWork(2));
} else {
assertEquals(workCount, namesystem.blockInvalidateLimit);
assertEquals(2, namesystem.blockManager.computeInvalidateWork(2));
assertEquals(2, namesystem.getBlockManager().computeInvalidateWork(2));
}
} finally {
namesystem.writeUnlock();

@@ -78,7 +78,7 @@ public class TestLargeDirectoryDelete {
Assert.assertNotNull("No Namenode in cluster", mc.getNameNode());
FSNamesystem namesystem = mc.getNamesystem();
Assert.assertNotNull("Null Namesystem in cluster", namesystem);
Assert.assertNotNull("Null Namesystem.blockmanager", namesystem.blockManager);
Assert.assertNotNull("Null Namesystem.blockmanager", namesystem.getBlockManager());
return (int) namesystem.getBlocksTotal();
}

@@ -102,12 +102,12 @@ public class TestNodeCount extends TestCase {
}
// find out a non-excess node
Iterator<DatanodeDescriptor> iter = namesystem.blockManager.blocksMap
Iterator<DatanodeDescriptor> iter = namesystem.getBlockManager().blocksMap
.nodeIterator(block.getLocalBlock());
DatanodeDescriptor nonExcessDN = null;
while (iter.hasNext()) {
DatanodeDescriptor dn = iter.next();
Collection<Block> blocks = namesystem.blockManager.excessReplicateMap.get(dn.getStorageID());
Collection<Block> blocks = namesystem.getBlockManager().excessReplicateMap.get(dn.getStorageID());
if (blocks == null || !blocks.contains(block) ) {
nonExcessDN = dn;
break;
@@ -184,7 +184,7 @@ public class TestNodeCount extends TestCase {
namesystem.readLock();
try {
lastBlock = block;
lastNum = namesystem.blockManager.countNodes(block);
lastNum = namesystem.getBlockManager().countNodes(block);
return lastNum;
}
finally {

@@ -100,7 +100,7 @@ public class TestOverReplicatedBlocks extends TestCase {
// corrupt one won't be chosen to be excess one
// without 4910 the number of live replicas would be 0: block gets lost
assertEquals(1, namesystem.blockManager.countNodes(block.getLocalBlock())
assertEquals(1, namesystem.getBlockManager().countNodes(block.getLocalBlock())
.liveReplicas());
}
} finally {

@@ -455,7 +455,7 @@ public class TestNNLeaseRecovery {
fsn.leaseManager.addLease("mock-lease", file.toString());
if (setStoredBlock) {
when(b1.getINode()).thenReturn(iNFmock);
fsn.blockManager.blocksMap.addINode(b1, iNFmock);
fsn.getBlockManager().blocksMap.addINode(b1, iNFmock);
}
when(fsDir.getFileINode(anyString())).thenReturn(iNFmock);