HDFS-2693. Fix synchronization issues around state transition. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1221582 13f79535-47bb-0310-9956-ffa450edef68
Todd Lipcon 2011-12-21 03:03:23 +00:00
parent 57ef902bbc
commit 36d1c49486
22 changed files with 432 additions and 134 deletions

View File

@ -71,3 +71,5 @@ HDFS-2677. Web UI should indicate the NN state. (eli via todd)
HDFS-2678. When a FailoverProxyProvider is used, DFSClient should not retry connection ten times before failing over (atm via todd)
HDFS-2682. When a FailoverProxyProvider is used, Client should not retry for 45 times if it is timing out to connect to server. (Uma Maheswara Rao G via todd)
HDFS-2693. Fix synchronization issues around state transition (todd)

View File

@ -99,4 +99,16 @@ public class HAUtil {
return null;
}
/**
* This is used only by tests at the moment.
* @return true if the NN should allow read operations while in standby mode.
*/
public static boolean shouldAllowStandbyReads(Configuration conf) {
return conf.getBoolean("dfs.ha.allow.stale.reads", false);
}
public static void setAllowStandbyReads(Configuration conf, boolean val) {
conf.setBoolean("dfs.ha.allow.stale.reads", val);
}
}
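
These two accessors give tests a single switch over the "dfs.ha.allow.stale.reads" key. A minimal usage sketch (the HdfsConfiguration setup here is illustrative, not part of this change):

Configuration conf = new HdfsConfiguration();
// Permit READ operations against a standby whose namespace may lag.
HAUtil.setAllowStandbyReads(conf, true);
assert HAUtil.shouldAllowStandbyReads(conf); // would be false by default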

View File

@ -817,22 +817,18 @@ public class BlockManager {
*/
public void findAndMarkBlockAsCorrupt(final ExtendedBlock blk,
final DatanodeInfo dn) throws IOException {
namesystem.writeLock();
try {
final BlockInfo storedBlock = getStoredBlock(blk.getLocalBlock());
if (storedBlock == null) {
// Check if the replica is in the blockMap, if not
// ignore the request for now. This could happen when BlockScanner
// thread of Datanode reports bad block before Block reports are sent
// by the Datanode on startup
NameNode.stateChangeLog.info("BLOCK* findAndMarkBlockAsCorrupt: "
+ blk + " not found.");
return;
}
markBlockAsCorrupt(storedBlock, dn);
} finally {
namesystem.writeUnlock();
assert namesystem.hasWriteLock();
final BlockInfo storedBlock = getStoredBlock(blk.getLocalBlock());
if (storedBlock == null) {
// Check if the replica is in the blockMap, if not
// ignore the request for now. This could happen when BlockScanner
// thread of Datanode reports bad block before Block reports are sent
// by the Datanode on startup
NameNode.stateChangeLog.info("BLOCK* findAndMarkBlockAsCorrupt: "
+ blk + " not found.");
return;
}
markBlockAsCorrupt(storedBlock, dn);
}
private void markBlockAsCorrupt(BlockInfo storedBlock,
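
With this change the locking contract for findAndMarkBlockAsCorrupt moves to the caller: the method now only asserts that the FSNamesystem write lock is already held. Callers follow the pattern used by the test updates later in this commit (cluster, blk and dnR come from that test context):

FSNamesystem ns = cluster.getNamesystem();
ns.writeLock();
try {
  // The assert in findAndMarkBlockAsCorrupt passes because the write
  // lock is held across the whole check-and-mark sequence.
  ns.getBlockManager().findAndMarkBlockAsCorrupt(blk, new DatanodeInfo(dnR));
} finally {
  ns.writeUnlock();
}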

View File

@ -244,18 +244,17 @@ public class BackupNode extends NameNode {
@Override
public void startLogSegment(NamenodeRegistration registration, long txid)
throws IOException {
nn.checkOperation(OperationCategory.JOURNAL);
namesystem.checkOperation(OperationCategory.JOURNAL);
verifyRequest(registration);
getBNImage().namenodeStartedLogSegment(txid);
}
@Override
public void journal(NamenodeRegistration nnReg,
long firstTxId, int numTxns,
byte[] records) throws IOException {
nn.checkOperation(OperationCategory.JOURNAL);
namesystem.checkOperation(OperationCategory.JOURNAL);
verifyRequest(nnReg);
if(!nnRpcAddress.equals(nnReg.getAddress()))
throw new IOException("Journal request from unexpected name-node: "
@ -401,13 +400,21 @@ public class BackupNode extends NameNode {
return clusterId;
}
@Override // NameNode
protected void checkOperation(OperationCategory op)
throws StandbyException {
if (OperationCategory.JOURNAL != op) {
String msg = "Operation category " + op
+ " is not supported at the BackupNode";
throw new StandbyException(msg);
@Override
protected NameNodeHAContext createHAContext() {
return new BNHAContext();
}
private class BNHAContext extends NameNodeHAContext {
@Override // NameNode
public void checkOperation(OperationCategory op)
throws StandbyException {
if (OperationCategory.JOURNAL != op &&
!(OperationCategory.READ == op && allowStaleStandbyReads)) {
String msg = "Operation category " + op
+ " is not supported at the BackupNode";
throw new StandbyException(msg);
}
}
}
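
The BNHAContext override admits journal traffic unconditionally, and reads only when stale standby reads are enabled; every other category fails fast. A hypothetical illustration of the resulting behavior (not code from this commit):

haContext.checkOperation(OperationCategory.JOURNAL); // always allowed
haContext.checkOperation(OperationCategory.READ);    // allowed only if allowStaleStandbyReads
haContext.checkOperation(OperationCategory.WRITE);   // throws StandbyException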

View File

@ -149,6 +149,7 @@ import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
import org.apache.hadoop.hdfs.server.namenode.PendingDataNodeMessages.BlockReceivedDeleteMessage;
import org.apache.hadoop.hdfs.server.namenode.PendingDataNodeMessages.BlockReportMessage;
import org.apache.hadoop.hdfs.server.namenode.PendingDataNodeMessages.CommitBlockSynchronizationMessage;
@ -170,6 +171,7 @@ import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@ -563,6 +565,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
dir.fsImage.editLog.close();
}
void checkOperation(OperationCategory op) throws StandbyException {
haContext.checkOperation(op);
}
public static Collection<URI> getNamespaceDirs(Configuration conf) {
return getStorageDirs(conf, DFS_NAMENODE_NAME_DIR_KEY);
}
@ -793,7 +800,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
return serverDefaults.getBlockSize();
}
FsServerDefaults getServerDefaults() {
FsServerDefaults getServerDefaults() throws StandbyException {
checkOperation(OperationCategory.READ);
return serverDefaults;
}
@ -820,6 +828,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
HdfsFileStatus resultingStat = null;
writeLock();
try {
checkOperation(OperationCategory.WRITE);
if (isInSafeMode()) {
throw new SafeModeException("Cannot set permission for " + src, safeMode);
}
@ -849,6 +859,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
HdfsFileStatus resultingStat = null;
writeLock();
try {
checkOperation(OperationCategory.WRITE);
if (isInSafeMode()) {
throw new SafeModeException("Cannot set owner for " + src, safeMode);
}
@ -939,13 +951,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
} else { // second attempt is with write lock
writeLock(); // writelock is needed to set accesstime
}
// if the namenode is in safemode, then do not update access time
if (isInSafeMode()) {
doAccessTime = false;
}
try {
checkOperation(OperationCategory.READ);
// if the namenode is in safemode, then do not update access time
if (isInSafeMode()) {
doAccessTime = false;
}
long now = now();
INodeFile inode = dir.getFileINode(src);
if (inode == null) {
@ -1013,6 +1026,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
HdfsFileStatus resultingStat = null;
writeLock();
try {
checkOperation(OperationCategory.WRITE);
if (isInSafeMode()) {
throw new SafeModeException("Cannot concat " + target, safeMode);
}
@ -1144,6 +1158,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
writeLock();
try {
checkOperation(OperationCategory.WRITE);
// Write access is required to set access and modification times
if (isPermissionEnabled) {
checkPathAccess(src, FsAction.WRITE);
@ -1174,6 +1190,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
HdfsFileStatus resultingStat = null;
writeLock();
try {
checkOperation(OperationCategory.WRITE);
if (!createParent) {
verifyParentDir(link);
}
@ -1243,6 +1261,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
final boolean isFile;
writeLock();
try {
checkOperation(OperationCategory.WRITE);
if (isInSafeMode()) {
throw new SafeModeException("Cannot set replication for " + src, safeMode);
}
@ -1273,6 +1293,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
throws IOException, UnresolvedLinkException {
readLock();
try {
checkOperation(OperationCategory.READ);
if (isPermissionEnabled) {
checkTraverse(filename);
}
@ -1315,6 +1336,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
FileNotFoundException, ParentNotDirectoryException, IOException {
writeLock();
try {
checkOperation(OperationCategory.WRITE);
startFileInternal(src, permissions, holder, clientMachine, flag,
createParent, replication, blockSize);
} finally {
@ -1495,6 +1518,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
throws IOException {
writeLock();
try {
checkOperation(OperationCategory.WRITE);
if (isInSafeMode()) {
throw new SafeModeException(
"Cannot recover the lease of " + src, safeMode);
@ -1614,6 +1639,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
LocatedBlock lb = null;
writeLock();
try {
checkOperation(OperationCategory.WRITE);
lb = startFileInternal(src, null, holder, clientMachine,
EnumSet.of(CreateFlag.APPEND),
false, blockManager.maxReplication, 0);
@ -1678,6 +1705,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
writeLock();
try {
checkOperation(OperationCategory.WRITE);
if (isInSafeMode()) {
throw new SafeModeException("Cannot add block to " + src, safeMode);
}
@ -1711,6 +1740,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
// Allocate a new block and record it in the INode.
writeLock();
try {
checkOperation(OperationCategory.WRITE);
if (isInSafeMode()) {
throw new SafeModeException("Cannot add block to " + src, safeMode);
}
@ -1757,6 +1787,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
final List<DatanodeDescriptor> chosen;
readLock();
try {
checkOperation(OperationCategory.WRITE);
//check safe mode
if (isInSafeMode()) {
throw new SafeModeException("Cannot add datanode; src=" + src
@ -1798,6 +1829,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
UnresolvedLinkException, IOException {
writeLock();
try {
checkOperation(OperationCategory.WRITE);
//
// Remove the block from the pending creates list
//
@ -1873,6 +1905,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
boolean success = false;
writeLock();
try {
checkOperation(OperationCategory.WRITE);
success = completeFileInternal(src, holder,
ExtendedBlock.getLocalBlock(last));
} finally {
@ -2012,6 +2046,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
writeLock();
try {
checkOperation(OperationCategory.WRITE);
status = renameToInternal(src, dst);
if (status && auditLog.isInfoEnabled() && isExternalInvocation()) {
resultingStat = dir.getFileInfo(dst, false);
@ -2067,6 +2103,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
writeLock();
try {
checkOperation(OperationCategory.WRITE);
renameToInternal(src, dst, options);
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
resultingStat = dir.getFileInfo(dst, false);
@ -2145,6 +2183,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
writeLock();
try {
checkOperation(OperationCategory.WRITE);
if (isInSafeMode()) {
throw new SafeModeException("Cannot delete " + src, safeMode);
}
@ -2222,11 +2261,15 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
*
* @return object containing information regarding the file
* or null if file not found
* @throws StandbyException
*/
HdfsFileStatus getFileInfo(String src, boolean resolveLink)
throws AccessControlException, UnresolvedLinkException {
throws AccessControlException, UnresolvedLinkException,
StandbyException {
readLock();
try {
checkOperation(OperationCategory.READ);
if (!DFSUtil.isValidName(src)) {
throw new InvalidPathException("Invalid file name: " + src);
}
@ -2250,6 +2293,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
writeLock();
try {
checkOperation(OperationCategory.WRITE);
status = mkdirsInternal(src, permissions, createParent);
} finally {
writeUnlock();
@ -2304,9 +2349,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
ContentSummary getContentSummary(String src) throws AccessControlException,
FileNotFoundException, UnresolvedLinkException {
FileNotFoundException, UnresolvedLinkException, StandbyException {
readLock();
try {
checkOperation(OperationCategory.READ);
if (isPermissionEnabled) {
checkPermission(src, false, null, null, null, FsAction.READ_EXECUTE);
}
@ -2325,6 +2372,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
throws IOException, UnresolvedLinkException {
writeLock();
try {
checkOperation(OperationCategory.WRITE);
if (isInSafeMode()) {
throw new SafeModeException("Cannot set quota on " + path, safeMode);
}
@ -2349,6 +2397,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
+ src + " for " + clientName);
writeLock();
try {
checkOperation(OperationCategory.WRITE);
if (isInSafeMode()) {
throw new SafeModeException("Cannot fsync file " + src, safeMode);
}
@ -2558,6 +2607,20 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
String src = "";
writeLock();
try {
checkOperation(OperationCategory.WRITE);
if (haContext.getState().equals(NameNode.STANDBY_STATE)) {
// TODO(HA) we'll never get here, since we check for WRITE operation above!
if (isGenStampInFuture(newgenerationstamp)) {
LOG.info("Required GS=" + newgenerationstamp
+ ", Queuing commitBlockSynchronization message");
getPendingDataNodeMessages().queueMessage(
new PendingDataNodeMessages.CommitBlockSynchronizationMessage(
lastblock, newgenerationstamp, newlength, closeFile, deleteblock,
newtargets, newgenerationstamp));
return;
}
}
if (isInSafeMode()) {
throw new SafeModeException(
"Cannot commitBlockSynchronization while in safe mode",
@ -2658,6 +2721,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
void renewLease(String holder) throws IOException {
writeLock();
try {
checkOperation(OperationCategory.WRITE);
if (isInSafeMode()) {
throw new SafeModeException("Cannot renew lease for " + holder, safeMode);
}
@ -2685,6 +2750,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
DirectoryListing dl;
readLock();
try {
checkOperation(OperationCategory.READ);
if (isPermissionEnabled) {
if (dir.isDir(src)) {
checkPathAccess(src, FsAction.READ_EXECUTE);
@ -3699,6 +3766,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
throws IOException {
writeLock();
try {
checkOperation(OperationCategory.CHECKPOINT);
if (isInSafeMode()) {
throw new SafeModeException("Checkpoint not started", safeMode);
}
@ -3715,6 +3784,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
CheckpointSignature sig) throws IOException {
readLock();
try {
checkOperation(OperationCategory.CHECKPOINT);
if (isInSafeMode()) {
throw new SafeModeException("Checkpoint not ended", safeMode);
}
@ -3976,6 +4047,28 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
return pendingFile;
}
/**
* Client is reporting some bad block locations.
*/
void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
writeLock();
try {
checkOperation(OperationCategory.WRITE);
NameNode.stateChangeLog.info("*DIR* NameNode.reportBadBlocks");
for (int i = 0; i < blocks.length; i++) {
ExtendedBlock blk = blocks[i].getBlock();
DatanodeInfo[] nodes = blocks[i].getLocations();
for (int j = 0; j < nodes.length; j++) {
DatanodeInfo dn = nodes[j];
blockManager.findAndMarkBlockAsCorrupt(blk, dn);
}
}
} finally {
writeUnlock();
}
}
/**
* Get a new generation stamp together with an access token for
* a block under construction
@ -3993,6 +4086,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
LocatedBlock locatedBlock;
writeLock();
try {
checkOperation(OperationCategory.WRITE);
// check validity of parameters
checkUCBlock(block, clientName);
@ -4022,6 +4117,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
throws IOException {
writeLock();
try {
checkOperation(OperationCategory.WRITE);
if (isInSafeMode()) {
throw new SafeModeException("Pipeline not updated", safeMode);
}
@ -4222,6 +4319,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
readLock();
try {
checkOperation(OperationCategory.READ);
if (!isPopulatingReplQueues()) {
throw new IOException("Cannot run listCorruptFileBlocks because " +
"replication queues have not been initialized.");
@ -4314,6 +4413,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
Token<DelegationTokenIdentifier> token;
writeLock();
try {
checkOperation(OperationCategory.WRITE);
if (isInSafeMode()) {
throw new SafeModeException("Cannot issue delegation token", safeMode);
}
@ -4358,6 +4459,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
long expiryTime;
writeLock();
try {
checkOperation(OperationCategory.WRITE);
if (isInSafeMode()) {
throw new SafeModeException("Cannot renew delegation token", safeMode);
}
@ -4388,6 +4491,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
throws IOException {
writeLock();
try {
checkOperation(OperationCategory.WRITE);
if (isInSafeMode()) {
throw new SafeModeException("Cannot cancel delegation token", safeMode);
}
@ -4727,4 +4832,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
public EditLogTailer getEditLogTailer() {
return editLogTailer;
}
@VisibleForTesting
void setFsLockForTests(ReentrantReadWriteLock lock) {
this.fsLock = lock;
}
@VisibleForTesting
ReentrantReadWriteLock getFsLockForTests() {
return fsLock;
}
}
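
Every FSNamesystem entry point touched above now follows the same shape: take the namesystem lock first, then verify the HA state inside the critical section, so that a concurrent state transition (which also takes the write lock; see HAState below) cannot slip in between the check and the operation. The skeleton, with a hypothetical method name standing in for any of the write paths:

void someWriteOp(String src) throws IOException {
  writeLock();
  try {
    checkOperation(OperationCategory.WRITE); // StandbyException if not active
    if (isInSafeMode()) {
      throw new SafeModeException("Cannot modify " + src, safeMode);
    }
    // ... mutate the namespace ...
  } finally {
    writeUnlock();
  }
}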

View File

@ -182,6 +182,7 @@ public class NameNode {
private HAState state;
private final boolean haEnabled;
private final HAContext haContext;
protected boolean allowStaleStandbyReads;
/** httpServer */
@ -531,7 +532,8 @@ public class NameNode {
this.role = role;
String nsId = getNameServiceId(conf);
this.haEnabled = HAUtil.isHAEnabled(conf, nsId);
this.haContext = new NameNodeHAContext();
this.allowStaleStandbyReads = HAUtil.shouldAllowStandbyReads(conf);
this.haContext = createHAContext();
try {
initializeGenericKeys(conf, nsId);
initialize(conf);
@ -553,6 +555,10 @@ public class NameNode {
}
}
protected HAContext createHAContext() {
return new NameNodeHAContext();
}
/**
* Wait for service to finish.
* (Normally, it runs forever.)
@ -914,11 +920,6 @@ public class NameNode {
return state.getServiceState();
}
/** Check if an operation of given category is allowed */
protected synchronized void checkOperation(final OperationCategory op)
throws StandbyException {
state.checkOperation(haContext, op);
}
/**
* Class used to expose {@link NameNode} as context to {@link HAState}
@ -928,7 +929,7 @@ public class NameNode {
* appropriate action is needed to either shut down the node or recover
* from failure.
*/
private class NameNodeHAContext implements HAContext {
protected class NameNodeHAContext implements HAContext {
@Override
public void setState(HAState s) {
state = s;
@ -961,6 +962,28 @@ public class NameNode {
// TODO(HA): Are we guaranteed to be the only active here?
namesystem.stopStandbyServices();
}
@Override
public void writeLock() {
namesystem.writeLock();
}
@Override
public void writeUnlock() {
namesystem.writeUnlock();
}
/** Check if an operation of given category is allowed */
@Override
public void checkOperation(final OperationCategory op)
throws StandbyException {
state.checkOperation(haContext, op);
}
@Override
public boolean allowStaleReads() {
return allowStaleStandbyReads;
}
}
public boolean isStandbyState() {

View File

@ -126,7 +126,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
private static final Log stateChangeLog = NameNode.stateChangeLog;
// Dependencies from other parts of NN.
private final FSNamesystem namesystem;
protected final FSNamesystem namesystem;
protected final NameNode nn;
private final NameNodeMetrics metrics;
@ -318,7 +318,9 @@ class NameNodeRpcServer implements NamenodeProtocols {
public void errorReport(NamenodeRegistration registration,
int errorCode,
String msg) throws IOException {
nn.checkOperation(OperationCategory.WRITE);
// nn.checkOperation(OperationCategory.WRITE);
// TODO: I don't think this should be checked - it's just for logging
// and dropping backups
verifyRequest(registration);
LOG.info("Error report from " + registration + ": " + msg);
if(errorCode == FATAL)
@ -346,28 +348,24 @@ class NameNodeRpcServer implements NamenodeProtocols {
@Override // NamenodeProtocol
public void endCheckpoint(NamenodeRegistration registration,
CheckpointSignature sig) throws IOException {
nn.checkOperation(OperationCategory.CHECKPOINT);
namesystem.endCheckpoint(registration, sig);
}
@Override // ClientProtocol
public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
throws IOException {
nn.checkOperation(OperationCategory.WRITE);
return namesystem.getDelegationToken(renewer);
}
@Override // ClientProtocol
public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
throws InvalidToken, IOException {
nn.checkOperation(OperationCategory.WRITE);
return namesystem.renewDelegationToken(token);
}
@Override // ClientProtocol
public void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
throws IOException {
nn.checkOperation(OperationCategory.WRITE);
namesystem.cancelDelegationToken(token);
}
@ -376,7 +374,6 @@ class NameNodeRpcServer implements NamenodeProtocols {
long offset,
long length)
throws IOException {
nn.checkOperation(OperationCategory.READ);
metrics.incrGetBlockLocations();
return namesystem.getBlockLocations(getClientMachine(),
src, offset, length);
@ -384,7 +381,6 @@ class NameNodeRpcServer implements NamenodeProtocols {
@Override // ClientProtocol
public FsServerDefaults getServerDefaults() throws IOException {
nn.checkOperation(OperationCategory.READ);
return namesystem.getServerDefaults();
}
@ -396,7 +392,6 @@ class NameNodeRpcServer implements NamenodeProtocols {
boolean createParent,
short replication,
long blockSize) throws IOException {
nn.checkOperation(OperationCategory.WRITE);
String clientMachine = getClientMachine();
if (stateChangeLog.isDebugEnabled()) {
stateChangeLog.debug("*DIR* NameNode.create: file "
@ -417,7 +412,6 @@ class NameNodeRpcServer implements NamenodeProtocols {
@Override // ClientProtocol
public LocatedBlock append(String src, String clientName)
throws IOException {
nn.checkOperation(OperationCategory.WRITE);
String clientMachine = getClientMachine();
if (stateChangeLog.isDebugEnabled()) {
stateChangeLog.debug("*DIR* NameNode.append: file "
@ -430,7 +424,6 @@ class NameNodeRpcServer implements NamenodeProtocols {
@Override // ClientProtocol
public boolean recoverLease(String src, String clientName) throws IOException {
nn.checkOperation(OperationCategory.WRITE);
String clientMachine = getClientMachine();
return namesystem.recoverLease(src, clientName, clientMachine);
}
@ -438,21 +431,18 @@ class NameNodeRpcServer implements NamenodeProtocols {
@Override // ClientProtocol
public boolean setReplication(String src, short replication)
throws IOException {
nn.checkOperation(OperationCategory.WRITE);
return namesystem.setReplication(src, replication);
}
@Override // ClientProtocol
public void setPermission(String src, FsPermission permissions)
throws IOException {
nn.checkOperation(OperationCategory.WRITE);
namesystem.setPermission(src, permissions);
}
@Override // ClientProtocol
public void setOwner(String src, String username, String groupname)
throws IOException {
nn.checkOperation(OperationCategory.WRITE);
namesystem.setOwner(src, username, groupname);
}
@ -462,7 +452,6 @@ class NameNodeRpcServer implements NamenodeProtocols {
ExtendedBlock previous,
DatanodeInfo[] excludedNodes)
throws IOException {
nn.checkOperation(OperationCategory.WRITE);
if(stateChangeLog.isDebugEnabled()) {
stateChangeLog.debug("*BLOCK* NameNode.addBlock: file "
+src+" for "+clientName);
@ -486,7 +475,6 @@ class NameNodeRpcServer implements NamenodeProtocols {
final DatanodeInfo[] existings, final DatanodeInfo[] excludes,
final int numAdditionalNodes, final String clientName
) throws IOException {
nn.checkOperation(OperationCategory.WRITE);
if (LOG.isDebugEnabled()) {
LOG.debug("getAdditionalDatanode: src=" + src
+ ", blk=" + blk
@ -514,7 +502,6 @@ class NameNodeRpcServer implements NamenodeProtocols {
@Override // ClientProtocol
public void abandonBlock(ExtendedBlock b, String src, String holder)
throws IOException {
nn.checkOperation(OperationCategory.WRITE);
if(stateChangeLog.isDebugEnabled()) {
stateChangeLog.debug("*BLOCK* NameNode.abandonBlock: "
+b+" of file "+src);
@ -527,7 +514,6 @@ class NameNodeRpcServer implements NamenodeProtocols {
@Override // ClientProtocol
public boolean complete(String src, String clientName, ExtendedBlock last)
throws IOException {
nn.checkOperation(OperationCategory.WRITE);
if(stateChangeLog.isDebugEnabled()) {
stateChangeLog.debug("*DIR* NameNode.complete: "
+ src + " for " + clientName);
@ -543,22 +529,12 @@ class NameNodeRpcServer implements NamenodeProtocols {
*/
@Override // ClientProtocol, DatanodeProtocol
public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
nn.checkOperation(OperationCategory.WRITE);
stateChangeLog.info("*DIR* NameNode.reportBadBlocks");
for (int i = 0; i < blocks.length; i++) {
ExtendedBlock blk = blocks[i].getBlock();
DatanodeInfo[] nodes = blocks[i].getLocations();
for (int j = 0; j < nodes.length; j++) {
DatanodeInfo dn = nodes[j];
namesystem.getBlockManager().findAndMarkBlockAsCorrupt(blk, dn);
}
}
namesystem.reportBadBlocks(blocks);
}
@Override // ClientProtocol
public LocatedBlock updateBlockForPipeline(ExtendedBlock block, String clientName)
throws IOException {
nn.checkOperation(OperationCategory.WRITE);
return namesystem.updateBlockForPipeline(block, clientName);
}
@ -567,7 +543,6 @@ class NameNodeRpcServer implements NamenodeProtocols {
public void updatePipeline(String clientName, ExtendedBlock oldBlock,
ExtendedBlock newBlock, DatanodeID[] newNodes)
throws IOException {
nn.checkOperation(OperationCategory.WRITE);
namesystem.updatePipeline(clientName, oldBlock, newBlock, newNodes);
}
@ -576,18 +551,6 @@ class NameNodeRpcServer implements NamenodeProtocols {
long newgenerationstamp, long newlength,
boolean closeFile, boolean deleteblock, DatanodeID[] newtargets)
throws IOException {
nn.checkOperation(OperationCategory.WRITE);
if (nn.isStandbyState()) {
if (namesystem.isGenStampInFuture(newgenerationstamp)) {
LOG.info("Required GS=" + newgenerationstamp
+ ", Queuing commitBlockSynchronization message");
namesystem.getPendingDataNodeMessages().queueMessage(
new PendingDataNodeMessages.CommitBlockSynchronizationMessage(
block, newgenerationstamp, newlength, closeFile, deleteblock,
newtargets, newgenerationstamp));
return;
}
}
namesystem.commitBlockSynchronization(block,
newgenerationstamp, newlength, closeFile, deleteblock, newtargets);
}
@ -595,14 +558,12 @@ class NameNodeRpcServer implements NamenodeProtocols {
@Override // ClientProtocol
public long getPreferredBlockSize(String filename)
throws IOException {
nn.checkOperation(OperationCategory.READ);
return namesystem.getPreferredBlockSize(filename);
}
@Deprecated
@Override // ClientProtocol
public boolean rename(String src, String dst) throws IOException {
nn.checkOperation(OperationCategory.WRITE);
if(stateChangeLog.isDebugEnabled()) {
stateChangeLog.debug("*DIR* NameNode.rename: " + src + " to " + dst);
}
@ -619,14 +580,12 @@ class NameNodeRpcServer implements NamenodeProtocols {
@Override // ClientProtocol
public void concat(String trg, String[] src) throws IOException {
nn.checkOperation(OperationCategory.WRITE);
namesystem.concat(trg, src);
}
@Override // ClientProtocol
public void rename2(String src, String dst, Options.Rename... options)
throws IOException {
nn.checkOperation(OperationCategory.WRITE);
if(stateChangeLog.isDebugEnabled()) {
stateChangeLog.debug("*DIR* NameNode.rename: " + src + " to " + dst);
}
@ -640,7 +599,6 @@ class NameNodeRpcServer implements NamenodeProtocols {
@Override // ClientProtocol
public boolean delete(String src, boolean recursive) throws IOException {
nn.checkOperation(OperationCategory.WRITE);
if (stateChangeLog.isDebugEnabled()) {
stateChangeLog.debug("*DIR* Namenode.delete: src=" + src
+ ", recursive=" + recursive);
@ -665,7 +623,6 @@ class NameNodeRpcServer implements NamenodeProtocols {
@Override // ClientProtocol
public boolean mkdirs(String src, FsPermission masked, boolean createParent)
throws IOException {
nn.checkOperation(OperationCategory.WRITE);
if(stateChangeLog.isDebugEnabled()) {
stateChangeLog.debug("*DIR* NameNode.mkdirs: " + src);
}
@ -680,14 +637,12 @@ class NameNodeRpcServer implements NamenodeProtocols {
@Override // ClientProtocol
public void renewLease(String clientName) throws IOException {
nn.checkOperation(OperationCategory.WRITE);
namesystem.renewLease(clientName);
}
@Override // ClientProtocol
public DirectoryListing getListing(String src, byte[] startAfter,
boolean needLocation) throws IOException {
nn.checkOperation(OperationCategory.READ);
DirectoryListing files = namesystem.getListing(
src, startAfter, needLocation);
if (files != null) {
@ -699,21 +654,19 @@ class NameNodeRpcServer implements NamenodeProtocols {
@Override // ClientProtocol
public HdfsFileStatus getFileInfo(String src) throws IOException {
nn.checkOperation(OperationCategory.READ);
metrics.incrFileInfoOps();
return namesystem.getFileInfo(src, true);
}
@Override // ClientProtocol
public HdfsFileStatus getFileLinkInfo(String src) throws IOException {
nn.checkOperation(OperationCategory.READ);
metrics.incrFileInfoOps();
return namesystem.getFileInfo(src, false);
}
@Override // ClientProtocol
public long[] getStats() throws IOException {
nn.checkOperation(OperationCategory.READ);
namesystem.checkOperation(OperationCategory.READ);
return namesystem.getStats();
}
@ -793,7 +746,6 @@ class NameNodeRpcServer implements NamenodeProtocols {
@Override // ClientProtocol
public CorruptFileBlocks listCorruptFileBlocks(String path, String cookie)
throws IOException {
nn.checkOperation(OperationCategory.READ);
String[] cookieTab = new String[] { cookie };
Collection<FSNamesystem.CorruptFileBlockInfo> fbs =
namesystem.listCorruptFileBlocks(path, cookieTab);
@ -820,34 +772,29 @@ class NameNodeRpcServer implements NamenodeProtocols {
@Override // ClientProtocol
public ContentSummary getContentSummary(String path) throws IOException {
nn.checkOperation(OperationCategory.READ);
return namesystem.getContentSummary(path);
}
@Override // ClientProtocol
public void setQuota(String path, long namespaceQuota, long diskspaceQuota)
throws IOException {
nn.checkOperation(OperationCategory.WRITE);
namesystem.setQuota(path, namespaceQuota, diskspaceQuota);
}
@Override // ClientProtocol
public void fsync(String src, String clientName) throws IOException {
nn.checkOperation(OperationCategory.WRITE);
namesystem.fsync(src, clientName);
}
@Override // ClientProtocol
public void setTimes(String src, long mtime, long atime)
throws IOException {
nn.checkOperation(OperationCategory.WRITE);
namesystem.setTimes(src, mtime, atime);
}
@Override // ClientProtocol
public void createSymlink(String target, String link, FsPermission dirPerms,
boolean createParent) throws IOException {
nn.checkOperation(OperationCategory.WRITE);
metrics.incrCreateSymlinkOps();
/* We enforce the MAX_PATH_LENGTH limit even though a symlink target
* URI may refer to a non-HDFS file system.
@ -867,7 +814,6 @@ class NameNodeRpcServer implements NamenodeProtocols {
@Override // ClientProtocol
public String getLinkTarget(String path) throws IOException {
nn.checkOperation(OperationCategory.READ);
metrics.incrGetLinkTargetOps();
try {
HdfsFileStatus stat = namesystem.getFileInfo(path, false);
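
The recurring change in this file is that per-call category checks leave the RPC stubs and move into FSNamesystem, where they run under the same lock as the operation itself. Schematically, with setPermission as the example:

// Before: race-prone -- the HA state could flip between the check and the call.
nn.checkOperation(OperationCategory.WRITE);
namesystem.setPermission(src, permissions);

// After: FSNamesystem.setPermission() performs the check itself, inside
// the write lock that also guards the mutation.
namesystem.setPermission(src, permissions);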

View File

@ -85,27 +85,37 @@ public class EditLogTailer {
Preconditions.checkState(tailerThread == null ||
!tailerThread.isAlive(),
"Tailer thread should not be running once failover starts");
doTailEdits();
try {
doTailEdits();
} catch (InterruptedException e) {
throw new IOException(e);
}
}
private void doTailEdits() throws IOException {
// TODO(HA) in a transition from active to standby,
// the following is wrong and ends up causing all of the
// last log segment to get re-read
long lastTxnId = image.getLastAppliedTxId();
if (LOG.isDebugEnabled()) {
LOG.debug("lastTxnId: " + lastTxnId);
}
Collection<EditLogInputStream> streams = editLog
.selectInputStreams(lastTxnId + 1, 0, false);
if (LOG.isDebugEnabled()) {
LOG.debug("edit streams to load from: " + streams.size());
}
long editsLoaded = image.loadEdits(streams, namesystem);
if (LOG.isDebugEnabled()) {
LOG.debug("editsLoaded: " + editsLoaded);
private void doTailEdits() throws IOException, InterruptedException {
// Write lock needs to be interruptible here because the
// transitionToActive RPC takes the write lock before calling
// tailer.stop() -- so if we're not interruptible, it will
// deadlock.
namesystem.writeLockInterruptibly();
try {
long lastTxnId = image.getLastAppliedTxId();
if (LOG.isDebugEnabled()) {
LOG.debug("lastTxnId: " + lastTxnId);
}
Collection<EditLogInputStream> streams = editLog
.selectInputStreams(lastTxnId + 1, 0, false);
if (LOG.isDebugEnabled()) {
LOG.debug("edit streams to load from: " + streams.size());
}
long editsLoaded = image.loadEdits(streams, namesystem);
if (LOG.isDebugEnabled()) {
LOG.debug("editsLoaded: " + editsLoaded);
}
} finally {
namesystem.writeUnlock();
}
}
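
writeLockInterruptibly() itself is not shown in this hunk; given the ReentrantReadWriteLock that FSNamesystem now exposes for tests, a plausible implementation would be (an assumption, not code from this commit):

void writeLockInterruptibly() throws InterruptedException {
  // Unlike lock(), lockInterruptibly() aborts with InterruptedException if
  // the tailer thread is interrupted while blocked, which breaks the
  // deadlock described in the comment above.
  this.fsLock.writeLock().lockInterruptibly();
}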

View File

@ -3,6 +3,8 @@ package org.apache.hadoop.hdfs.server.namenode.ha;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
import org.apache.hadoop.ipc.StandbyException;
/**
* Context that is to be used by {@link HAState} for getting/setting the
@ -27,4 +29,29 @@ public interface HAContext {
/** Stop the services when exiting standby state */
public void stopStandbyServices() throws IOException;
/**
* Take a write-lock on the underlying namesystem
* so that no concurrent state transitions or edits
* can be made.
*/
void writeLock();
/**
* Unlock the lock taken by {@link #writeLock()}
*/
void writeUnlock();
/**
* Verify that the given operation category is allowed in the
* current state. This is to allow NN implementations (eg BackupNode)
* to override it with node-specific handling.
*/
void checkOperation(OperationCategory op) throws StandbyException;
/**
* @return true if the node should allow stale reads (ie reads
* while the namespace is not up to date)
*/
boolean allowStaleReads();
}

View File

@ -54,9 +54,14 @@ abstract public class HAState {
*/
protected final void setStateInternal(final HAContext context, final HAState s)
throws ServiceFailedException {
exitState(context);
context.setState(s);
s.enterState(context);
context.writeLock();
try {
exitState(context);
context.setState(s);
s.enterState(context);
} finally {
context.writeUnlock();
}
}
/**
@ -107,4 +112,4 @@ abstract public class HAState {
public String toString() {
return state.toString();
}
}

View File

@ -73,6 +73,9 @@ public class StandbyState extends HAState {
@Override
public void checkOperation(HAContext context, OperationCategory op)
throws StandbyException {
if (op == OperationCategory.READ && context.allowStaleReads()) {
return;
}
String msg = "Operation category " + op + " is not supported in state "
+ context.getState();
throw new StandbyException(msg);

View File

@ -307,6 +307,14 @@ public class MiniDFSCluster {
private boolean waitSafeMode = true;
private boolean federation;
/**
* A unique instance identifier for the cluster. This
* is used to disambiguate HA filesystems in the case where
* multiple MiniDFSClusters are used in the same test suite.
*/
private int instanceId;
private static int instanceCount = 0;
/**
* Stores the information related to a namenode in the cluster
*/
@ -325,6 +333,9 @@ public class MiniDFSCluster {
*/
public MiniDFSCluster() {
nameNodes = new NameNodeInfo[0]; // No namenode in the cluster
synchronized (MiniDFSCluster.class) {
instanceId = instanceCount++;
}
}
/**
@ -510,6 +521,10 @@ public class MiniDFSCluster {
boolean waitSafeMode, boolean setupHostsFile,
MiniDFSNNTopology nnTopology)
throws IOException {
synchronized (MiniDFSCluster.class) {
instanceId = instanceCount++;
}
this.conf = conf;
base_dir = new File(determineDfsBaseDir());
data_dir = new File(base_dir, "data");
@ -737,6 +752,10 @@ public class MiniDFSCluster {
}
return uri;
}
public int getInstanceId() {
return instanceId;
}
/**
* @return Configuration of for the given namenode

View File

@ -46,7 +46,7 @@ public class TestDFSClientFailover {
private Configuration conf = new Configuration();
private MiniDFSCluster cluster;
private static final String LOGICAL_HOSTNAME = "ha-nn-uri";
private static final String LOGICAL_HOSTNAME = "ha-nn-uri-%d";
@Before
public void setUpCluster() throws IOException {
@ -91,7 +91,8 @@ public class TestDFSClientFailover {
// Check that it functions even if the URL becomes canonicalized
// to include a port number.
Path withPort = new Path("hdfs://" + LOGICAL_HOSTNAME + ":" +
Path withPort = new Path("hdfs://" +
getLogicalHostname(cluster) + ":" +
NameNode.DEFAULT_PORT + "/" + TEST_FILE.toUri().getPath());
FileSystem fs2 = withPort.getFileSystem(fs.getConf());
assertTrue(fs2.exists(withPort));
@ -126,6 +127,7 @@ public class TestDFSClientFailover {
String nameNodeId1 = "nn1";
String nameNodeId2 = "nn2";
String logicalName = getLogicalHostname(cluster);
conf = new Configuration(conf);
String address1 = "hdfs://" + nnAddr1.getHostName() + ":" + nnAddr1.getPort();
@ -138,11 +140,15 @@ public class TestDFSClientFailover {
conf.set(DFSConfigKeys.DFS_FEDERATION_NAMESERVICES, nsId);
conf.set(DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY, nsId),
nameNodeId1 + "," + nameNodeId2);
conf.set(DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "." + LOGICAL_HOSTNAME,
conf.set(DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "." + logicalName,
ConfiguredFailoverProxyProvider.class.getName());
FileSystem fs = FileSystem.get(new URI("hdfs://" + LOGICAL_HOSTNAME), conf);
FileSystem fs = FileSystem.get(new URI("hdfs://" + logicalName), conf);
return fs;
}
private static String getLogicalHostname(MiniDFSCluster cluster) {
return String.format(LOGICAL_HOSTNAME, cluster.getInstanceId());
}
}

View File

@ -146,8 +146,14 @@ public class TestFileCorruption extends TestCase {
// report corrupted block by the third datanode
DatanodeRegistration dnR =
DataNodeTestUtils.getDNRegistrationForBP(dataNode, blk.getBlockPoolId());
cluster.getNamesystem().getBlockManager().findAndMarkBlockAsCorrupt(
blk, new DatanodeInfo(dnR));
FSNamesystem ns = cluster.getNamesystem();
ns.writeLock();
try {
cluster.getNamesystem().getBlockManager().findAndMarkBlockAsCorrupt(
blk, new DatanodeInfo(dnR));
} finally {
ns.writeUnlock();
}
// open the file
fs.open(FILE_PATH);

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.namenode;
import java.io.IOException;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.permission.PermissionStatus;
@ -29,7 +30,9 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.security.AccessControlException;
import org.mockito.Mockito;
/**
* This is a utility class to expose NameNode functionality for unit tests.
@ -52,7 +55,8 @@ public class NameNodeAdapter {
}
public static HdfsFileStatus getFileInfo(NameNode namenode, String src,
boolean resolveLink) throws AccessControlException, UnresolvedLinkException {
boolean resolveLink) throws AccessControlException, UnresolvedLinkException,
StandbyException {
return namenode.getNamesystem().getFileInfo(src, resolveLink);
}
@ -134,4 +138,10 @@ public class NameNodeAdapter {
public static long[] getStats(final FSNamesystem fsn) {
return fsn.getStats();
}
public static ReentrantReadWriteLock spyOnFsLock(FSNamesystem fsn) {
ReentrantReadWriteLock spy = Mockito.spy(fsn.getFsLockForTests());
fsn.setFsLockForTests(spy);
return spy;
}
}

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
@ -120,6 +121,7 @@ public class TestBackupNode extends TestCase {
*/
public void testBackupNodeTailsEdits() throws Exception {
Configuration conf = new HdfsConfiguration();
HAUtil.setAllowStandbyReads(conf, true);
MiniDFSCluster cluster = null;
FileSystem fileSys = null;
BackupNode backup = null;
@ -245,6 +247,7 @@ public class TestBackupNode extends TestCase {
Path file3 = new Path("/backup.dat");
Configuration conf = new HdfsConfiguration();
HAUtil.setAllowStandbyReads(conf, true);
short replication = (short)conf.getInt("dfs.replication", 3);
int numDatanodes = Math.max(3, replication);
conf.set(DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY, "0");

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.ha.ServiceFailedException;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
@ -52,6 +53,7 @@ public class TestEditLogTailer {
public void testTailer() throws IOException, InterruptedException,
ServiceFailedException {
Configuration conf = new HdfsConfiguration();
HAUtil.setAllowStandbyReads(conf, true);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleHATopology())

View File

@ -29,6 +29,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
@ -52,6 +53,7 @@ public class TestEditLogsDuringFailover {
@Test
public void testStartup() throws Exception {
Configuration conf = new Configuration();
HAUtil.setAllowStandbyReads(conf, true);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleHATopology())
.numDataNodes(0)

View File

@ -19,6 +19,8 @@ package org.apache.hadoop.hdfs.server.namenode.ha;
import static org.junit.Assert.*;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -28,8 +30,12 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.TestDFSClientFailover;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.MultithreadedTestUtil.TestContext;
import org.apache.hadoop.test.MultithreadedTestUtil.RepeatingTestThread;
import org.junit.Test;
import org.mockito.Mockito;
/**
* Tests state transition from active->standby, and manual failover
@ -133,4 +139,57 @@ public class TestHAStateTransitions {
cluster.shutdown();
}
}
/**
* Regression test for HDFS-2693: when doing state transitions, we need to
* lock the FSNamesystem so that we don't end up doing any writes while it's
* "in between" states.
* This test case starts up several client threads which do mutation operations
* while flipping a NN back and forth from active to standby.
*/
@Test(timeout=120000)
public void testTransitionSynchronization() throws Exception {
Configuration conf = new Configuration();
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleHATopology())
.numDataNodes(0)
.build();
try {
cluster.waitActive();
ReentrantReadWriteLock spyLock = NameNodeAdapter.spyOnFsLock(
cluster.getNameNode(0).getNamesystem());
Mockito.doAnswer(new GenericTestUtils.SleepAnswer(50))
.when(spyLock).writeLock();
final FileSystem fs = TestDFSClientFailover.configureFailoverFs(
cluster, conf);
TestContext ctx = new TestContext();
for (int i = 0; i < 50; i++) {
final int finalI = i;
ctx.addThread(new RepeatingTestThread(ctx) {
@Override
public void doAnAction() throws Exception {
Path p = new Path("/test-" + finalI);
fs.mkdirs(p);
fs.delete(p, true);
}
});
}
ctx.addThread(new RepeatingTestThread(ctx) {
@Override
public void doAnAction() throws Exception {
cluster.transitionToStandby(0);
Thread.sleep(50);
cluster.transitionToActive(0);
}
});
ctx.startThreads();
ctx.waitFor(20000);
ctx.stop();
} finally {
cluster.shutdown();
}
}
}

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.AppendTestUtil;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.TestDFSClientFailover;
@ -54,6 +55,8 @@ public class TestStandbyIsHot {
@Test
public void testStandbyIsHot() throws Exception {
Configuration conf = new Configuration();
// We read from the standby to watch block locations
HAUtil.setAllowStandbyReads(conf, true);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.nnTopology(MiniDFSNNTopology.simpleHATopology())
.numDataNodes(3)

View File

@ -167,7 +167,12 @@ public class TestNameNodeMetrics extends TestCase {
// Corrupt first replica of the block
LocatedBlock block = NameNodeAdapter.getBlockLocations(
cluster.getNameNode(), file.toString(), 0, 1).get(0);
bm.findAndMarkBlockAsCorrupt(block.getBlock(), block.getLocations()[0]);
cluster.getNamesystem().writeLock();
try {
bm.findAndMarkBlockAsCorrupt(block.getBlock(), block.getLocations()[0]);
} finally {
cluster.getNamesystem().writeUnlock();
}
updateMetrics();
MetricsRecordBuilder rb = getMetrics(NS_METRICS);
assertGauge("CorruptBlocks", 1L, rb);
@ -204,7 +209,12 @@ public class TestNameNodeMetrics extends TestCase {
// Corrupt the only replica of the block to result in a missing block
LocatedBlock block = NameNodeAdapter.getBlockLocations(
cluster.getNameNode(), file.toString(), 0, 1).get(0);
bm.findAndMarkBlockAsCorrupt(block.getBlock(), block.getLocations()[0]);
cluster.getNamesystem().writeLock();
try {
bm.findAndMarkBlockAsCorrupt(block.getBlock(), block.getLocations()[0]);
} finally {
cluster.getNamesystem().writeUnlock();
}
updateMetrics();
MetricsRecordBuilder rb = getMetrics(NS_METRICS);
assertGauge("UnderReplicatedBlocks", 1L, rb);

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.test;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeoutException;
@ -176,4 +177,35 @@ public abstract class GenericTestUtils {
}
}
/**
* An Answer implementation which sleeps for a random number of milliseconds
* between 0 and a configurable value before delegating to the real
* implementation of the method. This can be useful for drawing out race
* conditions.
*/
public static class SleepAnswer implements Answer<Object> {
private final int maxSleepTime;
private static Random r = new Random();
public SleepAnswer(int maxSleepTime) {
this.maxSleepTime = maxSleepTime;
}
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
boolean interrupted = false;
try {
Thread.sleep(r.nextInt(maxSleepTime));
} catch (InterruptedException ie) {
interrupted = true;
}
try {
return invocation.callRealMethod();
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}
}
}
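
SleepAnswer is wired up through Mockito, as the new TestHAStateTransitions case above does against the spied fsLock:

ReentrantReadWriteLock spyLock = NameNodeAdapter.spyOnFsLock(
    cluster.getNameNode(0).getNamesystem());
// Delay each writeLock() acquisition by 0-50 ms to widen race windows.
Mockito.doAnswer(new GenericTestUtils.SleepAnswer(50))
    .when(spyLock).writeLock();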