svn merge -c 1309218 from trunk for HDFS-3168. Remove unnecessary "throws IOException" and change fields to final in FSNamesystem and BlockManager.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1309219 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze 2012-04-04 02:59:00 +00:00
parent 98cb813248
commit 785e5f08cc
6 changed files with 121 additions and 149 deletions
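
In outline, the commit applies two mechanical refactorings across the NameNode code: checked-exception declarations that no code path can actually raise are dropped, and fields that are assigned exactly once during construction are marked final. A minimal before/after sketch of both patterns (illustrative names only, not the Hadoop classes):

    import java.io.IOException;

    // Before: a mutable field set later in an initialize() step, plus a
    // "throws" clause that no statement in the body can actually raise.
    class ServiceBefore {
      private long recheckInterval;              // assigned in initialize()
      void initialize(long interval) throws IOException {
        this.recheckInterval = interval;         // never actually throws
      }
    }

    // After: assign once in the constructor and let the compiler enforce
    // single assignment; the phantom IOException disappears with the method.
    class ServiceAfter {
      private final long recheckInterval;        // immutable after construction
      ServiceAfter(long interval) {
        this.recheckInterval = interval;
      }
    }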

File: CHANGES.txt

@@ -203,6 +203,9 @@ Release 2.0.0 - UNRELEASED
     HDFS-3187. Upgrade guava to 11.0.2 (todd)
 
+    HDFS-3168. Remove unnecessary "throws IOException" and change fields to
+    final in FSNamesystem and BlockManager. (szetszwo)
+
   OPTIMIZATIONS
 
     HDFS-2477. Optimize computing the diff between a block report and the

File: BlockManager.java

@@ -255,9 +255,9 @@ public class BlockManager {
     this.replicationRecheckInterval =
       conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY,
           DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT) * 1000L;
     LOG.info("defaultReplication = " + defaultReplication);
     LOG.info("maxReplication = " + maxReplication);
     LOG.info("minReplication = " + minReplication);
     LOG.info("maxReplicationStreams = " + maxReplicationStreams);
     LOG.info("shouldCheckForEnoughRacks = " + shouldCheckForEnoughRacks);
     LOG.info("replicationRecheckInterval = " + replicationRecheckInterval);
@@ -1032,7 +1032,7 @@ public class BlockManager {
    *
    * @return number of blocks scheduled for replication during this iteration.
    */
-  int computeReplicationWork(int blocksToProcess) throws IOException {
+  int computeReplicationWork(int blocksToProcess) {
     List<List<Block>> blocksToReplicate = null;
     namesystem.writeLock();
     try {
@@ -2176,7 +2176,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
   /** Set replication for the blocks. */
   public void setReplication(final short oldRepl, final short newRepl,
-      final String src, final Block... blocks) throws IOException {
+      final String src, final Block... blocks) {
     if (newRepl == oldRepl) {
       return;
     }
@@ -2939,8 +2939,6 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
         } catch (InterruptedException ie) {
           LOG.warn("ReplicationMonitor thread received InterruptedException.", ie);
           break;
-        } catch (IOException ie) {
-          LOG.warn("ReplicationMonitor thread received exception. " , ie);
         } catch (Throwable t) {
           LOG.warn("ReplicationMonitor thread received Runtime exception. ", t);
           Runtime.getRuntime().exit(-1);
@@ -2958,14 +2956,14 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
    * @return number of blocks scheduled for replication or removal.
    * @throws IOException
    */
-  int computeDatanodeWork() throws IOException {
-    int workFound = 0;
+  int computeDatanodeWork() {
     // Blocks should not be replicated or removed if in safe mode.
     // It's OK to check safe mode here w/o holding lock, in the worst
     // case extra replications will be scheduled, and these will get
     // fixed up later.
-    if (namesystem.isInSafeMode())
-      return workFound;
+    if (namesystem.isInSafeMode()) {
+      return 0;
+    }
 
     final int numlive = heartbeatManager.getLiveDatanodeCount();
     final int blocksToProcess = numlive
@@ -2973,7 +2971,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
     final int nodesToProcess = (int) Math.ceil(numlive
         * ReplicationMonitor.INVALIDATE_WORK_PCT_PER_ITERATION / 100.0);
 
-    workFound = this.computeReplicationWork(blocksToProcess);
+    int workFound = this.computeReplicationWork(blocksToProcess);
 
     // Update counters
     namesystem.writeLock();
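
Why the ReplicationMonitor hunks shrink: once computeDatanodeWork and computeReplicationWork stop declaring IOException, the monitor loop's catch (IOException ie) branch becomes unreachable and can be deleted, and workFound can be declared at the point of first use instead of being pre-initialized. A compilable sketch of the caller side (names are illustrative, not the actual Hadoop code):

    // Illustrative monitor loop after the cleanup: only exceptions that can
    // really occur are handled; unexpected fatal errors still exit the process.
    class MonitorSketch implements Runnable {
      int computeDatanodeWork() {     // no longer declares IOException
        return 0;
      }
      @Override
      public void run() {
        while (!Thread.interrupted()) {
          try {
            int workFound = computeDatanodeWork();   // declared where used
            Thread.sleep(3000);
          } catch (InterruptedException ie) {
            break;                                   // shutdown request
          } catch (Throwable t) {
            Runtime.getRuntime().exit(-1);           // unexpected fatal error
          }
        }
      }
    }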

File: FSNamesystem.java

@@ -25,15 +25,17 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_STANDBY_CHECKPOINTS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_STANDBY_CHECKPOINTS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_REQUIRED_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_DEFAULT;
@@ -49,15 +51,13 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_STANDBY_CHECKPOINTS_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_STANDBY_CHECKPOINTS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_ENABLED_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERSIST_BLOCKS_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERSIST_BLOCKS_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERSIST_BLOCKS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERSIST_BLOCKS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SUPPORT_APPEND_DEFAULT;
@@ -150,9 +150,9 @@ import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
+import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirType;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
-import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
 import org.apache.hadoop.hdfs.server.common.Util;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
@@ -260,30 +260,28 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   static final int DEFAULT_MAX_CORRUPT_FILEBLOCKS_RETURNED = 100;
   static int BLOCK_DELETION_INCREMENT = 1000;
-  private boolean isPermissionEnabled;
-  private boolean persistBlocks;
-  private UserGroupInformation fsOwner;
-  private String supergroup;
-  private boolean standbyShouldCheckpoint;
+  private final boolean isPermissionEnabled;
+  private final boolean persistBlocks;
+  private final UserGroupInformation fsOwner;
+  private final String supergroup;
+  private final boolean standbyShouldCheckpoint;
 
   // Scan interval is not configurable.
   private static final long DELEGATION_TOKEN_REMOVER_SCAN_INTERVAL =
     TimeUnit.MILLISECONDS.convert(1, TimeUnit.HOURS);
-  private DelegationTokenSecretManager dtSecretManager;
-  private boolean alwaysUseDelegationTokensForTests;
+  private final DelegationTokenSecretManager dtSecretManager;
+  private final boolean alwaysUseDelegationTokensForTests;
 
-  //
-  // Stores the correct file name hierarchy
-  //
+  /** The namespace tree. */
   FSDirectory dir;
-  private BlockManager blockManager;
-  private DatanodeStatistics datanodeStatistics;
+  private final BlockManager blockManager;
+  private final DatanodeStatistics datanodeStatistics;
 
   // Block pool ID used by this namenode
   private String blockPoolId;
 
-  LeaseManager leaseManager = new LeaseManager(this);
+  final LeaseManager leaseManager = new LeaseManager(this);
 
   Daemon smmthread = null; // SafeModeMonitor thread
@@ -291,23 +289,23 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   private volatile boolean hasResourcesAvailable = false;
   private volatile boolean fsRunning = true;
 
-  long systemStart = 0;
+  /** The start time of the namesystem. */
+  private final long startTime = now();
 
-  //resourceRecheckInterval is how often namenode checks for the disk space availability
-  private long resourceRecheckInterval;
+  /** The interval of namenode checking for the disk space availability */
+  private final long resourceRecheckInterval;
 
   // The actual resource checker instance.
   NameNodeResourceChecker nnResourceChecker;
 
-  private FsServerDefaults serverDefaults;
-  private boolean supportAppends;
-  private ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure =
-    ReplaceDatanodeOnFailure.DEFAULT;
+  private final FsServerDefaults serverDefaults;
+  private final boolean supportAppends;
+  private final ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure;
 
   private volatile SafeModeInfo safeMode; // safe mode information
 
-  private long maxFsObjects = 0; // maximum number of fs objects
+  private final long maxFsObjects; // maximum number of fs objects
 
   /**
    * The global generation stamp for this file system.
@@ -315,10 +313,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   private final GenerationStamp generationStamp = new GenerationStamp();
 
   // precision of access times.
-  private long accessTimePrecision = 0;
+  private final long accessTimePrecision;
 
-  // lock to protect FSNamesystem.
-  private ReentrantReadWriteLock fsLock;
+  /** Lock to protect FSNamesystem. */
+  private ReentrantReadWriteLock fsLock = new ReentrantReadWriteLock(true);
 
   /**
    * Used when this NN is in standby state to read from the shared edit log.
@@ -336,9 +334,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    */
   private HAContext haContext;
 
-  private boolean haEnabled;
-
-  private final Configuration conf;
+  private final boolean haEnabled;
 
   /**
    * Instantiates an FSNamesystem loaded from the image and edits
@@ -390,9 +386,71 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    * @throws IOException on bad configuration
    */
  FSNamesystem(Configuration conf, FSImage fsImage) throws IOException {
-    this.conf = conf;
     try {
-      initialize(conf, fsImage);
+      resourceRecheckInterval = conf.getLong(
+          DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_KEY,
+          DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_DEFAULT);
+
+      this.blockManager = new BlockManager(this, this, conf);
+      this.datanodeStatistics = blockManager.getDatanodeManager().getDatanodeStatistics();
+
+      this.fsOwner = UserGroupInformation.getCurrentUser();
+      this.supergroup = conf.get(DFS_PERMISSIONS_SUPERUSERGROUP_KEY,
+          DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT);
+      this.isPermissionEnabled = conf.getBoolean(DFS_PERMISSIONS_ENABLED_KEY,
+          DFS_PERMISSIONS_ENABLED_DEFAULT);
+      LOG.info("fsOwner = " + fsOwner);
+      LOG.info("supergroup = " + supergroup);
+      LOG.info("isPermissionEnabled = " + isPermissionEnabled);
+
+      final boolean persistBlocks = conf.getBoolean(DFS_PERSIST_BLOCKS_KEY,
+          DFS_PERSIST_BLOCKS_DEFAULT);
+      // block allocation has to be persisted in HA using a shared edits directory
+      // so that the standby has up-to-date namespace information
+      String nameserviceId = DFSUtil.getNamenodeNameServiceId(conf);
+      this.haEnabled = HAUtil.isHAEnabled(conf, nameserviceId);
+      this.persistBlocks = persistBlocks || (haEnabled && HAUtil.usesSharedEditsDir(conf));
+
+      // Sanity check the HA-related config.
+      if (nameserviceId != null) {
+        LOG.info("Determined nameservice ID: " + nameserviceId);
+      }
+      LOG.info("HA Enabled: " + haEnabled);
+      if (!haEnabled && HAUtil.usesSharedEditsDir(conf)) {
+        LOG.warn("Configured NNs:\n" + DFSUtil.nnAddressesAsString(conf));
+        throw new IOException("Invalid configuration: a shared edits dir " +
+            "must not be specified if HA is not enabled.");
+      }
+
+      this.serverDefaults = new FsServerDefaults(
+          conf.getLongBytes(DFS_BLOCK_SIZE_KEY, DFS_BLOCK_SIZE_DEFAULT),
+          conf.getInt(DFS_BYTES_PER_CHECKSUM_KEY, DFS_BYTES_PER_CHECKSUM_DEFAULT),
+          conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY, DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT),
+          (short) conf.getInt(DFS_REPLICATION_KEY, DFS_REPLICATION_DEFAULT),
+          conf.getInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT));
+
+      this.maxFsObjects = conf.getLong(DFS_NAMENODE_MAX_OBJECTS_KEY,
+          DFS_NAMENODE_MAX_OBJECTS_DEFAULT);
+
+      this.accessTimePrecision = conf.getLong(DFS_NAMENODE_ACCESSTIME_PRECISION_KEY, 0);
+      this.supportAppends = conf.getBoolean(DFS_SUPPORT_APPEND_KEY, DFS_SUPPORT_APPEND_DEFAULT);
+      LOG.info("Append Enabled: " + supportAppends);
+
+      this.dtpReplaceDatanodeOnFailure = ReplaceDatanodeOnFailure.get(conf);
+
+      this.standbyShouldCheckpoint = conf.getBoolean(
+          DFS_HA_STANDBY_CHECKPOINTS_KEY, DFS_HA_STANDBY_CHECKPOINTS_DEFAULT);
+
+      // For testing purposes, allow the DT secret manager to be started regardless
+      // of whether security is enabled.
+      alwaysUseDelegationTokensForTests = conf.getBoolean(
+          DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY,
+          DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_DEFAULT);
+
+      this.dtSecretManager = createDelegationTokenSecretManager(conf);
+      this.dir = new FSDirectory(fsImage, this, conf);
+      this.safeMode = new SafeModeInfo(conf);
     } catch(IOException e) {
       LOG.error(getClass().getSimpleName() + " initialization failed.", e);
       close();
@@ -400,24 +458,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     }
   }
 
-  /**
-   * Initialize FSNamesystem.
-   */
-  private void initialize(Configuration conf, FSImage fsImage)
-      throws IOException {
-    resourceRecheckInterval = conf.getLong(
-        DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_KEY,
-        DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_DEFAULT);
-    this.systemStart = now();
-    this.blockManager = new BlockManager(this, this, conf);
-    this.datanodeStatistics = blockManager.getDatanodeManager().getDatanodeStatistics();
-    this.fsLock = new ReentrantReadWriteLock(true); // fair locking
-    setConfigurationParameters(conf);
-    dtSecretManager = createDelegationTokenSecretManager(conf);
-    this.dir = new FSDirectory(fsImage, this, conf);
-    this.safeMode = new SafeModeInfo(conf);
-  }
-
   void loadFSImage(StartupOption startOpt, FSImage fsImage, boolean haEnabled)
       throws IOException {
     // format before starting up if requested
@@ -601,13 +641,13 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   }
 
   /** Start services required in standby state */
-  void startStandbyServices() {
+  void startStandbyServices(final Configuration conf) {
     LOG.info("Starting services required for standby state");
     if (!dir.fsImage.editLog.isOpenForRead()) {
       // During startup, we're already open for read.
       dir.fsImage.editLog.initSharedJournalsForRead();
     }
-    editLogTailer = new EditLogTailer(this);
+    editLogTailer = new EditLogTailer(this, conf);
     editLogTailer.start();
     if (standbyShouldCheckpoint) {
       standbyCheckpointer = new StandbyCheckpointer(conf, this);
@@ -768,10 +808,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
         DFS_NAMENODE_SHARED_EDITS_DIR_KEY);
     return Util.stringCollectionAsURIs(dirNames);
   }
-
-  public Configuration getConf() {
-    return conf;
-  }
 
   @Override
   public void readLock() {
@@ -806,68 +842,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     return hasReadLock() || hasWriteLock();
   }
 
-  /**
-   * Initializes some of the members from configuration
-   */
-  private void setConfigurationParameters(Configuration conf)
-      throws IOException {
-    fsOwner = UserGroupInformation.getCurrentUser();
-    LOG.info("fsOwner=" + fsOwner);
-
-    this.supergroup = conf.get(DFS_PERMISSIONS_SUPERUSERGROUP_KEY,
-        DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT);
-    this.isPermissionEnabled = conf.getBoolean(DFS_PERMISSIONS_ENABLED_KEY,
-        DFS_PERMISSIONS_ENABLED_DEFAULT);
-    LOG.info("supergroup=" + supergroup);
-    LOG.info("isPermissionEnabled=" + isPermissionEnabled);
-
-    this.persistBlocks = conf.getBoolean(DFS_PERSIST_BLOCKS_KEY,
-        DFS_PERSIST_BLOCKS_DEFAULT);
-    // block allocation has to be persisted in HA using a shared edits directory
-    // so that the standby has up-to-date namespace information
-    String nameserviceId = DFSUtil.getNamenodeNameServiceId(conf);
-    this.haEnabled = HAUtil.isHAEnabled(conf, nameserviceId);
-    this.persistBlocks |= haEnabled && HAUtil.usesSharedEditsDir(conf);
-
-    // Sanity check the HA-related config.
-    if (nameserviceId != null) {
-      LOG.info("Determined nameservice ID: " + nameserviceId);
-    }
-    LOG.info("HA Enabled: " + haEnabled);
-    if (!haEnabled && HAUtil.usesSharedEditsDir(conf)) {
-      LOG.warn("Configured NNs:\n" + DFSUtil.nnAddressesAsString(conf));
-      throw new IOException("Invalid configuration: a shared edits dir " +
-          "must not be specified if HA is not enabled.");
-    }
-
-    this.serverDefaults = new FsServerDefaults(
-        conf.getLongBytes(DFS_BLOCK_SIZE_KEY, DFS_BLOCK_SIZE_DEFAULT),
-        conf.getInt(DFS_BYTES_PER_CHECKSUM_KEY, DFS_BYTES_PER_CHECKSUM_DEFAULT),
-        conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY, DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT),
-        (short) conf.getInt(DFS_REPLICATION_KEY, DFS_REPLICATION_DEFAULT),
-        conf.getInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT));
-    this.maxFsObjects = conf.getLong(DFS_NAMENODE_MAX_OBJECTS_KEY,
-        DFS_NAMENODE_MAX_OBJECTS_DEFAULT);
-    this.accessTimePrecision = conf.getLong(DFS_NAMENODE_ACCESSTIME_PRECISION_KEY, 0);
-    this.supportAppends = conf.getBoolean(DFS_SUPPORT_APPEND_KEY,
-        DFS_SUPPORT_APPEND_DEFAULT);
-
-    this.dtpReplaceDatanodeOnFailure = ReplaceDatanodeOnFailure.get(conf);
-
-    this.standbyShouldCheckpoint = conf.getBoolean(
-        DFS_HA_STANDBY_CHECKPOINTS_KEY,
-        DFS_HA_STANDBY_CHECKPOINTS_DEFAULT);
-
-    // For testing purposes, allow the DT secret manager to be started regardless
-    // of whether security is enabled.
-    alwaysUseDelegationTokensForTests =
-        conf.getBoolean(DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY,
-            DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_DEFAULT);
-  }
-
   NamespaceInfo getNamespaceInfo() {
     readLock();
     try {
@@ -2759,7 +2733,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   }
 
   private Lease reassignLease(Lease lease, String src, String newHolder,
-      INodeFileUnderConstruction pendingFile) throws IOException {
+      INodeFileUnderConstruction pendingFile) {
     assert hasWriteLock();
     if(newHolder == null)
       return lease;
@@ -3327,7 +3301,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   }
 
   Date getStartTime() {
-    return new Date(systemStart);
+    return new Date(startTime);
   }
 
   void finalizeUpgrade() throws IOException {
@@ -3504,7 +3478,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       if (!isPopulatingReplQueues() && !isInStandbyState()) {
         initializeReplQueues();
       }
-      long timeInSafemode = now() - systemStart;
+      long timeInSafemode = now() - startTime;
       NameNode.stateChangeLog.info("STATE* Leaving safe mode after "
           + timeInSafemode/1000 + " secs.");
       NameNode.getNameNodeMetrics().setSafeModeTime((int) timeInSafemode);
@@ -4874,7 +4848,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
    *
    * @param key new delegation key.
    */
-  public void logUpdateMasterKey(DelegationKey key) throws IOException {
+  public void logUpdateMasterKey(DelegationKey key) {
     assert !isInSafeMode() :
       "this should never be called while in safemode, since we stop " +
@@ -4887,7 +4861,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   }
 
   private void logReassignLease(String leaseHolder, String src,
-      String newHolder) throws IOException {
+      String newHolder) {
     writeLock();
     try {
       getEditLog().logReassignLease(leaseHolder, src, newHolder);
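
The shape of the new FSNamesystem constructor is the standard way to get final fields together with a failure path: every final field is assigned exactly once inside the try, and if any step throws, the catch logs, releases whatever was acquired, and rethrows. Because the catch never completes normally, the compiler accepts the blank finals. A reduced sketch of the pattern (hypothetical names and config keys, not the real class):

    import java.io.IOException;

    // Reduced sketch: final fields assigned once in the constructor; partial
    // construction is unwound in the catch block before rethrowing.
    class NamesystemSketch {
      private final long recheckInterval;
      private final boolean haEnabled;

      NamesystemSketch(java.util.Properties conf) throws IOException {
        try {
          this.recheckInterval = Long.parseLong(conf.getProperty("recheck", "5000"));
          this.haEnabled = Boolean.parseBoolean(conf.getProperty("ha", "false"));
          if (!haEnabled && conf.getProperty("sharedEditsDir") != null) {
            throw new IOException("shared edits dir requires HA");
          }
        } catch (IOException e) {
          close();   // release anything acquired before the failure
          throw e;
        }
      }

      void close() { /* release resources */ }
    }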

File: NameNode.java

@@ -1061,7 +1061,7 @@ public class NameNode {
     @Override
     public void startStandbyServices() throws IOException {
-      namesystem.startStandbyServices();
+      namesystem.startStandbyServices(conf);
     }
 
     @Override

File: EditLogTailer.java

@@ -61,6 +61,7 @@ public class EditLogTailer {
   private final EditLogTailerThread tailerThread;
 
+  private final Configuration conf;
   private final FSNamesystem namesystem;
   private FSEditLog editLog;
@@ -98,13 +99,12 @@ public class EditLogTailer {
    */
   private long sleepTimeMs;
 
-  public EditLogTailer(FSNamesystem namesystem) {
+  public EditLogTailer(FSNamesystem namesystem, Configuration conf) {
     this.tailerThread = new EditLogTailerThread();
+    this.conf = conf;
     this.namesystem = namesystem;
     this.editLog = namesystem.getEditLog();
-    Configuration conf = namesystem.getConf();
 
     lastLoadTimestamp = now();
 
     logRollPeriodMs = conf.getInt(DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY,
@@ -129,14 +129,12 @@ public class EditLogTailer {
   }
 
   private InetSocketAddress getActiveNodeAddress() {
-    Configuration conf = namesystem.getConf();
     Configuration activeConf = HAUtil.getConfForOtherNode(conf);
     return NameNode.getServiceAddress(activeConf, true);
   }
 
   private NamenodeProtocol getActiveNodeProxy() throws IOException {
     if (cachedActiveProxy == null) {
-      Configuration conf = namesystem.getConf();
       NamenodeProtocolPB proxy =
           RPC.waitForProxy(NamenodeProtocolPB.class,
               RPC.getProtocolVersion(NamenodeProtocolPB.class), activeAddr, conf);
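
The EditLogTailer change is plain constructor injection: instead of reaching back into FSNamesystem through a getConf() getter (which forced FSNamesystem to retain a Configuration field just for its collaborators), the Configuration is handed in once and stored in a final field that every method then shares. A minimal sketch of the before/after wiring (illustrative types, not the Hadoop classes):

    // Illustrative stand-in for a configuration object.
    class Config { String get(String key) { return null; } }

    // Before (sketched in a comment): the collaborator pulls its dependency
    // through a getter, coupling it to the owner's internals:
    //   TailerBefore(Owner owner) { Config conf = owner.getConf(); ... }

    // After: the dependency is passed in and kept in a final field, so the
    // owner's getConf() getter can be deleted.
    class TailerAfter {
      private final Config conf;
      TailerAfter(Config conf) {
        this.conf = conf;          // injected once at construction
      }
      String rollPeriod() {
        return conf.get("logroll.period");   // no call back into the owner
      }
    }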

File: BlockManagerTestUtil.java

@@ -23,8 +23,8 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Set;
 
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.util.Daemon;
@@ -143,8 +143,7 @@ public class BlockManagerTestUtil {
    * {@link DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY} to
    * a high value to ensure that all work is calculated.
    */
-  public static int computeAllPendingWork(BlockManager bm)
-      throws IOException {
+  public static int computeAllPendingWork(BlockManager bm) {
     int work = computeInvalidationWork(bm);
     work += bm.computeReplicationWork(Integer.MAX_VALUE);
     return work;