YARN-5709. Cleanup leader election configs and pluggability. Contribtued by Karthik Kambatla

(cherry picked from commit b817c565c8be1d4a682d119bfac6f43ee09e87f0)
This commit is contained in:
Jian He 2016-12-09 16:38:49 -08:00
parent 292bd78b44
commit 71fdca4600
15 changed files with 244 additions and 170 deletions

View File

@ -29,6 +29,7 @@
import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.ActiveStandbyElector;
import org.apache.hadoop.http.HttpConfig; import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
@ -655,9 +656,20 @@ public static boolean isAclEnabled(Configuration conf) {
public static final String RM_HA_FC_ELECTOR_ZK_RETRIES_KEY = RM_HA_PREFIX public static final String RM_HA_FC_ELECTOR_ZK_RETRIES_KEY = RM_HA_PREFIX
+ "failover-controller.active-standby-elector.zk.retries"; + "failover-controller.active-standby-elector.zk.retries";
@Private
/**
* Whether to use curator-based elector for leader election.
*
* @deprecated Eventually, we want to default to the curator-based
* implementation and remove the {@link ActiveStandbyElector} based
* implementation. We should remove this config then.
*/
@Unstable
@Deprecated
public static final String CURATOR_LEADER_ELECTOR = public static final String CURATOR_LEADER_ELECTOR =
RM_HA_PREFIX + "curator-leader-elector.enabled"; RM_HA_PREFIX + "curator-leader-elector.enabled";
@Private
@Unstable
public static final boolean DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED = false; public static final boolean DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED = false;
//////////////////////////////// ////////////////////////////////

View File

@ -43,12 +43,16 @@
import java.util.Timer; import java.util.Timer;
import java.util.TimerTask; import java.util.TimerTask;
/**
* Leader election implementation that uses {@link ActiveStandbyElector}.
*/
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Unstable @InterfaceStability.Unstable
public class EmbeddedElectorService extends AbstractService public class ActiveStandbyElectorBasedElectorService extends AbstractService
implements ActiveStandbyElector.ActiveStandbyElectorCallback { implements EmbeddedElector,
private static final Log LOG = ActiveStandbyElector.ActiveStandbyElectorCallback {
LogFactory.getLog(EmbeddedElectorService.class.getName()); private static final Log LOG = LogFactory.getLog(
ActiveStandbyElectorBasedElectorService.class.getName());
private static final HAServiceProtocol.StateChangeRequestInfo req = private static final HAServiceProtocol.StateChangeRequestInfo req =
new HAServiceProtocol.StateChangeRequestInfo( new HAServiceProtocol.StateChangeRequestInfo(
HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC); HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC);
@ -62,15 +66,17 @@ public class EmbeddedElectorService extends AbstractService
@VisibleForTesting @VisibleForTesting
final Object zkDisconnectLock = new Object(); final Object zkDisconnectLock = new Object();
EmbeddedElectorService(RMContext rmContext) { ActiveStandbyElectorBasedElectorService(RMContext rmContext) {
super(EmbeddedElectorService.class.getName()); super(ActiveStandbyElectorBasedElectorService.class.getName());
this.rmContext = rmContext; this.rmContext = rmContext;
} }
@Override @Override
protected void serviceInit(Configuration conf) protected void serviceInit(Configuration conf)
throws Exception { throws Exception {
conf = conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf); conf = conf instanceof YarnConfiguration
? conf
: new YarnConfiguration(conf);
String zkQuorum = conf.get(YarnConfiguration.RM_ZK_ADDRESS); String zkQuorum = conf.get(YarnConfiguration.RM_ZK_ADDRESS);
if (zkQuorum == null) { if (zkQuorum == null) {
@ -199,7 +205,8 @@ public void run() {
@Override @Override
public void notifyFatalError(String errorMessage) { public void notifyFatalError(String errorMessage) {
rmContext.getDispatcher().getEventHandler().handle( rmContext.getDispatcher().getEventHandler().handle(
new RMFatalEvent(RMFatalEventType.EMBEDDED_ELECTOR_FAILED, errorMessage)); new RMFatalEvent(RMFatalEventType.EMBEDDED_ELECTOR_FAILED,
errorMessage));
} }
@Override @Override
@ -249,12 +256,16 @@ private boolean isParentZnodeSafe(String clusterId)
return true; return true;
} }
public void resetLeaderElection() { // EmbeddedElector methods
@Override
public void rejoinElection() {
elector.quitElection(false); elector.quitElection(false);
elector.joinElection(localActiveNodeInfo); elector.joinElection(localActiveNodeInfo);
} }
public String getHAZookeeperConnectionState() { @Override
public String getZookeeperConnectionState() {
return elector.getHAZookeeperConnectionState(); return elector.getHAZookeeperConnectionState();
} }
} }

View File

@ -29,7 +29,6 @@
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.ha.HAServiceProtocol;
@ -108,8 +107,6 @@ public class AdminService extends CompositeService implements
private String rmId; private String rmId;
private boolean autoFailoverEnabled; private boolean autoFailoverEnabled;
private boolean curatorEnabled;
private EmbeddedElectorService embeddedElector;
private Server server; private Server server;
@ -134,18 +131,8 @@ public AdminService(ResourceManager rm, RMContext rmContext) {
@Override @Override
public void serviceInit(Configuration conf) throws Exception { public void serviceInit(Configuration conf) throws Exception {
if (rmContext.isHAEnabled()) { autoFailoverEnabled =
curatorEnabled = conf.getBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR, rmContext.isHAEnabled() && HAUtil.isAutomaticFailoverEnabled(conf);
YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED);
autoFailoverEnabled = HAUtil.isAutomaticFailoverEnabled(conf);
if (autoFailoverEnabled && !curatorEnabled) {
if (HAUtil.isAutomaticFailoverEmbedded(conf)) {
embeddedElector = createEmbeddedElectorService();
addIfService(embeddedElector);
}
}
}
masterServiceBindAddress = conf.getSocketAddr( masterServiceBindAddress = conf.getSocketAddr(
YarnConfiguration.RM_BIND_HOST, YarnConfiguration.RM_BIND_HOST,
@ -228,17 +215,6 @@ protected void stopServer() throws Exception {
} }
} }
protected EmbeddedElectorService createEmbeddedElectorService() {
return new EmbeddedElectorService(rmContext);
}
@InterfaceAudience.Private
void resetLeaderElection() {
if (embeddedElector != null) {
embeddedElector.resetLeaderElection();
}
}
private UserGroupInformation checkAccess(String method) throws IOException { private UserGroupInformation checkAccess(String method) throws IOException {
return RMServerUtils.verifyAdminAccess(authorizer, method, LOG); return RMServerUtils.verifyAdminAccess(authorizer, method, LOG);
} }
@ -375,21 +351,16 @@ public synchronized void transitionToStandby(
} }
} }
/**
* Return the HA status of this RM. This includes the current state and
* whether the RM is ready to become active.
*
* @return {@link HAServiceStatus} of the current RM
* @throws IOException if the caller does not have permissions
*/
@Override @Override
public synchronized HAServiceStatus getServiceStatus() throws IOException { public synchronized HAServiceStatus getServiceStatus() throws IOException {
checkAccess("getServiceState"); checkAccess("getServiceState");
if (curatorEnabled) {
HAServiceStatus state;
if (rmContext.getLeaderElectorService().hasLeaderShip()) {
state = new HAServiceStatus(HAServiceState.ACTIVE);
} else {
state = new HAServiceStatus(HAServiceState.STANDBY);
}
// set empty string to avoid NPE at
// HAServiceProtocolServerSideTranslatorPB#getServiceStatus
state.setNotReadyToBecomeActive("");
return state;
} else {
HAServiceState haState = rmContext.getHAServiceState(); HAServiceState haState = rmContext.getHAServiceState();
HAServiceStatus ret = new HAServiceStatus(haState); HAServiceStatus ret = new HAServiceStatus(haState);
if (isRMActive() || haState == HAServiceProtocol.HAServiceState.STANDBY) { if (isRMActive() || haState == HAServiceProtocol.HAServiceState.STANDBY) {
@ -399,7 +370,6 @@ public synchronized HAServiceStatus getServiceStatus() throws IOException {
} }
return ret; return ret;
} }
}
@Override @Override
public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request) public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
@ -926,19 +896,4 @@ private void refreshClusterMaxPriority() throws IOException, YarnException {
rmContext.getScheduler().setClusterMaxPriority(conf); rmContext.getScheduler().setClusterMaxPriority(conf);
} }
public String getHAZookeeperConnectionState() {
if (!rmContext.isHAEnabled()) {
return "ResourceManager HA is not enabled.";
} else if (!autoFailoverEnabled) {
return "Auto Failover is not enabled.";
}
if (curatorEnabled) {
return "Connected to zookeeper : " + rmContext
.getLeaderElectorService().getCuratorClient().getZookeeperClient()
.isConnected();
} else {
return this.embeddedElector.getHAZookeeperConnectionState();
}
}
} }

