HADOOP-10535. Merge r1589905 from trunk.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1589906 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jing Zhao 2014-04-24 23:05:25 +00:00
parent 1566fff15f
commit fa6c972283
8 changed files with 40 additions and 20 deletions

View File

@ -27,6 +27,9 @@ Release 2.5.0 - UNRELEASED
HADOOP-10503. Move junit up to v 4.11. (cnauroth) HADOOP-10503. Move junit up to v 4.11. (cnauroth)
HADOOP-10535. Make the retry numbers in ActiveStandbyElector configurable.
(jing9)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -199,6 +199,11 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
"ha.failover-controller.graceful-fence.connection.retries"; "ha.failover-controller.graceful-fence.connection.retries";
public static final int HA_FC_GRACEFUL_FENCE_CONNECTION_RETRIES_DEFAULT = 1; public static final int HA_FC_GRACEFUL_FENCE_CONNECTION_RETRIES_DEFAULT = 1;
/** number of zookeeper operation retry times in ActiveStandbyElector */
public static final String HA_FC_ELECTOR_ZK_OP_RETRIES_KEY =
"ha.failover-controller.active-standby-elector.zk.op.retries";
public static final int HA_FC_ELECTOR_ZK_OP_RETRIES_DEFAULT = 3;
/* Timeout that the CLI (manual) FC waits for monitorHealth, getServiceState */ /* Timeout that the CLI (manual) FC waits for monitorHealth, getServiceState */
public static final String HA_FC_CLI_CHECK_TIMEOUT_KEY = public static final String HA_FC_CLI_CHECK_TIMEOUT_KEY =
"ha.failover-controller.cli-check.rpc-timeout.ms"; "ha.failover-controller.cli-check.rpc-timeout.ms";

View File

