From 71fdca460019b64ba58e11f2a78021626e9a2308 Mon Sep 17 00:00:00 2001 From: Jian He Date: Fri, 9 Dec 2016 16:38:49 -0800 Subject: [PATCH] YARN-5709. Cleanup leader election configs and pluggability. Contribtued by Karthik Kambatla (cherry picked from commit b817c565c8be1d4a682d119bfac6f43ee09e87f0) --- .../hadoop/yarn/conf/YarnConfiguration.java | 14 +++- ...iveStandbyElectorBasedElectorService.java} | 33 +++++--- .../server/resourcemanager/AdminService.java | 75 ++++--------------- ...e.java => CuratorBasedElectorService.java} | 48 +++++++----- .../resourcemanager/EmbeddedElector.java | 41 ++++++++++ .../server/resourcemanager/RMContext.java | 6 +- .../server/resourcemanager/RMContextImpl.java | 15 +++- .../resourcemanager/ResourceManager.java | 39 ++++++---- .../resourcemanager/webapp/RMWebApp.java | 3 +- .../webapp/dao/ClusterInfo.java | 2 +- .../yarn/server/resourcemanager/MockRM.java | 33 ++++++-- .../server/resourcemanager/RMHATestBase.java | 4 +- .../TestLeaderElectorService.java | 17 +++-- .../TestRMEmbeddedElector.java | 49 ++++++------ .../yarn/server/resourcemanager/TestRMHA.java | 35 ++++----- 15 files changed, 244 insertions(+), 170 deletions(-) rename hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/{EmbeddedElectorService.java => ActiveStandbyElectorBasedElectorService.java} (91%) rename hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/{LeaderElectorService.java => CuratorBasedElectorService.java} (82%) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElector.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 2b7d985692a..983918d23f2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -29,6 +29,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ha.ActiveStandbyElector; import org.apache.hadoop.http.HttpConfig; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.util.StringUtils; @@ -655,9 +656,20 @@ public class YarnConfiguration extends Configuration { public static final String RM_HA_FC_ELECTOR_ZK_RETRIES_KEY = RM_HA_PREFIX + "failover-controller.active-standby-elector.zk.retries"; - @Private + + /** + * Whether to use curator-based elector for leader election. + * + * @deprecated Eventually, we want to default to the curator-based + * implementation and remove the {@link ActiveStandbyElector} based + * implementation. We should remove this config then. + */ + @Unstable + @Deprecated public static final String CURATOR_LEADER_ELECTOR = RM_HA_PREFIX + "curator-leader-elector.enabled"; + @Private + @Unstable public static final boolean DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED = false; //////////////////////////////// diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ActiveStandbyElectorBasedElectorService.java similarity index 91% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ActiveStandbyElectorBasedElectorService.java index 88d2e102562..751eedd9afb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ActiveStandbyElectorBasedElectorService.java @@ -43,12 +43,16 @@ import java.util.List; import java.util.Timer; import java.util.TimerTask; +/** + * Leader election implementation that uses {@link ActiveStandbyElector}. + */ @InterfaceAudience.Private @InterfaceStability.Unstable -public class EmbeddedElectorService extends AbstractService - implements ActiveStandbyElector.ActiveStandbyElectorCallback { - private static final Log LOG = - LogFactory.getLog(EmbeddedElectorService.class.getName()); +public class ActiveStandbyElectorBasedElectorService extends AbstractService + implements EmbeddedElector, + ActiveStandbyElector.ActiveStandbyElectorCallback { + private static final Log LOG = LogFactory.getLog( + ActiveStandbyElectorBasedElectorService.class.getName()); private static final HAServiceProtocol.StateChangeRequestInfo req = new HAServiceProtocol.StateChangeRequestInfo( HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC); @@ -62,19 +66,21 @@ public class EmbeddedElectorService extends AbstractService @VisibleForTesting final Object zkDisconnectLock = new Object(); - EmbeddedElectorService(RMContext rmContext) { - super(EmbeddedElectorService.class.getName()); + ActiveStandbyElectorBasedElectorService(RMContext rmContext) { + super(ActiveStandbyElectorBasedElectorService.class.getName()); this.rmContext = rmContext; } @Override protected void serviceInit(Configuration conf) throws Exception { - conf = conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf); + conf = conf instanceof YarnConfiguration + ? conf + : new YarnConfiguration(conf); String zkQuorum = conf.get(YarnConfiguration.RM_ZK_ADDRESS); if (zkQuorum == null) { - throw new YarnRuntimeException("Embedded automatic failover " + + throw new YarnRuntimeException("Embedded automatic failover " + "is enabled, but " + YarnConfiguration.RM_ZK_ADDRESS + " is not set"); } @@ -199,7 +205,8 @@ public class EmbeddedElectorService extends AbstractService @Override public void notifyFatalError(String errorMessage) { rmContext.getDispatcher().getEventHandler().handle( - new RMFatalEvent(RMFatalEventType.EMBEDDED_ELECTOR_FAILED, errorMessage)); + new RMFatalEvent(RMFatalEventType.EMBEDDED_ELECTOR_FAILED, + errorMessage)); } @Override @@ -249,12 +256,16 @@ public class EmbeddedElectorService extends AbstractService return true; } - public void resetLeaderElection() { + // EmbeddedElector methods + + @Override + public void rejoinElection() { elector.quitElection(false); elector.joinElection(localActiveNodeInfo); } - public String getHAZookeeperConnectionState() { + @Override + public String getZookeeperConnectionState() { return elector.getHAZookeeperConnectionState(); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java index c060659ddb8..028b6f07091 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java @@ -29,7 +29,6 @@ import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.ha.HAServiceProtocol; @@ -108,8 +107,6 @@ public class AdminService extends CompositeService implements private String rmId; private boolean autoFailoverEnabled; - private boolean curatorEnabled; - private EmbeddedElectorService embeddedElector; private Server server; @@ -134,18 +131,8 @@ public class AdminService extends CompositeService implements @Override public void serviceInit(Configuration conf) throws Exception { - if (rmContext.isHAEnabled()) { - curatorEnabled = conf.getBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR, - YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED); - autoFailoverEnabled = HAUtil.isAutomaticFailoverEnabled(conf); - if (autoFailoverEnabled && !curatorEnabled) { - if (HAUtil.isAutomaticFailoverEmbedded(conf)) { - embeddedElector = createEmbeddedElectorService(); - addIfService(embeddedElector); - } - } - - } + autoFailoverEnabled = + rmContext.isHAEnabled() && HAUtil.isAutomaticFailoverEnabled(conf); masterServiceBindAddress = conf.getSocketAddr( YarnConfiguration.RM_BIND_HOST, @@ -228,17 +215,6 @@ public class AdminService extends CompositeService implements } } - protected EmbeddedElectorService createEmbeddedElectorService() { - return new EmbeddedElectorService(rmContext); - } - - @InterfaceAudience.Private - void resetLeaderElection() { - if (embeddedElector != null) { - embeddedElector.resetLeaderElection(); - } - } - private UserGroupInformation checkAccess(String method) throws IOException { return RMServerUtils.verifyAdminAccess(authorizer, method, LOG); } @@ -375,30 +351,24 @@ public class AdminService extends CompositeService implements } } + /** + * Return the HA status of this RM. This includes the current state and + * whether the RM is ready to become active. + * + * @return {@link HAServiceStatus} of the current RM + * @throws IOException if the caller does not have permissions + */ @Override public synchronized HAServiceStatus getServiceStatus() throws IOException { checkAccess("getServiceState"); - if (curatorEnabled) { - HAServiceStatus state; - if (rmContext.getLeaderElectorService().hasLeaderShip()) { - state = new HAServiceStatus(HAServiceState.ACTIVE); - } else { - state = new HAServiceStatus(HAServiceState.STANDBY); - } - // set empty string to avoid NPE at - // HAServiceProtocolServerSideTranslatorPB#getServiceStatus - state.setNotReadyToBecomeActive(""); - return state; + HAServiceState haState = rmContext.getHAServiceState(); + HAServiceStatus ret = new HAServiceStatus(haState); + if (isRMActive() || haState == HAServiceProtocol.HAServiceState.STANDBY) { + ret.setReadyToBecomeActive(); } else { - HAServiceState haState = rmContext.getHAServiceState(); - HAServiceStatus ret = new HAServiceStatus(haState); - if (isRMActive() || haState == HAServiceProtocol.HAServiceState.STANDBY) { - ret.setReadyToBecomeActive(); - } else { - ret.setNotReadyToBecomeActive("State is " + haState); - } - return ret; + ret.setNotReadyToBecomeActive("State is " + haState); } + return ret; } @Override @@ -926,19 +896,4 @@ public class AdminService extends CompositeService implements rmContext.getScheduler().setClusterMaxPriority(conf); } - - public String getHAZookeeperConnectionState() { - if (!rmContext.isHAEnabled()) { - return "ResourceManager HA is not enabled."; - } else if (!autoFailoverEnabled) { - return "Auto Failover is not enabled."; - } - if (curatorEnabled) { - return "Connected to zookeeper : " + rmContext - .getLeaderElectorService().getCuratorClient().getZookeeperClient() - .isConnected(); - } else { - return this.embeddedElector.getHAZookeeperConnectionState(); - } - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/LeaderElectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/CuratorBasedElectorService.java similarity index 82% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/LeaderElectorService.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/CuratorBasedElectorService.java index 8c1a6eb0fc4..bcdf48bb2ff 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/LeaderElectorService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/CuratorBasedElectorService.java @@ -24,6 +24,8 @@ import org.apache.commons.logging.LogFactory; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.leader.LeaderLatch; import org.apache.curator.framework.recipes.leader.LeaderLatchListener; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.service.AbstractService; @@ -32,10 +34,15 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import java.io.IOException; - -public class LeaderElectorService extends AbstractService implements - LeaderLatchListener { - public static final Log LOG = LogFactory.getLog(LeaderElectorService.class); +/** + * Leader election implementation that uses Curator. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class CuratorBasedElectorService extends AbstractService + implements EmbeddedElector, LeaderLatchListener { + public static final Log LOG = + LogFactory.getLog(CuratorBasedElectorService.class); private LeaderLatch leaderLatch; private CuratorFramework curator; private RMContext rmContext; @@ -43,8 +50,8 @@ public class LeaderElectorService extends AbstractService implements private String rmId; private ResourceManager rm; - public LeaderElectorService(RMContext rmContext, ResourceManager rm) { - super(LeaderElectorService.class.getName()); + public CuratorBasedElectorService(RMContext rmContext, ResourceManager rm) { + super(CuratorBasedElectorService.class.getName()); this.rmContext = rmContext; this.rm = rm; } @@ -74,10 +81,22 @@ public class LeaderElectorService extends AbstractService implements super.serviceStop(); } - public boolean hasLeaderShip() { - return leaderLatch.hasLeadership(); + @Override + public void rejoinElection() { + try { + closeLeaderLatch(); + Thread.sleep(1000); + initAndStartLeaderLatch(); + } catch (Exception e) { + LOG.info("Fail to re-join election.", e); + } } + @Override + public String getZookeeperConnectionState() { + return "Connected to zookeeper : " + + curator.getZookeeperClient().isConnected(); + } @Override public void isLeader() { @@ -90,17 +109,7 @@ public class LeaderElectorService extends AbstractService implements LOG.info(rmId + " failed to transition to active, giving up leadership", e); notLeader(); - reJoinElection(); - } - } - - public void reJoinElection() { - try { - closeLeaderLatch(); - Thread.sleep(1000); - initAndStartLeaderLatch(); - } catch (Exception e) { - LOG.info("Fail to re-join election.", e); + rejoinElection(); } } @@ -109,6 +118,7 @@ public class LeaderElectorService extends AbstractService implements leaderLatch.close(); } } + @Override public void notLeader() { LOG.info(rmId + " relinquish leadership"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElector.java new file mode 100644 index 00000000000..677ec85ffec --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElector.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.service.Service; + +/** + * Interface that all embedded leader electors must implement. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public interface EmbeddedElector extends Service{ + /** + * Leave and rejoin leader election. + */ + void rejoinElection(); + + /** + * Get information about the elector's connection to Zookeeper. + * + * @return zookeeper connection state + */ + String getZookeeperConnectionState(); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java index 45fe6cf6823..cc6ee5f230f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java @@ -139,13 +139,15 @@ public interface RMContext { void setQueuePlacementManager(PlacementManager placementMgr); - void setLeaderElectorService(LeaderElectorService elector); + void setLeaderElectorService(EmbeddedElector elector); - LeaderElectorService getLeaderElectorService(); + EmbeddedElector getLeaderElectorService(); QueueLimitCalculator getNodeManagerQueueLimitCalculator(); void setRMAppLifetimeMonitor(RMAppLifetimeMonitor rmAppLifetimeMonitor); RMAppLifetimeMonitor getRMAppLifetimeMonitor(); + + String getHAZookeeperConnectionState(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index 1f2485f423b..981dad22f6c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -75,7 +75,7 @@ public class RMContextImpl implements RMContext { private RMApplicationHistoryWriter rmApplicationHistoryWriter; private SystemMetricsPublisher systemMetricsPublisher; - private LeaderElectorService elector; + private EmbeddedElector elector; private QueueLimitCalculator queueLimitCalculator; @@ -142,12 +142,12 @@ public class RMContextImpl implements RMContext { } @Override - public void setLeaderElectorService(LeaderElectorService elector) { + public void setLeaderElectorService(EmbeddedElector elector) { this.elector = elector; } @Override - public LeaderElectorService getLeaderElectorService() { + public EmbeddedElector getLeaderElectorService() { return this.elector; } @@ -500,4 +500,13 @@ public class RMContextImpl implements RMContext { public RMAppLifetimeMonitor getRMAppLifetimeMonitor() { return this.activeServiceContext.getRMAppLifetimeMonitor(); } + + public String getHAZookeeperConnectionState() { + if (elector == null) { + return "Could not find leader elector. Verify both HA and automatic " + + "failover are enabled."; + } else { + return elector.getZookeeperConnectionState(); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index c9cc8c57c99..d60dcdc5921 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -256,16 +256,17 @@ public class ResourceManager extends CompositeService implements Recoverable { this.rmContext.setHAEnabled(HAUtil.isHAEnabled(this.conf)); if (this.rmContext.isHAEnabled()) { HAUtil.verifyAndSetConfiguration(this.conf); - curatorEnabled = conf.getBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR, - YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED); - if (curatorEnabled) { - this.curator = createAndStartCurator(conf); - LeaderElectorService elector = new LeaderElectorService(rmContext, this); - addService(elector); + + // If the RM is configured to use an embedded leader elector, + // initialize the leader elector. + if (HAUtil.isAutomaticFailoverEnabled(conf) && + HAUtil.isAutomaticFailoverEmbedded(conf)) { + EmbeddedElector elector = createEmbeddedElector(); + addIfService(elector); rmContext.setLeaderElectorService(elector); } } - + // Set UGI and do login // If security is enabled, use login user // If security is not enabled, use current user @@ -305,6 +306,20 @@ public class ResourceManager extends CompositeService implements Recoverable { super.serviceInit(this.conf); } + protected EmbeddedElector createEmbeddedElector() throws IOException { + EmbeddedElector elector; + curatorEnabled = + conf.getBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR, + YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED); + if (curatorEnabled) { + this.curator = createAndStartCurator(conf); + elector = new CuratorBasedElectorService(rmContext, this); + } else { + elector = new ActiveStandbyElectorBasedElectorService(rmContext); + } + return elector; + } + public CuratorFramework createAndStartCurator(Configuration conf) throws IOException { String zkHostPort = conf.get(YarnConfiguration.RM_ZK_ADDRESS); @@ -754,14 +769,12 @@ public class ResourceManager extends CompositeService implements Recoverable { // Transition to standby and reinit active services LOG.info("Transitioning RM to Standby mode"); transitionToStandby(true); - if (curatorEnabled) { - rmContext.getLeaderElectorService().reJoinElection(); - } else { - adminService.resetLeaderElection(); + EmbeddedElector elector = rmContext.getLeaderElectorService(); + if (elector != null) { + elector.rejoinElection(); } - return; } catch (Exception e) { - LOG.fatal("Failed to transition RM to Standby mode."); + LOG.fatal("Failed to transition RM to Standby mode.", e); ExitUtil.terminate(1, e); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebApp.java index 2d7139f228e..3367cf47334 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebApp.java @@ -121,8 +121,7 @@ public class RMWebApp extends WebApp implements YarnWebParams { } public String getHAZookeeperConnectionState() { - return rm.getRMContext().getRMAdminService() - .getHAZookeeperConnectionState(); + return getRMContext().getHAZookeeperConnectionState(); } public RMContext getRMContext() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterInfo.java index 512a5c4182a..d815315b260 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterInfo.java @@ -64,7 +64,7 @@ public class ClusterInfo { this.hadoopBuildVersion = VersionInfo.getBuildVersion(); this.hadoopVersionBuiltOn = VersionInfo.getDate(); this.haZooKeeperConnectionState = - rm.getRMContext().getRMAdminService().getHAZookeeperConnectionState(); + rm.getRMContext().getHAZookeeperConnectionState(); } public String getState() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index ea573e26ae1..a66b093a34c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -109,6 +109,7 @@ import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.junit.Assert; + @SuppressWarnings("unchecked") public class MockRM extends ResourceManager { @@ -123,6 +124,8 @@ public class MockRM extends ResourceManager { private final boolean useNullRMNodeLabelsManager; private boolean disableDrainEventsImplicitly; + private boolean useRealElector = false; + public MockRM() { this(new YarnConfiguration()); } @@ -132,13 +135,23 @@ public class MockRM extends ResourceManager { } public MockRM(Configuration conf, RMStateStore store) { - this(conf, store, true); + this(conf, store, true, false); } - + + public MockRM(Configuration conf, boolean useRealElector) { + this(conf, null, true, useRealElector); + } + public MockRM(Configuration conf, RMStateStore store, - boolean useNullRMNodeLabelsManager) { + boolean useRealElector) { + this(conf, store, true, useRealElector); + } + + public MockRM(Configuration conf, RMStateStore store, + boolean useNullRMNodeLabelsManager, boolean useRealElector) { super(); this.useNullRMNodeLabelsManager = useNullRMNodeLabelsManager; + this.useRealElector = useRealElector; init(conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf)); if (store != null) { setRMStateStore(store); @@ -192,6 +205,15 @@ public class MockRM extends ResourceManager { return new DrainDispatcher(); } + @Override + protected EmbeddedElector createEmbeddedElector() throws IOException { + if (useRealElector) { + return super.createEmbeddedElector(); + } else { + return null; + } + } + @Override protected EventHandler createSchedulerEventDispatcher() { return new EventHandler() { @@ -984,11 +1006,6 @@ public class MockRM extends ResourceManager { protected void stopServer() { // don't do anything } - - @Override - protected EmbeddedElectorService createEmbeddedElectorService() { - return null; - } }; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java index 6092f41705f..c9ce7d7a061 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java @@ -108,13 +108,13 @@ public abstract class RMHATestBase extends ClientBaseWithFixes{ } protected void startRMs() throws IOException { - rm1 = new MockRM(confForRM1, null, false){ + rm1 = new MockRM(confForRM1, null, false, false){ @Override protected Dispatcher createDispatcher() { return new DrainDispatcher(); } }; - rm2 = new MockRM(confForRM2, null, false){ + rm2 = new MockRM(confForRM2, null, false, false){ @Override protected Dispatcher createDispatcher() { return new DrainDispatcher(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestLeaderElectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestLeaderElectorService.java index bb10041133d..121cacba5e7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestLeaderElectorService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestLeaderElectorService.java @@ -63,7 +63,6 @@ public class TestLeaderElectorService { conf = new Configuration(); conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true); conf.setBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR, true); - conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, true); conf.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1"); conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID); @@ -121,7 +120,7 @@ public class TestLeaderElectorService { } }; memStore.init(conf); - rm1 = new MockRM(conf, memStore); + rm1 = new MockRM(conf, memStore, true); rm1.init(conf); rm1.start(); @@ -167,7 +166,8 @@ public class TestLeaderElectorService { rm1 = startRM("rm1", HAServiceState.ACTIVE); - LeaderElectorService service = rm1.getRMContext().getLeaderElectorService(); + CuratorBasedElectorService service = (CuratorBasedElectorService) + rm1.getRMContext().getLeaderElectorService(); CuratorZookeeperClient client = service.getCuratorClient().getZookeeperClient(); // this will expire current curator client session. curator will re-establish @@ -187,7 +187,7 @@ public class TestLeaderElectorService { Thread launchRM = new Thread() { @Override public void run() { - rm1 = new MockRM(conf) { + rm1 = new MockRM(conf, true) { @Override synchronized void transitionToActive() throws Exception { if (throwException.get()) { @@ -217,9 +217,12 @@ public class TestLeaderElectorService { rm1 = startRM("rm1", HAServiceState.ACTIVE); rm2 = startRM("rm2", HAServiceState.STANDBY); + CuratorBasedElectorService service = (CuratorBasedElectorService) + rm1.getRMContext().getLeaderElectorService(); + ZooKeeper zkClient = - rm1.getRMContext().getLeaderElectorService().getCuratorClient() - .getZookeeperClient().getZooKeeper(); + service.getCuratorClient().getZookeeperClient().getZooKeeper(); + InstanceSpec connectionInstance = zkCluster.findConnectionInstance(zkClient); zkCluster.killServer(connectionInstance); @@ -245,7 +248,7 @@ public class TestLeaderElectorService { private MockRM startRM(String rmId, HAServiceState state) throws Exception{ YarnConfiguration yarnConf = new YarnConfiguration(conf); yarnConf.set(YarnConfiguration.RM_HA_ID, rmId); - MockRM rm = new MockRM(yarnConf); + MockRM rm = new MockRM(yarnConf, true); rm.init(yarnConf); rm.start(); waitFor(rm, state); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java index c6483470015..c4fcc5da4ab 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java @@ -128,7 +128,8 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes { myConf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, 50); when(rc.getRMAdminService()).thenReturn(as); - EmbeddedElectorService ees = new EmbeddedElectorService(rc); + ActiveStandbyElectorBasedElectorService + ees = new ActiveStandbyElectorBasedElectorService(rc); ees.init(myConf); ees.enterNeutralMode(); @@ -165,7 +166,8 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes { * @throws InterruptedException if interrupted */ private void testCallbackSynchronizationActive(AdminService as, - EmbeddedElectorService ees) throws IOException, InterruptedException { + ActiveStandbyElectorBasedElectorService ees) + throws IOException, InterruptedException { ees.becomeActive(); Thread.sleep(100); @@ -184,7 +186,8 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes { * @throws InterruptedException if interrupted */ private void testCallbackSynchronizationStandby(AdminService as, - EmbeddedElectorService ees) throws IOException, InterruptedException { + ActiveStandbyElectorBasedElectorService ees) + throws IOException, InterruptedException { ees.becomeStandby(); Thread.sleep(100); @@ -202,7 +205,8 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes { * @throws InterruptedException if interrupted */ private void testCallbackSynchronizationNeutral(AdminService as, - EmbeddedElectorService ees) throws IOException, InterruptedException { + ActiveStandbyElectorBasedElectorService ees) + throws IOException, InterruptedException { ees.enterNeutralMode(); Thread.sleep(100); @@ -221,7 +225,8 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes { * @throws InterruptedException if interrupted */ private void testCallbackSynchronizationTimingActive(AdminService as, - EmbeddedElectorService ees) throws IOException, InterruptedException { + ActiveStandbyElectorBasedElectorService ees) + throws IOException, InterruptedException { synchronized (ees.zkDisconnectLock) { // Sleep while holding the lock so that the timer thread can't do // anything when it runs. Sleep until we're pretty sure the timer thread @@ -251,7 +256,8 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes { * @throws InterruptedException if interrupted */ private void testCallbackSynchronizationTimingStandby(AdminService as, - EmbeddedElectorService ees) throws IOException, InterruptedException { + ActiveStandbyElectorBasedElectorService ees) + throws IOException, InterruptedException { synchronized (ees.zkDisconnectLock) { // Sleep while holding the lock so that the timer thread can't do // anything when it runs. Sleep until we're pretty sure the timer thread @@ -284,25 +290,20 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes { } @Override - protected AdminService createAdminService() { - return new AdminService(MockRMWithElector.this, getRMContext()) { + protected EmbeddedElector createEmbeddedElector() { + return new ActiveStandbyElectorBasedElectorService(getRMContext()) { @Override - protected EmbeddedElectorService createEmbeddedElectorService() { - return new EmbeddedElectorService(getRMContext()) { - @Override - public void becomeActive() throws - ServiceFailedException { - try { - callbackCalled.set(true); - TestRMEmbeddedElector.LOG.info("Callback called. Sleeping now"); - Thread.sleep(delayMs); - TestRMEmbeddedElector.LOG.info("Sleep done"); - } catch (InterruptedException e) { - e.printStackTrace(); - } - super.becomeActive(); - } - }; + public void becomeActive() throws + ServiceFailedException { + try { + callbackCalled.set(true); + TestRMEmbeddedElector.LOG.info("Callback called. Sleeping now"); + Thread.sleep(delayMs); + TestRMEmbeddedElector.LOG.info("Sleep done"); + } catch (InterruptedException e) { + e.printStackTrace(); + } + super.becomeActive(); } }; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java index df1cdb551e3..3d08052d0f1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java @@ -176,13 +176,13 @@ public class TestRMHA { * 1. Standby: Should be a no-op * 2. Active: Active services should start * 3. Active: Should be a no-op. - * While active, submit a couple of jobs + * While active, submit a couple of jobs * 4. Standby: Active services should stop * 5. Active: Active services should start * 6. Stop the RM: All services should stop and RM should not be ready to * become Active */ - @Test (timeout = 30000) + @Test(timeout = 30000) public void testFailoverAndTransitions() throws Exception { configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false); Configuration conf = new YarnConfiguration(configuration); @@ -202,37 +202,37 @@ public class TestRMHA { checkMonitorHealth(); checkStandbyRMFunctionality(); verifyClusterMetrics(0, 0, 0, 0, 0, 0); - + // 1. Transition to Standby - must be a no-op rm.adminService.transitionToStandby(requestInfo); checkMonitorHealth(); checkStandbyRMFunctionality(); verifyClusterMetrics(0, 0, 0, 0, 0, 0); - + // 2. Transition to active rm.adminService.transitionToActive(requestInfo); checkMonitorHealth(); checkActiveRMFunctionality(); verifyClusterMetrics(1, 1, 1, 1, 2048, 1); - + // 3. Transition to active - no-op rm.adminService.transitionToActive(requestInfo); checkMonitorHealth(); checkActiveRMFunctionality(); verifyClusterMetrics(1, 2, 2, 2, 2048, 2); - + // 4. Transition to standby rm.adminService.transitionToStandby(requestInfo); checkMonitorHealth(); checkStandbyRMFunctionality(); verifyClusterMetrics(0, 0, 0, 0, 0, 0); - + // 5. Transition to active to check Active->Standby->Active works rm.adminService.transitionToActive(requestInfo); checkMonitorHealth(); checkActiveRMFunctionality(); verifyClusterMetrics(1, 1, 1, 1, 2048, 1); - + // 6. Stop the RM. All services should stop and RM should not be ready to // become active rm.stop(); @@ -338,7 +338,7 @@ public class TestRMHA { rm.adminService.transitionToStandby(requestInfo); rm.adminService.transitionToActive(requestInfo); rm.adminService.transitionToStandby(requestInfo); - + MyCountingDispatcher dispatcher = (MyCountingDispatcher) rm.getRMContext().getDispatcher(); assertTrue(!dispatcher.isStopped()); @@ -346,24 +346,24 @@ public class TestRMHA { rm.adminService.transitionToActive(requestInfo); assertEquals(errorMessageForEventHandler, expectedEventHandlerCount, ((MyCountingDispatcher) rm.getRMContext().getDispatcher()) - .getEventHandlerCount()); + .getEventHandlerCount()); assertEquals(errorMessageForService, expectedServiceCount, rm.getServices().size()); - + // Keep the dispatcher reference before transitioning to standby dispatcher = (MyCountingDispatcher) rm.getRMContext().getDispatcher(); - - + + rm.adminService.transitionToStandby(requestInfo); assertEquals(errorMessageForEventHandler, expectedEventHandlerCount, ((MyCountingDispatcher) rm.getRMContext().getDispatcher()) - .getEventHandlerCount()); + .getEventHandlerCount()); assertEquals(errorMessageForService, expectedServiceCount, rm.getServices().size()); assertTrue(dispatcher.isStopped()); - + rm.stop(); } @@ -384,7 +384,8 @@ public class TestRMHA { assertEquals(conf.get(YarnConfiguration.RM_HA_ID), RM1_NODE_ID); //test if RM_HA_ID can not be found - configuration.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID+ "," + RM3_NODE_ID); + configuration + .set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM3_NODE_ID); configuration.unset(YarnConfiguration.RM_HA_ID); conf = new YarnConfiguration(configuration); try { @@ -456,7 +457,7 @@ public class TestRMHA { checkActiveRMFunctionality(); } - @Test(timeout = 90000) + @Test public void testTransitionedToStandbyShouldNotHang() throws Exception { configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false); Configuration conf = new YarnConfiguration(configuration);