View File

@ -24,6 +24,8 @@
import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderLatch; import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener; import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.AbstractService;
@ -32,10 +34,15 @@
import java.io.IOException; import java.io.IOException;
/**
public class LeaderElectorService extends AbstractService implements * Leader election implementation that uses Curator.
LeaderLatchListener { */
public static final Log LOG = LogFactory.getLog(LeaderElectorService.class); @InterfaceAudience.Private
@InterfaceStability.Unstable
public class CuratorBasedElectorService extends AbstractService
implements EmbeddedElector, LeaderLatchListener {
public static final Log LOG =
LogFactory.getLog(CuratorBasedElectorService.class);
private LeaderLatch leaderLatch; private LeaderLatch leaderLatch;
private CuratorFramework curator; private CuratorFramework curator;
private RMContext rmContext; private RMContext rmContext;
@ -43,8 +50,8 @@ public class LeaderElectorService extends AbstractService implements
private String rmId; private String rmId;
private ResourceManager rm; private ResourceManager rm;
public LeaderElectorService(RMContext rmContext, ResourceManager rm) { public CuratorBasedElectorService(RMContext rmContext, ResourceManager rm) {
super(LeaderElectorService.class.getName()); super(CuratorBasedElectorService.class.getName());
this.rmContext = rmContext; this.rmContext = rmContext;
this.rm = rm; this.rm = rm;
} }
@ -74,10 +81,22 @@ protected void serviceStop() throws Exception {
super.serviceStop(); super.serviceStop();
} }
public boolean hasLeaderShip() { @Override
return leaderLatch.hasLeadership(); public void rejoinElection() {
try {
closeLeaderLatch();
Thread.sleep(1000);
initAndStartLeaderLatch();
} catch (Exception e) {
LOG.info("Fail to re-join election.", e);
}
} }
@Override
public String getZookeeperConnectionState() {
return "Connected to zookeeper : " +
curator.getZookeeperClient().isConnected();
}
@Override @Override
public void isLeader() { public void isLeader() {
@ -90,17 +109,7 @@ public void isLeader() {
LOG.info(rmId + " failed to transition to active, giving up leadership", LOG.info(rmId + " failed to transition to active, giving up leadership",
e); e);
notLeader(); notLeader();
reJoinElection(); rejoinElection();
}
}
public void reJoinElection() {
try {
closeLeaderLatch();
Thread.sleep(1000);
initAndStartLeaderLatch();
} catch (Exception e) {
LOG.info("Fail to re-join election.", e);
} }
} }
@ -109,6 +118,7 @@ private void closeLeaderLatch() throws IOException {
leaderLatch.close(); leaderLatch.close();
} }
} }
@Override @Override
public void notLeader() { public void notLeader() {
LOG.info(rmId + " relinquish leadership"); LOG.info(rmId + " relinquish leadership");

View File

@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.service.Service;
/**
* Interface that all embedded leader electors must implement.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public interface EmbeddedElector extends Service{
/**
* Leave and rejoin leader election.
*/
void rejoinElection();
/**
* Get information about the elector's connection to Zookeeper.
*
* @return zookeeper connection state
*/
String getZookeeperConnectionState();
}

View File

@ -139,13 +139,15 @@ void setRMDelegatedNodeLabelsUpdater(
void setQueuePlacementManager(PlacementManager placementMgr); void setQueuePlacementManager(PlacementManager placementMgr);
void setLeaderElectorService(LeaderElectorService elector); void setLeaderElectorService(EmbeddedElector elector);
LeaderElectorService getLeaderElectorService(); EmbeddedElector getLeaderElectorService();
QueueLimitCalculator getNodeManagerQueueLimitCalculator(); QueueLimitCalculator getNodeManagerQueueLimitCalculator();
void setRMAppLifetimeMonitor(RMAppLifetimeMonitor rmAppLifetimeMonitor); void setRMAppLifetimeMonitor(RMAppLifetimeMonitor rmAppLifetimeMonitor);
RMAppLifetimeMonitor getRMAppLifetimeMonitor(); RMAppLifetimeMonitor getRMAppLifetimeMonitor();
String getHAZookeeperConnectionState();
} }

