HDFS-12975. [SBN read] Changes to the NameNode to support reads from standby. Contributed by Chao Sun.

This commit is contained in:
Chao Sun 2018-03-20 18:37:59 -07:00 committed by Konstantin V Shvachko
parent 686fcd4db3
commit 3a78e5ffd8
5 changed files with 55 additions and 10 deletions

View File

@ -157,7 +157,9 @@ enum StartupOption{
// only used for StorageDirectory.analyzeStorage() in hot swap drive scenario.
// TODO refactor StorageDirectory.analyzeStorage() so that we can do away with
// this in StartupOption.
HOTSWAP("-hotswap");
HOTSWAP("-hotswap"),
// Startup the namenode in observer mode.
OBSERVER("-observer");
private static final Pattern ENUM_WITH_ROLLING_UPGRADE_OPTION = Pattern.compile(
"(\\w+)\\((\\w+)\\)");

View File

@ -542,7 +542,8 @@ private void logAuditEvent(boolean succeeded,
private final ReentrantLock cpLock;
/**
* Used when this NN is in standby state to read from the shared edit log.
* Used when this NN is in standby or observer state to read from the
* shared edit log.
*/
private EditLogTailer editLogTailer = null;
@ -1380,24 +1381,25 @@ void stopActiveServices() {
}
/**
* Start services required in standby state
* Start services required in standby or observer state
*
* @throws IOException
*/
void startStandbyServices(final Configuration conf) throws IOException {
LOG.info("Starting services required for standby state");
void startStandbyServices(final Configuration conf, boolean isObserver)
throws IOException {
LOG.info("Starting services required for " +
(isObserver ? "observer" : "standby") + " state");
if (!getFSImage().editLog.isOpenForRead()) {
// During startup, we're already open for read.
getFSImage().editLog.initSharedJournalsForRead();
}
blockManager.setPostponeBlocksFromFuture(true);
// Disable quota checks while in standby.
dir.disableQuotaChecks();
editLogTailer = new EditLogTailer(this, conf);
editLogTailer.start();
if (standbyShouldCheckpoint) {
if (!isObserver && standbyShouldCheckpoint) {
standbyCheckpointer = new StandbyCheckpointer(conf, this);
standbyCheckpointer.start();
}

View File

@ -365,6 +365,7 @@ public long getProtocolVersion(String protocol,
LoggerFactory.getLogger("BlockStateChange");
public static final HAState ACTIVE_STATE = new ActiveState();
public static final HAState STANDBY_STATE = new StandbyState();
public static final HAState OBSERVER_STATE = new StandbyState(true);
private static final String NAMENODE_HTRACE_PREFIX = "namenode.htrace.";
@ -984,9 +985,11 @@ private void stopAtException(Exception e){
}
protected HAState createHAState(StartupOption startOpt) {
if (!haEnabled || startOpt == StartupOption.UPGRADE
if (!haEnabled || startOpt == StartupOption.UPGRADE
|| startOpt == StartupOption.UPGRADEONLY) {
return ACTIVE_STATE;
} else if (startOpt == StartupOption.OBSERVER) {
return OBSERVER_STATE;
} else {
return STANDBY_STATE;
}
@ -1481,6 +1484,8 @@ static StartupOption parseArguments(String args[]) {
startOpt = StartupOption.BACKUP;
} else if (StartupOption.CHECKPOINT.getName().equalsIgnoreCase(cmd)) {
startOpt = StartupOption.CHECKPOINT;
} else if (StartupOption.OBSERVER.getName().equalsIgnoreCase(cmd)) {
startOpt = StartupOption.OBSERVER;
} else if (StartupOption.UPGRADE.getName().equalsIgnoreCase(cmd)
|| StartupOption.UPGRADEONLY.getName().equalsIgnoreCase(cmd)) {
startOpt = StartupOption.UPGRADE.getName().equalsIgnoreCase(cmd) ?
@ -1794,6 +1799,11 @@ synchronized void transitionToActive()
if (!haEnabled) {
throw new ServiceFailedException("HA for namenode is not enabled");
}
if (state == OBSERVER_STATE) {
// TODO: we may need to remove this when enabling failover for observer
throw new ServiceFailedException(
"Cannot transition from Observer to Active");
}
state.setState(haContext, ACTIVE_STATE);
}
@ -1803,6 +1813,11 @@ synchronized void transitionToStandby()
if (!haEnabled) {
throw new ServiceFailedException("HA for namenode is not enabled");
}
if (state == OBSERVER_STATE) {
// TODO: we may need to remove this when enabling failover for observer
throw new ServiceFailedException(
"Cannot transition from Observer to Standby");
}
state.setState(haContext, STANDBY_STATE);
}
@ -1857,6 +1872,7 @@ public String getNNRole() {
@Override // NameNodeStatusMXBean
public String getState() {
// TODO: maybe we should return a different result for observer namenode?
String servStateStr = "";
HAServiceState servState = getServiceState();
if (null != servState) {
@ -1957,7 +1973,8 @@ public void stopActiveServices() throws IOException {
@Override
public void startStandbyServices() throws IOException {
try {
namesystem.startStandbyServices(getConf());
namesystem.startStandbyServices(getConf(),
state == NameNode.OBSERVER_STATE);
} catch (Throwable t) {
doImmediateShutdown(t);
}
@ -2004,6 +2021,9 @@ public void checkOperation(final OperationCategory op)
@Override
public boolean allowStaleReads() {
if (state == OBSERVER_STATE) {
return true;
}
return allowStaleStandbyReads;
}
@ -2017,6 +2037,10 @@ public boolean isActiveState() {
return (state.equals(ACTIVE_STATE));
}
public boolean isObserverState() {
return state.equals(OBSERVER_STATE);
}
/**
* Returns whether the NameNode is completely started
*/

View File

@ -1575,7 +1575,7 @@ public Boolean call() throws IOException {
if (nn.getFSImage().isUpgradeFinalized() &&
!namesystem.isRollingUpgrade() &&
!nn.isStandbyState() &&
nn.isActiveState() &&
noStaleStorages) {
return new FinalizeCommand(poolId);
}

View File

@ -39,8 +39,15 @@
*/
@InterfaceAudience.Private
public class StandbyState extends HAState {
private final boolean isObserver;
public StandbyState() {
this(false);
}
public StandbyState(boolean isObserver) {
super(HAServiceState.STANDBY);
this.isObserver = isObserver;
}
@Override
@ -49,6 +56,11 @@ public void setState(HAContext context, HAState s) throws ServiceFailedException
setStateInternal(context, s);
return;
}
if (isObserver && s == NameNode.STANDBY_STATE) {
// To guard against the exception in the following super call.
// The other case, standby -> observer, should not happen.
return;
}
super.setState(context, s);
}
@ -92,5 +104,10 @@ public void checkOperation(HAContext context, OperationCategory op)
public boolean shouldPopulateReplQueues() {
return false;
}
@Override
public String toString() {
return isObserver ? "observer" : "standby";
}
}