YARN-1481. Move internal services logic from AdminService to ResourceManager. (vinodkv via kasha)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1550168 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Karthik Kambatla 2013-12-11 15:15:35 +00:00
parent 29848761ea
commit 4e5ba02d6b
5 changed files with 120 additions and 86 deletions

View File

@ -136,6 +136,9 @@ Release 2.4.0 - UNRELEASED
YARN-1378. Implemented a cleaner of old finished applications from the RM
state-store. (Jian He via vinodkv)
YARN-1481. Move internal services logic from AdminService to ResourceManager.
(vinodkv via kasha)
OPTIMIZATIONS
BUG FIXES

View File

@ -21,8 +21,6 @@ package org.apache.hadoop.yarn.server.resourcemanager;
import java.io.IOException;
import java.net.InetSocketAddress;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.BlockingService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -43,7 +41,6 @@ import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.RMNotYetActiveException;
import org.apache.hadoop.yarn.exceptions.YarnException;
@ -66,6 +63,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMapp
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
import com.google.protobuf.BlockingService;
public class AdminService extends AbstractService implements
HAServiceProtocol, ResourceManagerAdministrationProtocol {
@ -73,10 +72,6 @@ public class AdminService extends AbstractService implements
private final RMContext rmContext;
private final ResourceManager rm;
@VisibleForTesting
protected HAServiceProtocol.HAServiceState
haState = HAServiceProtocol.HAServiceState.INITIALIZING;
boolean haEnabled;
private Server server;
private InetSocketAddress masterServiceAddress;
@ -93,13 +88,6 @@ public class AdminService extends AbstractService implements
@Override
public synchronized void serviceInit(Configuration conf) throws Exception {
haEnabled = HAUtil.isHAEnabled(conf);
if (haEnabled) {
HAUtil.verifyAndSetConfiguration(conf);
rm.setConf(conf);
}
rm.createAndInitActiveServices();
masterServiceAddress = conf.getSocketAddr(
YarnConfiguration.RM_ADMIN_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS,
@ -112,11 +100,6 @@ public class AdminService extends AbstractService implements
@Override
protected synchronized void serviceStart() throws Exception {
if (haEnabled) {
transitionToStandby(true);
} else {
transitionToActive();
}
startServer();
super.serviceStart();
}
@ -124,8 +107,6 @@ public class AdminService extends AbstractService implements
@Override
protected synchronized void serviceStop() throws Exception {
stopServer();
transitionToStandby(false);
haState = HAServiceState.STOPPING;
super.serviceStop();
}
@ -145,7 +126,7 @@ public class AdminService extends AbstractService implements
refreshServiceAcls(conf, new RMPolicyProvider());
}
if (haEnabled) {
if (rmContext.isHAEnabled()) {
RPC.setProtocolEngine(conf, HAServiceProtocolPB.class,
ProtobufRpcEngine.class);
@ -182,39 +163,27 @@ public class AdminService extends AbstractService implements
}
private synchronized boolean isRMActive() {
return HAServiceState.ACTIVE == haState;
return HAServiceState.ACTIVE == rmContext.getHAServiceState();
}
@Override
public synchronized void monitorHealth()
throws IOException {
checkAccess("monitorHealth");
if (haState == HAServiceProtocol.HAServiceState.ACTIVE && !rm.areActiveServicesRunning()) {
if (isRMActive() && !rm.areActiveServicesRunning()) {
throw new HealthCheckFailedException(
"Active ResourceManager services are not running!");
}
}
synchronized void transitionToActive() throws Exception {
if (haState == HAServiceProtocol.HAServiceState.ACTIVE) {
LOG.info("Already in active state");
return;
}
LOG.info("Transitioning to active");
rm.startActiveServices();
haState = HAServiceProtocol.HAServiceState.ACTIVE;
LOG.info("Transitioned to active");
}
@Override
public synchronized void transitionToActive(HAServiceProtocol.StateChangeRequestInfo reqInfo)
throws IOException {
public synchronized void transitionToActive(
HAServiceProtocol.StateChangeRequestInfo reqInfo) throws IOException {
UserGroupInformation user = checkAccess("transitionToActive");
// TODO (YARN-1177): When automatic failover is enabled,
// check if transition should be allowed for this request
try {
transitionToActive();
rm.transitionToActive();
RMAuditLogger.logSuccess(user.getShortUserName(),
"transitionToActive", "RMHAProtocolService");
} catch (Exception e) {
@ -226,32 +195,14 @@ public class AdminService extends AbstractService implements
}
}
synchronized void transitionToStandby(boolean initialize)
throws Exception {
if (haState == HAServiceProtocol.HAServiceState.STANDBY) {
LOG.info("Already in standby state");
return;
}
LOG.info("Transitioning to standby");
if (haState == HAServiceProtocol.HAServiceState.ACTIVE) {
rm.stopActiveServices();
if (initialize) {
rm.createAndInitActiveServices();
}
}
haState = HAServiceProtocol.HAServiceState.STANDBY;
LOG.info("Transitioned to standby");
}
@Override
public synchronized void transitionToStandby(HAServiceProtocol.StateChangeRequestInfo reqInfo)
throws IOException {
public synchronized void transitionToStandby(
HAServiceProtocol.StateChangeRequestInfo reqInfo) throws IOException {
UserGroupInformation user = checkAccess("transitionToStandby");
// TODO (YARN-1177): When automatic failover is enabled,
// check if transition should be allowed for this request
try {
transitionToStandby(true);
rm.transitionToStandby(true);
RMAuditLogger.logSuccess(user.getShortUserName(),
"transitionToStandby", "RMHAProtocolService");
} catch (Exception e) {
@ -266,15 +217,15 @@ public class AdminService extends AbstractService implements
@Override
public synchronized HAServiceStatus getServiceStatus() throws IOException {
checkAccess("getServiceState");
HAServiceState haState = rmContext.getHAServiceState();
HAServiceStatus ret = new HAServiceStatus(haState);
if (haState == HAServiceProtocol.HAServiceState.ACTIVE || haState ==
HAServiceProtocol.HAServiceState.STANDBY) {
if (isRMActive() || haState == HAServiceProtocol.HAServiceState.STANDBY) {
ret.setReadyToBecomeActive();
} else {
ret.setNotReadyToBecomeActive("State is " + haState);
}
return ret;
}
}
@Override
public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.event.Dispatcher;
@ -42,7 +43,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenS
public interface RMContext {
Dispatcher getDispatcher();
boolean isHAEnabled();
HAServiceState getHAServiceState();
RMStateStore getStateStore();
ConcurrentMap<ApplicationId, RMApp> getRMApps();

View File

@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.server.resourcemanager;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -35,8 +37,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
import com.google.common.annotations.VisibleForTesting;
@ -54,6 +56,10 @@ public class RMContextImpl implements RMContext {
private final ConcurrentMap<String, RMNode> inactiveNodes
= new ConcurrentHashMap<String, RMNode>();
private boolean isHAEnabled;
private HAServiceState haServiceState =
HAServiceProtocol.HAServiceState.INITIALIZING;
private AMLivelinessMonitor amLivelinessMonitor;
private AMLivelinessMonitor amFinishingMonitor;
private RMStateStore stateStore = null;
@ -211,6 +217,16 @@ public class RMContextImpl implements RMContext {
return resourceTrackerService;
}
void setHAEnabled(boolean isHAEnabled) {
this.isHAEnabled = isHAEnabled;
}
void setHAServiceState(HAServiceState haServiceState) {
synchronized (haServiceState) {
this.haServiceState = haServiceState;
}
}
void setDispatcher(Dispatcher dispatcher) {
this.rmDispatcher = dispatcher;
}
@ -290,4 +306,16 @@ public class RMContextImpl implements RMContext {
ResourceTrackerService resourceTrackerService) {
this.resourceTrackerService = resourceTrackerService;
}
@Override
public boolean isHAEnabled() {
return isHAEnabled;
}
@Override
public HAServiceState getHAServiceState() {
synchronized (haServiceState) {
return haServiceState;
}
}
}

View File

@ -27,6 +27,8 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.http.HttpConfig.Policy;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@ -43,6 +45,7 @@ import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
@ -188,6 +191,12 @@ public class ResourceManager extends CompositeService implements Recoverable {
addService(adminService);
rmContext.setRMAdminService(adminService);
this.rmContext.setHAEnabled(HAUtil.isHAEnabled(conf));
if (this.rmContext.isHAEnabled()) {
HAUtil.verifyAndSetConfiguration(conf);
}
createAndInitActiveServices();
super.serviceInit(conf);
}
@ -217,9 +226,8 @@ public class ResourceManager extends CompositeService implements Recoverable {
}
protected RMStateStoreOperationFailedEventDispatcher
createRMStateStoreOperationFailedEventDispatcher() {
return new RMStateStoreOperationFailedEventDispatcher(
rmContext.getRMAdminService());
createRMStateStoreOperationFailedEventDispatcher() {
return new RMStateStoreOperationFailedEventDispatcher(rmContext, this);
}
protected Dispatcher createDispatcher() {
@ -655,11 +663,14 @@ public class ResourceManager extends CompositeService implements Recoverable {
@Private
public static class RMStateStoreOperationFailedEventDispatcher implements
EventHandler<RMStateStoreOperationFailedEvent> {
private final AdminService adminService;
public RMStateStoreOperationFailedEventDispatcher(
AdminService adminService) {
this.adminService = adminService;
private final RMContext rmContext;
private final ResourceManager rm;
public RMStateStoreOperationFailedEventDispatcher(RMContext rmContext,
ResourceManager resourceManager) {
this.rmContext = rmContext;
this.rm = resourceManager;
}
@Override
@ -671,16 +682,14 @@ public class ResourceManager extends CompositeService implements Recoverable {
}
if (event.getType() == RMStateStoreOperationFailedEventType.FENCED) {
LOG.info("RMStateStore has been fenced");
synchronized(adminService) {
if (adminService.haEnabled) {
try {
// Transition to standby and reinit active services
LOG.info("Transitioning RM to Standby mode");
adminService.transitionToStandby(true);
return;
} catch (Exception e) {
LOG.error("Failed to transition RM to Standby mode.");
}
if (rmContext.isHAEnabled()) {
try {
// Transition to standby and reinit active services
LOG.info("Transitioning RM to Standby mode");
rm.transitionToStandby(true);
return;
} catch (Exception e) {
LOG.error("Failed to transition RM to Standby mode.");
}
}
}
@ -826,10 +835,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
webApp = builder.start(new RMWebApp(this));
}
void setConf(Configuration configuration) {
conf = configuration;
}
/**
* Helper method to create and init {@link #activeServices}. This creates an
* instance of {@link RMActiveServices} and initializes it.
@ -870,6 +875,39 @@ public class ResourceManager extends CompositeService implements Recoverable {
return activeServices != null && activeServices.isInState(STATE.STARTED);
}
synchronized void transitionToActive() throws Exception {
if (rmContext.getHAServiceState() ==
HAServiceProtocol.HAServiceState.ACTIVE) {
LOG.info("Already in active state");
return;
}
LOG.info("Transitioning to active state");
startActiveServices();
rmContext.setHAServiceState(HAServiceProtocol.HAServiceState.ACTIVE);
LOG.info("Transitioned to active state");
}
synchronized void transitionToStandby(boolean initialize)
throws Exception {
if (rmContext.getHAServiceState() ==
HAServiceProtocol.HAServiceState.STANDBY) {
LOG.info("Already in standby state");
return;
}
LOG.info("Transitioning to standby state");
if (rmContext.getHAServiceState() ==
HAServiceProtocol.HAServiceState.ACTIVE) {
stopActiveServices();
if (initialize) {
createAndInitActiveServices();
}
}
rmContext.setHAServiceState(HAServiceProtocol.HAServiceState.STANDBY);
LOG.info("Transitioned to standby state");
}
@Override
protected void serviceStart() throws Exception {
try {
@ -877,6 +915,13 @@ public class ResourceManager extends CompositeService implements Recoverable {
} catch(IOException ie) {
throw new YarnRuntimeException("Failed to login", ie);
}
if (this.rmContext.isHAEnabled()) {
transitionToStandby(true);
} else {
transitionToActive();
}
super.serviceStart();
}
@ -888,6 +933,8 @@ public class ResourceManager extends CompositeService implements Recoverable {
@Override
protected void serviceStop() throws Exception {
super.serviceStop();
transitionToStandby(false);
rmContext.setHAServiceState(HAServiceState.STOPPING);
}
protected ResourceTrackerService createResourceTrackerService() {