HDFS-2239. Reduce access levels of the fields and methods in FSNamesystem.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1155998 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2011-08-10 01:50:51 +00:00
parent eb6e44b1ba
commit 5d5b1c6c10
23 changed files with 226 additions and 192 deletions

View File

@ -654,6 +654,9 @@ Trunk (unreleased changes)
FileJournalManager during checkpoint process (Ivan Kelly and Todd Lipcon
via todd)
HDFS-2239. Reduce access levels of the fields and methods in FSNamesystem.
(szetszwo)
OPTIMIZATIONS
HDFS-1458. Improve checkpoint performance by avoiding unnecessary image

View File

@ -380,19 +380,8 @@ public class BlockManager {
"commitBlock length is less than the stored one "
+ commitBlock.getNumBytes() + " vs. " + block.getNumBytes();
block.commitBlock(commitBlock);
// Adjust disk space consumption if required
long diff = fileINode.getPreferredBlockSize() - commitBlock.getNumBytes();
if (diff > 0) {
try {
String path = /* For finding parents */
namesystem.leaseManager.findPath(fileINode);
namesystem.dir.updateSpaceConsumed(path, 0, -diff
* fileINode.getReplication());
} catch (IOException e) {
LOG.warn("Unexpected exception while updating disk space.", e);
}
}
namesystem.updateDiskSpaceConsumed(fileINode, commitBlock);
}
/**
@ -682,8 +671,26 @@ public class BlockManager {
minReplication);
}
/** Get all blocks with location information from a datanode. */
public BlocksWithLocations getBlocksWithLocations(final DatanodeID datanode,
/**
* return a list of blocks & their locations on <code>datanode</code> whose
* total size is <code>size</code>
*
* @param datanode on which blocks are located
* @param size total size of blocks
*/
public BlocksWithLocations getBlocks(DatanodeID datanode, long size
) throws IOException {
namesystem.readLock();
try {
namesystem.checkSuperuserPrivilege();
return getBlocksWithLocations(datanode, size);
} finally {
namesystem.readUnlock();
}
}
/** Get all blocks with location information from a datanode. */
private BlocksWithLocations getBlocksWithLocations(final DatanodeID datanode,
final long size) throws UnregisteredNodeException {
final DatanodeDescriptor node = getDatanodeManager().getDatanode(datanode);
if (node == null) {

View File

@ -649,8 +649,24 @@ public class DatanodeManager {
heartbeatManager.addDatanode(nodeDescr);
}
/**
* Rereads conf to get hosts and exclude list file names.
* Rereads the files to update the hosts and exclude lists. It
* checks if any of the hosts have changed states:
*/
public void refreshNodes(final Configuration conf) throws IOException {
namesystem.checkSuperuserPrivilege();
refreshHostsReader(conf);
namesystem.writeLock();
try {
refreshDatanodes();
} finally {
namesystem.writeUnlock();
}
}
/** Reread include/exclude files. */
public void refreshHostsReader(Configuration conf) throws IOException {
private void refreshHostsReader(Configuration conf) throws IOException {
// Reread the conf to get dfs.hosts and dfs.hosts.exclude filenames.
// Update the file names and refresh internal includes and excludes list.
if (conf == null) {
@ -662,15 +678,12 @@ public class DatanodeManager {
}
/**
* Rereads the config to get hosts and exclude list file names.
* Rereads the files to update the hosts and exclude lists. It
* checks if any of the hosts have changed states:
* 1. Added to hosts --> no further work needed here.
* 2. Removed from hosts --> mark AdminState as decommissioned.
* 3. Added to exclude --> start decommission.
* 4. Removed from exclude --> stop decommission.
*/
public void refreshDatanodes() throws IOException {
private void refreshDatanodes() throws IOException {
for(DatanodeDescriptor node : datanodeMap.values()) {
// Check if not include.
if (!inHostsList(node, null)) {

View File

@ -1341,7 +1341,7 @@ public class FSDirectory implements Closeable {
* @throws QuotaExceededException if the new count violates any quota limit
* @throws FileNotFound if path does not exist.
*/
public void updateSpaceConsumed(String path, long nsDelta, long dsDelta)
void updateSpaceConsumed(String path, long nsDelta, long dsDelta)
throws QuotaExceededException,
FileNotFoundException,
UnresolvedLinkException {

View File

@ -137,10 +137,6 @@ public class FSImage implements Closeable {
FSImage.getCheckpointEditsDirs(conf, null));
storage = new NNStorage(conf, imageDirs, editsDirs);
if (ns != null) {
storage.setUpgradeManager(ns.upgradeManager);
}
if(conf.getBoolean(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_RESTORE_KEY,
DFSConfigKeys.DFS_NAMENODE_NAME_DIR_RESTORE_DEFAULT)) {
storage.setRestoreFailedStorage(true);

View File

@ -104,13 +104,13 @@ import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
import org.apache.hadoop.hdfs.server.namenode.metrics.FSNamesystemMBean;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
import org.apache.hadoop.hdfs.util.RwLock;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.Server;
@ -146,8 +146,8 @@ import org.mortbay.util.ajax.JSON;
***************************************************/
@InterfaceAudience.Private
@Metrics(context="dfs")
public class FSNamesystem implements FSConstants, FSNamesystemMBean,
FSClusterStats, NameNodeMXBean {
public class FSNamesystem implements RwLock, FSConstants, FSClusterStats,
FSNamesystemMBean, NameNodeMXBean {
static final Log LOG = LogFactory.getLog(FSNamesystem.class);
private static final ThreadLocal<StringBuilder> auditBuffer =
@ -211,20 +211,16 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
//
// Stores the correct file name hierarchy
//
public FSDirectory dir;
FSDirectory dir;
private BlockManager blockManager;
private DatanodeStatistics datanodeStatistics;
// Block pool ID used by this namenode
String blockPoolId;
private String blockPoolId;
public LeaseManager leaseManager = new LeaseManager(this);
LeaseManager leaseManager = new LeaseManager(this);
//
// Threaded object that checks to see if we have been
// getting heartbeats from all clients.
//
public Daemon lmthread = null; // LeaseMonitor thread
Daemon lmthread = null; // LeaseMonitor thread
Daemon smmthread = null; // SafeModeMonitor thread
Daemon nnrmthread = null; // NamenodeResourceMonitor thread
@ -330,7 +326,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
return getStorageDirs(conf, DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY);
}
public static Collection<URI> getStorageDirs(Configuration conf,
private static Collection<URI> getStorageDirs(Configuration conf,
String propertyName) {
Collection<String> dirNames = conf.getTrimmedStringCollection(propertyName);
StartupOption startOpt = NameNode.getStartupOption(conf);
@ -364,31 +360,31 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
return getStorageDirs(conf, DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY);
}
// utility methods to acquire and release read lock and write lock
@Override
public void readLock() {
this.fsLock.readLock().lock();
}
@Override
public void readUnlock() {
this.fsLock.readLock().unlock();
}
@Override
public void writeLock() {
this.fsLock.writeLock().lock();
}
@Override
public void writeUnlock() {
this.fsLock.writeLock().unlock();
}
@Override
public boolean hasWriteLock() {
return this.fsLock.isWriteLockedByCurrentThread();
}
boolean hasReadLock() {
@Override
public boolean hasReadLock() {
return this.fsLock.getReadHoldCount() > 0;
}
@Override
public boolean hasReadOrWriteLock() {
return hasReadLock() || hasWriteLock();
}
@ -473,7 +469,8 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
try {
return new NamespaceInfo(dir.fsImage.getStorage().getNamespaceID(),
getClusterId(), getBlockPoolId(),
dir.fsImage.getStorage().getCTime(), getDistributedUpgradeVersion());
dir.fsImage.getStorage().getCTime(),
upgradeManager.getUpgradeVersion());
} finally {
readUnlock();
}
@ -484,7 +481,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
* Causes heartbeat and lease daemons to stop; waits briefly for
* them to finish, but a short timeout returns control back to caller.
*/
public void close() {
void close() {
fsRunning = false;
try {
if (blockManager != null) blockManager.close();
@ -562,30 +559,6 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
return accessTimePrecision > 0;
}
/////////////////////////////////////////////////////////
//
// These methods are called by secondary namenodes
//
/////////////////////////////////////////////////////////
/**
* return a list of blocks & their locations on <code>datanode</code> whose
* total size is <code>size</code>
*
* @param datanode on which blocks are located
* @param size total size of blocks
*/
BlocksWithLocations getBlocks(DatanodeID datanode, long size)
throws IOException {
readLock();
try {
checkSuperuserPrivilege();
return blockManager.getBlocksWithLocations(datanode, size);
} finally {
readUnlock();
}
}
/////////////////////////////////////////////////////////
//
// These methods are called by HadoopFS clients
@ -765,7 +738,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
* @param srcs
* @throws IOException
*/
public void concat(String target, String [] srcs)
void concat(String target, String [] srcs)
throws IOException, UnresolvedLinkException {
if(FSNamesystem.LOG.isDebugEnabled()) {
FSNamesystem.LOG.debug("concat " + Arrays.toString(srcs) +
@ -813,7 +786,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
}
/** See {@link #concat(String, String[])} */
public void concatInternal(String target, String [] srcs)
private void concatInternal(String target, String [] srcs)
throws IOException, UnresolvedLinkException {
assert hasWriteLock();
@ -1429,7 +1402,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
* are replicated. Will return an empty 2-elt array if we want the
* client to "try again later".
*/
public LocatedBlock getAdditionalBlock(String src,
LocatedBlock getAdditionalBlock(String src,
String clientName,
ExtendedBlock previous,
HashMap<Node, Node> excludedNodes
@ -1632,7 +1605,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
* (e.g if not all blocks have reached minimum replication yet)
* @throws IOException on error (eg lease mismatch, file not open, file deleted)
*/
public boolean completeFile(String src, String holder, ExtendedBlock last)
boolean completeFile(String src, String holder, ExtendedBlock last)
throws SafeModeException, UnresolvedLinkException, IOException {
checkBlock(last);
boolean success = false;
@ -2258,7 +2231,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
return false;
}
Lease reassignLease(Lease lease, String src, String newHolder,
private Lease reassignLease(Lease lease, String src, String newHolder,
INodeFileUnderConstruction pendingFile) throws IOException {
assert hasWriteLock();
if(newHolder == null)
@ -2274,6 +2247,22 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
return leaseManager.reassignLease(lease, src, newHolder);
}
/** Update disk space consumed. */
public void updateDiskSpaceConsumed(final INodeFileUnderConstruction fileINode,
final Block commitBlock) throws IOException {
assert hasWriteLock();
// Adjust disk space consumption if required
final long diff = fileINode.getPreferredBlockSize() - commitBlock.getNumBytes();
if (diff > 0) {
try {
String path = leaseManager.findPath(fileINode);
dir.updateSpaceConsumed(path, 0, -diff * fileINode.getReplication());
} catch (IOException e) {
LOG.warn("Unexpected exception while updating disk space.", e);
}
}
}
private void finalizeINodeFileUnderConstruction(String src,
INodeFileUnderConstruction pendingFile)
@ -2473,8 +2462,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
*
* @see org.apache.hadoop.hdfs.server.datanode.DataNode
*/
public void registerDatanode(DatanodeRegistration nodeReg)
throws IOException {
void registerDatanode(DatanodeRegistration nodeReg) throws IOException {
writeLock();
try {
getBlockManager().getDatanodeManager().registerDatanode(nodeReg);
@ -2505,7 +2493,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
* @return an array of datanode commands
* @throws IOException
*/
public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
long capacity, long dfsUsed, long remaining, long blockPoolUsed,
int xceiverCount, int xmitsInProgress, int failedVolumes)
throws IOException {
@ -2521,7 +2509,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
}
//check distributed upgrade
DatanodeCommand cmd = getDistributedUpgradeCommand();
DatanodeCommand cmd = upgradeManager.getBroadcastCommand();
if (cmd != null) {
return new DatanodeCommand[] {cmd};
}
@ -2737,30 +2725,10 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
}
}
public Date getStartTime() {
Date getStartTime() {
return new Date(systemStart);
}
/**
* Rereads the config to get hosts and exclude list file names.
* Rereads the files to update the hosts and exclude lists. It
* checks if any of the hosts have changed states:
* 1. Added to hosts --> no further work needed here.
* 2. Removed from hosts --> mark AdminState as decommissioned.
* 3. Added to exclude --> start decommission.
* 4. Removed from exclude --> stop decommission.
*/
public void refreshNodes(Configuration conf) throws IOException {
checkSuperuserPrivilege();
getBlockManager().getDatanodeManager().refreshHostsReader(conf);
writeLock();
try {
getBlockManager().getDatanodeManager().refreshDatanodes();
} finally {
writeUnlock();
}
}
void finalizeUpgrade() throws IOException {
checkSuperuserPrivilege();
getFSImage().finalizeUpgrade();
@ -2908,7 +2876,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
// verify whether a distributed upgrade needs to be started
boolean needUpgrade = false;
try {
needUpgrade = startDistributedUpgradeIfNeeded();
needUpgrade = upgradeManager.startUpgrade();
} catch(IOException e) {
FSNamesystem.LOG.error("IOException in startDistributedUpgradeIfNeeded", e);
}
@ -3101,10 +3069,10 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
leaveMsg = "Safe mode will be turned off automatically";
}
if(isManual()) {
if(getDistributedUpgradeState())
if(upgradeManager.getUpgradeState())
return leaveMsg + " upon completion of " +
"the distributed upgrade: upgrade progress = " +
getDistributedUpgradeStatus() + "%";
upgradeManager.getUpgradeStatus() + "%";
leaveMsg = "Use \"hdfs dfsadmin -safemode leave\" to turn safe mode off";
}
@ -3306,7 +3274,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
/**
* Set the total number of blocks in the system.
*/
void setBlockTotal() {
private void setBlockTotal() {
// safeMode is volatile, and may be set to null at any time
SafeModeInfo safeMode = this.safeMode;
if (safeMode == null)
@ -3327,7 +3295,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
* Get the total number of COMPLETE blocks in the system.
* For safe mode only complete blocks are counted.
*/
long getCompleteBlocksTotal() {
private long getCompleteBlocksTotal() {
// Calculate number of blocks under construction
long numUCBlocks = 0;
readLock();
@ -3398,7 +3366,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
NameNode.stateChangeLog.info("STATE* Safe mode is already OFF.");
return;
}
if(getDistributedUpgradeState())
if(upgradeManager.getUpgradeState())
throw new SafeModeException("Distributed upgrade is in progress",
safeMode);
safeMode.leave(checkForUpgrades);
@ -3487,26 +3455,6 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
return upgradeManager.processUpgradeCommand(comm);
}
int getDistributedUpgradeVersion() {
return upgradeManager.getUpgradeVersion();
}
UpgradeCommand getDistributedUpgradeCommand() throws IOException {
return upgradeManager.getBroadcastCommand();
}
boolean getDistributedUpgradeState() {
return upgradeManager.getUpgradeState();
}
short getDistributedUpgradeStatus() {
return upgradeManager.getUpgradeStatus();
}
boolean startDistributedUpgradeIfNeeded() throws IOException {
return upgradeManager.startUpgrade();
}
PermissionStatus createFsOwnerPermissions(FsPermission permission) {
return new PermissionStatus(fsOwner.getShortUserName(), supergroup, permission);
}
@ -3536,7 +3484,8 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
return checkPermission(path, false, null, null, null, null);
}
private void checkSuperuserPrivilege() throws AccessControlException {
/** Check if the user has superuser privilege. */
public void checkSuperuserPrivilege() throws AccessControlException {
if (isPermissionEnabled) {
FSPermissionChecker.checkSuperuserPrivilege(fsOwner, supergroup);
}
@ -3644,7 +3593,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
* Register the FSNamesystem MBean using the name
* "hadoop:service=NameNode,name=FSNamesystemState"
*/
void registerMBean() {
private void registerMBean() {
// We can only implement one MXBean interface, so we keep the old one.
try {
StandardMBean bean = new StandardMBean(this, FSNamesystemMBean.class);
@ -3805,7 +3754,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
}
/** @see updatePipeline(String, ExtendedBlock, ExtendedBlock, DatanodeID[]) */
void updatePipelineInternal(String clientName, ExtendedBlock oldBlock,
private void updatePipelineInternal(String clientName, ExtendedBlock oldBlock,
ExtendedBlock newBlock, DatanodeID[] newNodes)
throws IOException {
assert hasWriteLock();
@ -4043,7 +3992,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
* Returns the DelegationTokenSecretManager instance in the namesystem.
* @return delegation token secret manager object
*/
public DelegationTokenSecretManager getDelegationTokenSecretManager() {
DelegationTokenSecretManager getDelegationTokenSecretManager() {
return dtSecretManager;
}
@ -4096,7 +4045,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
* @throws InvalidToken
* @throws IOException
*/
public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
long renewDelegationToken(Token<DelegationTokenIdentifier> token)
throws InvalidToken, IOException {
long expiryTime;
writeLock();
@ -4127,7 +4076,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
* @param token
* @throws IOException
*/
public void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
throws IOException {
writeLock();
try {

View File

@ -626,7 +626,7 @@ public class NameNode implements NamenodeProtocols, FSConstants {
"Unexpected not positive size: "+size);
}
return namesystem.getBlocks(datanode, size);
return namesystem.getBlockManager().getBlocks(datanode, size);
}
@Override // NamenodeProtocol
@ -1039,7 +1039,8 @@ public class NameNode implements NamenodeProtocols, FSConstants {
@Override // ClientProtocol
public void refreshNodes() throws IOException {
namesystem.refreshNodes(new HdfsConfiguration());
namesystem.getBlockManager().getDatanodeManager().refreshNodes(
new HdfsConfiguration());
}
@Override // NamenodeProtocol

View File

@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.util;
/** Read-write lock interface. */
public interface RwLock {
/** Acquire read lock. */
public void readLock();
/** Release read lock. */
public void readUnlock();
/** Check if the current thread holds read lock. */
public boolean hasReadLock();
/** Acquire write lock. */
public void writeLock();
/** Release write lock. */
public void writeUnlock();
/** Check if the current thread holds write lock. */
public boolean hasWriteLock();
/** Check if the current thread holds read or write lock. */
public boolean hasReadOrWriteLock();
}

View File

@ -27,6 +27,7 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
@ -47,8 +48,8 @@ public class TestResolveHdfsSymlink {
public static void setUp() throws IOException {
Configuration conf = new HdfsConfiguration();
cluster = new MiniDFSCluster.Builder(conf).build();
cluster.getNamesystem().getDelegationTokenSecretManager().startThreads();
cluster.waitActive();
NameNodeAdapter.getDtSecretManager(cluster.getNamesystem()).startThreads();
}
@AfterClass

View File

@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
@ -52,7 +53,7 @@ public class TestViewFileSystemHdfs extends ViewFileSystemBaseTest {
SupportsBlocks = true;
cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(2).build();
cluster.waitClusterUp();
cluster.getNamesystem().getDelegationTokenSecretManager().startThreads();
NameNodeAdapter.getDtSecretManager(cluster.getNamesystem()).startThreads();
fHdfs = cluster.getFileSystem();
defaultWorkingDirectory = fHdfs.makeQualified( new Path("/user/" +
UserGroupInformation.getCurrentUser().getShortUserName()));

View File

@ -28,6 +28,7 @@ import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
@ -52,7 +53,7 @@ public class TestViewFsHdfs extends ViewFsBaseTest {
SupportsBlocks = true;
cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(2).build();
cluster.waitClusterUp();
cluster.getNamesystem().getDelegationTokenSecretManager().startThreads();
NameNodeAdapter.getDtSecretManager(cluster.getNamesystem()).startThreads();
fc = FileContext.getFileContext(cluster.getURI(0), CONF);
defaultWorkingDirectory = fc.makeQualified( new Path("/user/" +
UserGroupInformation.getCurrentUser().getShortUserName()));

View File

@ -48,13 +48,13 @@ import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
@ -71,7 +71,6 @@ import org.apache.hadoop.security.RefreshUserMappingsProtocol;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.tools.GetUserMappingsProtocol;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
@ -1659,9 +1658,7 @@ public class MiniDFSCluster {
* Set the softLimit and hardLimit of client lease periods
*/
public void setLeasePeriod(long soft, long hard) {
final FSNamesystem namesystem = getNamesystem();
namesystem.leaseManager.setLeasePeriod(soft, hard);
namesystem.lmthread.interrupt();
NameNodeAdapter.setLeasePeriod(getNamesystem(), soft, hard);
}
/**

View File

@ -17,7 +17,8 @@
*/
package org.apache.hadoop.hdfs;
import java.io.*;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@ -27,10 +28,6 @@ import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
public class TestDFSRemove extends junit.framework.TestCase {
static int countLease(MiniDFSCluster cluster) {
return cluster.getNamesystem().leaseManager.countLease();
}
final Path dir = new Path("/test/remove/");
void list(FileSystem fs, String name) throws IOException {

View File

@ -17,16 +17,18 @@
*/
package org.apache.hadoop.hdfs;
import java.io.*;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
public class TestDFSRename extends junit.framework.TestCase {
static int countLease(MiniDFSCluster cluster) {
return cluster.getNamesystem().leaseManager.countLease();
return NameNodeAdapter.getLeaseManager(cluster.getNamesystem()).countLease();
}
final Path dir = new Path("/test/rename/");

View File

@ -17,6 +17,10 @@
*/
package org.apache.hadoop.hdfs;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
@ -30,14 +34,12 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import static org.junit.Assert.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -224,7 +226,7 @@ public class TestDecommission {
}
nodes.add(nodename);
writeConfigFile(excludeFile, nodes);
cluster.getNamesystem(nnIndex).refreshNodes(conf);
refreshNodes(cluster.getNamesystem(nnIndex), conf);
DatanodeInfo ret = NameNodeAdapter.getDatanode(
cluster.getNamesystem(nnIndex), info[index]);
waitNodeState(ret, waitForState);
@ -235,7 +237,7 @@ public class TestDecommission {
private void recomissionNode(DatanodeInfo decommissionedNode) throws IOException {
LOG.info("Recommissioning node: " + decommissionedNode.getName());
writeConfigFile(excludeFile, null);
cluster.getNamesystem().refreshNodes(conf);
refreshNodes(cluster.getNamesystem(), conf);
waitNodeState(decommissionedNode, AdminStates.NORMAL);
}
@ -284,6 +286,11 @@ public class TestDecommission {
validateCluster(client, numDatanodes);
}
}
static void refreshNodes(final FSNamesystem ns, final Configuration conf
) throws IOException {
ns.getBlockManager().getDatanodeManager().refreshNodes(conf);
}
private void verifyStats(NameNode namenode, FSNamesystem fsn,
DatanodeInfo node, boolean decommissioning) throws InterruptedException {
@ -465,7 +472,7 @@ public class TestDecommission {
// Stop decommissioning and verify stats
writeConfigFile(excludeFile, null);
fsn.refreshNodes(conf);
refreshNodes(fsn, conf);
DatanodeInfo ret = NameNodeAdapter.getDatanode(fsn, downnode);
waitNodeState(ret, AdminStates.NORMAL);
verifyStats(namenode, fsn, ret, false);
@ -509,7 +516,7 @@ public class TestDecommission {
writeConfigFile(hostsFile, list);
for (int j = 0; j < numNameNodes; j++) {
cluster.getNamesystem(j).refreshNodes(conf);
refreshNodes(cluster.getNamesystem(j), conf);
DFSClient client = getDfsClient(cluster.getNameNode(j), conf);
DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE);

View File

@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.Assert;
import org.junit.Test;
@ -31,7 +32,8 @@ import org.mockito.Mockito;
public class TestLease {
static boolean hasLease(MiniDFSCluster cluster, Path src) {
return cluster.getNamesystem().leaseManager.getLeaseByPath(src.toString()) != null;
return NameNodeAdapter.getLeaseManager(cluster.getNamesystem()
).getLeaseByPath(src.toString()) != null;
}
final Path dir = new Path("/test/lease/");

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.TestInterDatanodeProtocol;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
public class TestLeaseRecovery extends junit.framework.TestCase {
static final int BLOCK_SIZE = 1024;
@ -133,7 +134,7 @@ public class TestLeaseRecovery extends junit.framework.TestCase {
DFSTestUtil.waitReplication(dfs, filepath, (short)1);
waitLeaseRecovery(cluster);
// verify that we still cannot recover the lease
LeaseManager lm = cluster.getNamesystem().leaseManager;
LeaseManager lm = NameNodeAdapter.getLeaseManager(cluster.getNamesystem());
assertTrue("Found " + lm.countLease() + " lease, expected 1", lm.countLease() == 1);
cluster.getNameNode().setSafeMode(FSConstants.SafeModeAction.SAFEMODE_LEAVE);
}

View File

@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
@ -48,7 +49,8 @@ import org.junit.Test;
public class TestDelegationToken {
private MiniDFSCluster cluster;
Configuration config;
private DelegationTokenSecretManager dtSecretManager;
private Configuration config;
private static final Log LOG = LogFactory.getLog(TestDelegationToken.class);
@Before
@ -61,7 +63,9 @@ public class TestDelegationToken {
FileSystem.setDefaultUri(config, "hdfs://localhost:" + "0");
cluster = new MiniDFSCluster.Builder(config).build();
cluster.waitActive();
cluster.getNamesystem().getDelegationTokenSecretManager().startThreads();
dtSecretManager = NameNodeAdapter.getDtSecretManager(
cluster.getNamesystem());
dtSecretManager.startThreads();
}
@After
@ -73,8 +77,6 @@ public class TestDelegationToken {
private Token<DelegationTokenIdentifier> generateDelegationToken(
String owner, String renewer) {
DelegationTokenSecretManager dtSecretManager = cluster.getNamesystem()
.getDelegationTokenSecretManager();
DelegationTokenIdentifier dtId = new DelegationTokenIdentifier(new Text(
owner), new Text(renewer), null);
return new Token<DelegationTokenIdentifier>(dtId, dtSecretManager);
@ -82,8 +84,6 @@ public class TestDelegationToken {
@Test
public void testDelegationTokenSecretManager() throws Exception {
DelegationTokenSecretManager dtSecretManager = cluster.getNamesystem()
.getDelegationTokenSecretManager();
Token<DelegationTokenIdentifier> token = generateDelegationToken(
"SomeUser", "JobTracker");
// Fake renewer should not be able to renew
@ -122,8 +122,6 @@ public class TestDelegationToken {
@Test
public void testCancelDelegationToken() throws Exception {
DelegationTokenSecretManager dtSecretManager = cluster.getNamesystem()
.getDelegationTokenSecretManager();
Token<DelegationTokenIdentifier> token = generateDelegationToken(
"SomeUser", "JobTracker");
//Fake renewer should not be able to renew
@ -144,7 +142,6 @@ public class TestDelegationToken {
@Test
public void testDelegationTokenDFSApi() throws Exception {
DelegationTokenSecretManager dtSecretManager = cluster.getNamesystem().getDelegationTokenSecretManager();
DistributedFileSystem dfs = (DistributedFileSystem) cluster.getFileSystem();
Token<DelegationTokenIdentifier> token = dfs.getDelegationToken("JobTracker");
DelegationTokenIdentifier identifier = new DelegationTokenIdentifier();

View File

@ -31,19 +31,20 @@ import java.util.Enumeration;
import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.Text;
import org.apache.commons.logging.*;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.security.TestDoAsEffectiveUser;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -98,7 +99,7 @@ public class TestDelegationTokenForProxyUser {
FileSystem.setDefaultUri(config, "hdfs://localhost:" + "0");
cluster = new MiniDFSCluster.Builder(config).build();
cluster.waitActive();
cluster.getNamesystem().getDelegationTokenSecretManager().startThreads();
NameNodeAdapter.getDtSecretManager(cluster.getNamesystem()).startThreads();
ProxyUsers.refreshSuperUserGroupsConfiguration(config);
}

View File

@ -413,7 +413,7 @@ public class TestBlocksWithNotEnoughRacks {
fs.getFileStatus(filePath), 0, Long.MAX_VALUE);
String name = locs[0].getNames()[0];
DFSTestUtil.writeFile(localFileSys, excludeFile, name);
ns.refreshNodes(conf);
ns.getBlockManager().getDatanodeManager().refreshNodes(conf);
DFSTestUtil.waitForDecommission(fs, name);
// Check the block still has sufficient # replicas across racks
@ -468,7 +468,7 @@ public class TestBlocksWithNotEnoughRacks {
if (!top.startsWith("/rack2")) {
String name = top.substring("/rack1".length()+1);
DFSTestUtil.writeFile(localFileSys, excludeFile, name);
ns.refreshNodes(conf);
ns.getBlockManager().getDatanodeManager().refreshNodes(conf);
DFSTestUtil.waitForDecommission(fs, name);
break;
}

View File

@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.io.IOException;
import java.util.ArrayList;
import junit.framework.TestCase;
@ -76,7 +75,7 @@ public class TestHeartbeatHandling extends TestCase {
dd.addBlockToBeReplicated(
new Block(i, 0, GenerationStamp.FIRST_VALID_STAMP), ONE_TARGET);
}
DatanodeCommand[]cmds = sendHeartBeat(nodeReg, dd, namesystem);
DatanodeCommand[]cmds = NameNodeAdapter.sendHeartBeat(nodeReg, dd, namesystem);
assertEquals(1, cmds.length);
assertEquals(DatanodeProtocol.DNA_TRANSFER, cmds[0].getAction());
assertEquals(MAX_REPLICATE_LIMIT, ((BlockCommand)cmds[0]).getBlocks().length);
@ -86,26 +85,26 @@ public class TestHeartbeatHandling extends TestCase {
blockList.add(new Block(i, 0, GenerationStamp.FIRST_VALID_STAMP));
}
dd.addBlocksToBeInvalidated(blockList);
cmds = sendHeartBeat(nodeReg, dd, namesystem);
cmds = NameNodeAdapter.sendHeartBeat(nodeReg, dd, namesystem);
assertEquals(2, cmds.length);
assertEquals(DatanodeProtocol.DNA_TRANSFER, cmds[0].getAction());
assertEquals(MAX_REPLICATE_LIMIT, ((BlockCommand)cmds[0]).getBlocks().length);
assertEquals(DatanodeProtocol.DNA_INVALIDATE, cmds[1].getAction());
assertEquals(MAX_INVALIDATE_LIMIT, ((BlockCommand)cmds[1]).getBlocks().length);
cmds = sendHeartBeat(nodeReg, dd, namesystem);
cmds = NameNodeAdapter.sendHeartBeat(nodeReg, dd, namesystem);
assertEquals(2, cmds.length);
assertEquals(DatanodeProtocol.DNA_TRANSFER, cmds[0].getAction());
assertEquals(REMAINING_BLOCKS, ((BlockCommand)cmds[0]).getBlocks().length);
assertEquals(DatanodeProtocol.DNA_INVALIDATE, cmds[1].getAction());
assertEquals(MAX_INVALIDATE_LIMIT, ((BlockCommand)cmds[1]).getBlocks().length);
cmds = sendHeartBeat(nodeReg, dd, namesystem);
cmds = NameNodeAdapter.sendHeartBeat(nodeReg, dd, namesystem);
assertEquals(1, cmds.length);
assertEquals(DatanodeProtocol.DNA_INVALIDATE, cmds[0].getAction());
assertEquals(REMAINING_BLOCKS, ((BlockCommand)cmds[0]).getBlocks().length);
cmds = sendHeartBeat(nodeReg, dd, namesystem);
cmds = NameNodeAdapter.sendHeartBeat(nodeReg, dd, namesystem);
assertEquals(null, cmds);
}
} finally {
@ -115,10 +114,4 @@ public class TestHeartbeatHandling extends TestCase {
cluster.shutdown();
}
}
private static DatanodeCommand[] sendHeartBeat(DatanodeRegistration nodeReg,
DatanodeDescriptor dd, FSNamesystem namesystem) throws IOException {
return namesystem.handleHeartbeat(nodeReg, dd.getCapacity(),
dd.getDfsUsed(), dd.getRemaining(), dd.getBlockPoolUsed(), 0, 0, 0);
}
}

View File

@ -21,7 +21,10 @@ import java.io.IOException;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.ipc.Server;
/**
@ -52,11 +55,32 @@ public class NameNodeAdapter {
return namenode.server;
}
public static DelegationTokenSecretManager getDtSecretManager(
final FSNamesystem ns) {
return ns.getDelegationTokenSecretManager();
}
public static DatanodeCommand[] sendHeartBeat(DatanodeRegistration nodeReg,
DatanodeDescriptor dd, FSNamesystem namesystem) throws IOException {
return namesystem.handleHeartbeat(nodeReg, dd.getCapacity(),
dd.getDfsUsed(), dd.getRemaining(), dd.getBlockPoolUsed(), 0, 0, 0);
}
public static boolean setReplication(final FSNamesystem ns,
final String src, final short replication) throws IOException {
return ns.setReplication(src, replication);
}
public static LeaseManager getLeaseManager(final FSNamesystem ns) {
return ns.leaseManager;
}
/** Set the softLimit and hardLimit of client lease periods. */
public static void setLeasePeriod(final FSNamesystem namesystem, long soft, long hard) {
getLeaseManager(namesystem).setLeasePeriod(soft, hard);
namesystem.lmthread.interrupt();
}
public static String getLeaseHolderForPath(NameNode namenode, String path) {
return namenode.getNamesystem().leaseManager.getLeaseByPath(path).getHolder();
}

View File

@ -148,7 +148,7 @@ public class TestDecommissioningStatus {
/*
* Decommissions the node at the given index
*/
private String decommissionNode(FSNamesystem namesystem, Configuration conf,
private String decommissionNode(FSNamesystem namesystem,
DFSClient client, FileSystem localFileSys, int nodeIndex)
throws IOException {
DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE);
@ -160,7 +160,6 @@ public class TestDecommissioningStatus {
ArrayList<String> nodes = new ArrayList<String>(decommissionedNodes);
nodes.add(nodename);
writeConfigFile(localFileSys, excludeFile, nodes);
namesystem.refreshNodes(conf);
return nodename;
}
@ -203,8 +202,8 @@ public class TestDecommissioningStatus {
FSNamesystem fsn = cluster.getNamesystem();
final DatanodeManager dm = fsn.getBlockManager().getDatanodeManager();
for (int iteration = 0; iteration < numDatanodes; iteration++) {
String downnode = decommissionNode(fsn, conf, client, localFileSys,
iteration);
String downnode = decommissionNode(fsn, client, localFileSys, iteration);
dm.refreshNodes(conf);
decommissionedNodes.add(downnode);
Thread.sleep(5000);
final List<DatanodeDescriptor> decommissioningNodes = dm.getDecommissioningNodes();
@ -224,7 +223,7 @@ public class TestDecommissioningStatus {
// This will remove the datanodes from decommissioning list and
// make them available again.
writeConfigFile(localFileSys, excludeFile, null);
fsn.refreshNodes(conf);
dm.refreshNodes(conf);
st1.close();
cleanupFile(fileSys, file1);
cleanupFile(fileSys, file2);