YARN-5709. Cleanup leader election configs and pluggability. Contributed by Karthik Kambatla
This commit is contained in:
parent
1e886e7520
commit
d7b80f1b02
|
@ -29,6 +29,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public;
|
|||
import org.apache.hadoop.classification.InterfaceStability.Evolving;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.ha.ActiveStandbyElector;
|
||||
import org.apache.hadoop.http.HttpConfig;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
|
@ -550,9 +551,20 @@ public class YarnConfiguration extends Configuration {
|
|||
public static final String RM_HA_FC_ELECTOR_ZK_RETRIES_KEY = RM_HA_PREFIX
|
||||
+ "failover-controller.active-standby-elector.zk.retries";
|
||||
|
||||
@Private
|
||||
|
||||
/**
|
||||
* Whether to use curator-based elector for leader election.
|
||||
*
|
||||
* Deprecated Eventually, we want to default to the curator-based
|
||||
* implementation and remove the {@link ActiveStandbyElector} based
|
||||
* implementation. We should remove this config then.
|
||||
*/
|
||||
@Unstable
|
||||
@Deprecated
|
||||
public static final String CURATOR_LEADER_ELECTOR =
|
||||
RM_HA_PREFIX + "curator-leader-elector.enabled";
|
||||
@Private
|
||||
@Unstable
|
||||
public static final boolean DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED = false;
|
||||
|
||||
////////////////////////////////
|
||||
|
|
|
@ -43,12 +43,16 @@ import java.util.List;
|
|||
import java.util.Timer;
|
||||
import java.util.TimerTask;
|
||||
|
||||
/**
|
||||
* Leader election implementation that uses {@link ActiveStandbyElector}.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Unstable
|
||||
public class EmbeddedElectorService extends AbstractService
|
||||
implements ActiveStandbyElector.ActiveStandbyElectorCallback {
|
||||
private static final Log LOG =
|
||||
LogFactory.getLog(EmbeddedElectorService.class.getName());
|
||||
public class ActiveStandbyElectorBasedElectorService extends AbstractService
|
||||
implements EmbeddedElector,
|
||||
ActiveStandbyElector.ActiveStandbyElectorCallback {
|
||||
private static final Log LOG = LogFactory.getLog(
|
||||
ActiveStandbyElectorBasedElectorService.class.getName());
|
||||
private static final HAServiceProtocol.StateChangeRequestInfo req =
|
||||
new HAServiceProtocol.StateChangeRequestInfo(
|
||||
HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC);
|
||||
|
@ -62,19 +66,21 @@ public class EmbeddedElectorService extends AbstractService
|
|||
@VisibleForTesting
|
||||
final Object zkDisconnectLock = new Object();
|
||||
|
||||
EmbeddedElectorService(RMContext rmContext) {
|
||||
super(EmbeddedElectorService.class.getName());
|
||||
ActiveStandbyElectorBasedElectorService(RMContext rmContext) {
|
||||
super(ActiveStandbyElectorBasedElectorService.class.getName());
|
||||
this.rmContext = rmContext;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceInit(Configuration conf)
|
||||
throws Exception {
|
||||
conf = conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf);
|
||||
conf = conf instanceof YarnConfiguration
|
||||
? conf
|
||||
: new YarnConfiguration(conf);
|
||||
|
||||
String zkQuorum = conf.get(YarnConfiguration.RM_ZK_ADDRESS);
|
||||
if (zkQuorum == null) {
|
||||
throw new YarnRuntimeException("Embedded automatic failover " +
|
||||
throw new YarnRuntimeException("Embedded automatic failover " +
|
||||
"is enabled, but " + YarnConfiguration.RM_ZK_ADDRESS +
|
||||
" is not set");
|
||||
}
|
||||
|
@ -117,7 +123,7 @@ public class EmbeddedElectorService extends AbstractService
|
|||
|
||||
@Override
|
||||
protected void serviceStop() throws Exception {
|
||||
/**
|
||||
/*
|
||||
* When error occurs in serviceInit(), serviceStop() can be called.
|
||||
* We need null check for the case.
|
||||
*/
|
||||
|
@ -199,7 +205,8 @@ public class EmbeddedElectorService extends AbstractService
|
|||
@Override
|
||||
public void notifyFatalError(String errorMessage) {
|
||||
rmContext.getDispatcher().getEventHandler().handle(
|
||||
new RMFatalEvent(RMFatalEventType.EMBEDDED_ELECTOR_FAILED, errorMessage));
|
||||
new RMFatalEvent(RMFatalEventType.EMBEDDED_ELECTOR_FAILED,
|
||||
errorMessage));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -249,12 +256,16 @@ public class EmbeddedElectorService extends AbstractService
|
|||
return true;
|
||||
}
|
||||
|
||||
public void resetLeaderElection() {
|
||||
// EmbeddedElector methods
|
||||
|
||||
@Override
|
||||
public void rejoinElection() {
|
||||
elector.quitElection(false);
|
||||
elector.joinElection(localActiveNodeInfo);
|
||||
}
|
||||
|
||||
public String getHAZookeeperConnectionState() {
|
||||
@Override
|
||||
public String getZookeeperConnectionState() {
|
||||
return elector.getHAZookeeperConnectionState();
|
||||
}
|
||||
}
|
|
@ -29,7 +29,6 @@ import java.util.Set;
|
|||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.ha.HAServiceProtocol;
|
||||
|
@ -109,8 +108,6 @@ public class AdminService extends CompositeService implements
|
|||
private String rmId;
|
||||
|
||||
private boolean autoFailoverEnabled;
|
||||
private boolean curatorEnabled;
|
||||
private EmbeddedElectorService embeddedElector;
|
||||
|
||||
private Server server;
|
||||
|
||||
|
@ -135,18 +132,8 @@ public class AdminService extends CompositeService implements
|
|||
|
||||
@Override
|
||||
public void serviceInit(Configuration conf) throws Exception {
|
||||
if (rmContext.isHAEnabled()) {
|
||||
curatorEnabled = conf.getBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR,
|
||||
YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED);
|
||||
autoFailoverEnabled = HAUtil.isAutomaticFailoverEnabled(conf);
|
||||
if (autoFailoverEnabled && !curatorEnabled) {
|
||||
if (HAUtil.isAutomaticFailoverEmbedded(conf)) {
|
||||
embeddedElector = createEmbeddedElectorService();
|
||||
addIfService(embeddedElector);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
autoFailoverEnabled =
|
||||
rmContext.isHAEnabled() && HAUtil.isAutomaticFailoverEnabled(conf);
|
||||
|
||||
masterServiceBindAddress = conf.getSocketAddr(
|
||||
YarnConfiguration.RM_BIND_HOST,
|
||||
|
@ -230,17 +217,6 @@ public class AdminService extends CompositeService implements
|
|||
}
|
||||
}
|
||||
|
||||
protected EmbeddedElectorService createEmbeddedElectorService() {
|
||||
return new EmbeddedElectorService(rmContext);
|
||||
}
|
||||
|
||||
@InterfaceAudience.Private
|
||||
void resetLeaderElection() {
|
||||
if (embeddedElector != null) {
|
||||
embeddedElector.resetLeaderElection();
|
||||
}
|
||||
}
|
||||
|
||||
private UserGroupInformation checkAccess(String method) throws IOException {
|
||||
return RMServerUtils.verifyAdminAccess(authorizer, method, LOG);
|
||||
}
|
||||
|
@ -373,30 +349,24 @@ public class AdminService extends CompositeService implements
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the HA status of this RM. This includes the current state and
|
||||
* whether the RM is ready to become active.
|
||||
*
|
||||
* @return {@link HAServiceStatus} of the current RM
|
||||
* @throws IOException if the caller does not have permissions
|
||||
*/
|
||||
@Override
|
||||
public synchronized HAServiceStatus getServiceStatus() throws IOException {
|
||||
checkAccess("getServiceState");
|
||||
if (curatorEnabled) {
|
||||
HAServiceStatus state;
|
||||
if (rmContext.getLeaderElectorService().hasLeaderShip()) {
|
||||
state = new HAServiceStatus(HAServiceState.ACTIVE);
|
||||
} else {
|
||||
state = new HAServiceStatus(HAServiceState.STANDBY);
|
||||
}
|
||||
// set empty string to avoid NPE at
|
||||
// HAServiceProtocolServerSideTranslatorPB#getServiceStatus
|
||||
state.setNotReadyToBecomeActive("");
|
||||
return state;
|
||||
HAServiceState haState = rmContext.getHAServiceState();
|
||||
HAServiceStatus ret = new HAServiceStatus(haState);
|
||||
if (isRMActive() || haState == HAServiceProtocol.HAServiceState.STANDBY) {
|
||||
ret.setReadyToBecomeActive();
|
||||
} else {
|
||||
HAServiceState haState = rmContext.getHAServiceState();
|
||||
HAServiceStatus ret = new HAServiceStatus(haState);
|
||||
if (isRMActive() || haState == HAServiceProtocol.HAServiceState.STANDBY) {
|
||||
ret.setReadyToBecomeActive();
|
||||
} else {
|
||||
ret.setNotReadyToBecomeActive("State is " + haState);
|
||||
}
|
||||
return ret;
|
||||
ret.setNotReadyToBecomeActive("State is " + haState);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -895,19 +865,4 @@ public class AdminService extends CompositeService implements
|
|||
throw logAndWrapException(e, user.getShortUserName(), operation, msg);
|
||||
}
|
||||
}
|
||||
|
||||
public String getHAZookeeperConnectionState() {
|
||||
if (!rmContext.isHAEnabled()) {
|
||||
return "ResourceManager HA is not enabled.";
|
||||
} else if (!autoFailoverEnabled) {
|
||||
return "Auto Failover is not enabled.";
|
||||
}
|
||||
if (curatorEnabled) {
|
||||
return "Connected to zookeeper : " + rmContext
|
||||
.getLeaderElectorService().getCuratorClient().getZookeeperClient()
|
||||
.isConnected();
|
||||
} else {
|
||||
return this.embeddedElector.getHAZookeeperConnectionState();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,6 +24,8 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.curator.framework.CuratorFramework;
|
||||
import org.apache.curator.framework.recipes.leader.LeaderLatch;
|
||||
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.ha.HAServiceProtocol;
|
||||
import org.apache.hadoop.service.AbstractService;
|
||||
|
@ -32,10 +34,15 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|||
|
||||
import java.io.IOException;
|
||||
|
||||
|
||||
public class LeaderElectorService extends AbstractService implements
|
||||
LeaderLatchListener {
|
||||
public static final Log LOG = LogFactory.getLog(LeaderElectorService.class);
|
||||
/**
|
||||
* Leader election implementation that uses Curator.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Unstable
|
||||
public class CuratorBasedElectorService extends AbstractService
|
||||
implements EmbeddedElector, LeaderLatchListener {
|
||||
public static final Log LOG =
|
||||
LogFactory.getLog(CuratorBasedElectorService.class);
|
||||
private LeaderLatch leaderLatch;
|
||||
private CuratorFramework curator;
|
||||
private RMContext rmContext;
|
||||
|
@ -43,8 +50,8 @@ public class LeaderElectorService extends AbstractService implements
|
|||
private String rmId;
|
||||
private ResourceManager rm;
|
||||
|
||||
public LeaderElectorService(RMContext rmContext, ResourceManager rm) {
|
||||
super(LeaderElectorService.class.getName());
|
||||
public CuratorBasedElectorService(RMContext rmContext, ResourceManager rm) {
|
||||
super(CuratorBasedElectorService.class.getName());
|
||||
this.rmContext = rmContext;
|
||||
this.rm = rm;
|
||||
}
|
||||
|
@ -74,10 +81,22 @@ public class LeaderElectorService extends AbstractService implements
|
|||
super.serviceStop();
|
||||
}
|
||||
|
||||
public boolean hasLeaderShip() {
|
||||
return leaderLatch.hasLeadership();
|
||||
@Override
|
||||
public void rejoinElection() {
|
||||
try {
|
||||
closeLeaderLatch();
|
||||
Thread.sleep(1000);
|
||||
initAndStartLeaderLatch();
|
||||
} catch (Exception e) {
|
||||
LOG.info("Fail to re-join election.", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getZookeeperConnectionState() {
|
||||
return "Connected to zookeeper : " +
|
||||
curator.getZookeeperClient().isConnected();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void isLeader() {
|
||||
|
@ -90,17 +109,7 @@ public class LeaderElectorService extends AbstractService implements
|
|||
LOG.info(rmId + " failed to transition to active, giving up leadership",
|
||||
e);
|
||||
notLeader();
|
||||
reJoinElection();
|
||||
}
|
||||
}
|
||||
|
||||
public void reJoinElection() {
|
||||
try {
|
||||
closeLeaderLatch();
|
||||
Thread.sleep(1000);
|
||||
initAndStartLeaderLatch();
|
||||
} catch (Exception e) {
|
||||
LOG.info("Fail to re-join election.", e);
|
||||
rejoinElection();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -109,6 +118,7 @@ public class LeaderElectorService extends AbstractService implements
|
|||
leaderLatch.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void notLeader() {
|
||||
LOG.info(rmId + " relinquish leadership");
|
|
@ -0,0 +1,41 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.service.Service;
|
||||
|
||||
/**
|
||||
* Interface that all embedded leader electors must implement.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Unstable
|
||||
public interface EmbeddedElector extends Service{
|
||||
/**
|
||||
* Leave and rejoin leader election.
|
||||
*/
|
||||
void rejoinElection();
|
||||
|
||||
/**
|
||||
* Get information about the elector's connection to Zookeeper.
|
||||
*
|
||||
* @return zookeeper connection state
|
||||
*/
|
||||
String getZookeeperConnectionState();
|
||||
}
|
|
@ -136,7 +136,9 @@ public interface RMContext {
|
|||
|
||||
void setQueuePlacementManager(PlacementManager placementMgr);
|
||||
|
||||
void setLeaderElectorService(LeaderElectorService elector);
|
||||
void setLeaderElectorService(EmbeddedElector elector);
|
||||
|
||||
LeaderElectorService getLeaderElectorService();
|
||||
EmbeddedElector getLeaderElectorService();
|
||||
|
||||
String getHAZookeeperConnectionState();
|
||||
}
|
||||
|
|
|
@ -72,7 +72,7 @@ public class RMContextImpl implements RMContext {
|
|||
|
||||
private RMApplicationHistoryWriter rmApplicationHistoryWriter;
|
||||
private SystemMetricsPublisher systemMetricsPublisher;
|
||||
private LeaderElectorService elector;
|
||||
private EmbeddedElector elector;
|
||||
|
||||
private final Object haServiceStateLock = new Object();
|
||||
|
||||
|
@ -137,12 +137,12 @@ public class RMContextImpl implements RMContext {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void setLeaderElectorService(LeaderElectorService elector) {
|
||||
public void setLeaderElectorService(EmbeddedElector elector) {
|
||||
this.elector = elector;
|
||||
}
|
||||
|
||||
@Override
|
||||
public LeaderElectorService getLeaderElectorService() {
|
||||
public EmbeddedElector getLeaderElectorService() {
|
||||
return this.elector;
|
||||
}
|
||||
|
||||
|
@ -474,4 +474,13 @@ public class RMContextImpl implements RMContext {
|
|||
public void setQueuePlacementManager(PlacementManager placementMgr) {
|
||||
this.activeServiceContext.setQueuePlacementManager(placementMgr);
|
||||
}
|
||||
|
||||
public String getHAZookeeperConnectionState() {
|
||||
if (elector == null) {
|
||||
return "Could not find leader elector. Verify both HA and automatic " +
|
||||
"failover are enabled.";
|
||||
} else {
|
||||
return elector.getZookeeperConnectionState();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -252,16 +252,17 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|||
this.rmContext.setHAEnabled(HAUtil.isHAEnabled(this.conf));
|
||||
if (this.rmContext.isHAEnabled()) {
|
||||
HAUtil.verifyAndSetConfiguration(this.conf);
|
||||
curatorEnabled = conf.getBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR,
|
||||
YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED);
|
||||
if (curatorEnabled) {
|
||||
this.curator = createAndStartCurator(conf);
|
||||
LeaderElectorService elector = new LeaderElectorService(rmContext, this);
|
||||
addService(elector);
|
||||
|
||||
// If the RM is configured to use an embedded leader elector,
|
||||
// initialize the leader elector.
|
||||
if (HAUtil.isAutomaticFailoverEnabled(conf) &&
|
||||
HAUtil.isAutomaticFailoverEmbedded(conf)) {
|
||||
EmbeddedElector elector = createEmbeddedElector();
|
||||
addIfService(elector);
|
||||
rmContext.setLeaderElectorService(elector);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// Set UGI and do login
|
||||
// If security is enabled, use login user
|
||||
// If security is not enabled, use current user
|
||||
|
@ -301,6 +302,22 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|||
super.serviceInit(this.conf);
|
||||
}
|
||||
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
protected EmbeddedElector createEmbeddedElector() throws Exception {
|
||||
EmbeddedElector elector;
|
||||
curatorEnabled =
|
||||
conf.getBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR,
|
||||
YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED);
|
||||
if (curatorEnabled) {
|
||||
this.curator = createAndStartCurator(conf);
|
||||
elector = new CuratorBasedElectorService(rmContext, this);
|
||||
} else {
|
||||
elector = new ActiveStandbyElectorBasedElectorService(rmContext);
|
||||
}
|
||||
return elector;
|
||||
}
|
||||
|
||||
public CuratorFramework createAndStartCurator(Configuration conf)
|
||||
throws Exception {
|
||||
String zkHostPort = conf.get(YarnConfiguration.RM_ZK_ADDRESS);
|
||||
|
@ -846,14 +863,12 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|||
// Transition to standby and reinit active services
|
||||
LOG.info("Transitioning RM to Standby mode");
|
||||
transitionToStandby(true);
|
||||
if (curatorEnabled) {
|
||||
rmContext.getLeaderElectorService().reJoinElection();
|
||||
} else {
|
||||
adminService.resetLeaderElection();
|
||||
EmbeddedElector elector = rmContext.getLeaderElectorService();
|
||||
if (elector != null) {
|
||||
elector.rejoinElection();
|
||||
}
|
||||
return;
|
||||
} catch (Exception e) {
|
||||
LOG.fatal("Failed to transition RM to Standby mode.");
|
||||
LOG.fatal("Failed to transition RM to Standby mode.", e);
|
||||
ExitUtil.terminate(1, e);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -120,8 +120,7 @@ public class RMWebApp extends WebApp implements YarnWebParams {
|
|||
}
|
||||
|
||||
public String getHAZookeeperConnectionState() {
|
||||
return rm.getRMContext().getRMAdminService()
|
||||
.getHAZookeeperConnectionState();
|
||||
return getRMContext().getHAZookeeperConnectionState();
|
||||
}
|
||||
|
||||
public RMContext getRMContext() {
|
||||
|
|
|
@ -64,7 +64,7 @@ public class ClusterInfo {
|
|||
this.hadoopBuildVersion = VersionInfo.getBuildVersion();
|
||||
this.hadoopVersionBuiltOn = VersionInfo.getDate();
|
||||
this.haZooKeeperConnectionState =
|
||||
rm.getRMContext().getRMAdminService().getHAZookeeperConnectionState();
|
||||
rm.getRMContext().getHAZookeeperConnectionState();
|
||||
}
|
||||
|
||||
public String getState() {
|
||||
|
|
|
@ -101,6 +101,7 @@ import org.apache.log4j.LogManager;
|
|||
import org.apache.log4j.Logger;
|
||||
import org.junit.Assert;
|
||||
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public class MockRM extends ResourceManager {
|
||||
|
||||
|
@ -113,6 +114,8 @@ public class MockRM extends ResourceManager {
|
|||
|
||||
final private boolean useNullRMNodeLabelsManager;
|
||||
|
||||
private boolean useRealElector = false;
|
||||
|
||||
public MockRM() {
|
||||
this(new YarnConfiguration());
|
||||
}
|
||||
|
@ -122,13 +125,23 @@ public class MockRM extends ResourceManager {
|
|||
}
|
||||
|
||||
public MockRM(Configuration conf, RMStateStore store) {
|
||||
this(conf, store, true);
|
||||
this(conf, store, true, false);
|
||||
}
|
||||
|
||||
|
||||
public MockRM(Configuration conf, boolean useRealElector) {
|
||||
this(conf, null, true, useRealElector);
|
||||
}
|
||||
|
||||
public MockRM(Configuration conf, RMStateStore store,
|
||||
boolean useNullRMNodeLabelsManager) {
|
||||
boolean useRealElector) {
|
||||
this(conf, store, true, useRealElector);
|
||||
}
|
||||
|
||||
public MockRM(Configuration conf, RMStateStore store,
|
||||
boolean useNullRMNodeLabelsManager, boolean useRealElector) {
|
||||
super();
|
||||
this.useNullRMNodeLabelsManager = useNullRMNodeLabelsManager;
|
||||
this.useRealElector = useRealElector;
|
||||
init(conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf));
|
||||
if(store != null) {
|
||||
setRMStateStore(store);
|
||||
|
@ -154,6 +167,15 @@ public class MockRM extends ResourceManager {
|
|||
return new DrainDispatcher();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected EmbeddedElector createEmbeddedElector() throws Exception {
|
||||
if (useRealElector) {
|
||||
return super.createEmbeddedElector();
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
public void drainEvents() {
|
||||
Dispatcher rmDispatcher = getRmDispatcher();
|
||||
if (rmDispatcher instanceof DrainDispatcher) {
|
||||
|
@ -809,11 +831,6 @@ public class MockRM extends ResourceManager {
|
|||
protected void stopServer() {
|
||||
// don't do anything
|
||||
}
|
||||
|
||||
@Override
|
||||
protected EmbeddedElectorService createEmbeddedElectorService() {
|
||||
return null;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
|
|
|
@ -108,13 +108,13 @@ public abstract class RMHATestBase extends ClientBaseWithFixes{
|
|||
}
|
||||
|
||||
protected void startRMs() throws IOException {
|
||||
rm1 = new MockRM(confForRM1, null, false){
|
||||
rm1 = new MockRM(confForRM1, null, false, false){
|
||||
@Override
|
||||
protected Dispatcher createDispatcher() {
|
||||
return new DrainDispatcher();
|
||||
}
|
||||
};
|
||||
rm2 = new MockRM(confForRM2, null, false){
|
||||
rm2 = new MockRM(confForRM2, null, false, false){
|
||||
@Override
|
||||
protected Dispatcher createDispatcher() {
|
||||
return new DrainDispatcher();
|
||||
|
|
|
@ -56,6 +56,7 @@ public class TestLeaderElectorService {
|
|||
MockRM rm1;
|
||||
MockRM rm2;
|
||||
TestingCluster zkCluster;
|
||||
@SuppressWarnings("deprecation")
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
Logger rootLogger = LogManager.getRootLogger();
|
||||
|
@ -63,7 +64,6 @@ public class TestLeaderElectorService {
|
|||
conf = new Configuration();
|
||||
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
|
||||
conf.setBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR, true);
|
||||
conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, true);
|
||||
|
||||
conf.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1");
|
||||
conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID);
|
||||
|
@ -121,7 +121,7 @@ public class TestLeaderElectorService {
|
|||
}
|
||||
};
|
||||
memStore.init(conf);
|
||||
rm1 = new MockRM(conf, memStore);
|
||||
rm1 = new MockRM(conf, memStore, true);
|
||||
rm1.init(conf);
|
||||
rm1.start();
|
||||
|
||||
|
@ -167,7 +167,8 @@ public class TestLeaderElectorService {
|
|||
|
||||
rm1 = startRM("rm1", HAServiceState.ACTIVE);
|
||||
|
||||
LeaderElectorService service = rm1.getRMContext().getLeaderElectorService();
|
||||
CuratorBasedElectorService service = (CuratorBasedElectorService)
|
||||
rm1.getRMContext().getLeaderElectorService();
|
||||
CuratorZookeeperClient client =
|
||||
service.getCuratorClient().getZookeeperClient();
|
||||
// this will expire current curator client session. curator will re-establish
|
||||
|
@ -187,7 +188,7 @@ public class TestLeaderElectorService {
|
|||
Thread launchRM = new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
rm1 = new MockRM(conf) {
|
||||
rm1 = new MockRM(conf, true) {
|
||||
@Override
|
||||
synchronized void transitionToActive() throws Exception {
|
||||
if (throwException.get()) {
|
||||
|
@ -217,9 +218,12 @@ public class TestLeaderElectorService {
|
|||
rm1 = startRM("rm1", HAServiceState.ACTIVE);
|
||||
rm2 = startRM("rm2", HAServiceState.STANDBY);
|
||||
|
||||
CuratorBasedElectorService service = (CuratorBasedElectorService)
|
||||
rm1.getRMContext().getLeaderElectorService();
|
||||
|
||||
ZooKeeper zkClient =
|
||||
rm1.getRMContext().getLeaderElectorService().getCuratorClient()
|
||||
.getZookeeperClient().getZooKeeper();
|
||||
service.getCuratorClient().getZookeeperClient().getZooKeeper();
|
||||
|
||||
InstanceSpec connectionInstance = zkCluster.findConnectionInstance(zkClient);
|
||||
zkCluster.killServer(connectionInstance);
|
||||
|
||||
|
@ -245,7 +249,7 @@ public class TestLeaderElectorService {
|
|||
private MockRM startRM(String rmId, HAServiceState state) throws Exception{
|
||||
YarnConfiguration yarnConf = new YarnConfiguration(conf);
|
||||
yarnConf.set(YarnConfiguration.RM_HA_ID, rmId);
|
||||
MockRM rm = new MockRM(yarnConf);
|
||||
MockRM rm = new MockRM(yarnConf, true);
|
||||
rm.init(yarnConf);
|
||||
rm.start();
|
||||
waitFor(rm, state);
|
||||
|
|
|
@ -128,7 +128,8 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
|
|||
myConf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, 50);
|
||||
when(rc.getRMAdminService()).thenReturn(as);
|
||||
|
||||
EmbeddedElectorService ees = new EmbeddedElectorService(rc);
|
||||
ActiveStandbyElectorBasedElectorService
|
||||
ees = new ActiveStandbyElectorBasedElectorService(rc);
|
||||
ees.init(myConf);
|
||||
|
||||
ees.enterNeutralMode();
|
||||
|
@ -165,7 +166,8 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
|
|||
* @throws InterruptedException if interrupted
|
||||
*/
|
||||
private void testCallbackSynchronizationActive(AdminService as,
|
||||
EmbeddedElectorService ees) throws IOException, InterruptedException {
|
||||
ActiveStandbyElectorBasedElectorService ees)
|
||||
throws IOException, InterruptedException {
|
||||
ees.becomeActive();
|
||||
|
||||
Thread.sleep(100);
|
||||
|
@ -184,7 +186,8 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
|
|||
* @throws InterruptedException if interrupted
|
||||
*/
|
||||
private void testCallbackSynchronizationStandby(AdminService as,
|
||||
EmbeddedElectorService ees) throws IOException, InterruptedException {
|
||||
ActiveStandbyElectorBasedElectorService ees)
|
||||
throws IOException, InterruptedException {
|
||||
ees.becomeStandby();
|
||||
|
||||
Thread.sleep(100);
|
||||
|
@ -202,7 +205,8 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
|
|||
* @throws InterruptedException if interrupted
|
||||
*/
|
||||
private void testCallbackSynchronizationNeutral(AdminService as,
|
||||
EmbeddedElectorService ees) throws IOException, InterruptedException {
|
||||
ActiveStandbyElectorBasedElectorService ees)
|
||||
throws IOException, InterruptedException {
|
||||
ees.enterNeutralMode();
|
||||
|
||||
Thread.sleep(100);
|
||||
|
@ -221,7 +225,8 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
|
|||
* @throws InterruptedException if interrupted
|
||||
*/
|
||||
private void testCallbackSynchronizationTimingActive(AdminService as,
|
||||
EmbeddedElectorService ees) throws IOException, InterruptedException {
|
||||
ActiveStandbyElectorBasedElectorService ees)
|
||||
throws IOException, InterruptedException {
|
||||
synchronized (ees.zkDisconnectLock) {
|
||||
// Sleep while holding the lock so that the timer thread can't do
|
||||
// anything when it runs. Sleep until we're pretty sure the timer thread
|
||||
|
@ -251,7 +256,8 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
|
|||
* @throws InterruptedException if interrupted
|
||||
*/
|
||||
private void testCallbackSynchronizationTimingStandby(AdminService as,
|
||||
EmbeddedElectorService ees) throws IOException, InterruptedException {
|
||||
ActiveStandbyElectorBasedElectorService ees)
|
||||
throws IOException, InterruptedException {
|
||||
synchronized (ees.zkDisconnectLock) {
|
||||
// Sleep while holding the lock so that the timer thread can't do
|
||||
// anything when it runs. Sleep until we're pretty sure the timer thread
|
||||
|
@ -284,25 +290,20 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected AdminService createAdminService() {
|
||||
return new AdminService(MockRMWithElector.this, getRMContext()) {
|
||||
protected EmbeddedElector createEmbeddedElector() {
|
||||
return new ActiveStandbyElectorBasedElectorService(getRMContext()) {
|
||||
@Override
|
||||
protected EmbeddedElectorService createEmbeddedElectorService() {
|
||||
return new EmbeddedElectorService(getRMContext()) {
|
||||
@Override
|
||||
public void becomeActive() throws
|
||||
ServiceFailedException {
|
||||
try {
|
||||
callbackCalled.set(true);
|
||||
TestRMEmbeddedElector.LOG.info("Callback called. Sleeping now");
|
||||
Thread.sleep(delayMs);
|
||||
TestRMEmbeddedElector.LOG.info("Sleep done");
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
super.becomeActive();
|
||||
}
|
||||
};
|
||||
public void becomeActive() throws
|
||||
ServiceFailedException {
|
||||
try {
|
||||
callbackCalled.set(true);
|
||||
TestRMEmbeddedElector.LOG.info("Callback called. Sleeping now");
|
||||
Thread.sleep(delayMs);
|
||||
TestRMEmbeddedElector.LOG.info("Sleep done");
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
super.becomeActive();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
|
|
@ -176,13 +176,13 @@ public class TestRMHA {
|
|||
* 1. Standby: Should be a no-op
|
||||
* 2. Active: Active services should start
|
||||
* 3. Active: Should be a no-op.
|
||||
* While active, submit a couple of jobs
|
||||
* While active, submit a couple of jobs
|
||||
* 4. Standby: Active services should stop
|
||||
* 5. Active: Active services should start
|
||||
* 6. Stop the RM: All services should stop and RM should not be ready to
|
||||
* become Active
|
||||
*/
|
||||
@Test (timeout = 30000)
|
||||
@Test(timeout = 30000)
|
||||
public void testFailoverAndTransitions() throws Exception {
|
||||
configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
|
||||
Configuration conf = new YarnConfiguration(configuration);
|
||||
|
@ -202,37 +202,37 @@ public class TestRMHA {
|
|||
checkMonitorHealth();
|
||||
checkStandbyRMFunctionality();
|
||||
verifyClusterMetrics(0, 0, 0, 0, 0, 0);
|
||||
|
||||
|
||||
// 1. Transition to Standby - must be a no-op
|
||||
rm.adminService.transitionToStandby(requestInfo);
|
||||
checkMonitorHealth();
|
||||
checkStandbyRMFunctionality();
|
||||
verifyClusterMetrics(0, 0, 0, 0, 0, 0);
|
||||
|
||||
|
||||
// 2. Transition to active
|
||||
rm.adminService.transitionToActive(requestInfo);
|
||||
checkMonitorHealth();
|
||||
checkActiveRMFunctionality();
|
||||
verifyClusterMetrics(1, 1, 1, 1, 2048, 1);
|
||||
|
||||
|
||||
// 3. Transition to active - no-op
|
||||
rm.adminService.transitionToActive(requestInfo);
|
||||
checkMonitorHealth();
|
||||
checkActiveRMFunctionality();
|
||||
verifyClusterMetrics(1, 2, 2, 2, 2048, 2);
|
||||
|
||||
|
||||
// 4. Transition to standby
|
||||
rm.adminService.transitionToStandby(requestInfo);
|
||||
checkMonitorHealth();
|
||||
checkStandbyRMFunctionality();
|
||||
verifyClusterMetrics(0, 0, 0, 0, 0, 0);
|
||||
|
||||
|
||||
// 5. Transition to active to check Active->Standby->Active works
|
||||
rm.adminService.transitionToActive(requestInfo);
|
||||
checkMonitorHealth();
|
||||
checkActiveRMFunctionality();
|
||||
verifyClusterMetrics(1, 1, 1, 1, 2048, 1);
|
||||
|
||||
|
||||
// 6. Stop the RM. All services should stop and RM should not be ready to
|
||||
// become active
|
||||
rm.stop();
|
||||
|
@ -338,7 +338,7 @@ public class TestRMHA {
|
|||
rm.adminService.transitionToStandby(requestInfo);
|
||||
rm.adminService.transitionToActive(requestInfo);
|
||||
rm.adminService.transitionToStandby(requestInfo);
|
||||
|
||||
|
||||
MyCountingDispatcher dispatcher =
|
||||
(MyCountingDispatcher) rm.getRMContext().getDispatcher();
|
||||
assertTrue(!dispatcher.isStopped());
|
||||
|
@ -346,24 +346,24 @@ public class TestRMHA {
|
|||
rm.adminService.transitionToActive(requestInfo);
|
||||
assertEquals(errorMessageForEventHandler, expectedEventHandlerCount,
|
||||
((MyCountingDispatcher) rm.getRMContext().getDispatcher())
|
||||
.getEventHandlerCount());
|
||||
.getEventHandlerCount());
|
||||
assertEquals(errorMessageForService, expectedServiceCount,
|
||||
rm.getServices().size());
|
||||
|
||||
|
||||
|
||||
// Keep the dispatcher reference before transitioning to standby
|
||||
dispatcher = (MyCountingDispatcher) rm.getRMContext().getDispatcher();
|
||||
|
||||
|
||||
|
||||
|
||||
rm.adminService.transitionToStandby(requestInfo);
|
||||
assertEquals(errorMessageForEventHandler, expectedEventHandlerCount,
|
||||
((MyCountingDispatcher) rm.getRMContext().getDispatcher())
|
||||
.getEventHandlerCount());
|
||||
.getEventHandlerCount());
|
||||
assertEquals(errorMessageForService, expectedServiceCount,
|
||||
rm.getServices().size());
|
||||
|
||||
assertTrue(dispatcher.isStopped());
|
||||
|
||||
|
||||
rm.stop();
|
||||
}
|
||||
|
||||
|
@ -384,7 +384,8 @@ public class TestRMHA {
|
|||
assertEquals(conf.get(YarnConfiguration.RM_HA_ID), RM1_NODE_ID);
|
||||
|
||||
//test if RM_HA_ID can not be found
|
||||
configuration.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID+ "," + RM3_NODE_ID);
|
||||
configuration
|
||||
.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM3_NODE_ID);
|
||||
configuration.unset(YarnConfiguration.RM_HA_ID);
|
||||
conf = new YarnConfiguration(configuration);
|
||||
try {
|
||||
|
@ -456,7 +457,7 @@ public class TestRMHA {
|
|||
checkActiveRMFunctionality();
|
||||
}
|
||||
|
||||
@Test(timeout = 90000)
|
||||
@Test
|
||||
public void testTransitionedToStandbyShouldNotHang() throws Exception {
|
||||
configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
|
||||
Configuration conf = new YarnConfiguration(configuration);
|
||||
|
|
Loading…
Reference in New Issue