View File

@ -75,7 +75,7 @@ public class RMContextImpl implements RMContext {
private RMApplicationHistoryWriter rmApplicationHistoryWriter; private RMApplicationHistoryWriter rmApplicationHistoryWriter;
private SystemMetricsPublisher systemMetricsPublisher; private SystemMetricsPublisher systemMetricsPublisher;
private LeaderElectorService elector; private EmbeddedElector elector;
private QueueLimitCalculator queueLimitCalculator; private QueueLimitCalculator queueLimitCalculator;
@ -142,12 +142,12 @@ public Dispatcher getDispatcher() {
} }
@Override @Override
public void setLeaderElectorService(LeaderElectorService elector) { public void setLeaderElectorService(EmbeddedElector elector) {
this.elector = elector; this.elector = elector;
} }
@Override @Override
public LeaderElectorService getLeaderElectorService() { public EmbeddedElector getLeaderElectorService() {
return this.elector; return this.elector;
} }
@ -500,4 +500,13 @@ public void setRMAppLifetimeMonitor(
public RMAppLifetimeMonitor getRMAppLifetimeMonitor() { public RMAppLifetimeMonitor getRMAppLifetimeMonitor() {
return this.activeServiceContext.getRMAppLifetimeMonitor(); return this.activeServiceContext.getRMAppLifetimeMonitor();
} }
public String getHAZookeeperConnectionState() {
if (elector == null) {
return "Could not find leader elector. Verify both HA and automatic " +
"failover are enabled.";
} else {
return elector.getZookeeperConnectionState();
}
}
} }

