HADOOP-7992. Add ZKClient library to facilitate leader election. Contributed by Bikas Saha.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1235841 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
dea3164e00
commit
cbfe8fea0e
|
@ -9,21 +9,21 @@ HADOOP-7455. HA: Introduce HA Service Protocol Interface. (suresh)
|
||||||
HADOOP-7774. HA: Administrative CLI to control HA daemons. (todd)
|
HADOOP-7774. HA: Administrative CLI to control HA daemons. (todd)
|
||||||
|
|
||||||
HADOOP-7896. HA: if both NNs are in Standby mode, client needs to try failing
|
HADOOP-7896. HA: if both NNs are in Standby mode, client needs to try failing
|
||||||
back and forth several times with sleeps. (atm)
|
back and forth several times with sleeps. (atm)
|
||||||
|
|
||||||
HADOOP-7922. Improve some logging for client IPC failovers and
|
HADOOP-7922. Improve some logging for client IPC failovers and
|
||||||
StandbyExceptions (todd)
|
StandbyExceptions (todd)
|
||||||
|
|
||||||
HADOOP-7921. StandbyException should extend IOException (todd)
|
HADOOP-7921. StandbyException should extend IOException (todd)
|
||||||
|
|
||||||
HADOOP-7928. HA: Client failover policy is incorrectly trying to fail over all
|
HADOOP-7928. HA: Client failover policy is incorrectly trying to fail over all
|
||||||
IOExceptions (atm)
|
IOExceptions (atm)
|
||||||
|
|
||||||
HADOOP-7925. Add interface and update CLI to query current state to
|
HADOOP-7925. Add interface and update CLI to query current state to
|
||||||
HAServiceProtocol (eli via todd)
|
HAServiceProtocol (eli via todd)
|
||||||
|
|
||||||
HADOOP-7932. Make client connection retries on socket time outs configurable.
|
HADOOP-7932. Make client connection retries on socket time outs configurable.
|
||||||
(Uma Maheswara Rao G via todd)
|
(Uma Maheswara Rao G via todd)
|
||||||
|
|
||||||
HADOOP-7924.
FailoverController for client-based configuration (eli)
|
HADOOP-7924.
FailoverController for client-based configuration (eli)
|
||||||
|
|
||||||
|
@ -31,3 +31,6 @@ HADOOP-7961. Move HA fencing to common. (eli)
|
||||||
|
|
||||||
HADOOP-7970. HAServiceProtocol methods must throw IOException.
|
HADOOP-7970. HAServiceProtocol methods must throw IOException.
|
||||||
(Hari Mankude via suresh).
|
(Hari Mankude via suresh).
|
||||||
|
|
||||||
|
HADOOP-7992. Add ZKClient library to facilitate leader election.
|
||||||
|
(Bikas Saha via suresh).
|
||||||
|
|
|
@ -268,6 +268,34 @@
|
||||||
<groupId>com.jcraft</groupId>
|
<groupId>com.jcraft</groupId>
|
||||||
<artifactId>jsch</artifactId>
|
<artifactId>jsch</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.zookeeper</groupId>
|
||||||
|
<artifactId>zookeeper</artifactId>
|
||||||
|
<version>3.4.2</version>
|
||||||
|
<exclusions>
|
||||||
|
<exclusion>
|
||||||
|
<!-- otherwise seems to drag in junit 3.8.1 via jline -->
|
||||||
|
<groupId>junit</groupId>
|
||||||
|
<artifactId>junit</artifactId>
|
||||||
|
</exclusion>
|
||||||
|
<exclusion>
|
||||||
|
<groupId>com.sun.jdmk</groupId>
|
||||||
|
<artifactId>jmxtools</artifactId>
|
||||||
|
</exclusion>
|
||||||
|
<exclusion>
|
||||||
|
<groupId>com.sun.jmx</groupId>
|
||||||
|
<artifactId>jmxri</artifactId>
|
||||||
|
</exclusion>
|
||||||
|
</exclusions>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.zookeeper</groupId>
|
||||||
|
<artifactId>zookeeper</artifactId>
|
||||||
|
<version>3.4.2</version>
|
||||||
|
<type>test-jar</type>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
<build>
|
<build>
|
||||||
|
|
|
@ -0,0 +1,593 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.ha;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
import org.apache.zookeeper.data.ACL;
|
||||||
|
import org.apache.zookeeper.KeeperException;
|
||||||
|
import org.apache.zookeeper.Watcher;
|
||||||
|
import org.apache.zookeeper.WatchedEvent;
|
||||||
|
import org.apache.zookeeper.ZooKeeper;
|
||||||
|
import org.apache.zookeeper.CreateMode;
|
||||||
|
import org.apache.zookeeper.AsyncCallback.*;
|
||||||
|
import org.apache.zookeeper.data.Stat;
|
||||||
|
import org.apache.zookeeper.KeeperException.Code;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* This class implements a simple library to perform leader election on top of
|
||||||
|
* Apache Zookeeper. Using Zookeeper as a coordination service, leader election
|
||||||
|
* can be performed by atomically creating an ephemeral lock file (znode) on
|
||||||
|
* Zookeeper. The service instance that successfully creates the znode becomes
|
||||||
|
* active and the rest become standbys. <br/>
|
||||||
|
* This election mechanism is only efficient for small number of election
|
||||||
|
* candidates (order of 10's) because contention on single znode by a large
|
||||||
|
* number of candidates can result in Zookeeper overload. <br/>
|
||||||
|
* The elector does not guarantee fencing (protection of shared resources) among
|
||||||
|
* service instances. After it has notified an instance about becoming a leader,
|
||||||
|
* then that instance must ensure that it meets the service consistency
|
||||||
|
* requirements. If it cannot do so, then it is recommended to quit the
|
||||||
|
* election. The application implements the {@link ActiveStandbyElectorCallback}
|
||||||
|
* to interact with the elector
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Evolving
|
||||||
|
public class ActiveStandbyElector implements Watcher, StringCallback,
|
||||||
|
StatCallback {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Callback interface to interact with the ActiveStandbyElector object. <br/>
|
||||||
|
* The application will be notified with a callback only on state changes
|
||||||
|
* (i.e. there will never be successive calls to becomeActive without an
|
||||||
|
* intermediate call to enterNeutralMode). <br/>
|
||||||
|
* The callbacks will be running on Zookeeper client library threads. The
|
||||||
|
* application should return from these callbacks quickly so as not to impede
|
||||||
|
* Zookeeper client library performance and notifications. The app will
|
||||||
|
* typically remember the state change and return from the callback. It will
|
||||||
|
* then proceed with implementing actions around that state change. It is
|
||||||
|
* possible to be called back again while these actions are in flight and the
|
||||||
|
* app should handle this scenario.
|
||||||
|
*/
|
||||||
|
public interface ActiveStandbyElectorCallback {
|
||||||
|
/**
|
||||||
|
* This method is called when the app becomes the active leader
|
||||||
|
*/
|
||||||
|
void becomeActive();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This method is called when the app becomes a standby
|
||||||
|
*/
|
||||||
|
void becomeStandby();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If the elector gets disconnected from Zookeeper and does not know about
|
||||||
|
* the lock state, then it will notify the service via the enterNeutralMode
|
||||||
|
* interface. The service may choose to ignore this or stop doing state
|
||||||
|
* changing operations. Upon reconnection, the elector verifies the leader
|
||||||
|
* status and calls back on the becomeActive and becomeStandby app
|
||||||
|
* interfaces. <br/>
|
||||||
|
* Zookeeper disconnects can happen due to network issues or loss of
|
||||||
|
* Zookeeper quorum. Thus enterNeutralMode can be used to guard against
|
||||||
|
* split-brain issues. In such situations it might be prudent to call
|
||||||
|
* becomeStandby too. However, such state change operations might be
|
||||||
|
* expensive and enterNeutralMode can help guard against doing that for
|
||||||
|
* transient issues.
|
||||||
|
*/
|
||||||
|
void enterNeutralMode();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If there is any fatal error (e.g. wrong ACL's, unexpected Zookeeper
|
||||||
|
* errors or Zookeeper persistent unavailability) then notifyFatalError is
|
||||||
|
* called to notify the app about it.
|
||||||
|
*/
|
||||||
|
void notifyFatalError(String errorMessage);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Name of the lock znode used by the library. Protected for access in test
|
||||||
|
* classes
|
||||||
|
*/
|
||||||
|
@VisibleForTesting
|
||||||
|
protected static final String LOCKFILENAME = "ActiveStandbyElectorLock";
|
||||||
|
|
||||||
|
public static final Log LOG = LogFactory.getLog(ActiveStandbyElector.class);
|
||||||
|
|
||||||
|
private static final int NUM_RETRIES = 3;
|
||||||
|
|
||||||
|
private enum ConnectionState {
|
||||||
|
DISCONNECTED, CONNECTED, TERMINATED
|
||||||
|
};
|
||||||
|
|
||||||
|
private enum State {
|
||||||
|
INIT, ACTIVE, STANDBY, NEUTRAL
|
||||||
|
};
|
||||||
|
|
||||||
|
private State state = State.INIT;
|
||||||
|
private int createRetryCount = 0;
|
||||||
|
private int statRetryCount = 0;
|
||||||
|
private ZooKeeper zkClient;
|
||||||
|
private ConnectionState zkConnectionState = ConnectionState.TERMINATED;
|
||||||
|
|
||||||
|
private final ActiveStandbyElectorCallback appClient;
|
||||||
|
private final String zkHostPort;
|
||||||
|
private final int zkSessionTimeout;
|
||||||
|
private final List<ACL> zkAcl;
|
||||||
|
private byte[] appData;
|
||||||
|
private final String zkLockFilePath;
|
||||||
|
private final String znodeWorkingDir;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a new ActiveStandbyElector object <br/>
|
||||||
|
* The elector is created by providing to it the Zookeeper configuration, the
|
||||||
|
* parent znode under which to create the znode and a reference to the
|
||||||
|
* callback interface. <br/>
|
||||||
|
* The parent znode name must be the same for all service instances and
|
||||||
|
* different across services. <br/>
|
||||||
|
* After the leader has been lost, a new leader will be elected after the
|
||||||
|
* session timeout expires. Hence, the app must set this parameter based on
|
||||||
|
* its needs for failure response time. The session timeout must be greater
|
||||||
|
* than the Zookeeper disconnect timeout and is recommended to be 3X that
|
||||||
|
* value to enable Zookeeper to retry transient disconnections. Setting a very
|
||||||
|
* short session timeout may result in frequent transitions between active and
|
||||||
|
* standby states during issues like network outages/GS pauses.
|
||||||
|
*
|
||||||
|
* @param zookeeperHostPorts
|
||||||
|
* ZooKeeper hostPort for all ZooKeeper servers
|
||||||
|
* @param zookeeperSessionTimeout
|
||||||
|
* ZooKeeper session timeout
|
||||||
|
* @param parentZnodeName
|
||||||
|
* znode under which to create the lock
|
||||||
|
* @param acl
|
||||||
|
* ZooKeeper ACL's
|
||||||
|
* @param app
|
||||||
|
* reference to callback interface object
|
||||||
|
* @throws IOException
|
||||||
|
* @throws HadoopIllegalArgumentException
|
||||||
|
*/
|
||||||
|
public ActiveStandbyElector(String zookeeperHostPorts,
|
||||||
|
int zookeeperSessionTimeout, String parentZnodeName, List<ACL> acl,
|
||||||
|
ActiveStandbyElectorCallback app) throws IOException,
|
||||||
|
HadoopIllegalArgumentException {
|
||||||
|
if (app == null || acl == null || parentZnodeName == null
|
||||||
|
|| zookeeperHostPorts == null || zookeeperSessionTimeout <= 0) {
|
||||||
|
throw new HadoopIllegalArgumentException("Invalid argument");
|
||||||
|
}
|
||||||
|
zkHostPort = zookeeperHostPorts;
|
||||||
|
zkSessionTimeout = zookeeperSessionTimeout;
|
||||||
|
zkAcl = acl;
|
||||||
|
appClient = app;
|
||||||
|
znodeWorkingDir = parentZnodeName;
|
||||||
|
zkLockFilePath = znodeWorkingDir + "/" + LOCKFILENAME;
|
||||||
|
|
||||||
|
// createConnection for future API calls
|
||||||
|
createConnection();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* To participate in election, the app will call joinElection. The result will
|
||||||
|
* be notified by a callback on either the becomeActive or becomeStandby app
|
||||||
|
* interfaces. <br/>
|
||||||
|
* After this the elector will automatically monitor the leader status and
|
||||||
|
* perform re-election if necessary<br/>
|
||||||
|
* The app could potentially start off in standby mode and ignore the
|
||||||
|
* becomeStandby call.
|
||||||
|
*
|
||||||
|
* @param data
|
||||||
|
* to be set by the app. non-null data must be set.
|
||||||
|
* @throws HadoopIllegalArgumentException
|
||||||
|
* if valid data is not supplied
|
||||||
|
*/
|
||||||
|
public synchronized void joinElection(byte[] data)
|
||||||
|
throws HadoopIllegalArgumentException {
|
||||||
|
LOG.debug("Attempting active election");
|
||||||
|
|
||||||
|
if (data == null) {
|
||||||
|
throw new HadoopIllegalArgumentException("data cannot be null");
|
||||||
|
}
|
||||||
|
|
||||||
|
appData = new byte[data.length];
|
||||||
|
System.arraycopy(data, 0, appData, 0, data.length);
|
||||||
|
|
||||||
|
joinElectionInternal();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Any service instance can drop out of the election by calling quitElection.
|
||||||
|
* <br/>
|
||||||
|
* This will lose any leader status, if held, and stop monitoring of the lock
|
||||||
|
* node. <br/>
|
||||||
|
* If the instance wants to participate in election again, then it needs to
|
||||||
|
* call joinElection(). <br/>
|
||||||
|
* This allows service instances to take themselves out of rotation for known
|
||||||
|
* impending unavailable states (e.g. long GC pause or software upgrade).
|
||||||
|
*/
|
||||||
|
public synchronized void quitElection() {
|
||||||
|
LOG.debug("Yielding from election");
|
||||||
|
reset();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Exception thrown when there is no active leader
|
||||||
|
*/
|
||||||
|
public class ActiveNotFoundException extends Exception {
|
||||||
|
private static final long serialVersionUID = 3505396722342846462L;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* get data set by the active leader
|
||||||
|
*
|
||||||
|
* @return data set by the active instance
|
||||||
|
* @throws ActiveNotFoundException
|
||||||
|
* when there is no active leader
|
||||||
|
* @throws KeeperException
|
||||||
|
* other zookeeper operation errors
|
||||||
|
* @throws InterruptedException
|
||||||
|
* @throws IOException
|
||||||
|
* when ZooKeeper connection could not be established
|
||||||
|
*/
|
||||||
|
public synchronized byte[] getActiveData() throws ActiveNotFoundException,
|
||||||
|
KeeperException, InterruptedException, IOException {
|
||||||
|
try {
|
||||||
|
if (zkClient == null) {
|
||||||
|
createConnection();
|
||||||
|
}
|
||||||
|
Stat stat = new Stat();
|
||||||
|
return zkClient.getData(zkLockFilePath, false, stat);
|
||||||
|
} catch(KeeperException e) {
|
||||||
|
Code code = e.code();
|
||||||
|
if (operationNodeDoesNotExist(code)) {
|
||||||
|
// handle the commonly expected cases that make sense for us
|
||||||
|
throw new ActiveNotFoundException();
|
||||||
|
} else {
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* interface implementation of Zookeeper callback for create
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public synchronized void processResult(int rc, String path, Object ctx,
|
||||||
|
String name) {
|
||||||
|
LOG.debug("CreateNode result: " + rc + " for path: " + path
|
||||||
|
+ " connectionState: " + zkConnectionState);
|
||||||
|
if (zkClient == null) {
|
||||||
|
// zkClient is nulled before closing the connection
|
||||||
|
// this is the callback with session expired after we closed the session
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
Code code = Code.get(rc);
|
||||||
|
if (operationSuccess(code)) {
|
||||||
|
// we successfully created the znode. we are the leader. start monitoring
|
||||||
|
becomeActive();
|
||||||
|
monitorActiveStatus();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (operationNodeExists(code)) {
|
||||||
|
if (createRetryCount == 0) {
|
||||||
|
// znode exists and we did not retry the operation. so a different
|
||||||
|
// instance has created it. become standby and monitor lock.
|
||||||
|
becomeStandby();
|
||||||
|
}
|
||||||
|
// if we had retried then the znode could have been created by our first
|
||||||
|
// attempt to the server (that we lost) and this node exists response is
|
||||||
|
// for the second attempt. verify this case via ephemeral node owner. this
|
||||||
|
// will happen on the callback for monitoring the lock.
|
||||||
|
monitorActiveStatus();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
String errorMessage = "Received create error from Zookeeper. code:"
|
||||||
|
+ code.toString();
|
||||||
|
LOG.debug(errorMessage);
|
||||||
|
|
||||||
|
if (operationRetry(code)) {
|
||||||
|
if (createRetryCount < NUM_RETRIES) {
|
||||||
|
LOG.debug("Retrying createNode createRetryCount: " + createRetryCount);
|
||||||
|
++createRetryCount;
|
||||||
|
createNode();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
errorMessage = errorMessage
|
||||||
|
+ ". Not retrying further znode create connection errors.";
|
||||||
|
}
|
||||||
|
|
||||||
|
fatalError(errorMessage);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* interface implementation of Zookeeper callback for monitor (exists)
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public synchronized void processResult(int rc, String path, Object ctx,
|
||||||
|
Stat stat) {
|
||||||
|
LOG.debug("StatNode result: " + rc + " for path: " + path
|
||||||
|
+ " connectionState: " + zkConnectionState);
|
||||||
|
if (zkClient == null) {
|
||||||
|
// zkClient is nulled before closing the connection
|
||||||
|
// this is the callback with session expired after we closed the session
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
Code code = Code.get(rc);
|
||||||
|
if (operationSuccess(code)) {
|
||||||
|
// the following owner check completes verification in case the lock znode
|
||||||
|
// creation was retried
|
||||||
|
if (stat.getEphemeralOwner() == zkClient.getSessionId()) {
|
||||||
|
// we own the lock znode. so we are the leader
|
||||||
|
becomeActive();
|
||||||
|
} else {
|
||||||
|
// we dont own the lock znode. so we are a standby.
|
||||||
|
becomeStandby();
|
||||||
|
}
|
||||||
|
// the watch set by us will notify about changes
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (operationNodeDoesNotExist(code)) {
|
||||||
|
// the lock znode disappeared before we started monitoring it
|
||||||
|
enterNeutralMode();
|
||||||
|
joinElectionInternal();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
String errorMessage = "Received stat error from Zookeeper. code:"
|
||||||
|
+ code.toString();
|
||||||
|
LOG.debug(errorMessage);
|
||||||
|
|
||||||
|
if (operationRetry(code)) {
|
||||||
|
if (statRetryCount < NUM_RETRIES) {
|
||||||
|
++statRetryCount;
|
||||||
|
monitorNode();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
errorMessage = errorMessage
|
||||||
|
+ ". Not retrying further znode monitoring connection errors.";
|
||||||
|
}
|
||||||
|
|
||||||
|
fatalError(errorMessage);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* interface implementation of Zookeeper watch events (connection and node)
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public synchronized void process(WatchedEvent event) {
|
||||||
|
Event.EventType eventType = event.getType();
|
||||||
|
LOG.debug("Watcher event type: " + eventType + " with state:"
|
||||||
|
+ event.getState() + " for path:" + event.getPath()
|
||||||
|
+ " connectionState: " + zkConnectionState);
|
||||||
|
if (zkClient == null) {
|
||||||
|
// zkClient is nulled before closing the connection
|
||||||
|
// this is the callback with session expired after we closed the session
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (eventType == Event.EventType.None) {
|
||||||
|
// the connection state has changed
|
||||||
|
switch (event.getState()) {
|
||||||
|
case SyncConnected:
|
||||||
|
// if the listener was asked to move to safe state then it needs to
|
||||||
|
// be undone
|
||||||
|
ConnectionState prevConnectionState = zkConnectionState;
|
||||||
|
zkConnectionState = ConnectionState.CONNECTED;
|
||||||
|
if (prevConnectionState == ConnectionState.DISCONNECTED) {
|
||||||
|
monitorActiveStatus();
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case Disconnected:
|
||||||
|
// ask the app to move to safe state because zookeeper connection
|
||||||
|
// is not active and we dont know our state
|
||||||
|
zkConnectionState = ConnectionState.DISCONNECTED;
|
||||||
|
enterNeutralMode();
|
||||||
|
break;
|
||||||
|
case Expired:
|
||||||
|
// the connection got terminated because of session timeout
|
||||||
|
// call listener to reconnect
|
||||||
|
enterNeutralMode();
|
||||||
|
reJoinElection();
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
fatalError("Unexpected Zookeeper watch event state: "
|
||||||
|
+ event.getState());
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// a watch on lock path in zookeeper has fired. so something has changed on
|
||||||
|
// the lock. ideally we should check that the path is the same as the lock
|
||||||
|
// path but trusting zookeeper for now
|
||||||
|
String path = event.getPath();
|
||||||
|
if (path != null) {
|
||||||
|
switch (eventType) {
|
||||||
|
case NodeDeleted:
|
||||||
|
if (state == State.ACTIVE) {
|
||||||
|
enterNeutralMode();
|
||||||
|
}
|
||||||
|
joinElectionInternal();
|
||||||
|
break;
|
||||||
|
case NodeDataChanged:
|
||||||
|
monitorActiveStatus();
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
LOG.debug("Unexpected node event: " + eventType + " for path: " + path);
|
||||||
|
monitorActiveStatus();
|
||||||
|
}
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// some unexpected error has occurred
|
||||||
|
fatalError("Unexpected watch error from Zookeeper");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get a new zookeeper client instance. protected so that test class can
|
||||||
|
* inherit and pass in a mock object for zookeeper
|
||||||
|
*
|
||||||
|
* @return new zookeeper client instance
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
protected synchronized ZooKeeper getNewZooKeeper() throws IOException {
|
||||||
|
return new ZooKeeper(zkHostPort, zkSessionTimeout, this);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void fatalError(String errorMessage) {
|
||||||
|
reset();
|
||||||
|
appClient.notifyFatalError(errorMessage);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void monitorActiveStatus() {
|
||||||
|
LOG.debug("Monitoring active leader");
|
||||||
|
statRetryCount = 0;
|
||||||
|
monitorNode();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void joinElectionInternal() {
|
||||||
|
if (zkClient == null) {
|
||||||
|
if (!reEstablishSession()) {
|
||||||
|
fatalError("Failed to reEstablish connection with ZooKeeper");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
createRetryCount = 0;
|
||||||
|
createNode();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void reJoinElection() {
|
||||||
|
LOG.debug("Trying to re-establish ZK session");
|
||||||
|
terminateConnection();
|
||||||
|
joinElectionInternal();
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean reEstablishSession() {
|
||||||
|
int connectionRetryCount = 0;
|
||||||
|
boolean success = false;
|
||||||
|
while(!success && connectionRetryCount < NUM_RETRIES) {
|
||||||
|
LOG.debug("Establishing zookeeper connection");
|
||||||
|
try {
|
||||||
|
createConnection();
|
||||||
|
success = true;
|
||||||
|
} catch(IOException e) {
|
||||||
|
LOG.warn(e);
|
||||||
|
try {
|
||||||
|
Thread.sleep(5000);
|
||||||
|
} catch(InterruptedException e1) {
|
||||||
|
LOG.warn(e1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
++connectionRetryCount;
|
||||||
|
}
|
||||||
|
return success;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void createConnection() throws IOException {
|
||||||
|
zkClient = getNewZooKeeper();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void terminateConnection() {
|
||||||
|
if (zkClient == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
LOG.debug("Terminating ZK connection");
|
||||||
|
ZooKeeper tempZk = zkClient;
|
||||||
|
zkClient = null;
|
||||||
|
try {
|
||||||
|
tempZk.close();
|
||||||
|
} catch(InterruptedException e) {
|
||||||
|
LOG.warn(e);
|
||||||
|
}
|
||||||
|
zkConnectionState = ConnectionState.TERMINATED;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void reset() {
|
||||||
|
state = State.INIT;
|
||||||
|
terminateConnection();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void becomeActive() {
|
||||||
|
if (state != State.ACTIVE) {
|
||||||
|
LOG.debug("Becoming active");
|
||||||
|
state = State.ACTIVE;
|
||||||
|
appClient.becomeActive();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void becomeStandby() {
|
||||||
|
if (state != State.STANDBY) {
|
||||||
|
LOG.debug("Becoming standby");
|
||||||
|
state = State.STANDBY;
|
||||||
|
appClient.becomeStandby();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void enterNeutralMode() {
|
||||||
|
if (state != State.NEUTRAL) {
|
||||||
|
LOG.debug("Entering neutral mode");
|
||||||
|
state = State.NEUTRAL;
|
||||||
|
appClient.enterNeutralMode();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void createNode() {
|
||||||
|
zkClient.create(zkLockFilePath, appData, zkAcl, CreateMode.EPHEMERAL, this,
|
||||||
|
null);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void monitorNode() {
|
||||||
|
zkClient.exists(zkLockFilePath, true, this, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean operationSuccess(Code code) {
|
||||||
|
return (code == Code.OK);
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean operationNodeExists(Code code) {
|
||||||
|
return (code == Code.NODEEXISTS);
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean operationNodeDoesNotExist(Code code) {
|
||||||
|
return (code == Code.NONODE);
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean operationRetry(Code code) {
|
||||||
|
switch (code) {
|
||||||
|
case CONNECTIONLOSS:
|
||||||
|
case OPERATIONTIMEOUT:
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,527 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.ha;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.zookeeper.AsyncCallback;
|
||||||
|
import org.apache.zookeeper.CreateMode;
|
||||||
|
import org.apache.zookeeper.KeeperException;
|
||||||
|
import org.apache.zookeeper.KeeperException.Code;
|
||||||
|
import org.apache.zookeeper.WatchedEvent;
|
||||||
|
import org.apache.zookeeper.ZooKeeper;
|
||||||
|
import org.apache.zookeeper.Watcher.Event;
|
||||||
|
import org.apache.zookeeper.data.ACL;
|
||||||
|
import org.apache.zookeeper.data.Stat;
|
||||||
|
import org.apache.zookeeper.ZooDefs.Ids;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
|
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||||
|
import org.apache.hadoop.ha.ActiveStandbyElector.ActiveStandbyElectorCallback;
|
||||||
|
import org.apache.hadoop.ha.ActiveStandbyElector.ActiveNotFoundException;
|
||||||
|
|
||||||
|
public class TestActiveStandbyElector {
|
||||||
|
|
||||||
|
static ZooKeeper mockZK;
|
||||||
|
static int count;
|
||||||
|
static ActiveStandbyElectorCallback mockApp;
|
||||||
|
static final byte[] data = new byte[8];
|
||||||
|
|
||||||
|
ActiveStandbyElectorTester elector;
|
||||||
|
|
||||||
|
class ActiveStandbyElectorTester extends ActiveStandbyElector {
|
||||||
|
ActiveStandbyElectorTester(String hostPort, int timeout, String parent,
|
||||||
|
List<ACL> acl, ActiveStandbyElectorCallback app) throws IOException {
|
||||||
|
super(hostPort, timeout, parent, acl, app);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ZooKeeper getNewZooKeeper() {
|
||||||
|
++TestActiveStandbyElector.count;
|
||||||
|
return TestActiveStandbyElector.mockZK;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final String zkParentName = "/zookeeper";
|
||||||
|
private static final String zkLockPathName = "/zookeeper/"
|
||||||
|
+ ActiveStandbyElector.LOCKFILENAME;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void init() throws IOException {
|
||||||
|
count = 0;
|
||||||
|
mockZK = Mockito.mock(ZooKeeper.class);
|
||||||
|
mockApp = Mockito.mock(ActiveStandbyElectorCallback.class);
|
||||||
|
elector = new ActiveStandbyElectorTester("hostPort", 1000, zkParentName,
|
||||||
|
Ids.OPEN_ACL_UNSAFE, mockApp);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* verify that joinElection checks for null data
|
||||||
|
*/
|
||||||
|
@Test(expected = HadoopIllegalArgumentException.class)
|
||||||
|
public void testJoinElectionException() {
|
||||||
|
elector.joinElection(null);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* verify that joinElection tries to create ephemeral lock znode
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testJoinElection() {
|
||||||
|
elector.joinElection(data);
|
||||||
|
Mockito.verify(mockZK, Mockito.times(1)).create(zkLockPathName, data,
|
||||||
|
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* verify that successful znode create result becomes active and monitoring is
|
||||||
|
* started
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testCreateNodeResultBecomeActive() {
|
||||||
|
elector.joinElection(data);
|
||||||
|
elector.processResult(Code.OK.intValue(), zkLockPathName, null,
|
||||||
|
zkLockPathName);
|
||||||
|
Mockito.verify(mockApp, Mockito.times(1)).becomeActive();
|
||||||
|
Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true,
|
||||||
|
elector, null);
|
||||||
|
|
||||||
|
// monitor callback verifies the leader is ephemeral owner of lock but does
|
||||||
|
// not call becomeActive since its already active
|
||||||
|
Stat stat = new Stat();
|
||||||
|
stat.setEphemeralOwner(1L);
|
||||||
|
Mockito.when(mockZK.getSessionId()).thenReturn(1L);
|
||||||
|
elector.processResult(Code.OK.intValue(), zkLockPathName, null, stat);
|
||||||
|
// should not call neutral mode/standby/active
|
||||||
|
Mockito.verify(mockApp, Mockito.times(0)).enterNeutralMode();
|
||||||
|
Mockito.verify(mockApp, Mockito.times(0)).becomeStandby();
|
||||||
|
Mockito.verify(mockApp, Mockito.times(1)).becomeActive();
|
||||||
|
// another joinElection not called.
|
||||||
|
Mockito.verify(mockZK, Mockito.times(1)).create(zkLockPathName, data,
|
||||||
|
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null);
|
||||||
|
// no new monitor called
|
||||||
|
Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true,
|
||||||
|
elector, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* verify that znode create for existing node and no retry becomes standby and
|
||||||
|
* monitoring is started
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testCreateNodeResultBecomeStandby() {
|
||||||
|
elector.joinElection(data);
|
||||||
|
|
||||||
|
elector.processResult(Code.NODEEXISTS.intValue(), zkLockPathName, null,
|
||||||
|
zkLockPathName);
|
||||||
|
Mockito.verify(mockApp, Mockito.times(1)).becomeStandby();
|
||||||
|
Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true,
|
||||||
|
elector, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* verify that znode create error result in fatal error
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testCreateNodeResultError() {
|
||||||
|
elector.joinElection(data);
|
||||||
|
|
||||||
|
elector.processResult(Code.APIERROR.intValue(), zkLockPathName, null,
|
||||||
|
zkLockPathName);
|
||||||
|
Mockito.verify(mockApp, Mockito.times(1)).notifyFatalError(
|
||||||
|
"Received create error from Zookeeper. code:APIERROR");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* verify that retry of network errors verifies master by session id and
|
||||||
|
* becomes active if they match. monitoring is started.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testCreateNodeResultRetryBecomeActive() {
|
||||||
|
elector.joinElection(data);
|
||||||
|
|
||||||
|
elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null,
|
||||||
|
zkLockPathName);
|
||||||
|
elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null,
|
||||||
|
zkLockPathName);
|
||||||
|
elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null,
|
||||||
|
zkLockPathName);
|
||||||
|
elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null,
|
||||||
|
zkLockPathName);
|
||||||
|
// 4 errors results in fatalError
|
||||||
|
Mockito
|
||||||
|
.verify(mockApp, Mockito.times(1))
|
||||||
|
.notifyFatalError(
|
||||||
|
"Received create error from Zookeeper. code:CONNECTIONLOSS. "+
|
||||||
|
"Not retrying further znode create connection errors.");
|
||||||
|
|
||||||
|
elector.joinElection(data);
|
||||||
|
// recreate connection via getNewZooKeeper
|
||||||
|
Assert.assertEquals(2, TestActiveStandbyElector.count);
|
||||||
|
elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null,
|
||||||
|
zkLockPathName);
|
||||||
|
elector.processResult(Code.NODEEXISTS.intValue(), zkLockPathName, null,
|
||||||
|
zkLockPathName);
|
||||||
|
Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true,
|
||||||
|
elector, null);
|
||||||
|
|
||||||
|
Stat stat = new Stat();
|
||||||
|
stat.setEphemeralOwner(1L);
|
||||||
|
Mockito.when(mockZK.getSessionId()).thenReturn(1L);
|
||||||
|
elector.processResult(Code.OK.intValue(), zkLockPathName, null, stat);
|
||||||
|
Mockito.verify(mockApp, Mockito.times(1)).becomeActive();
|
||||||
|
Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true,
|
||||||
|
elector, null);
|
||||||
|
Mockito.verify(mockZK, Mockito.times(6)).create(zkLockPathName, data,
|
||||||
|
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* verify that retry of network errors verifies active by session id and
|
||||||
|
* becomes standby if they dont match. monitoring is started.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testCreateNodeResultRetryBecomeStandby() {
|
||||||
|
elector.joinElection(data);
|
||||||
|
|
||||||
|
elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null,
|
||||||
|
zkLockPathName);
|
||||||
|
elector.processResult(Code.NODEEXISTS.intValue(), zkLockPathName, null,
|
||||||
|
zkLockPathName);
|
||||||
|
Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true,
|
||||||
|
elector, null);
|
||||||
|
|
||||||
|
Stat stat = new Stat();
|
||||||
|
stat.setEphemeralOwner(0);
|
||||||
|
Mockito.when(mockZK.getSessionId()).thenReturn(1L);
|
||||||
|
elector.processResult(Code.OK.intValue(), zkLockPathName, null, stat);
|
||||||
|
Mockito.verify(mockApp, Mockito.times(1)).becomeStandby();
|
||||||
|
Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true,
|
||||||
|
elector, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* verify that if create znode results in nodeexists and that znode is deleted
|
||||||
|
* before exists() watch is set then the return of the exists() method results
|
||||||
|
* in attempt to re-create the znode and become active
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testCreateNodeResultRetryNoNode() {
|
||||||
|
elector.joinElection(data);
|
||||||
|
|
||||||
|
elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null,
|
||||||
|
zkLockPathName);
|
||||||
|
elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null,
|
||||||
|
zkLockPathName);
|
||||||
|
elector.processResult(Code.NODEEXISTS.intValue(), zkLockPathName, null,
|
||||||
|
zkLockPathName);
|
||||||
|
Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true,
|
||||||
|
elector, null);
|
||||||
|
|
||||||
|
elector.processResult(Code.NONODE.intValue(), zkLockPathName, null,
|
||||||
|
(Stat) null);
|
||||||
|
Mockito.verify(mockApp, Mockito.times(1)).enterNeutralMode();
|
||||||
|
Mockito.verify(mockZK, Mockito.times(4)).create(zkLockPathName, data,
|
||||||
|
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* verify that more than 3 network error retries result fatalError
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testStatNodeRetry() {
|
||||||
|
elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null,
|
||||||
|
(Stat) null);
|
||||||
|
elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null,
|
||||||
|
(Stat) null);
|
||||||
|
elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null,
|
||||||
|
(Stat) null);
|
||||||
|
elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null,
|
||||||
|
(Stat) null);
|
||||||
|
Mockito
|
||||||
|
.verify(mockApp, Mockito.times(1))
|
||||||
|
.notifyFatalError(
|
||||||
|
"Received stat error from Zookeeper. code:CONNECTIONLOSS. "+
|
||||||
|
"Not retrying further znode monitoring connection errors.");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* verify error in exists() callback results in fatal error
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testStatNodeError() {
|
||||||
|
elector.processResult(Code.RUNTIMEINCONSISTENCY.intValue(), zkLockPathName,
|
||||||
|
null, (Stat) null);
|
||||||
|
Mockito.verify(mockApp, Mockito.times(0)).enterNeutralMode();
|
||||||
|
Mockito.verify(mockApp, Mockito.times(1)).notifyFatalError(
|
||||||
|
"Received stat error from Zookeeper. code:RUNTIMEINCONSISTENCY");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* verify behavior of watcher.process callback with non-node event
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testProcessCallbackEventNone() {
|
||||||
|
elector.joinElection(data);
|
||||||
|
|
||||||
|
WatchedEvent mockEvent = Mockito.mock(WatchedEvent.class);
|
||||||
|
Mockito.when(mockEvent.getType()).thenReturn(Event.EventType.None);
|
||||||
|
|
||||||
|
// first SyncConnected should not do anything
|
||||||
|
Mockito.when(mockEvent.getState()).thenReturn(
|
||||||
|
Event.KeeperState.SyncConnected);
|
||||||
|
elector.process(mockEvent);
|
||||||
|
Mockito.verify(mockZK, Mockito.times(0)).exists(Mockito.anyString(),
|
||||||
|
Mockito.anyBoolean(), Mockito.<AsyncCallback.StatCallback> anyObject(),
|
||||||
|
Mockito.<Object> anyObject());
|
||||||
|
|
||||||
|
// disconnection should enter safe mode
|
||||||
|
Mockito.when(mockEvent.getState()).thenReturn(
|
||||||
|
Event.KeeperState.Disconnected);
|
||||||
|
elector.process(mockEvent);
|
||||||
|
Mockito.verify(mockApp, Mockito.times(1)).enterNeutralMode();
|
||||||
|
|
||||||
|
// re-connection should monitor master status
|
||||||
|
Mockito.when(mockEvent.getState()).thenReturn(
|
||||||
|
Event.KeeperState.SyncConnected);
|
||||||
|
elector.process(mockEvent);
|
||||||
|
Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true,
|
||||||
|
elector, null);
|
||||||
|
|
||||||
|
// session expired should enter safe mode and initiate re-election
|
||||||
|
// re-election checked via checking re-creation of new zookeeper and
|
||||||
|
// call to create lock znode
|
||||||
|
Mockito.when(mockEvent.getState()).thenReturn(Event.KeeperState.Expired);
|
||||||
|
elector.process(mockEvent);
|
||||||
|
// already in safe mode above. should not enter safe mode again
|
||||||
|
Mockito.verify(mockApp, Mockito.times(1)).enterNeutralMode();
|
||||||
|
// called getNewZooKeeper to create new session. first call was in
|
||||||
|
// constructor
|
||||||
|
Assert.assertEquals(2, TestActiveStandbyElector.count);
|
||||||
|
// once in initial joinElection and one now
|
||||||
|
Mockito.verify(mockZK, Mockito.times(2)).create(zkLockPathName, data,
|
||||||
|
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null);
|
||||||
|
|
||||||
|
// create znode success. become master and monitor
|
||||||
|
elector.processResult(Code.OK.intValue(), zkLockPathName, null,
|
||||||
|
zkLockPathName);
|
||||||
|
Mockito.verify(mockApp, Mockito.times(1)).becomeActive();
|
||||||
|
Mockito.verify(mockZK, Mockito.times(2)).exists(zkLockPathName, true,
|
||||||
|
elector, null);
|
||||||
|
|
||||||
|
// error event results in fatal error
|
||||||
|
Mockito.when(mockEvent.getState()).thenReturn(Event.KeeperState.AuthFailed);
|
||||||
|
elector.process(mockEvent);
|
||||||
|
Mockito.verify(mockApp, Mockito.times(1)).notifyFatalError(
|
||||||
|
"Unexpected Zookeeper watch event state: AuthFailed");
|
||||||
|
// only 1 state change callback is called at a time
|
||||||
|
Mockito.verify(mockApp, Mockito.times(1)).enterNeutralMode();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* verify behavior of watcher.process with node event
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testProcessCallbackEventNode() {
|
||||||
|
elector.joinElection(data);
|
||||||
|
|
||||||
|
// make the object go into the monitoring state
|
||||||
|
elector.processResult(Code.NODEEXISTS.intValue(), zkLockPathName, null,
|
||||||
|
zkLockPathName);
|
||||||
|
Mockito.verify(mockApp, Mockito.times(1)).becomeStandby();
|
||||||
|
Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true,
|
||||||
|
elector, null);
|
||||||
|
|
||||||
|
WatchedEvent mockEvent = Mockito.mock(WatchedEvent.class);
|
||||||
|
Mockito.when(mockEvent.getPath()).thenReturn(zkLockPathName);
|
||||||
|
|
||||||
|
// monitoring should be setup again after event is received
|
||||||
|
Mockito.when(mockEvent.getType()).thenReturn(
|
||||||
|
Event.EventType.NodeDataChanged);
|
||||||
|
elector.process(mockEvent);
|
||||||
|
Mockito.verify(mockZK, Mockito.times(2)).exists(zkLockPathName, true,
|
||||||
|
elector, null);
|
||||||
|
|
||||||
|
// monitoring should be setup again after event is received
|
||||||
|
Mockito.when(mockEvent.getType()).thenReturn(
|
||||||
|
Event.EventType.NodeChildrenChanged);
|
||||||
|
elector.process(mockEvent);
|
||||||
|
Mockito.verify(mockZK, Mockito.times(3)).exists(zkLockPathName, true,
|
||||||
|
elector, null);
|
||||||
|
|
||||||
|
// lock node deletion when in standby mode should create znode again
|
||||||
|
// successful znode creation enters active state and sets monitor
|
||||||
|
Mockito.when(mockEvent.getType()).thenReturn(Event.EventType.NodeDeleted);
|
||||||
|
elector.process(mockEvent);
|
||||||
|
// enterNeutralMode not called when app is standby and leader is lost
|
||||||
|
Mockito.verify(mockApp, Mockito.times(0)).enterNeutralMode();
|
||||||
|
// once in initial joinElection() and one now
|
||||||
|
Mockito.verify(mockZK, Mockito.times(2)).create(zkLockPathName, data,
|
||||||
|
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null);
|
||||||
|
elector.processResult(Code.OK.intValue(), zkLockPathName, null,
|
||||||
|
zkLockPathName);
|
||||||
|
Mockito.verify(mockApp, Mockito.times(1)).becomeActive();
|
||||||
|
Mockito.verify(mockZK, Mockito.times(4)).exists(zkLockPathName, true,
|
||||||
|
elector, null);
|
||||||
|
|
||||||
|
// lock node deletion in active mode should enter neutral mode and create
|
||||||
|
// znode again successful znode creation enters active state and sets
|
||||||
|
// monitor
|
||||||
|
Mockito.when(mockEvent.getType()).thenReturn(Event.EventType.NodeDeleted);
|
||||||
|
elector.process(mockEvent);
|
||||||
|
Mockito.verify(mockApp, Mockito.times(1)).enterNeutralMode();
|
||||||
|
// another joinElection called
|
||||||
|
Mockito.verify(mockZK, Mockito.times(3)).create(zkLockPathName, data,
|
||||||
|
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null);
|
||||||
|
elector.processResult(Code.OK.intValue(), zkLockPathName, null,
|
||||||
|
zkLockPathName);
|
||||||
|
Mockito.verify(mockApp, Mockito.times(2)).becomeActive();
|
||||||
|
Mockito.verify(mockZK, Mockito.times(5)).exists(zkLockPathName, true,
|
||||||
|
elector, null);
|
||||||
|
|
||||||
|
// bad path name results in fatal error
|
||||||
|
Mockito.when(mockEvent.getPath()).thenReturn(null);
|
||||||
|
elector.process(mockEvent);
|
||||||
|
Mockito.verify(mockApp, Mockito.times(1)).notifyFatalError(
|
||||||
|
"Unexpected watch error from Zookeeper");
|
||||||
|
// fatal error means no new connection other than one from constructor
|
||||||
|
Assert.assertEquals(1, TestActiveStandbyElector.count);
|
||||||
|
// no new watches after fatal error
|
||||||
|
Mockito.verify(mockZK, Mockito.times(5)).exists(zkLockPathName, true,
|
||||||
|
elector, null);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* verify becomeStandby is not called if already in standby
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testSuccessiveStandbyCalls() {
|
||||||
|
elector.joinElection(data);
|
||||||
|
|
||||||
|
// make the object go into the monitoring standby state
|
||||||
|
elector.processResult(Code.NODEEXISTS.intValue(), zkLockPathName, null,
|
||||||
|
zkLockPathName);
|
||||||
|
Mockito.verify(mockApp, Mockito.times(1)).becomeStandby();
|
||||||
|
Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true,
|
||||||
|
elector, null);
|
||||||
|
|
||||||
|
WatchedEvent mockEvent = Mockito.mock(WatchedEvent.class);
|
||||||
|
Mockito.when(mockEvent.getPath()).thenReturn(zkLockPathName);
|
||||||
|
|
||||||
|
// notify node deletion
|
||||||
|
// monitoring should be setup again after event is received
|
||||||
|
Mockito.when(mockEvent.getType()).thenReturn(Event.EventType.NodeDeleted);
|
||||||
|
elector.process(mockEvent);
|
||||||
|
// is standby. no need to notify anything now
|
||||||
|
Mockito.verify(mockApp, Mockito.times(0)).enterNeutralMode();
|
||||||
|
// another joinElection called.
|
||||||
|
Mockito.verify(mockZK, Mockito.times(2)).create(zkLockPathName, data,
|
||||||
|
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null);
|
||||||
|
// lost election
|
||||||
|
elector.processResult(Code.NODEEXISTS.intValue(), zkLockPathName, null,
|
||||||
|
zkLockPathName);
|
||||||
|
// still standby. so no need to notify again
|
||||||
|
Mockito.verify(mockApp, Mockito.times(1)).becomeStandby();
|
||||||
|
// monitor is set again
|
||||||
|
Mockito.verify(mockZK, Mockito.times(2)).exists(zkLockPathName, true,
|
||||||
|
elector, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* verify quit election terminates connection and there are no new watches.
|
||||||
|
* next call to joinElection creates new connection and performs election
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testQuitElection() throws InterruptedException {
|
||||||
|
elector.quitElection();
|
||||||
|
Mockito.verify(mockZK, Mockito.times(1)).close();
|
||||||
|
// no watches added
|
||||||
|
Mockito.verify(mockZK, Mockito.times(0)).exists(zkLockPathName, true,
|
||||||
|
elector, null);
|
||||||
|
|
||||||
|
byte[] data = new byte[8];
|
||||||
|
elector.joinElection(data);
|
||||||
|
// getNewZooKeeper called 2 times. once in constructor and once now
|
||||||
|
Assert.assertEquals(2, TestActiveStandbyElector.count);
|
||||||
|
elector.processResult(Code.NODEEXISTS.intValue(), zkLockPathName, null,
|
||||||
|
zkLockPathName);
|
||||||
|
Mockito.verify(mockApp, Mockito.times(1)).becomeStandby();
|
||||||
|
Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true,
|
||||||
|
elector, null);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* verify that receiveActiveData gives data when active exists, tells that
|
||||||
|
* active does not exist and reports error in getting active information
|
||||||
|
*
|
||||||
|
* @throws IOException
|
||||||
|
* @throws InterruptedException
|
||||||
|
* @throws KeeperException
|
||||||
|
* @throws ActiveNotFoundException
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testGetActiveData() throws ActiveNotFoundException,
|
||||||
|
KeeperException, InterruptedException, IOException {
|
||||||
|
// get valid active data
|
||||||
|
byte[] data = new byte[8];
|
||||||
|
Mockito.when(
|
||||||
|
mockZK.getData(Mockito.eq(zkLockPathName), Mockito.eq(false),
|
||||||
|
Mockito.<Stat> anyObject())).thenReturn(data);
|
||||||
|
Assert.assertEquals(data, elector.getActiveData());
|
||||||
|
Mockito.verify(mockZK, Mockito.times(1)).getData(
|
||||||
|
Mockito.eq(zkLockPathName), Mockito.eq(false),
|
||||||
|
Mockito.<Stat> anyObject());
|
||||||
|
|
||||||
|
// active does not exist
|
||||||
|
Mockito.when(
|
||||||
|
mockZK.getData(Mockito.eq(zkLockPathName), Mockito.eq(false),
|
||||||
|
Mockito.<Stat> anyObject())).thenThrow(
|
||||||
|
new KeeperException.NoNodeException());
|
||||||
|
try {
|
||||||
|
elector.getActiveData();
|
||||||
|
Assert.fail("ActiveNotFoundException expected");
|
||||||
|
} catch(ActiveNotFoundException e) {
|
||||||
|
Mockito.verify(mockZK, Mockito.times(2)).getData(
|
||||||
|
Mockito.eq(zkLockPathName), Mockito.eq(false),
|
||||||
|
Mockito.<Stat> anyObject());
|
||||||
|
}
|
||||||
|
|
||||||
|
// error getting active data rethrows keeperexception
|
||||||
|
try {
|
||||||
|
Mockito.when(
|
||||||
|
mockZK.getData(Mockito.eq(zkLockPathName), Mockito.eq(false),
|
||||||
|
Mockito.<Stat> anyObject())).thenThrow(
|
||||||
|
new KeeperException.AuthFailedException());
|
||||||
|
elector.getActiveData();
|
||||||
|
Assert.fail("KeeperException.AuthFailedException expected");
|
||||||
|
} catch(KeeperException.AuthFailedException ke) {
|
||||||
|
Mockito.verify(mockZK, Mockito.times(3)).getData(
|
||||||
|
Mockito.eq(zkLockPathName), Mockito.eq(false),
|
||||||
|
Mockito.<Stat> anyObject());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,223 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.ha;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import org.apache.hadoop.ha.ActiveStandbyElector.ActiveStandbyElectorCallback;
|
||||||
|
import org.apache.zookeeper.CreateMode;
|
||||||
|
import org.apache.zookeeper.KeeperException;
|
||||||
|
import org.apache.zookeeper.ZooKeeper;
|
||||||
|
import org.apache.zookeeper.ZooDefs.Ids;
|
||||||
|
import org.apache.zookeeper.data.ACL;
|
||||||
|
import org.apache.zookeeper.test.ClientBase;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test for {@link ActiveStandbyElector} using real zookeeper.
|
||||||
|
*/
|
||||||
|
public class TestActiveStandbyElectorRealZK extends ClientBase {
|
||||||
|
static final int NUM_ELECTORS = 2;
|
||||||
|
static ZooKeeper[] zkClient = new ZooKeeper[NUM_ELECTORS];
|
||||||
|
static int currentClientIndex = 0;
|
||||||
|
|
||||||
|
class ActiveStandbyElectorTesterRealZK extends ActiveStandbyElector {
|
||||||
|
ActiveStandbyElectorTesterRealZK(String hostPort, int timeout,
|
||||||
|
String parent, List<ACL> acl, ActiveStandbyElectorCallback app)
|
||||||
|
throws IOException {
|
||||||
|
super(hostPort, timeout, parent, acl, app);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ZooKeeper getNewZooKeeper() {
|
||||||
|
return TestActiveStandbyElectorRealZK.zkClient[
|
||||||
|
TestActiveStandbyElectorRealZK.currentClientIndex];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The class object runs on a thread and waits for a signal to start from the
|
||||||
|
* test object. On getting the signal it joins the election and thus by doing
|
||||||
|
* this on multiple threads we can test simultaneous attempts at leader lock
|
||||||
|
* creation. after joining the election, the object waits on a signal to exit.
|
||||||
|
* this signal comes when the object's elector has become a leader or there is
|
||||||
|
* an unexpected fatal error. this lets another thread object to become a
|
||||||
|
* leader.
|
||||||
|
*/
|
||||||
|
class ThreadRunner implements Runnable, ActiveStandbyElectorCallback {
|
||||||
|
int index;
|
||||||
|
TestActiveStandbyElectorRealZK test;
|
||||||
|
boolean wait = true;
|
||||||
|
|
||||||
|
ThreadRunner(int i, TestActiveStandbyElectorRealZK s) {
|
||||||
|
index = i;
|
||||||
|
test = s;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
LOG.info("starting " + index);
|
||||||
|
while(true) {
|
||||||
|
synchronized (test) {
|
||||||
|
// wait for test start signal to come
|
||||||
|
if (!test.start) {
|
||||||
|
try {
|
||||||
|
test.wait();
|
||||||
|
} catch(InterruptedException e) {
|
||||||
|
Assert.fail(e.getMessage());
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// join election
|
||||||
|
byte[] data = new byte[8];
|
||||||
|
ActiveStandbyElector elector = test.elector[index];
|
||||||
|
LOG.info("joining " + index);
|
||||||
|
elector.joinElection(data);
|
||||||
|
try {
|
||||||
|
while(true) {
|
||||||
|
synchronized (this) {
|
||||||
|
// wait for elector to become active/fatal error
|
||||||
|
if (wait) {
|
||||||
|
// wait to become active
|
||||||
|
// wait capped at 30s to prevent hung test
|
||||||
|
wait(30000);
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Thread.sleep(1000);
|
||||||
|
// quit election to allow other elector to become active
|
||||||
|
elector.quitElection();
|
||||||
|
} catch(InterruptedException e) {
|
||||||
|
Assert.fail(e.getMessage());
|
||||||
|
}
|
||||||
|
LOG.info("ending " + index);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void becomeActive() {
|
||||||
|
test.reportActive(index);
|
||||||
|
LOG.info("active " + index);
|
||||||
|
wait = false;
|
||||||
|
notifyAll();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void becomeStandby() {
|
||||||
|
test.reportStandby(index);
|
||||||
|
LOG.info("standby " + index);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void enterNeutralMode() {
|
||||||
|
LOG.info("neutral " + index);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized void notifyFatalError(String errorMessage) {
|
||||||
|
LOG.info("fatal " + index + " .Error message:" + errorMessage);
|
||||||
|
wait = false;
|
||||||
|
notifyAll();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean start = false;
|
||||||
|
int activeIndex = -1;
|
||||||
|
int standbyIndex = -1;
|
||||||
|
String parentDir = "/" + java.util.UUID.randomUUID().toString();
|
||||||
|
|
||||||
|
ActiveStandbyElector[] elector = new ActiveStandbyElector[NUM_ELECTORS];
|
||||||
|
ThreadRunner[] threadRunner = new ThreadRunner[NUM_ELECTORS];
|
||||||
|
Thread[] thread = new Thread[NUM_ELECTORS];
|
||||||
|
|
||||||
|
synchronized void reportActive(int index) {
|
||||||
|
if (activeIndex == -1) {
|
||||||
|
activeIndex = index;
|
||||||
|
} else {
|
||||||
|
// standby should become active
|
||||||
|
Assert.assertEquals(standbyIndex, index);
|
||||||
|
// old active should not become active
|
||||||
|
Assert.assertFalse(activeIndex == index);
|
||||||
|
}
|
||||||
|
activeIndex = index;
|
||||||
|
}
|
||||||
|
|
||||||
|
synchronized void reportStandby(int index) {
|
||||||
|
// only 1 standby should be reported and it should not be the same as active
|
||||||
|
Assert.assertEquals(-1, standbyIndex);
|
||||||
|
standbyIndex = index;
|
||||||
|
Assert.assertFalse(activeIndex == standbyIndex);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* the test creates 2 electors which try to become active using a real
|
||||||
|
* zookeeper server. It verifies that 1 becomes active and 1 becomes standby.
|
||||||
|
* Upon becoming active the leader quits election and the test verifies that
|
||||||
|
* the standby now becomes active. these electors run on different threads and
|
||||||
|
* callback to the test class to report active and standby where the outcome
|
||||||
|
* is verified
|
||||||
|
*
|
||||||
|
* @throws IOException
|
||||||
|
* @throws InterruptedException
|
||||||
|
* @throws KeeperException
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testActiveStandbyTransition() throws IOException,
|
||||||
|
InterruptedException, KeeperException {
|
||||||
|
LOG.info("starting test with parentDir:" + parentDir);
|
||||||
|
start = false;
|
||||||
|
byte[] data = new byte[8];
|
||||||
|
// create random working directory
|
||||||
|
createClient().create(parentDir, data, Ids.OPEN_ACL_UNSAFE,
|
||||||
|
CreateMode.PERSISTENT);
|
||||||
|
|
||||||
|
for(currentClientIndex = 0;
|
||||||
|
currentClientIndex < NUM_ELECTORS;
|
||||||
|
++currentClientIndex) {
|
||||||
|
LOG.info("creating " + currentClientIndex);
|
||||||
|
zkClient[currentClientIndex] = createClient();
|
||||||
|
threadRunner[currentClientIndex] = new ThreadRunner(currentClientIndex,
|
||||||
|
this);
|
||||||
|
elector[currentClientIndex] = new ActiveStandbyElectorTesterRealZK(
|
||||||
|
"hostPort", 1000, parentDir, Ids.OPEN_ACL_UNSAFE,
|
||||||
|
threadRunner[currentClientIndex]);
|
||||||
|
zkClient[currentClientIndex].register(elector[currentClientIndex]);
|
||||||
|
thread[currentClientIndex] = new Thread(threadRunner[currentClientIndex]);
|
||||||
|
thread[currentClientIndex].start();
|
||||||
|
}
|
||||||
|
|
||||||
|
synchronized (this) {
|
||||||
|
// signal threads to start
|
||||||
|
LOG.info("signaling threads");
|
||||||
|
start = true;
|
||||||
|
notifyAll();
|
||||||
|
}
|
||||||
|
|
||||||
|
for(int i = 0; i < thread.length; i++) {
|
||||||
|
thread[i].join();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue