HADOOP-8228. Auto HA: Refactor tests and add stress tests. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-3042@1307599 13f79535-47bb-0310-9956-ffa450edef68
Todd Lipcon 2012-03-30 20:30:47 +00:00
parent 6693167028
commit 543701387f
8 changed files with 654 additions and 277 deletions

@@ -6,3 +6,4 @@ branch is merged.
HADOOP-8220. ZKFailoverController doesn't handle failure to become active correctly (todd)
HADOOP-8228. Auto HA: Refactor tests and add stress tests. (todd)

@@ -140,7 +140,7 @@ public class ActiveStandbyElector implements StatCallback, StringCallback {
public static final Log LOG = LogFactory.getLog(ActiveStandbyElector.class);
private static final int NUM_RETRIES = 3;
static int NUM_RETRIES = 3;
private static final int SLEEP_AFTER_FAILURE_TO_BECOME_ACTIVE = 1000;
private static enum ConnectionState {
@@ -662,8 +662,12 @@ public class ActiveStandbyElector implements StatCallback, StringCallback {
}
@VisibleForTesting
long getZKSessionIdForTests() {
return zkClient.getSessionId();
synchronized long getZKSessionIdForTests() {
if (zkClient != null) {
return zkClient.getSessionId();
} else {
return -1;
}
}
@VisibleForTesting


@@ -146,7 +146,12 @@ public abstract class ZKFailoverController implements Tool {
}
initHM();
mainLoop();
try {
mainLoop();
} finally {
healthMonitor.shutdown();
healthMonitor.join();
}
return 0;
}

@@ -22,6 +22,8 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.security.AccessControlException;
@@ -34,6 +36,7 @@ import com.google.common.collect.Lists;
* a mock implementation.
*/
class DummyHAService extends HAServiceTarget {
public static final Log LOG = LogFactory.getLog(DummyHAService.class);
volatile HAServiceState state;
HAServiceProtocol proxy;
NodeFencer fencer;
@@ -42,13 +45,21 @@ class DummyHAService extends HAServiceTarget {
boolean actUnreachable = false;
boolean failToBecomeActive;
DummySharedResource sharedResource;
static ArrayList<DummyHAService> instances = Lists.newArrayList();
int index;
DummyHAService(HAServiceState state, InetSocketAddress address) {
this.state = state;
this.proxy = makeMock();
this.fencer = Mockito.mock(NodeFencer.class);
try {
Configuration conf = new Configuration();
conf.set(NodeFencer.CONF_METHODS_KEY, DummyFencer.class.getName());
this.fencer = Mockito.spy(NodeFencer.create(conf));
} catch (BadFencingConfigurationException e) {
throw new RuntimeException(e);
}
this.address = address;
synchronized (instances) {
instances.add(this);
@@ -56,6 +67,10 @@ class DummyHAService extends HAServiceTarget {
}
}
public void setSharedResource(DummySharedResource rsrc) {
this.sharedResource = rsrc;
}
private HAServiceProtocol makeMock() {
return Mockito.spy(new MockHAProtocolImpl());
}
@@ -107,7 +122,9 @@ class DummyHAService extends HAServiceTarget {
if (failToBecomeActive) {
throw new ServiceFailedException("injected failure");
}
if (sharedResource != null) {
sharedResource.take(DummyHAService.this);
}
state = HAServiceState.ACTIVE;
}
@@ -115,6 +132,9 @@ class DummyHAService extends HAServiceTarget {
public void transitionToStandby() throws ServiceFailedException,
AccessControlException, IOException {
checkUnreachable();
if (sharedResource != null) {
sharedResource.release(DummyHAService.this);
}
state = HAServiceState.STANDBY;
}
@@ -138,4 +158,20 @@ class DummyHAService extends HAServiceTarget {
public void close() throws IOException {
}
}
public static class DummyFencer implements FenceMethod {
public void checkArgs(String args) throws BadFencingConfigurationException {
}
@Override
public boolean tryFence(HAServiceTarget target, String args)
throws BadFencingConfigurationException {
LOG.info("tryFence(" + target + ")");
DummyHAService svc = (DummyHAService)target;
svc.sharedResource.release(svc);
return true;
}
}
}
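
Replacing the plain mock fencer with a Mockito spy around a real NodeFencer means fencing now actually runs DummyFencer (and so releases the shared resource), while tests can still verify the call. A sketch of how that verification looks (cluster refers to the MiniZKFCCluster harness added below):

    DummyHAService svc = cluster.getService(1);
    // ... drive a failover that requires svc to be fenced ...
    Mockito.verify(svc.fencer).fence(Mockito.same(svc));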

@@ -0,0 +1,52 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ha;
import org.junit.Assert;
/**
* A fake shared resource, for use in automatic failover testing.
* This simulates a real shared resource like a shared edit log.
* When the {@link DummyHAService} instances change state or get
* fenced, they notify the shared resource, which asserts that
* we never have two HA services who think they're holding the
* resource at the same time.
*/
public class DummySharedResource {
private DummyHAService holder = null;
private int violations = 0;
public synchronized void take(DummyHAService newHolder) {
if (holder == null || holder == newHolder) {
holder = newHolder;
} else {
violations++;
throw new IllegalStateException("already held by: " + holder);
}
}
public synchronized void release(DummyHAService oldHolder) {
if (holder == oldHolder) {
holder = null;
}
}
public synchronized void assertNoViolations() {
Assert.assertEquals(0, violations);
}
}
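
The invariant plays out roughly as in this hypothetical snippet (the two services are constructed here purely for illustration, not part of this commit):

    DummySharedResource res = new DummySharedResource();
    DummyHAService svcA = new DummyHAService(HAServiceState.STANDBY,
        new InetSocketAddress("a", 1234));
    DummyHAService svcB = new DummyHAService(HAServiceState.STANDBY,
        new InetSocketAddress("b", 1234));
    res.take(svcA);            // svcA now holds the resource
    res.release(svcA);         // must be released before another holder may take it
    res.take(svcB);            // fine: the resource was free
    // res.take(svcA) here would throw IllegalStateException and record a violation
    res.assertNoViolations();  // passes only if double-ownership never occurred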

@@ -0,0 +1,279 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ha;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import java.net.InetSocketAddress;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.ha.HealthMonitor.State;
import org.apache.hadoop.test.MultithreadedTestUtil.TestContext;
import org.apache.hadoop.test.MultithreadedTestUtil.TestingThread;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.ZooKeeperServer;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Ints;
/**
* Harness for starting two dummy ZK FailoverControllers, associated with
* DummyHAServices. This harness starts two such ZKFCs, designated by
* indexes 0 and 1, and provides utilities for building tests around them.
*/
public class MiniZKFCCluster {
private final TestContext ctx;
private final ZooKeeperServer zks;
private DummyHAService svcs[];
private DummyZKFCThread thrs[];
private Configuration conf;
private DummySharedResource sharedResource = new DummySharedResource();
private static final Log LOG = LogFactory.getLog(MiniZKFCCluster.class);
public MiniZKFCCluster(Configuration conf, ZooKeeperServer zks) {
this.conf = conf;
// Fast check interval so tests run faster
conf.setInt(CommonConfigurationKeys.HA_HM_CHECK_INTERVAL_KEY, 50);
conf.setInt(CommonConfigurationKeys.HA_HM_CONNECT_RETRY_INTERVAL_KEY, 50);
conf.setInt(CommonConfigurationKeys.HA_HM_SLEEP_AFTER_DISCONNECT_KEY, 50);
svcs = new DummyHAService[2];
svcs[0] = new DummyHAService(HAServiceState.INITIALIZING,
new InetSocketAddress("svc1", 1234));
svcs[0].setSharedResource(sharedResource);
svcs[1] = new DummyHAService(HAServiceState.INITIALIZING,
new InetSocketAddress("svc2", 1234));
svcs[1].setSharedResource(sharedResource);
this.ctx = new TestContext();
this.zks = zks;
}
/**
* Set up two services and their failover controllers. svc1 is started
* first, so that it enters ACTIVE state, and then svc2 is started,
* which enters STANDBY
*/
public void start() throws Exception {
// Format the base dir, should succeed
thrs = new DummyZKFCThread[2];
thrs[0] = new DummyZKFCThread(ctx, svcs[0]);
assertEquals(0, thrs[0].zkfc.run(new String[]{"-formatZK"}));
ctx.addThread(thrs[0]);
thrs[0].start();
LOG.info("Waiting for svc0 to enter active state");
waitForHAState(0, HAServiceState.ACTIVE);
LOG.info("Adding svc1");
thrs[1] = new DummyZKFCThread(ctx, svcs[1]);
thrs[1].start();
waitForHAState(1, HAServiceState.STANDBY);
}
/**
* Stop the services.
* @throws Exception if either of the services had encountered a fatal error
*/
public void stop() throws Exception {
for (DummyZKFCThread thr : thrs) {
if (thr != null) {
thr.interrupt();
}
}
if (ctx != null) {
ctx.stop();
}
sharedResource.assertNoViolations();
}
/**
* @return the TestContext implementation used internally. This allows more
* threads to be added to the context, etc.
*/
public TestContext getTestContext() {
return ctx;
}
public DummyHAService getService(int i) {
return svcs[i];
}
public ActiveStandbyElector getElector(int i) {
return thrs[i].zkfc.getElectorForTests();
}
public void setHealthy(int idx, boolean healthy) {
svcs[idx].isHealthy = healthy;
}
public void setFailToBecomeActive(int idx, boolean doFail) {
svcs[idx].failToBecomeActive = doFail;
}
public void setUnreachable(int idx, boolean unreachable) {
svcs[idx].actUnreachable = unreachable;
}
/**
* Wait for the given HA service to enter the given HA state.
*/
public void waitForHAState(int idx, HAServiceState state)
throws Exception {
DummyHAService svc = getService(idx);
while (svc.state != state) {
ctx.checkException();
Thread.sleep(50);
}
}
/**
* Wait for the ZKFC to be notified of a change in health state.
*/
public void waitForHealthState(int idx, State state)
throws Exception {
ZKFailoverController zkfc = thrs[idx].zkfc;
while (zkfc.getLastHealthState() != state) {
ctx.checkException();
Thread.sleep(50);
}
}
/**
* Wait for the given elector to enter the given elector state.
* @param idx the service index (0 or 1)
* @param state the state to wait for
* @throws Exception if it times out, or an exception occurs on one
* of the ZKFC threads while waiting.
*/
public void waitForElectorState(int idx,
ActiveStandbyElector.State state) throws Exception {
ActiveStandbyElectorTestUtil.waitForElectorState(ctx,
getElector(idx), state);
}
/**
* Expire the ZK session of the given service. This requires
* (and asserts) that the given service be the current active.
* @throws NoNodeException if no service holds the lock
*/
public void expireActiveLockHolder(int idx)
throws NoNodeException {
Stat stat = new Stat();
byte[] data = zks.getZKDatabase().getData(
ZKFailoverController.ZK_PARENT_ZNODE_DEFAULT + "/" +
ActiveStandbyElector.LOCK_FILENAME, stat, null);
assertArrayEquals(Ints.toByteArray(svcs[idx].index), data);
long session = stat.getEphemeralOwner();
LOG.info("Expiring svc " + idx + "'s zookeeper session " + session);
zks.closeSession(session);
}
/**
* Wait for the given HA service to become the active lock holder.
* If the passed idx is null, waits for there to be no active
* lock holder.
*/
public void waitForActiveLockHolder(Integer idx)
throws Exception {
DummyHAService svc = idx == null ? null : svcs[idx];
ActiveStandbyElectorTestUtil.waitForActiveLockData(ctx, zks,
ZKFailoverController.ZK_PARENT_ZNODE_DEFAULT,
(idx == null) ? null : Ints.toByteArray(svc.index));
}
/**
* Expires the ZK session associated with service 'fromIdx', and waits
* until service 'toIdx' takes over.
* @throws Exception if the target service does not become active
*/
public void expireAndVerifyFailover(int fromIdx, int toIdx)
throws Exception {
Preconditions.checkArgument(fromIdx != toIdx);
getElector(fromIdx).preventSessionReestablishmentForTests();
try {
expireActiveLockHolder(fromIdx);
waitForHAState(fromIdx, HAServiceState.STANDBY);
waitForHAState(toIdx, HAServiceState.ACTIVE);
} finally {
getElector(fromIdx).allowSessionReestablishmentForTests();
}
}
/**
* Test-thread which runs a ZK Failover Controller corresponding
* to a given dummy service.
*/
private class DummyZKFCThread extends TestingThread {
private final DummyZKFC zkfc;
public DummyZKFCThread(TestContext ctx, DummyHAService svc) {
super(ctx);
this.zkfc = new DummyZKFC(svc);
zkfc.setConf(conf);
}
@Override
public void doWork() throws Exception {
try {
assertEquals(0, zkfc.run(new String[0]));
} catch (InterruptedException ie) {
// Interrupted by main thread, that's OK.
}
}
}
static class DummyZKFC extends ZKFailoverController {
private final DummyHAService localTarget;
public DummyZKFC(DummyHAService localTarget) {
this.localTarget = localTarget;
}
@Override
protected byte[] targetToData(HAServiceTarget target) {
return Ints.toByteArray(((DummyHAService)target).index);
}
@Override
protected HAServiceTarget dataToTarget(byte[] data) {
int index = Ints.fromByteArray(data);
return DummyHAService.getInstance(index);
}
@Override
protected HAServiceTarget getLocalTarget() {
return localTarget;
}
}
}
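
A typical test drives the harness roughly like this (condensed from TestZKFailoverController and TestZKFailoverControllerStress below; conf, hostPort and the embedded ZooKeeper server come from ZooKeeper's ClientBase):

    Configuration conf = new Configuration();
    conf.set(ZKFailoverController.ZK_QUORUM_KEY, hostPort);
    MiniZKFCCluster cluster = new MiniZKFCCluster(conf, getServer(serverFactory));
    try {
      cluster.start();                        // svc 0 becomes ACTIVE, svc 1 STANDBY
      cluster.expireAndVerifyFailover(0, 1);  // expire 0's ZK session; 1 takes over
      cluster.expireAndVerifyFailover(1, 0);  // and fail back
    } finally {
      cluster.stop();  // interrupts the ZKFC threads and asserts no shared-resource violations
    }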

@@ -17,36 +17,24 @@
*/
package org.apache.hadoop.ha;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import java.io.File;
import java.net.InetSocketAddress;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.ha.HealthMonitor.State;
import org.apache.hadoop.test.MultithreadedTestUtil;
import org.apache.hadoop.test.MultithreadedTestUtil.TestContext;
import org.apache.hadoop.test.MultithreadedTestUtil.TestingThread;
import org.apache.hadoop.ha.MiniZKFCCluster.DummyZKFC;
import org.apache.log4j.Level;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.test.ClientBase;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import com.google.common.primitives.Ints;
public class TestZKFailoverController extends ClientBase {
private Configuration conf;
private DummyHAService svc1;
private DummyHAService svc2;
private TestContext ctx;
private DummyZKFCThread thr1, thr2;
private MiniZKFCCluster cluster;
static {
((Log4JLogger)ActiveStandbyElector.LOG).getLogger().setLevel(Level.ALL);
@@ -63,49 +51,7 @@ public class TestZKFailoverController extends ClientBase {
public void setupConfAndServices() {
conf = new Configuration();
conf.set(ZKFailoverController.ZK_QUORUM_KEY, hostPort);
// Fast check interval so tests run faster
conf.setInt(CommonConfigurationKeys.HA_HM_CHECK_INTERVAL_KEY, 50);
conf.setInt(CommonConfigurationKeys.HA_HM_CONNECT_RETRY_INTERVAL_KEY, 50);
conf.setInt(CommonConfigurationKeys.HA_HM_SLEEP_AFTER_DISCONNECT_KEY, 50);
svc1 = new DummyHAService(HAServiceState.INITIALIZING,
new InetSocketAddress("svc1", 1234));
svc2 = new DummyHAService(HAServiceState.INITIALIZING,
new InetSocketAddress("svc2", 1234));
}
/**
* Set up two services and their failover controllers. svc1 is started
* first, so that it enters ACTIVE state, and then svc2 is started,
* which enters STANDBY
*/
private void setupFCs() throws Exception {
// Format the base dir, should succeed
assertEquals(0, runFC(svc1, "-formatZK"));
ctx = new MultithreadedTestUtil.TestContext();
thr1 = new DummyZKFCThread(ctx, svc1);
ctx.addThread(thr1);
thr1.start();
LOG.info("Waiting for svc1 to enter active state");
waitForHAState(svc1, HAServiceState.ACTIVE);
LOG.info("Adding svc2");
thr2 = new DummyZKFCThread(ctx, svc2);
thr2.start();
waitForHAState(svc2, HAServiceState.STANDBY);
}
private void stopFCs() throws Exception {
if (thr1 != null) {
thr1.interrupt();
}
if (thr2 != null) {
thr2.interrupt();
}
if (ctx != null) {
ctx.stop();
}
this.cluster = new MiniZKFCCluster(conf, getServer(serverFactory));
}
/**
@ -114,20 +60,21 @@ public class TestZKFailoverController extends ClientBase {
*/
@Test(timeout=15000)
public void testFormatZK() throws Exception {
DummyHAService svc = cluster.getService(1);
// Run without formatting the base dir,
// should barf
assertEquals(ZKFailoverController.ERR_CODE_NO_PARENT_ZNODE,
runFC(svc1));
runFC(svc));
// Format the base dir, should succeed
assertEquals(0, runFC(svc1, "-formatZK"));
assertEquals(0, runFC(svc, "-formatZK"));
// Should fail to format if already formatted
assertEquals(ZKFailoverController.ERR_CODE_FORMAT_DENIED,
runFC(svc1, "-formatZK", "-nonInteractive"));
runFC(svc, "-formatZK", "-nonInteractive"));
// Unless '-force' is on
assertEquals(0, runFC(svc1, "-formatZK", "-force"));
assertEquals(0, runFC(svc, "-formatZK", "-force"));
}
/**
@ -136,14 +83,14 @@ public class TestZKFailoverController extends ClientBase {
*/
@Test(timeout=15000)
public void testFencingMustBeConfigured() throws Exception {
svc1 = Mockito.spy(svc1);
DummyHAService svc = Mockito.spy(cluster.getService(0));
Mockito.doThrow(new BadFencingConfigurationException("no fencing"))
.when(svc1).checkFencingConfigured();
.when(svc).checkFencingConfigured();
// Format the base dir, should succeed
assertEquals(0, runFC(svc1, "-formatZK"));
assertEquals(0, runFC(svc, "-formatZK"));
// Try to run the actual FC, should fail without a fencer
assertEquals(ZKFailoverController.ERR_CODE_NO_FENCER,
runFC(svc1));
runFC(svc));
}
/**
@ -155,66 +102,50 @@ public class TestZKFailoverController extends ClientBase {
@Test(timeout=15000)
public void testAutoFailoverOnBadHealth() throws Exception {
try {
setupFCs();
cluster.start();
DummyHAService svc1 = cluster.getService(1);
LOG.info("Faking svc1 unhealthy, should failover to svc2");
svc1.isHealthy = false;
LOG.info("Waiting for svc1 to enter standby state");
waitForHAState(svc1, HAServiceState.STANDBY);
waitForHAState(svc2, HAServiceState.ACTIVE);
LOG.info("Faking svc0 unhealthy, should failover to svc1");
cluster.setHealthy(0, false);
LOG.info("Waiting for svc0 to enter standby state");
cluster.waitForHAState(0, HAServiceState.STANDBY);
cluster.waitForHAState(1, HAServiceState.ACTIVE);
LOG.info("Allowing svc1 to be healthy again, making svc2 unreachable " +
LOG.info("Allowing svc0 to be healthy again, making svc1 unreachable " +
"and fail to gracefully go to standby");
svc1.isHealthy = true;
svc2.actUnreachable = true;
// Allow fencing to succeed
Mockito.doReturn(true).when(svc2.fencer).fence(Mockito.same(svc2));
// Should fail back to svc1 at this point
waitForHAState(svc1, HAServiceState.ACTIVE);
// and fence svc2
Mockito.verify(svc2.fencer).fence(Mockito.same(svc2));
cluster.setUnreachable(1, true);
cluster.setHealthy(0, true);
// Should fail back to svc0 at this point
cluster.waitForHAState(0, HAServiceState.ACTIVE);
// and fence svc1
Mockito.verify(svc1.fencer).fence(Mockito.same(svc1));
} finally {
stopFCs();
cluster.stop();
}
}
@Test(timeout=15000)
public void testAutoFailoverOnLostZKSession() throws Exception {
try {
setupFCs();
cluster.start();
// Expire svc1, it should fail over to svc2
expireAndVerifyFailover(thr1, thr2);
// Expire svc0, it should fail over to svc1
cluster.expireAndVerifyFailover(0, 1);
// Expire svc2, it should fail back to svc1
expireAndVerifyFailover(thr2, thr1);
// Expire svc1, it should fail back to svc0
cluster.expireAndVerifyFailover(1, 0);
LOG.info("======= Running test cases second time to test " +
"re-establishment =========");
// Expire svc1, it should fail over to svc2
expireAndVerifyFailover(thr1, thr2);
// Expire svc0, it should fail over to svc1
cluster.expireAndVerifyFailover(0, 1);
// Expire svc2, it should fail back to svc1
expireAndVerifyFailover(thr2, thr1);
// Expire svc1, it should fail back to svc0
cluster.expireAndVerifyFailover(1, 0);
} finally {
stopFCs();
}
}
private void expireAndVerifyFailover(DummyZKFCThread fromThr,
DummyZKFCThread toThr) throws Exception {
DummyHAService fromSvc = fromThr.zkfc.localTarget;
DummyHAService toSvc = toThr.zkfc.localTarget;
fromThr.zkfc.getElectorForTests().preventSessionReestablishmentForTests();
try {
expireActiveLockHolder(fromSvc);
waitForHAState(fromSvc, HAServiceState.STANDBY);
waitForHAState(toSvc, HAServiceState.ACTIVE);
} finally {
fromThr.zkfc.getElectorForTests().allowSessionReestablishmentForTests();
cluster.stop();
}
}
@ -225,33 +156,32 @@ public class TestZKFailoverController extends ClientBase {
@Test(timeout=15000)
public void testDontFailoverToUnhealthyNode() throws Exception {
try {
setupFCs();
cluster.start();
// Make svc2 unhealthy, and wait for its FC to notice the bad health.
svc2.isHealthy = false;
waitForHealthState(thr2.zkfc,
HealthMonitor.State.SERVICE_UNHEALTHY);
// Make svc1 unhealthy, and wait for its FC to notice the bad health.
cluster.setHealthy(1, false);
cluster.waitForHealthState(1, HealthMonitor.State.SERVICE_UNHEALTHY);
// Expire svc1
thr1.zkfc.getElectorForTests().preventSessionReestablishmentForTests();
// Expire svc0
cluster.getElector(0).preventSessionReestablishmentForTests();
try {
expireActiveLockHolder(svc1);
cluster.expireActiveLockHolder(0);
LOG.info("Expired svc1's ZK session. Waiting a second to give svc2" +
LOG.info("Expired svc0's ZK session. Waiting a second to give svc1" +
" a chance to take the lock, if it is ever going to.");
Thread.sleep(1000);
// Ensure that no one holds the lock.
waitForActiveLockHolder(null);
cluster.waitForActiveLockHolder(null);
} finally {
LOG.info("Allowing svc1's elector to re-establish its connection");
thr1.zkfc.getElectorForTests().allowSessionReestablishmentForTests();
LOG.info("Allowing svc0's elector to re-establish its connection");
cluster.getElector(0).allowSessionReestablishmentForTests();
}
// svc1 should get the lock again
waitForActiveLockHolder(svc1);
// svc0 should get the lock again
cluster.waitForActiveLockHolder(0);
} finally {
stopFCs();
cluster.stop();
}
}
@ -262,36 +192,38 @@ public class TestZKFailoverController extends ClientBase {
@Test(timeout=15000)
public void testBecomingActiveFails() throws Exception {
try {
setupFCs();
cluster.start();
DummyHAService svc1 = cluster.getService(1);
LOG.info("Making svc2 fail to become active");
svc2.failToBecomeActive = true;
LOG.info("Making svc1 fail to become active");
cluster.setFailToBecomeActive(1, true);
LOG.info("Faking svc1 unhealthy, should NOT successfully " +
"failover to svc2");
svc1.isHealthy = false;
waitForHealthState(thr1.zkfc, State.SERVICE_UNHEALTHY);
waitForActiveLockHolder(null);
LOG.info("Faking svc0 unhealthy, should NOT successfully " +
"failover to svc1");
cluster.setHealthy(0, false);
cluster.waitForHealthState(0, State.SERVICE_UNHEALTHY);
cluster.waitForActiveLockHolder(null);
Mockito.verify(svc2.proxy, Mockito.timeout(2000).atLeastOnce())
Mockito.verify(svc1.proxy, Mockito.timeout(2000).atLeastOnce())
.transitionToActive();
waitForHAState(svc1, HAServiceState.STANDBY);
waitForHAState(svc2, HAServiceState.STANDBY);
cluster.waitForHAState(0, HAServiceState.STANDBY);
cluster.waitForHAState(1, HAServiceState.STANDBY);
LOG.info("Faking svc1 healthy again, should go back to svc1");
svc1.isHealthy = true;
waitForHAState(svc1, HAServiceState.ACTIVE);
waitForHAState(svc2, HAServiceState.STANDBY);
waitForActiveLockHolder(svc1);
LOG.info("Faking svc0 healthy again, should go back to svc0");
cluster.setHealthy(0, true);
cluster.waitForHAState(0, HAServiceState.ACTIVE);
cluster.waitForHAState(1, HAServiceState.STANDBY);
cluster.waitForActiveLockHolder(0);
// Ensure that we can fail back to thr2 once it is able
// Ensure that we can fail back to svc1 once it is able
// to become active (e.g the admin has restarted it)
LOG.info("Allowing svc2 to become active, expiring svc1");
svc2.failToBecomeActive = false;
expireAndVerifyFailover(thr1, thr2);
LOG.info("Allowing svc1 to become active, expiring svc0");
svc1.failToBecomeActive = false;
cluster.expireAndVerifyFailover(0, 1);
} finally {
stopFCs();
cluster.stop();
}
}
@ -303,27 +235,25 @@ public class TestZKFailoverController extends ClientBase {
@Test(timeout=15000)
public void testZooKeeperFailure() throws Exception {
try {
setupFCs();
cluster.start();
// Record initial ZK sessions
long session1 = thr1.zkfc.getElectorForTests().getZKSessionIdForTests();
long session2 = thr2.zkfc.getElectorForTests().getZKSessionIdForTests();
long session0 = cluster.getElector(0).getZKSessionIdForTests();
long session1 = cluster.getElector(1).getZKSessionIdForTests();
LOG.info("====== Stopping ZK server");
stopServer();
waitForServerDown(hostPort, CONNECTION_TIMEOUT);
LOG.info("====== Waiting for services to enter NEUTRAL mode");
ActiveStandbyElectorTestUtil.waitForElectorState(ctx,
thr1.zkfc.getElectorForTests(),
cluster.waitForElectorState(0,
ActiveStandbyElector.State.NEUTRAL);
ActiveStandbyElectorTestUtil.waitForElectorState(ctx,
thr2.zkfc.getElectorForTests(),
cluster.waitForElectorState(1,
ActiveStandbyElector.State.NEUTRAL);
LOG.info("====== Checking that the services didn't change HA state");
assertEquals(HAServiceState.ACTIVE, svc1.state);
assertEquals(HAServiceState.STANDBY, svc2.state);
assertEquals(HAServiceState.ACTIVE, cluster.getService(0).state);
assertEquals(HAServiceState.STANDBY, cluster.getService(1).state);
LOG.info("====== Restarting server");
startServer();
@ -331,134 +261,26 @@ public class TestZKFailoverController extends ClientBase {
// Nodes should go back to their original states, since they re-obtain
// the same sessions.
ActiveStandbyElectorTestUtil.waitForElectorState(ctx,
thr1.zkfc.getElectorForTests(),
ActiveStandbyElector.State.ACTIVE);
ActiveStandbyElectorTestUtil.waitForElectorState(ctx,
thr2.zkfc.getElectorForTests(),
ActiveStandbyElector.State.STANDBY);
cluster.waitForElectorState(0, ActiveStandbyElector.State.ACTIVE);
cluster.waitForElectorState(1, ActiveStandbyElector.State.STANDBY);
// Check HA states didn't change.
ActiveStandbyElectorTestUtil.waitForElectorState(ctx,
thr1.zkfc.getElectorForTests(),
ActiveStandbyElector.State.ACTIVE);
ActiveStandbyElectorTestUtil.waitForElectorState(ctx,
thr2.zkfc.getElectorForTests(),
ActiveStandbyElector.State.STANDBY);
cluster.waitForHAState(0, HAServiceState.ACTIVE);
cluster.waitForHAState(1, HAServiceState.STANDBY);
// Check they re-used the same sessions and didn't spuriously reconnect
assertEquals(session0,
cluster.getElector(0).getZKSessionIdForTests());
assertEquals(session1,
thr1.zkfc.getElectorForTests().getZKSessionIdForTests());
assertEquals(session2,
thr2.zkfc.getElectorForTests().getZKSessionIdForTests());
cluster.getElector(1).getZKSessionIdForTests());
} finally {
stopFCs();
cluster.stop();
}
}
/**
* Expire the ZK session of the given service. This requires
* (and asserts) that the given service be the current active.
* @throws NoNodeException if no service holds the lock
*/
private void expireActiveLockHolder(DummyHAService expectedActive)
throws NoNodeException {
ZooKeeperServer zks = getServer(serverFactory);
Stat stat = new Stat();
byte[] data = zks.getZKDatabase().getData(
ZKFailoverController.ZK_PARENT_ZNODE_DEFAULT + "/" +
ActiveStandbyElector.LOCK_FILENAME, stat, null);
assertArrayEquals(Ints.toByteArray(expectedActive.index), data);
long session = stat.getEphemeralOwner();
LOG.info("Expiring svc " + expectedActive + "'s zookeeper session " + session);
zks.closeSession(session);
}
/**
* Wait for the given HA service to enter the given HA state.
*/
private void waitForHAState(DummyHAService svc, HAServiceState state)
throws Exception {
while (svc.state != state) {
ctx.checkException();
Thread.sleep(50);
}
}
/**
* Wait for the ZKFC to be notified of a change in health state.
*/
private void waitForHealthState(DummyZKFC zkfc, State state)
throws Exception {
while (zkfc.getLastHealthState() != state) {
ctx.checkException();
Thread.sleep(50);
}
}
/**
* Wait for the given HA service to become the active lock holder.
* If the passed svc is null, waits for there to be no active
* lock holder.
*/
private void waitForActiveLockHolder(DummyHAService svc)
throws Exception {
ZooKeeperServer zks = getServer(serverFactory);
ActiveStandbyElectorTestUtil.waitForActiveLockData(ctx, zks,
ZKFailoverController.ZK_PARENT_ZNODE_DEFAULT,
(svc == null) ? null : Ints.toByteArray(svc.index));
}
private int runFC(DummyHAService target, String ... args) throws Exception {
DummyZKFC zkfc = new DummyZKFC(target);
zkfc.setConf(conf);
return zkfc.run(args);
}
/**
* Test-thread which runs a ZK Failover Controller corresponding
* to a given dummy service.
*/
private class DummyZKFCThread extends TestingThread {
private final DummyZKFC zkfc;
public DummyZKFCThread(TestContext ctx, DummyHAService svc) {
super(ctx);
this.zkfc = new DummyZKFC(svc);
zkfc.setConf(conf);
}
@Override
public void doWork() throws Exception {
try {
assertEquals(0, zkfc.run(new String[0]));
} catch (InterruptedException ie) {
// Interrupted by main thread, that's OK.
}
}
}
private static class DummyZKFC extends ZKFailoverController {
private final DummyHAService localTarget;
public DummyZKFC(DummyHAService localTarget) {
this.localTarget = localTarget;
}
@Override
protected byte[] targetToData(HAServiceTarget target) {
return Ints.toByteArray(((DummyHAService)target).index);
}
@Override
protected HAServiceTarget dataToTarget(byte[] data) {
int index = Ints.fromByteArray(data);
return DummyHAService.getInstance(index);
}
@Override
protected HAServiceTarget getLocalTarget() {
return localTarget;
}
}
}

@@ -0,0 +1,178 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ha;
import java.io.File;
import java.util.Random;
import java.util.Set;
import javax.management.ObjectName;
import org.apache.hadoop.conf.Configuration;
import org.apache.zookeeper.test.ClientBase;
import org.apache.zookeeper.test.JMXEnv;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
/**
* Stress test for ZKFailoverController.
* Starts multiple ZKFCs for dummy services, and then performs many automatic
* failovers. While doing so, ensures that a fake "shared resource"
* (simulating the shared edits dir) is only owned by one service at a time.
*/
public class TestZKFailoverControllerStress extends ClientBase {
private static final int STRESS_RUNTIME_SECS = 30;
private static final int EXTRA_TIMEOUT_SECS = 10;
private Configuration conf;
private MiniZKFCCluster cluster;
@Override
public void setUp() throws Exception {
// build.test.dir is used by zookeeper
new File(System.getProperty("build.test.dir", "build")).mkdirs();
super.setUp();
}
@Before
public void setupConfAndServices() throws Exception {
conf = new Configuration();
conf.set(ZKFailoverController.ZK_QUORUM_KEY, hostPort);
this.cluster = new MiniZKFCCluster(conf, getServer(serverFactory));
cluster.start();
}
@After
public void stopCluster() throws Exception {
cluster.stop();
}
/**
* ZK seems to have a bug when we muck with its sessions
* behind its back, causing disconnects, etc. This bug
* ends up leaving JMX beans around at the end of the test,
* and ClientBase's teardown method will throw an exception
* if it finds JMX beans leaked. So, clear them out here
* to work around the ZK bug. See ZOOKEEPER-1438.
*/
@After
public void clearZKJMX() throws Exception {
Set<ObjectName> names = JMXEnv.ensureAll();
for (ObjectName n : names) {
JMXEnv.conn().unregisterMBean(n);
}
}
/**
* Simply fail back and forth between two services for the
* configured amount of time, via expiring their ZK sessions.
*/
@Test(timeout=(STRESS_RUNTIME_SECS + EXTRA_TIMEOUT_SECS) * 1000)
public void testExpireBackAndForth() throws Exception {
long st = System.currentTimeMillis();
long runFor = STRESS_RUNTIME_SECS * 1000;
int i = 0;
while (System.currentTimeMillis() - st < runFor) {
// flip flop the services back and forth
int from = i % 2;
int to = (i + 1) % 2;
// Expire one service, it should fail over to the other
LOG.info("Failing over via expiration from " + from + " to " + to);
cluster.expireAndVerifyFailover(from, to);
i++;
}
}
/**
* Randomly expire the ZK sessions of the two ZKFCs. This differs
* from the above test in that it is not a controlled failover -
* we just do random expirations and expect neither one to ever
* generate fatal exceptions.
*/
@Test(timeout=(STRESS_RUNTIME_SECS + EXTRA_TIMEOUT_SECS) * 1000)
public void testRandomExpirations() throws Exception {
long st = System.currentTimeMillis();
long runFor = STRESS_RUNTIME_SECS * 1000;
Random r = new Random();
while (System.currentTimeMillis() - st < runFor) {
cluster.getTestContext().checkException();
int targetIdx = r.nextInt(2);
ActiveStandbyElector target = cluster.getElector(targetIdx);
long sessId = target.getZKSessionIdForTests();
if (sessId != -1) {
LOG.info(String.format("Expiring session %x for svc %d",
sessId, targetIdx));
getServer(serverFactory).closeSession(sessId);
}
Thread.sleep(r.nextInt(300));
}
}
/**
* Have the services fail their health checks half the time,
* causing the master role to bounce back and forth in the
* cluster. Meanwhile, causes ZK to disconnect clients every
* 50ms, to trigger the retry code and failures to become active.
*/
@Test(timeout=(STRESS_RUNTIME_SECS + EXTRA_TIMEOUT_SECS) * 1000)
public void testRandomHealthAndDisconnects() throws Exception {
long runFor = STRESS_RUNTIME_SECS * 1000;
Mockito.doAnswer(new RandomlyThrow(0))
.when(cluster.getService(0).proxy).monitorHealth();
Mockito.doAnswer(new RandomlyThrow(1))
.when(cluster.getService(1).proxy).monitorHealth();
ActiveStandbyElector.NUM_RETRIES = 100;
long st = System.currentTimeMillis();
while (System.currentTimeMillis() - st < runFor) {
cluster.getTestContext().checkException();
serverFactory.closeAll();
Thread.sleep(50);
}
}
/**
* Randomly throw an exception half the time the method is called
*/
@SuppressWarnings("rawtypes")
private static class RandomlyThrow implements Answer {
private Random r = new Random();
private final int svcIdx;
public RandomlyThrow(int svcIdx) {
this.svcIdx = svcIdx;
}
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
if (r.nextBoolean()) {
LOG.info("Throwing an exception for svc " + svcIdx);
throw new HealthCheckFailedException("random failure");
}
return invocation.callRealMethod();
}
}
}