HADOOP-8163. Improve ActiveStandbyElector to provide hooks for fencing old active. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1304675 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2012-03-24 00:05:33 +00:00
parent 1ff0359aa0
commit 805c1280ce
4 changed files with 547 additions and 264 deletions

View File

@ -217,6 +217,9 @@ Release 0.23.3 - UNRELEASED
HADOOP-8184. ProtoBuf RPC engine uses the IPC layer reply packet. HADOOP-8184. ProtoBuf RPC engine uses the IPC layer reply packet.
(Sanjay Radia via szetszwo) (Sanjay Radia via szetszwo)
HADOOP-8163. Improve ActiveStandbyElector to provide hooks for
fencing old active. (todd)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.ha; package org.apache.hadoop.ha;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays;
import java.util.List; import java.util.List;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -26,6 +27,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.util.StringUtils;
import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher;
@ -37,6 +39,7 @@ import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.KeeperException.Code; import org.apache.zookeeper.KeeperException.Code;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
/** /**
* *
@ -106,6 +109,15 @@ public class ActiveStandbyElector implements Watcher, StringCallback,
* called to notify the app about it. * called to notify the app about it.
*/ */
void notifyFatalError(String errorMessage); void notifyFatalError(String errorMessage);
/**
* If an old active has failed, rather than exited gracefully, then
* the new active may need to take some fencing actions against it
* before proceeding with failover.
*
* @param oldActiveData the application data provided by the prior active
*/
void fenceOldActive(byte[] oldActiveData);
} }
/** /**
@ -113,7 +125,9 @@ public class ActiveStandbyElector implements Watcher, StringCallback,
* classes * classes
*/ */
@VisibleForTesting @VisibleForTesting
protected static final String LOCKFILENAME = "ActiveStandbyElectorLock"; protected static final String LOCK_FILENAME = "ActiveStandbyElectorLock";
@VisibleForTesting
protected static final String BREADCRUMB_FILENAME = "ActiveBreadCrumb";
public static final Log LOG = LogFactory.getLog(ActiveStandbyElector.class); public static final Log LOG = LogFactory.getLog(ActiveStandbyElector.class);
@ -139,6 +153,7 @@ public class ActiveStandbyElector implements Watcher, StringCallback,
private final List<ACL> zkAcl; private final List<ACL> zkAcl;
private byte[] appData; private byte[] appData;
private final String zkLockFilePath; private final String zkLockFilePath;
private final String zkBreadCrumbPath;
private final String znodeWorkingDir; private final String znodeWorkingDir;
/** /**
@ -182,7 +197,8 @@ public class ActiveStandbyElector implements Watcher, StringCallback,
zkAcl = acl; zkAcl = acl;
appClient = app; appClient = app;
znodeWorkingDir = parentZnodeName; znodeWorkingDir = parentZnodeName;
zkLockFilePath = znodeWorkingDir + "/" + LOCKFILENAME; zkLockFilePath = znodeWorkingDir + "/" + LOCK_FILENAME;
zkBreadCrumbPath = znodeWorkingDir + "/" + BREADCRUMB_FILENAME;
// createConnection for future API calls // createConnection for future API calls
createConnection(); createConnection();
@ -204,6 +220,7 @@ public class ActiveStandbyElector implements Watcher, StringCallback,
*/ */
public synchronized void joinElection(byte[] data) public synchronized void joinElection(byte[] data)
throws HadoopIllegalArgumentException { throws HadoopIllegalArgumentException {
LOG.debug("Attempting active election"); LOG.debug("Attempting active election");
if (data == null) { if (data == null) {
@ -215,6 +232,49 @@ public class ActiveStandbyElector implements Watcher, StringCallback,
joinElectionInternal(); joinElectionInternal();
} }
/**
* @return true if the configured parent znode exists
*/
public synchronized boolean parentZNodeExists()
throws IOException, InterruptedException {
Preconditions.checkState(zkClient != null);
try {
return zkClient.exists(znodeWorkingDir, false) != null;
} catch (KeeperException e) {
throw new IOException("Couldn't determine existence of znode '" +
znodeWorkingDir + "'", e);
}
}
/**
* Utility function to ensure that the configured base znode exists.
* This recursively creates the znode as well as all of its parents.
*/
public synchronized void ensureParentZNode()
throws IOException, InterruptedException {
String pathParts[] = znodeWorkingDir.split("/");
Preconditions.checkArgument(pathParts.length >= 1 &&
"".equals(pathParts[0]),
"Invalid path: %s", znodeWorkingDir);
StringBuilder sb = new StringBuilder();
for (int i = 1; i < pathParts.length; i++) {
sb.append("/").append(pathParts[i]);
String prefixPath = sb.toString();
LOG.debug("Ensuring existence of " + prefixPath);
try {
createWithRetries(prefixPath, new byte[]{}, zkAcl, CreateMode.PERSISTENT);
} catch (KeeperException e) {
if (isNodeExists(e.code())) {
// This is OK - just ensuring existence.
continue;
} else {
throw new IOException("Couldn't create " + prefixPath, e);
}
}
}
}
/** /**
* Any service instance can drop out of the election by calling quitElection. * Any service instance can drop out of the election by calling quitElection.
@ -225,9 +285,17 @@ public class ActiveStandbyElector implements Watcher, StringCallback,
* call joinElection(). <br/> * call joinElection(). <br/>
* This allows service instances to take themselves out of rotation for known * This allows service instances to take themselves out of rotation for known
* impending unavailable states (e.g. long GC pause or software upgrade). * impending unavailable states (e.g. long GC pause or software upgrade).
*
* @param needFence true if the underlying daemon may need to be fenced
* if a failover occurs due to dropping out of the election.
*/ */
public synchronized void quitElection() { public synchronized void quitElection(boolean needFence) {
LOG.debug("Yielding from election"); LOG.debug("Yielding from election");
if (!needFence && state == State.ACTIVE) {
// If active is gracefully going back to standby mode, remove
// our permanent znode so no one fences us.
tryDeleteOwnBreadCrumbNode();
}
reset(); reset();
} }
@ -260,7 +328,7 @@ public class ActiveStandbyElector implements Watcher, StringCallback,
return zkClient.getData(zkLockFilePath, false, stat); return zkClient.getData(zkLockFilePath, false, stat);
} catch(KeeperException e) { } catch(KeeperException e) {
Code code = e.code(); Code code = e.code();
if (operationNodeDoesNotExist(code)) { if (isNodeDoesNotExist(code)) {
// handle the commonly expected cases that make sense for us // handle the commonly expected cases that make sense for us
throw new ActiveNotFoundException(); throw new ActiveNotFoundException();
} else { } else {
@ -284,14 +352,14 @@ public class ActiveStandbyElector implements Watcher, StringCallback,
} }
Code code = Code.get(rc); Code code = Code.get(rc);
if (operationSuccess(code)) { if (isSuccess(code)) {
// we successfully created the znode. we are the leader. start monitoring // we successfully created the znode. we are the leader. start monitoring
becomeActive(); becomeActive();
monitorActiveStatus(); monitorActiveStatus();
return; return;
} }
if (operationNodeExists(code)) { if (isNodeExists(code)) {
if (createRetryCount == 0) { if (createRetryCount == 0) {
// znode exists and we did not retry the operation. so a different // znode exists and we did not retry the operation. so a different
// instance has created it. become standby and monitor lock. // instance has created it. become standby and monitor lock.
@ -306,14 +374,14 @@ public class ActiveStandbyElector implements Watcher, StringCallback,
} }
String errorMessage = "Received create error from Zookeeper. code:" String errorMessage = "Received create error from Zookeeper. code:"
+ code.toString(); + code.toString() + " for path " + path;
LOG.debug(errorMessage); LOG.debug(errorMessage);
if (operationRetry(code)) { if (shouldRetry(code)) {
if (createRetryCount < NUM_RETRIES) { if (createRetryCount < NUM_RETRIES) {
LOG.debug("Retrying createNode createRetryCount: " + createRetryCount); LOG.debug("Retrying createNode createRetryCount: " + createRetryCount);
++createRetryCount; ++createRetryCount;
createNode(); createLockNodeAsync();
return; return;
} }
errorMessage = errorMessage errorMessage = errorMessage
@ -338,7 +406,7 @@ public class ActiveStandbyElector implements Watcher, StringCallback,
} }
Code code = Code.get(rc); Code code = Code.get(rc);
if (operationSuccess(code)) { if (isSuccess(code)) {
// the following owner check completes verification in case the lock znode // the following owner check completes verification in case the lock znode
// creation was retried // creation was retried
if (stat.getEphemeralOwner() == zkClient.getSessionId()) { if (stat.getEphemeralOwner() == zkClient.getSessionId()) {
@ -352,7 +420,7 @@ public class ActiveStandbyElector implements Watcher, StringCallback,
return; return;
} }
if (operationNodeDoesNotExist(code)) { if (isNodeDoesNotExist(code)) {
// the lock znode disappeared before we started monitoring it // the lock znode disappeared before we started monitoring it
enterNeutralMode(); enterNeutralMode();
joinElectionInternal(); joinElectionInternal();
@ -363,10 +431,10 @@ public class ActiveStandbyElector implements Watcher, StringCallback,
+ code.toString(); + code.toString();
LOG.debug(errorMessage); LOG.debug(errorMessage);
if (operationRetry(code)) { if (shouldRetry(code)) {
if (statRetryCount < NUM_RETRIES) { if (statRetryCount < NUM_RETRIES) {
++statRetryCount; ++statRetryCount;
monitorNode(); monitorLockNodeAsync();
return; return;
} }
errorMessage = errorMessage errorMessage = errorMessage
@ -470,7 +538,7 @@ public class ActiveStandbyElector implements Watcher, StringCallback,
private void monitorActiveStatus() { private void monitorActiveStatus() {
LOG.debug("Monitoring active leader"); LOG.debug("Monitoring active leader");
statRetryCount = 0; statRetryCount = 0;
monitorNode(); monitorLockNodeAsync();
} }
private void joinElectionInternal() { private void joinElectionInternal() {
@ -482,7 +550,7 @@ public class ActiveStandbyElector implements Watcher, StringCallback,
} }
createRetryCount = 0; createRetryCount = 0;
createNode(); createLockNodeAsync();
} }
private void reJoinElection() { private void reJoinElection() {
@ -515,7 +583,7 @@ public class ActiveStandbyElector implements Watcher, StringCallback,
private void createConnection() throws IOException { private void createConnection() throws IOException {
zkClient = getNewZooKeeper(); zkClient = getNewZooKeeper();
} }
private void terminateConnection() { private void terminateConnection() {
if (zkClient == null) { if (zkClient == null) {
return; return;
@ -538,12 +606,110 @@ public class ActiveStandbyElector implements Watcher, StringCallback,
private void becomeActive() { private void becomeActive() {
if (state != State.ACTIVE) { if (state != State.ACTIVE) {
try {
Stat oldBreadcrumbStat = fenceOldActive();
writeBreadCrumbNode(oldBreadcrumbStat);
} catch (Exception e) {
LOG.warn("Exception handling the winning of election", e);
reJoinElection();
return;
}
LOG.debug("Becoming active"); LOG.debug("Becoming active");
state = State.ACTIVE; state = State.ACTIVE;
appClient.becomeActive(); appClient.becomeActive();
} }
} }
/**
* Write the "ActiveBreadCrumb" node, indicating that this node may need
* to be fenced on failover.
* @param oldBreadcrumbStat
*/
private void writeBreadCrumbNode(Stat oldBreadcrumbStat)
throws KeeperException, InterruptedException {
LOG.info("Writing znode " + zkBreadCrumbPath +
" to indicate that the local node is the most recent active...");
if (oldBreadcrumbStat == null) {
// No previous active, just create the node
createWithRetries(zkBreadCrumbPath, appData, zkAcl,
CreateMode.PERSISTENT);
} else {
// There was a previous active, update the node
setDataWithRetries(zkBreadCrumbPath, appData, oldBreadcrumbStat.getVersion());
}
}
/**
* Try to delete the "ActiveBreadCrumb" node when gracefully giving up
* active status.
* If this fails, it will simply warn, since the graceful release behavior
* is only an optimization.
*/
private void tryDeleteOwnBreadCrumbNode() {
assert state == State.ACTIVE;
LOG.info("Deleting bread-crumb of active node...");
// Sanity check the data. This shouldn't be strictly necessary,
// but better to play it safe.
Stat stat = new Stat();
byte[] data = null;
try {
data = zkClient.getData(zkBreadCrumbPath, false, stat);
if (!Arrays.equals(data, appData)) {
throw new IllegalStateException(
"We thought we were active, but in fact " +
"the active znode had the wrong data: " +
StringUtils.byteToHexString(data) + " (stat=" + stat + ")");
}
deleteWithRetries(zkBreadCrumbPath, stat.getVersion());
} catch (Exception e) {
LOG.warn("Unable to delete our own bread-crumb of being active at " +
zkBreadCrumbPath + ": " + e.getLocalizedMessage() + ". " +
"Expecting to be fenced by the next active.");
}
}
/**
* If there is a breadcrumb node indicating that another node may need
* fencing, try to fence that node.
* @return the Stat of the breadcrumb node that was read, or null
* if no breadcrumb node existed
*/
private Stat fenceOldActive() throws InterruptedException, KeeperException {
final Stat stat = new Stat();
byte[] data;
LOG.info("Checking for any old active which needs to be fenced...");
try {
data = zkDoWithRetries(new ZKAction<byte[]>() {
@Override
public byte[] run() throws KeeperException, InterruptedException {
return zkClient.getData(zkBreadCrumbPath, false, stat);
}
});
} catch (KeeperException ke) {
if (isNodeDoesNotExist(ke.code())) {
LOG.info("No old node to fence");
return null;
}
// If we failed to read for any other reason, then likely we lost
// our session, or we don't have permissions, etc. In any case,
// we probably shouldn't become active, and failing the whole
// thing is the best bet.
throw ke;
}
LOG.info("Old node exists: " + StringUtils.byteToHexString(data));
if (Arrays.equals(data, appData)) {
LOG.info("But old node has our own data, so don't need to fence it.");
} else {
appClient.fenceOldActive(data);
}
return stat;
}
private void becomeStandby() { private void becomeStandby() {
if (state != State.STANDBY) { if (state != State.STANDBY) {
LOG.debug("Becoming standby"); LOG.debug("Becoming standby");
@ -560,28 +726,76 @@ public class ActiveStandbyElector implements Watcher, StringCallback,
} }
} }
private void createNode() { private void createLockNodeAsync() {
zkClient.create(zkLockFilePath, appData, zkAcl, CreateMode.EPHEMERAL, this, zkClient.create(zkLockFilePath, appData, zkAcl, CreateMode.EPHEMERAL, this,
null); null);
} }
private void monitorNode() { private void monitorLockNodeAsync() {
zkClient.exists(zkLockFilePath, true, this, null); zkClient.exists(zkLockFilePath, this, this, null);
} }
private boolean operationSuccess(Code code) { private String createWithRetries(final String path, final byte[] data,
final List<ACL> acl, final CreateMode mode)
throws InterruptedException, KeeperException {
return zkDoWithRetries(new ZKAction<String>() {
public String run() throws KeeperException, InterruptedException {
return zkClient.create(path, data, acl, mode);
}
});
}
private Stat setDataWithRetries(final String path, final byte[] data,
final int version) throws InterruptedException, KeeperException {
return zkDoWithRetries(new ZKAction<Stat>() {
public Stat run() throws KeeperException, InterruptedException {
return zkClient.setData(path, data, version);
}
});
}
private void deleteWithRetries(final String path, final int version)
throws KeeperException, InterruptedException {
zkDoWithRetries(new ZKAction<Void>() {
public Void run() throws KeeperException, InterruptedException {
zkClient.delete(path, version);
return null;
}
});
}
private static <T> T zkDoWithRetries(ZKAction<T> action)
throws KeeperException, InterruptedException {
int retry = 0;
while (true) {
try {
return action.run();
} catch (KeeperException ke) {
if (shouldRetry(ke.code()) && ++retry < NUM_RETRIES) {
continue;
}
throw ke;
}
}
}
private interface ZKAction<T> {
T run() throws KeeperException, InterruptedException;
}
private static boolean isSuccess(Code code) {
return (code == Code.OK); return (code == Code.OK);
} }
private boolean operationNodeExists(Code code) { private static boolean isNodeExists(Code code) {
return (code == Code.NODEEXISTS); return (code == Code.NODEEXISTS);
} }
private boolean operationNodeDoesNotExist(Code code) { private static boolean isNodeDoesNotExist(Code code) {
return (code == Code.NONODE); return (code == Code.NONODE);
} }
private boolean operationRetry(Code code) { private static boolean shouldRetry(Code code) {
switch (code) { switch (code) {
case CONNECTIONLOSS: case CONNECTIONLOSS:
case OPERATIONTIMEOUT: case OPERATIONTIMEOUT:

View File

@ -42,12 +42,12 @@ import org.apache.hadoop.ha.ActiveStandbyElector.ActiveNotFoundException;
public class TestActiveStandbyElector { public class TestActiveStandbyElector {
static ZooKeeper mockZK; private ZooKeeper mockZK;
static int count; private int count;
static ActiveStandbyElectorCallback mockApp; private ActiveStandbyElectorCallback mockApp;
static final byte[] data = new byte[8]; private final byte[] data = new byte[8];
ActiveStandbyElectorTester elector; private ActiveStandbyElectorTester elector;
class ActiveStandbyElectorTester extends ActiveStandbyElector { class ActiveStandbyElectorTester extends ActiveStandbyElector {
ActiveStandbyElectorTester(String hostPort, int timeout, String parent, ActiveStandbyElectorTester(String hostPort, int timeout, String parent,
@ -57,25 +57,45 @@ public class TestActiveStandbyElector {
@Override @Override
public ZooKeeper getNewZooKeeper() { public ZooKeeper getNewZooKeeper() {
++TestActiveStandbyElector.count; ++count;
return TestActiveStandbyElector.mockZK; return mockZK;
} }
} }
private static final String zkParentName = "/zookeeper"; private static final String ZK_PARENT_NAME = "/parent/node";
private static final String zkLockPathName = "/zookeeper/" private static final String ZK_LOCK_NAME = ZK_PARENT_NAME + "/" +
+ ActiveStandbyElector.LOCKFILENAME; ActiveStandbyElector.LOCK_FILENAME;
private static final String ZK_BREADCRUMB_NAME = ZK_PARENT_NAME + "/" +
ActiveStandbyElector.BREADCRUMB_FILENAME;
@Before @Before
public void init() throws IOException { public void init() throws IOException {
count = 0; count = 0;
mockZK = Mockito.mock(ZooKeeper.class); mockZK = Mockito.mock(ZooKeeper.class);
mockApp = Mockito.mock(ActiveStandbyElectorCallback.class); mockApp = Mockito.mock(ActiveStandbyElectorCallback.class);
elector = new ActiveStandbyElectorTester("hostPort", 1000, zkParentName, elector = new ActiveStandbyElectorTester("hostPort", 1000, ZK_PARENT_NAME,
Ids.OPEN_ACL_UNSAFE, mockApp); Ids.OPEN_ACL_UNSAFE, mockApp);
} }
/**
* Set up the mock ZK to return no info for a prior active in ZK.
*/
private void mockNoPriorActive() throws Exception {
Mockito.doThrow(new KeeperException.NoNodeException()).when(mockZK)
.getData(Mockito.eq(ZK_BREADCRUMB_NAME), Mockito.anyBoolean(),
Mockito.<Stat>any());
}
/**
* Set up the mock to return info for some prior active node in ZK./
*/
private void mockPriorActive(byte[] data) throws Exception {
Mockito.doReturn(data).when(mockZK)
.getData(Mockito.eq(ZK_BREADCRUMB_NAME), Mockito.anyBoolean(),
Mockito.<Stat>any());
}
/** /**
* verify that joinElection checks for null data * verify that joinElection checks for null data
*/ */
@ -90,7 +110,7 @@ public class TestActiveStandbyElector {
@Test @Test
public void testJoinElection() { public void testJoinElection() {
elector.joinElection(data); elector.joinElection(data);
Mockito.verify(mockZK, Mockito.times(1)).create(zkLockPathName, data, Mockito.verify(mockZK, Mockito.times(1)).create(ZK_LOCK_NAME, data,
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null); Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null);
} }
@ -99,30 +119,74 @@ public class TestActiveStandbyElector {
* started * started
*/ */
@Test @Test
public void testCreateNodeResultBecomeActive() { public void testCreateNodeResultBecomeActive() throws Exception {
mockNoPriorActive();
elector.joinElection(data); elector.joinElection(data);
elector.processResult(Code.OK.intValue(), zkLockPathName, null, elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, null,
zkLockPathName); ZK_LOCK_NAME);
Mockito.verify(mockApp, Mockito.times(1)).becomeActive(); Mockito.verify(mockApp, Mockito.times(1)).becomeActive();
Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true, verifyExistCall(1);
elector, null);
// monitor callback verifies the leader is ephemeral owner of lock but does // monitor callback verifies the leader is ephemeral owner of lock but does
// not call becomeActive since its already active // not call becomeActive since its already active
Stat stat = new Stat(); Stat stat = new Stat();
stat.setEphemeralOwner(1L); stat.setEphemeralOwner(1L);
Mockito.when(mockZK.getSessionId()).thenReturn(1L); Mockito.when(mockZK.getSessionId()).thenReturn(1L);
elector.processResult(Code.OK.intValue(), zkLockPathName, null, stat); elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, null, stat);
// should not call neutral mode/standby/active // should not call neutral mode/standby/active
Mockito.verify(mockApp, Mockito.times(0)).enterNeutralMode(); Mockito.verify(mockApp, Mockito.times(0)).enterNeutralMode();
Mockito.verify(mockApp, Mockito.times(0)).becomeStandby(); Mockito.verify(mockApp, Mockito.times(0)).becomeStandby();
Mockito.verify(mockApp, Mockito.times(1)).becomeActive(); Mockito.verify(mockApp, Mockito.times(1)).becomeActive();
// another joinElection not called. // another joinElection not called.
Mockito.verify(mockZK, Mockito.times(1)).create(zkLockPathName, data, Mockito.verify(mockZK, Mockito.times(1)).create(ZK_LOCK_NAME, data,
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null); Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null);
// no new monitor called // no new monitor called
Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true, verifyExistCall(1);
elector, null); }
/**
* Verify that, if there is a record of a prior active node, the
* elector asks the application to fence it before becoming active.
*/
@Test
public void testFencesOldActive() throws Exception {
byte[] fakeOldActiveData = new byte[0];
mockPriorActive(fakeOldActiveData);
elector.joinElection(data);
elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, null,
ZK_LOCK_NAME);
// Application fences active.
Mockito.verify(mockApp, Mockito.times(1)).fenceOldActive(
fakeOldActiveData);
// Updates breadcrumb node to new data
Mockito.verify(mockZK, Mockito.times(1)).setData(
Mockito.eq(ZK_BREADCRUMB_NAME),
Mockito.eq(data),
Mockito.eq(0));
// Then it becomes active itself
Mockito.verify(mockApp, Mockito.times(1)).becomeActive();
}
@Test
public void testQuitElectionRemovesBreadcrumbNode() throws Exception {
mockNoPriorActive();
elector.joinElection(data);
elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, null,
ZK_LOCK_NAME);
// Writes its own active info
Mockito.verify(mockZK, Mockito.times(1)).create(
Mockito.eq(ZK_BREADCRUMB_NAME), Mockito.eq(data),
Mockito.eq(Ids.OPEN_ACL_UNSAFE),
Mockito.eq(CreateMode.PERSISTENT));
mockPriorActive(data);
elector.quitElection(false);
// Deletes its own active data
Mockito.verify(mockZK, Mockito.times(1)).delete(
Mockito.eq(ZK_BREADCRUMB_NAME), Mockito.eq(0));
} }
/** /**
@ -133,11 +197,10 @@ public class TestActiveStandbyElector {
public void testCreateNodeResultBecomeStandby() { public void testCreateNodeResultBecomeStandby() {
elector.joinElection(data); elector.joinElection(data);
elector.processResult(Code.NODEEXISTS.intValue(), zkLockPathName, null, elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, null,
zkLockPathName); ZK_LOCK_NAME);
Mockito.verify(mockApp, Mockito.times(1)).becomeStandby(); Mockito.verify(mockApp, Mockito.times(1)).becomeStandby();
Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true, verifyExistCall(1);
elector, null);
} }
/** /**
@ -147,10 +210,11 @@ public class TestActiveStandbyElector {
public void testCreateNodeResultError() { public void testCreateNodeResultError() {
elector.joinElection(data); elector.joinElection(data);
elector.processResult(Code.APIERROR.intValue(), zkLockPathName, null, elector.processResult(Code.APIERROR.intValue(), ZK_LOCK_NAME, null,
zkLockPathName); ZK_LOCK_NAME);
Mockito.verify(mockApp, Mockito.times(1)).notifyFatalError( Mockito.verify(mockApp, Mockito.times(1)).notifyFatalError(
"Received create error from Zookeeper. code:APIERROR"); "Received create error from Zookeeper. code:APIERROR " +
"for path " + ZK_LOCK_NAME);
} }
/** /**
@ -158,42 +222,43 @@ public class TestActiveStandbyElector {
* becomes active if they match. monitoring is started. * becomes active if they match. monitoring is started.
*/ */
@Test @Test
public void testCreateNodeResultRetryBecomeActive() { public void testCreateNodeResultRetryBecomeActive() throws Exception {
mockNoPriorActive();
elector.joinElection(data); elector.joinElection(data);
elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null, elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null,
zkLockPathName); ZK_LOCK_NAME);
elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null, elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null,
zkLockPathName); ZK_LOCK_NAME);
elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null, elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null,
zkLockPathName); ZK_LOCK_NAME);
elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null, elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null,
zkLockPathName); ZK_LOCK_NAME);
// 4 errors results in fatalError // 4 errors results in fatalError
Mockito Mockito
.verify(mockApp, Mockito.times(1)) .verify(mockApp, Mockito.times(1))
.notifyFatalError( .notifyFatalError(
"Received create error from Zookeeper. code:CONNECTIONLOSS. "+ "Received create error from Zookeeper. code:CONNECTIONLOSS " +
"for path " + ZK_LOCK_NAME + ". " +
"Not retrying further znode create connection errors."); "Not retrying further znode create connection errors.");
elector.joinElection(data); elector.joinElection(data);
// recreate connection via getNewZooKeeper // recreate connection via getNewZooKeeper
Assert.assertEquals(2, TestActiveStandbyElector.count); Assert.assertEquals(2, count);
elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null, elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null,
zkLockPathName); ZK_LOCK_NAME);
elector.processResult(Code.NODEEXISTS.intValue(), zkLockPathName, null, elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, null,
zkLockPathName); ZK_LOCK_NAME);
Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true, verifyExistCall(1);
elector, null);
Stat stat = new Stat(); Stat stat = new Stat();
stat.setEphemeralOwner(1L); stat.setEphemeralOwner(1L);
Mockito.when(mockZK.getSessionId()).thenReturn(1L); Mockito.when(mockZK.getSessionId()).thenReturn(1L);
elector.processResult(Code.OK.intValue(), zkLockPathName, null, stat); elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, null, stat);
Mockito.verify(mockApp, Mockito.times(1)).becomeActive(); Mockito.verify(mockApp, Mockito.times(1)).becomeActive();
Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true, verifyExistCall(1);
elector, null); Mockito.verify(mockZK, Mockito.times(6)).create(ZK_LOCK_NAME, data,
Mockito.verify(mockZK, Mockito.times(6)).create(zkLockPathName, data,
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null); Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null);
} }
@ -205,20 +270,18 @@ public class TestActiveStandbyElector {
public void testCreateNodeResultRetryBecomeStandby() { public void testCreateNodeResultRetryBecomeStandby() {
elector.joinElection(data); elector.joinElection(data);
elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null, elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null,
zkLockPathName); ZK_LOCK_NAME);
elector.processResult(Code.NODEEXISTS.intValue(), zkLockPathName, null, elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, null,
zkLockPathName); ZK_LOCK_NAME);
Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true, verifyExistCall(1);
elector, null);
Stat stat = new Stat(); Stat stat = new Stat();
stat.setEphemeralOwner(0); stat.setEphemeralOwner(0);
Mockito.when(mockZK.getSessionId()).thenReturn(1L); Mockito.when(mockZK.getSessionId()).thenReturn(1L);
elector.processResult(Code.OK.intValue(), zkLockPathName, null, stat); elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, null, stat);
Mockito.verify(mockApp, Mockito.times(1)).becomeStandby(); Mockito.verify(mockApp, Mockito.times(1)).becomeStandby();
Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true, verifyExistCall(1);
elector, null);
} }
/** /**
@ -230,19 +293,18 @@ public class TestActiveStandbyElector {
public void testCreateNodeResultRetryNoNode() { public void testCreateNodeResultRetryNoNode() {
elector.joinElection(data); elector.joinElection(data);
elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null, elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null,
zkLockPathName); ZK_LOCK_NAME);
elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null, elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null,
zkLockPathName); ZK_LOCK_NAME);
elector.processResult(Code.NODEEXISTS.intValue(), zkLockPathName, null, elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, null,
zkLockPathName); ZK_LOCK_NAME);
Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true, verifyExistCall(1);
elector, null);
elector.processResult(Code.NONODE.intValue(), zkLockPathName, null, elector.processResult(Code.NONODE.intValue(), ZK_LOCK_NAME, null,
(Stat) null); (Stat) null);
Mockito.verify(mockApp, Mockito.times(1)).enterNeutralMode(); Mockito.verify(mockApp, Mockito.times(1)).enterNeutralMode();
Mockito.verify(mockZK, Mockito.times(4)).create(zkLockPathName, data, Mockito.verify(mockZK, Mockito.times(4)).create(ZK_LOCK_NAME, data,
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null); Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null);
} }
@ -251,13 +313,13 @@ public class TestActiveStandbyElector {
*/ */
@Test @Test
public void testStatNodeRetry() { public void testStatNodeRetry() {
elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null, elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null,
(Stat) null); (Stat) null);
elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null, elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null,
(Stat) null); (Stat) null);
elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null, elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null,
(Stat) null); (Stat) null);
elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null, elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null,
(Stat) null); (Stat) null);
Mockito Mockito
.verify(mockApp, Mockito.times(1)) .verify(mockApp, Mockito.times(1))
@ -271,7 +333,7 @@ public class TestActiveStandbyElector {
*/ */
@Test @Test
public void testStatNodeError() { public void testStatNodeError() {
elector.processResult(Code.RUNTIMEINCONSISTENCY.intValue(), zkLockPathName, elector.processResult(Code.RUNTIMEINCONSISTENCY.intValue(), ZK_LOCK_NAME,
null, (Stat) null); null, (Stat) null);
Mockito.verify(mockApp, Mockito.times(0)).enterNeutralMode(); Mockito.verify(mockApp, Mockito.times(0)).enterNeutralMode();
Mockito.verify(mockApp, Mockito.times(1)).notifyFatalError( Mockito.verify(mockApp, Mockito.times(1)).notifyFatalError(
@ -282,7 +344,8 @@ public class TestActiveStandbyElector {
* verify behavior of watcher.process callback with non-node event * verify behavior of watcher.process callback with non-node event
*/ */
@Test @Test
public void testProcessCallbackEventNone() { public void testProcessCallbackEventNone() throws Exception {
mockNoPriorActive();
elector.joinElection(data); elector.joinElection(data);
WatchedEvent mockEvent = Mockito.mock(WatchedEvent.class); WatchedEvent mockEvent = Mockito.mock(WatchedEvent.class);
@ -306,8 +369,7 @@ public class TestActiveStandbyElector {
Mockito.when(mockEvent.getState()).thenReturn( Mockito.when(mockEvent.getState()).thenReturn(
Event.KeeperState.SyncConnected); Event.KeeperState.SyncConnected);
elector.process(mockEvent); elector.process(mockEvent);
Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true, verifyExistCall(1);
elector, null);
// session expired should enter safe mode and initiate re-election // session expired should enter safe mode and initiate re-election
// re-election checked via checking re-creation of new zookeeper and // re-election checked via checking re-creation of new zookeeper and
@ -318,17 +380,16 @@ public class TestActiveStandbyElector {
Mockito.verify(mockApp, Mockito.times(1)).enterNeutralMode(); Mockito.verify(mockApp, Mockito.times(1)).enterNeutralMode();
// called getNewZooKeeper to create new session. first call was in // called getNewZooKeeper to create new session. first call was in
// constructor // constructor
Assert.assertEquals(2, TestActiveStandbyElector.count); Assert.assertEquals(2, count);
// once in initial joinElection and one now // once in initial joinElection and one now
Mockito.verify(mockZK, Mockito.times(2)).create(zkLockPathName, data, Mockito.verify(mockZK, Mockito.times(2)).create(ZK_LOCK_NAME, data,
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null); Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null);
// create znode success. become master and monitor // create znode success. become master and monitor
elector.processResult(Code.OK.intValue(), zkLockPathName, null, elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, null,
zkLockPathName); ZK_LOCK_NAME);
Mockito.verify(mockApp, Mockito.times(1)).becomeActive(); Mockito.verify(mockApp, Mockito.times(1)).becomeActive();
Mockito.verify(mockZK, Mockito.times(2)).exists(zkLockPathName, true, verifyExistCall(2);
elector, null);
// error event results in fatal error // error event results in fatal error
Mockito.when(mockEvent.getState()).thenReturn(Event.KeeperState.AuthFailed); Mockito.when(mockEvent.getState()).thenReturn(Event.KeeperState.AuthFailed);
@ -343,32 +404,30 @@ public class TestActiveStandbyElector {
* verify behavior of watcher.process with node event * verify behavior of watcher.process with node event
*/ */
@Test @Test
public void testProcessCallbackEventNode() { public void testProcessCallbackEventNode() throws Exception {
mockNoPriorActive();
elector.joinElection(data); elector.joinElection(data);
// make the object go into the monitoring state // make the object go into the monitoring state
elector.processResult(Code.NODEEXISTS.intValue(), zkLockPathName, null, elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, null,
zkLockPathName); ZK_LOCK_NAME);
Mockito.verify(mockApp, Mockito.times(1)).becomeStandby(); Mockito.verify(mockApp, Mockito.times(1)).becomeStandby();
Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true, verifyExistCall(1);
elector, null);
WatchedEvent mockEvent = Mockito.mock(WatchedEvent.class); WatchedEvent mockEvent = Mockito.mock(WatchedEvent.class);
Mockito.when(mockEvent.getPath()).thenReturn(zkLockPathName); Mockito.when(mockEvent.getPath()).thenReturn(ZK_LOCK_NAME);
// monitoring should be setup again after event is received // monitoring should be setup again after event is received
Mockito.when(mockEvent.getType()).thenReturn( Mockito.when(mockEvent.getType()).thenReturn(
Event.EventType.NodeDataChanged); Event.EventType.NodeDataChanged);
elector.process(mockEvent); elector.process(mockEvent);
Mockito.verify(mockZK, Mockito.times(2)).exists(zkLockPathName, true, verifyExistCall(2);
elector, null);
// monitoring should be setup again after event is received // monitoring should be setup again after event is received
Mockito.when(mockEvent.getType()).thenReturn( Mockito.when(mockEvent.getType()).thenReturn(
Event.EventType.NodeChildrenChanged); Event.EventType.NodeChildrenChanged);
elector.process(mockEvent); elector.process(mockEvent);
Mockito.verify(mockZK, Mockito.times(3)).exists(zkLockPathName, true, verifyExistCall(3);
elector, null);
// lock node deletion when in standby mode should create znode again // lock node deletion when in standby mode should create znode again
// successful znode creation enters active state and sets monitor // successful znode creation enters active state and sets monitor
@ -377,13 +436,12 @@ public class TestActiveStandbyElector {
// enterNeutralMode not called when app is standby and leader is lost // enterNeutralMode not called when app is standby and leader is lost
Mockito.verify(mockApp, Mockito.times(0)).enterNeutralMode(); Mockito.verify(mockApp, Mockito.times(0)).enterNeutralMode();
// once in initial joinElection() and one now // once in initial joinElection() and one now
Mockito.verify(mockZK, Mockito.times(2)).create(zkLockPathName, data, Mockito.verify(mockZK, Mockito.times(2)).create(ZK_LOCK_NAME, data,
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null); Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null);
elector.processResult(Code.OK.intValue(), zkLockPathName, null, elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, null,
zkLockPathName); ZK_LOCK_NAME);
Mockito.verify(mockApp, Mockito.times(1)).becomeActive(); Mockito.verify(mockApp, Mockito.times(1)).becomeActive();
Mockito.verify(mockZK, Mockito.times(4)).exists(zkLockPathName, true, verifyExistCall(4);
elector, null);
// lock node deletion in active mode should enter neutral mode and create // lock node deletion in active mode should enter neutral mode and create
// znode again successful znode creation enters active state and sets // znode again successful znode creation enters active state and sets
@ -392,13 +450,12 @@ public class TestActiveStandbyElector {
elector.process(mockEvent); elector.process(mockEvent);
Mockito.verify(mockApp, Mockito.times(1)).enterNeutralMode(); Mockito.verify(mockApp, Mockito.times(1)).enterNeutralMode();
// another joinElection called // another joinElection called
Mockito.verify(mockZK, Mockito.times(3)).create(zkLockPathName, data, Mockito.verify(mockZK, Mockito.times(3)).create(ZK_LOCK_NAME, data,
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null); Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null);
elector.processResult(Code.OK.intValue(), zkLockPathName, null, elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, null,
zkLockPathName); ZK_LOCK_NAME);
Mockito.verify(mockApp, Mockito.times(2)).becomeActive(); Mockito.verify(mockApp, Mockito.times(2)).becomeActive();
Mockito.verify(mockZK, Mockito.times(5)).exists(zkLockPathName, true, verifyExistCall(5);
elector, null);
// bad path name results in fatal error // bad path name results in fatal error
Mockito.when(mockEvent.getPath()).thenReturn(null); Mockito.when(mockEvent.getPath()).thenReturn(null);
@ -406,13 +463,17 @@ public class TestActiveStandbyElector {
Mockito.verify(mockApp, Mockito.times(1)).notifyFatalError( Mockito.verify(mockApp, Mockito.times(1)).notifyFatalError(
"Unexpected watch error from Zookeeper"); "Unexpected watch error from Zookeeper");
// fatal error means no new connection other than one from constructor // fatal error means no new connection other than one from constructor
Assert.assertEquals(1, TestActiveStandbyElector.count); Assert.assertEquals(1, count);
// no new watches after fatal error // no new watches after fatal error
Mockito.verify(mockZK, Mockito.times(5)).exists(zkLockPathName, true, verifyExistCall(5);
elector, null);
} }
private void verifyExistCall(int times) {
Mockito.verify(mockZK, Mockito.times(times)).exists(
ZK_LOCK_NAME, elector, elector, null);
}
/** /**
* verify becomeStandby is not called if already in standby * verify becomeStandby is not called if already in standby
*/ */
@ -421,14 +482,13 @@ public class TestActiveStandbyElector {
elector.joinElection(data); elector.joinElection(data);
// make the object go into the monitoring standby state // make the object go into the monitoring standby state
elector.processResult(Code.NODEEXISTS.intValue(), zkLockPathName, null, elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, null,
zkLockPathName); ZK_LOCK_NAME);
Mockito.verify(mockApp, Mockito.times(1)).becomeStandby(); Mockito.verify(mockApp, Mockito.times(1)).becomeStandby();
Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true, verifyExistCall(1);
elector, null);
WatchedEvent mockEvent = Mockito.mock(WatchedEvent.class); WatchedEvent mockEvent = Mockito.mock(WatchedEvent.class);
Mockito.when(mockEvent.getPath()).thenReturn(zkLockPathName); Mockito.when(mockEvent.getPath()).thenReturn(ZK_LOCK_NAME);
// notify node deletion // notify node deletion
// monitoring should be setup again after event is received // monitoring should be setup again after event is received
@ -437,16 +497,15 @@ public class TestActiveStandbyElector {
// is standby. no need to notify anything now // is standby. no need to notify anything now
Mockito.verify(mockApp, Mockito.times(0)).enterNeutralMode(); Mockito.verify(mockApp, Mockito.times(0)).enterNeutralMode();
// another joinElection called. // another joinElection called.
Mockito.verify(mockZK, Mockito.times(2)).create(zkLockPathName, data, Mockito.verify(mockZK, Mockito.times(2)).create(ZK_LOCK_NAME, data,
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null); Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null);
// lost election // lost election
elector.processResult(Code.NODEEXISTS.intValue(), zkLockPathName, null, elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, null,
zkLockPathName); ZK_LOCK_NAME);
// still standby. so no need to notify again // still standby. so no need to notify again
Mockito.verify(mockApp, Mockito.times(1)).becomeStandby(); Mockito.verify(mockApp, Mockito.times(1)).becomeStandby();
// monitor is set again // monitor is set again
Mockito.verify(mockZK, Mockito.times(2)).exists(zkLockPathName, true, verifyExistCall(2);
elector, null);
} }
/** /**
@ -454,22 +513,20 @@ public class TestActiveStandbyElector {
* next call to joinElection creates new connection and performs election * next call to joinElection creates new connection and performs election
*/ */
@Test @Test
public void testQuitElection() throws InterruptedException { public void testQuitElection() throws Exception {
elector.quitElection(); elector.quitElection(true);
Mockito.verify(mockZK, Mockito.times(1)).close(); Mockito.verify(mockZK, Mockito.times(1)).close();
// no watches added // no watches added
Mockito.verify(mockZK, Mockito.times(0)).exists(zkLockPathName, true, verifyExistCall(0);
elector, null);
byte[] data = new byte[8]; byte[] data = new byte[8];
elector.joinElection(data); elector.joinElection(data);
// getNewZooKeeper called 2 times. once in constructor and once now // getNewZooKeeper called 2 times. once in constructor and once now
Assert.assertEquals(2, TestActiveStandbyElector.count); Assert.assertEquals(2, count);
elector.processResult(Code.NODEEXISTS.intValue(), zkLockPathName, null, elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, null,
zkLockPathName); ZK_LOCK_NAME);
Mockito.verify(mockApp, Mockito.times(1)).becomeStandby(); Mockito.verify(mockApp, Mockito.times(1)).becomeStandby();
Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true, verifyExistCall(1);
elector, null);
} }
@ -488,16 +545,16 @@ public class TestActiveStandbyElector {
// get valid active data // get valid active data
byte[] data = new byte[8]; byte[] data = new byte[8];
Mockito.when( Mockito.when(
mockZK.getData(Mockito.eq(zkLockPathName), Mockito.eq(false), mockZK.getData(Mockito.eq(ZK_LOCK_NAME), Mockito.eq(false),
Mockito.<Stat> anyObject())).thenReturn(data); Mockito.<Stat> anyObject())).thenReturn(data);
Assert.assertEquals(data, elector.getActiveData()); Assert.assertEquals(data, elector.getActiveData());
Mockito.verify(mockZK, Mockito.times(1)).getData( Mockito.verify(mockZK, Mockito.times(1)).getData(
Mockito.eq(zkLockPathName), Mockito.eq(false), Mockito.eq(ZK_LOCK_NAME), Mockito.eq(false),
Mockito.<Stat> anyObject()); Mockito.<Stat> anyObject());
// active does not exist // active does not exist
Mockito.when( Mockito.when(
mockZK.getData(Mockito.eq(zkLockPathName), Mockito.eq(false), mockZK.getData(Mockito.eq(ZK_LOCK_NAME), Mockito.eq(false),
Mockito.<Stat> anyObject())).thenThrow( Mockito.<Stat> anyObject())).thenThrow(
new KeeperException.NoNodeException()); new KeeperException.NoNodeException());
try { try {
@ -505,23 +562,65 @@ public class TestActiveStandbyElector {
Assert.fail("ActiveNotFoundException expected"); Assert.fail("ActiveNotFoundException expected");
} catch(ActiveNotFoundException e) { } catch(ActiveNotFoundException e) {
Mockito.verify(mockZK, Mockito.times(2)).getData( Mockito.verify(mockZK, Mockito.times(2)).getData(
Mockito.eq(zkLockPathName), Mockito.eq(false), Mockito.eq(ZK_LOCK_NAME), Mockito.eq(false),
Mockito.<Stat> anyObject()); Mockito.<Stat> anyObject());
} }
// error getting active data rethrows keeperexception // error getting active data rethrows keeperexception
try { try {
Mockito.when( Mockito.when(
mockZK.getData(Mockito.eq(zkLockPathName), Mockito.eq(false), mockZK.getData(Mockito.eq(ZK_LOCK_NAME), Mockito.eq(false),
Mockito.<Stat> anyObject())).thenThrow( Mockito.<Stat> anyObject())).thenThrow(
new KeeperException.AuthFailedException()); new KeeperException.AuthFailedException());
elector.getActiveData(); elector.getActiveData();
Assert.fail("KeeperException.AuthFailedException expected"); Assert.fail("KeeperException.AuthFailedException expected");
} catch(KeeperException.AuthFailedException ke) { } catch(KeeperException.AuthFailedException ke) {
Mockito.verify(mockZK, Mockito.times(3)).getData( Mockito.verify(mockZK, Mockito.times(3)).getData(
Mockito.eq(zkLockPathName), Mockito.eq(false), Mockito.eq(ZK_LOCK_NAME), Mockito.eq(false),
Mockito.<Stat> anyObject()); Mockito.<Stat> anyObject());
} }
} }
/**
* Test that ensureBaseNode() recursively creates the specified dir
*/
@Test
public void testEnsureBaseNode() throws Exception {
elector.ensureParentZNode();
StringBuilder prefix = new StringBuilder();
for (String part : ZK_PARENT_NAME.split("/")) {
if (part.isEmpty()) continue;
prefix.append("/").append(part);
if (!"/".equals(prefix.toString())) {
Mockito.verify(mockZK).create(
Mockito.eq(prefix.toString()), Mockito.<byte[]>any(),
Mockito.eq(Ids.OPEN_ACL_UNSAFE), Mockito.eq(CreateMode.PERSISTENT));
}
}
}
/**
* Test for a bug encountered during development of HADOOP-8163:
* ensureBaseNode() should throw an exception if it has to retry
* more than 3 times to create any part of the path.
*/
@Test
public void testEnsureBaseNodeFails() throws Exception {
Mockito.doThrow(new KeeperException.ConnectionLossException())
.when(mockZK).create(
Mockito.eq(ZK_PARENT_NAME), Mockito.<byte[]>any(),
Mockito.eq(Ids.OPEN_ACL_UNSAFE), Mockito.eq(CreateMode.PERSISTENT));
try {
elector.ensureParentZNode();
Assert.fail("Did not throw!");
} catch (IOException ioe) {
if (!(ioe.getCause() instanceof KeeperException.ConnectionLossException)) {
throw ioe;
}
}
// Should have tried three times
Mockito.verify(mockZK, Mockito.times(3)).create(
Mockito.eq(ZK_PARENT_NAME), Mockito.<byte[]>any(),
Mockito.eq(Ids.OPEN_ACL_UNSAFE), Mockito.eq(CreateMode.PERSISTENT));
}
} }

View File

@ -18,19 +18,24 @@
package org.apache.hadoop.ha; package org.apache.hadoop.ha;
import static org.junit.Assert.*;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.ha.ActiveStandbyElector.ActiveStandbyElectorCallback; import org.apache.hadoop.ha.ActiveStandbyElector.ActiveStandbyElectorCallback;
import org.apache.zookeeper.CreateMode; import org.apache.hadoop.test.MultithreadedTestUtil.TestContext;
import org.apache.zookeeper.KeeperException; import org.apache.hadoop.test.MultithreadedTestUtil.TestingThread;
import org.apache.log4j.Level;
import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.test.ClientBase; import org.apache.zookeeper.test.ClientBase;
/** /**
@ -39,7 +44,17 @@ import org.apache.zookeeper.test.ClientBase;
public class TestActiveStandbyElectorRealZK extends ClientBase { public class TestActiveStandbyElectorRealZK extends ClientBase {
static final int NUM_ELECTORS = 2; static final int NUM_ELECTORS = 2;
static ZooKeeper[] zkClient = new ZooKeeper[NUM_ELECTORS]; static ZooKeeper[] zkClient = new ZooKeeper[NUM_ELECTORS];
static int currentClientIndex = 0;
static {
((Log4JLogger)ActiveStandbyElector.LOG).getLogger().setLevel(
Level.ALL);
}
int activeIndex = -1;
int standbyIndex = -1;
static final String PARENT_DIR = "/" + UUID.randomUUID();
ActiveStandbyElector[] electors = new ActiveStandbyElector[NUM_ELECTORS];
@Override @Override
public void setUp() throws Exception { public void setUp() throws Exception {
@ -48,20 +63,6 @@ public class TestActiveStandbyElectorRealZK extends ClientBase {
super.setUp(); super.setUp();
} }
class ActiveStandbyElectorTesterRealZK extends ActiveStandbyElector {
ActiveStandbyElectorTesterRealZK(String hostPort, int timeout,
String parent, List<ACL> acl, ActiveStandbyElectorCallback app)
throws IOException {
super(hostPort, timeout, parent, acl, app);
}
@Override
public ZooKeeper getNewZooKeeper() {
return TestActiveStandbyElectorRealZK.zkClient[
TestActiveStandbyElectorRealZK.currentClientIndex];
}
}
/** /**
* The class object runs on a thread and waits for a signal to start from the * The class object runs on a thread and waits for a signal to start from the
* test object. On getting the signal it joins the election and thus by doing * test object. On getting the signal it joins the election and thus by doing
@ -71,71 +72,48 @@ public class TestActiveStandbyElectorRealZK extends ClientBase {
* an unexpected fatal error. this lets another thread object to become a * an unexpected fatal error. this lets another thread object to become a
* leader. * leader.
*/ */
class ThreadRunner implements Runnable, ActiveStandbyElectorCallback { class ThreadRunner extends TestingThread
implements ActiveStandbyElectorCallback {
int index; int index;
TestActiveStandbyElectorRealZK test;
boolean wait = true; CountDownLatch hasBecomeActive = new CountDownLatch(1);
ThreadRunner(int i, TestActiveStandbyElectorRealZK s) { ThreadRunner(TestContext ctx,
index = i; int idx) {
test = s; super(ctx);
index = idx;
} }
@Override @Override
public void run() { public void doWork() throws Exception {
LOG.info("starting " + index); LOG.info("starting " + index);
while(true) {
synchronized (test) {
// wait for test start signal to come
if (!test.start) {
try {
test.wait();
} catch(InterruptedException e) {
Assert.fail(e.getMessage());
}
} else {
break;
}
}
}
// join election // join election
byte[] data = new byte[8]; byte[] data = new byte[1];
ActiveStandbyElector elector = test.elector[index]; data[0] = (byte)index;
ActiveStandbyElector elector = electors[index];
LOG.info("joining " + index); LOG.info("joining " + index);
elector.joinElection(data); elector.joinElection(data);
try {
while(true) { hasBecomeActive.await(30, TimeUnit.SECONDS);
synchronized (this) { Thread.sleep(1000);
// wait for elector to become active/fatal error
if (wait) { // quit election to allow other elector to become active
// wait to become active elector.quitElection(true);
// wait capped at 30s to prevent hung test
wait(30000);
} else {
break;
}
}
}
Thread.sleep(1000);
// quit election to allow other elector to become active
elector.quitElection();
} catch(InterruptedException e) {
Assert.fail(e.getMessage());
}
LOG.info("ending " + index); LOG.info("ending " + index);
} }
@Override @Override
public synchronized void becomeActive() { public synchronized void becomeActive() {
test.reportActive(index); reportActive(index);
LOG.info("active " + index); LOG.info("active " + index);
wait = false; hasBecomeActive.countDown();
notifyAll();
} }
@Override @Override
public synchronized void becomeStandby() { public synchronized void becomeStandby() {
test.reportStandby(index); reportStandby(index);
LOG.info("standby " + index); LOG.info("standby " + index);
} }
@ -147,20 +125,17 @@ public class TestActiveStandbyElectorRealZK extends ClientBase {
@Override @Override
public synchronized void notifyFatalError(String errorMessage) { public synchronized void notifyFatalError(String errorMessage) {
LOG.info("fatal " + index + " .Error message:" + errorMessage); LOG.info("fatal " + index + " .Error message:" + errorMessage);
wait = false; this.interrupt();
notifyAll(); }
@Override
public void fenceOldActive(byte[] data) {
LOG.info("fenceOldActive " + index);
// should not fence itself
Assert.assertTrue(index != data[0]);
} }
} }
boolean start = false;
int activeIndex = -1;
int standbyIndex = -1;
String parentDir = "/" + java.util.UUID.randomUUID().toString();
ActiveStandbyElector[] elector = new ActiveStandbyElector[NUM_ELECTORS];
ThreadRunner[] threadRunner = new ThreadRunner[NUM_ELECTORS];
Thread[] thread = new Thread[NUM_ELECTORS];
synchronized void reportActive(int index) { synchronized void reportActive(int index) {
if (activeIndex == -1) { if (activeIndex == -1) {
activeIndex = index; activeIndex = index;
@ -187,45 +162,37 @@ public class TestActiveStandbyElectorRealZK extends ClientBase {
* the standby now becomes active. these electors run on different threads and * the standby now becomes active. these electors run on different threads and
* callback to the test class to report active and standby where the outcome * callback to the test class to report active and standby where the outcome
* is verified * is verified
* * @throws Exception
* @throws IOException
* @throws InterruptedException
* @throws KeeperException
*/ */
@Test @Test
public void testActiveStandbyTransition() throws IOException, public void testActiveStandbyTransition() throws Exception {
InterruptedException, KeeperException { LOG.info("starting test with parentDir:" + PARENT_DIR);
LOG.info("starting test with parentDir:" + parentDir);
start = false;
byte[] data = new byte[8];
// create random working directory
createClient().create(parentDir, data, Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
for(currentClientIndex = 0; TestContext ctx = new TestContext();
currentClientIndex < NUM_ELECTORS;
++currentClientIndex) { for(int i = 0; i < NUM_ELECTORS; i++) {
LOG.info("creating " + currentClientIndex); LOG.info("creating " + i);
zkClient[currentClientIndex] = createClient(); final ZooKeeper zk = createClient();
threadRunner[currentClientIndex] = new ThreadRunner(currentClientIndex, assert zk != null;
this);
elector[currentClientIndex] = new ActiveStandbyElectorTesterRealZK( ThreadRunner tr = new ThreadRunner(ctx, i);
"hostPort", 1000, parentDir, Ids.OPEN_ACL_UNSAFE, electors[i] = new ActiveStandbyElector(
threadRunner[currentClientIndex]); "hostPort", 1000, PARENT_DIR, Ids.OPEN_ACL_UNSAFE,
zkClient[currentClientIndex].register(elector[currentClientIndex]); tr) {
thread[currentClientIndex] = new Thread(threadRunner[currentClientIndex]); @Override
thread[currentClientIndex].start(); protected synchronized ZooKeeper getNewZooKeeper()
throws IOException {
return zk;
}
};
ctx.addThread(tr);
} }
synchronized (this) { assertFalse(electors[0].parentZNodeExists());
// signal threads to start electors[0].ensureParentZNode();
LOG.info("signaling threads"); assertTrue(electors[0].parentZNodeExists());
start = true;
notifyAll();
}
for(int i = 0; i < thread.length; i++) { ctx.startThreads();
thread[i].join(); ctx.stop();
}
} }
} }