diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 00715abda9e..1ff04f8d7cb 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -223,6 +223,9 @@ Release 0.23.3 - UNRELEASED HADOOP-8193. Refactor FailoverController/HAAdmin code to add an abstract class for "target" services. (todd) + HADOOP-8212. Improve ActiveStandbyElector's behavior when session expires + (todd) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java index 56cba81139e..b77187b7939 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java @@ -21,6 +21,8 @@ package org.apache.hadoop.ha; import java.io.IOException; import java.util.Arrays; import java.util.List; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -32,6 +34,7 @@ import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher.Event; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.AsyncCallback.*; @@ -60,8 +63,7 @@ import com.google.common.base.Preconditions; */ @InterfaceAudience.Private @InterfaceStability.Evolving -public class ActiveStandbyElector implements Watcher, StringCallback, - StatCallback { +public class ActiveStandbyElector implements StatCallback, StringCallback { /** * Callback interface to interact with the ActiveStandbyElector object.
@@ -156,6 +158,9 @@ public class ActiveStandbyElector implements Watcher, StringCallback, private final String zkBreadCrumbPath; private final String znodeWorkingDir; + private Lock sessionReestablishLockForTests = new ReentrantLock(); + private boolean wantToBeInElection; + /** * Create a new ActiveStandbyElector object
* The elector is created by providing to it the Zookeeper configuration, the @@ -274,6 +279,8 @@ public class ActiveStandbyElector implements Watcher, StringCallback, } } } + + LOG.info("Successfully created " + znodeWorkingDir + " in ZK."); } /** @@ -290,13 +297,14 @@ public class ActiveStandbyElector implements Watcher, StringCallback, * if a failover occurs due to dropping out of the election. */ public synchronized void quitElection(boolean needFence) { - LOG.debug("Yielding from election"); + LOG.info("Yielding from election"); if (!needFence && state == State.ACTIVE) { // If active is gracefully going back to standby mode, remove // our permanent znode so no one fences us. tryDeleteOwnBreadCrumbNode(); } reset(); + wantToBeInElection = false; } /** @@ -343,13 +351,9 @@ public class ActiveStandbyElector implements Watcher, StringCallback, @Override public synchronized void processResult(int rc, String path, Object ctx, String name) { + if (isStaleClient(ctx)) return; LOG.debug("CreateNode result: " + rc + " for path: " + path + " connectionState: " + zkConnectionState); - if (zkClient == null) { - // zkClient is nulled before closing the connection - // this is the callback with session expired after we closed the session - return; - } Code code = Code.get(rc); if (isSuccess(code)) { @@ -386,6 +390,10 @@ public class ActiveStandbyElector implements Watcher, StringCallback, } errorMessage = errorMessage + ". 
Not retrying further znode create connection errors."; + } else if (isSessionExpired(code)) { + // This isn't fatal - the client Watcher will re-join the election + LOG.warn("Lock acquisition failed because session was lost"); + return; } fatalError(errorMessage); @@ -397,13 +405,9 @@ public class ActiveStandbyElector implements Watcher, StringCallback, @Override public synchronized void processResult(int rc, String path, Object ctx, Stat stat) { + if (isStaleClient(ctx)) return; LOG.debug("StatNode result: " + rc + " for path: " + path + " connectionState: " + zkConnectionState); - if (zkClient == null) { - // zkClient is nulled before closing the connection - // this is the callback with session expired after we closed the session - return; - } Code code = Code.get(rc); if (isSuccess(code)) { @@ -447,22 +451,18 @@ public class ActiveStandbyElector implements Watcher, StringCallback, /** * interface implementation of Zookeeper watch events (connection and node) */ - @Override - public synchronized void process(WatchedEvent event) { + synchronized void processWatchEvent(ZooKeeper zk, WatchedEvent event) { Event.EventType eventType = event.getType(); + if (isStaleClient(zk)) return; LOG.debug("Watcher event type: " + eventType + " with state:" + event.getState() + " for path:" + event.getPath() + " connectionState: " + zkConnectionState); - if (zkClient == null) { - // zkClient is nulled before closing the connection - // this is the callback with session expired after we closed the session - return; - } if (eventType == Event.EventType.None) { // the connection state has changed switch (event.getState()) { case SyncConnected: + LOG.info("Session connected."); // if the listener was asked to move to safe state then it needs to // be undone ConnectionState prevConnectionState = zkConnectionState; @@ -472,6 +472,8 @@ public class ActiveStandbyElector implements Watcher, StringCallback, } break; case Disconnected: + LOG.info("Session disconnected. 
Entering neutral mode..."); + // ask the app to move to safe state because zookeeper connection // is not active and we dont know our state zkConnectionState = ConnectionState.DISCONNECTED; @@ -480,6 +482,7 @@ public class ActiveStandbyElector implements Watcher, StringCallback, case Expired: // the connection got terminated because of session timeout // call listener to reconnect + LOG.info("Session expired. Entering neutral mode and rejoining..."); enterNeutralMode(); reJoinElection(); break; @@ -527,7 +530,9 @@ public class ActiveStandbyElector implements Watcher, StringCallback, * @throws IOException */ protected synchronized ZooKeeper getNewZooKeeper() throws IOException { - return new ZooKeeper(zkHostPort, zkSessionTimeout, this); + ZooKeeper zk = new ZooKeeper(zkHostPort, zkSessionTimeout, null); + zk.register(new WatcherWithClientRef(zk)); + return zk; } private void fatalError(String errorMessage) { @@ -550,13 +555,42 @@ public class ActiveStandbyElector implements Watcher, StringCallback, } createRetryCount = 0; + wantToBeInElection = true; createLockNodeAsync(); } private void reJoinElection() { - LOG.debug("Trying to re-establish ZK session"); - terminateConnection(); - joinElectionInternal(); + LOG.info("Trying to re-establish ZK session"); + + // Some of the test cases rely on expiring the ZK sessions and + // ensuring that the other node takes over. But, there's a race + // where the original lease holder could reconnect faster than the other + // thread manages to take the lock itself. This lock allows the + // tests to block the reconnection. It's a shame that this leaked + // into non-test code, but outside of tests the lock is only acquired + // here, so it will never be contended.
+ sessionReestablishLockForTests.lock(); + try { + terminateConnection(); + joinElectionInternal(); + } finally { + sessionReestablishLockForTests.unlock(); + } + } + + @VisibleForTesting + void preventSessionReestablishmentForTests() { + sessionReestablishLockForTests.lock(); + } + + @VisibleForTesting + void allowSessionReestablishmentForTests() { + sessionReestablishLockForTests.unlock(); + } + + @VisibleForTesting + long getZKSessionIdForTests() { + return zkClient.getSessionId(); } private boolean reEstablishSession() { @@ -605,6 +639,7 @@ public class ActiveStandbyElector implements Watcher, StringCallback, } private void becomeActive() { + assert wantToBeInElection; if (state != State.ACTIVE) { try { Stat oldBreadcrumbStat = fenceOldActive(); @@ -727,12 +762,14 @@ public class ActiveStandbyElector implements Watcher, StringCallback, } private void createLockNodeAsync() { - zkClient.create(zkLockFilePath, appData, zkAcl, CreateMode.EPHEMERAL, this, - null); + zkClient.create(zkLockFilePath, appData, zkAcl, CreateMode.EPHEMERAL, + this, zkClient); } private void monitorLockNodeAsync() { - zkClient.exists(zkLockFilePath, this, this, null); + zkClient.exists(zkLockFilePath, + new WatcherWithClientRef(zkClient), this, + zkClient); } private String createWithRetries(final String path, final byte[] data, @@ -778,10 +815,47 @@ public class ActiveStandbyElector implements Watcher, StringCallback, } } } - + private interface ZKAction { T run() throws KeeperException, InterruptedException; } + + /** + * The callbacks and watchers pass a reference to the ZK client + * which made the original call. We don't want to take action + * based on any callbacks from prior clients after we quit + * the election. 
+ * @param ctx the ZK client passed into the watcher + * @return true if ctx is not the current client, i.e. the result is stale + */ + private synchronized boolean isStaleClient(Object ctx) { + Preconditions.checkNotNull(ctx); + if (zkClient != (ZooKeeper)ctx) { + LOG.warn("Ignoring stale result from old client with sessionId " + + String.format("0x%08x", ((ZooKeeper)ctx).getSessionId())); + return true; + } + return false; + } + + /** + * Watcher implementation which keeps a reference around to the + * original ZK connection, and passes it back along with any + * events. + */ + private final class WatcherWithClientRef implements Watcher { + private final ZooKeeper zk; + + private WatcherWithClientRef(ZooKeeper zk) { + this.zk = zk; + } + + @Override + public void process(WatchedEvent event) { + ActiveStandbyElector.this.processWatchEvent( + zk, event); + } + } private static boolean isSuccess(Code code) { return (code == Code.OK); @@ -794,6 +868,10 @@ public class ActiveStandbyElector implements Watcher, StringCallback, private static boolean isNodeDoesNotExist(Code code) { return (code == Code.NONODE); } + + private static boolean isSessionExpired(Code code) { + return (code == Code.SESSIONEXPIRED); + } private static boolean shouldRetry(Code code) { switch (code) { diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ActiveStandbyElectorTestUtil.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ActiveStandbyElectorTestUtil.java new file mode 100644 index 00000000000..e24cf87aa83 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ActiveStandbyElectorTestUtil.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ha; + +import java.util.Arrays; + +import org.apache.hadoop.test.MultithreadedTestUtil.TestContext; +import org.apache.zookeeper.KeeperException.NoNodeException; +import org.apache.zookeeper.data.Stat; +import org.apache.zookeeper.server.ZooKeeperServer; + +public abstract class ActiveStandbyElectorTestUtil { + + public static void waitForActiveLockData(TestContext ctx, + ZooKeeperServer zks, String parentDir, byte[] activeData) + throws Exception { + while (true) { + if (ctx != null) { + ctx.checkException(); + } + try { + Stat stat = new Stat(); + byte[] data = zks.getZKDatabase().getData( + parentDir + "/" + + ActiveStandbyElector.LOCK_FILENAME, stat, null); + if (activeData != null && + Arrays.equals(activeData, data)) { + return; + } + } catch (NoNodeException nne) { + if (activeData == null) { + return; + } + } + Thread.sleep(50); + } + } +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java index f3b551ad834..b9786aea36a 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java @@ -26,6 +26,7 @@ import 
org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.Code; import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.Watcher.Event; import org.apache.zookeeper.data.ACL; @@ -111,7 +112,7 @@ public class TestActiveStandbyElector { public void testJoinElection() { elector.joinElection(data); Mockito.verify(mockZK, Mockito.times(1)).create(ZK_LOCK_NAME, data, - Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null); + Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, mockZK); } /** @@ -123,7 +124,7 @@ public class TestActiveStandbyElector { mockNoPriorActive(); elector.joinElection(data); - elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, null, + elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, mockZK, ZK_LOCK_NAME); Mockito.verify(mockApp, Mockito.times(1)).becomeActive(); verifyExistCall(1); @@ -133,14 +134,14 @@ public class TestActiveStandbyElector { Stat stat = new Stat(); stat.setEphemeralOwner(1L); Mockito.when(mockZK.getSessionId()).thenReturn(1L); - elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, null, stat); + elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, mockZK, stat); // should not call neutral mode/standby/active Mockito.verify(mockApp, Mockito.times(0)).enterNeutralMode(); Mockito.verify(mockApp, Mockito.times(0)).becomeStandby(); Mockito.verify(mockApp, Mockito.times(1)).becomeActive(); // another joinElection not called. 
Mockito.verify(mockZK, Mockito.times(1)).create(ZK_LOCK_NAME, data, - Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null); + Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, mockZK); // no new monitor called verifyExistCall(1); } @@ -155,7 +156,7 @@ public class TestActiveStandbyElector { mockPriorActive(fakeOldActiveData); elector.joinElection(data); - elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, null, + elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, mockZK, ZK_LOCK_NAME); // Application fences active. Mockito.verify(mockApp, Mockito.times(1)).fenceOldActive( @@ -173,7 +174,7 @@ public class TestActiveStandbyElector { public void testQuitElectionRemovesBreadcrumbNode() throws Exception { mockNoPriorActive(); elector.joinElection(data); - elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, null, + elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, mockZK, ZK_LOCK_NAME); // Writes its own active info Mockito.verify(mockZK, Mockito.times(1)).create( @@ -197,7 +198,7 @@ public class TestActiveStandbyElector { public void testCreateNodeResultBecomeStandby() { elector.joinElection(data); - elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, null, + elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, mockZK, ZK_LOCK_NAME); Mockito.verify(mockApp, Mockito.times(1)).becomeStandby(); verifyExistCall(1); @@ -210,7 +211,7 @@ public class TestActiveStandbyElector { public void testCreateNodeResultError() { elector.joinElection(data); - elector.processResult(Code.APIERROR.intValue(), ZK_LOCK_NAME, null, + elector.processResult(Code.APIERROR.intValue(), ZK_LOCK_NAME, mockZK, ZK_LOCK_NAME); Mockito.verify(mockApp, Mockito.times(1)).notifyFatalError( "Received create error from Zookeeper. 
code:APIERROR " + @@ -227,13 +228,13 @@ public class TestActiveStandbyElector { elector.joinElection(data); - elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null, + elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, mockZK, ZK_LOCK_NAME); - elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null, + elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, mockZK, ZK_LOCK_NAME); - elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null, + elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, mockZK, ZK_LOCK_NAME); - elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null, + elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, mockZK, ZK_LOCK_NAME); // 4 errors results in fatalError Mockito @@ -246,20 +247,20 @@ public class TestActiveStandbyElector { elector.joinElection(data); // recreate connection via getNewZooKeeper Assert.assertEquals(2, count); - elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null, + elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, mockZK, ZK_LOCK_NAME); - elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, null, + elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, mockZK, ZK_LOCK_NAME); verifyExistCall(1); Stat stat = new Stat(); stat.setEphemeralOwner(1L); Mockito.when(mockZK.getSessionId()).thenReturn(1L); - elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, null, stat); + elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, mockZK, stat); Mockito.verify(mockApp, Mockito.times(1)).becomeActive(); verifyExistCall(1); Mockito.verify(mockZK, Mockito.times(6)).create(ZK_LOCK_NAME, data, - Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null); + Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, mockZK); } /** @@ -270,16 +271,16 @@ public class TestActiveStandbyElector { public void testCreateNodeResultRetryBecomeStandby() { 
elector.joinElection(data); - elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null, + elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, mockZK, ZK_LOCK_NAME); - elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, null, + elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, mockZK, ZK_LOCK_NAME); verifyExistCall(1); Stat stat = new Stat(); stat.setEphemeralOwner(0); Mockito.when(mockZK.getSessionId()).thenReturn(1L); - elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, null, stat); + elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, mockZK, stat); Mockito.verify(mockApp, Mockito.times(1)).becomeStandby(); verifyExistCall(1); } @@ -293,19 +294,19 @@ public class TestActiveStandbyElector { public void testCreateNodeResultRetryNoNode() { elector.joinElection(data); - elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null, + elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, mockZK, ZK_LOCK_NAME); - elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null, + elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, mockZK, ZK_LOCK_NAME); - elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, null, + elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, mockZK, ZK_LOCK_NAME); verifyExistCall(1); - elector.processResult(Code.NONODE.intValue(), ZK_LOCK_NAME, null, + elector.processResult(Code.NONODE.intValue(), ZK_LOCK_NAME, mockZK, (Stat) null); Mockito.verify(mockApp, Mockito.times(1)).enterNeutralMode(); Mockito.verify(mockZK, Mockito.times(4)).create(ZK_LOCK_NAME, data, - Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null); + Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, mockZK); } /** @@ -313,13 +314,13 @@ public class TestActiveStandbyElector { */ @Test public void testStatNodeRetry() { - elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null, + 
elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, mockZK, (Stat) null); - elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null, + elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, mockZK, (Stat) null); - elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null, + elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, mockZK, (Stat) null); - elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, null, + elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, mockZK, (Stat) null); Mockito .verify(mockApp, Mockito.times(1)) @@ -334,7 +335,7 @@ public class TestActiveStandbyElector { @Test public void testStatNodeError() { elector.processResult(Code.RUNTIMEINCONSISTENCY.intValue(), ZK_LOCK_NAME, - null, (Stat) null); + mockZK, (Stat) null); Mockito.verify(mockApp, Mockito.times(0)).enterNeutralMode(); Mockito.verify(mockApp, Mockito.times(1)).notifyFatalError( "Received stat error from Zookeeper. code:RUNTIMEINCONSISTENCY"); @@ -354,7 +355,7 @@ public class TestActiveStandbyElector { // first SyncConnected should not do anything Mockito.when(mockEvent.getState()).thenReturn( Event.KeeperState.SyncConnected); - elector.process(mockEvent); + elector.processWatchEvent(mockZK, mockEvent); Mockito.verify(mockZK, Mockito.times(0)).exists(Mockito.anyString(), Mockito.anyBoolean(), Mockito. anyObject(), Mockito. 
anyObject()); @@ -362,20 +363,20 @@ public class TestActiveStandbyElector { // disconnection should enter safe mode Mockito.when(mockEvent.getState()).thenReturn( Event.KeeperState.Disconnected); - elector.process(mockEvent); + elector.processWatchEvent(mockZK, mockEvent); Mockito.verify(mockApp, Mockito.times(1)).enterNeutralMode(); // re-connection should monitor master status Mockito.when(mockEvent.getState()).thenReturn( Event.KeeperState.SyncConnected); - elector.process(mockEvent); + elector.processWatchEvent(mockZK, mockEvent); verifyExistCall(1); // session expired should enter safe mode and initiate re-election // re-election checked via checking re-creation of new zookeeper and // call to create lock znode Mockito.when(mockEvent.getState()).thenReturn(Event.KeeperState.Expired); - elector.process(mockEvent); + elector.processWatchEvent(mockZK, mockEvent); // already in safe mode above. should not enter safe mode again Mockito.verify(mockApp, Mockito.times(1)).enterNeutralMode(); // called getNewZooKeeper to create new session. first call was in @@ -383,17 +384,17 @@ public class TestActiveStandbyElector { Assert.assertEquals(2, count); // once in initial joinElection and one now Mockito.verify(mockZK, Mockito.times(2)).create(ZK_LOCK_NAME, data, - Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null); + Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, mockZK); // create znode success. 
become master and monitor - elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, null, + elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, mockZK, ZK_LOCK_NAME); Mockito.verify(mockApp, Mockito.times(1)).becomeActive(); verifyExistCall(2); // error event results in fatal error Mockito.when(mockEvent.getState()).thenReturn(Event.KeeperState.AuthFailed); - elector.process(mockEvent); + elector.processWatchEvent(mockZK, mockEvent); Mockito.verify(mockApp, Mockito.times(1)).notifyFatalError( "Unexpected Zookeeper watch event state: AuthFailed"); // only 1 state change callback is called at a time @@ -409,7 +410,7 @@ public class TestActiveStandbyElector { elector.joinElection(data); // make the object go into the monitoring state - elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, null, + elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, mockZK, ZK_LOCK_NAME); Mockito.verify(mockApp, Mockito.times(1)).becomeStandby(); verifyExistCall(1); @@ -420,25 +421,25 @@ public class TestActiveStandbyElector { // monitoring should be setup again after event is received Mockito.when(mockEvent.getType()).thenReturn( Event.EventType.NodeDataChanged); - elector.process(mockEvent); + elector.processWatchEvent(mockZK, mockEvent); verifyExistCall(2); // monitoring should be setup again after event is received Mockito.when(mockEvent.getType()).thenReturn( Event.EventType.NodeChildrenChanged); - elector.process(mockEvent); + elector.processWatchEvent(mockZK, mockEvent); verifyExistCall(3); // lock node deletion when in standby mode should create znode again // successful znode creation enters active state and sets monitor Mockito.when(mockEvent.getType()).thenReturn(Event.EventType.NodeDeleted); - elector.process(mockEvent); + elector.processWatchEvent(mockZK, mockEvent); // enterNeutralMode not called when app is standby and leader is lost Mockito.verify(mockApp, Mockito.times(0)).enterNeutralMode(); // once in initial joinElection() and one now 
Mockito.verify(mockZK, Mockito.times(2)).create(ZK_LOCK_NAME, data, - Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null); - elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, null, + Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, mockZK); + elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, mockZK, ZK_LOCK_NAME); Mockito.verify(mockApp, Mockito.times(1)).becomeActive(); verifyExistCall(4); @@ -447,19 +448,19 @@ public class TestActiveStandbyElector { // znode again successful znode creation enters active state and sets // monitor Mockito.when(mockEvent.getType()).thenReturn(Event.EventType.NodeDeleted); - elector.process(mockEvent); + elector.processWatchEvent(mockZK, mockEvent); Mockito.verify(mockApp, Mockito.times(1)).enterNeutralMode(); // another joinElection called Mockito.verify(mockZK, Mockito.times(3)).create(ZK_LOCK_NAME, data, - Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null); - elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, null, + Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, mockZK); + elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, mockZK, ZK_LOCK_NAME); Mockito.verify(mockApp, Mockito.times(2)).becomeActive(); verifyExistCall(5); // bad path name results in fatal error Mockito.when(mockEvent.getPath()).thenReturn(null); - elector.process(mockEvent); + elector.processWatchEvent(mockZK, mockEvent); Mockito.verify(mockApp, Mockito.times(1)).notifyFatalError( "Unexpected watch error from Zookeeper"); // fatal error means no new connection other than one from constructor @@ -471,7 +472,9 @@ public class TestActiveStandbyElector { private void verifyExistCall(int times) { Mockito.verify(mockZK, Mockito.times(times)).exists( - ZK_LOCK_NAME, elector, elector, null); + Mockito.eq(ZK_LOCK_NAME), Mockito.any(), + Mockito.same(elector), + Mockito.same(mockZK)); } /** @@ -482,7 +485,7 @@ public class TestActiveStandbyElector { elector.joinElection(data); // make the object go into the monitoring standby 
state - elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, null, + elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, mockZK, ZK_LOCK_NAME); Mockito.verify(mockApp, Mockito.times(1)).becomeStandby(); verifyExistCall(1); @@ -493,14 +496,14 @@ public class TestActiveStandbyElector { // notify node deletion // monitoring should be setup again after event is received Mockito.when(mockEvent.getType()).thenReturn(Event.EventType.NodeDeleted); - elector.process(mockEvent); + elector.processWatchEvent(mockZK, mockEvent); // is standby. no need to notify anything now Mockito.verify(mockApp, Mockito.times(0)).enterNeutralMode(); // another joinElection called. Mockito.verify(mockZK, Mockito.times(2)).create(ZK_LOCK_NAME, data, - Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null); + Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, mockZK); // lost election - elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, null, + elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, mockZK, ZK_LOCK_NAME); // still standby. so no need to notify again Mockito.verify(mockApp, Mockito.times(1)).becomeStandby(); @@ -523,7 +526,7 @@ public class TestActiveStandbyElector { elector.joinElection(data); // getNewZooKeeper called 2 times. 
once in constructor and once now Assert.assertEquals(2, count); - elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, null, + elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, mockZK, ZK_LOCK_NAME); Mockito.verify(mockApp, Mockito.times(1)).becomeStandby(); verifyExistCall(1); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElectorRealZK.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElectorRealZK.java index bc375e40602..3a0fa5f981f 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElectorRealZK.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElectorRealZK.java @@ -18,181 +18,182 @@ package org.apache.hadoop.ha; -import static org.junit.Assert.*; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import java.io.File; -import java.io.IOException; import java.util.UUID; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import org.junit.Assert; -import org.junit.Test; import org.apache.commons.logging.impl.Log4JLogger; import org.apache.hadoop.ha.ActiveStandbyElector.ActiveStandbyElectorCallback; -import org.apache.hadoop.test.MultithreadedTestUtil.TestContext; -import org.apache.hadoop.test.MultithreadedTestUtil.TestingThread; import org.apache.log4j.Level; -import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.server.ZooKeeperServer; import org.apache.zookeeper.test.ClientBase; +import org.junit.Test; +import org.mockito.AdditionalMatchers; +import org.mockito.Mockito; + +import com.google.common.primitives.Ints; /** * Test for {@link ActiveStandbyElector} using real zookeeper. 
*/ public class TestActiveStandbyElectorRealZK extends ClientBase { static final int NUM_ELECTORS = 2; - static ZooKeeper[] zkClient = new ZooKeeper[NUM_ELECTORS]; static { ((Log4JLogger)ActiveStandbyElector.LOG).getLogger().setLevel( Level.ALL); } - int activeIndex = -1; - int standbyIndex = -1; static final String PARENT_DIR = "/" + UUID.randomUUID(); ActiveStandbyElector[] electors = new ActiveStandbyElector[NUM_ELECTORS]; + private byte[][] appDatas = new byte[NUM_ELECTORS][]; + private ActiveStandbyElectorCallback[] cbs = + new ActiveStandbyElectorCallback[NUM_ELECTORS]; + private ZooKeeperServer zkServer; + @Override public void setUp() throws Exception { // build.test.dir is used by zookeeper new File(System.getProperty("build.test.dir", "build")).mkdirs(); super.setUp(); - } - - /** - * The class object runs on a thread and waits for a signal to start from the - * test object. On getting the signal it joins the election and thus by doing - * this on multiple threads we can test simultaneous attempts at leader lock - * creation. after joining the election, the object waits on a signal to exit. - * this signal comes when the object's elector has become a leader or there is - * an unexpected fatal error. this lets another thread object to become a - * leader. 
- */ - class ThreadRunner extends TestingThread - implements ActiveStandbyElectorCallback { - int index; - CountDownLatch hasBecomeActive = new CountDownLatch(1); + zkServer = getServer(serverFactory); - ThreadRunner(TestContext ctx, - int idx) { - super(ctx); - index = idx; - } - - @Override - public void doWork() throws Exception { - LOG.info("starting " + index); - // join election - byte[] data = new byte[1]; - data[0] = (byte)index; - - ActiveStandbyElector elector = electors[index]; - LOG.info("joining " + index); - elector.joinElection(data); - - hasBecomeActive.await(30, TimeUnit.SECONDS); - Thread.sleep(1000); - - // quit election to allow other elector to become active - elector.quitElection(true); - - LOG.info("ending " + index); - } - - @Override - public synchronized void becomeActive() { - reportActive(index); - LOG.info("active " + index); - hasBecomeActive.countDown(); - } - - @Override - public synchronized void becomeStandby() { - reportStandby(index); - LOG.info("standby " + index); - } - - @Override - public synchronized void enterNeutralMode() { - LOG.info("neutral " + index); - } - - @Override - public synchronized void notifyFatalError(String errorMessage) { - LOG.info("fatal " + index + " .Error message:" + errorMessage); - this.interrupt(); - } - - @Override - public void fenceOldActive(byte[] data) { - LOG.info("fenceOldActive " + index); - // should not fence itself - Assert.assertTrue(index != data[0]); + for (int i = 0; i < NUM_ELECTORS; i++) { + cbs[i] = Mockito.mock(ActiveStandbyElectorCallback.class); + appDatas[i] = Ints.toByteArray(i); + electors[i] = new ActiveStandbyElector( + hostPort, 5000, PARENT_DIR, Ids.OPEN_ACL_UNSAFE, cbs[i]); } } - - synchronized void reportActive(int index) { - if (activeIndex == -1) { - activeIndex = index; - } else { - // standby should become active - Assert.assertEquals(standbyIndex, index); - // old active should not become active - Assert.assertFalse(activeIndex == index); + + private void 
checkFatalsAndReset() throws Exception { + for (int i = 0; i < NUM_ELECTORS; i++) { + Mockito.verify(cbs[i], Mockito.never()).notifyFatalError( + Mockito.anyString()); + Mockito.reset(cbs[i]); } - activeIndex = index; - } - - synchronized void reportStandby(int index) { - // only 1 standby should be reported and it should not be the same as active - Assert.assertEquals(-1, standbyIndex); - standbyIndex = index; - Assert.assertFalse(activeIndex == standbyIndex); } /** * the test creates 2 electors which try to become active using a real * zookeeper server. It verifies that 1 becomes active and 1 becomes standby. * Upon becoming active the leader quits election and the test verifies that - * the standby now becomes active. these electors run on different threads and - * callback to the test class to report active and standby where the outcome - * is verified - * @throws Exception + * the standby now becomes active. */ - @Test + @Test(timeout=20000) public void testActiveStandbyTransition() throws Exception { LOG.info("starting test with parentDir:" + PARENT_DIR); - TestContext ctx = new TestContext(); - - for(int i = 0; i < NUM_ELECTORS; i++) { - LOG.info("creating " + i); - final ZooKeeper zk = createClient(); - assert zk != null; - - ThreadRunner tr = new ThreadRunner(ctx, i); - electors[i] = new ActiveStandbyElector( - "hostPort", 1000, PARENT_DIR, Ids.OPEN_ACL_UNSAFE, - tr) { - @Override - protected synchronized ZooKeeper getNewZooKeeper() - throws IOException { - return zk; - } - }; - ctx.addThread(tr); - } - assertFalse(electors[0].parentZNodeExists()); electors[0].ensureParentZNode(); assertTrue(electors[0].parentZNodeExists()); - ctx.startThreads(); - ctx.stop(); + // First elector joins election, becomes active. 
+ electors[0].joinElection(appDatas[0]); + ActiveStandbyElectorTestUtil.waitForActiveLockData(null, + zkServer, PARENT_DIR, appDatas[0]); + Mockito.verify(cbs[0], Mockito.timeout(1000)).becomeActive(); + checkFatalsAndReset(); + + // Second elector joins election, becomes standby. + electors[1].joinElection(appDatas[1]); + Mockito.verify(cbs[1], Mockito.timeout(1000)).becomeStandby(); + checkFatalsAndReset(); + + // First elector quits, second one should become active + electors[0].quitElection(true); + ActiveStandbyElectorTestUtil.waitForActiveLockData(null, + zkServer, PARENT_DIR, appDatas[1]); + Mockito.verify(cbs[1], Mockito.timeout(1000)).becomeActive(); + checkFatalsAndReset(); + + // First one rejoins, becomes standby, second one stays active + electors[0].joinElection(appDatas[0]); + Mockito.verify(cbs[0], Mockito.timeout(1000)).becomeStandby(); + checkFatalsAndReset(); + + // Second one expires, first one becomes active + electors[1].preventSessionReestablishmentForTests(); + try { + zkServer.closeSession(electors[1].getZKSessionIdForTests()); + + ActiveStandbyElectorTestUtil.waitForActiveLockData(null, + zkServer, PARENT_DIR, appDatas[0]); + Mockito.verify(cbs[1], Mockito.timeout(1000)).enterNeutralMode(); + Mockito.verify(cbs[0], Mockito.timeout(1000)).fenceOldActive( + AdditionalMatchers.aryEq(appDatas[1])); + Mockito.verify(cbs[0], Mockito.timeout(1000)).becomeActive(); + } finally { + electors[1].allowSessionReestablishmentForTests(); + } + + // Second one eventually reconnects and becomes standby + Mockito.verify(cbs[1], Mockito.timeout(5000)).becomeStandby(); + checkFatalsAndReset(); + + // First one expires, second one should become active + electors[0].preventSessionReestablishmentForTests(); + try { + zkServer.closeSession(electors[0].getZKSessionIdForTests()); + + ActiveStandbyElectorTestUtil.waitForActiveLockData(null, + zkServer, PARENT_DIR, appDatas[1]); + Mockito.verify(cbs[0], Mockito.timeout(1000)).enterNeutralMode(); + 
Mockito.verify(cbs[1], Mockito.timeout(1000)).fenceOldActive( + AdditionalMatchers.aryEq(appDatas[0])); + Mockito.verify(cbs[1], Mockito.timeout(1000)).becomeActive(); + } finally { + electors[0].allowSessionReestablishmentForTests(); + } + + checkFatalsAndReset(); + } + + @Test(timeout=15000) + public void testHandleSessionExpiration() throws Exception { + ActiveStandbyElectorCallback cb = cbs[0]; + byte[] appData = appDatas[0]; + ActiveStandbyElector elector = electors[0]; + + // Let the first elector become active + elector.ensureParentZNode(); + elector.joinElection(appData); + ZooKeeperServer zks = getServer(serverFactory); + ActiveStandbyElectorTestUtil.waitForActiveLockData(null, + zks, PARENT_DIR, appData); + Mockito.verify(cb, Mockito.timeout(1000)).becomeActive(); + checkFatalsAndReset(); + + LOG.info("========================== Expiring session"); + zks.closeSession(elector.getZKSessionIdForTests()); + + // Should enter neutral mode when disconnected + Mockito.verify(cb, Mockito.timeout(1000)).enterNeutralMode(); + + // Should re-join the election and regain active + ActiveStandbyElectorTestUtil.waitForActiveLockData(null, + zks, PARENT_DIR, appData); + Mockito.verify(cb, Mockito.timeout(1000)).becomeActive(); + checkFatalsAndReset(); + + LOG.info("========================== Quitting election"); + elector.quitElection(false); + ActiveStandbyElectorTestUtil.waitForActiveLockData(null, + zks, PARENT_DIR, null); + + // Double check that we don't accidentally re-join the election + // due to receiving the "expired" event. 
+ Thread.sleep(1000); + Mockito.verify(cb, Mockito.never()).becomeActive(); + ActiveStandbyElectorTestUtil.waitForActiveLockData(null, + zks, PARENT_DIR, null); + + checkFatalsAndReset(); } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MultithreadedTestUtil.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MultithreadedTestUtil.java index 6fab1948cb5..df978c96b8e 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MultithreadedTestUtil.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MultithreadedTestUtil.java @@ -124,7 +124,7 @@ public abstract class MultithreadedTestUtil { * Checks for thread exceptions, and if they've occurred * throws them as RuntimeExceptions in a deferred manner. */ - private synchronized void checkException() throws Exception { + public synchronized void checkException() throws Exception { if (err != null) { throw new RuntimeException("Deferred", err); }