HDFS-2266. Add Namesystem and SafeMode interfaces to avoid directly referring to FSNamesystem in BlockManager.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1160493 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
513f17d115
commit
2892f6d817
|
@ -676,6 +676,9 @@ Trunk (unreleased changes)
|
||||||
HDFS-2273. Refactor BlockManager.recentInvalidateSets to a new class.
|
HDFS-2273. Refactor BlockManager.recentInvalidateSets to a new class.
|
||||||
(szetszwo)
|
(szetszwo)
|
||||||
|
|
||||||
|
HDFS-2266. Add Namesystem and SafeMode interfaces to avoid directly
|
||||||
|
referring to FSNamesystem in BlockManager. (szetszwo)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HDFS-1458. Improve checkpoint performance by avoiding unnecessary image
|
HDFS-1458. Improve checkpoint performance by avoiding unnecessary image
|
||||||
|
|
|
@ -58,6 +58,7 @@ import org.apache.hadoop.hdfs.server.namenode.INode;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
|
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.INodeFileUnderConstruction;
|
import org.apache.hadoop.hdfs.server.namenode.INodeFileUnderConstruction;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
|
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
|
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
|
||||||
|
@ -77,7 +78,7 @@ public class BlockManager {
|
||||||
/** Default load factor of map */
|
/** Default load factor of map */
|
||||||
public static final float DEFAULT_MAP_LOAD_FACTOR = 0.75f;
|
public static final float DEFAULT_MAP_LOAD_FACTOR = 0.75f;
|
||||||
|
|
||||||
private final FSNamesystem namesystem;
|
private final Namesystem namesystem;
|
||||||
|
|
||||||
private final DatanodeManager datanodeManager;
|
private final DatanodeManager datanodeManager;
|
||||||
private final HeartbeatManager heartbeatManager;
|
private final HeartbeatManager heartbeatManager;
|
||||||
|
@ -178,7 +179,7 @@ public class BlockManager {
|
||||||
|
|
||||||
blocksMap = new BlocksMap(DEFAULT_MAP_LOAD_FACTOR);
|
blocksMap = new BlocksMap(DEFAULT_MAP_LOAD_FACTOR);
|
||||||
blockplacement = BlockPlacementPolicy.getInstance(
|
blockplacement = BlockPlacementPolicy.getInstance(
|
||||||
conf, namesystem, datanodeManager.getNetworkTopology());
|
conf, fsn, datanodeManager.getNetworkTopology());
|
||||||
pendingReplications = new PendingReplicationBlocks(conf.getInt(
|
pendingReplications = new PendingReplicationBlocks(conf.getInt(
|
||||||
DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY,
|
DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY,
|
||||||
DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_DEFAULT) * 1000L);
|
DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_DEFAULT) * 1000L);
|
||||||
|
@ -374,23 +375,21 @@ public class BlockManager {
|
||||||
/**
|
/**
|
||||||
* Commit a block of a file
|
* Commit a block of a file
|
||||||
*
|
*
|
||||||
* @param fileINode file inode
|
|
||||||
* @param block block to be committed
|
* @param block block to be committed
|
||||||
* @param commitBlock - contains client reported block length and generation
|
* @param commitBlock - contains client reported block length and generation
|
||||||
|
* @return true if the block is changed to committed state.
|
||||||
* @throws IOException if the block does not have at least a minimal number
|
* @throws IOException if the block does not have at least a minimal number
|
||||||
* of replicas reported from data-nodes.
|
* of replicas reported from data-nodes.
|
||||||
*/
|
*/
|
||||||
private void commitBlock(INodeFileUnderConstruction fileINode,
|
private boolean commitBlock(final BlockInfoUnderConstruction block,
|
||||||
BlockInfoUnderConstruction block,
|
final Block commitBlock) throws IOException {
|
||||||
Block commitBlock) throws IOException {
|
|
||||||
if (block.getBlockUCState() == BlockUCState.COMMITTED)
|
if (block.getBlockUCState() == BlockUCState.COMMITTED)
|
||||||
return;
|
return false;
|
||||||
assert block.getNumBytes() <= commitBlock.getNumBytes() :
|
assert block.getNumBytes() <= commitBlock.getNumBytes() :
|
||||||
"commitBlock length is less than the stored one "
|
"commitBlock length is less than the stored one "
|
||||||
+ commitBlock.getNumBytes() + " vs. " + block.getNumBytes();
|
+ commitBlock.getNumBytes() + " vs. " + block.getNumBytes();
|
||||||
block.commitBlock(commitBlock);
|
block.commitBlock(commitBlock);
|
||||||
|
return true;
|
||||||
namesystem.updateDiskSpaceConsumed(fileINode, commitBlock);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -399,24 +398,24 @@ public class BlockManager {
|
||||||
*
|
*
|
||||||
* @param fileINode file inode
|
* @param fileINode file inode
|
||||||
* @param commitBlock - contains client reported block length and generation
|
* @param commitBlock - contains client reported block length and generation
|
||||||
|
* @return true if the last block is changed to committed state.
|
||||||
* @throws IOException if the block does not have at least a minimal number
|
* @throws IOException if the block does not have at least a minimal number
|
||||||
* of replicas reported from data-nodes.
|
* of replicas reported from data-nodes.
|
||||||
*/
|
*/
|
||||||
public void commitOrCompleteLastBlock(INodeFileUnderConstruction fileINode,
|
public boolean commitOrCompleteLastBlock(INodeFileUnderConstruction fileINode,
|
||||||
Block commitBlock) throws IOException {
|
Block commitBlock) throws IOException {
|
||||||
|
|
||||||
if(commitBlock == null)
|
if(commitBlock == null)
|
||||||
return; // not committing, this is a block allocation retry
|
return false; // not committing, this is a block allocation retry
|
||||||
BlockInfo lastBlock = fileINode.getLastBlock();
|
BlockInfo lastBlock = fileINode.getLastBlock();
|
||||||
if(lastBlock == null)
|
if(lastBlock == null)
|
||||||
return; // no blocks in file yet
|
return false; // no blocks in file yet
|
||||||
if(lastBlock.isComplete())
|
if(lastBlock.isComplete())
|
||||||
return; // already completed (e.g. by syncBlock)
|
return false; // already completed (e.g. by syncBlock)
|
||||||
|
|
||||||
commitBlock(fileINode, (BlockInfoUnderConstruction)lastBlock, commitBlock);
|
|
||||||
|
|
||||||
|
final boolean b = commitBlock((BlockInfoUnderConstruction)lastBlock, commitBlock);
|
||||||
if(countNodes(lastBlock).liveReplicas() >= minReplication)
|
if(countNodes(lastBlock).liveReplicas() >= minReplication)
|
||||||
completeBlock(fileINode,fileINode.numBlocks()-1);
|
completeBlock(fileINode,fileINode.numBlocks()-1);
|
||||||
|
return b;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -426,8 +425,8 @@ public class BlockManager {
|
||||||
* @throws IOException if the block does not have at least a minimal number
|
* @throws IOException if the block does not have at least a minimal number
|
||||||
* of replicas reported from data-nodes.
|
* of replicas reported from data-nodes.
|
||||||
*/
|
*/
|
||||||
BlockInfo completeBlock(INodeFile fileINode, int blkIndex)
|
private BlockInfo completeBlock(final INodeFile fileINode,
|
||||||
throws IOException {
|
final int blkIndex) throws IOException {
|
||||||
if(blkIndex < 0)
|
if(blkIndex < 0)
|
||||||
return null;
|
return null;
|
||||||
BlockInfo curBlock = fileINode.getBlocks()[blkIndex];
|
BlockInfo curBlock = fileINode.getBlocks()[blkIndex];
|
||||||
|
@ -444,8 +443,8 @@ public class BlockManager {
|
||||||
return blocksMap.replaceBlock(completeBlock);
|
return blocksMap.replaceBlock(completeBlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
BlockInfo completeBlock(INodeFile fileINode, BlockInfo block)
|
private BlockInfo completeBlock(final INodeFile fileINode,
|
||||||
throws IOException {
|
final BlockInfo block) throws IOException {
|
||||||
BlockInfo[] fileBlocks = fileINode.getBlocks();
|
BlockInfo[] fileBlocks = fileINode.getBlocks();
|
||||||
for(int idx = 0; idx < fileBlocks.length; idx++)
|
for(int idx = 0; idx < fileBlocks.length; idx++)
|
||||||
if(fileBlocks[idx] == block) {
|
if(fileBlocks[idx] == block) {
|
||||||
|
@ -491,8 +490,9 @@ public class BlockManager {
|
||||||
invalidateBlocks.remove(datanodeId, oldBlock);
|
invalidateBlocks.remove(datanodeId, oldBlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
long fileLength = fileINode.computeContentSummary().getLength();
|
final long fileLength = fileINode.computeContentSummary().getLength();
|
||||||
return createLocatedBlock(ucBlock, fileLength - ucBlock.getNumBytes());
|
final long pos = fileLength - ucBlock.getNumBytes();
|
||||||
|
return createLocatedBlock(ucBlock, pos, AccessMode.WRITE);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -513,8 +513,8 @@ public class BlockManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<LocatedBlock> createLocatedBlockList(final BlockInfo[] blocks,
|
private List<LocatedBlock> createLocatedBlockList(final BlockInfo[] blocks,
|
||||||
final long offset, final long length, final int nrBlocksToReturn
|
final long offset, final long length, final int nrBlocksToReturn,
|
||||||
) throws IOException {
|
final AccessMode mode) throws IOException {
|
||||||
int curBlk = 0;
|
int curBlk = 0;
|
||||||
long curPos = 0, blkSize = 0;
|
long curPos = 0, blkSize = 0;
|
||||||
int nrBlocks = (blocks[0].getNumBytes() == 0) ? 0 : blocks.length;
|
int nrBlocks = (blocks[0].getNumBytes() == 0) ? 0 : blocks.length;
|
||||||
|
@ -533,7 +533,7 @@ public class BlockManager {
|
||||||
long endOff = offset + length;
|
long endOff = offset + length;
|
||||||
List<LocatedBlock> results = new ArrayList<LocatedBlock>(blocks.length);
|
List<LocatedBlock> results = new ArrayList<LocatedBlock>(blocks.length);
|
||||||
do {
|
do {
|
||||||
results.add(createLocatedBlock(blocks[curBlk], curPos));
|
results.add(createLocatedBlock(blocks[curBlk], curPos, mode));
|
||||||
curPos += blocks[curBlk].getNumBytes();
|
curPos += blocks[curBlk].getNumBytes();
|
||||||
curBlk++;
|
curBlk++;
|
||||||
} while (curPos < endOff
|
} while (curPos < endOff
|
||||||
|
@ -542,6 +542,15 @@ public class BlockManager {
|
||||||
return results;
|
return results;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private LocatedBlock createLocatedBlock(final BlockInfo blk, final long pos,
|
||||||
|
final BlockTokenSecretManager.AccessMode mode) throws IOException {
|
||||||
|
final LocatedBlock lb = createLocatedBlock(blk, pos);
|
||||||
|
if (mode != null) {
|
||||||
|
setBlockToken(lb, mode);
|
||||||
|
}
|
||||||
|
return lb;
|
||||||
|
}
|
||||||
|
|
||||||
/** @return a LocatedBlock for the given block */
|
/** @return a LocatedBlock for the given block */
|
||||||
private LocatedBlock createLocatedBlock(final BlockInfo blk, final long pos
|
private LocatedBlock createLocatedBlock(final BlockInfo blk, final long pos
|
||||||
) throws IOException {
|
) throws IOException {
|
||||||
|
@ -600,21 +609,15 @@ public class BlockManager {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("blocks = " + java.util.Arrays.asList(blocks));
|
LOG.debug("blocks = " + java.util.Arrays.asList(blocks));
|
||||||
}
|
}
|
||||||
|
final AccessMode mode = needBlockToken? AccessMode.READ: null;
|
||||||
final List<LocatedBlock> locatedblocks = createLocatedBlockList(
|
final List<LocatedBlock> locatedblocks = createLocatedBlockList(
|
||||||
blocks, offset, length, Integer.MAX_VALUE);
|
blocks, offset, length, Integer.MAX_VALUE, mode);
|
||||||
|
|
||||||
final BlockInfo last = blocks[blocks.length - 1];
|
final BlockInfo last = blocks[blocks.length - 1];
|
||||||
final long lastPos = last.isComplete()?
|
final long lastPos = last.isComplete()?
|
||||||
fileSizeExcludeBlocksUnderConstruction - last.getNumBytes()
|
fileSizeExcludeBlocksUnderConstruction - last.getNumBytes()
|
||||||
: fileSizeExcludeBlocksUnderConstruction;
|
: fileSizeExcludeBlocksUnderConstruction;
|
||||||
final LocatedBlock lastlb = createLocatedBlock(last, lastPos);
|
final LocatedBlock lastlb = createLocatedBlock(last, lastPos, mode);
|
||||||
|
|
||||||
if (isBlockTokenEnabled() && needBlockToken) {
|
|
||||||
for(LocatedBlock lb : locatedblocks) {
|
|
||||||
setBlockToken(lb, AccessMode.READ);
|
|
||||||
}
|
|
||||||
setBlockToken(lastlb, AccessMode.READ);
|
|
||||||
}
|
|
||||||
return new LocatedBlocks(
|
return new LocatedBlocks(
|
||||||
fileSizeExcludeBlocksUnderConstruction, isFileUnderConstruction,
|
fileSizeExcludeBlocksUnderConstruction, isFileUnderConstruction,
|
||||||
locatedblocks, lastlb, last.isComplete());
|
locatedblocks, lastlb, last.isComplete());
|
||||||
|
|
|
@ -113,7 +113,6 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
|
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
|
||||||
import org.apache.hadoop.hdfs.util.RwLock;
|
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.ipc.Server;
|
import org.apache.hadoop.ipc.Server;
|
||||||
|
@ -149,7 +148,7 @@ import org.mortbay.util.ajax.JSON;
|
||||||
***************************************************/
|
***************************************************/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
@Metrics(context="dfs")
|
@Metrics(context="dfs")
|
||||||
public class FSNamesystem implements RwLock, FSClusterStats,
|
public class FSNamesystem implements Namesystem, FSClusterStats,
|
||||||
FSNamesystemMBean, NameNodeMXBean {
|
FSNamesystemMBean, NameNodeMXBean {
|
||||||
static final Log LOG = LogFactory.getLog(FSNamesystem.class);
|
static final Log LOG = LogFactory.getLog(FSNamesystem.class);
|
||||||
|
|
||||||
|
@ -517,7 +516,7 @@ public class FSNamesystem implements RwLock, FSClusterStats,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Is this name system running? */
|
@Override
|
||||||
public boolean isRunning() {
|
public boolean isRunning() {
|
||||||
return fsRunning;
|
return fsRunning;
|
||||||
}
|
}
|
||||||
|
@ -1200,13 +1199,7 @@ public class FSNamesystem implements RwLock, FSClusterStats,
|
||||||
leaseManager.addLease(cons.getClientName(), src);
|
leaseManager.addLease(cons.getClientName(), src);
|
||||||
|
|
||||||
// convert last block to under-construction
|
// convert last block to under-construction
|
||||||
LocatedBlock lb =
|
return blockManager.convertLastBlockToUnderConstruction(cons);
|
||||||
blockManager.convertLastBlockToUnderConstruction(cons);
|
|
||||||
|
|
||||||
if (lb != null) {
|
|
||||||
blockManager.setBlockToken(lb, AccessMode.WRITE);
|
|
||||||
}
|
|
||||||
return lb;
|
|
||||||
} else {
|
} else {
|
||||||
// Now we can add the name to the filesystem. This file has no
|
// Now we can add the name to the filesystem. This file has no
|
||||||
// blocks associated with it.
|
// blocks associated with it.
|
||||||
|
@ -1443,8 +1436,7 @@ public class FSNamesystem implements RwLock, FSClusterStats,
|
||||||
INodeFileUnderConstruction pendingFile = checkLease(src, clientName);
|
INodeFileUnderConstruction pendingFile = checkLease(src, clientName);
|
||||||
|
|
||||||
// commit the last block and complete it if it has minimum replicas
|
// commit the last block and complete it if it has minimum replicas
|
||||||
blockManager.commitOrCompleteLastBlock(pendingFile, ExtendedBlock
|
commitOrCompleteLastBlock(pendingFile, ExtendedBlock.getLocalBlock(previous));
|
||||||
.getLocalBlock(previous));
|
|
||||||
|
|
||||||
//
|
//
|
||||||
// If we fail this, bad things happen!
|
// If we fail this, bad things happen!
|
||||||
|
@ -1643,7 +1635,7 @@ public class FSNamesystem implements RwLock, FSClusterStats,
|
||||||
|
|
||||||
INodeFileUnderConstruction pendingFile = checkLease(src, holder);
|
INodeFileUnderConstruction pendingFile = checkLease(src, holder);
|
||||||
// commit the last block and complete it if it has minimum replicas
|
// commit the last block and complete it if it has minimum replicas
|
||||||
blockManager.commitOrCompleteLastBlock(pendingFile, last);
|
commitOrCompleteLastBlock(pendingFile, last);
|
||||||
|
|
||||||
if (!checkFileProgress(pendingFile, true)) {
|
if (!checkFileProgress(pendingFile, true)) {
|
||||||
return false;
|
return false;
|
||||||
|
@ -2256,10 +2248,12 @@ public class FSNamesystem implements RwLock, FSClusterStats,
|
||||||
return leaseManager.reassignLease(lease, src, newHolder);
|
return leaseManager.reassignLease(lease, src, newHolder);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Update disk space consumed. */
|
private void commitOrCompleteLastBlock(final INodeFileUnderConstruction fileINode,
|
||||||
public void updateDiskSpaceConsumed(final INodeFileUnderConstruction fileINode,
|
|
||||||
final Block commitBlock) throws IOException {
|
final Block commitBlock) throws IOException {
|
||||||
assert hasWriteLock();
|
assert hasWriteLock();
|
||||||
|
if (!blockManager.commitOrCompleteLastBlock(fileINode, commitBlock)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// Adjust disk space consumption if required
|
// Adjust disk space consumption if required
|
||||||
final long diff = fileINode.getPreferredBlockSize() - commitBlock.getNumBytes();
|
final long diff = fileINode.getPreferredBlockSize() - commitBlock.getNumBytes();
|
||||||
|
@ -2366,7 +2360,7 @@ public class FSNamesystem implements RwLock, FSClusterStats,
|
||||||
src = leaseManager.findPath(pendingFile);
|
src = leaseManager.findPath(pendingFile);
|
||||||
if (closeFile) {
|
if (closeFile) {
|
||||||
// commit the last block and complete it if it has minimum replicas
|
// commit the last block and complete it if it has minimum replicas
|
||||||
blockManager.commitOrCompleteLastBlock(pendingFile, storedBlock);
|
commitOrCompleteLastBlock(pendingFile, storedBlock);
|
||||||
|
|
||||||
//remove lease, close file
|
//remove lease, close file
|
||||||
finalizeINodeFileUnderConstruction(src, pendingFile);
|
finalizeINodeFileUnderConstruction(src, pendingFile);
|
||||||
|
@ -2806,7 +2800,7 @@ public class FSNamesystem implements RwLock, FSClusterStats,
|
||||||
*
|
*
|
||||||
* @param conf configuration
|
* @param conf configuration
|
||||||
*/
|
*/
|
||||||
SafeModeInfo(Configuration conf) {
|
private SafeModeInfo(Configuration conf) {
|
||||||
this.threshold = conf.getFloat(DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY,
|
this.threshold = conf.getFloat(DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY,
|
||||||
DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_DEFAULT);
|
DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_DEFAULT);
|
||||||
this.datanodeThreshold = conf.getInt(
|
this.datanodeThreshold = conf.getInt(
|
||||||
|
@ -2850,7 +2844,7 @@ public class FSNamesystem implements RwLock, FSClusterStats,
|
||||||
* Check if safe mode is on.
|
* Check if safe mode is on.
|
||||||
* @return true if in safe mode
|
* @return true if in safe mode
|
||||||
*/
|
*/
|
||||||
synchronized boolean isOn() {
|
private synchronized boolean isOn() {
|
||||||
try {
|
try {
|
||||||
assert isConsistent() : " SafeMode: Inconsistent filesystem state: "
|
assert isConsistent() : " SafeMode: Inconsistent filesystem state: "
|
||||||
+ "Total num of blocks, active blocks, or "
|
+ "Total num of blocks, active blocks, or "
|
||||||
|
@ -2864,14 +2858,14 @@ public class FSNamesystem implements RwLock, FSClusterStats,
|
||||||
/**
|
/**
|
||||||
* Check if we are populating replication queues.
|
* Check if we are populating replication queues.
|
||||||
*/
|
*/
|
||||||
synchronized boolean isPopulatingReplQueues() {
|
private synchronized boolean isPopulatingReplQueues() {
|
||||||
return initializedReplQueues;
|
return initializedReplQueues;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Enter safe mode.
|
* Enter safe mode.
|
||||||
*/
|
*/
|
||||||
void enter() {
|
private void enter() {
|
||||||
this.reached = 0;
|
this.reached = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2881,7 +2875,7 @@ public class FSNamesystem implements RwLock, FSClusterStats,
|
||||||
* Switch to manual safe mode if distributed upgrade is required.<br>
|
* Switch to manual safe mode if distributed upgrade is required.<br>
|
||||||
* Check for invalid, under- & over-replicated blocks in the end of startup.
|
* Check for invalid, under- & over-replicated blocks in the end of startup.
|
||||||
*/
|
*/
|
||||||
synchronized void leave(boolean checkForUpgrades) {
|
private synchronized void leave(boolean checkForUpgrades) {
|
||||||
if(checkForUpgrades) {
|
if(checkForUpgrades) {
|
||||||
// verify whether a distributed upgrade needs to be started
|
// verify whether a distributed upgrade needs to be started
|
||||||
boolean needUpgrade = false;
|
boolean needUpgrade = false;
|
||||||
|
@ -2921,7 +2915,7 @@ public class FSNamesystem implements RwLock, FSClusterStats,
|
||||||
/**
|
/**
|
||||||
* Initialize replication queues.
|
* Initialize replication queues.
|
||||||
*/
|
*/
|
||||||
synchronized void initializeReplQueues() {
|
private synchronized void initializeReplQueues() {
|
||||||
LOG.info("initializing replication queues");
|
LOG.info("initializing replication queues");
|
||||||
if (isPopulatingReplQueues()) {
|
if (isPopulatingReplQueues()) {
|
||||||
LOG.warn("Replication queues already initialized.");
|
LOG.warn("Replication queues already initialized.");
|
||||||
|
@ -2939,7 +2933,7 @@ public class FSNamesystem implements RwLock, FSClusterStats,
|
||||||
* Check whether we have reached the threshold for
|
* Check whether we have reached the threshold for
|
||||||
* initializing replication queues.
|
* initializing replication queues.
|
||||||
*/
|
*/
|
||||||
synchronized boolean canInitializeReplQueues() {
|
private synchronized boolean canInitializeReplQueues() {
|
||||||
return blockSafe >= blockReplQueueThreshold;
|
return blockSafe >= blockReplQueueThreshold;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2949,7 +2943,7 @@ public class FSNamesystem implements RwLock, FSClusterStats,
|
||||||
* the extension time have passed.
|
* the extension time have passed.
|
||||||
* @return true if can leave or false otherwise.
|
* @return true if can leave or false otherwise.
|
||||||
*/
|
*/
|
||||||
synchronized boolean canLeave() {
|
private synchronized boolean canLeave() {
|
||||||
if (reached == 0)
|
if (reached == 0)
|
||||||
return false;
|
return false;
|
||||||
if (now() - reached < extension) {
|
if (now() - reached < extension) {
|
||||||
|
@ -2963,7 +2957,7 @@ public class FSNamesystem implements RwLock, FSClusterStats,
|
||||||
* There is no need to enter safe mode
|
* There is no need to enter safe mode
|
||||||
* if DFS is empty or {@link #threshold} == 0
|
* if DFS is empty or {@link #threshold} == 0
|
||||||
*/
|
*/
|
||||||
boolean needEnter() {
|
private boolean needEnter() {
|
||||||
return (threshold != 0 && blockSafe < blockThreshold) ||
|
return (threshold != 0 && blockSafe < blockThreshold) ||
|
||||||
(getNumLiveDataNodes() < datanodeThreshold) ||
|
(getNumLiveDataNodes() < datanodeThreshold) ||
|
||||||
(!nameNodeHasResourcesAvailable());
|
(!nameNodeHasResourcesAvailable());
|
||||||
|
@ -3007,7 +3001,7 @@ public class FSNamesystem implements RwLock, FSClusterStats,
|
||||||
/**
|
/**
|
||||||
* Set total number of blocks.
|
* Set total number of blocks.
|
||||||
*/
|
*/
|
||||||
synchronized void setBlockTotal(int total) {
|
private synchronized void setBlockTotal(int total) {
|
||||||
this.blockTotal = total;
|
this.blockTotal = total;
|
||||||
this.blockThreshold = (int) (blockTotal * threshold);
|
this.blockThreshold = (int) (blockTotal * threshold);
|
||||||
this.blockReplQueueThreshold =
|
this.blockReplQueueThreshold =
|
||||||
|
@ -3020,7 +3014,7 @@ public class FSNamesystem implements RwLock, FSClusterStats,
|
||||||
* reached minimal replication.
|
* reached minimal replication.
|
||||||
* @param replication current replication
|
* @param replication current replication
|
||||||
*/
|
*/
|
||||||
synchronized void incrementSafeBlockCount(short replication) {
|
private synchronized void incrementSafeBlockCount(short replication) {
|
||||||
if ((int)replication == safeReplication)
|
if ((int)replication == safeReplication)
|
||||||
this.blockSafe++;
|
this.blockSafe++;
|
||||||
checkMode();
|
checkMode();
|
||||||
|
@ -3031,7 +3025,7 @@ public class FSNamesystem implements RwLock, FSClusterStats,
|
||||||
* fallen below minimal replication.
|
* fallen below minimal replication.
|
||||||
* @param replication current replication
|
* @param replication current replication
|
||||||
*/
|
*/
|
||||||
synchronized void decrementSafeBlockCount(short replication) {
|
private synchronized void decrementSafeBlockCount(short replication) {
|
||||||
if (replication == safeReplication-1)
|
if (replication == safeReplication-1)
|
||||||
this.blockSafe--;
|
this.blockSafe--;
|
||||||
checkMode();
|
checkMode();
|
||||||
|
@ -3041,28 +3035,28 @@ public class FSNamesystem implements RwLock, FSClusterStats,
|
||||||
* Check if safe mode was entered manually or automatically (at startup, or
|
* Check if safe mode was entered manually or automatically (at startup, or
|
||||||
* when disk space is low).
|
* when disk space is low).
|
||||||
*/
|
*/
|
||||||
boolean isManual() {
|
private boolean isManual() {
|
||||||
return extension == Integer.MAX_VALUE && !resourcesLow;
|
return extension == Integer.MAX_VALUE && !resourcesLow;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set manual safe mode.
|
* Set manual safe mode.
|
||||||
*/
|
*/
|
||||||
synchronized void setManual() {
|
private synchronized void setManual() {
|
||||||
extension = Integer.MAX_VALUE;
|
extension = Integer.MAX_VALUE;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Check if safe mode was entered due to resources being low.
|
* Check if safe mode was entered due to resources being low.
|
||||||
*/
|
*/
|
||||||
boolean areResourcesLow() {
|
private boolean areResourcesLow() {
|
||||||
return resourcesLow;
|
return resourcesLow;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set that resources are low for this instance of safe mode.
|
* Set that resources are low for this instance of safe mode.
|
||||||
*/
|
*/
|
||||||
void setResourcesLow() {
|
private void setResourcesLow() {
|
||||||
resourcesLow = true;
|
resourcesLow = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3139,9 +3133,7 @@ public class FSNamesystem implements RwLock, FSClusterStats,
|
||||||
lastStatusReport = curTime;
|
lastStatusReport = curTime;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
@Override
|
||||||
* Returns printable state of the class.
|
|
||||||
*/
|
|
||||||
public String toString() {
|
public String toString() {
|
||||||
String resText = "Current safe blocks = "
|
String resText = "Current safe blocks = "
|
||||||
+ blockSafe
|
+ blockSafe
|
||||||
|
@ -3156,7 +3148,7 @@ public class FSNamesystem implements RwLock, FSClusterStats,
|
||||||
* Checks consistency of the class state.
|
* Checks consistency of the class state.
|
||||||
* This is costly and currently called only in assert.
|
* This is costly and currently called only in assert.
|
||||||
*/
|
*/
|
||||||
boolean isConsistent() throws IOException {
|
private boolean isConsistent() throws IOException {
|
||||||
if (blockTotal == -1 && blockSafe == -1) {
|
if (blockTotal == -1 && blockSafe == -1) {
|
||||||
return true; // manual safe mode
|
return true; // manual safe mode
|
||||||
}
|
}
|
||||||
|
@ -3215,7 +3207,7 @@ public class FSNamesystem implements RwLock, FSClusterStats,
|
||||||
return isInSafeMode();
|
return isInSafeMode();
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Check and trigger safe mode. */
|
@Override
|
||||||
public void checkSafeMode() {
|
public void checkSafeMode() {
|
||||||
// safeMode is volatile, and may be set to null at any time
|
// safeMode is volatile, and may be set to null at any time
|
||||||
SafeModeInfo safeMode = this.safeMode;
|
SafeModeInfo safeMode = this.safeMode;
|
||||||
|
@ -3224,10 +3216,7 @@ public class FSNamesystem implements RwLock, FSClusterStats,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
@Override
|
||||||
* Check whether the name node is in safe mode.
|
|
||||||
* @return true if safe mode is ON, false otherwise
|
|
||||||
*/
|
|
||||||
public boolean isInSafeMode() {
|
public boolean isInSafeMode() {
|
||||||
// safeMode is volatile, and may be set to null at any time
|
// safeMode is volatile, and may be set to null at any time
|
||||||
SafeModeInfo safeMode = this.safeMode;
|
SafeModeInfo safeMode = this.safeMode;
|
||||||
|
@ -3236,9 +3225,7 @@ public class FSNamesystem implements RwLock, FSClusterStats,
|
||||||
return safeMode.isOn();
|
return safeMode.isOn();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
@Override
|
||||||
* Check whether the name node is in startup mode.
|
|
||||||
*/
|
|
||||||
public boolean isInStartupSafeMode() {
|
public boolean isInStartupSafeMode() {
|
||||||
// safeMode is volatile, and may be set to null at any time
|
// safeMode is volatile, and may be set to null at any time
|
||||||
SafeModeInfo safeMode = this.safeMode;
|
SafeModeInfo safeMode = this.safeMode;
|
||||||
|
@ -3247,9 +3234,7 @@ public class FSNamesystem implements RwLock, FSClusterStats,
|
||||||
return !safeMode.isManual() && safeMode.isOn();
|
return !safeMode.isManual() && safeMode.isOn();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
@Override
|
||||||
* Check whether replication queues are populated.
|
|
||||||
*/
|
|
||||||
public boolean isPopulatingReplQueues() {
|
public boolean isPopulatingReplQueues() {
|
||||||
// safeMode is volatile, and may be set to null at any time
|
// safeMode is volatile, and may be set to null at any time
|
||||||
SafeModeInfo safeMode = this.safeMode;
|
SafeModeInfo safeMode = this.safeMode;
|
||||||
|
@ -3258,10 +3243,7 @@ public class FSNamesystem implements RwLock, FSClusterStats,
|
||||||
return safeMode.isPopulatingReplQueues();
|
return safeMode.isPopulatingReplQueues();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
@Override
|
||||||
* Increment number of blocks that reached minimal replication.
|
|
||||||
* @param replication current replication
|
|
||||||
*/
|
|
||||||
public void incrementSafeBlockCount(int replication) {
|
public void incrementSafeBlockCount(int replication) {
|
||||||
// safeMode is volatile, and may be set to null at any time
|
// safeMode is volatile, and may be set to null at any time
|
||||||
SafeModeInfo safeMode = this.safeMode;
|
SafeModeInfo safeMode = this.safeMode;
|
||||||
|
@ -3270,9 +3252,7 @@ public class FSNamesystem implements RwLock, FSClusterStats,
|
||||||
safeMode.incrementSafeBlockCount((short)replication);
|
safeMode.incrementSafeBlockCount((short)replication);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
@Override
|
||||||
* Decrement number of blocks that reached minimal replication.
|
|
||||||
*/
|
|
||||||
public void decrementSafeBlockCount(Block b) {
|
public void decrementSafeBlockCount(Block b) {
|
||||||
// safeMode is volatile, and may be set to null at any time
|
// safeMode is volatile, and may be set to null at any time
|
||||||
SafeModeInfo safeMode = this.safeMode;
|
SafeModeInfo safeMode = this.safeMode;
|
||||||
|
@ -3397,10 +3377,6 @@ public class FSNamesystem implements RwLock, FSClusterStats,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public long getTransactionID() {
|
|
||||||
return getEditLog().getSyncTxId();
|
|
||||||
}
|
|
||||||
|
|
||||||
CheckpointSignature rollEditLog() throws IOException {
|
CheckpointSignature rollEditLog() throws IOException {
|
||||||
writeLock();
|
writeLock();
|
||||||
try {
|
try {
|
||||||
|
@ -3494,7 +3470,7 @@ public class FSNamesystem implements RwLock, FSClusterStats,
|
||||||
return checkPermission(path, false, null, null, null, null);
|
return checkPermission(path, false, null, null, null, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Check if the user has superuser privilege. */
|
@Override
|
||||||
public void checkSuperuserPrivilege() throws AccessControlException {
|
public void checkSuperuserPrivilege() throws AccessControlException {
|
||||||
if (isPermissionEnabled) {
|
if (isPermissionEnabled) {
|
||||||
FSPermissionChecker.checkSuperuserPrivilege(fsOwner, supergroup);
|
FSPermissionChecker.checkSuperuserPrivilege(fsOwner, supergroup);
|
||||||
|
@ -3916,10 +3892,6 @@ public class FSNamesystem implements RwLock, FSClusterStats,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public int numCorruptReplicas(Block blk) {
|
|
||||||
return blockManager.numCorruptReplicas(blk);
|
|
||||||
}
|
|
||||||
|
|
||||||
static class CorruptFileBlockInfo {
|
static class CorruptFileBlockInfo {
|
||||||
String path;
|
String path;
|
||||||
Block block;
|
Block block;
|
||||||
|
|
|
@ -1050,7 +1050,7 @@ public class NameNode implements NamenodeProtocols {
|
||||||
|
|
||||||
@Override // NamenodeProtocol
|
@Override // NamenodeProtocol
|
||||||
public long getTransactionID() {
|
public long getTransactionID() {
|
||||||
return namesystem.getTransactionID();
|
return namesystem.getEditLog().getSyncTxId();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override // NamenodeProtocol
|
@Override // NamenodeProtocol
|
||||||
|
|
|
@ -0,0 +1,35 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.namenode;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.hdfs.util.RwLock;
|
||||||
|
import org.apache.hadoop.security.AccessControlException;
|
||||||
|
|
||||||
|
/** Namesystem operations. */
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public interface Namesystem extends RwLock, SafeMode {
|
||||||
|
/** Is this name system running? */
|
||||||
|
public boolean isRunning();
|
||||||
|
|
||||||
|
/** Check if the user has superuser privilege. */
|
||||||
|
public void checkSuperuserPrivilege() throws AccessControlException;
|
||||||
|
|
||||||
|
/** @return the block pool ID */
|
||||||
|
public String getBlockPoolId();
|
||||||
|
}
|
|
@ -0,0 +1,53 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.namenode;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
|
|
||||||
|
/** SafeMode related operations. */
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public interface SafeMode {
|
||||||
|
/**
|
||||||
|
* Check safe mode conditions.
|
||||||
|
* If the corresponding conditions are satisfied,
|
||||||
|
* trigger the system to enter/leave safe mode.
|
||||||
|
*/
|
||||||
|
public void checkSafeMode();
|
||||||
|
|
||||||
|
/** Is the system in safe mode? */
|
||||||
|
public boolean isInSafeMode();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Is the system in startup safe mode, i.e. the system is starting up with
|
||||||
|
* safe mode turned on automatically?
|
||||||
|
*/
|
||||||
|
public boolean isInStartupSafeMode();
|
||||||
|
|
||||||
|
/** Check whether replication queues are being populated. */
|
||||||
|
public boolean isPopulatingReplQueues();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Increment number of blocks that reached minimal replication.
|
||||||
|
* @param replication current replication
|
||||||
|
*/
|
||||||
|
public void incrementSafeBlockCount(int replication);
|
||||||
|
|
||||||
|
/** Decrement number of blocks that reached minimal replication. */
|
||||||
|
public void decrementSafeBlockCount(Block b);
|
||||||
|
}
|
|
@ -303,7 +303,7 @@ public class DFSTestUtil {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/**
|
||||||
* Keep accessing the given file until the namenode reports that the
|
* Keep accessing the given file until the namenode reports that the
|
||||||
* given block in the file contains the given number of corrupt replicas.
|
* given block in the file contains the given number of corrupt replicas.
|
||||||
*/
|
*/
|
||||||
|
@ -312,7 +312,7 @@ public class DFSTestUtil {
|
||||||
throws IOException, TimeoutException {
|
throws IOException, TimeoutException {
|
||||||
int count = 0;
|
int count = 0;
|
||||||
final int ATTEMPTS = 50;
|
final int ATTEMPTS = 50;
|
||||||
int repls = ns.numCorruptReplicas(b.getLocalBlock());
|
int repls = ns.getBlockManager().numCorruptReplicas(b.getLocalBlock());
|
||||||
while (repls != corruptRepls && count < ATTEMPTS) {
|
while (repls != corruptRepls && count < ATTEMPTS) {
|
||||||
try {
|
try {
|
||||||
IOUtils.copyBytes(fs.open(file), new IOUtils.NullOutputStream(),
|
IOUtils.copyBytes(fs.open(file), new IOUtils.NullOutputStream(),
|
||||||
|
@ -321,7 +321,7 @@ public class DFSTestUtil {
|
||||||
// Swallow exceptions
|
// Swallow exceptions
|
||||||
}
|
}
|
||||||
System.out.println("Waiting for "+corruptRepls+" corrupt replicas");
|
System.out.println("Waiting for "+corruptRepls+" corrupt replicas");
|
||||||
repls = ns.numCorruptReplicas(b.getLocalBlock());
|
repls = ns.getBlockManager().numCorruptReplicas(b.getLocalBlock());
|
||||||
count++;
|
count++;
|
||||||
}
|
}
|
||||||
if (count == ATTEMPTS) {
|
if (count == ATTEMPTS) {
|
||||||
|
|
Loading…
Reference in New Issue