HADOOP-8206. Common portion of a ZK-based failover controller. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1305672 13f79535-47bb-0310-9956-ffa450edef68
Todd Lipcon 2012-03-26 23:37:25 +00:00
parent 28c307519a
commit 1340df3b0b
10 changed files with 989 additions and 16 deletions

View File

@@ -25,6 +25,8 @@ Release 0.23.3 - UNRELEASED
HADOOP-7030. Add TableMapping topology implementation to read host to rack
mapping from a file. (Patrick Angeles and tomwhite via tomwhite)
HADOOP-8206. Common portion of a ZK-based failover controller (todd)
IMPROVEMENTS
HADOOP-7524. Change RPC to allow multiple protocols including multuple

View File

@@ -35,6 +35,7 @@ import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher.Event;
import org.apache.zookeeper.ZKUtil;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.AsyncCallback.*;
@@ -135,11 +136,11 @@ public class ActiveStandbyElector implements StatCallback, StringCallback {
private static final int NUM_RETRIES = 3;
- private enum ConnectionState {
+ private static enum ConnectionState {
DISCONNECTED, CONNECTED, TERMINATED
};
- private enum State {
+ static enum State {
INIT, ACTIVE, STANDBY, NEUTRAL
};
@@ -283,6 +284,32 @@ public class ActiveStandbyElector implements StatCallback, StringCallback {
LOG.info("Successfully created " + znodeWorkingDir + " in ZK.");
}
/**
* Clear all of the state held within the parent ZNode.
* This recursively deletes everything within the znode as well as the
* parent znode itself. It should only be used when it's certain that
* no electors are currently participating in the election.
*/
public synchronized void clearParentZNode()
throws IOException, InterruptedException {
try {
LOG.info("Recursively deleting " + znodeWorkingDir + " from ZK...");
zkDoWithRetries(new ZKAction<Void>() {
@Override
public Void run() throws KeeperException, InterruptedException {
ZKUtil.deleteRecursive(zkClient, znodeWorkingDir);
return null;
}
});
} catch (KeeperException e) {
throw new IOException("Couldn't clear parent znode " + znodeWorkingDir,
e);
}
LOG.info("Successfully deleted " + znodeWorkingDir + " from ZK.");
}
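For context, a minimal sketch of how clearParentZNode() is meant to be paired with parentZNodeExists() and ensureParentZNode() when re-initializing election state; the formatZK() method added to ZKFailoverController later in this commit does essentially this. The helper name resetElectionState is illustrative, not part of the patch.

// Illustrative helper only -- mirrors the formatZK() flow added in this commit.
static void resetElectionState(ActiveStandbyElector elector)
    throws IOException, InterruptedException {
  if (elector.parentZNodeExists()) {
    // Safe only when no electors are currently participating in the election.
    elector.clearParentZNode();
  }
  elector.ensureParentZNode();
}
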
/**
* Any service instance can drop out of the election by calling quitElection.
* <br/>
@@ -593,6 +620,11 @@ public class ActiveStandbyElector implements StatCallback, StringCallback {
return zkClient.getSessionId();
}
@VisibleForTesting
synchronized State getStateForTests() {
return state;
}
private boolean reEstablishSession() {
int connectionRetryCount = 0;
boolean success = false;

View File

@@ -24,7 +24,9 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.ipc.RPC;
import com.google.common.base.Preconditions;
@@ -40,6 +42,8 @@ public class FailoverController {
private static final Log LOG = LogFactory.getLog(FailoverController.class);
private static final int GRACEFUL_FENCE_TIMEOUT = 5000;
/**
* Perform pre-failover checks on the given service we plan to
* failover to, eg to prevent failing over to a service (eg due
@@ -97,6 +101,34 @@ public class FailoverController {
}
}
/**
* Try to get the HA state of the node at the given address. This
* function is guaranteed to be "quick" -- ie it has a short timeout
* and no retries. Its only purpose is to avoid fencing a node that
* has already restarted.
*/
static boolean tryGracefulFence(Configuration conf,
HAServiceTarget svc) {
HAServiceProtocol proxy = null;
try {
proxy = svc.getProxy(conf, GRACEFUL_FENCE_TIMEOUT);
proxy.transitionToStandby();
return true;
} catch (ServiceFailedException sfe) {
LOG.warn("Unable to gracefully make " + svc + " standby (" +
sfe.getMessage() + ")");
} catch (IOException ioe) {
LOG.warn("Unable to gracefully make " + svc +
" standby (unable to connect)", ioe);
} finally {
if (proxy != null) {
RPC.stopProxy(proxy);
}
}
return false;
}
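A condensed sketch of the "graceful first, then forceful" pattern this helper enables; the real logic is in ElectorCallbacks.fenceOldActive() in the new ZKFailoverController below, and exception handling is elided here for brevity.

// Sketch only -- see fenceOldActive() later in this commit for the real version.
Configuration conf = new Configuration();
if (!FailoverController.tryGracefulFence(conf, target)) {
  target.checkFencingConfigured();        // throws BadFencingConfigurationException if unset
  if (!target.getFencer().fence(target)) {
    throw new RuntimeException("Unable to fence " + target);
  }
}
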
/**
* Failover from service 1 to service 2. If the failover fails
* then try to failback.
@@ -118,16 +150,9 @@
// Try to make fromSvc standby
boolean tryFence = true;
- try {
-   HAServiceProtocolHelper.transitionToStandby(fromSvc.getProxy());
-   // We should try to fence if we failed or it was forced
-   tryFence = forceFence ? true : false;
- } catch (ServiceFailedException sfe) {
-   LOG.warn("Unable to make " + fromSvc + " standby (" +
-       sfe.getMessage() + ")");
- } catch (IOException ioe) {
-   LOG.warn("Unable to make " + fromSvc +
-       " standby (unable to connect)", ioe);
- }
+ if (tryGracefulFence(new Configuration(), fromSvc)) {
+   tryFence = forceFence;
+ }
// Fence fromSvc if it's required or forced by the user

View File

@@ -188,7 +188,8 @@ class HealthMonitor {
proxy.monitorHealth();
healthy = true;
} catch (HealthCheckFailedException e) {
- LOG.warn("Service health check failed: " + e.getMessage());
+ LOG.warn("Service health check failed for " + targetToMonitor
+     + ": " + e.getMessage());
enterState(State.SERVICE_UNHEALTHY);
} catch (Throwable t) {
LOG.warn("Transport-level exception trying to monitor health of " +

View File

@@ -0,0 +1,387 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ha;
import java.io.IOException;
import java.security.PrivilegedAction;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.ActiveStandbyElector.ActiveStandbyElectorCallback;
import org.apache.hadoop.ha.HealthMonitor.State;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.ACL;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
@InterfaceAudience.LimitedPrivate("HDFS")
public abstract class ZKFailoverController implements Tool {
static final Log LOG = LogFactory.getLog(ZKFailoverController.class);
// TODO: this should be namespace-scoped
public static final String ZK_QUORUM_KEY = "ha.zookeeper.quorum";
private static final String ZK_SESSION_TIMEOUT_KEY = "ha.zookeeper.session-timeout.ms";
private static final int ZK_SESSION_TIMEOUT_DEFAULT = 5*1000;
private static final String ZK_PARENT_ZNODE_KEY = "ha.zookeeper.parent-znode";
static final String ZK_PARENT_ZNODE_DEFAULT = "/hadoop-ha";
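The new controller is configured entirely through these keys. A hedged example of setting them programmatically follows; the quorum string is a made-up placeholder, and only ha.zookeeper.quorum has no default.

// Illustrative values only, not defaults shipped by this patch (except as noted).
Configuration conf = new Configuration();
conf.set("ha.zookeeper.quorum", "zk1:2181,zk2:2181,zk3:2181"); // required; no default
conf.setInt("ha.zookeeper.session-timeout.ms", 5000);          // default: 5000 ms
conf.set("ha.zookeeper.parent-znode", "/hadoop-ha");           // default: /hadoop-ha
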
/** Unable to format the parent znode in ZK */
static final int ERR_CODE_FORMAT_DENIED = 2;
/** The parent znode doesn't exist in ZK */
static final int ERR_CODE_NO_PARENT_ZNODE = 3;
/** Fencing is not properly configured */
static final int ERR_CODE_NO_FENCER = 4;
private Configuration conf;
private HealthMonitor healthMonitor;
private ActiveStandbyElector elector;
private HAServiceTarget localTarget;
private String parentZnode;
private State lastHealthState = State.INITIALIZING;
/** Set if a fatal error occurs */
private String fatalError = null;
@Override
public void setConf(Configuration conf) {
this.conf = conf;
localTarget = getLocalTarget();
}
protected abstract byte[] targetToData(HAServiceTarget target);
protected abstract HAServiceTarget getLocalTarget();
protected abstract HAServiceTarget dataToTarget(byte[] data);
@Override
public Configuration getConf() {
return conf;
}
@Override
public int run(final String[] args) throws Exception {
// TODO: need to hook DFS here to find the NN keytab info, etc,
// similar to what DFSHAAdmin does. Annoying that this is in common.
try {
return SecurityUtil.doAsLoginUserOrFatal(new PrivilegedAction<Integer>() {
@Override
public Integer run() {
try {
return doRun(args);
} catch (Exception t) {
throw new RuntimeException(t);
}
}
});
} catch (RuntimeException rte) {
throw (Exception)rte.getCause();
}
}
private int doRun(String[] args)
throws HadoopIllegalArgumentException, IOException, InterruptedException {
initZK();
if (args.length > 0) {
if ("-formatZK".equals(args[0])) {
boolean force = false;
boolean interactive = true;
for (int i = 1; i < args.length; i++) {
if ("-force".equals(args[i])) {
force = true;
} else if ("-nonInteractive".equals(args[i])) {
interactive = false;
} else {
badArg(args[i]);
}
}
return formatZK(force, interactive);
} else {
badArg(args[0]);
}
}
if (!elector.parentZNodeExists()) {
LOG.fatal("Unable to start failover controller. " +
"Parent znode does not exist.\n" +
"Run with -formatZK flag to initialize ZooKeeper.");
return ERR_CODE_NO_PARENT_ZNODE;
}
try {
localTarget.checkFencingConfigured();
} catch (BadFencingConfigurationException e) {
LOG.fatal("Fencing is not configured for " + localTarget + ".\n" +
"You must configure a fencing method before using automatic " +
"failover.", e);
return ERR_CODE_NO_FENCER;
}
initHM();
mainLoop();
return 0;
}
private void badArg(String arg) {
printUsage();
throw new HadoopIllegalArgumentException(
"Bad argument: " + arg);
}
private void printUsage() {
System.err.println("Usage: " + this.getClass().getSimpleName() +
" [-formatZK [-force | -nonInteractive]]");
}
private int formatZK(boolean force, boolean interactive)
throws IOException, InterruptedException {
if (elector.parentZNodeExists()) {
if (!force && (!interactive || !confirmFormat())) {
return ERR_CODE_FORMAT_DENIED;
}
try {
elector.clearParentZNode();
} catch (IOException e) {
LOG.error("Unable to clear zk parent znode", e);
return 1;
}
}
elector.ensureParentZNode();
return 0;
}
private boolean confirmFormat() {
System.err.println(
"===============================================\n" +
"The configured parent znode " + parentZnode + " already exists.\n" +
"Are you sure you want to clear all failover information from\n" +
"ZooKeeper?\n" +
"WARNING: Before proceeding, ensure that all HDFS services and\n" +
"failover controllers are stopped!\n" +
"===============================================");
try {
return ToolRunner.confirmPrompt("Proceed formatting " + parentZnode + "?");
} catch (IOException e) {
LOG.debug("Failed to confirm", e);
return false;
}
}
// ------------------------------------------
// Begin actual guts of failover controller
// ------------------------------------------
private void initHM() {
healthMonitor = new HealthMonitor(conf, localTarget);
healthMonitor.addCallback(new HealthCallbacks());
healthMonitor.start();
}
private void initZK() throws HadoopIllegalArgumentException, IOException {
String zkQuorum = conf.get(ZK_QUORUM_KEY);
int zkTimeout = conf.getInt(ZK_SESSION_TIMEOUT_KEY,
ZK_SESSION_TIMEOUT_DEFAULT);
parentZnode = conf.get(ZK_PARENT_ZNODE_KEY,
ZK_PARENT_ZNODE_DEFAULT);
// TODO: need ZK ACL support in config, also maybe auth!
List<ACL> zkAcls = Ids.OPEN_ACL_UNSAFE;
Preconditions.checkArgument(zkQuorum != null,
"Missing required configuration '%s' for ZooKeeper quorum",
ZK_QUORUM_KEY);
Preconditions.checkArgument(zkTimeout > 0,
"Invalid ZK session timeout %s", zkTimeout);
elector = new ActiveStandbyElector(zkQuorum,
zkTimeout, parentZnode, zkAcls, new ElectorCallbacks());
}
private synchronized void mainLoop() throws InterruptedException {
while (fatalError == null) {
wait();
}
assert fatalError != null; // only get here on fatal
throw new RuntimeException(
"ZK Failover Controller failed: " + fatalError);
}
private synchronized void fatalError(String err) {
LOG.fatal("Fatal error occurred:" + err);
fatalError = err;
notifyAll();
}
private synchronized void becomeActive() {
LOG.info("Trying to make " + localTarget + " active...");
try {
localTarget.getProxy().transitionToActive();
LOG.info("Successfully transitioned " + localTarget +
" to active state");
} catch (Throwable t) {
LOG.fatal("Couldn't make " + localTarget + " active", t);
elector.quitElection(true);
/*
* TODO:
* we need to make sure that if we get fenced and then quickly restarted,
* none of these calls will retry across the restart boundary
* perhaps the solution is that, whenever the nn starts, it gets a unique
* ID, and when we start becoming active, we record it, and then any future
* calls use the same ID
*/
}
}
private synchronized void becomeStandby() {
LOG.info("ZK Election indicated that " + localTarget +
" should become standby");
try {
localTarget.getProxy().transitionToStandby();
LOG.info("Successfully transitioned " + localTarget +
" to standby state");
} catch (Exception e) {
LOG.error("Couldn't transition " + localTarget + " to standby state",
e);
// TODO handle this. It's a likely case since we probably got fenced
// at the same time.
}
}
/**
* @return the last health state passed to the FC
* by the HealthMonitor.
*/
@VisibleForTesting
State getLastHealthState() {
return lastHealthState;
}
@VisibleForTesting
ActiveStandbyElector getElectorForTests() {
return elector;
}
/**
* Callbacks from elector
*/
class ElectorCallbacks implements ActiveStandbyElectorCallback {
@Override
public void becomeActive() {
ZKFailoverController.this.becomeActive();
}
@Override
public void becomeStandby() {
ZKFailoverController.this.becomeStandby();
}
@Override
public void enterNeutralMode() {
}
@Override
public void notifyFatalError(String errorMessage) {
fatalError(errorMessage);
}
@Override
public void fenceOldActive(byte[] data) {
HAServiceTarget target = dataToTarget(data);
LOG.info("Should fence: " + target);
boolean gracefulWorked =
FailoverController.tryGracefulFence(conf, target);
if (gracefulWorked) {
// It's possible that it's in standby but just about to go into active,
// no? Is there some race here?
LOG.info("Successfully transitioned " + target + " to standby " +
"state without fencing");
return;
}
try {
target.checkFencingConfigured();
} catch (BadFencingConfigurationException e) {
LOG.error("Couldn't fence old active " + target, e);
// TODO: see below todo
throw new RuntimeException(e);
}
if (!target.getFencer().fence(target)) {
// TODO: this will end up in some kind of tight loop,
// won't it? We need some kind of backoff
throw new RuntimeException("Unable to fence " + target);
}
}
}
/**
* Callbacks from HealthMonitor
*/
class HealthCallbacks implements HealthMonitor.Callback {
@Override
public void enteredState(HealthMonitor.State newState) {
LOG.info("Local service " + localTarget +
" entered state: " + newState);
switch (newState) {
case SERVICE_HEALTHY:
LOG.info("Joining master election for " + localTarget);
elector.joinElection(targetToData(localTarget));
break;
case INITIALIZING:
LOG.info("Ensuring that " + localTarget + " does not " +
"participate in active master election");
elector.quitElection(false);
break;
case SERVICE_UNHEALTHY:
case SERVICE_NOT_RESPONDING:
LOG.info("Quitting master election for " + localTarget +
" and marking that fencing is necessary");
elector.quitElection(true);
break;
case HEALTH_MONITOR_FAILED:
fatalError("Health monitor failed!");
break;
default:
throw new IllegalArgumentException("Unhandled state:" + newState);
}
lastHealthState = newState;
}
}
}
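ZKFailoverController is abstract; a concrete controller only supplies its local target and the (de)serialization of a target for the ZK lock data. A minimal hypothetical subclass is sketched below -- MyZKFC and MyServiceTarget are assumed names, not part of this patch, and the DummyZKFC in the new test file does the same thing using an integer index.

// Hypothetical sketch; MyServiceTarget is an assumed HAServiceTarget implementation.
class MyZKFC extends ZKFailoverController {
  private final HAServiceTarget local = new MyServiceTarget("nn1:8020");

  @Override
  protected HAServiceTarget getLocalTarget() {
    return local;
  }

  @Override
  protected byte[] targetToData(HAServiceTarget target) {
    // Store enough to rebuild the target on another node, e.g. its RPC address.
    return target.getAddress().toString().getBytes();
  }

  @Override
  protected HAServiceTarget dataToTarget(byte[] data) {
    return new MyServiceTarget(new String(data));
  }
}
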

View File

@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.util;
import java.io.IOException;
import java.io.PrintStream;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -92,4 +93,33 @@ public class ToolRunner {
GenericOptionsParser.printGenericCommandUsage(out);
}
/**
* Print out a prompt to the user, and return true if the user
* responds with "y" or "yes". (case insensitive)
*/
public static boolean confirmPrompt(String prompt) throws IOException {
while (true) {
System.err.print(prompt + " (Y or N) ");
StringBuilder responseBuilder = new StringBuilder();
while (true) {
int c = System.in.read();
if (c == -1 || c == '\r' || c == '\n') {
break;
}
responseBuilder.append((char)c);
}
String response = responseBuilder.toString();
if (response.equalsIgnoreCase("y") ||
response.equalsIgnoreCase("yes")) {
return true;
} else if (response.equalsIgnoreCase("n") ||
response.equalsIgnoreCase("no")) {
return false;
}
System.err.println("Invalid input: " + response);
// else ask them again
}
}
}
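A hedged usage sketch of the new helper; the prompt text below is arbitrary, and confirmFormat() in the new ZKFailoverController calls it in the same way before clearing the parent znode.

// Sketch: confirm before a destructive operation; throws IOException if stdin fails.
boolean proceed = ToolRunner.confirmPrompt("Really delete /hadoop-ha and all children?");
if (!proceed) {
  System.err.println("Aborted by user.");
}
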

View File

@@ -50,4 +50,15 @@ public abstract class ActiveStandbyElectorTestUtil {
Thread.sleep(50);
}
}
public static void waitForElectorState(TestContext ctx,
ActiveStandbyElector elector,
ActiveStandbyElector.State state) throws Exception {
while (elector.getStateForTests() != state) {
if (ctx != null) {
ctx.checkException();
}
Thread.sleep(50);
}
}
}

View File

@@ -19,12 +19,15 @@ package org.apache.hadoop.ha;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.security.AccessControlException;
import org.mockito.Mockito;
import com.google.common.collect.Lists;
/**
* Test-only implementation of {@link HAServiceTarget}, which returns
* a mock implementation.
@@ -36,12 +39,20 @@ class DummyHAService extends HAServiceTarget {
InetSocketAddress address;
boolean isHealthy = true;
boolean actUnreachable = false;
boolean failToBecomeActive;
static ArrayList<DummyHAService> instances = Lists.newArrayList();
int index;
DummyHAService(HAServiceState state, InetSocketAddress address) {
this.state = state;
this.proxy = makeMock();
this.fencer = Mockito.mock(NodeFencer.class);
this.address = address;
synchronized (instances) {
instances.add(this);
this.index = instances.size();
}
}
private HAServiceProtocol makeMock() {
@@ -59,6 +70,10 @@ class DummyHAService extends HAServiceTarget {
public void transitionToActive() throws ServiceFailedException,
AccessControlException, IOException {
checkUnreachable();
if (failToBecomeActive) {
throw new ServiceFailedException("injected failure");
}
state = HAServiceState.ACTIVE;
}
@@ -106,4 +121,13 @@ class DummyHAService extends HAServiceTarget {
@Override
public void checkFencingConfigured() throws BadFencingConfigurationException {
}
@Override
public String toString() {
return "DummyHAService #" + index;
}
public static HAServiceTarget getInstance(int serial) {
return instances.get(serial - 1);
}
}

View File

@@ -156,6 +156,10 @@ public class TestNodeFencer {
@Override
public void checkArgs(String args) {
}
public static HAServiceTarget getLastFencedService() {
return fencedSvc;
}
}
/**

View File

@@ -0,0 +1,457 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ha;
import static org.junit.Assert.*;
import java.io.File;
import java.net.InetSocketAddress;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.ha.HealthMonitor.State;
import org.apache.hadoop.test.MultithreadedTestUtil;
import org.apache.hadoop.test.MultithreadedTestUtil.TestContext;
import org.apache.hadoop.test.MultithreadedTestUtil.TestingThread;
import org.apache.log4j.Level;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.test.ClientBase;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import com.google.common.primitives.Ints;
public class TestZKFailoverController extends ClientBase {
private Configuration conf;
private DummyHAService svc1;
private DummyHAService svc2;
private TestContext ctx;
private DummyZKFCThread thr1, thr2;
static {
((Log4JLogger)ActiveStandbyElector.LOG).getLogger().setLevel(Level.ALL);
}
@Override
public void setUp() throws Exception {
// build.test.dir is used by zookeeper
new File(System.getProperty("build.test.dir", "build")).mkdirs();
super.setUp();
}
@Before
public void setupConfAndServices() {
conf = new Configuration();
conf.set(ZKFailoverController.ZK_QUORUM_KEY, hostPort);
// Fast check interval so tests run faster
conf.setInt(CommonConfigurationKeys.HA_HM_CHECK_INTERVAL_KEY, 50);
conf.setInt(CommonConfigurationKeys.HA_HM_CONNECT_RETRY_INTERVAL_KEY, 50);
conf.setInt(CommonConfigurationKeys.HA_HM_SLEEP_AFTER_DISCONNECT_KEY, 50);
svc1 = new DummyHAService(HAServiceState.INITIALIZING,
new InetSocketAddress("svc1", 1234));
svc2 = new DummyHAService(HAServiceState.INITIALIZING,
new InetSocketAddress("svc2", 1234));
}
/**
* Set up two services and their failover controllers. svc1 is started
* first, so that it enters ACTIVE state, and then svc2 is started,
* which enters STANDBY
*/
private void setupFCs() throws Exception {
// Format the base dir, should succeed
assertEquals(0, runFC(svc1, "-formatZK"));
ctx = new MultithreadedTestUtil.TestContext();
thr1 = new DummyZKFCThread(ctx, svc1);
ctx.addThread(thr1);
thr1.start();
LOG.info("Waiting for svc1 to enter active state");
waitForHAState(svc1, HAServiceState.ACTIVE);
LOG.info("Adding svc2");
thr2 = new DummyZKFCThread(ctx, svc2);
thr2.start();
waitForHAState(svc2, HAServiceState.STANDBY);
}
private void stopFCs() throws Exception {
if (thr1 != null) {
thr1.interrupt();
}
if (thr2 != null) {
thr2.interrupt();
}
if (ctx != null) {
ctx.stop();
}
}
/**
* Test that the various command lines for formatting the ZK directory
* function correctly.
*/
@Test(timeout=15000)
public void testFormatZK() throws Exception {
// Run without formatting the base dir,
// should barf
assertEquals(ZKFailoverController.ERR_CODE_NO_PARENT_ZNODE,
runFC(svc1));
// Format the base dir, should succeed
assertEquals(0, runFC(svc1, "-formatZK"));
// Should fail to format if already formatted
assertEquals(ZKFailoverController.ERR_CODE_FORMAT_DENIED,
runFC(svc1, "-formatZK", "-nonInteractive"));
// Unless '-force' is on
assertEquals(0, runFC(svc1, "-formatZK", "-force"));
}
/**
* Test that the ZKFC won't run if fencing is not configured for the
* local service.
*/
@Test(timeout=15000)
public void testFencingMustBeConfigured() throws Exception {
svc1 = Mockito.spy(svc1);
Mockito.doThrow(new BadFencingConfigurationException("no fencing"))
.when(svc1).checkFencingConfigured();
// Format the base dir, should succeed
assertEquals(0, runFC(svc1, "-formatZK"));
// Try to run the actual FC, should fail without a fencer
assertEquals(ZKFailoverController.ERR_CODE_NO_FENCER,
runFC(svc1));
}
/**
* Test that, when the health monitor indicates bad health status,
* failover is triggered. Also ensures that graceful active->standby
* transition is used when possible, falling back to fencing when
* the graceful approach fails.
*/
@Test(timeout=15000)
public void testAutoFailoverOnBadHealth() throws Exception {
try {
setupFCs();
LOG.info("Faking svc1 unhealthy, should failover to svc2");
svc1.isHealthy = false;
LOG.info("Waiting for svc1 to enter standby state");
waitForHAState(svc1, HAServiceState.STANDBY);
waitForHAState(svc2, HAServiceState.ACTIVE);
LOG.info("Allowing svc1 to be healthy again, making svc2 unreachable " +
"and fail to gracefully go to standby");
svc1.isHealthy = true;
svc2.actUnreachable = true;
// Allow fencing to succeed
Mockito.doReturn(true).when(svc2.fencer).fence(Mockito.same(svc2));
// Should fail back to svc1 at this point
waitForHAState(svc1, HAServiceState.ACTIVE);
// and fence svc2
Mockito.verify(svc2.fencer).fence(Mockito.same(svc2));
} finally {
stopFCs();
}
}
@Test(timeout=15000)
public void testAutoFailoverOnLostZKSession() throws Exception {
try {
setupFCs();
// Expire svc1, it should fail over to svc2
expireAndVerifyFailover(thr1, thr2);
// Expire svc2, it should fail back to svc1
expireAndVerifyFailover(thr2, thr1);
LOG.info("======= Running test cases second time to test " +
"re-establishment =========");
// Expire svc1, it should fail over to svc2
expireAndVerifyFailover(thr1, thr2);
// Expire svc2, it should fail back to svc1
expireAndVerifyFailover(thr2, thr1);
} finally {
stopFCs();
}
}
private void expireAndVerifyFailover(DummyZKFCThread fromThr,
DummyZKFCThread toThr) throws Exception {
DummyHAService fromSvc = fromThr.zkfc.localTarget;
DummyHAService toSvc = toThr.zkfc.localTarget;
fromThr.zkfc.getElectorForTests().preventSessionReestablishmentForTests();
try {
expireActiveLockHolder(fromSvc);
waitForHAState(fromSvc, HAServiceState.STANDBY);
waitForHAState(toSvc, HAServiceState.ACTIVE);
} finally {
fromThr.zkfc.getElectorForTests().allowSessionReestablishmentForTests();
}
}
/**
* Test that, if the standby node is unhealthy, it doesn't try to become
* active
*/
@Test(timeout=15000)
public void testDontFailoverToUnhealthyNode() throws Exception {
try {
setupFCs();
// Make svc2 unhealthy, and wait for its FC to notice the bad health.
svc2.isHealthy = false;
waitForHealthState(thr2.zkfc,
HealthMonitor.State.SERVICE_UNHEALTHY);
// Expire svc1
thr1.zkfc.getElectorForTests().preventSessionReestablishmentForTests();
try {
expireActiveLockHolder(svc1);
LOG.info("Expired svc1's ZK session. Waiting a second to give svc2" +
" a chance to take the lock, if it is ever going to.");
Thread.sleep(1000);
// Ensure that no one holds the lock.
waitForActiveLockHolder(null);
} finally {
LOG.info("Allowing svc1's elector to re-establish its connection");
thr1.zkfc.getElectorForTests().allowSessionReestablishmentForTests();
}
// svc1 should get the lock again
waitForActiveLockHolder(svc1);
} finally {
stopFCs();
}
}
/**
* Test that the ZKFC successfully quits the election when it fails to
* become active. This allows the old node to successfully fail back.
*/
@Test(timeout=15000)
public void testBecomingActiveFails() throws Exception {
try {
setupFCs();
LOG.info("Making svc2 fail to become active");
svc2.failToBecomeActive = true;
LOG.info("Faking svc1 unhealthy, should NOT successfully " +
"failover to svc2");
svc1.isHealthy = false;
waitForHealthState(thr1.zkfc, State.SERVICE_UNHEALTHY);
waitForActiveLockHolder(null);
Mockito.verify(svc2.proxy).transitionToActive();
waitForHAState(svc1, HAServiceState.STANDBY);
waitForHAState(svc2, HAServiceState.STANDBY);
LOG.info("Faking svc1 healthy again, should go back to svc1");
svc1.isHealthy = true;
waitForHAState(svc1, HAServiceState.ACTIVE);
waitForHAState(svc2, HAServiceState.STANDBY);
waitForActiveLockHolder(svc1);
} finally {
stopFCs();
}
}
/**
* Test that, when ZooKeeper fails, the system remains in its
* current state, without triggering any failovers, and without
* causing the active node to enter standby state.
*/
@Test(timeout=15000)
public void testZooKeeperFailure() throws Exception {
try {
setupFCs();
// Record initial ZK sessions
long session1 = thr1.zkfc.getElectorForTests().getZKSessionIdForTests();
long session2 = thr2.zkfc.getElectorForTests().getZKSessionIdForTests();
LOG.info("====== Stopping ZK server");
stopServer();
waitForServerDown(hostPort, CONNECTION_TIMEOUT);
LOG.info("====== Waiting for services to enter NEUTRAL mode");
ActiveStandbyElectorTestUtil.waitForElectorState(ctx,
thr1.zkfc.getElectorForTests(),
ActiveStandbyElector.State.NEUTRAL);
ActiveStandbyElectorTestUtil.waitForElectorState(ctx,
thr2.zkfc.getElectorForTests(),
ActiveStandbyElector.State.NEUTRAL);
LOG.info("====== Checking that the services didn't change HA state");
assertEquals(HAServiceState.ACTIVE, svc1.state);
assertEquals(HAServiceState.STANDBY, svc2.state);
LOG.info("====== Restarting server");
startServer();
waitForServerUp(hostPort, CONNECTION_TIMEOUT);
// Nodes should go back to their original states, since they re-obtain
// the same sessions.
ActiveStandbyElectorTestUtil.waitForElectorState(ctx,
thr1.zkfc.getElectorForTests(),
ActiveStandbyElector.State.ACTIVE);
ActiveStandbyElectorTestUtil.waitForElectorState(ctx,
thr2.zkfc.getElectorForTests(),
ActiveStandbyElector.State.STANDBY);
// Check HA states didn't change.
ActiveStandbyElectorTestUtil.waitForElectorState(ctx,
thr1.zkfc.getElectorForTests(),
ActiveStandbyElector.State.ACTIVE);
ActiveStandbyElectorTestUtil.waitForElectorState(ctx,
thr2.zkfc.getElectorForTests(),
ActiveStandbyElector.State.STANDBY);
// Check they re-used the same sessions and didn't spuriously reconnect
assertEquals(session1,
thr1.zkfc.getElectorForTests().getZKSessionIdForTests());
assertEquals(session2,
thr2.zkfc.getElectorForTests().getZKSessionIdForTests());
} finally {
stopFCs();
}
}
/**
* Expire the ZK session of the given service. This requires
* (and asserts) that the given service be the current active.
* @throws NoNodeException if no service holds the lock
*/
private void expireActiveLockHolder(DummyHAService expectedActive)
throws NoNodeException {
ZooKeeperServer zks = getServer(serverFactory);
Stat stat = new Stat();
byte[] data = zks.getZKDatabase().getData(
ZKFailoverController.ZK_PARENT_ZNODE_DEFAULT + "/" +
ActiveStandbyElector.LOCK_FILENAME, stat, null);
assertArrayEquals(Ints.toByteArray(expectedActive.index), data);
long session = stat.getEphemeralOwner();
LOG.info("Expiring svc " + expectedActive + "'s zookeeper session " + session);
zks.closeSession(session);
}
/**
* Wait for the given HA service to enter the given HA state.
*/
private void waitForHAState(DummyHAService svc, HAServiceState state)
throws Exception {
while (svc.state != state) {
ctx.checkException();
Thread.sleep(50);
}
}
/**
* Wait for the ZKFC to be notified of a change in health state.
*/
private void waitForHealthState(DummyZKFC zkfc, State state)
throws Exception {
while (zkfc.getLastHealthState() != state) {
ctx.checkException();
Thread.sleep(50);
}
}
/**
* Wait for the given HA service to become the active lock holder.
* If the passed svc is null, waits for there to be no active
* lock holder.
*/
private void waitForActiveLockHolder(DummyHAService svc)
throws Exception {
ZooKeeperServer zks = getServer(serverFactory);
ActiveStandbyElectorTestUtil.waitForActiveLockData(ctx, zks,
ZKFailoverController.ZK_PARENT_ZNODE_DEFAULT,
(svc == null) ? null : Ints.toByteArray(svc.index));
}
private int runFC(DummyHAService target, String ... args) throws Exception {
DummyZKFC zkfc = new DummyZKFC(target);
zkfc.setConf(conf);
return zkfc.run(args);
}
/**
* Test-thread which runs a ZK Failover Controller corresponding
* to a given dummy service.
*/
private class DummyZKFCThread extends TestingThread {
private final DummyZKFC zkfc;
public DummyZKFCThread(TestContext ctx, DummyHAService svc) {
super(ctx);
this.zkfc = new DummyZKFC(svc);
zkfc.setConf(conf);
}
@Override
public void doWork() throws Exception {
try {
assertEquals(0, zkfc.run(new String[0]));
} catch (InterruptedException ie) {
// Interrupted by main thread, that's OK.
}
}
}
private static class DummyZKFC extends ZKFailoverController {
private final DummyHAService localTarget;
public DummyZKFC(DummyHAService localTarget) {
this.localTarget = localTarget;
}
@Override
protected byte[] targetToData(HAServiceTarget target) {
return Ints.toByteArray(((DummyHAService)target).index);
}
@Override
protected HAServiceTarget dataToTarget(byte[] data) {
int index = Ints.fromByteArray(data);
return DummyHAService.getInstance(index);
}
@Override
protected HAServiceTarget getLocalTarget() {
return localTarget;
}
}
}