From 805c1280ce2773bc61ea718723b42b09d795688f Mon Sep 17 00:00:00 2001 From: Todd Lipcon Date: Sat, 24 Mar 2012 00:05:33 +0000 Subject: [PATCH] HADOOP-8163. Improve ActiveStandbyElector to provide hooks for fencing old active. Contributed by Todd Lipcon. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1304675 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop-common/CHANGES.txt | 3 + .../hadoop/ha/ActiveStandbyElector.java | 260 +++++++++++-- .../hadoop/ha/TestActiveStandbyElector.java | 361 +++++++++++------- .../ha/TestActiveStandbyElectorRealZK.java | 187 ++++----- 4 files changed, 547 insertions(+), 264 deletions(-) diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 3bc6077388a..f3fda7668c5 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -217,6 +217,9 @@ Release 0.23.3 - UNRELEASED HADOOP-8184. ProtoBuf RPC engine uses the IPC layer reply packet. (Sanjay Radia via szetszwo) + HADOOP-8163. Improve ActiveStandbyElector to provide hooks for + fencing old active. 
(todd) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java index 7da2d3e1bfd..56cba81139e 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java @@ -19,6 +19,7 @@ package org.apache.hadoop.ha; import java.io.IOException; +import java.util.Arrays; import java.util.List; import org.apache.commons.logging.Log; @@ -26,6 +27,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.util.StringUtils; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.Watcher; @@ -37,6 +39,7 @@ import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.KeeperException.Code; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; /** * @@ -106,6 +109,15 @@ public class ActiveStandbyElector implements Watcher, StringCallback, * called to notify the app about it. */ void notifyFatalError(String errorMessage); + + /** + * If an old active has failed, rather than exited gracefully, then + * the new active may need to take some fencing actions against it + * before proceeding with failover. 
+ * + * @param oldActiveData the application data provided by the prior active + */ + void fenceOldActive(byte[] oldActiveData); } /** @@ -113,7 +125,9 @@ public class ActiveStandbyElector implements Watcher, StringCallback, * classes */ @VisibleForTesting - protected static final String LOCKFILENAME = "ActiveStandbyElectorLock"; + protected static final String LOCK_FILENAME = "ActiveStandbyElectorLock"; + @VisibleForTesting + protected static final String BREADCRUMB_FILENAME = "ActiveBreadCrumb"; public static final Log LOG = LogFactory.getLog(ActiveStandbyElector.class); @@ -139,6 +153,7 @@ public class ActiveStandbyElector implements Watcher, StringCallback, private final List zkAcl; private byte[] appData; private final String zkLockFilePath; + private final String zkBreadCrumbPath; private final String znodeWorkingDir; /** @@ -182,7 +197,8 @@ public class ActiveStandbyElector implements Watcher, StringCallback, zkAcl = acl; appClient = app; znodeWorkingDir = parentZnodeName; - zkLockFilePath = znodeWorkingDir + "/" + LOCKFILENAME; + zkLockFilePath = znodeWorkingDir + "/" + LOCK_FILENAME; + zkBreadCrumbPath = znodeWorkingDir + "/" + BREADCRUMB_FILENAME; // createConnection for future API calls createConnection(); @@ -204,6 +220,7 @@ public class ActiveStandbyElector implements Watcher, StringCallback, */ public synchronized void joinElection(byte[] data) throws HadoopIllegalArgumentException { + LOG.debug("Attempting active election"); if (data == null) { @@ -215,6 +232,49 @@ public class ActiveStandbyElector implements Watcher, StringCallback, joinElectionInternal(); } + + /** + * @return true if the configured parent znode exists + */ + public synchronized boolean parentZNodeExists() + throws IOException, InterruptedException { + Preconditions.checkState(zkClient != null); + try { + return zkClient.exists(znodeWorkingDir, false) != null; + } catch (KeeperException e) { + throw new IOException("Couldn't determine existence of znode '" + + znodeWorkingDir + 
"'", e); + } + } + + /** + * Utility function to ensure that the configured base znode exists. + * This recursively creates the znode as well as all of its parents. + */ + public synchronized void ensureParentZNode() + throws IOException, InterruptedException { + String pathParts[] = znodeWorkingDir.split("/"); + Preconditions.checkArgument(pathParts.length >= 1 && + "".equals(pathParts[0]), + "Invalid path: %s", znodeWorkingDir); + + StringBuilder sb = new StringBuilder(); + for (int i = 1; i < pathParts.length; i++) { + sb.append("/").append(pathParts[i]); + String prefixPath = sb.toString(); + LOG.debug("Ensuring existence of " + prefixPath); + try { + createWithRetries(prefixPath, new byte[]{}, zkAcl, CreateMode.PERSISTENT); + } catch (KeeperException e) { + if (isNodeExists(e.code())) { + // This is OK - just ensuring existence. + continue; + } else { + throw new IOException("Couldn't create " + prefixPath, e); + } + } + } + } /** * Any service instance can drop out of the election by calling quitElection. @@ -225,9 +285,17 @@ public class ActiveStandbyElector implements Watcher, StringCallback, * call joinElection().
* This allows service instances to take themselves out of rotation for known * impending unavailable states (e.g. long GC pause or software upgrade). + * + * @param needFence true if the underlying daemon may need to be fenced + * if a failover occurs due to dropping out of the election. */ - public synchronized void quitElection() { + public synchronized void quitElection(boolean needFence) { LOG.debug("Yielding from election"); + if (!needFence && state == State.ACTIVE) { + // If active is gracefully going back to standby mode, remove + // our permanent znode so no one fences us. + tryDeleteOwnBreadCrumbNode(); + } reset(); } @@ -260,7 +328,7 @@ public class ActiveStandbyElector implements Watcher, StringCallback, return zkClient.getData(zkLockFilePath, false, stat); } catch(KeeperException e) { Code code = e.code(); - if (operationNodeDoesNotExist(code)) { + if (isNodeDoesNotExist(code)) { // handle the commonly expected cases that make sense for us throw new ActiveNotFoundException(); } else { @@ -284,14 +352,14 @@ public class ActiveStandbyElector implements Watcher, StringCallback, } Code code = Code.get(rc); - if (operationSuccess(code)) { + if (isSuccess(code)) { // we successfully created the znode. we are the leader. start monitoring becomeActive(); monitorActiveStatus(); return; } - if (operationNodeExists(code)) { + if (isNodeExists(code)) { if (createRetryCount == 0) { // znode exists and we did not retry the operation. so a different // instance has created it. become standby and monitor lock. @@ -306,14 +374,14 @@ public class ActiveStandbyElector implements Watcher, StringCallback, } String errorMessage = "Received create error from Zookeeper. 
code:" - + code.toString(); + + code.toString() + " for path " + path; LOG.debug(errorMessage); - if (operationRetry(code)) { + if (shouldRetry(code)) { if (createRetryCount < NUM_RETRIES) { LOG.debug("Retrying createNode createRetryCount: " + createRetryCount); ++createRetryCount; - createNode(); + createLockNodeAsync(); return; } errorMessage = errorMessage @@ -338,7 +406,7 @@ public class ActiveStandbyElector implements Watcher, StringCallback, } Code code = Code.get(rc); - if (operationSuccess(code)) { + if (isSuccess(code)) { // the following owner check completes verification in case the lock znode // creation was retried if (stat.getEphemeralOwner() == zkClient.getSessionId()) { @@ -352,7 +420,7 @@ public class ActiveStandbyElector implements Watcher, StringCallback, return; } - if (operationNodeDoesNotExist(code)) { + if (isNodeDoesNotExist(code)) { // the lock znode disappeared before we started monitoring it enterNeutralMode(); joinElectionInternal(); @@ -363,10 +431,10 @@ public class ActiveStandbyElector implements Watcher, StringCallback, + code.toString(); LOG.debug(errorMessage); - if (operationRetry(code)) { + if (shouldRetry(code)) { if (statRetryCount < NUM_RETRIES) { ++statRetryCount; - monitorNode(); + monitorLockNodeAsync(); return; } errorMessage = errorMessage @@ -470,7 +538,7 @@ public class ActiveStandbyElector implements Watcher, StringCallback, private void monitorActiveStatus() { LOG.debug("Monitoring active leader"); statRetryCount = 0; - monitorNode(); + monitorLockNodeAsync(); } private void joinElectionInternal() { @@ -482,7 +550,7 @@ public class ActiveStandbyElector implements Watcher, StringCallback, } createRetryCount = 0; - createNode(); + createLockNodeAsync(); } private void reJoinElection() { @@ -515,7 +583,7 @@ public class ActiveStandbyElector implements Watcher, StringCallback, private void createConnection() throws IOException { zkClient = getNewZooKeeper(); } - + private void terminateConnection() { if (zkClient == null) 
{ return; @@ -538,12 +606,110 @@ public class ActiveStandbyElector implements Watcher, StringCallback, private void becomeActive() { if (state != State.ACTIVE) { + try { + Stat oldBreadcrumbStat = fenceOldActive(); + writeBreadCrumbNode(oldBreadcrumbStat); + } catch (Exception e) { + LOG.warn("Exception handling the winning of election", e); + reJoinElection(); + return; + } LOG.debug("Becoming active"); state = State.ACTIVE; appClient.becomeActive(); } } + /** + * Write the "ActiveBreadCrumb" node, indicating that this node may need + * to be fenced on failover. + * @param oldBreadcrumbStat + */ + private void writeBreadCrumbNode(Stat oldBreadcrumbStat) + throws KeeperException, InterruptedException { + LOG.info("Writing znode " + zkBreadCrumbPath + + " to indicate that the local node is the most recent active..."); + if (oldBreadcrumbStat == null) { + // No previous active, just create the node + createWithRetries(zkBreadCrumbPath, appData, zkAcl, + CreateMode.PERSISTENT); + } else { + // There was a previous active, update the node + setDataWithRetries(zkBreadCrumbPath, appData, oldBreadcrumbStat.getVersion()); + } + } + + /** + * Try to delete the "ActiveBreadCrumb" node when gracefully giving up + * active status. + * If this fails, it will simply warn, since the graceful release behavior + * is only an optimization. + */ + private void tryDeleteOwnBreadCrumbNode() { + assert state == State.ACTIVE; + LOG.info("Deleting bread-crumb of active node..."); + + // Sanity check the data. This shouldn't be strictly necessary, + // but better to play it safe. 
+ Stat stat = new Stat(); + byte[] data = null; + try { + data = zkClient.getData(zkBreadCrumbPath, false, stat); + + if (!Arrays.equals(data, appData)) { + throw new IllegalStateException( + "We thought we were active, but in fact " + + "the active znode had the wrong data: " + + StringUtils.byteToHexString(data) + " (stat=" + stat + ")"); + } + + deleteWithRetries(zkBreadCrumbPath, stat.getVersion()); + } catch (Exception e) { + LOG.warn("Unable to delete our own bread-crumb of being active at " + + zkBreadCrumbPath + ": " + e.getLocalizedMessage() + ". " + + "Expecting to be fenced by the next active."); + } + } + + /** + * If there is a breadcrumb node indicating that another node may need + * fencing, try to fence that node. + * @return the Stat of the breadcrumb node that was read, or null + * if no breadcrumb node existed + */ + private Stat fenceOldActive() throws InterruptedException, KeeperException { + final Stat stat = new Stat(); + byte[] data; + LOG.info("Checking for any old active which needs to be fenced..."); + try { + data = zkDoWithRetries(new ZKAction() { + @Override + public byte[] run() throws KeeperException, InterruptedException { + return zkClient.getData(zkBreadCrumbPath, false, stat); + } + }); + } catch (KeeperException ke) { + if (isNodeDoesNotExist(ke.code())) { + LOG.info("No old node to fence"); + return null; + } + + // If we failed to read for any other reason, then likely we lost + // our session, or we don't have permissions, etc. In any case, + // we probably shouldn't become active, and failing the whole + // thing is the best bet. 
+ throw ke; + } + + LOG.info("Old node exists: " + StringUtils.byteToHexString(data)); + if (Arrays.equals(data, appData)) { + LOG.info("But old node has our own data, so don't need to fence it."); + } else { + appClient.fenceOldActive(data); + } + return stat; + } + private void becomeStandby() { if (state != State.STANDBY) { LOG.debug("Becoming standby"); @@ -560,28 +726,76 @@ public class ActiveStandbyElector implements Watcher, StringCallback, } } - private void createNode() { + private void createLockNodeAsync() { zkClient.create(zkLockFilePath, appData, zkAcl, CreateMode.EPHEMERAL, this, null); } - private void monitorNode() { - zkClient.exists(zkLockFilePath, true, this, null); + private void monitorLockNodeAsync() { + zkClient.exists(zkLockFilePath, this, this, null); } - private boolean operationSuccess(Code code) { + private String createWithRetries(final String path, final byte[] data, + final List acl, final CreateMode mode) + throws InterruptedException, KeeperException { + return zkDoWithRetries(new ZKAction() { + public String run() throws KeeperException, InterruptedException { + return zkClient.create(path, data, acl, mode); + } + }); + } + + private Stat setDataWithRetries(final String path, final byte[] data, + final int version) throws InterruptedException, KeeperException { + return zkDoWithRetries(new ZKAction() { + public Stat run() throws KeeperException, InterruptedException { + return zkClient.setData(path, data, version); + } + }); + } + + private void deleteWithRetries(final String path, final int version) + throws KeeperException, InterruptedException { + zkDoWithRetries(new ZKAction() { + public Void run() throws KeeperException, InterruptedException { + zkClient.delete(path, version); + return null; + } + }); + } + + private static T zkDoWithRetries(ZKAction action) + throws KeeperException, InterruptedException { + int retry = 0; + while (true) { + try { + return action.run(); + } catch (KeeperException ke) { + if 
(shouldRetry(ke.code()) && ++retry < NUM_RETRIES) { + continue; + } + throw ke; + } + } + } + + private interface ZKAction { + T run() throws KeeperException, InterruptedException; + } + + private static boolean isSuccess(Code code) { return (code == Code.OK); } - private boolean operationNodeExists(Code code) { + private static boolean isNodeExists(Code code) { return (code == Code.NODEEXISTS); } - private boolean operationNodeDoesNotExist(Code code) { + private static boolean isNodeDoesNotExist(Code code) { return (code == Code.NONODE); } - private boolean operationRetry(Code code) { + private static boolean shouldRetry(Code code) { switch (code) { case CONNECTIONLOSS: case OPERATIONTIMEOUT: diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java index fec350d3bc0..f3b551ad834 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java @@ -42,12 +42,12 @@ import org.apache.hadoop.ha.ActiveStandbyElector.ActiveNotFoundException; public class TestActiveStandbyElector { - static ZooKeeper mockZK; - static int count; - static ActiveStandbyElectorCallback mockApp; - static final byte[] data = new byte[8]; + private ZooKeeper mockZK; + private int count; + private ActiveStandbyElectorCallback mockApp; + private final byte[] data = new byte[8]; - ActiveStandbyElectorTester elector; + private ActiveStandbyElectorTester elector; class ActiveStandbyElectorTester extends ActiveStandbyElector { ActiveStandbyElectorTester(String hostPort, int timeout, String parent, @@ -57,25 +57,45 @@ public class TestActiveStandbyElector { @Override public ZooKeeper getNewZooKeeper() { - ++TestActiveStandbyElector.count; - return TestActiveStandbyElector.mockZK; + ++count; + 
return mockZK; } - } - private static final String zkParentName = "/zookeeper"; - private static final String zkLockPathName = "/zookeeper/" - + ActiveStandbyElector.LOCKFILENAME; + private static final String ZK_PARENT_NAME = "/parent/node"; + private static final String ZK_LOCK_NAME = ZK_PARENT_NAME + "/" + + ActiveStandbyElector.LOCK_FILENAME; + private static final String ZK_BREADCRUMB_NAME = ZK_PARENT_NAME + "/" + + ActiveStandbyElector.BREADCRUMB_FILENAME; @Before public void init() throws IOException { count = 0; mockZK = Mockito.mock(ZooKeeper.class); mockApp = Mockito.mock(ActiveStandbyElectorCallback.class); - elector = new ActiveStandbyElectorTester("hostPort", 1000, zkParentName, + elector = new ActiveStandbyElectorTester("hostPort", 1000, ZK_PARENT_NAME, Ids.OPEN_ACL_UNSAFE, mockApp); } + /** + * Set up the mock ZK to return no info for a prior active in ZK. + */ + private void mockNoPriorActive() throws Exception { + Mockito.doThrow(new KeeperException.NoNodeException()).when(mockZK) + .getData(Mockito.eq(ZK_BREADCRUMB_NAME), Mockito.anyBoolean(), + Mockito.any()); + } + + /** + * Set up the mock to return info for some prior active node in ZK. + */ + private void mockPriorActive(byte[] data) throws Exception { + Mockito.doReturn(data).when(mockZK) + .getData(Mockito.eq(ZK_BREADCRUMB_NAME), Mockito.anyBoolean(), + Mockito.any()); + } + + /** * verify that joinElection checks for null data */ @@ -90,7 +110,7 @@ public class TestActiveStandbyElector { @Test public void testJoinElection() { elector.joinElection(data); - Mockito.verify(mockZK, Mockito.times(1)).create(zkLockPathName, data, + Mockito.verify(mockZK, Mockito.times(1)).create(ZK_LOCK_NAME, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null); } @@ -99,30 +119,74 @@ public class TestActiveStandbyElector { * started */ @Test - public void testCreateNodeResultBecomeActive() { + public void testCreateNodeResultBecomeActive() throws Exception { + mockNoPriorActive(); +
elector.joinElection(data); - elector.processResult(Code.OK.intValue(), zkLockPathName, null, - zkLockPathName); + elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, null, + ZK_LOCK_NAME); Mockito.verify(mockApp, Mockito.times(1)).becomeActive(); - Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true, - elector, null); + verifyExistCall(1); // monitor callback verifies the leader is ephemeral owner of lock but does // not call becomeActive since its already active Stat stat = new Stat(); stat.setEphemeralOwner(1L); Mockito.when(mockZK.getSessionId()).thenReturn(1L); - elector.processResult(Code.OK.intValue(), zkLockPathName, null, stat); + elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, null, stat); // should not call neutral mode/standby/active Mockito.verify(mockApp, Mockito.times(0)).enterNeutralMode(); Mockito.verify(mockApp, Mockito.times(0)).becomeStandby(); Mockito.verify(mockApp, Mockito.times(1)).becomeActive(); // another joinElection not called. - Mockito.verify(mockZK, Mockito.times(1)).create(zkLockPathName, data, + Mockito.verify(mockZK, Mockito.times(1)).create(ZK_LOCK_NAME, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null); // no new monitor called - Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true, - elector, null); + verifyExistCall(1); + } + + /** + * Verify that, if there is a record of a prior active node, the + * elector asks the application to fence it before becoming active. + */ + @Test + public void testFencesOldActive() throws Exception { + byte[] fakeOldActiveData = new byte[0]; + mockPriorActive(fakeOldActiveData); + + elector.joinElection(data); + elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, null, + ZK_LOCK_NAME); + // Application fences active. 
+ Mockito.verify(mockApp, Mockito.times(1)).fenceOldActive( + fakeOldActiveData); + // Updates breadcrumb node to new data + Mockito.verify(mockZK, Mockito.times(1)).setData( + Mockito.eq(ZK_BREADCRUMB_NAME), + Mockito.eq(data), + Mockito.eq(0)); + // Then it becomes active itself + Mockito.verify(mockApp, Mockito.times(1)).becomeActive(); + } + + @Test + public void testQuitElectionRemovesBreadcrumbNode() throws Exception { + mockNoPriorActive(); + elector.joinElection(data); + elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, null, + ZK_LOCK_NAME); + // Writes its own active info + Mockito.verify(mockZK, Mockito.times(1)).create( + Mockito.eq(ZK_BREADCRUMB_NAME), Mockito.eq(data), + Mockito.eq(Ids.OPEN_ACL_UNSAFE), + Mockito.eq(CreateMode.PERSISTENT)); + mockPriorActive(data); + + elector.quitElection(false); + + // Deletes its own active data + Mockito.verify(mockZK, Mockito.times(1)).delete( + Mockito.eq(ZK_BREADCRUMB_NAME), Mockito.eq(0)); } /** @@ -133,11 +197,10 @@ public class TestActiveStandbyElector { public void testCreateNodeResultBecomeStandby() { elector.joinElection(data); - elector.processResult(Code.NODEEXISTS.intValue(), zkLockPathName, null, - zkLockPathName); + elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, null, + ZK_LOCK_NAME); Mockito.verify(mockApp, Mockito.times(1)).becomeStandby(); - Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true, - elector, null); + verifyExistCall(1); } /** @@ -147,10 +210,11 @@ public class TestActiveStandbyElector { public void testCreateNodeResultError() { elector.joinElection(data); - elector.processResult(Code.APIERROR.intValue(), zkLockPathName, null, - zkLockPathName); + elector.processResult(Code.APIERROR.intValue(), ZK_LOCK_NAME, null, + ZK_LOCK_NAME); Mockito.verify(mockApp, Mockito.times(1)).notifyFatalError( - "Received create error from Zookeeper. code:APIERROR"); + "Received create error from Zookeeper. 
code:APIERROR " + + "for path " + ZK_LOCK_NAME); } /** @@ -158,42 +222,43 @@ public class TestActiveStandbyElector { * becomes active if they match. monitoring is started. */ @Test - public void testCreateNodeResultRetryBecomeActive() { + public void testCreateNodeResultRetryBecomeActive() throws Exception { + mockNoPriorActive(); + elector.joinElection(data); - elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null, - zkLockPathName); - elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null, - zkLockPathName); - elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null, - zkLockPathName); - elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null, - zkLockPathName); + elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null, + ZK_LOCK_NAME); + elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null, + ZK_LOCK_NAME); + elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null, + ZK_LOCK_NAME); + elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null, + ZK_LOCK_NAME); // 4 errors results in fatalError Mockito .verify(mockApp, Mockito.times(1)) .notifyFatalError( - "Received create error from Zookeeper. code:CONNECTIONLOSS. "+ + "Received create error from Zookeeper. code:CONNECTIONLOSS " + + "for path " + ZK_LOCK_NAME + ". 
" + "Not retrying further znode create connection errors."); elector.joinElection(data); // recreate connection via getNewZooKeeper - Assert.assertEquals(2, TestActiveStandbyElector.count); - elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null, - zkLockPathName); - elector.processResult(Code.NODEEXISTS.intValue(), zkLockPathName, null, - zkLockPathName); - Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true, - elector, null); + Assert.assertEquals(2, count); + elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null, + ZK_LOCK_NAME); + elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, null, + ZK_LOCK_NAME); + verifyExistCall(1); Stat stat = new Stat(); stat.setEphemeralOwner(1L); Mockito.when(mockZK.getSessionId()).thenReturn(1L); - elector.processResult(Code.OK.intValue(), zkLockPathName, null, stat); + elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, null, stat); Mockito.verify(mockApp, Mockito.times(1)).becomeActive(); - Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true, - elector, null); - Mockito.verify(mockZK, Mockito.times(6)).create(zkLockPathName, data, + verifyExistCall(1); + Mockito.verify(mockZK, Mockito.times(6)).create(ZK_LOCK_NAME, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null); } @@ -205,20 +270,18 @@ public class TestActiveStandbyElector { public void testCreateNodeResultRetryBecomeStandby() { elector.joinElection(data); - elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null, - zkLockPathName); - elector.processResult(Code.NODEEXISTS.intValue(), zkLockPathName, null, - zkLockPathName); - Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true, - elector, null); + elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null, + ZK_LOCK_NAME); + elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, null, + ZK_LOCK_NAME); + verifyExistCall(1); Stat stat = new Stat(); 
stat.setEphemeralOwner(0); Mockito.when(mockZK.getSessionId()).thenReturn(1L); - elector.processResult(Code.OK.intValue(), zkLockPathName, null, stat); + elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, null, stat); Mockito.verify(mockApp, Mockito.times(1)).becomeStandby(); - Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true, - elector, null); + verifyExistCall(1); } /** @@ -230,19 +293,18 @@ public class TestActiveStandbyElector { public void testCreateNodeResultRetryNoNode() { elector.joinElection(data); - elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null, - zkLockPathName); - elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null, - zkLockPathName); - elector.processResult(Code.NODEEXISTS.intValue(), zkLockPathName, null, - zkLockPathName); - Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true, - elector, null); + elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null, + ZK_LOCK_NAME); + elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null, + ZK_LOCK_NAME); + elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, null, + ZK_LOCK_NAME); + verifyExistCall(1); - elector.processResult(Code.NONODE.intValue(), zkLockPathName, null, + elector.processResult(Code.NONODE.intValue(), ZK_LOCK_NAME, null, (Stat) null); Mockito.verify(mockApp, Mockito.times(1)).enterNeutralMode(); - Mockito.verify(mockZK, Mockito.times(4)).create(zkLockPathName, data, + Mockito.verify(mockZK, Mockito.times(4)).create(ZK_LOCK_NAME, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null); } @@ -251,13 +313,13 @@ public class TestActiveStandbyElector { */ @Test public void testStatNodeRetry() { - elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null, + elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null, (Stat) null); - elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null, + 
elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null, (Stat) null); - elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null, + elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null, (Stat) null); - elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null, + elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null, (Stat) null); Mockito .verify(mockApp, Mockito.times(1)) @@ -271,7 +333,7 @@ public class TestActiveStandbyElector { */ @Test public void testStatNodeError() { - elector.processResult(Code.RUNTIMEINCONSISTENCY.intValue(), zkLockPathName, + elector.processResult(Code.RUNTIMEINCONSISTENCY.intValue(), ZK_LOCK_NAME, null, (Stat) null); Mockito.verify(mockApp, Mockito.times(0)).enterNeutralMode(); Mockito.verify(mockApp, Mockito.times(1)).notifyFatalError( @@ -282,7 +344,8 @@ public class TestActiveStandbyElector { * verify behavior of watcher.process callback with non-node event */ @Test - public void testProcessCallbackEventNone() { + public void testProcessCallbackEventNone() throws Exception { + mockNoPriorActive(); elector.joinElection(data); WatchedEvent mockEvent = Mockito.mock(WatchedEvent.class); @@ -306,8 +369,7 @@ public class TestActiveStandbyElector { Mockito.when(mockEvent.getState()).thenReturn( Event.KeeperState.SyncConnected); elector.process(mockEvent); - Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true, - elector, null); + verifyExistCall(1); // session expired should enter safe mode and initiate re-election // re-election checked via checking re-creation of new zookeeper and @@ -318,17 +380,16 @@ public class TestActiveStandbyElector { Mockito.verify(mockApp, Mockito.times(1)).enterNeutralMode(); // called getNewZooKeeper to create new session. 
first call was in // constructor - Assert.assertEquals(2, TestActiveStandbyElector.count); + Assert.assertEquals(2, count); // once in initial joinElection and one now - Mockito.verify(mockZK, Mockito.times(2)).create(zkLockPathName, data, + Mockito.verify(mockZK, Mockito.times(2)).create(ZK_LOCK_NAME, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null); // create znode success. become master and monitor - elector.processResult(Code.OK.intValue(), zkLockPathName, null, - zkLockPathName); + elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, null, + ZK_LOCK_NAME); Mockito.verify(mockApp, Mockito.times(1)).becomeActive(); - Mockito.verify(mockZK, Mockito.times(2)).exists(zkLockPathName, true, - elector, null); + verifyExistCall(2); // error event results in fatal error Mockito.when(mockEvent.getState()).thenReturn(Event.KeeperState.AuthFailed); @@ -343,32 +404,30 @@ public class TestActiveStandbyElector { * verify behavior of watcher.process with node event */ @Test - public void testProcessCallbackEventNode() { + public void testProcessCallbackEventNode() throws Exception { + mockNoPriorActive(); elector.joinElection(data); // make the object go into the monitoring state - elector.processResult(Code.NODEEXISTS.intValue(), zkLockPathName, null, - zkLockPathName); + elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, null, + ZK_LOCK_NAME); Mockito.verify(mockApp, Mockito.times(1)).becomeStandby(); - Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true, - elector, null); + verifyExistCall(1); WatchedEvent mockEvent = Mockito.mock(WatchedEvent.class); - Mockito.when(mockEvent.getPath()).thenReturn(zkLockPathName); + Mockito.when(mockEvent.getPath()).thenReturn(ZK_LOCK_NAME); // monitoring should be setup again after event is received Mockito.when(mockEvent.getType()).thenReturn( Event.EventType.NodeDataChanged); elector.process(mockEvent); - Mockito.verify(mockZK, Mockito.times(2)).exists(zkLockPathName, true, - elector, 
null); + verifyExistCall(2); // monitoring should be setup again after event is received Mockito.when(mockEvent.getType()).thenReturn( Event.EventType.NodeChildrenChanged); elector.process(mockEvent); - Mockito.verify(mockZK, Mockito.times(3)).exists(zkLockPathName, true, - elector, null); + verifyExistCall(3); // lock node deletion when in standby mode should create znode again // successful znode creation enters active state and sets monitor @@ -377,13 +436,12 @@ public class TestActiveStandbyElector { // enterNeutralMode not called when app is standby and leader is lost Mockito.verify(mockApp, Mockito.times(0)).enterNeutralMode(); // once in initial joinElection() and one now - Mockito.verify(mockZK, Mockito.times(2)).create(zkLockPathName, data, + Mockito.verify(mockZK, Mockito.times(2)).create(ZK_LOCK_NAME, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null); - elector.processResult(Code.OK.intValue(), zkLockPathName, null, - zkLockPathName); + elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, null, + ZK_LOCK_NAME); Mockito.verify(mockApp, Mockito.times(1)).becomeActive(); - Mockito.verify(mockZK, Mockito.times(4)).exists(zkLockPathName, true, - elector, null); + verifyExistCall(4); // lock node deletion in active mode should enter neutral mode and create // znode again successful znode creation enters active state and sets @@ -392,13 +450,12 @@ public class TestActiveStandbyElector { elector.process(mockEvent); Mockito.verify(mockApp, Mockito.times(1)).enterNeutralMode(); // another joinElection called - Mockito.verify(mockZK, Mockito.times(3)).create(zkLockPathName, data, + Mockito.verify(mockZK, Mockito.times(3)).create(ZK_LOCK_NAME, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null); - elector.processResult(Code.OK.intValue(), zkLockPathName, null, - zkLockPathName); + elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, null, + ZK_LOCK_NAME); Mockito.verify(mockApp, Mockito.times(2)).becomeActive(); - 
Mockito.verify(mockZK, Mockito.times(5)).exists(zkLockPathName, true, - elector, null); + verifyExistCall(5); // bad path name results in fatal error Mockito.when(mockEvent.getPath()).thenReturn(null); @@ -406,13 +463,17 @@ public class TestActiveStandbyElector { Mockito.verify(mockApp, Mockito.times(1)).notifyFatalError( "Unexpected watch error from Zookeeper"); // fatal error means no new connection other than one from constructor - Assert.assertEquals(1, TestActiveStandbyElector.count); + Assert.assertEquals(1, count); // no new watches after fatal error - Mockito.verify(mockZK, Mockito.times(5)).exists(zkLockPathName, true, - elector, null); + verifyExistCall(5); } + private void verifyExistCall(int times) { + Mockito.verify(mockZK, Mockito.times(times)).exists( + ZK_LOCK_NAME, elector, elector, null); + } + /** * verify becomeStandby is not called if already in standby */ @@ -421,14 +482,13 @@ public class TestActiveStandbyElector { elector.joinElection(data); // make the object go into the monitoring standby state - elector.processResult(Code.NODEEXISTS.intValue(), zkLockPathName, null, - zkLockPathName); + elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, null, + ZK_LOCK_NAME); Mockito.verify(mockApp, Mockito.times(1)).becomeStandby(); - Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true, - elector, null); + verifyExistCall(1); WatchedEvent mockEvent = Mockito.mock(WatchedEvent.class); - Mockito.when(mockEvent.getPath()).thenReturn(zkLockPathName); + Mockito.when(mockEvent.getPath()).thenReturn(ZK_LOCK_NAME); // notify node deletion // monitoring should be setup again after event is received @@ -437,16 +497,15 @@ public class TestActiveStandbyElector { // is standby. no need to notify anything now Mockito.verify(mockApp, Mockito.times(0)).enterNeutralMode(); // another joinElection called. 
- Mockito.verify(mockZK, Mockito.times(2)).create(zkLockPathName, data, + Mockito.verify(mockZK, Mockito.times(2)).create(ZK_LOCK_NAME, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null); // lost election - elector.processResult(Code.NODEEXISTS.intValue(), zkLockPathName, null, - zkLockPathName); + elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, null, + ZK_LOCK_NAME); // still standby. so no need to notify again Mockito.verify(mockApp, Mockito.times(1)).becomeStandby(); // monitor is set again - Mockito.verify(mockZK, Mockito.times(2)).exists(zkLockPathName, true, - elector, null); + verifyExistCall(2); } /** @@ -454,22 +513,20 @@ public class TestActiveStandbyElector { * next call to joinElection creates new connection and performs election */ @Test - public void testQuitElection() throws InterruptedException { - elector.quitElection(); + public void testQuitElection() throws Exception { + elector.quitElection(true); Mockito.verify(mockZK, Mockito.times(1)).close(); // no watches added - Mockito.verify(mockZK, Mockito.times(0)).exists(zkLockPathName, true, - elector, null); + verifyExistCall(0); byte[] data = new byte[8]; elector.joinElection(data); // getNewZooKeeper called 2 times. once in constructor and once now - Assert.assertEquals(2, TestActiveStandbyElector.count); - elector.processResult(Code.NODEEXISTS.intValue(), zkLockPathName, null, - zkLockPathName); + Assert.assertEquals(2, count); + elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, null, + ZK_LOCK_NAME); Mockito.verify(mockApp, Mockito.times(1)).becomeStandby(); - Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true, - elector, null); + verifyExistCall(1); } @@ -488,16 +545,16 @@ public class TestActiveStandbyElector { // get valid active data byte[] data = new byte[8]; Mockito.when( - mockZK.getData(Mockito.eq(zkLockPathName), Mockito.eq(false), + mockZK.getData(Mockito.eq(ZK_LOCK_NAME), Mockito.eq(false), Mockito. 
anyObject())).thenReturn(data); Assert.assertEquals(data, elector.getActiveData()); Mockito.verify(mockZK, Mockito.times(1)).getData( - Mockito.eq(zkLockPathName), Mockito.eq(false), + Mockito.eq(ZK_LOCK_NAME), Mockito.eq(false), Mockito. anyObject()); // active does not exist Mockito.when( - mockZK.getData(Mockito.eq(zkLockPathName), Mockito.eq(false), + mockZK.getData(Mockito.eq(ZK_LOCK_NAME), Mockito.eq(false), Mockito. anyObject())).thenThrow( new KeeperException.NoNodeException()); try { @@ -505,23 +562,65 @@ public class TestActiveStandbyElector { Assert.fail("ActiveNotFoundException expected"); } catch(ActiveNotFoundException e) { Mockito.verify(mockZK, Mockito.times(2)).getData( - Mockito.eq(zkLockPathName), Mockito.eq(false), + Mockito.eq(ZK_LOCK_NAME), Mockito.eq(false), Mockito. anyObject()); } // error getting active data rethrows keeperexception try { Mockito.when( - mockZK.getData(Mockito.eq(zkLockPathName), Mockito.eq(false), + mockZK.getData(Mockito.eq(ZK_LOCK_NAME), Mockito.eq(false), Mockito. anyObject())).thenThrow( new KeeperException.AuthFailedException()); elector.getActiveData(); Assert.fail("KeeperException.AuthFailedException expected"); } catch(KeeperException.AuthFailedException ke) { Mockito.verify(mockZK, Mockito.times(3)).getData( - Mockito.eq(zkLockPathName), Mockito.eq(false), + Mockito.eq(ZK_LOCK_NAME), Mockito.eq(false), Mockito. 
anyObject()); } } + /** + * Test that ensureBaseNode() recursively creates the specified dir + */ + @Test + public void testEnsureBaseNode() throws Exception { + elector.ensureParentZNode(); + StringBuilder prefix = new StringBuilder(); + for (String part : ZK_PARENT_NAME.split("/")) { + if (part.isEmpty()) continue; + prefix.append("/").append(part); + if (!"/".equals(prefix.toString())) { + Mockito.verify(mockZK).create( + Mockito.eq(prefix.toString()), Mockito.any(), + Mockito.eq(Ids.OPEN_ACL_UNSAFE), Mockito.eq(CreateMode.PERSISTENT)); + } + } + } + + /** + * Test for a bug encountered during development of HADOOP-8163: + * ensureBaseNode() should throw an exception if it has to retry + * more than 3 times to create any part of the path. + */ + @Test + public void testEnsureBaseNodeFails() throws Exception { + Mockito.doThrow(new KeeperException.ConnectionLossException()) + .when(mockZK).create( + Mockito.eq(ZK_PARENT_NAME), Mockito.any(), + Mockito.eq(Ids.OPEN_ACL_UNSAFE), Mockito.eq(CreateMode.PERSISTENT)); + try { + elector.ensureParentZNode(); + Assert.fail("Did not throw!"); + } catch (IOException ioe) { + if (!(ioe.getCause() instanceof KeeperException.ConnectionLossException)) { + throw ioe; + } + } + // Should have tried three times + Mockito.verify(mockZK, Mockito.times(3)).create( + Mockito.eq(ZK_PARENT_NAME), Mockito.any(), + Mockito.eq(Ids.OPEN_ACL_UNSAFE), Mockito.eq(CreateMode.PERSISTENT)); + } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElectorRealZK.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElectorRealZK.java index 672e8d30d1e..bc375e40602 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElectorRealZK.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElectorRealZK.java @@ -18,19 +18,24 @@ package org.apache.hadoop.ha; +import static 
org.junit.Assert.*; + import java.io.File; import java.io.IOException; -import java.util.List; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.junit.Assert; import org.junit.Test; +import org.apache.commons.logging.impl.Log4JLogger; import org.apache.hadoop.ha.ActiveStandbyElector.ActiveStandbyElectorCallback; -import org.apache.zookeeper.CreateMode; -import org.apache.zookeeper.KeeperException; +import org.apache.hadoop.test.MultithreadedTestUtil.TestContext; +import org.apache.hadoop.test.MultithreadedTestUtil.TestingThread; +import org.apache.log4j.Level; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooDefs.Ids; -import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.test.ClientBase; /** @@ -39,7 +44,17 @@ import org.apache.zookeeper.test.ClientBase; public class TestActiveStandbyElectorRealZK extends ClientBase { static final int NUM_ELECTORS = 2; static ZooKeeper[] zkClient = new ZooKeeper[NUM_ELECTORS]; - static int currentClientIndex = 0; + + static { + ((Log4JLogger)ActiveStandbyElector.LOG).getLogger().setLevel( + Level.ALL); + } + + int activeIndex = -1; + int standbyIndex = -1; + static final String PARENT_DIR = "/" + UUID.randomUUID(); + + ActiveStandbyElector[] electors = new ActiveStandbyElector[NUM_ELECTORS]; @Override public void setUp() throws Exception { @@ -48,20 +63,6 @@ public class TestActiveStandbyElectorRealZK extends ClientBase { super.setUp(); } - class ActiveStandbyElectorTesterRealZK extends ActiveStandbyElector { - ActiveStandbyElectorTesterRealZK(String hostPort, int timeout, - String parent, List acl, ActiveStandbyElectorCallback app) - throws IOException { - super(hostPort, timeout, parent, acl, app); - } - - @Override - public ZooKeeper getNewZooKeeper() { - return TestActiveStandbyElectorRealZK.zkClient[ - TestActiveStandbyElectorRealZK.currentClientIndex]; - } - } - /** * The class object runs on a thread and waits for a signal 
to start from the * test object. On getting the signal it joins the election and thus by doing @@ -71,71 +72,48 @@ public class TestActiveStandbyElectorRealZK extends ClientBase { * an unexpected fatal error. this lets another thread object to become a * leader. */ - class ThreadRunner implements Runnable, ActiveStandbyElectorCallback { + class ThreadRunner extends TestingThread + implements ActiveStandbyElectorCallback { int index; - TestActiveStandbyElectorRealZK test; - boolean wait = true; + + CountDownLatch hasBecomeActive = new CountDownLatch(1); - ThreadRunner(int i, TestActiveStandbyElectorRealZK s) { - index = i; - test = s; + ThreadRunner(TestContext ctx, + int idx) { + super(ctx); + index = idx; } @Override - public void run() { + public void doWork() throws Exception { LOG.info("starting " + index); - while(true) { - synchronized (test) { - // wait for test start signal to come - if (!test.start) { - try { - test.wait(); - } catch(InterruptedException e) { - Assert.fail(e.getMessage()); - } - } else { - break; - } - } - } // join election - byte[] data = new byte[8]; - ActiveStandbyElector elector = test.elector[index]; + byte[] data = new byte[1]; + data[0] = (byte)index; + + ActiveStandbyElector elector = electors[index]; LOG.info("joining " + index); elector.joinElection(data); - try { - while(true) { - synchronized (this) { - // wait for elector to become active/fatal error - if (wait) { - // wait to become active - // wait capped at 30s to prevent hung test - wait(30000); - } else { - break; - } - } - } - Thread.sleep(1000); - // quit election to allow other elector to become active - elector.quitElection(); - } catch(InterruptedException e) { - Assert.fail(e.getMessage()); - } + + hasBecomeActive.await(30, TimeUnit.SECONDS); + Thread.sleep(1000); + + // quit election to allow other elector to become active + elector.quitElection(true); + LOG.info("ending " + index); } @Override public synchronized void becomeActive() { - test.reportActive(index); 
+ reportActive(index); LOG.info("active " + index); - wait = false; - notifyAll(); + hasBecomeActive.countDown(); } @Override public synchronized void becomeStandby() { - test.reportStandby(index); + reportStandby(index); LOG.info("standby " + index); } @@ -147,20 +125,17 @@ public class TestActiveStandbyElectorRealZK extends ClientBase { @Override public synchronized void notifyFatalError(String errorMessage) { LOG.info("fatal " + index + " .Error message:" + errorMessage); - wait = false; - notifyAll(); + this.interrupt(); + } + + @Override + public void fenceOldActive(byte[] data) { + LOG.info("fenceOldActive " + index); + // should not fence itself + Assert.assertTrue(index != data[0]); } } - boolean start = false; - int activeIndex = -1; - int standbyIndex = -1; - String parentDir = "/" + java.util.UUID.randomUUID().toString(); - - ActiveStandbyElector[] elector = new ActiveStandbyElector[NUM_ELECTORS]; - ThreadRunner[] threadRunner = new ThreadRunner[NUM_ELECTORS]; - Thread[] thread = new Thread[NUM_ELECTORS]; - synchronized void reportActive(int index) { if (activeIndex == -1) { activeIndex = index; @@ -187,45 +162,37 @@ public class TestActiveStandbyElectorRealZK extends ClientBase { * the standby now becomes active. 
these electors run on different threads and * callback to the test class to report active and standby where the outcome * is verified - * - * @throws IOException - * @throws InterruptedException - * @throws KeeperException + * @throws Exception */ @Test - public void testActiveStandbyTransition() throws IOException, - InterruptedException, KeeperException { - LOG.info("starting test with parentDir:" + parentDir); - start = false; - byte[] data = new byte[8]; - // create random working directory - createClient().create(parentDir, data, Ids.OPEN_ACL_UNSAFE, - CreateMode.PERSISTENT); + public void testActiveStandbyTransition() throws Exception { + LOG.info("starting test with parentDir:" + PARENT_DIR); - for(currentClientIndex = 0; - currentClientIndex < NUM_ELECTORS; - ++currentClientIndex) { - LOG.info("creating " + currentClientIndex); - zkClient[currentClientIndex] = createClient(); - threadRunner[currentClientIndex] = new ThreadRunner(currentClientIndex, - this); - elector[currentClientIndex] = new ActiveStandbyElectorTesterRealZK( - "hostPort", 1000, parentDir, Ids.OPEN_ACL_UNSAFE, - threadRunner[currentClientIndex]); - zkClient[currentClientIndex].register(elector[currentClientIndex]); - thread[currentClientIndex] = new Thread(threadRunner[currentClientIndex]); - thread[currentClientIndex].start(); + TestContext ctx = new TestContext(); + + for(int i = 0; i < NUM_ELECTORS; i++) { + LOG.info("creating " + i); + final ZooKeeper zk = createClient(); + assert zk != null; + + ThreadRunner tr = new ThreadRunner(ctx, i); + electors[i] = new ActiveStandbyElector( + "hostPort", 1000, PARENT_DIR, Ids.OPEN_ACL_UNSAFE, + tr) { + @Override + protected synchronized ZooKeeper getNewZooKeeper() + throws IOException { + return zk; + } + }; + ctx.addThread(tr); } - synchronized (this) { - // signal threads to start - LOG.info("signaling threads"); - start = true; - notifyAll(); - } + assertFalse(electors[0].parentZNodeExists()); + electors[0].ensureParentZNode(); + 
assertTrue(electors[0].parentZNodeExists()); - for(int i = 0; i < thread.length; i++) { - thread[i].join(); - } + ctx.startThreads(); + ctx.stop(); } }