View File

@ -256,12 +256,13 @@ protected void serviceInit(Configuration conf) throws Exception {
this.rmContext.setHAEnabled(HAUtil.isHAEnabled(this.conf)); this.rmContext.setHAEnabled(HAUtil.isHAEnabled(this.conf));
if (this.rmContext.isHAEnabled()) { if (this.rmContext.isHAEnabled()) {
HAUtil.verifyAndSetConfiguration(this.conf); HAUtil.verifyAndSetConfiguration(this.conf);
curatorEnabled = conf.getBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR,
YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED); // If the RM is configured to use an embedded leader elector,
if (curatorEnabled) { // initialize the leader elector.
this.curator = createAndStartCurator(conf); if (HAUtil.isAutomaticFailoverEnabled(conf) &&
LeaderElectorService elector = new LeaderElectorService(rmContext, this); HAUtil.isAutomaticFailoverEmbedded(conf)) {
addService(elector); EmbeddedElector elector = createEmbeddedElector();
addIfService(elector);
rmContext.setLeaderElectorService(elector); rmContext.setLeaderElectorService(elector);
} }
} }
@ -305,6 +306,20 @@ protected void serviceInit(Configuration conf) throws Exception {
super.serviceInit(this.conf); super.serviceInit(this.conf);
} }
protected EmbeddedElector createEmbeddedElector() throws IOException {
EmbeddedElector elector;
curatorEnabled =
conf.getBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR,
YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED);
if (curatorEnabled) {
this.curator = createAndStartCurator(conf);
elector = new CuratorBasedElectorService(rmContext, this);
} else {
elector = new ActiveStandbyElectorBasedElectorService(rmContext);
}
return elector;
}
public CuratorFramework createAndStartCurator(Configuration conf) public CuratorFramework createAndStartCurator(Configuration conf)
throws IOException { throws IOException {
String zkHostPort = conf.get(YarnConfiguration.RM_ZK_ADDRESS); String zkHostPort = conf.get(YarnConfiguration.RM_ZK_ADDRESS);
@ -754,14 +769,12 @@ public void handleTransitionToStandBy() {
// Transition to standby and reinit active services // Transition to standby and reinit active services
LOG.info("Transitioning RM to Standby mode"); LOG.info("Transitioning RM to Standby mode");
transitionToStandby(true); transitionToStandby(true);
if (curatorEnabled) { EmbeddedElector elector = rmContext.getLeaderElectorService();
rmContext.getLeaderElectorService().reJoinElection(); if (elector != null) {
} else { elector.rejoinElection();
adminService.resetLeaderElection();
} }
return;
} catch (Exception e) { } catch (Exception e) {
LOG.fatal("Failed to transition RM to Standby mode."); LOG.fatal("Failed to transition RM to Standby mode.", e);
ExitUtil.terminate(1, e); ExitUtil.terminate(1, e);
} }
} }

