HDFS-12975. [SBN read] Changes to the NameNode to support reads from standby. Contributed by Chao Sun.

This commit is contained in:
Chao Sun 2018-03-20 18:37:59 -07:00 committed by Chen Liang
parent 15062b6d28
commit 61b510815c
5 changed files with 55 additions and 10 deletions

View File

@ -157,7 +157,9 @@ public interface HdfsServerConstants {
// only used for StorageDirectory.analyzeStorage() in hot swap drive scenario. // only used for StorageDirectory.analyzeStorage() in hot swap drive scenario.
// TODO refactor StorageDirectory.analyzeStorage() so that we can do away with // TODO refactor StorageDirectory.analyzeStorage() so that we can do away with
// this in StartupOption. // this in StartupOption.
HOTSWAP("-hotswap"); HOTSWAP("-hotswap"),
// Startup the namenode in observer mode.
OBSERVER("-observer");
private static final Pattern ENUM_WITH_ROLLING_UPGRADE_OPTION = Pattern.compile( private static final Pattern ENUM_WITH_ROLLING_UPGRADE_OPTION = Pattern.compile(
"(\\w+)\\((\\w+)\\)"); "(\\w+)\\((\\w+)\\)");

View File

@ -505,7 +505,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
private final ReentrantLock cpLock; private final ReentrantLock cpLock;
/** /**
* Used when this NN is in standby state to read from the shared edit log. * Used when this NN is in standby or observer state to read from the
* shared edit log.
*/ */
private EditLogTailer editLogTailer = null; private EditLogTailer editLogTailer = null;
@ -1327,24 +1328,25 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
} }
/** /**
* Start services required in standby state * Start services required in standby or observer state
* *
* @throws IOException * @throws IOException
*/ */
void startStandbyServices(final Configuration conf) throws IOException { void startStandbyServices(final Configuration conf, boolean isObserver)
LOG.info("Starting services required for standby state"); throws IOException {
LOG.info("Starting services required for " +
(isObserver ? "observer" : "standby") + " state");
if (!getFSImage().editLog.isOpenForRead()) { if (!getFSImage().editLog.isOpenForRead()) {
// During startup, we're already open for read. // During startup, we're already open for read.
getFSImage().editLog.initSharedJournalsForRead(); getFSImage().editLog.initSharedJournalsForRead();
} }
blockManager.setPostponeBlocksFromFuture(true); blockManager.setPostponeBlocksFromFuture(true);
// Disable quota checks while in standby. // Disable quota checks while in standby.
dir.disableQuotaChecks(); dir.disableQuotaChecks();
editLogTailer = new EditLogTailer(this, conf); editLogTailer = new EditLogTailer(this, conf);
editLogTailer.start(); editLogTailer.start();
if (standbyShouldCheckpoint) { if (!isObserver && standbyShouldCheckpoint) {
standbyCheckpointer = new StandbyCheckpointer(conf, this); standbyCheckpointer = new StandbyCheckpointer(conf, this);
standbyCheckpointer.start(); standbyCheckpointer.start();
} }

View File

@ -356,6 +356,7 @@ public class NameNode extends ReconfigurableBase implements
LoggerFactory.getLogger("BlockStateChange"); LoggerFactory.getLogger("BlockStateChange");
public static final HAState ACTIVE_STATE = new ActiveState(); public static final HAState ACTIVE_STATE = new ActiveState();
public static final HAState STANDBY_STATE = new StandbyState(); public static final HAState STANDBY_STATE = new StandbyState();
public static final HAState OBSERVER_STATE = new StandbyState(true);
private static final String NAMENODE_HTRACE_PREFIX = "namenode.htrace."; private static final String NAMENODE_HTRACE_PREFIX = "namenode.htrace.";
@ -981,6 +982,8 @@ public class NameNode extends ReconfigurableBase implements
if (!haEnabled || startOpt == StartupOption.UPGRADE if (!haEnabled || startOpt == StartupOption.UPGRADE
|| startOpt == StartupOption.UPGRADEONLY) { || startOpt == StartupOption.UPGRADEONLY) {
return ACTIVE_STATE; return ACTIVE_STATE;
} else if (startOpt == StartupOption.OBSERVER) {
return OBSERVER_STATE;
} else { } else {
return STANDBY_STATE; return STANDBY_STATE;
} }
@ -1443,6 +1446,8 @@ public class NameNode extends ReconfigurableBase implements
startOpt = StartupOption.BACKUP; startOpt = StartupOption.BACKUP;
} else if (StartupOption.CHECKPOINT.getName().equalsIgnoreCase(cmd)) { } else if (StartupOption.CHECKPOINT.getName().equalsIgnoreCase(cmd)) {
startOpt = StartupOption.CHECKPOINT; startOpt = StartupOption.CHECKPOINT;
} else if (StartupOption.OBSERVER.getName().equalsIgnoreCase(cmd)) {
startOpt = StartupOption.OBSERVER;
} else if (StartupOption.UPGRADE.getName().equalsIgnoreCase(cmd) } else if (StartupOption.UPGRADE.getName().equalsIgnoreCase(cmd)
|| StartupOption.UPGRADEONLY.getName().equalsIgnoreCase(cmd)) { || StartupOption.UPGRADEONLY.getName().equalsIgnoreCase(cmd)) {
startOpt = StartupOption.UPGRADE.getName().equalsIgnoreCase(cmd) ? startOpt = StartupOption.UPGRADE.getName().equalsIgnoreCase(cmd) ?
@ -1774,6 +1779,11 @@ public class NameNode extends ReconfigurableBase implements
if (!haEnabled) { if (!haEnabled) {
throw new ServiceFailedException("HA for namenode is not enabled"); throw new ServiceFailedException("HA for namenode is not enabled");
} }
if (state == OBSERVER_STATE) {
// TODO: we may need to remove this when enabling failover for observer
throw new ServiceFailedException(
"Cannot transition from Observer to Active");
}
state.setState(haContext, ACTIVE_STATE); state.setState(haContext, ACTIVE_STATE);
} }
@ -1783,6 +1793,11 @@ public class NameNode extends ReconfigurableBase implements
if (!haEnabled) { if (!haEnabled) {
throw new ServiceFailedException("HA for namenode is not enabled"); throw new ServiceFailedException("HA for namenode is not enabled");
} }
if (state == OBSERVER_STATE) {
// TODO: we may need to remove this when enabling failover for observer
throw new ServiceFailedException(
"Cannot transition from Observer to Standby");
}
state.setState(haContext, STANDBY_STATE); state.setState(haContext, STANDBY_STATE);
} }
@ -1837,6 +1852,7 @@ public class NameNode extends ReconfigurableBase implements
@Override // NameNodeStatusMXBean @Override // NameNodeStatusMXBean
public String getState() { public String getState() {
// TODO: maybe we should return a different result for observer namenode?
String servStateStr = ""; String servStateStr = "";
HAServiceState servState = getServiceState(); HAServiceState servState = getServiceState();
if (null != servState) { if (null != servState) {
@ -1937,7 +1953,8 @@ public class NameNode extends ReconfigurableBase implements
@Override @Override
public void startStandbyServices() throws IOException { public void startStandbyServices() throws IOException {
try { try {
namesystem.startStandbyServices(getConf()); namesystem.startStandbyServices(getConf(),
state == NameNode.OBSERVER_STATE);
} catch (Throwable t) { } catch (Throwable t) {
doImmediateShutdown(t); doImmediateShutdown(t);
} }
@ -1984,6 +2001,9 @@ public class NameNode extends ReconfigurableBase implements
@Override @Override
public boolean allowStaleReads() { public boolean allowStaleReads() {
if (state == OBSERVER_STATE) {
return true;
}
return allowStaleStandbyReads; return allowStaleStandbyReads;
} }
@ -1997,6 +2017,10 @@ public class NameNode extends ReconfigurableBase implements
return (state.equals(ACTIVE_STATE)); return (state.equals(ACTIVE_STATE));
} }
public boolean isObserverState() {
return state.equals(OBSERVER_STATE);
}
/** /**
* Returns whether the NameNode is completely started * Returns whether the NameNode is completely started
*/ */

View File

@ -1484,7 +1484,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
if (nn.getFSImage().isUpgradeFinalized() && if (nn.getFSImage().isUpgradeFinalized() &&
!namesystem.isRollingUpgrade() && !namesystem.isRollingUpgrade() &&
!nn.isStandbyState() && nn.isActiveState() &&
noStaleStorages) { noStaleStorages) {
return new FinalizeCommand(poolId); return new FinalizeCommand(poolId);
} }

View File

@ -39,8 +39,15 @@ import org.apache.hadoop.ipc.StandbyException;
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class StandbyState extends HAState { public class StandbyState extends HAState {
private final boolean isObserver;
public StandbyState() { public StandbyState() {
this(false);
}
public StandbyState(boolean isObserver) {
super(HAServiceState.STANDBY); super(HAServiceState.STANDBY);
this.isObserver = isObserver;
} }
@Override @Override
@ -49,6 +56,11 @@ public class StandbyState extends HAState {
setStateInternal(context, s); setStateInternal(context, s);
return; return;
} }
if (isObserver && s == NameNode.STANDBY_STATE) {
// To guard against the exception in the following super call.
// The other case, standby -> observer, should not happen.
return;
}
super.setState(context, s); super.setState(context, s);
} }
@ -92,5 +104,10 @@ public class StandbyState extends HAState {
public boolean shouldPopulateReplQueues() { public boolean shouldPopulateReplQueues() {
return false; return false;
} }
@Override
public String toString() {
return isObserver ? "observer" : "standby";
}
} }