HDFS-13286. [SBN read] Add haadmin commands to transition between standby and observer. Contributed by Chao Sun.

This commit is contained in:
Erik Krogen 2018-05-04 12:22:12 -07:00 committed by Chen Liang
parent c0ca2bb853
commit 49d7bb6a92
22 changed files with 259 additions and 21 deletions

View File

@ -129,7 +129,7 @@ public class FailoverController {
if (!toSvcStatus.getState().equals(HAServiceState.STANDBY)) {
throw new FailoverFailedException(
"Can't failover to an active service");
"Can't failover to an " + toSvcStatus.getState() + " service");
}
if (!toSvcStatus.isReadyToBecomeActive()) {

View File

@ -72,6 +72,9 @@ public abstract class HAAdmin extends Configured implements Tool {
new UsageInfo("[--"+FORCEACTIVE+"] <serviceId>", "Transitions the service into Active state"))
.put("-transitionToStandby",
new UsageInfo("<serviceId>", "Transitions the service into Standby state"))
.put("-transitionToObserver",
new UsageInfo("<serviceId>",
"Transitions the service into Observer state"))
.put("-failover",
new UsageInfo("[--"+FORCEFENCE+"] [--"+FORCEACTIVE+"] <serviceId> <serviceId>",
"Failover from the first service to the second.\n" +
@ -221,6 +224,28 @@ public abstract class HAAdmin extends Configured implements Tool {
HAServiceProtocolHelper.transitionToStandby(proto, createReqInfo());
return 0;
}
private int transitionToObserver(final CommandLine cmd)
throws IOException, ServiceFailedException {
String[] argv = cmd.getArgs();
if (argv.length != 1) {
errOut.println("transitionToObserver: incorrect number of arguments");
printUsage(errOut, "-transitionToObserver");
return -1;
}
HAServiceTarget target = resolveTarget(argv[0]);
if (!checkSupportObserver(target)) {
return -1;
}
if (!checkManualStateManagementOK(target)) {
return -1;
}
HAServiceProtocol proto = target.getProxy(getConf(), 0);
HAServiceProtocolHelper.transitionToObserver(proto, createReqInfo());
return 0;
}
/**
* Ensure that we are allowed to manually manage the HA state of the target
* service. If automatic failover is configured, then the automatic
@ -249,6 +274,21 @@ public abstract class HAAdmin extends Configured implements Tool {
return true;
}
/**
* Check if the target supports the Observer state.
* @param target the target to check
* @return true if the target support Observer state, false otherwise.
*/
private boolean checkSupportObserver(HAServiceTarget target) {
if (target.supportObserver()) {
return true;
} else {
errOut.println(
"The target " + target + " doesn't support Observer state.");
return false;
}
}
private StateChangeRequestInfo createReqInfo() {
return new StateChangeRequestInfo(requestSource);
}
@ -461,6 +501,8 @@ public abstract class HAAdmin extends Configured implements Tool {
return transitionToActive(cmdLine);
} else if ("-transitionToStandby".equals(cmd)) {
return transitionToStandby(cmdLine);
} else if ("-transitionToObserver".equals(cmd)) {
return transitionToObserver(cmdLine);
} else if ("-failover".equals(cmd)) {
return failover(cmdLine);
} else if ("-getServiceState".equals(cmd)) {

View File

@ -51,6 +51,7 @@ public interface HAServiceProtocol {
INITIALIZING("initializing"),
ACTIVE("active"),
STANDBY("standby"),
OBSERVER("observer"),
STOPPING("stopping");
private String name;
@ -148,6 +149,23 @@ public interface HAServiceProtocol {
AccessControlException,
IOException;
/**
* Request service to transition to observer state. No operation, if the
* service is already in observer state.
*
* @throws ServiceFailedException
* if transition from standby to observer fails.
* @throws AccessControlException
* if access is denied.
* @throws IOException
* if other errors happen
*/
@Idempotent
void transitionToObserver(StateChangeRequestInfo reqInfo)
throws ServiceFailedException,
AccessControlException,
IOException;
/**
* Return the current status of the service. The status indicates
* the current <em>state</em> (e.g ACTIVE/STANDBY) as well as

View File

@ -60,4 +60,13 @@ public class HAServiceProtocolHelper {
throw e.unwrapRemoteException(ServiceFailedException.class);
}
}
public static void transitionToObserver(HAServiceProtocol svc,
StateChangeRequestInfo reqInfo) throws IOException {
try {
svc.transitionToObserver(reqInfo);
} catch (RemoteException e) {
throw e.unwrapRemoteException(ServiceFailedException.class);
}
}
}

View File

@ -170,4 +170,11 @@ public abstract class HAServiceTarget {
public boolean isAutoFailoverEnabled() {
return false;
}
/**
* @return true if this target supports the Observer state, false otherwise.
*/
public boolean supportObserver() {
return false;
}
}

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.HAServiceStateProto;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.MonitorHealthRequestProto;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.TransitionToActiveRequestProto;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.TransitionToStandbyRequestProto;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.TransitionToObserverRequestProto;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtocolTranslator;
@ -115,6 +116,19 @@ public class HAServiceProtocolClientSideTranslatorPB implements
}
}
@Override
public void transitionToObserver(StateChangeRequestInfo reqInfo)
throws IOException {
try {
TransitionToObserverRequestProto req =
TransitionToObserverRequestProto.newBuilder()
.setReqInfo(convert(reqInfo)).build();
rpcProxy.transitionToObserver(NULL_CONTROLLER, req);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
@Override
public HAServiceStatus getServiceStatus() throws IOException {
GetServiceStatusResponseProto status;
@ -141,6 +155,8 @@ public class HAServiceProtocolClientSideTranslatorPB implements
return HAServiceState.ACTIVE;
case STANDBY:
return HAServiceState.STANDBY;
case OBSERVER:
return HAServiceState.OBSERVER;
case INITIALIZING:
default:
return HAServiceState.INITIALIZING;

View File

@ -35,6 +35,8 @@ import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.TransitionToActiveRequ
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.TransitionToActiveResponseProto;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.TransitionToStandbyRequestProto;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.TransitionToStandbyResponseProto;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.TransitionToObserverRequestProto;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.TransitionToObserverResponseProto;
import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.ipc.RPC;
@ -61,6 +63,9 @@ public class HAServiceProtocolServerSideTranslatorPB implements
TransitionToActiveResponseProto.newBuilder().build();
private static final TransitionToStandbyResponseProto TRANSITION_TO_STANDBY_RESP =
TransitionToStandbyResponseProto.newBuilder().build();
private static final TransitionToObserverResponseProto
TRANSITION_TO_OBSERVER_RESP =
TransitionToObserverResponseProto.newBuilder().build();
private static final Logger LOG = LoggerFactory.getLogger(
HAServiceProtocolServerSideTranslatorPB.class);
@ -123,6 +128,18 @@ public class HAServiceProtocolServerSideTranslatorPB implements
}
}
@Override
public TransitionToObserverResponseProto transitionToObserver(
RpcController controller, TransitionToObserverRequestProto request)
throws ServiceException {
try {
server.transitionToObserver(convert(request.getReqInfo()));
return TRANSITION_TO_OBSERVER_RESP;
} catch (IOException e) {
throw new ServiceException(e);
}
}
@Override
public GetServiceStatusResponseProto getServiceStatus(RpcController controller,
GetServiceStatusRequestProto request) throws ServiceException {
@ -141,6 +158,9 @@ public class HAServiceProtocolServerSideTranslatorPB implements
case STANDBY:
retState = HAServiceStateProto.STANDBY;
break;
case OBSERVER:
retState = HAServiceStateProto.OBSERVER;
break;
case INITIALIZING:
default:
retState = HAServiceStateProto.INITIALIZING;

View File

@ -32,6 +32,7 @@ enum HAServiceStateProto {
INITIALIZING = 0;
ACTIVE = 1;
STANDBY = 2;
OBSERVER = 3;
}
enum HARequestSource {
@ -82,6 +83,19 @@ message TransitionToStandbyRequestProto {
message TransitionToStandbyResponseProto {
}
/**
* void request
*/
message TransitionToObserverRequestProto {
required HAStateChangeRequestInfoProto reqInfo = 1;
}
/**
* void response
*/
message TransitionToObserverResponseProto {
}
/**
* void request
*/
@ -126,6 +140,12 @@ service HAServiceProtocolService {
rpc transitionToStandby(TransitionToStandbyRequestProto)
returns(TransitionToStandbyResponseProto);
/**
* Request service to transition to observer state.
*/
rpc transitionToObserver(TransitionToObserverRequestProto)
returns(TransitionToObserverResponseProto);
/**
* Get the current status of the service.
*/

View File

@ -56,7 +56,8 @@ class DummyHAService extends HAServiceTarget {
InetSocketAddress address, healthMonitorAddress;
boolean isHealthy = true;
boolean actUnreachable = false;
boolean failToBecomeActive, failToBecomeStandby, failToFence;
boolean failToBecomeActive, failToBecomeStandby, failToBecomeObserver,
failToFence;
DummySharedResource sharedResource;
public int fenceCount = 0;
@ -216,6 +217,11 @@ class DummyHAService extends HAServiceTarget {
return true;
}
@Override
public boolean supportObserver() {
return true;
}
@Override
public String toString() {
return "DummyHAService #" + index;
@ -263,6 +269,16 @@ class DummyHAService extends HAServiceTarget {
state = HAServiceState.STANDBY;
}
@Override
public void transitionToObserver(StateChangeRequestInfo req)
throws ServiceFailedException, AccessControlException, IOException {
checkUnreachable();
if (failToBecomeObserver) {
throw new ServiceFailedException("injected failure");
}
state = HAServiceState.OBSERVER;
}
@Override
public HAServiceStatus getServiceStatus() throws IOException {
checkUnreachable();

View File

@ -187,6 +187,10 @@ public class MiniZKFCCluster {
svcs.get(idx).actUnreachable = unreachable;
}
public void setFailToBecomeObserver(int idx, boolean doFail) {
svcs.get(idx).failToBecomeObserver = doFail;
}
/**
* Wait for the given HA service to enter the given HA state.
* This is based on the state of ZKFC, not the state of HA service.

View File

@ -35,6 +35,9 @@ public enum FederationNamenodeServiceState {
case ACTIVE:
return FederationNamenodeServiceState.ACTIVE;
case STANDBY:
// TODO: we should probably have a separate state OBSERVER for RBF and
// treat it differently.
case OBSERVER:
return FederationNamenodeServiceState.STANDBY;
case INITIALIZING:
return FederationNamenodeServiceState.UNAVAILABLE;

View File

@ -779,6 +779,8 @@ public class PBHelper {
return HAServiceState.ACTIVE;
case STANDBY:
return HAServiceState.STANDBY;
case OBSERVER:
return HAServiceState.OBSERVER;
default:
throw new IllegalArgumentException("Unexpected HAServiceStateProto:"
+ s);
@ -794,6 +796,8 @@ public class PBHelper {
return NNHAStatusHeartbeatProto.State.ACTIVE;
case STANDBY:
return NNHAStatusHeartbeatProto.State.STANDBY;
case OBSERVER:
return NNHAStatusHeartbeatProto.State.OBSERVER;
default:
throw new IllegalArgumentException("Unexpected HAServiceState:"
+ s);

View File

@ -912,7 +912,7 @@ class BPServiceActor implements Runnable {
scheduler.scheduleHeartbeat();
// HDFS-9917,Standby NN IBR can be very huge if standby namenode is down
// for sometime.
if (state == HAServiceState.STANDBY) {
if (state == HAServiceState.STANDBY || state == HAServiceState.OBSERVER) {
ibrManager.clearIBRs();
}
}

View File

@ -1729,7 +1729,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
return haEnabled;
}
return HAServiceState.STANDBY == haContext.getState().getServiceState();
return HAServiceState.STANDBY == haContext.getState().getServiceState() ||
HAServiceState.OBSERVER == haContext.getState().getServiceState();
}
/**

View File

@ -1763,27 +1763,37 @@ public class NameNode extends ReconfigurableBase implements
throw new ServiceFailedException("HA for namenode is not enabled");
}
if (state == OBSERVER_STATE) {
// TODO: we may need to remove this when enabling failover for observer
throw new ServiceFailedException(
"Cannot transition from Observer to Active");
"Cannot transition from '" + OBSERVER_STATE + "' to '" +
ACTIVE_STATE + "'");
}
state.setState(haContext, ACTIVE_STATE);
}
synchronized void transitionToStandby()
synchronized void transitionToStandby()
throws ServiceFailedException, AccessControlException {
namesystem.checkSuperuserPrivilege();
if (!haEnabled) {
throw new ServiceFailedException("HA for namenode is not enabled");
}
if (state == OBSERVER_STATE) {
// TODO: we may need to remove this when enabling failover for observer
throw new ServiceFailedException(
"Cannot transition from Observer to Standby");
}
state.setState(haContext, STANDBY_STATE);
}
synchronized void transitionToObserver()
throws ServiceFailedException, AccessControlException {
namesystem.checkSuperuserPrivilege();
if (!haEnabled) {
throw new ServiceFailedException("HA for namenode is not enabled");
}
// Transition from ACTIVE to OBSERVER is forbidden.
if (state == ACTIVE_STATE) {
throw new ServiceFailedException(
"Cannot transition from '" + ACTIVE_STATE + "' to '" +
OBSERVER_STATE + "'");
}
state.setState(haContext, OBSERVER_STATE);
}
synchronized HAServiceStatus getServiceStatus()
throws ServiceFailedException, AccessControlException {
if (!haEnabled) {
@ -1835,7 +1845,6 @@ public class NameNode extends ReconfigurableBase implements
@Override // NameNodeStatusMXBean
public String getState() {
// TODO: maybe we should return a different result for observer namenode?
String servStateStr = "";
HAServiceState servState = getServiceState();
if (null != servState) {

View File

@ -1755,6 +1755,14 @@ public class NameNodeRpcServer implements NamenodeProtocols {
nn.transitionToStandby();
}
@Override // HAServiceProtocol
public synchronized void transitionToObserver(StateChangeRequestInfo req)
throws ServiceFailedException, AccessControlException, IOException {
checkNNStartup();
nn.checkHaStateChange(req);
nn.transitionToObserver();
}
@Override // HAServiceProtocol
public synchronized HAServiceStatus getServiceStatus()
throws AccessControlException, ServiceFailedException, IOException {

View File

@ -39,6 +39,7 @@ import org.apache.hadoop.ipc.StandbyException;
*/
@InterfaceAudience.Private
public class StandbyState extends HAState {
// TODO: consider implementing a ObserverState instead of using the flag.
private final boolean isObserver;
public StandbyState() {
@ -46,21 +47,18 @@ public class StandbyState extends HAState {
}
public StandbyState(boolean isObserver) {
super(HAServiceState.STANDBY);
super(isObserver ? HAServiceState.OBSERVER : HAServiceState.STANDBY);
this.isObserver = isObserver;
}
@Override
public void setState(HAContext context, HAState s) throws ServiceFailedException {
if (s == NameNode.ACTIVE_STATE) {
if (s == NameNode.ACTIVE_STATE ||
(!isObserver && s == NameNode.OBSERVER_STATE) ||
(isObserver && s == NameNode.STANDBY_STATE)) {
setStateInternal(context, s);
return;
}
if (isObserver && s == NameNode.STANDBY_STATE) {
// To guard against the exception in the following super call.
// The other case, standby -> observer, should not happen.
return;
}
super.setState(context, s);
}

View File

@ -186,4 +186,9 @@ public class NNHAServiceTarget extends HAServiceTarget {
public boolean isAutoFailoverEnabled() {
return autoFailoverEnabled;
}
@Override
public boolean supportObserver() {
return true;
}
}

View File

@ -212,6 +212,7 @@ message NNHAStatusHeartbeatProto {
enum State {
ACTIVE = 0;
STANDBY = 1;
OBSERVER = 2;
}
required State state = 1;
required uint64 txid = 2;

View File

@ -275,6 +275,12 @@ public class TestDFSHAAdmin {
Mockito.verify(mockProtocol).transitionToStandby(anyReqInfo());
}
@Test
public void testTransitionToObserver() throws Exception {
assertEquals(0, runTool("-transitionToObserver", "nn1"));
Mockito.verify(mockProtocol).transitionToObserver(anyReqInfo());
}
@Test
public void testFailoverWithNoFencerConfigured() throws Exception {
Mockito.doReturn(STANDBY_READY_RESULT).when(mockProtocol).getServiceStatus();

View File

@ -116,6 +116,50 @@ public class TestDFSHAAdminMiniCluster {
assertFalse(nnode2.isStandbyState());
assertEquals(0, runTool("-transitionToStandby", "nn2"));
assertTrue(nnode2.isStandbyState());
assertEquals(0, runTool("-transitionToObserver", "nn2"));
assertFalse(nnode2.isStandbyState());
assertTrue(nnode2.isObserverState());
}
@Test
public void testObserverTransition() throws Exception {
NameNode nnode1 = cluster.getNameNode(0);
assertTrue(nnode1.isStandbyState());
// Should be able to transition from STANDBY to OBSERVER
assertEquals(0, runTool("-transitionToObserver", "nn1"));
assertFalse(nnode1.isStandbyState());
assertTrue(nnode1.isObserverState());
// Transition from Observer to Observer should be no-op
assertEquals(0, runTool("-transitionToObserver", "nn1"));
assertTrue(nnode1.isObserverState());
// Should also be able to transition back from OBSERVER to STANDBY
assertEquals(0, runTool("-transitionToStandby", "nn1"));
assertTrue(nnode1.isStandbyState());
assertFalse(nnode1.isObserverState());
}
@Test
public void testObserverIllegalTransition() throws Exception {
NameNode nnode1 = cluster.getNameNode(0);
assertTrue(nnode1.isStandbyState());
assertEquals(0, runTool("-transitionToActive", "nn1"));
assertFalse(nnode1.isStandbyState());
assertTrue(nnode1.isActiveState());
// Should NOT be able to transition from ACTIVE to OBSERVER
assertEquals(-1, runTool("-transitionToObserver", "nn1"));
assertTrue(nnode1.isActiveState());
// Should NOT be able to transition from OBSERVER to ACTIVE
assertEquals(0, runTool("-transitionToStandby", "nn1"));
assertTrue(nnode1.isStandbyState());
assertEquals(0, runTool("-transitionToObserver", "nn1"));
assertTrue(nnode1.isObserverState());
assertEquals(-1, runTool("-transitionToActive", "nn1"));
assertFalse(nnode1.isActiveState());
}
@Test

View File

@ -362,6 +362,13 @@ public class AdminService extends CompositeService implements
}
}
@Override
public synchronized void transitionToObserver(
StateChangeRequestInfo reqInfo) throws IOException {
// Should NOT get here, as RMHAServiceTarget doesn't support observer.
throw new ServiceFailedException("Does not support transition to Observer");
}
/**
* Return the HA status of this RM. This includes the current state and
* whether the RM is ready to become active.