HDFS-1974. Introduce active and standby states to the namenode. Contributed by Suresh Srinivas.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1156418 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Suresh Srinivas 2011-08-11 00:44:05 +00:00
parent f0ecbd0043
commit 73f2092b73
9 changed files with 377 additions and 44 deletions

View File

@ -5,3 +5,5 @@ branch is merged.
------------------------------ ------------------------------
HDFS-2179. Add fencing framework and mechanisms for NameNode HA. (todd) HDFS-2179. Add fencing framework and mechanisms for NameNode HA. (todd)
HDFS-1974. Introduce active and standby states to the namenode. (suresh)

View File

@ -696,4 +696,14 @@ public class DFSUtil {
ClientDatanodeProtocol.versionID, addr, ticket, confWithNoIpcIdle, ClientDatanodeProtocol.versionID, addr, ticket, confWithNoIpcIdle,
NetUtils.getDefaultSocketFactory(conf), socketTimeout); NetUtils.getDefaultSocketFactory(conf), socketTimeout);
} }
/**
 * Returns true if HA for the namenode is configured.
 * <p>
 * NOTE(review): stub implementation — always reports {@code false} until the
 * HA configuration keys are defined (see TODO below).
 * @param conf Configuration to examine for HA settings
 * @return true if HA is configured in the configuration; else false.
 */
public static boolean isHAEnabled(Configuration conf) {
// TODO:HA configuration changes pending
return false;
}
} }

View File