@ -143,7 +143,6 @@ public interface ActiveStandbyElectorCallback {
public static final Log LOG = LogFactory.getLog(ActiveStandbyElector.class); public static final Log LOG = LogFactory.getLog(ActiveStandbyElector.class);
static int NUM_RETRIES = 3;
private static final int SLEEP_AFTER_FAILURE_TO_BECOME_ACTIVE = 1000; private static final int SLEEP_AFTER_FAILURE_TO_BECOME_ACTIVE = 1000;
private static enum ConnectionState { private static enum ConnectionState {
@ -170,6 +169,7 @@ static enum State {
private final String zkLockFilePath; private final String zkLockFilePath;
private final String zkBreadCrumbPath; private final String zkBreadCrumbPath;
private final String znodeWorkingDir; private final String znodeWorkingDir;
private final int maxRetryNum;
private Lock sessionReestablishLockForTests = new ReentrantLock(); private Lock sessionReestablishLockForTests = new ReentrantLock();
private boolean wantToBeInElection; private boolean wantToBeInElection;
@ -207,7 +207,7 @@ static enum State {
public ActiveStandbyElector(String zookeeperHostPorts, public ActiveStandbyElector(String zookeeperHostPorts,
int zookeeperSessionTimeout, String parentZnodeName, List<ACL> acl, int zookeeperSessionTimeout, String parentZnodeName, List<ACL> acl,
List<ZKAuthInfo> authInfo, List<ZKAuthInfo> authInfo,
ActiveStandbyElectorCallback app) throws IOException, ActiveStandbyElectorCallback app, int maxRetryNum) throws IOException,
HadoopIllegalArgumentException, KeeperException { HadoopIllegalArgumentException, KeeperException {
if (app == null || acl == null || parentZnodeName == null if (app == null || acl == null || parentZnodeName == null
|| zookeeperHostPorts == null || zookeeperSessionTimeout <= 0) { || zookeeperHostPorts == null || zookeeperSessionTimeout <= 0) {
@ -221,6 +221,7 @@ public ActiveStandbyElector(String zookeeperHostPorts,
znodeWorkingDir = parentZnodeName; znodeWorkingDir = parentZnodeName;
zkLockFilePath = znodeWorkingDir + "/" + LOCK_FILENAME; zkLockFilePath = znodeWorkingDir + "/" + LOCK_FILENAME;
zkBreadCrumbPath = znodeWorkingDir + "/" + BREADCRUMB_FILENAME; zkBreadCrumbPath = znodeWorkingDir + "/" + BREADCRUMB_FILENAME;
this.maxRetryNum = maxRetryNum;
// createConnection for future API calls // createConnection for future API calls
createConnection(); createConnection();
@ -439,7 +440,7 @@ public synchronized void processResult(int rc, String path, Object ctx,
LOG.debug(errorMessage); LOG.debug(errorMessage);
if (shouldRetry(code)) { if (shouldRetry(code)) {
if (createRetryCount < NUM_RETRIES) { if (createRetryCount < maxRetryNum) {
LOG.debug("Retrying createNode createRetryCount: " + createRetryCount); LOG.debug("Retrying createNode createRetryCount: " + createRetryCount);
++createRetryCount; ++createRetryCount;
createLockNodeAsync(); createLockNodeAsync();
@ -500,7 +501,7 @@ public synchronized void processResult(int rc, String path, Object ctx,
LOG.debug(errorMessage); LOG.debug(errorMessage);
if (shouldRetry(code)) { if (shouldRetry(code)) {
if (statRetryCount < NUM_RETRIES) { if (statRetryCount < maxRetryNum) {
++statRetryCount; ++statRetryCount;
monitorLockNodeAsync(); monitorLockNodeAsync();
return; return;
@ -735,7 +736,7 @@ synchronized State getStateForTests() {
private boolean reEstablishSession() { private boolean reEstablishSession() {
int connectionRetryCount = 0; int connectionRetryCount = 0;
boolean success = false; boolean success = false;
while(!success && connectionRetryCount < NUM_RETRIES) { while(!success && connectionRetryCount < maxRetryNum) {
LOG.debug("Establishing zookeeper connection for " + this); LOG.debug("Establishing zookeeper connection for " + this);
try { try {
createConnection(); createConnection();
@ -972,14 +973,14 @@ public Void run() throws KeeperException, InterruptedException {
}); });
} }
private static <T> T zkDoWithRetries(ZKAction<T> action) private <T> T zkDoWithRetries(ZKAction<T> action) throws KeeperException,
throws KeeperException, InterruptedException { InterruptedException {
int retry = 0; int retry = 0;
while (true) { while (true) {
try { try {
return action.run(); return action.run();
} catch (KeeperException ke) { } catch (KeeperException ke) {
if (shouldRetry(ke.code()) && ++retry < NUM_RETRIES) { if (shouldRetry(ke.code()) && ++retry < maxRetryNum) {
continue; continue;
} }
throw ke; throw ke;

View File

@ -32,6 +32,7 @@
import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ha.ActiveStandbyElector.ActiveNotFoundException; import org.apache.hadoop.ha.ActiveStandbyElector.ActiveNotFoundException;
import org.apache.hadoop.ha.ActiveStandbyElector.ActiveStandbyElectorCallback; import org.apache.hadoop.ha.ActiveStandbyElector.ActiveStandbyElectorCallback;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
@ -341,10 +342,12 @@ private void initZK() throws HadoopIllegalArgumentException, IOException,
Preconditions.checkArgument(zkTimeout > 0, Preconditions.checkArgument(zkTimeout > 0,
"Invalid ZK session timeout %s", zkTimeout); "Invalid ZK session timeout %s", zkTimeout);
int maxRetryNum = conf.getInt(
CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_KEY,
CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_DEFAULT);
elector = new ActiveStandbyElector(zkQuorum, elector = new ActiveStandbyElector(zkQuorum,
zkTimeout, getParentZnode(), zkAcls, zkAuths, zkTimeout, getParentZnode(), zkAcls, zkAuths,
new ElectorCallbacks()); new ElectorCallbacks(), maxRetryNum);
} }
private String getParentZnode() { private String getParentZnode() {

View File

@ -39,6 +39,7 @@
import org.mockito.Mockito; import org.mockito.Mockito;
import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ha.ActiveStandbyElector.ActiveStandbyElectorCallback; import org.apache.hadoop.ha.ActiveStandbyElector.ActiveStandbyElectorCallback;
import org.apache.hadoop.ha.ActiveStandbyElector.ActiveNotFoundException; import org.apache.hadoop.ha.ActiveStandbyElector.ActiveNotFoundException;
import org.apache.hadoop.util.ZKUtil.ZKAuthInfo; import org.apache.hadoop.util.ZKUtil.ZKAuthInfo;
@ -59,8 +60,9 @@ class ActiveStandbyElectorTester extends ActiveStandbyElector {
ActiveStandbyElectorTester(String hostPort, int timeout, String parent, ActiveStandbyElectorTester(String hostPort, int timeout, String parent,
List<ACL> acl, ActiveStandbyElectorCallback app) throws IOException, List<ACL> acl, ActiveStandbyElectorCallback app) throws IOException,
KeeperException { KeeperException {
super(hostPort, timeout, parent, acl, super(hostPort, timeout, parent, acl, Collections
Collections.<ZKAuthInfo>emptyList(), app); .<ZKAuthInfo> emptyList(), app,
CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_DEFAULT);
} }
@Override @Override
@ -715,7 +717,8 @@ public void testEnsureBaseNodeFails() throws Exception {
public void testWithoutZKServer() throws Exception { public void testWithoutZKServer() throws Exception {
try { try {
new ActiveStandbyElector("127.0.0.1", 2000, ZK_PARENT_NAME, new ActiveStandbyElector("127.0.0.1", 2000, ZK_PARENT_NAME,
Ids.OPEN_ACL_UNSAFE, Collections.<ZKAuthInfo> emptyList(), mockApp); Ids.OPEN_ACL_UNSAFE, Collections.<ZKAuthInfo> emptyList(), mockApp,
CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_DEFAULT);
Assert.fail("Did not throw zookeeper connection loss exceptions!"); Assert.fail("Did not throw zookeeper connection loss exceptions!");
} catch (KeeperException ke) { } catch (KeeperException ke) {
GenericTestUtils.assertExceptionContains( "ConnectionLoss", ke); GenericTestUtils.assertExceptionContains( "ConnectionLoss", ke);

View File

@ -26,6 +26,7 @@
import java.util.UUID; import java.util.UUID;
import org.apache.commons.logging.impl.Log4JLogger; import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ha.ActiveStandbyElector.ActiveStandbyElectorCallback; import org.apache.hadoop.ha.ActiveStandbyElector.ActiveStandbyElectorCallback;
import org.apache.hadoop.ha.ActiveStandbyElector.State; import org.apache.hadoop.ha.ActiveStandbyElector.State;
import org.apache.hadoop.util.ZKUtil.ZKAuthInfo; import org.apache.hadoop.util.ZKUtil.ZKAuthInfo;
@ -70,9 +71,9 @@ public void setUp() throws Exception {
for (int i = 0; i < NUM_ELECTORS; i++) { for (int i = 0; i < NUM_ELECTORS; i++) {
cbs[i] = Mockito.mock(ActiveStandbyElectorCallback.class); cbs[i] = Mockito.mock(ActiveStandbyElectorCallback.class);
appDatas[i] = Ints.toByteArray(i); appDatas[i] = Ints.toByteArray(i);
electors[i] = new ActiveStandbyElector( electors[i] = new ActiveStandbyElector(hostPort, 5000, PARENT_DIR,
hostPort, 5000, PARENT_DIR, Ids.OPEN_ACL_UNSAFE, Ids.OPEN_ACL_UNSAFE, Collections.<ZKAuthInfo> emptyList(), cbs[i],
Collections.<ZKAuthInfo>emptyList(), cbs[i]); CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_DEFAULT);
} }
} }

View File

@ -23,6 +23,7 @@
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
import org.junit.After; import org.junit.After;
@ -126,8 +127,7 @@ public void testRandomHealthAndDisconnects() throws Exception {
.when(cluster.getService(0).proxy).monitorHealth(); .when(cluster.getService(0).proxy).monitorHealth();
Mockito.doAnswer(new RandomlyThrow(1)) Mockito.doAnswer(new RandomlyThrow(1))
.when(cluster.getService(1).proxy).monitorHealth(); .when(cluster.getService(1).proxy).monitorHealth();
ActiveStandbyElector.NUM_RETRIES = 100; conf.setInt(CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_KEY, 100);
// Don't start until after the above mocking. Otherwise we can get // Don't start until after the above mocking. Otherwise we can get
// Mockito errors if the HM calls the proxy in the middle of // Mockito errors if the HM calls the proxy in the middle of
// setting up the mock. // setting up the mock.

View File

@ -23,6 +23,7 @@
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ha.ActiveStandbyElector; import org.apache.hadoop.ha.ActiveStandbyElector;
import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.ServiceFailedException; import org.apache.hadoop.ha.ServiceFailedException;
@ -85,8 +86,11 @@ protected synchronized void serviceInit(Configuration conf)
List<ACL> zkAcls = RMZKUtils.getZKAcls(conf); List<ACL> zkAcls = RMZKUtils.getZKAcls(conf);
List<ZKUtil.ZKAuthInfo> zkAuths = RMZKUtils.getZKAuths(conf); List<ZKUtil.ZKAuthInfo> zkAuths = RMZKUtils.getZKAuths(conf);
int maxRetryNum = conf.getInt(
CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_KEY,
CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_DEFAULT);
elector = new ActiveStandbyElector(zkQuorum, (int) zkSessionTimeout, elector = new ActiveStandbyElector(zkQuorum, (int) zkSessionTimeout,
electionZNode, zkAcls, zkAuths, this); electionZNode, zkAcls, zkAuths, this, maxRetryNum);
elector.ensureParentZNode(); elector.ensureParentZNode();
if (!isParentZnodeSafe(clusterId)) { if (!isParentZnodeSafe(clusterId)) {