View File

@ -121,8 +121,7 @@ private String buildRedirectPath() {
} }
public String getHAZookeeperConnectionState() { public String getHAZookeeperConnectionState() {
return rm.getRMContext().getRMAdminService() return getRMContext().getHAZookeeperConnectionState();
.getHAZookeeperConnectionState();
} }
public RMContext getRMContext() { public RMContext getRMContext() {

View File

@ -64,7 +64,7 @@ public ClusterInfo(ResourceManager rm) {
this.hadoopBuildVersion = VersionInfo.getBuildVersion(); this.hadoopBuildVersion = VersionInfo.getBuildVersion();
this.hadoopVersionBuiltOn = VersionInfo.getDate(); this.hadoopVersionBuiltOn = VersionInfo.getDate();
this.haZooKeeperConnectionState = this.haZooKeeperConnectionState =
rm.getRMContext().getRMAdminService().getHAZookeeperConnectionState(); rm.getRMContext().getHAZookeeperConnectionState();
} }
public String getState() { public String getState() {

View File

@ -109,6 +109,7 @@
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.junit.Assert; import org.junit.Assert;
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public class MockRM extends ResourceManager { public class MockRM extends ResourceManager {
@ -123,6 +124,8 @@ public class MockRM extends ResourceManager {
private final boolean useNullRMNodeLabelsManager; private final boolean useNullRMNodeLabelsManager;
private boolean disableDrainEventsImplicitly; private boolean disableDrainEventsImplicitly;
private boolean useRealElector = false;
public MockRM() { public MockRM() {
this(new YarnConfiguration()); this(new YarnConfiguration());
} }
@ -132,13 +135,23 @@ public MockRM(Configuration conf) {
} }
public MockRM(Configuration conf, RMStateStore store) { public MockRM(Configuration conf, RMStateStore store) {
this(conf, store, true); this(conf, store, true, false);
}
public MockRM(Configuration conf, boolean useRealElector) {
this(conf, null, true, useRealElector);
} }
public MockRM(Configuration conf, RMStateStore store, public MockRM(Configuration conf, RMStateStore store,
boolean useNullRMNodeLabelsManager) { boolean useRealElector) {
this(conf, store, true, useRealElector);
}
public MockRM(Configuration conf, RMStateStore store,
boolean useNullRMNodeLabelsManager, boolean useRealElector) {
super(); super();
this.useNullRMNodeLabelsManager = useNullRMNodeLabelsManager; this.useNullRMNodeLabelsManager = useNullRMNodeLabelsManager;
this.useRealElector = useRealElector;
init(conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf)); init(conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf));
if (store != null) { if (store != null) {
setRMStateStore(store); setRMStateStore(store);
@ -192,6 +205,15 @@ protected Dispatcher createDispatcher() {
return new DrainDispatcher(); return new DrainDispatcher();
} }
@Override
protected EmbeddedElector createEmbeddedElector() throws IOException {
if (useRealElector) {
return super.createEmbeddedElector();
} else {
return null;
}
}
@Override @Override
protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() { protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
return new EventHandler<SchedulerEvent>() { return new EventHandler<SchedulerEvent>() {
@ -984,11 +1006,6 @@ protected void startServer() {
protected void stopServer() { protected void stopServer() {
// don't do anything // don't do anything
} }
@Override
protected EmbeddedElectorService createEmbeddedElectorService() {
return null;
}
}; };
} }

View File

@ -108,13 +108,13 @@ protected MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
} }
protected void startRMs() throws IOException { protected void startRMs() throws IOException {
rm1 = new MockRM(confForRM1, null, false){ rm1 = new MockRM(confForRM1, null, false, false){
@Override @Override
protected Dispatcher createDispatcher() { protected Dispatcher createDispatcher() {
return new DrainDispatcher(); return new DrainDispatcher();
} }
}; };
rm2 = new MockRM(confForRM2, null, false){ rm2 = new MockRM(confForRM2, null, false, false){
@Override @Override
protected Dispatcher createDispatcher() { protected Dispatcher createDispatcher() {
return new DrainDispatcher(); return new DrainDispatcher();

View File

@ -63,7 +63,6 @@ public void setUp() throws Exception {
conf = new Configuration(); conf = new Configuration();
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true); conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
conf.setBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR, true); conf.setBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR, true);
conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, true);
conf.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1"); conf.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1");
conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID); conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID);
@ -121,7 +120,7 @@ public synchronized void storeApplicationStateInternal(ApplicationId
} }
}; };
memStore.init(conf); memStore.init(conf);
rm1 = new MockRM(conf, memStore); rm1 = new MockRM(conf, memStore, true);
rm1.init(conf); rm1.init(conf);
rm1.start(); rm1.start();
@ -167,7 +166,8 @@ public void testExpireCurrentZKSession() throws Exception{
rm1 = startRM("rm1", HAServiceState.ACTIVE); rm1 = startRM("rm1", HAServiceState.ACTIVE);
LeaderElectorService service = rm1.getRMContext().getLeaderElectorService(); CuratorBasedElectorService service = (CuratorBasedElectorService)
rm1.getRMContext().getLeaderElectorService();
CuratorZookeeperClient client = CuratorZookeeperClient client =
service.getCuratorClient().getZookeeperClient(); service.getCuratorClient().getZookeeperClient();
// this will expire current curator client session. curator will re-establish // this will expire current curator client session. curator will re-establish
@ -187,7 +187,7 @@ public void testRMFailToTransitionToActive() throws Exception{
Thread launchRM = new Thread() { Thread launchRM = new Thread() {
@Override @Override
public void run() { public void run() {
rm1 = new MockRM(conf) { rm1 = new MockRM(conf, true) {
@Override @Override
synchronized void transitionToActive() throws Exception { synchronized void transitionToActive() throws Exception {
if (throwException.get()) { if (throwException.get()) {
@ -217,9 +217,12 @@ public void testKillZKInstance() throws Exception {
rm1 = startRM("rm1", HAServiceState.ACTIVE); rm1 = startRM("rm1", HAServiceState.ACTIVE);
rm2 = startRM("rm2", HAServiceState.STANDBY); rm2 = startRM("rm2", HAServiceState.STANDBY);
CuratorBasedElectorService service = (CuratorBasedElectorService)
rm1.getRMContext().getLeaderElectorService();
ZooKeeper zkClient = ZooKeeper zkClient =
rm1.getRMContext().getLeaderElectorService().getCuratorClient() service.getCuratorClient().getZookeeperClient().getZooKeeper();
.getZookeeperClient().getZooKeeper();
InstanceSpec connectionInstance = zkCluster.findConnectionInstance(zkClient); InstanceSpec connectionInstance = zkCluster.findConnectionInstance(zkClient);
zkCluster.killServer(connectionInstance); zkCluster.killServer(connectionInstance);
@ -245,7 +248,7 @@ public void testKillZKInstance() throws Exception {
private MockRM startRM(String rmId, HAServiceState state) throws Exception{ private MockRM startRM(String rmId, HAServiceState state) throws Exception{
YarnConfiguration yarnConf = new YarnConfiguration(conf); YarnConfiguration yarnConf = new YarnConfiguration(conf);
yarnConf.set(YarnConfiguration.RM_HA_ID, rmId); yarnConf.set(YarnConfiguration.RM_HA_ID, rmId);
MockRM rm = new MockRM(yarnConf); MockRM rm = new MockRM(yarnConf, true);
rm.init(yarnConf); rm.init(yarnConf);
rm.start(); rm.start();
waitFor(rm, state); waitFor(rm, state);

View File

@ -128,7 +128,8 @@ private void testCallbackSynchronization(SyncTestType type)
myConf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, 50); myConf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, 50);
when(rc.getRMAdminService()).thenReturn(as); when(rc.getRMAdminService()).thenReturn(as);
EmbeddedElectorService ees = new EmbeddedElectorService(rc); ActiveStandbyElectorBasedElectorService
ees = new ActiveStandbyElectorBasedElectorService(rc);
ees.init(myConf); ees.init(myConf);
ees.enterNeutralMode(); ees.enterNeutralMode();
@ -165,7 +166,8 @@ private void testCallbackSynchronization(SyncTestType type)
* @throws InterruptedException if interrupted * @throws InterruptedException if interrupted
*/ */
private void testCallbackSynchronizationActive(AdminService as, private void testCallbackSynchronizationActive(AdminService as,
EmbeddedElectorService ees) throws IOException, InterruptedException { ActiveStandbyElectorBasedElectorService ees)
throws IOException, InterruptedException {
ees.becomeActive(); ees.becomeActive();
Thread.sleep(100); Thread.sleep(100);
@ -184,7 +186,8 @@ private void testCallbackSynchronizationActive(AdminService as,
* @throws InterruptedException if interrupted * @throws InterruptedException if interrupted
*/ */
private void testCallbackSynchronizationStandby(AdminService as, private void testCallbackSynchronizationStandby(AdminService as,
EmbeddedElectorService ees) throws IOException, InterruptedException { ActiveStandbyElectorBasedElectorService ees)
throws IOException, InterruptedException {
ees.becomeStandby(); ees.becomeStandby();
Thread.sleep(100); Thread.sleep(100);
@ -202,7 +205,8 @@ private void testCallbackSynchronizationStandby(AdminService as,
* @throws InterruptedException if interrupted * @throws InterruptedException if interrupted
*/ */
private void testCallbackSynchronizationNeutral(AdminService as, private void testCallbackSynchronizationNeutral(AdminService as,
EmbeddedElectorService ees) throws IOException, InterruptedException { ActiveStandbyElectorBasedElectorService ees)
throws IOException, InterruptedException {
ees.enterNeutralMode(); ees.enterNeutralMode();
Thread.sleep(100); Thread.sleep(100);
@ -221,7 +225,8 @@ private void testCallbackSynchronizationNeutral(AdminService as,
* @throws InterruptedException if interrupted * @throws InterruptedException if interrupted
*/ */
private void testCallbackSynchronizationTimingActive(AdminService as, private void testCallbackSynchronizationTimingActive(AdminService as,
EmbeddedElectorService ees) throws IOException, InterruptedException { ActiveStandbyElectorBasedElectorService ees)
throws IOException, InterruptedException {
synchronized (ees.zkDisconnectLock) { synchronized (ees.zkDisconnectLock) {
// Sleep while holding the lock so that the timer thread can't do // Sleep while holding the lock so that the timer thread can't do
// anything when it runs. Sleep until we're pretty sure the timer thread // anything when it runs. Sleep until we're pretty sure the timer thread
@ -251,7 +256,8 @@ private void testCallbackSynchronizationTimingActive(AdminService as,
* @throws InterruptedException if interrupted * @throws InterruptedException if interrupted
*/ */
private void testCallbackSynchronizationTimingStandby(AdminService as, private void testCallbackSynchronizationTimingStandby(AdminService as,
EmbeddedElectorService ees) throws IOException, InterruptedException { ActiveStandbyElectorBasedElectorService ees)
throws IOException, InterruptedException {
synchronized (ees.zkDisconnectLock) { synchronized (ees.zkDisconnectLock) {
// Sleep while holding the lock so that the timer thread can't do // Sleep while holding the lock so that the timer thread can't do
// anything when it runs. Sleep until we're pretty sure the timer thread // anything when it runs. Sleep until we're pretty sure the timer thread
@ -284,11 +290,8 @@ private class MockRMWithElector extends MockRM {
} }
@Override @Override
protected AdminService createAdminService() { protected EmbeddedElector createEmbeddedElector() {
return new AdminService(MockRMWithElector.this, getRMContext()) { return new ActiveStandbyElectorBasedElectorService(getRMContext()) {
@Override
protected EmbeddedElectorService createEmbeddedElectorService() {
return new EmbeddedElectorService(getRMContext()) {
@Override @Override
public void becomeActive() throws public void becomeActive() throws
ServiceFailedException { ServiceFailedException {
@ -304,7 +307,5 @@ public void becomeActive() throws
} }
}; };
} }
};
}
} }
} }

View File

@ -182,7 +182,7 @@ private void checkActiveRMWebServices() throws JSONException {
* 6. Stop the RM: All services should stop and RM should not be ready to * 6. Stop the RM: All services should stop and RM should not be ready to
* become Active * become Active
*/ */
@Test (timeout = 30000) @Test(timeout = 30000)
public void testFailoverAndTransitions() throws Exception { public void testFailoverAndTransitions() throws Exception {
configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false); configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
Configuration conf = new YarnConfiguration(configuration); Configuration conf = new YarnConfiguration(configuration);
@ -384,7 +384,8 @@ public void testHAIDLookup() {
assertEquals(conf.get(YarnConfiguration.RM_HA_ID), RM1_NODE_ID); assertEquals(conf.get(YarnConfiguration.RM_HA_ID), RM1_NODE_ID);
//test if RM_HA_ID can not be found //test if RM_HA_ID can not be found
configuration.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID+ "," + RM3_NODE_ID); configuration
.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM3_NODE_ID);
configuration.unset(YarnConfiguration.RM_HA_ID); configuration.unset(YarnConfiguration.RM_HA_ID);
conf = new YarnConfiguration(configuration); conf = new YarnConfiguration(configuration);
try { try {
@ -456,7 +457,7 @@ public synchronized void startInternal() throws Exception {
checkActiveRMFunctionality(); checkActiveRMFunctionality();
} }
@Test(timeout = 90000) @Test
public void testTransitionedToStandbyShouldNotHang() throws Exception { public void testTransitionedToStandbyShouldNotHang() throws Exception {
configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false); configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
Configuration conf = new YarnConfiguration(configuration); Configuration conf = new YarnConfiguration(configuration);