HADOOP-8212. Improve ActiveStandbyElector's behavior when session expires. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1305510 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2012-03-26 19:31:43 +00:00
parent 1a9385d516
commit 31c2e3f29c
6 changed files with 346 additions and 208 deletions

View File

@ -223,6 +223,9 @@ Release 0.23.3 - UNRELEASED
HADOOP-8193. Refactor FailoverController/HAAdmin code to add an abstract HADOOP-8193. Refactor FailoverController/HAAdmin code to add an abstract
class for "target" services. (todd) class for "target" services. (todd)
HADOOP-8212. Improve ActiveStandbyElector's behavior when session expires
(todd)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -21,6 +21,8 @@ package org.apache.hadoop.ha;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -32,6 +34,7 @@ import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher.Event;
import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.AsyncCallback.*; import org.apache.zookeeper.AsyncCallback.*;
@ -60,8 +63,7 @@ import com.google.common.base.Preconditions;
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Evolving @InterfaceStability.Evolving
public class ActiveStandbyElector implements Watcher, StringCallback, public class ActiveStandbyElector implements StatCallback, StringCallback {
StatCallback {
/** /**
* Callback interface to interact with the ActiveStandbyElector object. <br/> * Callback interface to interact with the ActiveStandbyElector object. <br/>
@ -156,6 +158,9 @@ public class ActiveStandbyElector implements Watcher, StringCallback,
private final String zkBreadCrumbPath; private final String zkBreadCrumbPath;
private final String znodeWorkingDir; private final String znodeWorkingDir;
private Lock sessionReestablishLockForTests = new ReentrantLock();
private boolean wantToBeInElection;
/** /**
* Create a new ActiveStandbyElector object <br/> * Create a new ActiveStandbyElector object <br/>
* The elector is created by providing to it the Zookeeper configuration, the * The elector is created by providing to it the Zookeeper configuration, the
@ -274,6 +279,8 @@ public class ActiveStandbyElector implements Watcher, StringCallback,
} }
} }
} }
LOG.info("Successfully created " + znodeWorkingDir + " in ZK.");
} }
/** /**
@ -290,13 +297,14 @@ public class ActiveStandbyElector implements Watcher, StringCallback,
* if a failover occurs due to dropping out of the election. * if a failover occurs due to dropping out of the election.
*/ */
public synchronized void quitElection(boolean needFence) { public synchronized void quitElection(boolean needFence) {
LOG.debug("Yielding from election"); LOG.info("Yielding from election");
if (!needFence && state == State.ACTIVE) { if (!needFence && state == State.ACTIVE) {
// If active is gracefully going back to standby mode, remove // If active is gracefully going back to standby mode, remove
// our permanent znode so no one fences us. // our permanent znode so no one fences us.
tryDeleteOwnBreadCrumbNode(); tryDeleteOwnBreadCrumbNode();
} }
reset(); reset();
wantToBeInElection = false;
} }
/** /**
@ -343,13 +351,9 @@ public class ActiveStandbyElector implements Watcher, StringCallback,
@Override @Override
public synchronized void processResult(int rc, String path, Object ctx, public synchronized void processResult(int rc, String path, Object ctx,
String name) { String name) {
if (isStaleClient(ctx)) return;
LOG.debug("CreateNode result: " + rc + " for path: " + path LOG.debug("CreateNode result: " + rc + " for path: " + path
+ " connectionState: " + zkConnectionState); + " connectionState: " + zkConnectionState);
if (zkClient == null) {
// zkClient is nulled before closing the connection
// this is the callback with session expired after we closed the session
return;
}
Code code = Code.get(rc); Code code = Code.get(rc);
if (isSuccess(code)) { if (isSuccess(code)) {
@ -386,6 +390,10 @@ public class ActiveStandbyElector implements Watcher, StringCallback,
} }
errorMessage = errorMessage errorMessage = errorMessage
+ ". Not retrying further znode create connection errors."; + ". Not retrying further znode create connection errors.";
} else if (isSessionExpired(code)) {
// This isn't fatal - the client Watcher will re-join the election
LOG.warn("Lock acquisition failed because session was lost");
return;
} }
fatalError(errorMessage); fatalError(errorMessage);
@ -397,13 +405,9 @@ public class ActiveStandbyElector implements Watcher, StringCallback,
@Override @Override
public synchronized void processResult(int rc, String path, Object ctx, public synchronized void processResult(int rc, String path, Object ctx,
Stat stat) { Stat stat) {
if (isStaleClient(ctx)) return;
LOG.debug("StatNode result: " + rc + " for path: " + path LOG.debug("StatNode result: " + rc + " for path: " + path
+ " connectionState: " + zkConnectionState); + " connectionState: " + zkConnectionState);
if (zkClient == null) {
// zkClient is nulled before closing the connection
// this is the callback with session expired after we closed the session
return;
}
Code code = Code.get(rc); Code code = Code.get(rc);
if (isSuccess(code)) { if (isSuccess(code)) {
@ -447,22 +451,18 @@ public class ActiveStandbyElector implements Watcher, StringCallback,
/** /**
* interface implementation of Zookeeper watch events (connection and node) * interface implementation of Zookeeper watch events (connection and node)
*/ */
@Override synchronized void processWatchEvent(ZooKeeper zk, WatchedEvent event) {
public synchronized void process(WatchedEvent event) {
Event.EventType eventType = event.getType(); Event.EventType eventType = event.getType();
if (isStaleClient(zk)) return;
LOG.debug("Watcher event type: " + eventType + " with state:" LOG.debug("Watcher event type: " + eventType + " with state:"
+ event.getState() + " for path:" + event.getPath() + event.getState() + " for path:" + event.getPath()
+ " connectionState: " + zkConnectionState); + " connectionState: " + zkConnectionState);
if (zkClient == null) {
// zkClient is nulled before closing the connection
// this is the callback with session expired after we closed the session
return;
}
if (eventType == Event.EventType.None) { if (eventType == Event.EventType.None) {
// the connection state has changed // the connection state has changed
switch (event.getState()) { switch (event.getState()) {
case SyncConnected: case SyncConnected:
LOG.info("Session connected.");
// if the listener was asked to move to safe state then it needs to // if the listener was asked to move to safe state then it needs to
// be undone // be undone
ConnectionState prevConnectionState = zkConnectionState; ConnectionState prevConnectionState = zkConnectionState;
@ -472,6 +472,8 @@ public class ActiveStandbyElector implements Watcher, StringCallback,
} }
break; break;
case Disconnected: case Disconnected:
LOG.info("Session disconnected. Entering neutral mode...");
// ask the app to move to safe state because zookeeper connection // ask the app to move to safe state because zookeeper connection
// is not active and we dont know our state // is not active and we dont know our state
zkConnectionState = ConnectionState.DISCONNECTED; zkConnectionState = ConnectionState.DISCONNECTED;
@ -480,6 +482,7 @@ public class ActiveStandbyElector implements Watcher, StringCallback,
case Expired: case Expired:
// the connection got terminated because of session timeout // the connection got terminated because of session timeout
// call listener to reconnect // call listener to reconnect
LOG.info("Session expired. Entering neutral mode and rejoining...");
enterNeutralMode(); enterNeutralMode();
reJoinElection(); reJoinElection();
break; break;
@ -527,7 +530,9 @@ public class ActiveStandbyElector implements Watcher, StringCallback,
* @throws IOException * @throws IOException
*/ */
protected synchronized ZooKeeper getNewZooKeeper() throws IOException { protected synchronized ZooKeeper getNewZooKeeper() throws IOException {
return new ZooKeeper(zkHostPort, zkSessionTimeout, this); ZooKeeper zk = new ZooKeeper(zkHostPort, zkSessionTimeout, null);
zk.register(new WatcherWithClientRef(zk));
return zk;
} }
private void fatalError(String errorMessage) { private void fatalError(String errorMessage) {
@ -550,13 +555,42 @@ public class ActiveStandbyElector implements Watcher, StringCallback,
} }
createRetryCount = 0; createRetryCount = 0;
wantToBeInElection = true;
createLockNodeAsync(); createLockNodeAsync();
} }
private void reJoinElection() { private void reJoinElection() {
LOG.debug("Trying to re-establish ZK session"); LOG.info("Trying to re-establish ZK session");
terminateConnection();
joinElectionInternal(); // Some of the test cases rely on expiring the ZK sessions and
// ensuring that the other node takes over. But, there's a race
// where the original lease holder could reconnect faster than the other
// thread manages to take the lock itself. This lock allows the
// tests to block the reconnection. It's a shame that this leaked
// into non-test code, but the lock is only acquired here so will never
// be contended.
sessionReestablishLockForTests.lock();
try {
terminateConnection();
joinElectionInternal();
} finally {
sessionReestablishLockForTests.unlock();
}
}
@VisibleForTesting
void preventSessionReestablishmentForTests() {
sessionReestablishLockForTests.lock();
}
@VisibleForTesting
void allowSessionReestablishmentForTests() {
sessionReestablishLockForTests.unlock();
}
@VisibleForTesting
long getZKSessionIdForTests() {
return zkClient.getSessionId();
} }
private boolean reEstablishSession() { private boolean reEstablishSession() {
@ -605,6 +639,7 @@ public class ActiveStandbyElector implements Watcher, StringCallback,
} }
private void becomeActive() { private void becomeActive() {
assert wantToBeInElection;
if (state != State.ACTIVE) { if (state != State.ACTIVE) {
try { try {
Stat oldBreadcrumbStat = fenceOldActive(); Stat oldBreadcrumbStat = fenceOldActive();
@ -727,12 +762,14 @@ public class ActiveStandbyElector implements Watcher, StringCallback,
} }
private void createLockNodeAsync() { private void createLockNodeAsync() {
zkClient.create(zkLockFilePath, appData, zkAcl, CreateMode.EPHEMERAL, this, zkClient.create(zkLockFilePath, appData, zkAcl, CreateMode.EPHEMERAL,
null); this, zkClient);
} }
private void monitorLockNodeAsync() { private void monitorLockNodeAsync() {
zkClient.exists(zkLockFilePath, this, this, null); zkClient.exists(zkLockFilePath,
new WatcherWithClientRef(zkClient), this,
zkClient);
} }
private String createWithRetries(final String path, final byte[] data, private String createWithRetries(final String path, final byte[] data,
@ -778,10 +815,47 @@ public class ActiveStandbyElector implements Watcher, StringCallback,
} }
} }
} }
private interface ZKAction<T> { private interface ZKAction<T> {
T run() throws KeeperException, InterruptedException; T run() throws KeeperException, InterruptedException;
} }
/**
* The callbacks and watchers pass a reference to the ZK client
* which made the original call. We don't want to take action
* based on any callbacks from prior clients after we quit
* the election.
* @param ctx the ZK client passed into the watcher
* @return true if it matches the current client
*/
private synchronized boolean isStaleClient(Object ctx) {
Preconditions.checkNotNull(ctx);
if (zkClient != (ZooKeeper)ctx) {
LOG.warn("Ignoring stale result from old client with sessionId " +
String.format("0x%08x", ((ZooKeeper)ctx).getSessionId()));
return true;
}
return false;
}
/**
* Watcher implementation which keeps a reference around to the
* original ZK connection, and passes it back along with any
* events.
*/
private final class WatcherWithClientRef implements Watcher {
private final ZooKeeper zk;
private WatcherWithClientRef(ZooKeeper zk) {
this.zk = zk;
}
@Override
public void process(WatchedEvent event) {
ActiveStandbyElector.this.processWatchEvent(
zk, event);
}
}
private static boolean isSuccess(Code code) { private static boolean isSuccess(Code code) {
return (code == Code.OK); return (code == Code.OK);
@ -794,6 +868,10 @@ public class ActiveStandbyElector implements Watcher, StringCallback,
private static boolean isNodeDoesNotExist(Code code) { private static boolean isNodeDoesNotExist(Code code) {
return (code == Code.NONODE); return (code == Code.NONODE);
} }
private static boolean isSessionExpired(Code code) {
return (code == Code.SESSIONEXPIRED);
}
private static boolean shouldRetry(Code code) { private static boolean shouldRetry(Code code) {
switch (code) { switch (code) {

View File

@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ha;
import java.util.Arrays;
import org.apache.hadoop.test.MultithreadedTestUtil.TestContext;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.ZooKeeperServer;
public abstract class ActiveStandbyElectorTestUtil {
public static void waitForActiveLockData(TestContext ctx,
ZooKeeperServer zks, String parentDir, byte[] activeData)
throws Exception {
while (true) {
if (ctx != null) {
ctx.checkException();
}
try {
Stat stat = new Stat();
byte[] data = zks.getZKDatabase().getData(
parentDir + "/" +
ActiveStandbyElector.LOCK_FILENAME, stat, null);
if (activeData != null &&
Arrays.equals(activeData, data)) {
return;
}
} catch (NoNodeException nne) {
if (activeData == null) {
return;
}
}
Thread.sleep(50);
}
}
}

View File

@ -26,6 +26,7 @@ import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code; import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event; import org.apache.zookeeper.Watcher.Event;
import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.ACL;
@ -111,7 +112,7 @@ public class TestActiveStandbyElector {
public void testJoinElection() { public void testJoinElection() {
elector.joinElection(data); elector.joinElection(data);
Mockito.verify(mockZK, Mockito.times(1)).create(ZK_LOCK_NAME, data, Mockito.verify(mockZK, Mockito.times(1)).create(ZK_LOCK_NAME, data,
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null); Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, mockZK);
} }
/** /**
@ -123,7 +124,7 @@ public class TestActiveStandbyElector {
mockNoPriorActive(); mockNoPriorActive();
elector.joinElection(data); elector.joinElection(data);
elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, null, elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, mockZK,
ZK_LOCK_NAME); ZK_LOCK_NAME);
Mockito.verify(mockApp, Mockito.times(1)).becomeActive(); Mockito.verify(mockApp, Mockito.times(1)).becomeActive();
verifyExistCall(1); verifyExistCall(1);
@ -133,14 +134,14 @@ public class TestActiveStandbyElector {
Stat stat = new Stat(); Stat stat = new Stat();
stat.setEphemeralOwner(1L); stat.setEphemeralOwner(1L);
Mockito.when(mockZK.getSessionId()).thenReturn(1L); Mockito.when(mockZK.getSessionId()).thenReturn(1L);
elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, null, stat); elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, mockZK, stat);
// should not call neutral mode/standby/active // should not call neutral mode/standby/active
Mockito.verify(mockApp, Mockito.times(0)).enterNeutralMode(); Mockito.verify(mockApp, Mockito.times(0)).enterNeutralMode();
Mockito.verify(mockApp, Mockito.times(0)).becomeStandby(); Mockito.verify(mockApp, Mockito.times(0)).becomeStandby();
Mockito.verify(mockApp, Mockito.times(1)).becomeActive(); Mockito.verify(mockApp, Mockito.times(1)).becomeActive();
// another joinElection not called. // another joinElection not called.
Mockito.verify(mockZK, Mockito.times(1)).create(ZK_LOCK_NAME, data, Mockito.verify(mockZK, Mockito.times(1)).create(ZK_LOCK_NAME, data,
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null); Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, mockZK);
// no new monitor called // no new monitor called
verifyExistCall(1); verifyExistCall(1);
} }
@ -155,7 +156,7 @@ public class TestActiveStandbyElector {
mockPriorActive(fakeOldActiveData); mockPriorActive(fakeOldActiveData);
elector.joinElection(data); elector.joinElection(data);
elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, null, elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, mockZK,
ZK_LOCK_NAME); ZK_LOCK_NAME);
// Application fences active. // Application fences active.
Mockito.verify(mockApp, Mockito.times(1)).fenceOldActive( Mockito.verify(mockApp, Mockito.times(1)).fenceOldActive(
@ -173,7 +174,7 @@ public class TestActiveStandbyElector {
public void testQuitElectionRemovesBreadcrumbNode() throws Exception { public void testQuitElectionRemovesBreadcrumbNode() throws Exception {
mockNoPriorActive(); mockNoPriorActive();
elector.joinElection(data); elector.joinElection(data);
elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, null, elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, mockZK,
ZK_LOCK_NAME); ZK_LOCK_NAME);
// Writes its own active info // Writes its own active info
Mockito.verify(mockZK, Mockito.times(1)).create( Mockito.verify(mockZK, Mockito.times(1)).create(
@ -197,7 +198,7 @@ public class TestActiveStandbyElector {
public void testCreateNodeResultBecomeStandby() { public void testCreateNodeResultBecomeStandby() {
elector.joinElection(data); elector.joinElection(data);
elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, null, elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, mockZK,
ZK_LOCK_NAME); ZK_LOCK_NAME);
Mockito.verify(mockApp, Mockito.times(1)).becomeStandby(); Mockito.verify(mockApp, Mockito.times(1)).becomeStandby();
verifyExistCall(1); verifyExistCall(1);
@ -210,7 +211,7 @@ public class TestActiveStandbyElector {
public void testCreateNodeResultError() { public void testCreateNodeResultError() {
elector.joinElection(data); elector.joinElection(data);
elector.processResult(Code.APIERROR.intValue(), ZK_LOCK_NAME, null, elector.processResult(Code.APIERROR.intValue(), ZK_LOCK_NAME, mockZK,
ZK_LOCK_NAME); ZK_LOCK_NAME);
Mockito.verify(mockApp, Mockito.times(1)).notifyFatalError( Mockito.verify(mockApp, Mockito.times(1)).notifyFatalError(
"Received create error from Zookeeper. code:APIERROR " + "Received create error from Zookeeper. code:APIERROR " +
@ -227,13 +228,13 @@ public class TestActiveStandbyElector {
elector.joinElection(data); elector.joinElection(data);
elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null, elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, mockZK,
ZK_LOCK_NAME); ZK_LOCK_NAME);
elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null, elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, mockZK,
ZK_LOCK_NAME); ZK_LOCK_NAME);
elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null, elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, mockZK,
ZK_LOCK_NAME); ZK_LOCK_NAME);
elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null, elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, mockZK,
ZK_LOCK_NAME); ZK_LOCK_NAME);
// 4 errors results in fatalError // 4 errors results in fatalError
Mockito Mockito
@ -246,20 +247,20 @@ public class TestActiveStandbyElector {
elector.joinElection(data); elector.joinElection(data);
// recreate connection via getNewZooKeeper // recreate connection via getNewZooKeeper
Assert.assertEquals(2, count); Assert.assertEquals(2, count);
elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null, elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, mockZK,
ZK_LOCK_NAME); ZK_LOCK_NAME);
elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, null, elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, mockZK,
ZK_LOCK_NAME); ZK_LOCK_NAME);
verifyExistCall(1); verifyExistCall(1);
Stat stat = new Stat(); Stat stat = new Stat();
stat.setEphemeralOwner(1L); stat.setEphemeralOwner(1L);
Mockito.when(mockZK.getSessionId()).thenReturn(1L); Mockito.when(mockZK.getSessionId()).thenReturn(1L);
elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, null, stat); elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, mockZK, stat);
Mockito.verify(mockApp, Mockito.times(1)).becomeActive(); Mockito.verify(mockApp, Mockito.times(1)).becomeActive();
verifyExistCall(1); verifyExistCall(1);
Mockito.verify(mockZK, Mockito.times(6)).create(ZK_LOCK_NAME, data, Mockito.verify(mockZK, Mockito.times(6)).create(ZK_LOCK_NAME, data,
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null); Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, mockZK);
} }
/** /**
@ -270,16 +271,16 @@ public class TestActiveStandbyElector {
public void testCreateNodeResultRetryBecomeStandby() { public void testCreateNodeResultRetryBecomeStandby() {
elector.joinElection(data); elector.joinElection(data);
elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null, elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, mockZK,
ZK_LOCK_NAME); ZK_LOCK_NAME);
elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, null, elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, mockZK,
ZK_LOCK_NAME); ZK_LOCK_NAME);
verifyExistCall(1); verifyExistCall(1);
Stat stat = new Stat(); Stat stat = new Stat();
stat.setEphemeralOwner(0); stat.setEphemeralOwner(0);
Mockito.when(mockZK.getSessionId()).thenReturn(1L); Mockito.when(mockZK.getSessionId()).thenReturn(1L);
elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, null, stat); elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, mockZK, stat);
Mockito.verify(mockApp, Mockito.times(1)).becomeStandby(); Mockito.verify(mockApp, Mockito.times(1)).becomeStandby();
verifyExistCall(1); verifyExistCall(1);
} }
@ -293,19 +294,19 @@ public class TestActiveStandbyElector {
public void testCreateNodeResultRetryNoNode() { public void testCreateNodeResultRetryNoNode() {
elector.joinElection(data); elector.joinElection(data);
elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null, elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, mockZK,
ZK_LOCK_NAME); ZK_LOCK_NAME);
elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null, elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, mockZK,
ZK_LOCK_NAME); ZK_LOCK_NAME);
elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, null, elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, mockZK,
ZK_LOCK_NAME); ZK_LOCK_NAME);
verifyExistCall(1); verifyExistCall(1);
elector.processResult(Code.NONODE.intValue(), ZK_LOCK_NAME, null, elector.processResult(Code.NONODE.intValue(), ZK_LOCK_NAME, mockZK,
(Stat) null); (Stat) null);
Mockito.verify(mockApp, Mockito.times(1)).enterNeutralMode(); Mockito.verify(mockApp, Mockito.times(1)).enterNeutralMode();
Mockito.verify(mockZK, Mockito.times(4)).create(ZK_LOCK_NAME, data, Mockito.verify(mockZK, Mockito.times(4)).create(ZK_LOCK_NAME, data,
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null); Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, mockZK);
} }
/** /**
@ -313,13 +314,13 @@ public class TestActiveStandbyElector {
*/ */
@Test @Test
public void testStatNodeRetry() { public void testStatNodeRetry() {
elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null, elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, mockZK,
(Stat) null); (Stat) null);
elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null, elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, mockZK,
(Stat) null); (Stat) null);
elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null, elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, mockZK,
(Stat) null); (Stat) null);
elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null, elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, mockZK,
(Stat) null); (Stat) null);
Mockito Mockito
.verify(mockApp, Mockito.times(1)) .verify(mockApp, Mockito.times(1))
@ -334,7 +335,7 @@ public class TestActiveStandbyElector {
@Test @Test
public void testStatNodeError() { public void testStatNodeError() {
elector.processResult(Code.RUNTIMEINCONSISTENCY.intValue(), ZK_LOCK_NAME, elector.processResult(Code.RUNTIMEINCONSISTENCY.intValue(), ZK_LOCK_NAME,
null, (Stat) null); mockZK, (Stat) null);
Mockito.verify(mockApp, Mockito.times(0)).enterNeutralMode(); Mockito.verify(mockApp, Mockito.times(0)).enterNeutralMode();
Mockito.verify(mockApp, Mockito.times(1)).notifyFatalError( Mockito.verify(mockApp, Mockito.times(1)).notifyFatalError(
"Received stat error from Zookeeper. code:RUNTIMEINCONSISTENCY"); "Received stat error from Zookeeper. code:RUNTIMEINCONSISTENCY");
@ -354,7 +355,7 @@ public class TestActiveStandbyElector {
// first SyncConnected should not do anything // first SyncConnected should not do anything
Mockito.when(mockEvent.getState()).thenReturn( Mockito.when(mockEvent.getState()).thenReturn(
Event.KeeperState.SyncConnected); Event.KeeperState.SyncConnected);
elector.process(mockEvent); elector.processWatchEvent(mockZK, mockEvent);
Mockito.verify(mockZK, Mockito.times(0)).exists(Mockito.anyString(), Mockito.verify(mockZK, Mockito.times(0)).exists(Mockito.anyString(),
Mockito.anyBoolean(), Mockito.<AsyncCallback.StatCallback> anyObject(), Mockito.anyBoolean(), Mockito.<AsyncCallback.StatCallback> anyObject(),
Mockito.<Object> anyObject()); Mockito.<Object> anyObject());
@ -362,20 +363,20 @@ public class TestActiveStandbyElector {
// disconnection should enter safe mode // disconnection should enter safe mode
Mockito.when(mockEvent.getState()).thenReturn( Mockito.when(mockEvent.getState()).thenReturn(
Event.KeeperState.Disconnected); Event.KeeperState.Disconnected);
elector.process(mockEvent); elector.processWatchEvent(mockZK, mockEvent);
Mockito.verify(mockApp, Mockito.times(1)).enterNeutralMode(); Mockito.verify(mockApp, Mockito.times(1)).enterNeutralMode();
// re-connection should monitor master status // re-connection should monitor master status
Mockito.when(mockEvent.getState()).thenReturn( Mockito.when(mockEvent.getState()).thenReturn(
Event.KeeperState.SyncConnected); Event.KeeperState.SyncConnected);
elector.process(mockEvent); elector.processWatchEvent(mockZK, mockEvent);
verifyExistCall(1); verifyExistCall(1);
// session expired should enter safe mode and initiate re-election // session expired should enter safe mode and initiate re-election
// re-election checked via checking re-creation of new zookeeper and // re-election checked via checking re-creation of new zookeeper and
// call to create lock znode // call to create lock znode
Mockito.when(mockEvent.getState()).thenReturn(Event.KeeperState.Expired); Mockito.when(mockEvent.getState()).thenReturn(Event.KeeperState.Expired);
elector.process(mockEvent); elector.processWatchEvent(mockZK, mockEvent);
// already in safe mode above. should not enter safe mode again // already in safe mode above. should not enter safe mode again
Mockito.verify(mockApp, Mockito.times(1)).enterNeutralMode(); Mockito.verify(mockApp, Mockito.times(1)).enterNeutralMode();
// called getNewZooKeeper to create new session. first call was in // called getNewZooKeeper to create new session. first call was in
@ -383,17 +384,17 @@ public class TestActiveStandbyElector {
Assert.assertEquals(2, count); Assert.assertEquals(2, count);
// once in initial joinElection and one now // once in initial joinElection and one now
Mockito.verify(mockZK, Mockito.times(2)).create(ZK_LOCK_NAME, data, Mockito.verify(mockZK, Mockito.times(2)).create(ZK_LOCK_NAME, data,
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null); Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, mockZK);
// create znode success. become master and monitor // create znode success. become master and monitor
elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, null, elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, mockZK,
ZK_LOCK_NAME); ZK_LOCK_NAME);
Mockito.verify(mockApp, Mockito.times(1)).becomeActive(); Mockito.verify(mockApp, Mockito.times(1)).becomeActive();
verifyExistCall(2); verifyExistCall(2);
// error event results in fatal error // error event results in fatal error
Mockito.when(mockEvent.getState()).thenReturn(Event.KeeperState.AuthFailed); Mockito.when(mockEvent.getState()).thenReturn(Event.KeeperState.AuthFailed);
elector.process(mockEvent); elector.processWatchEvent(mockZK, mockEvent);
Mockito.verify(mockApp, Mockito.times(1)).notifyFatalError( Mockito.verify(mockApp, Mockito.times(1)).notifyFatalError(
"Unexpected Zookeeper watch event state: AuthFailed"); "Unexpected Zookeeper watch event state: AuthFailed");
// only 1 state change callback is called at a time // only 1 state change callback is called at a time
@ -409,7 +410,7 @@ public class TestActiveStandbyElector {
elector.joinElection(data); elector.joinElection(data);
// make the object go into the monitoring state // make the object go into the monitoring state
elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, null, elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, mockZK,
ZK_LOCK_NAME); ZK_LOCK_NAME);
Mockito.verify(mockApp, Mockito.times(1)).becomeStandby(); Mockito.verify(mockApp, Mockito.times(1)).becomeStandby();
verifyExistCall(1); verifyExistCall(1);
@ -420,25 +421,25 @@ public class TestActiveStandbyElector {
// monitoring should be setup again after event is received // monitoring should be setup again after event is received
Mockito.when(mockEvent.getType()).thenReturn( Mockito.when(mockEvent.getType()).thenReturn(
Event.EventType.NodeDataChanged); Event.EventType.NodeDataChanged);
elector.process(mockEvent); elector.processWatchEvent(mockZK, mockEvent);
verifyExistCall(2); verifyExistCall(2);
// monitoring should be setup again after event is received // monitoring should be setup again after event is received
Mockito.when(mockEvent.getType()).thenReturn( Mockito.when(mockEvent.getType()).thenReturn(
Event.EventType.NodeChildrenChanged); Event.EventType.NodeChildrenChanged);
elector.process(mockEvent); elector.processWatchEvent(mockZK, mockEvent);
verifyExistCall(3); verifyExistCall(3);
// lock node deletion when in standby mode should create znode again // lock node deletion when in standby mode should create znode again
// successful znode creation enters active state and sets monitor // successful znode creation enters active state and sets monitor
Mockito.when(mockEvent.getType()).thenReturn(Event.EventType.NodeDeleted); Mockito.when(mockEvent.getType()).thenReturn(Event.EventType.NodeDeleted);
elector.process(mockEvent); elector.processWatchEvent(mockZK, mockEvent);
// enterNeutralMode not called when app is standby and leader is lost // enterNeutralMode not called when app is standby and leader is lost
Mockito.verify(mockApp, Mockito.times(0)).enterNeutralMode(); Mockito.verify(mockApp, Mockito.times(0)).enterNeutralMode();
// once in initial joinElection() and one now // once in initial joinElection() and one now
Mockito.verify(mockZK, Mockito.times(2)).create(ZK_LOCK_NAME, data, Mockito.verify(mockZK, Mockito.times(2)).create(ZK_LOCK_NAME, data,
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null); Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, mockZK);
elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, null, elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, mockZK,
ZK_LOCK_NAME); ZK_LOCK_NAME);
Mockito.verify(mockApp, Mockito.times(1)).becomeActive(); Mockito.verify(mockApp, Mockito.times(1)).becomeActive();
verifyExistCall(4); verifyExistCall(4);
@ -447,19 +448,19 @@ public class TestActiveStandbyElector {
// znode again successful znode creation enters active state and sets // znode again successful znode creation enters active state and sets
// monitor // monitor
Mockito.when(mockEvent.getType()).thenReturn(Event.EventType.NodeDeleted); Mockito.when(mockEvent.getType()).thenReturn(Event.EventType.NodeDeleted);
elector.process(mockEvent); elector.processWatchEvent(mockZK, mockEvent);
Mockito.verify(mockApp, Mockito.times(1)).enterNeutralMode(); Mockito.verify(mockApp, Mockito.times(1)).enterNeutralMode();
// another joinElection called // another joinElection called
Mockito.verify(mockZK, Mockito.times(3)).create(ZK_LOCK_NAME, data, Mockito.verify(mockZK, Mockito.times(3)).create(ZK_LOCK_NAME, data,
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null); Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, mockZK);
elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, null, elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, mockZK,
ZK_LOCK_NAME); ZK_LOCK_NAME);
Mockito.verify(mockApp, Mockito.times(2)).becomeActive(); Mockito.verify(mockApp, Mockito.times(2)).becomeActive();
verifyExistCall(5); verifyExistCall(5);
// bad path name results in fatal error // bad path name results in fatal error
Mockito.when(mockEvent.getPath()).thenReturn(null); Mockito.when(mockEvent.getPath()).thenReturn(null);
elector.process(mockEvent); elector.processWatchEvent(mockZK, mockEvent);
Mockito.verify(mockApp, Mockito.times(1)).notifyFatalError( Mockito.verify(mockApp, Mockito.times(1)).notifyFatalError(
"Unexpected watch error from Zookeeper"); "Unexpected watch error from Zookeeper");
// fatal error means no new connection other than one from constructor // fatal error means no new connection other than one from constructor
@ -471,7 +472,9 @@ public class TestActiveStandbyElector {
private void verifyExistCall(int times) { private void verifyExistCall(int times) {
Mockito.verify(mockZK, Mockito.times(times)).exists( Mockito.verify(mockZK, Mockito.times(times)).exists(
ZK_LOCK_NAME, elector, elector, null); Mockito.eq(ZK_LOCK_NAME), Mockito.<Watcher>any(),
Mockito.same(elector),
Mockito.same(mockZK));
} }
/** /**
@ -482,7 +485,7 @@ public class TestActiveStandbyElector {
elector.joinElection(data); elector.joinElection(data);
// make the object go into the monitoring standby state // make the object go into the monitoring standby state
elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, null, elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, mockZK,
ZK_LOCK_NAME); ZK_LOCK_NAME);
Mockito.verify(mockApp, Mockito.times(1)).becomeStandby(); Mockito.verify(mockApp, Mockito.times(1)).becomeStandby();
verifyExistCall(1); verifyExistCall(1);
@ -493,14 +496,14 @@ public class TestActiveStandbyElector {
// notify node deletion // notify node deletion
// monitoring should be setup again after event is received // monitoring should be setup again after event is received
Mockito.when(mockEvent.getType()).thenReturn(Event.EventType.NodeDeleted); Mockito.when(mockEvent.getType()).thenReturn(Event.EventType.NodeDeleted);
elector.process(mockEvent); elector.processWatchEvent(mockZK, mockEvent);
// is standby. no need to notify anything now // is standby. no need to notify anything now
Mockito.verify(mockApp, Mockito.times(0)).enterNeutralMode(); Mockito.verify(mockApp, Mockito.times(0)).enterNeutralMode();
// another joinElection called. // another joinElection called.
Mockito.verify(mockZK, Mockito.times(2)).create(ZK_LOCK_NAME, data, Mockito.verify(mockZK, Mockito.times(2)).create(ZK_LOCK_NAME, data,
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null); Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, mockZK);
// lost election // lost election
elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, null, elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, mockZK,
ZK_LOCK_NAME); ZK_LOCK_NAME);
// still standby. so no need to notify again // still standby. so no need to notify again
Mockito.verify(mockApp, Mockito.times(1)).becomeStandby(); Mockito.verify(mockApp, Mockito.times(1)).becomeStandby();
@ -523,7 +526,7 @@ public class TestActiveStandbyElector {
elector.joinElection(data); elector.joinElection(data);
// getNewZooKeeper called 2 times. once in constructor and once now // getNewZooKeeper called 2 times. once in constructor and once now
Assert.assertEquals(2, count); Assert.assertEquals(2, count);
elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, null, elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, mockZK,
ZK_LOCK_NAME); ZK_LOCK_NAME);
Mockito.verify(mockApp, Mockito.times(1)).becomeStandby(); Mockito.verify(mockApp, Mockito.times(1)).becomeStandby();
verifyExistCall(1); verifyExistCall(1);

View File

@ -18,181 +18,182 @@
package org.apache.hadoop.ha; package org.apache.hadoop.ha;
import static org.junit.Assert.*; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.File; import java.io.File;
import java.io.IOException;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Test;
import org.apache.commons.logging.impl.Log4JLogger; import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.ha.ActiveStandbyElector.ActiveStandbyElectorCallback; import org.apache.hadoop.ha.ActiveStandbyElector.ActiveStandbyElectorCallback;
import org.apache.hadoop.test.MultithreadedTestUtil.TestContext;
import org.apache.hadoop.test.MultithreadedTestUtil.TestingThread;
import org.apache.log4j.Level; import org.apache.log4j.Level;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.test.ClientBase; import org.apache.zookeeper.test.ClientBase;
import org.junit.Test;
import org.mockito.AdditionalMatchers;
import org.mockito.Mockito;
import com.google.common.primitives.Ints;
/** /**
* Test for {@link ActiveStandbyElector} using real zookeeper. * Test for {@link ActiveStandbyElector} using real zookeeper.
*/ */
public class TestActiveStandbyElectorRealZK extends ClientBase { public class TestActiveStandbyElectorRealZK extends ClientBase {
static final int NUM_ELECTORS = 2; static final int NUM_ELECTORS = 2;
static ZooKeeper[] zkClient = new ZooKeeper[NUM_ELECTORS];
static { static {
((Log4JLogger)ActiveStandbyElector.LOG).getLogger().setLevel( ((Log4JLogger)ActiveStandbyElector.LOG).getLogger().setLevel(
Level.ALL); Level.ALL);
} }
int activeIndex = -1;
int standbyIndex = -1;
static final String PARENT_DIR = "/" + UUID.randomUUID(); static final String PARENT_DIR = "/" + UUID.randomUUID();
ActiveStandbyElector[] electors = new ActiveStandbyElector[NUM_ELECTORS]; ActiveStandbyElector[] electors = new ActiveStandbyElector[NUM_ELECTORS];
private byte[][] appDatas = new byte[NUM_ELECTORS][];
private ActiveStandbyElectorCallback[] cbs =
new ActiveStandbyElectorCallback[NUM_ELECTORS];
private ZooKeeperServer zkServer;
@Override @Override
public void setUp() throws Exception { public void setUp() throws Exception {
// build.test.dir is used by zookeeper // build.test.dir is used by zookeeper
new File(System.getProperty("build.test.dir", "build")).mkdirs(); new File(System.getProperty("build.test.dir", "build")).mkdirs();
super.setUp(); super.setUp();
}
/**
* The class object runs on a thread and waits for a signal to start from the
* test object. On getting the signal it joins the election and thus by doing
* this on multiple threads we can test simultaneous attempts at leader lock
* creation. after joining the election, the object waits on a signal to exit.
* this signal comes when the object's elector has become a leader or there is
* an unexpected fatal error. this lets another thread object to become a
* leader.
*/
class ThreadRunner extends TestingThread
implements ActiveStandbyElectorCallback {
int index;
CountDownLatch hasBecomeActive = new CountDownLatch(1); zkServer = getServer(serverFactory);
ThreadRunner(TestContext ctx, for (int i = 0; i < NUM_ELECTORS; i++) {
int idx) { cbs[i] = Mockito.mock(ActiveStandbyElectorCallback.class);
super(ctx); appDatas[i] = Ints.toByteArray(i);
index = idx; electors[i] = new ActiveStandbyElector(
} hostPort, 5000, PARENT_DIR, Ids.OPEN_ACL_UNSAFE, cbs[i]);
@Override
public void doWork() throws Exception {
LOG.info("starting " + index);
// join election
byte[] data = new byte[1];
data[0] = (byte)index;
ActiveStandbyElector elector = electors[index];
LOG.info("joining " + index);
elector.joinElection(data);
hasBecomeActive.await(30, TimeUnit.SECONDS);
Thread.sleep(1000);
// quit election to allow other elector to become active
elector.quitElection(true);
LOG.info("ending " + index);
}
@Override
public synchronized void becomeActive() {
reportActive(index);
LOG.info("active " + index);
hasBecomeActive.countDown();
}
@Override
public synchronized void becomeStandby() {
reportStandby(index);
LOG.info("standby " + index);
}
@Override
public synchronized void enterNeutralMode() {
LOG.info("neutral " + index);
}
@Override
public synchronized void notifyFatalError(String errorMessage) {
LOG.info("fatal " + index + " .Error message:" + errorMessage);
this.interrupt();
}
@Override
public void fenceOldActive(byte[] data) {
LOG.info("fenceOldActive " + index);
// should not fence itself
Assert.assertTrue(index != data[0]);
} }
} }
synchronized void reportActive(int index) { private void checkFatalsAndReset() throws Exception {
if (activeIndex == -1) { for (int i = 0; i < NUM_ELECTORS; i++) {
activeIndex = index; Mockito.verify(cbs[i], Mockito.never()).notifyFatalError(
} else { Mockito.anyString());
// standby should become active Mockito.reset(cbs[i]);
Assert.assertEquals(standbyIndex, index);
// old active should not become active
Assert.assertFalse(activeIndex == index);
} }
activeIndex = index;
}
synchronized void reportStandby(int index) {
// only 1 standby should be reported and it should not be the same as active
Assert.assertEquals(-1, standbyIndex);
standbyIndex = index;
Assert.assertFalse(activeIndex == standbyIndex);
} }
/** /**
* the test creates 2 electors which try to become active using a real * the test creates 2 electors which try to become active using a real
* zookeeper server. It verifies that 1 becomes active and 1 becomes standby. * zookeeper server. It verifies that 1 becomes active and 1 becomes standby.
* Upon becoming active the leader quits election and the test verifies that * Upon becoming active the leader quits election and the test verifies that
* the standby now becomes active. these electors run on different threads and * the standby now becomes active.
* callback to the test class to report active and standby where the outcome
* is verified
* @throws Exception
*/ */
@Test @Test(timeout=20000)
public void testActiveStandbyTransition() throws Exception { public void testActiveStandbyTransition() throws Exception {
LOG.info("starting test with parentDir:" + PARENT_DIR); LOG.info("starting test with parentDir:" + PARENT_DIR);
TestContext ctx = new TestContext();
for(int i = 0; i < NUM_ELECTORS; i++) {
LOG.info("creating " + i);
final ZooKeeper zk = createClient();
assert zk != null;
ThreadRunner tr = new ThreadRunner(ctx, i);
electors[i] = new ActiveStandbyElector(
"hostPort", 1000, PARENT_DIR, Ids.OPEN_ACL_UNSAFE,
tr) {
@Override
protected synchronized ZooKeeper getNewZooKeeper()
throws IOException {
return zk;
}
};
ctx.addThread(tr);
}
assertFalse(electors[0].parentZNodeExists()); assertFalse(electors[0].parentZNodeExists());
electors[0].ensureParentZNode(); electors[0].ensureParentZNode();
assertTrue(electors[0].parentZNodeExists()); assertTrue(electors[0].parentZNodeExists());
ctx.startThreads(); // First elector joins election, becomes active.
ctx.stop(); electors[0].joinElection(appDatas[0]);
ActiveStandbyElectorTestUtil.waitForActiveLockData(null,
zkServer, PARENT_DIR, appDatas[0]);
Mockito.verify(cbs[0], Mockito.timeout(1000)).becomeActive();
checkFatalsAndReset();
// Second elector joins election, becomes standby.
electors[1].joinElection(appDatas[1]);
Mockito.verify(cbs[1], Mockito.timeout(1000)).becomeStandby();
checkFatalsAndReset();
// First elector quits, second one should become active
electors[0].quitElection(true);
ActiveStandbyElectorTestUtil.waitForActiveLockData(null,
zkServer, PARENT_DIR, appDatas[1]);
Mockito.verify(cbs[1], Mockito.timeout(1000)).becomeActive();
checkFatalsAndReset();
// First one rejoins, becomes standby, second one stays active
electors[0].joinElection(appDatas[0]);
Mockito.verify(cbs[0], Mockito.timeout(1000)).becomeStandby();
checkFatalsAndReset();
// Second one expires, first one becomes active
electors[1].preventSessionReestablishmentForTests();
try {
zkServer.closeSession(electors[1].getZKSessionIdForTests());
ActiveStandbyElectorTestUtil.waitForActiveLockData(null,
zkServer, PARENT_DIR, appDatas[0]);
Mockito.verify(cbs[1], Mockito.timeout(1000)).enterNeutralMode();
Mockito.verify(cbs[0], Mockito.timeout(1000)).fenceOldActive(
AdditionalMatchers.aryEq(appDatas[1]));
Mockito.verify(cbs[0], Mockito.timeout(1000)).becomeActive();
} finally {
electors[1].allowSessionReestablishmentForTests();
}
// Second one eventually reconnects and becomes standby
Mockito.verify(cbs[1], Mockito.timeout(5000)).becomeStandby();
checkFatalsAndReset();
// First one expires, second one should become active
electors[0].preventSessionReestablishmentForTests();
try {
zkServer.closeSession(electors[0].getZKSessionIdForTests());
ActiveStandbyElectorTestUtil.waitForActiveLockData(null,
zkServer, PARENT_DIR, appDatas[1]);
Mockito.verify(cbs[0], Mockito.timeout(1000)).enterNeutralMode();
Mockito.verify(cbs[1], Mockito.timeout(1000)).fenceOldActive(
AdditionalMatchers.aryEq(appDatas[0]));
Mockito.verify(cbs[1], Mockito.timeout(1000)).becomeActive();
} finally {
electors[0].allowSessionReestablishmentForTests();
}
checkFatalsAndReset();
}
@Test(timeout=15000)
public void testHandleSessionExpiration() throws Exception {
ActiveStandbyElectorCallback cb = cbs[0];
byte[] appData = appDatas[0];
ActiveStandbyElector elector = electors[0];
// Let the first elector become active
elector.ensureParentZNode();
elector.joinElection(appData);
ZooKeeperServer zks = getServer(serverFactory);
ActiveStandbyElectorTestUtil.waitForActiveLockData(null,
zks, PARENT_DIR, appData);
Mockito.verify(cb, Mockito.timeout(1000)).becomeActive();
checkFatalsAndReset();
LOG.info("========================== Expiring session");
zks.closeSession(elector.getZKSessionIdForTests());
// Should enter neutral mode when disconnected
Mockito.verify(cb, Mockito.timeout(1000)).enterNeutralMode();
// Should re-join the election and regain active
ActiveStandbyElectorTestUtil.waitForActiveLockData(null,
zks, PARENT_DIR, appData);
Mockito.verify(cb, Mockito.timeout(1000)).becomeActive();
checkFatalsAndReset();
LOG.info("========================== Quitting election");
elector.quitElection(false);
ActiveStandbyElectorTestUtil.waitForActiveLockData(null,
zks, PARENT_DIR, null);
// Double check that we don't accidentally re-join the election
// due to receiving the "expired" event.
Thread.sleep(1000);
Mockito.verify(cb, Mockito.never()).becomeActive();
ActiveStandbyElectorTestUtil.waitForActiveLockData(null,
zks, PARENT_DIR, null);
checkFatalsAndReset();
} }
} }

View File

@ -124,7 +124,7 @@ public abstract class MultithreadedTestUtil {
* Checks for thread exceptions, and if they've occurred * Checks for thread exceptions, and if they've occurred
* throws them as RuntimeExceptions in a deferred manner. * throws them as RuntimeExceptions in a deferred manner.
*/ */
private synchronized void checkException() throws Exception { public synchronized void checkException() throws Exception {
if (err != null) { if (err != null) {
throw new RuntimeException("Deferred", err); throw new RuntimeException("Deferred", err);
} }