YARN-4438. Implement RM leader election with curator. Contributed by Jian He
This commit is contained in:
parent
52b77577c4
commit
89022f8d4b
|
@ -86,6 +86,8 @@ Release 2.9.0 - UNRELEASED
|
||||||
|
|
||||||
YARN-4524. Cleanup AppSchedulingInfo. (Karthik Kambatla via wangda)
|
YARN-4524. Cleanup AppSchedulingInfo. (Karthik Kambatla via wangda)
|
||||||
|
|
||||||
|
YARN-4438. Implement RM leader election with curator. (Jian He via xgong)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
|
@ -545,6 +545,11 @@ public class YarnConfiguration extends Configuration {
|
||||||
public static final String RM_HA_FC_ELECTOR_ZK_RETRIES_KEY = RM_HA_PREFIX
|
public static final String RM_HA_FC_ELECTOR_ZK_RETRIES_KEY = RM_HA_PREFIX
|
||||||
+ "failover-controller.active-standby-elector.zk.retries";
|
+ "failover-controller.active-standby-elector.zk.retries";
|
||||||
|
|
||||||
|
@Private
|
||||||
|
public static final String CURATOR_LEADER_ELECTOR =
|
||||||
|
RM_HA_PREFIX + "curator-leader-elector.enabled";
|
||||||
|
public static final boolean DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED = false;
|
||||||
|
|
||||||
////////////////////////////////
|
////////////////////////////////
|
||||||
// RM state store configs
|
// RM state store configs
|
||||||
////////////////////////////////
|
////////////////////////////////
|
||||||
|
|
|
@ -90,6 +90,7 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase {
|
||||||
.add(YarnConfiguration.DEFAULT_SHARED_CACHE_CHECKSUM_ALGO_IMPL);
|
.add(YarnConfiguration.DEFAULT_SHARED_CACHE_CHECKSUM_ALGO_IMPL);
|
||||||
configurationPropsToSkipCompare
|
configurationPropsToSkipCompare
|
||||||
.add(YarnConfiguration.DEFAULT_AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE);
|
.add(YarnConfiguration.DEFAULT_AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE);
|
||||||
|
configurationPropsToSkipCompare.add(YarnConfiguration.CURATOR_LEADER_ELECTOR);
|
||||||
|
|
||||||
// Ignore all YARN Application Timeline Service (version 1) properties
|
// Ignore all YARN Application Timeline Service (version 1) properties
|
||||||
configurationPrefixToSkipCompare.add("yarn.timeline-service.");
|
configurationPrefixToSkipCompare.add("yarn.timeline-service.");
|
||||||
|
|
|
@ -106,6 +106,7 @@ public class AdminService extends CompositeService implements
|
||||||
private String rmId;
|
private String rmId;
|
||||||
|
|
||||||
private boolean autoFailoverEnabled;
|
private boolean autoFailoverEnabled;
|
||||||
|
private boolean curatorEnabled;
|
||||||
private EmbeddedElectorService embeddedElector;
|
private EmbeddedElectorService embeddedElector;
|
||||||
|
|
||||||
private Server server;
|
private Server server;
|
||||||
|
@ -132,13 +133,16 @@ public class AdminService extends CompositeService implements
|
||||||
@Override
|
@Override
|
||||||
public void serviceInit(Configuration conf) throws Exception {
|
public void serviceInit(Configuration conf) throws Exception {
|
||||||
if (rmContext.isHAEnabled()) {
|
if (rmContext.isHAEnabled()) {
|
||||||
|
curatorEnabled = conf.getBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR,
|
||||||
|
YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED);
|
||||||
autoFailoverEnabled = HAUtil.isAutomaticFailoverEnabled(conf);
|
autoFailoverEnabled = HAUtil.isAutomaticFailoverEnabled(conf);
|
||||||
if (autoFailoverEnabled) {
|
if (autoFailoverEnabled && !curatorEnabled) {
|
||||||
if (HAUtil.isAutomaticFailoverEmbedded(conf)) {
|
if (HAUtil.isAutomaticFailoverEmbedded(conf)) {
|
||||||
embeddedElector = createEmbeddedElectorService();
|
embeddedElector = createEmbeddedElectorService();
|
||||||
addIfService(embeddedElector);
|
addIfService(embeddedElector);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
masterServiceBindAddress = conf.getSocketAddr(
|
masterServiceBindAddress = conf.getSocketAddr(
|
||||||
|
@ -319,7 +323,7 @@ public class AdminService extends CompositeService implements
|
||||||
rm.transitionToActive();
|
rm.transitionToActive();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
RMAuditLogger.logFailure(user.getShortUserName(), "transitionToActive",
|
RMAuditLogger.logFailure(user.getShortUserName(), "transitionToActive",
|
||||||
"", "RMHAProtocolService",
|
"", "RM",
|
||||||
"Exception transitioning to active");
|
"Exception transitioning to active");
|
||||||
throw new ServiceFailedException(
|
throw new ServiceFailedException(
|
||||||
"Error when transitioning to Active mode", e);
|
"Error when transitioning to Active mode", e);
|
||||||
|
@ -338,7 +342,7 @@ public class AdminService extends CompositeService implements
|
||||||
"Error on refreshAll during transistion to Active", e);
|
"Error on refreshAll during transistion to Active", e);
|
||||||
}
|
}
|
||||||
RMAuditLogger.logSuccess(user.getShortUserName(), "transitionToActive",
|
RMAuditLogger.logSuccess(user.getShortUserName(), "transitionToActive",
|
||||||
"RMHAProtocolService");
|
"RM");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -356,10 +360,10 @@ public class AdminService extends CompositeService implements
|
||||||
try {
|
try {
|
||||||
rm.transitionToStandby(true);
|
rm.transitionToStandby(true);
|
||||||
RMAuditLogger.logSuccess(user.getShortUserName(),
|
RMAuditLogger.logSuccess(user.getShortUserName(),
|
||||||
"transitionToStandby", "RMHAProtocolService");
|
"transitionToStandby", "RM");
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
RMAuditLogger.logFailure(user.getShortUserName(), "transitionToStandby",
|
RMAuditLogger.logFailure(user.getShortUserName(), "transitionToStandby",
|
||||||
"", "RMHAProtocolService",
|
"", "RM",
|
||||||
"Exception transitioning to standby");
|
"Exception transitioning to standby");
|
||||||
throw new ServiceFailedException(
|
throw new ServiceFailedException(
|
||||||
"Error when transitioning to Standby mode", e);
|
"Error when transitioning to Standby mode", e);
|
||||||
|
@ -369,15 +373,28 @@ public class AdminService extends CompositeService implements
|
||||||
@Override
|
@Override
|
||||||
public synchronized HAServiceStatus getServiceStatus() throws IOException {
|
public synchronized HAServiceStatus getServiceStatus() throws IOException {
|
||||||
checkAccess("getServiceState");
|
checkAccess("getServiceState");
|
||||||
HAServiceState haState = rmContext.getHAServiceState();
|
if (curatorEnabled) {
|
||||||
HAServiceStatus ret = new HAServiceStatus(haState);
|
HAServiceStatus state;
|
||||||
if (isRMActive() || haState == HAServiceProtocol.HAServiceState.STANDBY) {
|
if (rmContext.getLeaderElectorService().hasLeaderShip()) {
|
||||||
ret.setReadyToBecomeActive();
|
state = new HAServiceStatus(HAServiceState.ACTIVE);
|
||||||
|
} else {
|
||||||
|
state = new HAServiceStatus(HAServiceState.STANDBY);
|
||||||
|
}
|
||||||
|
// set empty string to avoid NPE at
|
||||||
|
// HAServiceProtocolServerSideTranslatorPB#getServiceStatus
|
||||||
|
state.setNotReadyToBecomeActive("");
|
||||||
|
return state;
|
||||||
} else {
|
} else {
|
||||||
ret.setNotReadyToBecomeActive("State is " + haState);
|
HAServiceState haState = rmContext.getHAServiceState();
|
||||||
|
HAServiceStatus ret = new HAServiceStatus(haState);
|
||||||
|
if (isRMActive() || haState == HAServiceProtocol.HAServiceState.STANDBY) {
|
||||||
|
ret.setReadyToBecomeActive();
|
||||||
|
} else {
|
||||||
|
ret.setNotReadyToBecomeActive("State is " + haState);
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
}
|
}
|
||||||
return ret;
|
}
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
|
public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
|
||||||
|
@ -837,6 +854,12 @@ public class AdminService extends CompositeService implements
|
||||||
} else if (!autoFailoverEnabled) {
|
} else if (!autoFailoverEnabled) {
|
||||||
return "Auto Failover is not enabled.";
|
return "Auto Failover is not enabled.";
|
||||||
}
|
}
|
||||||
return this.embeddedElector.getHAZookeeperConnectionState();
|
if (curatorEnabled) {
|
||||||
|
return "Connected to zookeeper : " + rmContext
|
||||||
|
.getLeaderElectorService().getCuratorClient().getZookeeperClient()
|
||||||
|
.isConnected();
|
||||||
|
} else {
|
||||||
|
return this.embeddedElector.getHAZookeeperConnectionState();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,144 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.curator.framework.CuratorFramework;
|
||||||
|
import org.apache.curator.framework.CuratorFrameworkFactory;
|
||||||
|
import org.apache.curator.framework.recipes.leader.LeaderLatch;
|
||||||
|
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
|
||||||
|
import org.apache.curator.retry.RetryNTimes;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.ha.HAServiceProtocol;
|
||||||
|
import org.apache.hadoop.service.AbstractService;
|
||||||
|
import org.apache.hadoop.yarn.conf.HAUtil;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
|
||||||
|
public class LeaderElectorService extends AbstractService implements
|
||||||
|
LeaderLatchListener {
|
||||||
|
public static final Log LOG = LogFactory.getLog(LeaderElectorService.class);
|
||||||
|
private LeaderLatch leaderLatch;
|
||||||
|
private CuratorFramework curator;
|
||||||
|
private RMContext rmContext;
|
||||||
|
private String latchPath;
|
||||||
|
private String rmId;
|
||||||
|
|
||||||
|
public LeaderElectorService(RMContext rmContext) {
|
||||||
|
super(LeaderElectorService.class.getName());
|
||||||
|
this.rmContext = rmContext;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void serviceInit(Configuration conf) throws Exception {
|
||||||
|
String zkHostPort = conf.get(YarnConfiguration.RM_ZK_ADDRESS);
|
||||||
|
Preconditions.checkNotNull(zkHostPort,
|
||||||
|
YarnConfiguration.RM_ZK_ADDRESS + " is not set");
|
||||||
|
|
||||||
|
rmId = HAUtil.getRMHAId(conf);
|
||||||
|
String clusterId = YarnConfiguration.getClusterId(conf);
|
||||||
|
|
||||||
|
int zkSessionTimeout = conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS,
|
||||||
|
YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
|
||||||
|
int maxRetryNum = conf.getInt(YarnConfiguration.RM_ZK_NUM_RETRIES,
|
||||||
|
YarnConfiguration.DEFAULT_ZK_RM_NUM_RETRIES);
|
||||||
|
|
||||||
|
String zkBasePath = conf.get(
|
||||||
|
YarnConfiguration.AUTO_FAILOVER_ZK_BASE_PATH,
|
||||||
|
YarnConfiguration.DEFAULT_AUTO_FAILOVER_ZK_BASE_PATH);
|
||||||
|
latchPath = zkBasePath + "/" + clusterId;
|
||||||
|
|
||||||
|
curator = CuratorFrameworkFactory.builder().connectString(zkHostPort)
|
||||||
|
.retryPolicy(new RetryNTimes(maxRetryNum, zkSessionTimeout)).build();
|
||||||
|
curator.start();
|
||||||
|
initAndStartLeaderLatch();
|
||||||
|
super.serviceInit(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void initAndStartLeaderLatch() throws Exception {
|
||||||
|
leaderLatch = new LeaderLatch(curator, latchPath, rmId);
|
||||||
|
leaderLatch.addListener(this);
|
||||||
|
leaderLatch.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void serviceStop() throws Exception {
|
||||||
|
closeLeaderLatch();
|
||||||
|
super.serviceStop();
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean hasLeaderShip() {
|
||||||
|
return leaderLatch.hasLeadership();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void isLeader() {
|
||||||
|
LOG.info(rmId + "is elected leader, transitioning to active");
|
||||||
|
try {
|
||||||
|
rmContext.getRMAdminService().transitionToActive(
|
||||||
|
new HAServiceProtocol.StateChangeRequestInfo(
|
||||||
|
HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC));
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.info(rmId + " failed to transition to active, giving up leadership",
|
||||||
|
e);
|
||||||
|
notLeader();
|
||||||
|
reJoinElection();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void reJoinElection() {
|
||||||
|
try {
|
||||||
|
closeLeaderLatch();
|
||||||
|
Thread.sleep(1000);
|
||||||
|
initAndStartLeaderLatch();
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.info("Fail to re-join election.", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void closeLeaderLatch() throws IOException {
|
||||||
|
if (leaderLatch != null) {
|
||||||
|
leaderLatch.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@Override
|
||||||
|
public void notLeader() {
|
||||||
|
LOG.info(rmId + " relinquish leadership");
|
||||||
|
try {
|
||||||
|
rmContext.getRMAdminService().transitionToStandby(
|
||||||
|
new HAServiceProtocol.StateChangeRequestInfo(
|
||||||
|
HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC));
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.info(rmId + " did not transition to standby successfully.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// only for testing
|
||||||
|
@VisibleForTesting
|
||||||
|
public CuratorFramework getCuratorClient() {
|
||||||
|
return this.curator;
|
||||||
|
}
|
||||||
|
}
|
|
@ -135,4 +135,8 @@ public interface RMContext {
|
||||||
PlacementManager getQueuePlacementManager();
|
PlacementManager getQueuePlacementManager();
|
||||||
|
|
||||||
void setQueuePlacementManager(PlacementManager placementMgr);
|
void setQueuePlacementManager(PlacementManager placementMgr);
|
||||||
|
|
||||||
|
void setLeaderElectorService(LeaderElectorService elector);
|
||||||
|
|
||||||
|
LeaderElectorService getLeaderElectorService();
|
||||||
}
|
}
|
||||||
|
|
|
@ -72,6 +72,7 @@ public class RMContextImpl implements RMContext {
|
||||||
|
|
||||||
private RMApplicationHistoryWriter rmApplicationHistoryWriter;
|
private RMApplicationHistoryWriter rmApplicationHistoryWriter;
|
||||||
private SystemMetricsPublisher systemMetricsPublisher;
|
private SystemMetricsPublisher systemMetricsPublisher;
|
||||||
|
private LeaderElectorService elector;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Default constructor. To be used in conjunction with setter methods for
|
* Default constructor. To be used in conjunction with setter methods for
|
||||||
|
@ -133,6 +134,16 @@ public class RMContextImpl implements RMContext {
|
||||||
return this.rmDispatcher;
|
return this.rmDispatcher;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setLeaderElectorService(LeaderElectorService elector) {
|
||||||
|
this.elector = elector;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public LeaderElectorService getLeaderElectorService() {
|
||||||
|
return this.elector;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public RMStateStore getStateStore() {
|
public RMStateStore getStateStore() {
|
||||||
return activeServiceContext.getStateStore();
|
return activeServiceContext.getStateStore();
|
||||||
|
|
|
@ -157,6 +157,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
||||||
private AppReportFetcher fetcher = null;
|
private AppReportFetcher fetcher = null;
|
||||||
protected ResourceTrackerService resourceTracker;
|
protected ResourceTrackerService resourceTracker;
|
||||||
private JvmPauseMonitor pauseMonitor;
|
private JvmPauseMonitor pauseMonitor;
|
||||||
|
private boolean curatorEnabled = false;
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
protected String webAppAddress;
|
protected String webAppAddress;
|
||||||
|
@ -228,6 +229,13 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
||||||
this.rmContext.setHAEnabled(HAUtil.isHAEnabled(this.conf));
|
this.rmContext.setHAEnabled(HAUtil.isHAEnabled(this.conf));
|
||||||
if (this.rmContext.isHAEnabled()) {
|
if (this.rmContext.isHAEnabled()) {
|
||||||
HAUtil.verifyAndSetConfiguration(this.conf);
|
HAUtil.verifyAndSetConfiguration(this.conf);
|
||||||
|
curatorEnabled = conf.getBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR,
|
||||||
|
YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED);
|
||||||
|
if (curatorEnabled) {
|
||||||
|
LeaderElectorService elector = new LeaderElectorService(rmContext);
|
||||||
|
addService(elector);
|
||||||
|
rmContext.setLeaderElectorService(elector);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set UGI and do login
|
// Set UGI and do login
|
||||||
|
@ -759,7 +767,11 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
||||||
// Transition to standby and reinit active services
|
// Transition to standby and reinit active services
|
||||||
LOG.info("Transitioning RM to Standby mode");
|
LOG.info("Transitioning RM to Standby mode");
|
||||||
transitionToStandby(true);
|
transitionToStandby(true);
|
||||||
adminService.resetLeaderElection();
|
if (curatorEnabled) {
|
||||||
|
rmContext.getLeaderElectorService().reJoinElection();
|
||||||
|
} else {
|
||||||
|
adminService.resetLeaderElection();
|
||||||
|
}
|
||||||
return;
|
return;
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.fatal("Failed to transition RM to Standby mode.");
|
LOG.fatal("Failed to transition RM to Standby mode.");
|
||||||
|
@ -996,7 +1008,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
||||||
* instance of {@link RMActiveServices} and initializes it.
|
* instance of {@link RMActiveServices} and initializes it.
|
||||||
* @throws Exception
|
* @throws Exception
|
||||||
*/
|
*/
|
||||||
protected void createAndInitActiveServices() throws Exception {
|
protected void createAndInitActiveServices() {
|
||||||
activeServices = new RMActiveServices(this);
|
activeServices = new RMActiveServices(this);
|
||||||
activeServices.init(conf);
|
activeServices.init(conf);
|
||||||
}
|
}
|
||||||
|
@ -1016,14 +1028,14 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
||||||
* Helper method to stop {@link #activeServices}.
|
* Helper method to stop {@link #activeServices}.
|
||||||
* @throws Exception
|
* @throws Exception
|
||||||
*/
|
*/
|
||||||
void stopActiveServices() throws Exception {
|
void stopActiveServices() {
|
||||||
if (activeServices != null) {
|
if (activeServices != null) {
|
||||||
activeServices.stop();
|
activeServices.stop();
|
||||||
activeServices = null;
|
activeServices = null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void reinitialize(boolean initialize) throws Exception {
|
void reinitialize(boolean initialize) {
|
||||||
ClusterMetrics.destroy();
|
ClusterMetrics.destroy();
|
||||||
QueueMetrics.clearQueueMetrics();
|
QueueMetrics.clearQueueMetrics();
|
||||||
if (initialize) {
|
if (initialize) {
|
||||||
|
@ -1042,7 +1054,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
||||||
LOG.info("Already in active state");
|
LOG.info("Already in active state");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG.info("Transitioning to active state");
|
LOG.info("Transitioning to active state");
|
||||||
|
|
||||||
this.rmLoginUGI.doAs(new PrivilegedExceptionAction<Void>() {
|
this.rmLoginUGI.doAs(new PrivilegedExceptionAction<Void>() {
|
||||||
|
@ -1083,7 +1094,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
||||||
@Override
|
@Override
|
||||||
protected void serviceStart() throws Exception {
|
protected void serviceStart() throws Exception {
|
||||||
if (this.rmContext.isHAEnabled()) {
|
if (this.rmContext.isHAEnabled()) {
|
||||||
transitionToStandby(true);
|
transitionToStandby(false);
|
||||||
} else {
|
} else {
|
||||||
transitionToActive();
|
transitionToActive();
|
||||||
}
|
}
|
||||||
|
@ -1338,4 +1349,5 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
||||||
out.println(" "
|
out.println(" "
|
||||||
+ "[-remove-application-from-state-store <appId>]" + "\n");
|
+ "[-remove-application-from-state-store <appId>]" + "\n");
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,269 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||||
|
|
||||||
|
import com.google.common.base.Supplier;
|
||||||
|
import org.apache.curator.CuratorZookeeperClient;
|
||||||
|
import org.apache.curator.test.InstanceSpec;
|
||||||
|
import org.apache.curator.test.KillSession;
|
||||||
|
import org.apache.curator.test.TestingCluster;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
||||||
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.conf.HAUtil;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
|
||||||
|
import org.apache.log4j.Level;
|
||||||
|
import org.apache.log4j.LogManager;
|
||||||
|
import org.apache.log4j.Logger;
|
||||||
|
import org.apache.zookeeper.ZooKeeper;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
|
public class TestLeaderElectorService {
|
||||||
|
|
||||||
|
private static final String RM1_ADDRESS = "1.1.1.1:1";
|
||||||
|
private static final String RM1_NODE_ID = "rm1";
|
||||||
|
|
||||||
|
private static final String RM2_ADDRESS = "0.0.0.0:0";
|
||||||
|
private static final String RM2_NODE_ID = "rm2";
|
||||||
|
|
||||||
|
Configuration conf ;
|
||||||
|
MockRM rm1;
|
||||||
|
MockRM rm2;
|
||||||
|
TestingCluster zkCluster;
|
||||||
|
@Before
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
Logger rootLogger = LogManager.getRootLogger();
|
||||||
|
rootLogger.setLevel(Level.INFO);
|
||||||
|
conf = new Configuration();
|
||||||
|
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
|
||||||
|
conf.setBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR, true);
|
||||||
|
conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, true);
|
||||||
|
|
||||||
|
conf.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1");
|
||||||
|
conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID);
|
||||||
|
|
||||||
|
for (String confKey : YarnConfiguration.getServiceAddressConfKeys(conf)) {
|
||||||
|
conf.set(HAUtil.addSuffix(confKey, RM1_NODE_ID), RM1_ADDRESS);
|
||||||
|
conf.set(HAUtil.addSuffix(confKey, RM2_NODE_ID), RM2_ADDRESS);
|
||||||
|
}
|
||||||
|
zkCluster = new TestingCluster(3);
|
||||||
|
conf.set(YarnConfiguration.RM_ZK_ADDRESS, zkCluster.getConnectString());
|
||||||
|
zkCluster.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
if (rm1 != null) {
|
||||||
|
rm1.stop();
|
||||||
|
}
|
||||||
|
if (rm2 !=null) {
|
||||||
|
rm2.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 1. rm1 active
|
||||||
|
// 2. rm2 standby
|
||||||
|
// 3. stop rm1
|
||||||
|
// 4. rm2 become active
|
||||||
|
@Test (timeout = 20000)
|
||||||
|
public void testRMShutDownCauseFailover() throws Exception {
|
||||||
|
rm1 = startRM("rm1", HAServiceState.ACTIVE);
|
||||||
|
rm2 = startRM("rm2", HAServiceState.STANDBY);
|
||||||
|
|
||||||
|
// wait for some time to make sure rm2 will not become active;
|
||||||
|
Thread.sleep(5000);
|
||||||
|
waitFor(rm2, HAServiceState.STANDBY);
|
||||||
|
|
||||||
|
rm1.stop();
|
||||||
|
// rm2 should become active;
|
||||||
|
waitFor(rm2, HAServiceState.ACTIVE);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 1. rm1 active
|
||||||
|
// 2. rm2 standby
|
||||||
|
// 3. submit a job to rm1 which triggers state-store failure.
|
||||||
|
// 4. rm2 become
|
||||||
|
@Test
|
||||||
|
public void testStateStoreFailureCauseFailover() throws Exception {
|
||||||
|
|
||||||
|
conf.set(YarnConfiguration.RM_HA_ID, "rm1");
|
||||||
|
MemoryRMStateStore memStore = new MemoryRMStateStore() {
|
||||||
|
@Override
|
||||||
|
public synchronized void storeApplicationStateInternal(ApplicationId
|
||||||
|
appId, ApplicationStateData appState) throws Exception{
|
||||||
|
throw new Exception("store app failure.");
|
||||||
|
}
|
||||||
|
};
|
||||||
|
memStore.init(conf);
|
||||||
|
rm1 = new MockRM(conf, memStore);
|
||||||
|
rm1.init(conf);
|
||||||
|
rm1.start();
|
||||||
|
|
||||||
|
waitFor(rm1, HAServiceState.ACTIVE);
|
||||||
|
|
||||||
|
rm2 = startRM("rm2", HAServiceState.STANDBY);
|
||||||
|
|
||||||
|
// submit an app which will trigger state-store failure.
|
||||||
|
rm1.submitApp(200, "app1", "user1", null, "default", false);
|
||||||
|
waitFor(rm1, HAServiceState.STANDBY);
|
||||||
|
|
||||||
|
// rm2 should become active;
|
||||||
|
waitFor(rm2, HAServiceState.ACTIVE);
|
||||||
|
|
||||||
|
rm2.stop();
|
||||||
|
// rm1 will become active again
|
||||||
|
waitFor(rm1, HAServiceState.ACTIVE);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 1. rm1 active
|
||||||
|
// 2. restart zk cluster
|
||||||
|
// 3. rm1 will first relinquish leadership and re-acquire leadership
|
||||||
|
@Test
|
||||||
|
public void testZKClusterDown() throws Exception {
|
||||||
|
rm1 = startRM("rm1", HAServiceState.ACTIVE);
|
||||||
|
|
||||||
|
// stop zk cluster
|
||||||
|
zkCluster.stop();
|
||||||
|
waitFor(rm1, HAServiceState.STANDBY);
|
||||||
|
|
||||||
|
Collection<InstanceSpec> instanceSpecs = zkCluster.getInstances();
|
||||||
|
zkCluster = new TestingCluster(instanceSpecs);
|
||||||
|
zkCluster.start();
|
||||||
|
// rm becomes active again
|
||||||
|
waitFor(rm1, HAServiceState.ACTIVE);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 1. rm1 active
|
||||||
|
// 2. kill the zk session between the rm and zk cluster.
|
||||||
|
// 3. rm1 will first relinquish leadership and re-acquire leadership
|
||||||
|
@Test
|
||||||
|
public void testExpireCurrentZKSession() throws Exception{
|
||||||
|
|
||||||
|
rm1 = startRM("rm1", HAServiceState.ACTIVE);
|
||||||
|
|
||||||
|
LeaderElectorService service = rm1.getRMContext().getLeaderElectorService();
|
||||||
|
CuratorZookeeperClient client =
|
||||||
|
service.getCuratorClient().getZookeeperClient();
|
||||||
|
// this will expire current curator client session. curator will re-establish
|
||||||
|
// the session. RM will first relinquish leadership and re-acquire leadership
|
||||||
|
KillSession
|
||||||
|
.kill(client.getZooKeeper(), client.getCurrentConnectionString());
|
||||||
|
|
||||||
|
waitFor(rm1, HAServiceState.ACTIVE);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 1. rm1 fail to become active.
|
||||||
|
// 2. rm1 will rejoin leader election and retry the leadership
|
||||||
|
@Test
|
||||||
|
public void testRMFailToTransitionToActive() throws Exception{
|
||||||
|
conf.set(YarnConfiguration.RM_HA_ID, "rm1");
|
||||||
|
final AtomicBoolean throwException = new AtomicBoolean(true);
|
||||||
|
Thread launchRM = new Thread() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
rm1 = new MockRM(conf) {
|
||||||
|
@Override
|
||||||
|
synchronized void transitionToActive() throws Exception {
|
||||||
|
if (throwException.get()) {
|
||||||
|
throw new Exception("Fail to transition to active");
|
||||||
|
} else {
|
||||||
|
super.transitionToActive();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
rm1.init(conf);
|
||||||
|
rm1.start();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
launchRM.start();
|
||||||
|
// wait some time, rm will keep retry the leadership;
|
||||||
|
Thread.sleep(5000);
|
||||||
|
throwException.set(false);
|
||||||
|
waitFor(rm1, HAServiceState.ACTIVE);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 1. rm1 active
|
||||||
|
// 2. rm2 standby
|
||||||
|
// 3. kill the current connected zk instance
|
||||||
|
// 4. either rm1 or rm2 will become active.
|
||||||
|
@Test
|
||||||
|
public void testKillZKInstance() throws Exception {
|
||||||
|
rm1 = startRM("rm1", HAServiceState.ACTIVE);
|
||||||
|
rm2 = startRM("rm2", HAServiceState.STANDBY);
|
||||||
|
|
||||||
|
ZooKeeper zkClient =
|
||||||
|
rm1.getRMContext().getLeaderElectorService().getCuratorClient()
|
||||||
|
.getZookeeperClient().getZooKeeper();
|
||||||
|
InstanceSpec connectionInstance = zkCluster.findConnectionInstance(zkClient);
|
||||||
|
zkCluster.killServer(connectionInstance);
|
||||||
|
|
||||||
|
// wait for rm1 or rm2 to be active by randomness
|
||||||
|
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||||
|
@Override public Boolean get() {
|
||||||
|
try {
|
||||||
|
HAServiceState rm1State =
|
||||||
|
rm1.getAdminService().getServiceStatus().getState();
|
||||||
|
HAServiceState rm2State =
|
||||||
|
rm2.getAdminService().getServiceStatus().getState();
|
||||||
|
return (rm1State.equals(HAServiceState.ACTIVE) && rm2State
|
||||||
|
.equals(HAServiceState.STANDBY)) || (
|
||||||
|
rm1State.equals(HAServiceState.STANDBY) && rm2State
|
||||||
|
.equals(HAServiceState.ACTIVE));
|
||||||
|
} catch (IOException e) {
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}, 2000, 15000);
|
||||||
|
}
|
||||||
|
|
||||||
|
private MockRM startRM(String rmId, HAServiceState state) throws Exception{
|
||||||
|
YarnConfiguration yarnConf = new YarnConfiguration(conf);
|
||||||
|
yarnConf.set(YarnConfiguration.RM_HA_ID, rmId);
|
||||||
|
MockRM rm = new MockRM(yarnConf);
|
||||||
|
rm.init(yarnConf);
|
||||||
|
rm.start();
|
||||||
|
waitFor(rm, state);
|
||||||
|
return rm;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void waitFor(final MockRM rm,
|
||||||
|
final HAServiceState state)
|
||||||
|
throws TimeoutException, InterruptedException {
|
||||||
|
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||||
|
@Override public Boolean get() {
|
||||||
|
try {
|
||||||
|
return rm.getAdminService().getServiceStatus().getState()
|
||||||
|
.equals(state);
|
||||||
|
} catch (IOException e) {
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}, 2000, 15000);
|
||||||
|
}
|
||||||
|
}
|
|
@ -471,8 +471,12 @@ public class TestRMHA {
|
||||||
memStore.init(conf);
|
memStore.init(conf);
|
||||||
rm = new MockRM(conf, memStore) {
|
rm = new MockRM(conf, memStore) {
|
||||||
@Override
|
@Override
|
||||||
void stopActiveServices() throws Exception {
|
void stopActiveServices() {
|
||||||
Thread.sleep(10000);
|
try {
|
||||||
|
Thread.sleep(10000);
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new RuntimeException (e);
|
||||||
|
}
|
||||||
super.stopActiveServices();
|
super.stopActiveServices();
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
Loading…
Reference in New Issue