@ -188,34 +188,6 @@ public class BackupNode extends NameNode implements JournalProtocol {
} }
} }
/////////////////////////////////////////////////////
// NamenodeProtocol implementation for backup node.
/////////////////////////////////////////////////////
@Override // NamenodeProtocol
public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size)
throws IOException {
throw new UnsupportedActionException("getBlocks");
}
// Only active name-node can register other nodes.
@Override // NamenodeProtocol
public NamenodeRegistration register(NamenodeRegistration registration
) throws IOException {
throw new UnsupportedActionException("register");
}
@Override // NamenodeProtocol
public NamenodeCommand startCheckpoint(NamenodeRegistration registration)
throws IOException {
throw new UnsupportedActionException("startCheckpoint");
}
@Override // NamenodeProtocol
public void endCheckpoint(NamenodeRegistration registration,
CheckpointSignature sig) throws IOException {
throw new UnsupportedActionException("endCheckpoint");
}
///////////////////////////////////////////////////// /////////////////////////////////////////////////////
// BackupNodeProtocol implementation for backup node. // BackupNodeProtocol implementation for backup node.
///////////////////////////////////////////////////// /////////////////////////////////////////////////////
@ -224,6 +196,7 @@ public class BackupNode extends NameNode implements JournalProtocol {
public void journal(NamenodeRegistration nnReg, public void journal(NamenodeRegistration nnReg,
long firstTxId, int numTxns, long firstTxId, int numTxns,
byte[] records) throws IOException { byte[] records) throws IOException {
checkOperation(OperationCategory.JOURNAL);
verifyRequest(nnReg); verifyRequest(nnReg);
if(!nnRpcAddress.equals(nnReg.getAddress())) if(!nnRpcAddress.equals(nnReg.getAddress()))
throw new IOException("Journal request from unexpected name-node: " throw new IOException("Journal request from unexpected name-node: "
@ -234,6 +207,7 @@ public class BackupNode extends NameNode implements JournalProtocol {
@Override @Override
public void startLogSegment(NamenodeRegistration registration, long txid) public void startLogSegment(NamenodeRegistration registration, long txid)
throws IOException { throws IOException {
checkOperation(OperationCategory.JOURNAL);
verifyRequest(registration); verifyRequest(registration);
getBNImage().namenodeStartedLogSegment(txid); getBNImage().namenodeStartedLogSegment(txid);
@ -369,4 +343,14 @@ public class BackupNode extends NameNode implements JournalProtocol {
String getClusterId() { String getClusterId() {
return clusterId; return clusterId;
} }
/**
 * Restricts the BackupNode to journal traffic only: any operation whose
 * category is not {@code JOURNAL} is rejected.
 * @param op category of the requested operation
 * @throws UnsupportedActionException if {@code op} is not
 *         {@code OperationCategory.JOURNAL}
 */
@Override // NameNode
protected void checkOperation(OperationCategory op)
throws UnsupportedActionException {
// Only journal operations are valid here; client/namespace requests
// must go to the active NameNode instead.
if (OperationCategory.JOURNAL != op) {
String msg = "Operation category " + op
+ " is not supported at the BackupNode";
throw new UnsupportedActionException(msg);
}
}
} }

View File

@ -31,6 +31,8 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HealthCheckFailedException;
import org.apache.hadoop.ha.ServiceFailedException;
import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.CreateFlag;
@ -67,11 +69,15 @@ import org.apache.hadoop.hdfs.server.common.HdfsConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption; import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException; import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport; import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
import org.apache.hadoop.hdfs.server.namenode.ha.ActiveState;
import org.apache.hadoop.hdfs.server.namenode.ha.HAState;
import org.apache.hadoop.hdfs.server.namenode.ha.StandbyState;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics; import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand; import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
@ -145,6 +151,20 @@ public class NameNode implements NamenodeProtocols, FSConstants {
HdfsConfiguration.init(); HdfsConfiguration.init();
} }
/**
 * Categories of operations supported by the namenode.
 * Each RPC entry point passes its category to {@code checkOperation(...)}
 * so a node can accept or reject the request based on its current role.
 */
public static enum OperationCategory {
/** Read operation that does not change the namespace state */
READ,
/** Write operation that changes the namespace state */
WRITE,
/** Operations related to checkpointing */
CHECKPOINT,
/** Operations related to {@link JournalProtocol} */
JOURNAL
}
/** /**
* HDFS federation configuration can have two types of parameters: * HDFS federation configuration can have two types of parameters:
* <ol> * <ol>
@ -204,9 +224,15 @@ public class NameNode implements NamenodeProtocols, FSConstants {
public static final Log LOG = LogFactory.getLog(NameNode.class.getName()); public static final Log LOG = LogFactory.getLog(NameNode.class.getName());
public static final Log stateChangeLog = LogFactory.getLog("org.apache.hadoop.hdfs.StateChange"); public static final Log stateChangeLog = LogFactory.getLog("org.apache.hadoop.hdfs.StateChange");
public static final HAState ACTIVE_STATE = new ActiveState();
public static final HAState STANDBY_STATE = new StandbyState();
protected FSNamesystem namesystem; protected FSNamesystem namesystem;
protected NamenodeRole role; protected NamenodeRole role;
private HAState state;
private final boolean haEnabled;
/** RPC server. Package-protected for use in tests. */ /** RPC server. Package-protected for use in tests. */
Server server; Server server;
/** RPC server for HDFS Services communication. /** RPC server for HDFS Services communication.
@ -402,6 +428,7 @@ public class NameNode implements NamenodeProtocols, FSConstants {
* @param conf the configuration * @param conf the configuration
*/ */
protected void initialize(Configuration conf) throws IOException { protected void initialize(Configuration conf) throws IOException {
initializeGenericKeys(conf);
InetSocketAddress socAddr = getRpcServerAddress(conf); InetSocketAddress socAddr = getRpcServerAddress(conf);
UserGroupInformation.setConfiguration(conf); UserGroupInformation.setConfiguration(conf);
loginAsNameNodeUser(conf); loginAsNameNodeUser(conf);
@ -450,10 +477,6 @@ public class NameNode implements NamenodeProtocols, FSConstants {
} }
activate(conf); activate(conf);
LOG.info(getRole() + " up at: " + rpcAddress);
if (serviceRPCAddress != null) {
LOG.info(getRole() + " service server is up at: " + serviceRPCAddress);
}
} }
/** /**
@ -503,6 +526,10 @@ public class NameNode implements NamenodeProtocols, FSConstants {
LOG.warn("ServicePlugin " + p + " could not be started", t); LOG.warn("ServicePlugin " + p + " could not be started", t);
} }
} }
LOG.info(getRole() + " up at: " + rpcAddress);
if (serviceRPCAddress != null) {
LOG.info(getRole() + " service server is up at: " + serviceRPCAddress);
}
} }
private void startTrashEmptier(Configuration conf) throws IOException { private void startTrashEmptier(Configuration conf) throws IOException {
@ -556,8 +583,9 @@ public class NameNode implements NamenodeProtocols, FSConstants {
protected NameNode(Configuration conf, NamenodeRole role) protected NameNode(Configuration conf, NamenodeRole role)
throws IOException { throws IOException {
this.role = role; this.role = role;
this.haEnabled = DFSUtil.isHAEnabled(conf);
this.state = !haEnabled ? ACTIVE_STATE : STANDBY_STATE;
try { try {
initializeGenericKeys(conf);
initialize(conf); initialize(conf);
} catch (IOException e) { } catch (IOException e) {
this.stop(); this.stop();
@ -638,6 +666,7 @@ public class NameNode implements NamenodeProtocols, FSConstants {
public void errorReport(NamenodeRegistration registration, public void errorReport(NamenodeRegistration registration,
int errorCode, int errorCode,
String msg) throws IOException { String msg) throws IOException {
checkOperation(OperationCategory.WRITE);
verifyRequest(registration); verifyRequest(registration);
LOG.info("Error report from " + registration + ": " + msg); LOG.info("Error report from " + registration + ": " + msg);
if(errorCode == FATAL) if(errorCode == FATAL)
@ -665,27 +694,28 @@ public class NameNode implements NamenodeProtocols, FSConstants {
@Override // NamenodeProtocol @Override // NamenodeProtocol
public void endCheckpoint(NamenodeRegistration registration, public void endCheckpoint(NamenodeRegistration registration,
CheckpointSignature sig) throws IOException { CheckpointSignature sig) throws IOException {
verifyRequest(registration); checkOperation(OperationCategory.CHECKPOINT);
if(!isRole(NamenodeRole.NAMENODE))
throw new IOException("Only an ACTIVE node can invoke endCheckpoint.");
namesystem.endCheckpoint(registration, sig); namesystem.endCheckpoint(registration, sig);
} }
@Override // ClientProtocol @Override // ClientProtocol
public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer) public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
throws IOException { throws IOException {
checkOperation(OperationCategory.WRITE);
return namesystem.getDelegationToken(renewer); return namesystem.getDelegationToken(renewer);
} }
@Override // ClientProtocol @Override // ClientProtocol
public long renewDelegationToken(Token<DelegationTokenIdentifier> token) public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
throws InvalidToken, IOException { throws InvalidToken, IOException {
checkOperation(OperationCategory.WRITE);
return namesystem.renewDelegationToken(token); return namesystem.renewDelegationToken(token);
} }
@Override // ClientProtocol @Override // ClientProtocol
public void cancelDelegationToken(Token<DelegationTokenIdentifier> token) public void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
throws IOException { throws IOException {
checkOperation(OperationCategory.WRITE);
namesystem.cancelDelegationToken(token); namesystem.cancelDelegationToken(token);
} }
@ -694,6 +724,7 @@ public class NameNode implements NamenodeProtocols, FSConstants {
long offset, long offset,
long length) long length)
throws IOException { throws IOException {
checkOperation(OperationCategory.READ);
metrics.incrGetBlockLocations(); metrics.incrGetBlockLocations();
return namesystem.getBlockLocations(getClientMachine(), return namesystem.getBlockLocations(getClientMachine(),
src, offset, length); src, offset, length);
@ -712,6 +743,7 @@ public class NameNode implements NamenodeProtocols, FSConstants {
boolean createParent, boolean createParent,
short replication, short replication,
long blockSize) throws IOException { long blockSize) throws IOException {
checkOperation(OperationCategory.WRITE);
String clientMachine = getClientMachine(); String clientMachine = getClientMachine();
if (stateChangeLog.isDebugEnabled()) { if (stateChangeLog.isDebugEnabled()) {
stateChangeLog.debug("*DIR* NameNode.create: file " stateChangeLog.debug("*DIR* NameNode.create: file "
@ -732,6 +764,7 @@ public class NameNode implements NamenodeProtocols, FSConstants {
@Override // ClientProtocol @Override // ClientProtocol
public LocatedBlock append(String src, String clientName) public LocatedBlock append(String src, String clientName)
throws IOException { throws IOException {
checkOperation(OperationCategory.WRITE);
String clientMachine = getClientMachine(); String clientMachine = getClientMachine();
if (stateChangeLog.isDebugEnabled()) { if (stateChangeLog.isDebugEnabled()) {
stateChangeLog.debug("*DIR* NameNode.append: file " stateChangeLog.debug("*DIR* NameNode.append: file "
@ -744,6 +777,7 @@ public class NameNode implements NamenodeProtocols, FSConstants {
@Override // ClientProtocol @Override // ClientProtocol
public boolean recoverLease(String src, String clientName) throws IOException { public boolean recoverLease(String src, String clientName) throws IOException {
checkOperation(OperationCategory.WRITE);
String clientMachine = getClientMachine(); String clientMachine = getClientMachine();
return namesystem.recoverLease(src, clientName, clientMachine); return namesystem.recoverLease(src, clientName, clientMachine);
} }
@ -751,18 +785,21 @@ public class NameNode implements NamenodeProtocols, FSConstants {
@Override // ClientProtocol @Override // ClientProtocol
public boolean setReplication(String src, short replication) public boolean setReplication(String src, short replication)
throws IOException { throws IOException {
checkOperation(OperationCategory.WRITE);
return namesystem.setReplication(src, replication); return namesystem.setReplication(src, replication);
} }
@Override // ClientProtocol @Override // ClientProtocol
public void setPermission(String src, FsPermission permissions) public void setPermission(String src, FsPermission permissions)
throws IOException { throws IOException {
checkOperation(OperationCategory.WRITE);
namesystem.setPermission(src, permissions); namesystem.setPermission(src, permissions);
} }
@Override // ClientProtocol @Override // ClientProtocol
public void setOwner(String src, String username, String groupname) public void setOwner(String src, String username, String groupname)
throws IOException { throws IOException {
checkOperation(OperationCategory.WRITE);
namesystem.setOwner(src, username, groupname); namesystem.setOwner(src, username, groupname);
} }
@ -772,6 +809,7 @@ public class NameNode implements NamenodeProtocols, FSConstants {
ExtendedBlock previous, ExtendedBlock previous,
DatanodeInfo[] excludedNodes) DatanodeInfo[] excludedNodes)
throws IOException { throws IOException {
checkOperation(OperationCategory.WRITE);
if(stateChangeLog.isDebugEnabled()) { if(stateChangeLog.isDebugEnabled()) {
stateChangeLog.debug("*BLOCK* NameNode.addBlock: file " stateChangeLog.debug("*BLOCK* NameNode.addBlock: file "
+src+" for "+clientName); +src+" for "+clientName);
@ -795,6 +833,7 @@ public class NameNode implements NamenodeProtocols, FSConstants {
final DatanodeInfo[] existings, final DatanodeInfo[] excludes, final DatanodeInfo[] existings, final DatanodeInfo[] excludes,
final int numAdditionalNodes, final String clientName final int numAdditionalNodes, final String clientName
) throws IOException { ) throws IOException {
checkOperation(OperationCategory.WRITE);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("getAdditionalDatanode: src=" + src LOG.debug("getAdditionalDatanode: src=" + src
+ ", blk=" + blk + ", blk=" + blk
@ -820,8 +859,10 @@ public class NameNode implements NamenodeProtocols, FSConstants {
/** /**
* The client needs to give up on the block. * The client needs to give up on the block.
*/ */
@Override // ClientProtocol
public void abandonBlock(ExtendedBlock b, String src, String holder) public void abandonBlock(ExtendedBlock b, String src, String holder)
throws IOException { throws IOException {
checkOperation(OperationCategory.WRITE);
if(stateChangeLog.isDebugEnabled()) { if(stateChangeLog.isDebugEnabled()) {
stateChangeLog.debug("*BLOCK* NameNode.abandonBlock: " stateChangeLog.debug("*BLOCK* NameNode.abandonBlock: "
+b+" of file "+src); +b+" of file "+src);
@ -834,6 +875,7 @@ public class NameNode implements NamenodeProtocols, FSConstants {
@Override // ClientProtocol @Override // ClientProtocol
public boolean complete(String src, String clientName, ExtendedBlock last) public boolean complete(String src, String clientName, ExtendedBlock last)
throws IOException { throws IOException {
checkOperation(OperationCategory.WRITE);
if(stateChangeLog.isDebugEnabled()) { if(stateChangeLog.isDebugEnabled()) {
stateChangeLog.debug("*DIR* NameNode.complete: " stateChangeLog.debug("*DIR* NameNode.complete: "
+ src + " for " + clientName); + src + " for " + clientName);
@ -847,8 +889,9 @@ public class NameNode implements NamenodeProtocols, FSConstants {
* mark the block as corrupt. In the future we might * mark the block as corrupt. In the future we might
* check the blocks are actually corrupt. * check the blocks are actually corrupt.
*/ */
@Override @Override // ClientProtocol, DatanodeProtocol
public void reportBadBlocks(LocatedBlock[] blocks) throws IOException { public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
checkOperation(OperationCategory.WRITE);
stateChangeLog.info("*DIR* NameNode.reportBadBlocks"); stateChangeLog.info("*DIR* NameNode.reportBadBlocks");
for (int i = 0; i < blocks.length; i++) { for (int i = 0; i < blocks.length; i++) {
ExtendedBlock blk = blocks[i].getBlock(); ExtendedBlock blk = blocks[i].getBlock();
@ -863,6 +906,7 @@ public class NameNode implements NamenodeProtocols, FSConstants {
@Override // ClientProtocol @Override // ClientProtocol
public LocatedBlock updateBlockForPipeline(ExtendedBlock block, String clientName) public LocatedBlock updateBlockForPipeline(ExtendedBlock block, String clientName)
throws IOException { throws IOException {
checkOperation(OperationCategory.WRITE);
return namesystem.updateBlockForPipeline(block, clientName); return namesystem.updateBlockForPipeline(block, clientName);
} }
@ -871,6 +915,7 @@ public class NameNode implements NamenodeProtocols, FSConstants {
public void updatePipeline(String clientName, ExtendedBlock oldBlock, public void updatePipeline(String clientName, ExtendedBlock oldBlock,
ExtendedBlock newBlock, DatanodeID[] newNodes) ExtendedBlock newBlock, DatanodeID[] newNodes)
throws IOException { throws IOException {
checkOperation(OperationCategory.WRITE);
namesystem.updatePipeline(clientName, oldBlock, newBlock, newNodes); namesystem.updatePipeline(clientName, oldBlock, newBlock, newNodes);
} }
@ -879,6 +924,7 @@ public class NameNode implements NamenodeProtocols, FSConstants {
long newgenerationstamp, long newlength, long newgenerationstamp, long newlength,
boolean closeFile, boolean deleteblock, DatanodeID[] newtargets) boolean closeFile, boolean deleteblock, DatanodeID[] newtargets)
throws IOException { throws IOException {
checkOperation(OperationCategory.WRITE);
namesystem.commitBlockSynchronization(block, namesystem.commitBlockSynchronization(block,
newgenerationstamp, newlength, closeFile, deleteblock, newtargets); newgenerationstamp, newlength, closeFile, deleteblock, newtargets);
} }
@ -886,12 +932,14 @@ public class NameNode implements NamenodeProtocols, FSConstants {
@Override // ClientProtocol @Override // ClientProtocol
public long getPreferredBlockSize(String filename) public long getPreferredBlockSize(String filename)
throws IOException { throws IOException {
checkOperation(OperationCategory.READ);
return namesystem.getPreferredBlockSize(filename); return namesystem.getPreferredBlockSize(filename);
} }
@Deprecated @Deprecated
@Override // ClientProtocol @Override // ClientProtocol
public boolean rename(String src, String dst) throws IOException { public boolean rename(String src, String dst) throws IOException {
checkOperation(OperationCategory.WRITE);
if(stateChangeLog.isDebugEnabled()) { if(stateChangeLog.isDebugEnabled()) {
stateChangeLog.debug("*DIR* NameNode.rename: " + src + " to " + dst); stateChangeLog.debug("*DIR* NameNode.rename: " + src + " to " + dst);
} }
@ -908,12 +956,14 @@ public class NameNode implements NamenodeProtocols, FSConstants {
@Override // ClientProtocol @Override // ClientProtocol
public void concat(String trg, String[] src) throws IOException { public void concat(String trg, String[] src) throws IOException {
checkOperation(OperationCategory.WRITE);
namesystem.concat(trg, src); namesystem.concat(trg, src);
} }
@Override // ClientProtocol @Override // ClientProtocol
public void rename(String src, String dst, Options.Rename... options) public void rename(String src, String dst, Options.Rename... options)
throws IOException { throws IOException {
checkOperation(OperationCategory.WRITE);
if(stateChangeLog.isDebugEnabled()) { if(stateChangeLog.isDebugEnabled()) {
stateChangeLog.debug("*DIR* NameNode.rename: " + src + " to " + dst); stateChangeLog.debug("*DIR* NameNode.rename: " + src + " to " + dst);
} }
@ -928,11 +978,13 @@ public class NameNode implements NamenodeProtocols, FSConstants {
@Deprecated @Deprecated
@Override // ClientProtocol @Override // ClientProtocol
public boolean delete(String src) throws IOException { public boolean delete(String src) throws IOException {
checkOperation(OperationCategory.WRITE);
return delete(src, true); return delete(src, true);
} }
@Override // ClientProtocol @Override // ClientProtocol
public boolean delete(String src, boolean recursive) throws IOException { public boolean delete(String src, boolean recursive) throws IOException {
checkOperation(OperationCategory.WRITE);
if (stateChangeLog.isDebugEnabled()) { if (stateChangeLog.isDebugEnabled()) {
stateChangeLog.debug("*DIR* Namenode.delete: src=" + src stateChangeLog.debug("*DIR* Namenode.delete: src=" + src
+ ", recursive=" + recursive); + ", recursive=" + recursive);
@ -957,6 +1009,7 @@ public class NameNode implements NamenodeProtocols, FSConstants {
@Override // ClientProtocol @Override // ClientProtocol
public boolean mkdirs(String src, FsPermission masked, boolean createParent) public boolean mkdirs(String src, FsPermission masked, boolean createParent)
throws IOException { throws IOException {
checkOperation(OperationCategory.WRITE);
if(stateChangeLog.isDebugEnabled()) { if(stateChangeLog.isDebugEnabled()) {
stateChangeLog.debug("*DIR* NameNode.mkdirs: " + src); stateChangeLog.debug("*DIR* NameNode.mkdirs: " + src);
} }
@ -971,13 +1024,14 @@ public class NameNode implements NamenodeProtocols, FSConstants {
@Override // ClientProtocol @Override // ClientProtocol
public void renewLease(String clientName) throws IOException { public void renewLease(String clientName) throws IOException {
checkOperation(OperationCategory.WRITE);
namesystem.renewLease(clientName); namesystem.renewLease(clientName);
} }
@Override // ClientProtocol @Override // ClientProtocol
public DirectoryListing getListing(String src, byte[] startAfter, public DirectoryListing getListing(String src, byte[] startAfter,
boolean needLocation) boolean needLocation) throws IOException {
throws IOException { checkOperation(OperationCategory.READ);
DirectoryListing files = namesystem.getListing( DirectoryListing files = namesystem.getListing(
src, startAfter, needLocation); src, startAfter, needLocation);
if (files != null) { if (files != null) {
@ -989,12 +1043,14 @@ public class NameNode implements NamenodeProtocols, FSConstants {
@Override // ClientProtocol @Override // ClientProtocol
public HdfsFileStatus getFileInfo(String src) throws IOException { public HdfsFileStatus getFileInfo(String src) throws IOException {
checkOperation(OperationCategory.READ);
metrics.incrFileInfoOps(); metrics.incrFileInfoOps();
return namesystem.getFileInfo(src, true); return namesystem.getFileInfo(src, true);
} }
@Override // ClientProtocol @Override // ClientProtocol
public HdfsFileStatus getFileLinkInfo(String src) throws IOException { public HdfsFileStatus getFileLinkInfo(String src) throws IOException {
checkOperation(OperationCategory.READ);
metrics.incrFileInfoOps(); metrics.incrFileInfoOps();
return namesystem.getFileInfo(src, false); return namesystem.getFileInfo(src, false);
} }
@ -1007,6 +1063,7 @@ public class NameNode implements NamenodeProtocols, FSConstants {
@Override // ClientProtocol @Override // ClientProtocol
public DatanodeInfo[] getDatanodeReport(DatanodeReportType type) public DatanodeInfo[] getDatanodeReport(DatanodeReportType type)
throws IOException { throws IOException {
checkOperation(OperationCategory.READ);
DatanodeInfo results[] = namesystem.datanodeReport(type); DatanodeInfo results[] = namesystem.datanodeReport(type);
if (results == null ) { if (results == null ) {
throw new IOException("Cannot find datanode report"); throw new IOException("Cannot find datanode report");
@ -1016,6 +1073,7 @@ public class NameNode implements NamenodeProtocols, FSConstants {
@Override // ClientProtocol @Override // ClientProtocol
public boolean setSafeMode(SafeModeAction action) throws IOException { public boolean setSafeMode(SafeModeAction action) throws IOException {
// TODO:HA decide on OperationCategory for this
return namesystem.setSafeMode(action); return namesystem.setSafeMode(action);
} }
@ -1029,54 +1087,64 @@ public class NameNode implements NamenodeProtocols, FSConstants {
@Override // ClientProtocol @Override // ClientProtocol
public boolean restoreFailedStorage(String arg) public boolean restoreFailedStorage(String arg)
throws AccessControlException { throws AccessControlException {
// TODO:HA decide on OperationCategory for this
return namesystem.restoreFailedStorage(arg); return namesystem.restoreFailedStorage(arg);
} }
@Override // ClientProtocol @Override // ClientProtocol
public void saveNamespace() throws IOException { public void saveNamespace() throws IOException {
// TODO:HA decide on OperationCategory for this
namesystem.saveNamespace(); namesystem.saveNamespace();
} }
@Override // ClientProtocol @Override // ClientProtocol
public void refreshNodes() throws IOException { public void refreshNodes() throws IOException {
// TODO:HA decide on OperationCategory for this
namesystem.refreshNodes(new HdfsConfiguration()); namesystem.refreshNodes(new HdfsConfiguration());
} }
@Override // NamenodeProtocol @Override // NamenodeProtocol
public long getTransactionID() { public long getTransactionID() {
// TODO:HA decide on OperationCategory for this
return namesystem.getTransactionID(); return namesystem.getTransactionID();
} }
@Override // NamenodeProtocol @Override // NamenodeProtocol
public CheckpointSignature rollEditLog() throws IOException { public CheckpointSignature rollEditLog() throws IOException {
// TODO:HA decide on OperationCategory for this
return namesystem.rollEditLog(); return namesystem.rollEditLog();
} }
@Override @Override // NamenodeProtocol
public RemoteEditLogManifest getEditLogManifest(long sinceTxId) public RemoteEditLogManifest getEditLogManifest(long sinceTxId)
throws IOException { throws IOException {
// TODO:HA decide on OperationCategory for this
return namesystem.getEditLogManifest(sinceTxId); return namesystem.getEditLogManifest(sinceTxId);
} }
@Override // ClientProtocol @Override // ClientProtocol
public void finalizeUpgrade() throws IOException { public void finalizeUpgrade() throws IOException {
// TODO:HA decide on OperationCategory for this
namesystem.finalizeUpgrade(); namesystem.finalizeUpgrade();
} }
@Override // ClientProtocol @Override // ClientProtocol
public UpgradeStatusReport distributedUpgradeProgress(UpgradeAction action) public UpgradeStatusReport distributedUpgradeProgress(UpgradeAction action)
throws IOException { throws IOException {
// TODO:HA decide on OperationCategory for this
return namesystem.distributedUpgradeProgress(action); return namesystem.distributedUpgradeProgress(action);
} }
@Override // ClientProtocol @Override // ClientProtocol
public void metaSave(String filename) throws IOException { public void metaSave(String filename) throws IOException {
// TODO:HA decide on OperationCategory for this
namesystem.metaSave(filename); namesystem.metaSave(filename);
} }
@Override // ClientProtocol @Override // ClientProtocol
public CorruptFileBlocks listCorruptFileBlocks(String path, String cookie) public CorruptFileBlocks listCorruptFileBlocks(String path, String cookie)
throws IOException { throws IOException {
checkOperation(OperationCategory.READ);
Collection<FSNamesystem.CorruptFileBlockInfo> fbs = Collection<FSNamesystem.CorruptFileBlockInfo> fbs =
namesystem.listCorruptFileBlocks(path, cookie); namesystem.listCorruptFileBlocks(path, cookie);
@ -1096,35 +1164,42 @@ public class NameNode implements NamenodeProtocols, FSConstants {
* @param bandwidth Blanacer bandwidth in bytes per second for all datanodes. * @param bandwidth Blanacer bandwidth in bytes per second for all datanodes.
* @throws IOException * @throws IOException
*/ */
@Override // ClientProtocol
public void setBalancerBandwidth(long bandwidth) throws IOException { public void setBalancerBandwidth(long bandwidth) throws IOException {
// TODO:HA decide on OperationCategory for this
namesystem.setBalancerBandwidth(bandwidth); namesystem.setBalancerBandwidth(bandwidth);
} }
@Override // ClientProtocol @Override // ClientProtocol
public ContentSummary getContentSummary(String path) throws IOException { public ContentSummary getContentSummary(String path) throws IOException {
checkOperation(OperationCategory.READ);
return namesystem.getContentSummary(path); return namesystem.getContentSummary(path);
} }
@Override // ClientProtocol @Override // ClientProtocol
public void setQuota(String path, long namespaceQuota, long diskspaceQuota) public void setQuota(String path, long namespaceQuota, long diskspaceQuota)
throws IOException { throws IOException {
checkOperation(OperationCategory.WRITE);
namesystem.setQuota(path, namespaceQuota, diskspaceQuota); namesystem.setQuota(path, namespaceQuota, diskspaceQuota);
} }
@Override // ClientProtocol @Override // ClientProtocol
public void fsync(String src, String clientName) throws IOException { public void fsync(String src, String clientName) throws IOException {
checkOperation(OperationCategory.WRITE);
namesystem.fsync(src, clientName); namesystem.fsync(src, clientName);
} }
@Override // ClientProtocol @Override // ClientProtocol
public void setTimes(String src, long mtime, long atime) public void setTimes(String src, long mtime, long atime)
throws IOException { throws IOException {
checkOperation(OperationCategory.WRITE);
namesystem.setTimes(src, mtime, atime); namesystem.setTimes(src, mtime, atime);
} }
@Override // ClientProtocol @Override // ClientProtocol
public void createSymlink(String target, String link, FsPermission dirPerms, public void createSymlink(String target, String link, FsPermission dirPerms,
boolean createParent) throws IOException { boolean createParent) throws IOException {
checkOperation(OperationCategory.WRITE);
metrics.incrCreateSymlinkOps(); metrics.incrCreateSymlinkOps();
/* We enforce the MAX_PATH_LENGTH limit even though a symlink target /* We enforce the MAX_PATH_LENGTH limit even though a symlink target
* URI may refer to a non-HDFS file system. * URI may refer to a non-HDFS file system.
@ -1144,6 +1219,7 @@ public class NameNode implements NamenodeProtocols, FSConstants {
@Override // ClientProtocol @Override // ClientProtocol
public String getLinkTarget(String path) throws IOException { public String getLinkTarget(String path) throws IOException {
checkOperation(OperationCategory.READ);
metrics.incrGetLinkTargetOps(); metrics.incrGetLinkTargetOps();
/* Resolves the first symlink in the given path, returning a /* Resolves the first symlink in the given path, returning a
* new path consisting of the target of the symlink and any * new path consisting of the target of the symlink and any
@ -1591,4 +1667,43 @@ public class NameNode implements NamenodeProtocols, FSConstants {
} }
return clientMachine; return clientMachine;
} }
@Override // HAServiceProtocol
public synchronized void monitorHealth() throws HealthCheckFailedException {
  if (!haEnabled) {
    return; // no-op, if HA is not enabled
  }
  // TODO:HA implement health check
}
@Override // HAServiceProtocol
public synchronized void transitionToActive() throws ServiceFailedException {
  if (haEnabled) {
    // Delegate the transition to the current state object.
    state.setState(this, ACTIVE_STATE);
  } else {
    throw new ServiceFailedException("HA for namenode is not enabled");
  }
}
@Override // HAServiceProtocol
public synchronized void transitionToStandby() throws ServiceFailedException {
  if (haEnabled) {
    // Delegate the transition to the current state object.
    state.setState(this, STANDBY_STATE);
  } else {
    throw new ServiceFailedException("HA for namenode is not enabled");
  }
}
/**
 * Check if an operation of the given category is allowed in the
 * namenode's current HA state.
 * @param op category of the operation being attempted
 * @throws UnsupportedActionException if the current state does not
 *         support operations of this category.
 */
protected synchronized void checkOperation(final OperationCategory op)
throws UnsupportedActionException {
state.checkOperation(this, op);
}
/** @return the current HA state of this namenode. */
public synchronized HAState getState() {
return state;
}
/**
 * Set the HA state of this namenode. Intended to be invoked by the
 * state machinery during a transition, not by general callers.
 * @param s new state to record
 */
public synchronized void setState(final HAState s) {
state = s;
}
} }

View File

@ -32,8 +32,7 @@ public class UnsupportedActionException extends IOException {
/** for java.io.Serializable */ /** for java.io.Serializable */
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
public UnsupportedActionException(String action) { public UnsupportedActionException(String msg) {
super("Action " + action + "() is not supported."); super(msg);
} }
} }

View File

@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.ha;
import org.apache.hadoop.ha.ServiceFailedException;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException;
/**
 * Active state of the namenode. In this state, namenode provides the namenode
 * service and handles operations of type {@link OperationCategory#WRITE} and
 * {@link OperationCategory#READ}.
 */
public class ActiveState extends HAState {
  public ActiveState() {
    super("active");
  }

  @Override
  public void checkOperation(NameNode nn, OperationCategory op)
      throws UnsupportedActionException {
    // Other than journal, all operation categories are allowed while active.
  }

  @Override
  public void setState(NameNode nn, HAState s) throws ServiceFailedException {
    if (s != NameNode.STANDBY_STATE) {
      // Let the base class reject (or no-op on) any other target state.
      super.setState(nn, s);
    } else {
      setStateInternal(nn, s);
    }
  }

  @Override
  protected void enterState(NameNode nn) throws ServiceFailedException {
    // TODO:HA
  }

  @Override
  protected void exitState(NameNode nn) throws ServiceFailedException {
    // TODO:HA
  }
}

View File

@ -0,0 +1,104 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.ha;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.ha.ServiceFailedException;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException;
/**
 * Namenode base state to implement the state machine pattern.
 */
@InterfaceAudience.Private
abstract public class HAState {
  /** Descriptive name of this state, e.g. "active" or "standby". */
  protected final String name;

  /**
   * Constructor
   * @param name Name of the state.
   */
  public HAState(String name) {
    this.name = name;
  }

  /**
   * Internal method to transition the state of a given namenode to a new state.
   * @param nn Namenode
   * @param s new state
   * @throws ServiceFailedException on failure to transition to new state.
   */
  protected final void setStateInternal(final NameNode nn, final HAState s)
      throws ServiceFailedException {
    exitState(nn);
    nn.setState(s);
    s.enterState(nn);
  }

  /**
   * Method to be overridden by subclasses to perform steps necessary for
   * entering a state.
   * @param nn Namenode
   * @throws ServiceFailedException on failure to enter the state.
   */
  protected abstract void enterState(final NameNode nn)
      throws ServiceFailedException;

  /**
   * Method to be overridden by subclasses to perform steps necessary for
   * exiting a state.
   * @param nn Namenode
   * @throws ServiceFailedException on failure to exit the state.
   */
  protected abstract void exitState(final NameNode nn)
      throws ServiceFailedException;

  /**
   * Move from the existing state to a new state
   * @param nn Namenode
   * @param s new state
   * @throws ServiceFailedException on failure to transition to new state.
   */
  public void setState(NameNode nn, HAState s) throws ServiceFailedException {
    if (this == s) { // Already in the new state
      return;
    }
    throw new ServiceFailedException("Transition from state " + this + " to "
        + s + " is not allowed.");
  }

  /**
   * Check if an operation is supported in a given state.
   * @param nn Namenode
   * @param op Type of the operation.
   * @throws UnsupportedActionException if a given type of operation is not
   *           supported in this state.
   */
  public void checkOperation(final NameNode nn, final OperationCategory op)
      throws UnsupportedActionException {
    String msg = "Operation category " + op + " is not supported in state "
        + nn.getState();
    throw new UnsupportedActionException(msg);
  }

  @Override
  public String toString() {
    // Report the state name; super.toString() would print an opaque
    // class@hash and leave the name field unused.
    return name;
  }
}

View File

@ -0,0 +1,58 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.ha;
import org.apache.hadoop.ha.ServiceFailedException;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
/**
 * Namenode standby state. In this state the namenode acts as warm standby and
 * keeps the following updated:
 * <ul>
 * <li>Namespace by getting the edits.</li>
 * <li>Block location information by receiving block reports and blocks
 * received from the datanodes.</li>
 * </ul>
 *
 * It does not handle read/write/checkpoint operations.
 */
public class StandbyState extends HAState {
  public StandbyState() {
    super("standby");
  }

  @Override
  public void setState(NameNode nn, HAState s) throws ServiceFailedException {
    if (s != NameNode.ACTIVE_STATE) {
      // Let the base class reject (or no-op on) any other target state.
      super.setState(nn, s);
    } else {
      setStateInternal(nn, s);
    }
  }

  @Override
  protected void enterState(NameNode nn) throws ServiceFailedException {
    // TODO:HA
  }

  @Override
  protected void exitState(NameNode nn) throws ServiceFailedException {
    // TODO:HA
  }
}

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.hdfs.server.protocol; package org.apache.hadoop.hdfs.server.protocol;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol; import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
import org.apache.hadoop.security.RefreshUserMappingsProtocol; import org.apache.hadoop.security.RefreshUserMappingsProtocol;
@ -32,5 +33,6 @@ public interface NamenodeProtocols
NamenodeProtocol, NamenodeProtocol,
RefreshAuthorizationPolicyProtocol, RefreshAuthorizationPolicyProtocol,
RefreshUserMappingsProtocol, RefreshUserMappingsProtocol,
GetUserMappingsProtocol { GetUserMappingsProtocol,
HAServiceProtocol {
} }