svn merge -c 1190491 from trunk for HDFS-2493.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1190492 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2011-10-28 18:29:48 +00:00
parent e20f1826f3
commit 3c644c319b
9 changed files with 96 additions and 38 deletions

View File

@ -744,6 +744,9 @@ Release 0.23.0 - Unreleased
HDFS-2371. Refactor BlockSender.java for better readability. (suresh) HDFS-2371. Refactor BlockSender.java for better readability. (suresh)
HDFS-2493. Remove reference to FSNamesystem in blockmanagement classes.
(szetszwo)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-1458. Improve checkpoint performance by avoiding unnecessary image HDFS-1458. Improve checkpoint performance by avoiding unnecessary image

View File

@ -53,7 +53,7 @@ import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.common.Util; import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
import org.apache.hadoop.hdfs.server.namenode.INode; import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.hdfs.server.namenode.INodeFile; import org.apache.hadoop.hdfs.server.namenode.INodeFile;
import org.apache.hadoop.hdfs.server.namenode.INodeFileUnderConstruction; import org.apache.hadoop.hdfs.server.namenode.INodeFileUnderConstruction;
@ -70,8 +70,6 @@ import com.google.common.annotations.VisibleForTesting;
/** /**
* Keeps information related to the blocks stored in the Hadoop cluster. * Keeps information related to the blocks stored in the Hadoop cluster.
* This class is a helper class for {@link FSNamesystem} and requires several
* methods to be called with lock held on {@link FSNamesystem}.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class BlockManager { public class BlockManager {
@ -174,15 +172,16 @@ public class BlockManager {
/** for block replicas placement */ /** for block replicas placement */
private BlockPlacementPolicy blockplacement; private BlockPlacementPolicy blockplacement;
public BlockManager(FSNamesystem fsn, Configuration conf) throws IOException { public BlockManager(final Namesystem namesystem, final FSClusterStats stats,
namesystem = fsn; final Configuration conf) throws IOException {
datanodeManager = new DatanodeManager(this, fsn, conf); this.namesystem = namesystem;
datanodeManager = new DatanodeManager(this, namesystem, conf);
heartbeatManager = datanodeManager.getHeartbeatManager(); heartbeatManager = datanodeManager.getHeartbeatManager();
invalidateBlocks = new InvalidateBlocks(datanodeManager); invalidateBlocks = new InvalidateBlocks(datanodeManager);
blocksMap = new BlocksMap(DEFAULT_MAP_LOAD_FACTOR); blocksMap = new BlocksMap(DEFAULT_MAP_LOAD_FACTOR);
blockplacement = BlockPlacementPolicy.getInstance( blockplacement = BlockPlacementPolicy.getInstance(
conf, fsn, datanodeManager.getNetworkTopology()); conf, stats, datanodeManager.getNetworkTopology());
pendingReplications = new PendingReplicationBlocks(conf.getInt( pendingReplications = new PendingReplicationBlocks(conf.getInt(
DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY,
DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_DEFAULT) * 1000L); DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_DEFAULT) * 1000L);
@ -2556,7 +2555,7 @@ public class BlockManager {
workFound = this.computeReplicationWork(blocksToProcess); workFound = this.computeReplicationWork(blocksToProcess);
// Update FSNamesystemMetrics counters // Update counters
namesystem.writeLock(); namesystem.writeLock();
try { try {
this.updateState(); this.updateState();

View File

@ -50,8 +50,8 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException; import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
import org.apache.hadoop.hdfs.server.common.Util; import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand; import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand; import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
@ -78,7 +78,7 @@ import org.apache.hadoop.util.ReflectionUtils;
public class DatanodeManager { public class DatanodeManager {
static final Log LOG = LogFactory.getLog(DatanodeManager.class); static final Log LOG = LogFactory.getLog(DatanodeManager.class);
private final FSNamesystem namesystem; private final Namesystem namesystem;
private final BlockManager blockManager; private final BlockManager blockManager;
private final HeartbeatManager heartbeatManager; private final HeartbeatManager heartbeatManager;
@ -124,12 +124,12 @@ public class DatanodeManager {
final int blockInvalidateLimit; final int blockInvalidateLimit;
DatanodeManager(final BlockManager blockManager, DatanodeManager(final BlockManager blockManager,
final FSNamesystem namesystem, final Configuration conf final Namesystem namesystem, final Configuration conf
) throws IOException { ) throws IOException {
this.namesystem = namesystem; this.namesystem = namesystem;
this.blockManager = blockManager; this.blockManager = blockManager;
this.heartbeatManager = new HeartbeatManager(namesystem, conf); this.heartbeatManager = new HeartbeatManager(namesystem, blockManager, conf);
this.hostsReader = new HostsFileReader( this.hostsReader = new HostsFileReader(
conf.get(DFSConfigKeys.DFS_HOSTS, ""), conf.get(DFSConfigKeys.DFS_HOSTS, ""),
@ -163,7 +163,8 @@ public class DatanodeManager {
private Daemon decommissionthread = null; private Daemon decommissionthread = null;
void activate(final Configuration conf) { void activate(final Configuration conf) {
this.decommissionthread = new Daemon(new DecommissionManager(namesystem).new Monitor( final DecommissionManager dm = new DecommissionManager(namesystem, blockManager);
this.decommissionthread = new Daemon(dm.new Monitor(
conf.getInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, conf.getInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY,
DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_DEFAULT), DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_DEFAULT),
conf.getInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_NODES_PER_INTERVAL_KEY, conf.getInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_NODES_PER_INTERVAL_KEY,

View File

@ -56,4 +56,7 @@ public interface DatanodeStatistics {
* The block related entries are set to -1. * The block related entries are set to -1.
*/ */
public long[] getStats(); public long[] getStats();
/** @return the expired heartbeats */
public int getExpiredHeartbeats();
} }

View File

@ -23,7 +23,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.Namesystem;
/** /**
* Manage node decommissioning. * Manage node decommissioning.
@ -33,10 +33,13 @@ import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
class DecommissionManager { class DecommissionManager {
static final Log LOG = LogFactory.getLog(DecommissionManager.class); static final Log LOG = LogFactory.getLog(DecommissionManager.class);
private final FSNamesystem fsnamesystem; private final Namesystem namesystem;
private final BlockManager blockmanager;
DecommissionManager(final FSNamesystem namesystem) { DecommissionManager(final Namesystem namesystem,
this.fsnamesystem = namesystem; final BlockManager blockmanager) {
this.namesystem = namesystem;
this.blockmanager = blockmanager;
} }
/** Periodically check decommission status. */ /** Periodically check decommission status. */
@ -61,12 +64,12 @@ class DecommissionManager {
*/ */
@Override @Override
public void run() { public void run() {
for(; fsnamesystem.isRunning(); ) { for(; namesystem.isRunning(); ) {
fsnamesystem.writeLock(); namesystem.writeLock();
try { try {
check(); check();
} finally { } finally {
fsnamesystem.writeUnlock(); namesystem.writeUnlock();
} }
try { try {
@ -78,7 +81,7 @@ class DecommissionManager {
} }
private void check() { private void check() {
final DatanodeManager dm = fsnamesystem.getBlockManager().getDatanodeManager(); final DatanodeManager dm = blockmanager.getDatanodeManager();
int count = 0; int count = 0;
for(Map.Entry<String, DatanodeDescriptor> entry for(Map.Entry<String, DatanodeDescriptor> entry
: dm.getDatanodeCyclicIteration(firstkey)) { : dm.getDatanodeCyclicIteration(firstkey)) {

View File

@ -27,7 +27,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.server.common.Util; import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.Daemon;
/** /**
@ -55,14 +55,17 @@ class HeartbeatManager implements DatanodeStatistics {
/** Heartbeat monitor thread */ /** Heartbeat monitor thread */
private final Daemon heartbeatThread = new Daemon(new Monitor()); private final Daemon heartbeatThread = new Daemon(new Monitor());
final FSNamesystem namesystem; final Namesystem namesystem;
final BlockManager blockManager;
HeartbeatManager(final FSNamesystem namesystem, final Configuration conf) { HeartbeatManager(final Namesystem namesystem, final BlockManager blockManager,
final Configuration conf) {
this.heartbeatRecheckInterval = conf.getInt( this.heartbeatRecheckInterval = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 minutes DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 minutes
this.namesystem = namesystem; this.namesystem = namesystem;
this.blockManager = blockManager;
} }
void activate(Configuration conf) { void activate(Configuration conf) {
@ -136,6 +139,11 @@ class HeartbeatManager implements DatanodeStatistics {
getBlockPoolUsed()}; getBlockPoolUsed()};
} }
@Override
public synchronized int getExpiredHeartbeats() {
return stats.expiredHeartbeats;
}
synchronized void register(final DatanodeDescriptor d) { synchronized void register(final DatanodeDescriptor d) {
if (!datanodes.contains(d)) { if (!datanodes.contains(d)) {
addDatanode(d); addDatanode(d);
@ -191,7 +199,7 @@ class HeartbeatManager implements DatanodeStatistics {
* effect causes more datanodes to be declared dead. * effect causes more datanodes to be declared dead.
*/ */
void heartbeatCheck() { void heartbeatCheck() {
final DatanodeManager dm = namesystem.getBlockManager().getDatanodeManager(); final DatanodeManager dm = blockManager.getDatanodeManager();
// It's OK to check safe mode w/o taking the lock here, we re-check // It's OK to check safe mode w/o taking the lock here, we re-check
// for safe mode after taking the lock before removing a datanode. // for safe mode after taking the lock before removing a datanode.
if (namesystem.isInSafeMode()) { if (namesystem.isInSafeMode()) {
@ -204,7 +212,7 @@ class HeartbeatManager implements DatanodeStatistics {
synchronized(this) { synchronized(this) {
for (DatanodeDescriptor d : datanodes) { for (DatanodeDescriptor d : datanodes) {
if (dm.isDatanodeDead(d)) { if (dm.isDatanodeDead(d)) {
namesystem.incrExpiredHeartbeats(); stats.incrExpiredHeartbeats();
dead = d; dead = d;
break; break;
} }
@ -244,8 +252,7 @@ class HeartbeatManager implements DatanodeStatistics {
heartbeatCheck(); heartbeatCheck();
lastHeartbeatCheck = now; lastHeartbeatCheck = now;
} }
if (namesystem.getBlockManager().shouldUpdateBlockKey( if (blockManager.shouldUpdateBlockKey(now - lastBlockKeyUpdate)) {
now - lastBlockKeyUpdate)) {
synchronized(HeartbeatManager.this) { synchronized(HeartbeatManager.this) {
for(DatanodeDescriptor d : datanodes) { for(DatanodeDescriptor d : datanodes) {
d.needKeyUpdate = true; d.needKeyUpdate = true;
@ -274,6 +281,8 @@ class HeartbeatManager implements DatanodeStatistics {
private long blockPoolUsed = 0L; private long blockPoolUsed = 0L;
private int xceiverCount = 0; private int xceiverCount = 0;
private int expiredHeartbeats = 0;
private void add(final DatanodeDescriptor node) { private void add(final DatanodeDescriptor node) {
capacityUsed += node.getDfsUsed(); capacityUsed += node.getDfsUsed();
blockPoolUsed += node.getBlockPoolUsed(); blockPoolUsed += node.getBlockPoolUsed();
@ -297,5 +306,10 @@ class HeartbeatManager implements DatanodeStatistics {
capacityTotal -= node.getDfsUsed(); capacityTotal -= node.getDfsUsed();
} }
} }
/** Increment expired heartbeat counter. */
private void incrExpiredHeartbeats() {
expiredHeartbeats++;
}
} }
} }

View File

@ -17,6 +17,45 @@
*/ */
package org.apache.hadoop.hdfs.server.namenode; package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPL_QUEUE_THRESHOLD_PCT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_UPGRADE_PERMISSION_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_UPGRADE_PERMISSION_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_ENABLED_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SUPPORT_APPEND_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SUPPORT_APPEND_KEY;
import static org.apache.hadoop.hdfs.server.common.Util.now; import static org.apache.hadoop.hdfs.server.common.Util.now;
import java.io.BufferedWriter; import java.io.BufferedWriter;
@ -68,7 +107,6 @@ import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus; import org.apache.hadoop.fs.permission.PermissionStatus;
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
@ -119,7 +157,6 @@ import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.metrics2.annotation.Metric; import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics; import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableCounterInt;
import org.apache.hadoop.metrics2.util.MBeans; import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node; import org.apache.hadoop.net.Node;
@ -202,8 +239,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
private UserGroupInformation fsOwner; private UserGroupInformation fsOwner;
private String supergroup; private String supergroup;
private PermissionStatus defaultPermission; private PermissionStatus defaultPermission;
// FSNamesystemMetrics counter variables
@Metric private MutableCounterInt expiredHeartbeats;
// Scan interval is not configurable. // Scan interval is not configurable.
private static final long DELEGATION_TOKEN_REMOVER_SCAN_INTERVAL = private static final long DELEGATION_TOKEN_REMOVER_SCAN_INTERVAL =
@ -282,7 +317,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
nnResourceChecker = new NameNodeResourceChecker(conf); nnResourceChecker = new NameNodeResourceChecker(conf);
checkAvailableResources(); checkAvailableResources();
this.systemStart = now(); this.systemStart = now();
this.blockManager = new BlockManager(this, conf); this.blockManager = new BlockManager(this, this, conf);
this.datanodeStatistics = blockManager.getDatanodeManager().getDatanodeStatistics(); this.datanodeStatistics = blockManager.getDatanodeManager().getDatanodeStatistics();
this.fsLock = new ReentrantReadWriteLock(true); // fair locking this.fsLock = new ReentrantReadWriteLock(true); // fair locking
setConfigurationParameters(conf); setConfigurationParameters(conf);
@ -2589,9 +2624,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
return blockManager.getMissingBlocksCount(); return blockManager.getMissingBlocksCount();
} }
/** Increment expired heartbeat counter. */ @Metric({"ExpiredHeartbeats", "Number of expired heartbeats"})
public void incrExpiredHeartbeats() { public int getExpiredHeartbeats() {
expiredHeartbeats.incr(); return datanodeStatistics.getExpiredHeartbeats();
} }
/** @see ClientProtocol#getStats() */ /** @see ClientProtocol#getStats() */

View File

@ -77,7 +77,7 @@ public class TestDatanodeReport extends TestCase {
NUM_OF_DATANODES); NUM_OF_DATANODES);
Thread.sleep(5000); Thread.sleep(5000);
assertCounter("ExpiredHeartbeats", 1, getMetrics("FSNamesystem")); assertGauge("ExpiredHeartbeats", 1, getMetrics("FSNamesystem"));
}finally { }finally {
cluster.shutdown(); cluster.shutdown();
} }

View File

@ -80,7 +80,7 @@ public class TestBlockManager {
"need to set a dummy value here so it assumes a multi-rack cluster"); "need to set a dummy value here so it assumes a multi-rack cluster");
fsn = Mockito.mock(FSNamesystem.class); fsn = Mockito.mock(FSNamesystem.class);
Mockito.doReturn(true).when(fsn).hasWriteLock(); Mockito.doReturn(true).when(fsn).hasWriteLock();
bm = new BlockManager(fsn, conf); bm = new BlockManager(fsn, fsn, conf);
} }
private void addNodes(Iterable<DatanodeDescriptor> nodesToAdd) { private void addNodes(Iterable<DatanodeDescriptor> nodesToAdd) {