diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 4afae423896..f7601771bca 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -27,6 +27,9 @@ Release 2.5.0 - UNRELEASED HADOOP-10503. Move junit up to v 4.11. (cnauroth) + HADOOP-10535. Make the retry numbers in ActiveStandbyElector configurable. + (jing9) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java index 56a77b5f5e1..bf3b6cae40d 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java @@ -199,6 +199,11 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic { "ha.failover-controller.graceful-fence.connection.retries"; public static final int HA_FC_GRACEFUL_FENCE_CONNECTION_RETRIES_DEFAULT = 1; + /** number of zookeeper operation retry times in ActiveStandbyElector */ + public static final String HA_FC_ELECTOR_ZK_OP_RETRIES_KEY = + "ha.failover-controller.active-standby-elector.zk.op.retries"; + public static final int HA_FC_ELECTOR_ZK_OP_RETRIES_DEFAULT = 3; + /* Timeout that the CLI (manual) FC waits for monitorHealth, getServiceState */ public static final String HA_FC_CLI_CHECK_TIMEOUT_KEY = "ha.failover-controller.cli-check.rpc-timeout.ms"; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java index 9cc2ef77b96..123f119f709 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java @@ -143,7 +143,6 @@ public interface ActiveStandbyElectorCallback { public static final Log LOG = LogFactory.getLog(ActiveStandbyElector.class); - static int NUM_RETRIES = 3; private static final int SLEEP_AFTER_FAILURE_TO_BECOME_ACTIVE = 1000; private static enum ConnectionState { @@ -170,6 +169,7 @@ static enum State { private final String zkLockFilePath; private final String zkBreadCrumbPath; private final String znodeWorkingDir; + private final int maxRetryNum; private Lock sessionReestablishLockForTests = new ReentrantLock(); private boolean wantToBeInElection; @@ -207,7 +207,7 @@ static enum State { public ActiveStandbyElector(String zookeeperHostPorts, int zookeeperSessionTimeout, String parentZnodeName, List acl, List authInfo, - ActiveStandbyElectorCallback app) throws IOException, + ActiveStandbyElectorCallback app, int maxRetryNum) throws IOException, HadoopIllegalArgumentException, KeeperException { if (app == null || acl == null || parentZnodeName == null || zookeeperHostPorts == null || zookeeperSessionTimeout <= 0) { @@ -220,7 +220,8 @@ public ActiveStandbyElector(String zookeeperHostPorts, appClient = app; znodeWorkingDir = parentZnodeName; zkLockFilePath = znodeWorkingDir + "/" + LOCK_FILENAME; - zkBreadCrumbPath = znodeWorkingDir + "/" + BREADCRUMB_FILENAME; + zkBreadCrumbPath = znodeWorkingDir + "/" + BREADCRUMB_FILENAME; + this.maxRetryNum = maxRetryNum; // createConnection for future API calls createConnection(); @@ -439,7 +440,7 @@ public synchronized void processResult(int rc, String path, Object ctx, LOG.debug(errorMessage); if (shouldRetry(code)) { - if (createRetryCount < NUM_RETRIES) { + if (createRetryCount < maxRetryNum) { LOG.debug("Retrying createNode createRetryCount: " + createRetryCount); ++createRetryCount; createLockNodeAsync(); @@ -500,7 +501,7 @@ public synchronized void processResult(int rc, String path, Object ctx, LOG.debug(errorMessage); if (shouldRetry(code)) { - if (statRetryCount < NUM_RETRIES) { + if (statRetryCount < maxRetryNum) { ++statRetryCount; monitorLockNodeAsync(); return; @@ -735,7 +736,7 @@ synchronized State getStateForTests() { private boolean reEstablishSession() { int connectionRetryCount = 0; boolean success = false; - while(!success && connectionRetryCount < NUM_RETRIES) { + while(!success && connectionRetryCount < maxRetryNum) { LOG.debug("Establishing zookeeper connection for " + this); try { createConnection(); @@ -972,14 +973,14 @@ public Void run() throws KeeperException, InterruptedException { }); } - private static T zkDoWithRetries(ZKAction action) - throws KeeperException, InterruptedException { + private T zkDoWithRetries(ZKAction action) throws KeeperException, + InterruptedException { int retry = 0; while (true) { try { return action.run(); } catch (KeeperException ke) { - if (shouldRetry(ke.code()) && ++retry < NUM_RETRIES) { + if (shouldRetry(ke.code()) && ++retry < maxRetryNum) { continue; } throw ke; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java index dd8ca8a6bdd..46c485b1d9a 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java @@ -32,6 +32,7 @@ import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.ha.ActiveStandbyElector.ActiveNotFoundException; import org.apache.hadoop.ha.ActiveStandbyElector.ActiveStandbyElectorCallback; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; @@ -341,10 +342,12 @@ private void initZK() throws HadoopIllegalArgumentException, IOException, Preconditions.checkArgument(zkTimeout > 0, "Invalid ZK session timeout %s", zkTimeout); - + int maxRetryNum = conf.getInt( + CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_KEY, + CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_DEFAULT); elector = new ActiveStandbyElector(zkQuorum, zkTimeout, getParentZnode(), zkAcls, zkAuths, - new ElectorCallbacks()); + new ElectorCallbacks(), maxRetryNum); } private String getParentZnode() { diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java index 9e3cc4162bc..2e578e2a7b8 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java @@ -39,6 +39,7 @@ import org.mockito.Mockito; import org.apache.hadoop.HadoopIllegalArgumentException; +import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.ha.ActiveStandbyElector.ActiveStandbyElectorCallback; import org.apache.hadoop.ha.ActiveStandbyElector.ActiveNotFoundException; import org.apache.hadoop.util.ZKUtil.ZKAuthInfo; @@ -59,8 +60,9 @@ class ActiveStandbyElectorTester extends ActiveStandbyElector { ActiveStandbyElectorTester(String hostPort, int timeout, String parent, List acl, ActiveStandbyElectorCallback app) throws IOException, KeeperException { - super(hostPort, timeout, parent, acl, - Collections.emptyList(), app); + super(hostPort, timeout, parent, acl, Collections + . emptyList(), app, + CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_DEFAULT); } @Override @@ -715,7 +717,8 @@ public void testEnsureBaseNodeFails() throws Exception { public void testWithoutZKServer() throws Exception { try { new ActiveStandbyElector("127.0.0.1", 2000, ZK_PARENT_NAME, - Ids.OPEN_ACL_UNSAFE, Collections. emptyList(), mockApp); + Ids.OPEN_ACL_UNSAFE, Collections. emptyList(), mockApp, + CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_DEFAULT); Assert.fail("Did not throw zookeeper connection loss exceptions!"); } catch (KeeperException ke) { GenericTestUtils.assertExceptionContains( "ConnectionLoss", ke); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElectorRealZK.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElectorRealZK.java index c4326842992..22d0e20de82 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElectorRealZK.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElectorRealZK.java @@ -26,6 +26,7 @@ import java.util.UUID; import org.apache.commons.logging.impl.Log4JLogger; +import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.ha.ActiveStandbyElector.ActiveStandbyElectorCallback; import org.apache.hadoop.ha.ActiveStandbyElector.State; import org.apache.hadoop.util.ZKUtil.ZKAuthInfo; @@ -70,9 +71,9 @@ public void setUp() throws Exception { for (int i = 0; i < NUM_ELECTORS; i++) { cbs[i] = Mockito.mock(ActiveStandbyElectorCallback.class); appDatas[i] = Ints.toByteArray(i); - electors[i] = new ActiveStandbyElector( - hostPort, 5000, PARENT_DIR, Ids.OPEN_ACL_UNSAFE, - Collections.emptyList(), cbs[i]); + electors[i] = new ActiveStandbyElector(hostPort, 5000, PARENT_DIR, + Ids.OPEN_ACL_UNSAFE, Collections. emptyList(), cbs[i], + CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_DEFAULT); } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverControllerStress.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverControllerStress.java index 95874c42dfe..a3b8ec6ab37 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverControllerStress.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverControllerStress.java @@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.Time; import org.junit.After; @@ -126,8 +127,7 @@ public void testRandomHealthAndDisconnects() throws Exception { .when(cluster.getService(0).proxy).monitorHealth(); Mockito.doAnswer(new RandomlyThrow(1)) .when(cluster.getService(1).proxy).monitorHealth(); - ActiveStandbyElector.NUM_RETRIES = 100; - + conf.setInt(CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_KEY, 100); // Don't start until after the above mocking. Otherwise we can get // Mockito errors if the HM calls the proxy in the middle of // setting up the mock. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java index 9f18ffbf690..3cd986cf34d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java @@ -23,6 +23,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.ha.ActiveStandbyElector; import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.ha.ServiceFailedException; @@ -85,8 +86,11 @@ protected synchronized void serviceInit(Configuration conf) List zkAcls = RMZKUtils.getZKAcls(conf); List zkAuths = RMZKUtils.getZKAuths(conf); + int maxRetryNum = conf.getInt( + CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_KEY, + CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_DEFAULT); elector = new ActiveStandbyElector(zkQuorum, (int) zkSessionTimeout, - electionZNode, zkAcls, zkAuths, this); + electionZNode, zkAcls, zkAuths, this, maxRetryNum); elector.ensureParentZNode(); if (!isParentZnodeSafe(clusterId)) {