HDFS-12975. [SBN read] Changes to the NameNode to support reads from standby. Contributed by Chao Sun.
This commit is contained in:
parent
773591ee42
commit
4cf63905d0
|
@ -156,7 +156,9 @@ public interface HdfsServerConstants {
|
|||
// only used for StorageDirectory.analyzeStorage() in hot swap drive scenario.
|
||||
// TODO refactor StorageDirectory.analyzeStorage() so that we can do away with
|
||||
// this in StartupOption.
|
||||
HOTSWAP("-hotswap");
|
||||
HOTSWAP("-hotswap"),
|
||||
// Startup the namenode in observer mode.
|
||||
OBSERVER("-observer");
|
||||
|
||||
private static final Pattern ENUM_WITH_ROLLING_UPGRADE_OPTION = Pattern.compile(
|
||||
"(\\w+)\\((\\w+)\\)");
|
||||
|
|
|
@ -537,7 +537,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
private final ReentrantLock cpLock;
|
||||
|
||||
/**
|
||||
* Used when this NN is in standby state to read from the shared edit log.
|
||||
* Used when this NN is in standby or observer state to read from the
|
||||
* shared edit log.
|
||||
*/
|
||||
private EditLogTailer editLogTailer = null;
|
||||
|
||||
|
@ -1368,24 +1369,25 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
}
|
||||
|
||||
/**
|
||||
* Start services required in standby state
|
||||
* Start services required in standby or observer state
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
void startStandbyServices(final Configuration conf) throws IOException {
|
||||
LOG.info("Starting services required for standby state");
|
||||
void startStandbyServices(final Configuration conf, boolean isObserver)
|
||||
throws IOException {
|
||||
LOG.info("Starting services required for " +
|
||||
(isObserver ? "observer" : "standby") + " state");
|
||||
if (!getFSImage().editLog.isOpenForRead()) {
|
||||
// During startup, we're already open for read.
|
||||
getFSImage().editLog.initSharedJournalsForRead();
|
||||
}
|
||||
|
||||
blockManager.setPostponeBlocksFromFuture(true);
|
||||
|
||||
// Disable quota checks while in standby.
|
||||
dir.disableQuotaChecks();
|
||||
editLogTailer = new EditLogTailer(this, conf);
|
||||
editLogTailer.start();
|
||||
if (standbyShouldCheckpoint) {
|
||||
if (!isObserver && standbyShouldCheckpoint) {
|
||||
standbyCheckpointer = new StandbyCheckpointer(conf, this);
|
||||
standbyCheckpointer.start();
|
||||
}
|
||||
|
|
|
@ -369,6 +369,7 @@ public class NameNode extends ReconfigurableBase implements
|
|||
LoggerFactory.getLogger("BlockStateChange");
|
||||
public static final HAState ACTIVE_STATE = new ActiveState();
|
||||
public static final HAState STANDBY_STATE = new StandbyState();
|
||||
public static final HAState OBSERVER_STATE = new StandbyState(true);
|
||||
|
||||
private static final String NAMENODE_HTRACE_PREFIX = "namenode.htrace.";
|
||||
|
||||
|
@ -972,9 +973,11 @@ public class NameNode extends ReconfigurableBase implements
|
|||
}
|
||||
|
||||
protected HAState createHAState(StartupOption startOpt) {
|
||||
if (!haEnabled || startOpt == StartupOption.UPGRADE
|
||||
if (!haEnabled || startOpt == StartupOption.UPGRADE
|
||||
|| startOpt == StartupOption.UPGRADEONLY) {
|
||||
return ACTIVE_STATE;
|
||||
} else if (startOpt == StartupOption.OBSERVER) {
|
||||
return OBSERVER_STATE;
|
||||
} else {
|
||||
return STANDBY_STATE;
|
||||
}
|
||||
|
@ -1440,6 +1443,8 @@ public class NameNode extends ReconfigurableBase implements
|
|||
startOpt = StartupOption.BACKUP;
|
||||
} else if (StartupOption.CHECKPOINT.getName().equalsIgnoreCase(cmd)) {
|
||||
startOpt = StartupOption.CHECKPOINT;
|
||||
} else if (StartupOption.OBSERVER.getName().equalsIgnoreCase(cmd)) {
|
||||
startOpt = StartupOption.OBSERVER;
|
||||
} else if (StartupOption.UPGRADE.getName().equalsIgnoreCase(cmd)
|
||||
|| StartupOption.UPGRADEONLY.getName().equalsIgnoreCase(cmd)) {
|
||||
startOpt = StartupOption.UPGRADE.getName().equalsIgnoreCase(cmd) ?
|
||||
|
@ -1753,6 +1758,11 @@ public class NameNode extends ReconfigurableBase implements
|
|||
if (!haEnabled) {
|
||||
throw new ServiceFailedException("HA for namenode is not enabled");
|
||||
}
|
||||
if (state == OBSERVER_STATE) {
|
||||
// TODO: we may need to remove this when enabling failover for observer
|
||||
throw new ServiceFailedException(
|
||||
"Cannot transition from Observer to Active");
|
||||
}
|
||||
state.setState(haContext, ACTIVE_STATE);
|
||||
}
|
||||
|
||||
|
@ -1762,6 +1772,11 @@ public class NameNode extends ReconfigurableBase implements
|
|||
if (!haEnabled) {
|
||||
throw new ServiceFailedException("HA for namenode is not enabled");
|
||||
}
|
||||
if (state == OBSERVER_STATE) {
|
||||
// TODO: we may need to remove this when enabling failover for observer
|
||||
throw new ServiceFailedException(
|
||||
"Cannot transition from Observer to Standby");
|
||||
}
|
||||
state.setState(haContext, STANDBY_STATE);
|
||||
}
|
||||
|
||||
|
@ -1816,6 +1831,7 @@ public class NameNode extends ReconfigurableBase implements
|
|||
|
||||
@Override // NameNodeStatusMXBean
|
||||
public String getState() {
|
||||
// TODO: maybe we should return a different result for observer namenode?
|
||||
String servStateStr = "";
|
||||
HAServiceState servState = getServiceState();
|
||||
if (null != servState) {
|
||||
|
@ -1916,7 +1932,8 @@ public class NameNode extends ReconfigurableBase implements
|
|||
@Override
|
||||
public void startStandbyServices() throws IOException {
|
||||
try {
|
||||
namesystem.startStandbyServices(getConf());
|
||||
namesystem.startStandbyServices(getConf(),
|
||||
state == NameNode.OBSERVER_STATE);
|
||||
} catch (Throwable t) {
|
||||
doImmediateShutdown(t);
|
||||
}
|
||||
|
@ -1963,6 +1980,9 @@ public class NameNode extends ReconfigurableBase implements
|
|||
|
||||
@Override
|
||||
public boolean allowStaleReads() {
|
||||
if (state == OBSERVER_STATE) {
|
||||
return true;
|
||||
}
|
||||
return allowStaleStandbyReads;
|
||||
}
|
||||
|
||||
|
@ -1976,6 +1996,10 @@ public class NameNode extends ReconfigurableBase implements
|
|||
return (state.equals(ACTIVE_STATE));
|
||||
}
|
||||
|
||||
public boolean isObserverState() {
|
||||
return state.equals(OBSERVER_STATE);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns whether the NameNode is completely started
|
||||
*/
|
||||
|
|
|
@ -1551,7 +1551,7 @@ public class NameNodeRpcServer implements NamenodeProtocols {
|
|||
|
||||
if (nn.getFSImage().isUpgradeFinalized() &&
|
||||
!namesystem.isRollingUpgrade() &&
|
||||
!nn.isStandbyState() &&
|
||||
nn.isActiveState() &&
|
||||
noStaleStorages) {
|
||||
return new FinalizeCommand(poolId);
|
||||
}
|
||||
|
|
|
@ -39,8 +39,15 @@ import org.apache.hadoop.ipc.StandbyException;
|
|||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class StandbyState extends HAState {
|
||||
private final boolean isObserver;
|
||||
|
||||
public StandbyState() {
|
||||
this(false);
|
||||
}
|
||||
|
||||
public StandbyState(boolean isObserver) {
|
||||
super(HAServiceState.STANDBY);
|
||||
this.isObserver = isObserver;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -49,6 +56,11 @@ public class StandbyState extends HAState {
|
|||
setStateInternal(context, s);
|
||||
return;
|
||||
}
|
||||
if (isObserver && s == NameNode.STANDBY_STATE) {
|
||||
// To guard against the exception in the following super call.
|
||||
// The other case, standby -> observer, should not happen.
|
||||
return;
|
||||
}
|
||||
super.setState(context, s);
|
||||
}
|
||||
|
||||
|
@ -92,5 +104,10 @@ public class StandbyState extends HAState {
|
|||
public boolean shouldPopulateReplQueues() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return isObserver ? "observer" : "standby";
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue