HADOOP-8247. Add a config to enable auto-HA, which disables manual FailoverController. Contributed by Todd Lipcon.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-3042@1324566 13f79535-47bb-0310-9956-ffa450edef68
commit 41a014c31b
parent 693ec453d2
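
The patch threads a new StateChangeRequestInfo argument through every HA state-change RPC, so a NameNode can tell whether a request came from an operator or from a ZK failover controller. For orientation, a minimal sketch of the caller-side object, using the types this commit introduces in HAServiceProtocol below:

    import org.apache.hadoop.ha.HAServiceProtocol.RequestSource;
    import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;

    // Manual CLI requests are tagged REQUEST_BY_USER; the ZKFC tags its
    // requests REQUEST_BY_ZKFC; the -forcemanual flag upgrades a CLI
    // request to REQUEST_BY_USER_FORCED.
    StateChangeRequestInfo reqInfo =
        new StateChangeRequestInfo(RequestSource.REQUEST_BY_USER);
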
@@ -17,3 +17,5 @@ HADOOP-8257. TestZKFailoverControllerStress occasionally fails with Mockito erro
 HADOOP-8260. Replace ClientBaseWithFixes with our own modified copy of the class (todd)
 
 HADOOP-8246. Auto-HA: automatically scope znode by nameservice ID (todd)
+
+HADOOP-8247. Add a config to enable auto-HA, which disables manual FailoverController (todd)

@@ -27,6 +27,8 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
+import org.apache.hadoop.ha.HAServiceProtocol.RequestSource;
 import org.apache.hadoop.ipc.RPC;
 
 import com.google.common.base.Preconditions;
@@ -48,9 +50,12 @@ public class FailoverController {
 
   private final Configuration conf;
 
+  private final RequestSource requestSource;
 
-  public FailoverController(Configuration conf) {
+  public FailoverController(Configuration conf,
+      RequestSource source) {
     this.conf = conf;
+    this.requestSource = source;
 
     this.gracefulFenceTimeout = getGracefulFenceTimeout(conf);
     this.rpcTimeoutToNewActive = getRpcTimeoutToNewActive(conf);
@@ -100,7 +105,7 @@ public class FailoverController {
       toSvcStatus = toSvc.getServiceStatus();
     } catch (IOException e) {
       String msg = "Unable to get service state for " + target;
-      LOG.error(msg, e);
+      LOG.error(msg + ": " + e.getLocalizedMessage());
       throw new FailoverFailedException(msg, e);
     }
 
@@ -122,7 +127,7 @@ public class FailoverController {
     }
 
     try {
-      HAServiceProtocolHelper.monitorHealth(toSvc);
+      HAServiceProtocolHelper.monitorHealth(toSvc, createReqInfo());
     } catch (HealthCheckFailedException hce) {
       throw new FailoverFailedException(
          "Can't failover to an unhealthy service", hce);
@@ -132,6 +137,9 @@ public class FailoverController {
     }
   }
 
+  private StateChangeRequestInfo createReqInfo() {
+    return new StateChangeRequestInfo(requestSource);
+  }
+
   /**
    * Try to get the HA state of the node at the given address. This
@@ -143,7 +151,7 @@ public class FailoverController {
     HAServiceProtocol proxy = null;
     try {
       proxy = svc.getProxy(conf, gracefulFenceTimeout);
-      proxy.transitionToStandby();
+      proxy.transitionToStandby(createReqInfo());
       return true;
     } catch (ServiceFailedException sfe) {
       LOG.warn("Unable to gracefully make " + svc + " standby (" +
@@ -198,7 +206,8 @@ public class FailoverController {
     Throwable cause = null;
     try {
       HAServiceProtocolHelper.transitionToActive(
-          toSvc.getProxy(conf, rpcTimeoutToNewActive));
+          toSvc.getProxy(conf, rpcTimeoutToNewActive),
+          createReqInfo());
     } catch (ServiceFailedException sfe) {
       LOG.error("Unable to make " + toSvc + " active (" +
           sfe.getMessage() + "). Failing back.");

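With the constructor change above, every FailoverController is bound to a request source at construction time. A sketch of the manual call path, assuming `conf`, `fromNode`, and `toNode` are a Configuration and two resolved HAServiceTarget instances (placeholders, not names from this diff):

    // Drive a manual failover tagged as a user request; the NameNode can
    // now reject this if automatic failover is enabled for the targets.
    FailoverController fc =
        new FailoverController(conf, RequestSource.REQUEST_BY_USER);
    fc.failover(fromNode, toNode, false /* forceFence */, false /* forceActive */);
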
@@ -19,11 +19,11 @@ package org.apache.hadoop.ha;
 
 import java.io.IOException;
 import java.io.PrintStream;
+import java.util.Arrays;
 import java.util.Map;
 
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.logging.Log;
@@ -33,6 +33,8 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
+import org.apache.hadoop.ha.HAServiceProtocol.RequestSource;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
@@ -49,6 +51,13 @@ public abstract class HAAdmin extends Configured implements Tool {
 
   private static final String FORCEFENCE = "forcefence";
   private static final String FORCEACTIVE = "forceactive";
+
+  /**
+   * Undocumented flag which allows an administrator to use manual failover
+   * state transitions even when auto-failover is enabled. This is an unsafe
+   * operation, which is why it is not documented in the usage below.
+   */
+  private static final String FORCEMANUAL = "forcemanual";
   private static final Log LOG = LogFactory.getLog(HAAdmin.class);
 
   private int rpcTimeoutForChecks = -1;
@@ -79,6 +88,7 @@ public abstract class HAAdmin extends Configured implements Tool {
   /** Output stream for errors, for use in tests */
   protected PrintStream errOut = System.err;
   PrintStream out = System.out;
+  private RequestSource requestSource = RequestSource.REQUEST_BY_USER;
 
   protected abstract HAServiceTarget resolveTarget(String string);
 
@@ -106,63 +116,83 @@ public abstract class HAAdmin extends Configured implements Tool {
     errOut.println("Usage: HAAdmin [" + cmd + " " + usage.args + "]");
   }
 
-  private int transitionToActive(final String[] argv)
+  private int transitionToActive(final CommandLine cmd)
       throws IOException, ServiceFailedException {
-    if (argv.length != 2) {
+    String[] argv = cmd.getArgs();
+    if (argv.length != 1) {
       errOut.println("transitionToActive: incorrect number of arguments");
       printUsage(errOut, "-transitionToActive");
       return -1;
     }
-    
-    HAServiceProtocol proto = resolveTarget(argv[1]).getProxy(
+    HAServiceTarget target = resolveTarget(argv[0]);
+    if (!checkManualStateManagementOK(target)) {
+      return -1;
+    }
+    HAServiceProtocol proto = target.getProxy(
         getConf(), 0);
-    HAServiceProtocolHelper.transitionToActive(proto);
+    HAServiceProtocolHelper.transitionToActive(proto, createReqInfo());
     return 0;
   }
 
-  private int transitionToStandby(final String[] argv)
+  private int transitionToStandby(final CommandLine cmd)
      throws IOException, ServiceFailedException {
-    if (argv.length != 2) {
+    String[] argv = cmd.getArgs();
+    if (argv.length != 1) {
       errOut.println("transitionToStandby: incorrect number of arguments");
       printUsage(errOut, "-transitionToStandby");
       return -1;
     }
-    
-    HAServiceProtocol proto = resolveTarget(argv[1]).getProxy(
-        getConf(), 0);
-    HAServiceProtocolHelper.transitionToStandby(proto);
-    return 0;
-  }
-
-  private int failover(final String[] argv)
-      throws IOException, ServiceFailedException {
-    boolean forceFence = false;
-    boolean forceActive = false;
-
-    Options failoverOpts = new Options();
-    // "-failover" isn't really an option but we need to add
-    // it to appease CommandLineParser
-    failoverOpts.addOption("failover", false, "failover");
-    failoverOpts.addOption(FORCEFENCE, false, "force fencing");
-    failoverOpts.addOption(FORCEACTIVE, false, "force failover");
-
-    CommandLineParser parser = new GnuParser();
-    CommandLine cmd;
-
-    try {
-      cmd = parser.parse(failoverOpts, argv);
-      forceFence = cmd.hasOption(FORCEFENCE);
-      forceActive = cmd.hasOption(FORCEACTIVE);
-    } catch (ParseException pe) {
-      errOut.println("failover: incorrect arguments");
-      printUsage(errOut, "-failover");
+    HAServiceTarget target = resolveTarget(argv[0]);
+    if (!checkManualStateManagementOK(target)) {
+      return -1;
+    }
+    HAServiceProtocol proto = target.getProxy(
+        getConf(), 0);
+    HAServiceProtocolHelper.transitionToStandby(proto, createReqInfo());
+    return 0;
+  }
+
+  /**
+   * Ensure that we are allowed to manually manage the HA state of the target
+   * service. If automatic failover is configured, then the automatic
+   * failover controllers should be doing state management, and it is generally
+   * an error to use the HAAdmin command line to do so.
+   * 
+   * @param target the target to check
+   * @return true if manual state management is allowed
+   */
+  private boolean checkManualStateManagementOK(HAServiceTarget target) {
+    if (target.isAutoFailoverEnabled()) {
+      if (requestSource != RequestSource.REQUEST_BY_USER_FORCED) {
+        errOut.println(
+            "Automatic failover is enabled for " + target + "\n" +
+            "Refusing to manually manage HA state, since it may cause\n" +
+            "a split-brain scenario or other incorrect state.\n" +
+            "If you are very sure you know what you are doing, please \n" +
+            "specify the " + FORCEMANUAL + " flag.");
+        return false;
+      } else {
+        LOG.warn("Proceeding with manual HA state management even though\n" +
+            "automatic failover is enabled for " + target);
+        return true;
+      }
+    }
+    return true;
+  }
+
+  private StateChangeRequestInfo createReqInfo() {
+    return new StateChangeRequestInfo(requestSource);
+  }
+
+  private int failover(CommandLine cmd)
+      throws IOException, ServiceFailedException {
+    boolean forceFence = cmd.hasOption(FORCEFENCE);
+    boolean forceActive = cmd.hasOption(FORCEACTIVE);
 
     int numOpts = cmd.getOptions() == null ? 0 : cmd.getOptions().length;
     final String[] args = cmd.getArgs();
 
-    if (numOpts > 2 || args.length != 2) {
+    if (numOpts > 3 || args.length != 2) {
       errOut.println("failover: incorrect arguments");
       printUsage(errOut, "-failover");
       return -1;
@@ -171,7 +201,13 @@ public abstract class HAAdmin extends Configured implements Tool {
     HAServiceTarget fromNode = resolveTarget(args[0]);
     HAServiceTarget toNode = resolveTarget(args[1]);
 
-    FailoverController fc = new FailoverController(getConf());
+    if (!checkManualStateManagementOK(fromNode) ||
+        !checkManualStateManagementOK(toNode)) {
+      return -1;
+    }
+
+    FailoverController fc = new FailoverController(getConf(),
+        requestSource);
 
     try {
       fc.failover(fromNode, toNode, forceFence, forceActive);
@@ -183,18 +219,18 @@ public abstract class HAAdmin extends Configured implements Tool {
     return 0;
   }
 
-  private int checkHealth(final String[] argv)
+  private int checkHealth(final CommandLine cmd)
       throws IOException, ServiceFailedException {
-    if (argv.length != 2) {
+    String[] argv = cmd.getArgs();
+    if (argv.length != 1) {
       errOut.println("checkHealth: incorrect number of arguments");
       printUsage(errOut, "-checkHealth");
       return -1;
     }
-    
-    HAServiceProtocol proto = resolveTarget(argv[1]).getProxy(
+    HAServiceProtocol proto = resolveTarget(argv[0]).getProxy(
         getConf(), rpcTimeoutForChecks);
     try {
-      HAServiceProtocolHelper.monitorHealth(proto);
+      HAServiceProtocolHelper.monitorHealth(proto, createReqInfo());
     } catch (HealthCheckFailedException e) {
       errOut.println("Health check failed: " + e.getLocalizedMessage());
       return -1;
@@ -202,15 +238,16 @@ public abstract class HAAdmin extends Configured implements Tool {
     return 0;
   }
 
-  private int getServiceState(final String[] argv)
+  private int getServiceState(final CommandLine cmd)
       throws IOException, ServiceFailedException {
-    if (argv.length != 2) {
+    String[] argv = cmd.getArgs();
+    if (argv.length != 1) {
       errOut.println("getServiceState: incorrect number of arguments");
       printUsage(errOut, "-getServiceState");
       return -1;
     }
-
-    HAServiceProtocol proto = resolveTarget(argv[1]).getProxy(
+    HAServiceProtocol proto = resolveTarget(argv[0]).getProxy(
        getConf(), rpcTimeoutForChecks);
     out.println(proto.getServiceStatus().getState());
     return 0;
@@ -264,23 +301,98 @@ public abstract class HAAdmin extends Configured implements Tool {
       return -1;
     }
 
-    if ("-transitionToActive".equals(cmd)) {
-      return transitionToActive(argv);
-    } else if ("-transitionToStandby".equals(cmd)) {
-      return transitionToStandby(argv);
-    } else if ("-failover".equals(cmd)) {
-      return failover(argv);
-    } else if ("-getServiceState".equals(cmd)) {
-      return getServiceState(argv);
-    } else if ("-checkHealth".equals(cmd)) {
-      return checkHealth(argv);
-    } else if ("-help".equals(cmd)) {
-      return help(argv);
-    } else {
+    if (!USAGE.containsKey(cmd)) {
       errOut.println(cmd.substring(1) + ": Unknown command");
       printUsage(errOut);
       return -1;
     }
+
+    Options opts = new Options();
+
+    // Add command-specific options
+    if ("-failover".equals(cmd)) {
+      addFailoverCliOpts(opts);
+    }
+    // Mutative commands take FORCEMANUAL option
+    if ("-transitionToActive".equals(cmd) ||
+        "-transitionToStandby".equals(cmd) ||
+        "-failover".equals(cmd)) {
+      opts.addOption(FORCEMANUAL, false,
+          "force manual control even if auto-failover is enabled");
+    }
+
+    CommandLine cmdLine = parseOpts(cmd, opts, argv);
+    if (cmdLine == null) {
+      // error already printed
+      return -1;
+    }
+
+    if (cmdLine.hasOption(FORCEMANUAL)) {
+      if (!confirmForceManual()) {
+        LOG.fatal("Aborted");
+        return -1;
+      }
+      // Instruct the NNs to honor this request even if they're
+      // configured for manual failover.
+      requestSource = RequestSource.REQUEST_BY_USER_FORCED;
+    }
+
+    if ("-transitionToActive".equals(cmd)) {
+      return transitionToActive(cmdLine);
+    } else if ("-transitionToStandby".equals(cmd)) {
+      return transitionToStandby(cmdLine);
+    } else if ("-failover".equals(cmd)) {
+      return failover(cmdLine);
+    } else if ("-getServiceState".equals(cmd)) {
+      return getServiceState(cmdLine);
+    } else if ("-checkHealth".equals(cmd)) {
+      return checkHealth(cmdLine);
+    } else if ("-help".equals(cmd)) {
+      return help(argv);
+    } else {
+      // we already checked command validity above, so getting here
+      // would be a coding error
+      throw new AssertionError("Should not get here, command: " + cmd);
+    }
+  }
+
+  private boolean confirmForceManual() throws IOException {
+    return ToolRunner.confirmPrompt(
+        "You have specified the " + FORCEMANUAL + " flag. This flag is " +
+        "dangerous, as it can induce a split-brain scenario that WILL " +
+        "CORRUPT your HDFS namespace, possibly irrecoverably.\n" +
+        "\n" +
+        "It is recommended not to use this flag, but instead to shut down the " +
+        "cluster and disable automatic failover if you prefer to manually " +
+        "manage your HA state.\n" +
+        "\n" +
+        "You may abort safely by answering 'n' or hitting ^C now.\n" +
+        "\n" +
+        "Are you sure you want to continue?");
+  }
+
+  /**
+   * Add CLI options which are specific to the failover command and no
+   * others.
+   */
+  private void addFailoverCliOpts(Options failoverOpts) {
+    failoverOpts.addOption(FORCEFENCE, false, "force fencing");
+    failoverOpts.addOption(FORCEACTIVE, false, "force failover");
+    // Don't add FORCEMANUAL, since that's added separately for all commands
+    // that change state.
+  }
+
+  private CommandLine parseOpts(String cmdName, Options opts, String[] argv) {
+    try {
+      // Strip off the first arg, since that's just the command name
+      argv = Arrays.copyOfRange(argv, 1, argv.length);
+      return new GnuParser().parse(opts, argv);
+    } catch (ParseException pe) {
+      errOut.println(cmdName.substring(1) +
+          ": incorrect arguments");
+      printUsage(errOut, cmdName);
+      return null;
+    }
+  }
+
   private int help(String[] argv) {

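The -forcemanual plumbing above reduces to a small commons-cli pattern. A sketch under the same names the diff introduces (the helper method itself is hypothetical, not part of the patch):

    // Hypothetical helper: decide the request source from CLI args.
    private RequestSource parseSource(String[] argv) throws ParseException {
      Options opts = new Options();
      opts.addOption("forcemanual", false,
          "force manual control even if auto-failover is enabled");
      CommandLine cmdLine = new GnuParser().parse(opts, argv);
      // In HAAdmin this only happens after confirmForceManual() succeeds.
      return cmdLine.hasOption("forcemanual")
          ? RequestSource.REQUEST_BY_USER_FORCED
          : RequestSource.REQUEST_BY_USER;
    }
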
@@ -61,6 +61,31 @@ public interface HAServiceProtocol {
     }
   }
 
+  public static enum RequestSource {
+    REQUEST_BY_USER,
+    REQUEST_BY_USER_FORCED,
+    REQUEST_BY_ZKFC;
+  }
+
+  /**
+   * Information describing the source for a request to change state.
+   * This is used to differentiate requests from automatic vs CLI
+   * failover controllers, and in the future may include epoch
+   * information.
+   */
+  public static class StateChangeRequestInfo {
+    private final RequestSource source;
+
+    public StateChangeRequestInfo(RequestSource source) {
+      super();
+      this.source = source;
+    }
+
+    public RequestSource getSource() {
+      return source;
+    }
+  }
+
   /**
    * Monitor the health of service. This periodically called by the HA
    * frameworks to monitor the health of the service.
@@ -95,7 +120,8 @@ public interface HAServiceProtocol {
    * @throws IOException
    *           if other errors happen
    */
-  public void transitionToActive() throws ServiceFailedException,
+  public void transitionToActive(StateChangeRequestInfo reqInfo)
+                                   throws ServiceFailedException,
                                            AccessControlException,
                                            IOException;
 
@@ -110,7 +136,8 @@ public interface HAServiceProtocol {
    * @throws IOException
    *           if other errors happen
    */
-  public void transitionToStandby() throws ServiceFailedException,
+  public void transitionToStandby(StateChangeRequestInfo reqInfo)
+                                   throws ServiceFailedException,
                                            AccessControlException,
                                            IOException;
 

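Any HAServiceProtocol implementation now receives the request info and may gate on its source. A schematic implementation of the new signature, assuming a hypothetical service class with an `autoHaEnabled` field (the NameNode's real enforcement, checkHaStateChange, appears later in this diff):

    // Sketch only: branch on the request source before transitioning.
    public void transitionToActive(StateChangeRequestInfo reqInfo)
        throws ServiceFailedException, AccessControlException, IOException {
      if (autoHaEnabled  // assumed field: is auto failover configured?
          && reqInfo.getSource() == RequestSource.REQUEST_BY_USER) {
        throw new AccessControlException(
            "Manual HA control disallowed while automatic HA is enabled");
      }
      // ... perform the actual transition ...
    }
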
@@ -21,6 +21,7 @@ import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
 import org.apache.hadoop.ipc.RemoteException;
 
 /**
@@ -30,7 +31,8 @@ import org.apache.hadoop.ipc.RemoteException;
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public class HAServiceProtocolHelper {
-  public static void monitorHealth(HAServiceProtocol svc)
+  public static void monitorHealth(HAServiceProtocol svc,
+      StateChangeRequestInfo reqInfo)
       throws IOException {
     try {
       svc.monitorHealth();
@@ -39,19 +41,21 @@ public class HAServiceProtocolHelper {
     }
   }
 
-  public static void transitionToActive(HAServiceProtocol svc)
+  public static void transitionToActive(HAServiceProtocol svc,
+      StateChangeRequestInfo reqInfo)
       throws IOException {
     try {
-      svc.transitionToActive();
+      svc.transitionToActive(reqInfo);
     } catch (RemoteException e) {
       throw e.unwrapRemoteException(ServiceFailedException.class);
     }
   }
 
-  public static void transitionToStandby(HAServiceProtocol svc)
+  public static void transitionToStandby(HAServiceProtocol svc,
+      StateChangeRequestInfo reqInfo)
       throws IOException {
     try {
-      svc.transitionToStandby();
+      svc.transitionToStandby(reqInfo);
     } catch (RemoteException e) {
       throw e.unwrapRemoteException(ServiceFailedException.class);
     }

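Callers go through HAServiceProtocolHelper so RemoteExceptions are unwrapped into ServiceFailedException. Typical usage after this change, assuming `target` and `conf` are an HAServiceTarget and Configuration obtained elsewhere:

    // Transition a target to active via the helper, tagging the request.
    HAServiceProtocol proxy = target.getProxy(conf, 0);
    HAServiceProtocolHelper.transitionToActive(proxy,
        new StateChangeRequestInfo(RequestSource.REQUEST_BY_USER));
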
@@ -99,4 +99,11 @@ public abstract class HAServiceTarget {
     ret.put(HOST_SUBST_KEY, getAddress().getHostName());
     ret.put(PORT_SUBST_KEY, String.valueOf(getAddress().getPort()));
   }
+
+  /**
+   * @return true if auto failover should be considered enabled
+   */
+  public boolean isAutoFailoverEnabled() {
+    return false;
+  }
 }

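The base class defaults to false, so auto-failover is off unless a target subclass opts in (NNHAServiceTarget does so from configuration later in this diff). The guard HAAdmin applies above then reduces to, roughly:

    // Sketch of the manual-control guard: refuse unless the user forced it.
    if (target.isAutoFailoverEnabled()
        && requestSource != RequestSource.REQUEST_BY_USER_FORCED) {
      // refuse to manually manage HA state for this target
    }
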
@@ -28,6 +28,8 @@ import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ha.ActiveStandbyElector.ActiveStandbyElectorCallback;
+import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
+import org.apache.hadoop.ha.HAServiceProtocol.RequestSource;
 import org.apache.hadoop.ha.HAZKUtil.ZKAuthInfo;
 import org.apache.hadoop.ha.HealthMonitor.State;
 import org.apache.hadoop.security.SecurityUtil;
@@ -72,6 +74,8 @@ public abstract class ZKFailoverController implements Tool {
   static final int ERR_CODE_NO_PARENT_ZNODE = 3;
   /** Fencing is not properly configured */
   static final int ERR_CODE_NO_FENCER = 4;
+  /** Automatic failover is not enabled */
+  static final int ERR_CODE_AUTO_FAILOVER_NOT_ENABLED = 5;
 
   private Configuration conf;
 
@@ -112,6 +116,12 @@ public abstract class ZKFailoverController implements Tool {
 
   @Override
   public int run(final String[] args) throws Exception {
+    if (!localTarget.isAutoFailoverEnabled()) {
+      LOG.fatal("Automatic failover is not enabled for " + localTarget + "." +
+          " Please ensure that automatic failover is enabled in the " +
+          "configuration before running the ZK failover controller.");
+      return ERR_CODE_AUTO_FAILOVER_NOT_ENABLED;
+    }
     loginAsFCUser();
     try {
       return SecurityUtil.doAsLoginUserOrFatal(new PrivilegedAction<Integer>() {
@@ -300,7 +310,8 @@ public abstract class ZKFailoverController implements Tool {
     LOG.info("Trying to make " + localTarget + " active...");
     try {
       HAServiceProtocolHelper.transitionToActive(localTarget.getProxy(
-          conf, FailoverController.getRpcTimeoutToNewActive(conf)));
+          conf, FailoverController.getRpcTimeoutToNewActive(conf)),
+          createReqInfo());
       LOG.info("Successfully transitioned " + localTarget +
           " to active state");
     } catch (Throwable t) {
@@ -323,12 +334,16 @@ public abstract class ZKFailoverController implements Tool {
     }
   }
 
+  private StateChangeRequestInfo createReqInfo() {
+    return new StateChangeRequestInfo(RequestSource.REQUEST_BY_ZKFC);
+  }
+
   private synchronized void becomeStandby() {
     LOG.info("ZK Election indicated that " + localTarget +
         " should become standby");
     try {
       int timeout = FailoverController.getGracefulFenceTimeout(conf);
-      localTarget.getProxy(conf, timeout).transitionToStandby();
+      localTarget.getProxy(conf, timeout).transitionToStandby(createReqInfo());
       LOG.info("Successfully transitioned " + localTarget +
           " to standby state");
     } catch (Exception e) {
@@ -381,8 +396,8 @@ public abstract class ZKFailoverController implements Tool {
     HAServiceTarget target = dataToTarget(data);
 
     LOG.info("Should fence: " + target);
-    boolean gracefulWorked = new FailoverController(conf)
-        .tryGracefulFence(target);
+    boolean gracefulWorked = new FailoverController(conf,
+        RequestSource.REQUEST_BY_ZKFC).tryGracefulFence(target);
     if (gracefulWorked) {
       // It's possible that it's in standby but just about to go into active,
      // no? Is there some race here?

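On the automatic side, every request the ZKFC issues carries the same tag, which is what lets a NameNode reject stray ZKFC traffic when auto-HA is off. A sketch of the call path above (`localTarget`, `conf`, and `timeout` as in the ZKFC code):

    // All ZKFC-initiated transitions are tagged REQUEST_BY_ZKFC.
    StateChangeRequestInfo zkfcReq =
        new StateChangeRequestInfo(RequestSource.REQUEST_BY_ZKFC);
    localTarget.getProxy(conf, timeout).transitionToStandby(zkfcReq);
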
@@ -30,13 +30,14 @@ import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.ha.HAServiceStatus;
 import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.GetServiceStatusRequestProto;
 import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.GetServiceStatusResponseProto;
+import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.HAStateChangeRequestInfoProto;
+import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.HARequestSource;
 import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.HAServiceStateProto;
 import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.MonitorHealthRequestProto;
 import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.TransitionToActiveRequestProto;
 import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.TransitionToStandbyRequestProto;
 import org.apache.hadoop.ipc.ProtobufHelper;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.ProtocolSignature;
 import org.apache.hadoop.ipc.ProtocolTranslator;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -57,10 +58,6 @@ public class HAServiceProtocolClientSideTranslatorPB implements
   private final static RpcController NULL_CONTROLLER = null;
   private final static MonitorHealthRequestProto MONITOR_HEALTH_REQ = 
       MonitorHealthRequestProto.newBuilder().build();
-  private final static TransitionToActiveRequestProto TRANSITION_TO_ACTIVE_REQ = 
-      TransitionToActiveRequestProto.newBuilder().build();
-  private final static TransitionToStandbyRequestProto TRANSITION_TO_STANDBY_REQ = 
-      TransitionToStandbyRequestProto.newBuilder().build();
   private final static GetServiceStatusRequestProto GET_SERVICE_STATUS_REQ = 
       GetServiceStatusRequestProto.newBuilder().build();
 
@@ -94,18 +91,25 @@ public class HAServiceProtocolClientSideTranslatorPB implements
   }
 
   @Override
-  public void transitionToActive() throws IOException {
+  public void transitionToActive(StateChangeRequestInfo reqInfo) throws IOException {
     try {
-      rpcProxy.transitionToActive(NULL_CONTROLLER, TRANSITION_TO_ACTIVE_REQ);
+      TransitionToActiveRequestProto req =
+          TransitionToActiveRequestProto.newBuilder()
+            .setReqInfo(convert(reqInfo)).build();
+
+      rpcProxy.transitionToActive(NULL_CONTROLLER, req);
     } catch (ServiceException e) {
       throw ProtobufHelper.getRemoteException(e);
     }
   }
 
   @Override
-  public void transitionToStandby() throws IOException {
+  public void transitionToStandby(StateChangeRequestInfo reqInfo) throws IOException {
     try {
-      rpcProxy.transitionToStandby(NULL_CONTROLLER, TRANSITION_TO_STANDBY_REQ);
+      TransitionToStandbyRequestProto req =
+          TransitionToStandbyRequestProto.newBuilder()
+            .setReqInfo(convert(reqInfo)).build();
+      rpcProxy.transitionToStandby(NULL_CONTROLLER, req);
     } catch (ServiceException e) {
       throw ProtobufHelper.getRemoteException(e);
     }
@@ -143,6 +147,27 @@ public class HAServiceProtocolClientSideTranslatorPB implements
     }
   }
 
+  private HAStateChangeRequestInfoProto convert(StateChangeRequestInfo reqInfo) {
+    HARequestSource src;
+    switch (reqInfo.getSource()) {
+    case REQUEST_BY_USER:
+      src = HARequestSource.REQUEST_BY_USER;
+      break;
+    case REQUEST_BY_USER_FORCED:
+      src = HARequestSource.REQUEST_BY_USER_FORCED;
+      break;
+    case REQUEST_BY_ZKFC:
+      src = HARequestSource.REQUEST_BY_ZKFC;
+      break;
+    default:
+      throw new IllegalArgumentException("Bad source: " + reqInfo.getSource());
+    }
+    return HAStateChangeRequestInfoProto.newBuilder()
+        .setReqSource(src)
+        .build();
+  }
+
+
   @Override
   public void close() {
     RPC.stopProxy(rpcProxy);

@@ -19,12 +19,17 @@ package org.apache.hadoop.ha.protocolPB;
 
 import java.io.IOException;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
+import org.apache.hadoop.ha.HAServiceProtocol.RequestSource;
 import org.apache.hadoop.ha.HAServiceStatus;
 import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.GetServiceStatusRequestProto;
 import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.GetServiceStatusResponseProto;
+import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.HAStateChangeRequestInfoProto;
 import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.HAServiceStateProto;
 import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.MonitorHealthRequestProto;
 import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.MonitorHealthResponseProto;
@@ -56,6 +61,8 @@ public class HAServiceProtocolServerSideTranslatorPB implements
       TransitionToActiveResponseProto.newBuilder().build();
   private static final TransitionToStandbyResponseProto TRANSITION_TO_STANDBY_RESP = 
       TransitionToStandbyResponseProto.newBuilder().build();
+  private static final Log LOG = LogFactory.getLog(
+      HAServiceProtocolServerSideTranslatorPB.class);
 
   public HAServiceProtocolServerSideTranslatorPB(HAServiceProtocol server) {
     this.server = server;
@@ -72,12 +79,32 @@ public class HAServiceProtocolServerSideTranslatorPB implements
     }
   }
 
+  private StateChangeRequestInfo convert(HAStateChangeRequestInfoProto proto) {
+    RequestSource src;
+    switch (proto.getReqSource()) {
+    case REQUEST_BY_USER:
+      src = RequestSource.REQUEST_BY_USER;
+      break;
+    case REQUEST_BY_USER_FORCED:
+      src = RequestSource.REQUEST_BY_USER_FORCED;
+      break;
+    case REQUEST_BY_ZKFC:
+      src = RequestSource.REQUEST_BY_ZKFC;
+      break;
+    default:
+      LOG.warn("Unknown request source: " + proto.getReqSource());
+      src = null;
+    }
+
+    return new StateChangeRequestInfo(src);
+  }
+
   @Override
   public TransitionToActiveResponseProto transitionToActive(
       RpcController controller, TransitionToActiveRequestProto request)
       throws ServiceException {
     try {
-      server.transitionToActive();
+      server.transitionToActive(convert(request.getReqInfo()));
       return TRANSITION_TO_ACTIVE_RESP;
     } catch(IOException e) {
       throw new ServiceException(e);
@@ -89,7 +116,7 @@ public class HAServiceProtocolServerSideTranslatorPB implements
       RpcController controller, TransitionToStandbyRequestProto request)
       throws ServiceException {
     try {
-      server.transitionToStandby();
+      server.transitionToStandby(convert(request.getReqInfo()));
       return TRANSITION_TO_STANDBY_RESP;
     } catch(IOException e) {
       throw new ServiceException(e);

@@ -27,6 +27,16 @@ enum HAServiceStateProto {
   STANDBY = 2;
 }
 
+enum HARequestSource {
+  REQUEST_BY_USER = 0;
+  REQUEST_BY_USER_FORCED = 1;
+  REQUEST_BY_ZKFC = 2;
+}
+
+message HAStateChangeRequestInfoProto {
+  required HARequestSource reqSource = 1;
+}
+
 /**
  * void request
  */
@@ -43,6 +53,7 @@ message MonitorHealthResponseProto {
  * void request
  */
 message TransitionToActiveRequestProto { 
+  required HAStateChangeRequestInfoProto reqInfo = 1;
 }
 
 /**
@@ -55,6 +66,7 @@ message TransitionToActiveResponseProto {
  * void request
  */
 message TransitionToStandbyRequestProto { 
+  required HAStateChangeRequestInfoProto reqInfo = 1;
 }
 
 /**

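On the wire, the request source is a single required field. Building the same request from Java against the generated classes from the .proto above:

    // Build the request-info message, then embed it in a transition request.
    HAStateChangeRequestInfoProto reqProto =
        HAStateChangeRequestInfoProto.newBuilder()
            .setReqSource(HARequestSource.REQUEST_BY_ZKFC)
            .build();
    TransitionToActiveRequestProto req =
        TransitionToActiveRequestProto.newBuilder()
            .setReqInfo(reqProto)
            .build();
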
@@ -97,6 +97,11 @@ class DummyHAService extends HAServiceTarget {
   public void checkFencingConfigured() throws BadFencingConfigurationException {
   }
 
+  @Override
+  public boolean isAutoFailoverEnabled() {
+    return true;
+  }
+
   @Override
   public String toString() {
     return "DummyHAService #" + index;
@@ -118,7 +123,7 @@ class DummyHAService extends HAServiceTarget {
     }
 
     @Override
-    public void transitionToActive() throws ServiceFailedException,
+    public void transitionToActive(StateChangeRequestInfo req) throws ServiceFailedException,
         AccessControlException, IOException {
       checkUnreachable();
       if (failToBecomeActive) {
@@ -131,7 +136,7 @@ class DummyHAService extends HAServiceTarget {
     }
 
     @Override
-    public void transitionToStandby() throws ServiceFailedException,
+    public void transitionToStandby(StateChangeRequestInfo req) throws ServiceFailedException,
        AccessControlException, IOException {
      checkUnreachable();
      if (sharedResource != null) {

@@ -27,6 +27,8 @@ import static org.mockito.Mockito.verify;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
+import org.apache.hadoop.ha.HAServiceProtocol.RequestSource;
 import org.apache.hadoop.ha.TestNodeFencer.AlwaysSucceedFencer;
 import org.apache.hadoop.ha.TestNodeFencer.AlwaysFailFencer;
 import static org.apache.hadoop.ha.TestNodeFencer.setupFencer;
@@ -117,7 +119,8 @@ public class TestFailoverController {
   public void testFailoverToUnreadyService() throws Exception {
     DummyHAService svc1 = new DummyHAService(HAServiceState.ACTIVE, svc1Addr);
     DummyHAService svc2 = new DummyHAService(HAServiceState.STANDBY, svc2Addr);
-    Mockito.doReturn(STATE_NOT_READY).when(svc2.proxy).getServiceStatus();
+    Mockito.doReturn(STATE_NOT_READY).when(svc2.proxy)
+        .getServiceStatus();
     svc1.fencer = svc2.fencer = setupFencer(AlwaysSucceedFencer.class.getName());
 
     try {
@@ -161,7 +164,7 @@ public class TestFailoverController {
   public void testFailoverFromFaultyServiceSucceeds() throws Exception {
     DummyHAService svc1 = new DummyHAService(HAServiceState.ACTIVE, svc1Addr);
     Mockito.doThrow(new ServiceFailedException("Failed!"))
-        .when(svc1.proxy).transitionToStandby();
+        .when(svc1.proxy).transitionToStandby(anyReqInfo());
 
     DummyHAService svc2 = new DummyHAService(HAServiceState.STANDBY, svc2Addr);
     svc1.fencer = svc2.fencer = setupFencer(AlwaysSucceedFencer.class.getName());
@@ -184,7 +187,7 @@ public class TestFailoverController {
   public void testFailoverFromFaultyServiceFencingFailure() throws Exception {
     DummyHAService svc1 = new DummyHAService(HAServiceState.ACTIVE, svc1Addr);
     Mockito.doThrow(new ServiceFailedException("Failed!"))
-        .when(svc1.proxy).transitionToStandby();
+        .when(svc1.proxy).transitionToStandby(anyReqInfo());
 
     DummyHAService svc2 = new DummyHAService(HAServiceState.STANDBY, svc2Addr);
     svc1.fencer = svc2.fencer = setupFencer(AlwaysFailFencer.class.getName());
@@ -283,7 +286,7 @@ public class TestFailoverController {
     DummyHAService svc1 = spy(new DummyHAService(HAServiceState.ACTIVE, svc1Addr));
     DummyHAService svc2 = new DummyHAService(HAServiceState.STANDBY, svc2Addr);
     Mockito.doThrow(new ServiceFailedException("Failed!"))
-        .when(svc2.proxy).transitionToActive();
+        .when(svc2.proxy).transitionToActive(anyReqInfo());
     svc1.fencer = svc2.fencer = setupFencer(AlwaysSucceedFencer.class.getName());
 
     try {
@@ -294,8 +297,8 @@ public class TestFailoverController {
     }
 
     // svc1 went standby then back to active
-    verify(svc1.proxy).transitionToStandby();
-    verify(svc1.proxy).transitionToActive();
+    verify(svc1.proxy).transitionToStandby(anyReqInfo());
+    verify(svc1.proxy).transitionToActive(anyReqInfo());
     assertEquals(HAServiceState.ACTIVE, svc1.state);
     assertEquals(HAServiceState.STANDBY, svc2.state);
   }
@@ -305,7 +308,7 @@ public class TestFailoverController {
     DummyHAService svc1 = new DummyHAService(HAServiceState.ACTIVE, svc1Addr);
     DummyHAService svc2 = new DummyHAService(HAServiceState.STANDBY, svc2Addr);
     Mockito.doThrow(new ServiceFailedException("Failed!"))
-        .when(svc2.proxy).transitionToActive();
+        .when(svc2.proxy).transitionToActive(anyReqInfo());
     svc1.fencer = svc2.fencer = setupFencer(AlwaysSucceedFencer.class.getName());
 
     try {
@@ -326,7 +329,7 @@ public class TestFailoverController {
     DummyHAService svc1 = new DummyHAService(HAServiceState.ACTIVE, svc1Addr);
     DummyHAService svc2 = new DummyHAService(HAServiceState.STANDBY, svc2Addr);
     Mockito.doThrow(new ServiceFailedException("Failed!"))
-        .when(svc2.proxy).transitionToActive();
+        .when(svc2.proxy).transitionToActive(anyReqInfo());
     svc1.fencer = svc2.fencer = setupFencer(AlwaysSucceedFencer.class.getName());
     AlwaysSucceedFencer.fenceCalled = 0;
 
@@ -345,12 +348,16 @@ public class TestFailoverController {
     assertSame(svc2, AlwaysSucceedFencer.fencedSvc);
   }
 
+  private StateChangeRequestInfo anyReqInfo() {
+    return Mockito.<StateChangeRequestInfo>any();
+  }
+
   @Test
   public void testFailureToFenceOnFailbackFailsTheFailback() throws Exception {
     DummyHAService svc1 = new DummyHAService(HAServiceState.ACTIVE, svc1Addr);
     DummyHAService svc2 = new DummyHAService(HAServiceState.STANDBY, svc2Addr);
     Mockito.doThrow(new IOException("Failed!"))
-        .when(svc2.proxy).transitionToActive();
+        .when(svc2.proxy).transitionToActive(anyReqInfo());
     svc1.fencer = svc2.fencer = setupFencer(AlwaysFailFencer.class.getName());
     AlwaysFailFencer.fenceCalled = 0;
 
@@ -373,10 +380,10 @@ public class TestFailoverController {
   public void testFailbackToFaultyServiceFails() throws Exception {
     DummyHAService svc1 = new DummyHAService(HAServiceState.ACTIVE, svc1Addr);
     Mockito.doThrow(new ServiceFailedException("Failed!"))
-        .when(svc1.proxy).transitionToActive();
+        .when(svc1.proxy).transitionToActive(anyReqInfo());
     DummyHAService svc2 = new DummyHAService(HAServiceState.STANDBY, svc2Addr);
     Mockito.doThrow(new ServiceFailedException("Failed!"))
-        .when(svc2.proxy).transitionToActive();
+        .when(svc2.proxy).transitionToActive(anyReqInfo());
 
     svc1.fencer = svc2.fencer = setupFencer(AlwaysSucceedFencer.class.getName());
 
@@ -419,7 +426,8 @@ public class TestFailoverController {
 
   private void doFailover(HAServiceTarget tgt1, HAServiceTarget tgt2,
       boolean forceFence, boolean forceActive) throws FailoverFailedException {
-    FailoverController fc = new FailoverController(conf);
+    FailoverController fc = new FailoverController(conf, 
+        RequestSource.REQUEST_BY_USER);
     fc.failover(tgt1, tgt2, forceFence, forceActive);
   }
 

@@ -24,6 +24,7 @@ import java.security.NoSuchAlgorithmException;
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
 import org.apache.hadoop.ha.HealthMonitor.State;
 import org.apache.hadoop.ha.MiniZKFCCluster.DummyZKFC;
 import org.apache.log4j.Level;
@@ -128,6 +129,22 @@ public class TestZKFailoverController extends ClientBaseWithFixes {
         runFC(svc, "-formatZK", "-nonInteractive"));
   }
 
+  /**
+   * Test that automatic failover won't run against a target that hasn't
+   * explicitly enabled the feature.
+   */
+  @Test(timeout=10000)
+  public void testWontRunWhenAutoFailoverDisabled() throws Exception {
+    DummyHAService svc = cluster.getService(1);
+    svc = Mockito.spy(svc);
+    Mockito.doReturn(false).when(svc).isAutoFailoverEnabled();
+
+    assertEquals(ZKFailoverController.ERR_CODE_AUTO_FAILOVER_NOT_ENABLED,
+        runFC(svc, "-formatZK"));
+    assertEquals(ZKFailoverController.ERR_CODE_AUTO_FAILOVER_NOT_ENABLED,
+        runFC(svc));
+  }
+
   /**
    * Test that, if ACLs are specified in the configuration, that
    * it sets the ACLs when formatting the parent node.
@@ -279,7 +296,7 @@ public class TestZKFailoverController extends ClientBaseWithFixes {
 
 
     Mockito.verify(svc1.proxy, Mockito.timeout(2000).atLeastOnce())
-        .transitionToActive();
+        .transitionToActive(Mockito.<StateChangeRequestInfo>any());
 
     cluster.waitForHAState(0, HAServiceState.STANDBY);
     cluster.waitForHAState(1, HAServiceState.STANDBY);

@@ -85,4 +85,15 @@ if [ -n "$SECONDARY_NAMENODES" ]; then
     --script "$bin/hdfs" start secondarynamenode
 fi
 
+#---------------------------------------------------------
+# ZK Failover controllers, if auto-HA is enabled
+AUTOHA_ENABLED=$($HADOOP_PREFIX/bin/hdfs getconf -confKey dfs.ha.automatic-failover.enabled)
+if [ "$(echo "$AUTOHA_ENABLED" | tr A-Z a-z)" = "true" ]; then
+  echo "Starting ZK Failover Controllers on NN hosts [$NAMENODES]"
+  "$HADOOP_PREFIX/sbin/hadoop-daemons.sh" \
+    --config "$HADOOP_CONF_DIR" \
+    --hostnames "$NAMENODES" \
+    --script "$bin/hdfs" start zkfc
+fi
+
 # eof

@@ -346,4 +346,6 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String DFS_HA_TAILEDITS_PERIOD_KEY = "dfs.ha.tail-edits.period";
   public static final int DFS_HA_TAILEDITS_PERIOD_DEFAULT = 60; // 1m
   public static final String DFS_HA_FENCE_METHODS_KEY = "dfs.ha.fencing.methods";
+  public static final String DFS_HA_AUTO_FAILOVER_ENABLED_KEY = "dfs.ha.automatic-failover.enabled";
+  public static final boolean DFS_HA_AUTO_FAILOVER_ENABLED_DEFAULT = false;
 }

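Reading the new switch follows the usual Configuration pattern; this is the same flag that start-dfs.sh queries via hdfs getconf above:

    // Check whether auto-HA is enabled (defaults to false).
    Configuration conf = new HdfsConfiguration();
    boolean autoHa = conf.getBoolean(
        DFSConfigKeys.DFS_HA_AUTO_FAILOVER_ENABLED_KEY,      // "dfs.ha.automatic-failover.enabled"
        DFSConfigKeys.DFS_HA_AUTO_FAILOVER_ENABLED_DEFAULT); // false
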
@@ -32,6 +32,7 @@ import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
 import org.apache.hadoop.ha.HAServiceStatus;
 import org.apache.hadoop.ha.HealthCheckFailedException;
 import org.apache.hadoop.ha.ServiceFailedException;
@@ -41,7 +42,6 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Trash;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
-
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -61,6 +61,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.net.NetUtils;
@@ -136,17 +137,25 @@ public class NameNode {
   }
 
   /**
-   * HDFS federation configuration can have two types of parameters:
+   * HDFS configuration can have three types of parameters:
    * <ol>
-   * <li>Parameter that is common for all the name services in the cluster.</li>
-   * <li>Parameters that are specific to a name service. This keys are suffixed
+   * <li>Parameters that are common for all the name services in the cluster.</li>
+   * <li>Parameters that are specific to a name service. These keys are suffixed
    * with nameserviceId in the configuration. For example,
    * "dfs.namenode.rpc-address.nameservice1".</li>
+   * <li>Parameters that are specific to a single name node. These keys are suffixed
+   * with nameserviceId and namenodeId in the configuration. for example,
+   * "dfs.namenode.rpc-address.nameservice1.namenode1"</li>
    * </ol>
    * 
-   * Following are nameservice specific keys.
+   * In the latter cases, operators may specify the configuration without
+   * any suffix, with a nameservice suffix, or with a nameservice and namenode
+   * suffix. The more specific suffix will take precedence.
+   * 
+   * These keys are specific to a given namenode, and thus may be configured
+   * globally, for a nameservice, or for a specific namenode within a nameservice.
    */
-  public static final String[] NAMESERVICE_SPECIFIC_KEYS = {
+  public static final String[] NAMENODE_SPECIFIC_KEYS = {
     DFS_NAMENODE_RPC_ADDRESS_KEY,
     DFS_NAMENODE_NAME_DIR_KEY,
     DFS_NAMENODE_EDITS_DIR_KEY,
@@ -166,6 +175,15 @@ public class NameNode {
     DFS_HA_FENCE_METHODS_KEY
   };
 
+  /**
+   * @see #NAMENODE_SPECIFIC_KEYS
+   * These keys are specific to a nameservice, but may not be overridden
+   * for a specific namenode.
+   */
+  public static final String[] NAMESERVICE_SPECIFIC_KEYS = {
+    DFS_HA_AUTO_FAILOVER_ENABLED_KEY
+  };
+
   public long getProtocolVersion(String protocol, 
                                  long clientVersion) throws IOException {
     if (protocol.equals(ClientProtocol.class.getName())) {
@@ -1012,7 +1030,10 @@ public class NameNode {
     }
 
     DFSUtil.setGenericConf(conf, nameserviceId, namenodeId,
-        NAMESERVICE_SPECIFIC_KEYS);
+        NAMENODE_SPECIFIC_KEYS);
+    DFSUtil.setGenericConf(conf, nameserviceId, null,
+        NAMESERVICE_SPECIFIC_KEYS);
 
     if (conf.get(DFS_NAMENODE_RPC_ADDRESS_KEY) != null) {
       URI defaultUri = URI.create(HdfsConstants.HDFS_URI_SCHEME + "://"
           + conf.get(DFS_NAMENODE_RPC_ADDRESS_KEY));
@@ -1185,4 +1206,43 @@ public class NameNode {
   public boolean isStandbyState() {
     return (state.equals(STANDBY_STATE));
   }
+
+  /**
+   * Check that a request to change this node's HA state is valid.
+   * In particular, verifies that, if auto failover is enabled, non-forced
+   * requests from the HAAdmin CLI are rejected, and vice versa.
+   *
+   * @param req the request to check
+   * @throws AccessControlException if the request is disallowed
+   */
+  void checkHaStateChange(StateChangeRequestInfo req)
+      throws AccessControlException {
+    boolean autoHaEnabled = conf.getBoolean(DFS_HA_AUTO_FAILOVER_ENABLED_KEY,
+        DFS_HA_AUTO_FAILOVER_ENABLED_DEFAULT);
+    switch (req.getSource()) {
+    case REQUEST_BY_USER:
+      if (autoHaEnabled) {
+        throw new AccessControlException(
+            "Manual HA control for this NameNode is disallowed, because " +
+            "automatic HA is enabled.");
+      }
+      break;
+    case REQUEST_BY_USER_FORCED:
+      if (autoHaEnabled) {
+        LOG.warn("Allowing manual HA control from " +
+            Server.getRemoteAddress() +
+            " even though automatic HA is enabled, because the user " +
+            "specified the force flag");
+      }
+      break;
+    case REQUEST_BY_ZKFC:
+      if (!autoHaEnabled) {
+        throw new AccessControlException(
+            "Request from ZK failover controller at " +
+            Server.getRemoteAddress() + " denied since automatic HA " +
+            "is not enabled");
+      }
+      break;
+    }
+  }
 }

@@ -966,14 +966,16 @@ class NameNodeRpcServer implements NamenodeProtocols {
   }
 
   @Override // HAServiceProtocol
-  public synchronized void transitionToActive() 
+  public synchronized void transitionToActive(StateChangeRequestInfo req) 
       throws ServiceFailedException, AccessControlException {
+    nn.checkHaStateChange(req);
     nn.transitionToActive();
   }
 
   @Override // HAServiceProtocol
-  public synchronized void transitionToStandby() 
+  public synchronized void transitionToStandby(StateChangeRequestInfo req) 
       throws ServiceFailedException, AccessControlException {
+    nn.checkHaStateChange(req);
     nn.transitionToStandby();
   }
 

|
|||
private BadFencingConfigurationException fenceConfigError;
|
||||
private final String nnId;
|
||||
private final String nsId;
|
||||
private final boolean autoFailoverEnabled;
|
||||
|
||||
public NNHAServiceTarget(Configuration conf,
|
||||
String nsId, String nnId) {
|
||||
|
@ -84,6 +85,9 @@ public class NNHAServiceTarget extends HAServiceTarget {
|
|||
}
|
||||
this.nnId = nnId;
|
||||
this.nsId = nsId;
|
||||
this.autoFailoverEnabled = targetConf.getBoolean(
|
||||
DFSConfigKeys.DFS_HA_AUTO_FAILOVER_ENABLED_KEY,
|
||||
DFSConfigKeys.DFS_HA_AUTO_FAILOVER_ENABLED_DEFAULT);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -130,4 +134,9 @@ public class NNHAServiceTarget extends HAServiceTarget {
|
|||
ret.put(NAMESERVICE_ID_KEY, getNameServiceId());
|
||||
ret.put(NAMENODE_ID_KEY, getNameNodeId());
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isAutoFailoverEnabled() {
|
||||
return autoFailoverEnabled;
|
||||
}
|
||||
}
|
||||
|
|
|
@@ -67,8 +67,10 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
 import org.apache.hadoop.ha.HAServiceProtocolHelper;
 import org.apache.hadoop.ha.ServiceFailedException;
+import org.apache.hadoop.ha.HAServiceProtocol.RequestSource;
 import org.apache.hadoop.ha.protocolPB.HAServiceProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology.NNConf;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -1650,12 +1652,14 @@ public class MiniDFSCluster {
 
   public void transitionToActive(int nnIndex) throws IOException,
       ServiceFailedException {
-    HAServiceProtocolHelper.transitionToActive(getHaServiceClient(nnIndex));
+    HAServiceProtocolHelper.transitionToActive(getHaServiceClient(nnIndex),
+        new StateChangeRequestInfo(RequestSource.REQUEST_BY_USER_FORCED));
   }
 
   public void transitionToStandby(int nnIndex) throws IOException,
       ServiceFailedException {
-    HAServiceProtocolHelper.transitionToStandby(getHaServiceClient(nnIndex));
+    HAServiceProtocolHelper.transitionToStandby(getHaServiceClient(nnIndex),
+        new StateChangeRequestInfo(RequestSource.REQUEST_BY_USER_FORCED));
   }
 
 

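Tests use REQUEST_BY_USER_FORCED so they keep working regardless of whether the cluster under test has auto-HA enabled. Typical test usage, assuming a running MiniDFSCluster named `cluster`:

    // Force-transition NN 0 in a test, bypassing the auto-HA guard.
    cluster.transitionToActive(0);
    cluster.transitionToStandby(0);
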
@@ -274,7 +274,7 @@ public class TestDFSUtil {
     conf.set(DFS_FEDERATION_NAMESERVICE_ID, nsId);
 
     // Set the nameservice specific keys with nameserviceId in the config key
-    for (String key : NameNode.NAMESERVICE_SPECIFIC_KEYS) {
+    for (String key : NameNode.NAMENODE_SPECIFIC_KEYS) {
       // Note: value is same as the key
       conf.set(DFSUtil.addKeySuffixes(key, nsId), key);
     }
@@ -284,7 +284,7 @@ public class TestDFSUtil {
 
     // Retrieve the keys without nameserviceId and Ensure generic keys are set
     // to the correct value
-    for (String key : NameNode.NAMESERVICE_SPECIFIC_KEYS) {
+    for (String key : NameNode.NAMENODE_SPECIFIC_KEYS) {
       assertEquals(key, conf.get(key));
     }
   }
@@ -304,7 +304,7 @@ public class TestDFSUtil {
     conf.set(DFS_HA_NAMENODES_KEY_PREFIX + "." + nsId, nnId);
 
     // Set the nameservice specific keys with nameserviceId in the config key
-    for (String key : NameNode.NAMESERVICE_SPECIFIC_KEYS) {
+    for (String key : NameNode.NAMENODE_SPECIFIC_KEYS) {
      // Note: value is same as the key
      conf.set(DFSUtil.addKeySuffixes(key, nsId, nnId), key);
    }
@@ -314,7 +314,7 @@ public class TestDFSUtil {
 
     // Retrieve the keys without nameserviceId and Ensure generic keys are set
     // to the correct value
-    for (String key : NameNode.NAMESERVICE_SPECIFIC_KEYS) {
+    for (String key : NameNode.NAMENODE_SPECIFIC_KEYS) {
       assertEquals(key, conf.get(key));
     }
   }

|
|||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.ha.ClientBaseWithFixes;
|
||||
import org.apache.hadoop.ha.NodeFencer;
|
||||
import org.apache.hadoop.ha.HAServiceStatus;
|
||||
import org.apache.hadoop.ha.ZKFailoverController;
|
||||
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
||||
import org.apache.hadoop.ha.TestNodeFencer.AlwaysSucceedFencer;
|
||||
|
@ -33,6 +33,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.hdfs.tools.DFSHAAdmin;
|
||||
import org.apache.hadoop.hdfs.tools.DFSZKFailoverController;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.test.MultithreadedTestUtil.TestContext;
|
||||
|
@ -59,6 +60,7 @@ public class TestDFSZKFailoverController extends ClientBaseWithFixes {
|
|||
conf.set(ZKFailoverController.ZK_QUORUM_KEY + ".ns1", hostPort);
|
||||
conf.set(DFSConfigKeys.DFS_HA_FENCE_METHODS_KEY,
|
||||
AlwaysSucceedFencer.class.getName());
|
||||
conf.setBoolean(DFSConfigKeys.DFS_HA_AUTO_FAILOVER_ENABLED_KEY, true);
|
||||
|
||||
MiniDFSNNTopology topology = new MiniDFSNNTopology()
|
||||
.addNameservice(new MiniDFSNNTopology.NSConf("ns1")
|
||||
|
@ -98,6 +100,18 @@ public class TestDFSZKFailoverController extends ClientBaseWithFixes {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that, when automatic failover is enabled, the manual
|
||||
* failover script refuses to run.
|
||||
*/
|
||||
@Test(timeout=10000)
|
||||
public void testManualFailoverIsDisabled() throws Exception {
|
||||
DFSHAAdmin admin = new DFSHAAdmin();
|
||||
admin.setConf(conf);
|
||||
int rc = admin.run(new String[]{"-failover", "nn1", "nn2"});
|
||||
assertEquals(-1, rc);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that automatic failover is triggered by shutting the
|
||||
* active NN down.
|
||||
|
|
|
@ -71,7 +71,7 @@ public class TestEditLogsDuringFailover {
|
|||
// Set the first NN to active, make sure it creates edits
|
||||
// in its own dirs and the shared dir. The standby
|
||||
// should still have no edits!
|
||||
cluster.getNameNode(0).getRpcServer().transitionToActive();
|
||||
cluster.transitionToActive(0);
|
||||
|
||||
assertEditFiles(cluster.getNameDirs(0),
|
||||
NNStorage.getInProgressEditsFileName(1));
|
||||
|
@ -107,7 +107,7 @@ public class TestEditLogsDuringFailover {
|
|||
// If we restart NN0, it'll come back as standby, and we can
|
||||
// transition NN1 to active and make sure it reads edits correctly at this point.
|
||||
cluster.restartNameNode(0);
|
||||
cluster.getNameNode(1).getRpcServer().transitionToActive();
|
||||
cluster.transitionToActive(1);
|
||||
|
||||
// NN1 should have both the edits that came before its restart, and the edits that
|
||||
// came after its restart.
|
||||
|
@ -134,7 +134,7 @@ public class TestEditLogsDuringFailover {
|
|||
NNStorage.getInProgressEditsFileName(1));
|
||||
|
||||
// Transition one of the NNs to active
|
||||
cluster.getNameNode(0).getRpcServer().transitionToActive();
|
||||
cluster.transitionToActive(0);
|
||||
|
||||
// In the transition to active, it should have read the log -- and
|
||||
// hence see one of the dirs we made in the fake log.
|
||||
|
|
|
@ -129,7 +129,7 @@ public class TestHASafeMode {
|
|||
DFSTestUtil
|
||||
.createFile(fs, new Path("/test"), 3 * BLOCK_SIZE, (short) 3, 1L);
|
||||
restartActive();
|
||||
nn0.getRpcServer().transitionToActive();
|
||||
cluster.transitionToActive(0);
|
||||
|
||||
FSNamesystem namesystem = nn0.getNamesystem();
|
||||
String status = namesystem.getSafemode();
|
||||
|
|
|
@@ -37,6 +37,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
+import org.apache.hadoop.ha.HAServiceProtocol.RequestSource;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;

@@ -71,6 +73,8 @@ public class TestHAStateTransitions {
   private static final String TEST_FILE_STR = TEST_FILE_PATH.toUri().getPath();
   private static final String TEST_FILE_DATA =
      "Hello state transitioning world";
+  private static final StateChangeRequestInfo REQ_INFO = new StateChangeRequestInfo(
+      RequestSource.REQUEST_BY_USER_FORCED);
 
   static {
     ((Log4JLogger)EditLogTailer.LOG).getLogger().setLevel(Level.ALL);
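Note: REQ_INFO bundles nothing but the request's provenance. A minimal standalone illustration of constructing and reading it — the class name here is arbitrary; the constructor and getSource() are the ones this diff itself uses:

import org.apache.hadoop.ha.HAServiceProtocol.RequestSource;
import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;

public class ReqInfoDemo {
  public static void main(String[] args) {
    // Tests use the forced user source so transitions are honored even
    // when automatic failover is configured.
    StateChangeRequestInfo req =
        new StateChangeRequestInfo(RequestSource.REQUEST_BY_USER_FORCED);
    System.out.println(req.getSource()); // prints REQUEST_BY_USER_FORCED
  }
}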
@@ -481,19 +485,19 @@ public class TestHAStateTransitions {
     assertFalse(isDTRunning(nn));
 
     banner("Transition 1->3. Should not start secret manager.");
-    nn.getRpcServer().transitionToActive();
+    nn.getRpcServer().transitionToActive(REQ_INFO);
     assertFalse(nn.isStandbyState());
     assertTrue(nn.isInSafeMode());
     assertFalse(isDTRunning(nn));
 
     banner("Transition 3->1. Should not start secret manager.");
-    nn.getRpcServer().transitionToStandby();
+    nn.getRpcServer().transitionToStandby(REQ_INFO);
     assertTrue(nn.isStandbyState());
     assertTrue(nn.isInSafeMode());
     assertFalse(isDTRunning(nn));
 
     banner("Transition 1->3->4. Should start secret manager.");
-    nn.getRpcServer().transitionToActive();
+    nn.getRpcServer().transitionToActive(REQ_INFO);
     NameNodeAdapter.leaveSafeMode(nn, false);
     assertFalse(nn.isStandbyState());
     assertFalse(nn.isInSafeMode());

@@ -514,13 +518,13 @@ public class TestHAStateTransitions {
     for (int i = 0; i < 20; i++) {
       // Loop the last check to suss out races.
       banner("Transition 4->2. Should stop secret manager.");
-      nn.getRpcServer().transitionToStandby();
+      nn.getRpcServer().transitionToStandby(REQ_INFO);
       assertTrue(nn.isStandbyState());
       assertFalse(nn.isInSafeMode());
       assertFalse(isDTRunning(nn));
 
       banner("Transition 2->4. Should start secret manager");
-      nn.getRpcServer().transitionToActive();
+      nn.getRpcServer().transitionToActive(REQ_INFO);
       assertFalse(nn.isStandbyState());
       assertFalse(nn.isInSafeMode());
       assertTrue(isDTRunning(nn));

@@ -22,6 +22,8 @@ import static org.junit.Assert.fail;
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
+import org.apache.hadoop.ha.HAServiceProtocol.RequestSource;
 import org.apache.hadoop.ha.HealthCheckFailedException;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.tools;
 
 import static org.junit.Assert.*;
 
+import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.PrintStream;

@@ -32,14 +33,17 @@ import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
+import org.apache.hadoop.ha.HAServiceProtocol.RequestSource;
 import org.apache.hadoop.ha.HAServiceStatus;
 import org.apache.hadoop.ha.HAServiceTarget;
 import org.apache.hadoop.ha.HealthCheckFailedException;
 import org.apache.hadoop.ha.NodeFencer;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.test.MockitoUtil;
 
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 
 import com.google.common.base.Charsets;

@@ -59,6 +63,9 @@ public class TestDFSHAAdmin {
       new HAServiceStatus(HAServiceState.STANDBY)
           .setReadyToBecomeActive();
 
+  private ArgumentCaptor<StateChangeRequestInfo> reqInfoCaptor =
+      ArgumentCaptor.forClass(StateChangeRequestInfo.class);
+
   private static String HOST_A = "1.2.3.1";
   private static String HOST_B = "1.2.3.2";
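Note: the new reqInfoCaptor field is the standard Mockito capture-then-assert idiom: record the argument a mock received, then assert on it afterwards. A self-contained version of the idiom using the same HAServiceProtocol types — the mock setup here is illustrative, not the test's actual fixture:

import static org.junit.Assert.assertEquals;

import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceProtocol.RequestSource;
import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;

public class CaptorIdiom {
  public static void main(String[] args) throws Exception {
    HAServiceProtocol mock = Mockito.mock(HAServiceProtocol.class);
    mock.transitionToActive(
        new StateChangeRequestInfo(RequestSource.REQUEST_BY_USER));

    // Capture whatever StateChangeRequestInfo the mock was handed...
    ArgumentCaptor<StateChangeRequestInfo> captor =
        ArgumentCaptor.forClass(StateChangeRequestInfo.class);
    Mockito.verify(mock).transitionToActive(captor.capture());
    // ...then assert on its provenance.
    assertEquals(RequestSource.REQUEST_BY_USER,
        captor.getValue().getSource());
  }
}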
@@ -139,13 +146,93 @@ public class TestDFSHAAdmin {
   @Test
   public void testTransitionToActive() throws Exception {
     assertEquals(0, runTool("-transitionToActive", "nn1"));
-    Mockito.verify(mockProtocol).transitionToActive();
+    Mockito.verify(mockProtocol).transitionToActive(
+        reqInfoCaptor.capture());
+    assertEquals(RequestSource.REQUEST_BY_USER,
+        reqInfoCaptor.getValue().getSource());
   }
+
+  /**
+   * Test that, if automatic HA is enabled, none of the mutative operations
+   * will succeed, unless the -forcemanual flag is specified.
+   * @throws Exception
+   */
+  @Test
+  public void testMutativeOperationsWithAutoHaEnabled() throws Exception {
+    Mockito.doReturn(STANDBY_READY_RESULT).when(mockProtocol).getServiceStatus();
+
+    // Turn on auto-HA in the config
+    HdfsConfiguration conf = getHAConf();
+    conf.setBoolean(DFSConfigKeys.DFS_HA_AUTO_FAILOVER_ENABLED_KEY, true);
+    conf.set(DFSConfigKeys.DFS_HA_FENCE_METHODS_KEY, "shell(true)");
+    tool.setConf(conf);
+
+    // Should fail without the forcemanual flag
+    assertEquals(-1, runTool("-transitionToActive", "nn1"));
+    assertTrue(errOutput.contains("Refusing to manually manage"));
+    assertEquals(-1, runTool("-transitionToStandby", "nn1"));
+    assertTrue(errOutput.contains("Refusing to manually manage"));
+    assertEquals(-1, runTool("-failover", "nn1", "nn2"));
+    assertTrue(errOutput.contains("Refusing to manually manage"));
+
+    Mockito.verify(mockProtocol, Mockito.never())
+        .transitionToActive(anyReqInfo());
+    Mockito.verify(mockProtocol, Mockito.never())
+        .transitionToStandby(anyReqInfo());
+
+    // Force flag should bypass the check and change the request source
+    // for the RPC
+    setupConfirmationOnSystemIn();
+    assertEquals(0, runTool("-transitionToActive", "-forcemanual", "nn1"));
+    setupConfirmationOnSystemIn();
+    assertEquals(0, runTool("-transitionToStandby", "-forcemanual", "nn1"));
+    setupConfirmationOnSystemIn();
+    assertEquals(0, runTool("-failover", "-forcemanual", "nn1", "nn2"));
+
+    Mockito.verify(mockProtocol, Mockito.times(2)).transitionToActive(
+        reqInfoCaptor.capture());
+    Mockito.verify(mockProtocol, Mockito.times(2)).transitionToStandby(
+        reqInfoCaptor.capture());
+
+    // All of the RPCs should have had the "force" source
+    for (StateChangeRequestInfo ri : reqInfoCaptor.getAllValues()) {
+      assertEquals(RequestSource.REQUEST_BY_USER_FORCED, ri.getSource());
+    }
+  }
+
+  /**
+   * Setup System.in with a stream that feeds a "yes" answer on the
+   * next prompt.
+   */
+  private static void setupConfirmationOnSystemIn() {
+    // Answer "yes" to the prompt about transition to active
+    System.setIn(new ByteArrayInputStream("yes\n".getBytes()));
+  }
+
+  /**
+   * Test that, even if automatic HA is enabled, the monitoring operations
+   * still function correctly.
+   */
+  @Test
+  public void testMonitoringOperationsWithAutoHaEnabled() throws Exception {
+    Mockito.doReturn(STANDBY_READY_RESULT).when(mockProtocol).getServiceStatus();
+
+    // Turn on auto-HA
+    HdfsConfiguration conf = getHAConf();
+    conf.setBoolean(DFSConfigKeys.DFS_HA_AUTO_FAILOVER_ENABLED_KEY, true);
+    tool.setConf(conf);
+
+    assertEquals(0, runTool("-checkHealth", "nn1"));
+    Mockito.verify(mockProtocol).monitorHealth();
+
+    assertEquals(0, runTool("-getServiceState", "nn1"));
+    Mockito.verify(mockProtocol).getServiceStatus();
+  }
 
   @Test
   public void testTransitionToStandby() throws Exception {
     assertEquals(0, runTool("-transitionToStandby", "nn1"));
-    Mockito.verify(mockProtocol).transitionToStandby();
+    Mockito.verify(mockProtocol).transitionToStandby(anyReqInfo());
   }
 
   @Test
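Note: setupConfirmationOnSystemIn replaces System.in for the -forcemanual confirmation prompt and never restores it, which is harmless when each test class runs in its own JVM. For reuse in a shared JVM, a restore-on-exit variant like the following sketch (illustrative only, not part of this commit) avoids surprising later tests:

import java.io.ByteArrayInputStream;
import java.io.InputStream;

public class StdinFeed {
  // Run a body with "yes\n" on stdin, then put the original stream back.
  static void withYesOnStdin(Runnable body) {
    InputStream saved = System.in;
    try {
      System.setIn(new ByteArrayInputStream("yes\n".getBytes()));
      body.run();
    } finally {
      System.setIn(saved); // always restore
    }
  }
}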
@@ -283,4 +370,8 @@ public class TestDFSHAAdmin {
     LOG.info("Output:\n" + errOutput);
     return ret;
   }
+
+  private StateChangeRequestInfo anyReqInfo() {
+    return Mockito.<StateChangeRequestInfo>any();
+  }
 }
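Note: the anyReqInfo helper exists because, under the Java 6/7 inference rules this code targets, a bare Mockito any() in argument position does not infer StateChangeRequestInfo; the explicit type witness supplies it. A tiny standalone demonstration of the same idiom — the mock and the null call are illustrative, not from the test:

import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
import org.mockito.Mockito;

public class AnyReqInfoIdiom {
  public static void main(String[] args) throws Exception {
    HAServiceProtocol mock = Mockito.mock(HAServiceProtocol.class);
    mock.transitionToStandby(null);
    // The explicit <StateChangeRequestInfo> witness is what anyReqInfo()
    // wraps at each call site.
    Mockito.verify(mock).transitionToStandby(
        Mockito.<StateChangeRequestInfo>any());
  }
}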