YARN-1318. Promoted AdminService to an Always-On service and merged it into RMHAProtocolService. Contributed by Karthik Kambatla.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1547212 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-12-02 22:18:44 +00:00
parent 38a04a3042
commit c58ae266e9
15 changed files with 487 additions and 466 deletions

View File

@ -129,6 +129,9 @@ Release 2.4.0 - UNRELEASED
YARN-1241. In Fair Scheduler, maxRunningApps does not work for non-leaf
queues. (Sandy Ryza)
YARN-1318. Promoted AdminService to an Always-On service and merged it into
RMHAProtocolService. (Karthik Kambatla via vinodkv)
OPTIMIZATIONS
BUG FIXES

View File

@ -285,18 +285,6 @@ public class YarnConfiguration extends Configuration {
public static final String RM_HA_IDS = RM_HA_PREFIX + "rm-ids";
public static final String RM_HA_ID = RM_HA_PREFIX + "id";
@org.apache.hadoop.classification.InterfaceAudience.Private
// TODO Remove after YARN-1318
public static final String RM_HA_ADMIN_ADDRESS =
RM_HA_PREFIX + "admin.address";
public static final int DEFAULT_RM_HA_ADMIN_PORT = 8034;
public static String DEFAULT_RM_HA_ADMIN_ADDRESS =
"0.0.0.0:" + DEFAULT_RM_HA_ADMIN_PORT;
public static final String RM_HA_ADMIN_CLIENT_THREAD_COUNT =
RM_HA_PREFIX + "admin.client.thread-count";
public static final int DEFAULT_RM_HA_ADMIN_CLIENT_THREAD_COUNT = 1;
// end @Private
public static final List<String> RM_RPC_ADDRESS_CONF_KEYS =
Collections.unmodifiableList(Arrays.asList(
RM_ADDRESS,
@ -304,9 +292,7 @@ public class YarnConfiguration extends Configuration {
RM_ADMIN_ADDRESS,
RM_RESOURCE_TRACKER_ADDRESS,
RM_WEBAPP_ADDRESS,
RM_WEBAPP_HTTPS_ADDRESS,
// TODO Remove after YARN-1318
RM_HA_ADMIN_ADDRESS));
RM_WEBAPP_HTTPS_ADDRESS));
////////////////////////////////
// RM state store configs
@ -786,11 +772,6 @@ public class YarnConfiguration extends Configuration {
public static final String
YARN_SECURITY_SERVICE_AUTHORIZATION_RESOURCE_LOCALIZER =
"security.resourcelocalizer.protocol.acl";
@org.apache.hadoop.classification.InterfaceAudience.Private
// TODO Remove after YARN-1318
public static final String
YARN_SECURITY_SERVICE_AUTHORIZATION_HA_SERVICE_PROTOCOL =
CommonConfigurationKeys.SECURITY_HA_SERVICE_PROTOCOL_ACL;
/** No. of milliseconds to wait between sending a SIGTERM and SIGKILL
* to a running container */

View File

@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.exceptions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* Exception to be thrown when an Active-Only operation is attempted on a
* ResourceManager that is not Active.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class RMNotYetActiveException extends YarnException {
private static final long serialVersionUID = 1L;
public RMNotYetActiveException() {
super("ResourceManager is not yet Active!");
}
}

View File

@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.tools.GetUserMappingsProtocol;
import org.apache.hadoop.yarn.exceptions.RMNotYetActiveException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse;
@ -45,25 +46,25 @@ public interface ResourceManagerAdministrationProtocol extends GetUserMappingsPr
@Public
@Stable
public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
throws YarnException, IOException;
throws RMNotYetActiveException, YarnException, IOException;
@Public
@Stable
public RefreshNodesResponse refreshNodes(RefreshNodesRequest request)
throws YarnException, IOException;
throws RMNotYetActiveException, YarnException, IOException;
@Public
@Stable
public RefreshSuperUserGroupsConfigurationResponse
refreshSuperUserGroupsConfiguration(
RefreshSuperUserGroupsConfigurationRequest request)
throws YarnException, IOException;
throws RMNotYetActiveException, YarnException, IOException;
@Public
@Stable
public RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings(
RefreshUserToGroupsMappingsRequest request)
throws YarnException, IOException;
throws RMNotYetActiveException, YarnException, IOException;
@Public
@Stable

View File

@ -32,9 +32,9 @@ public class RMHAServiceTarget extends HAServiceTarget {
public RMHAServiceTarget(YarnConfiguration conf)
throws IOException {
haAdminServiceAddress = conf.getSocketAddr(
YarnConfiguration.RM_HA_ADMIN_ADDRESS,
YarnConfiguration.DEFAULT_RM_HA_ADMIN_ADDRESS,
YarnConfiguration.DEFAULT_RM_HA_ADMIN_PORT);
YarnConfiguration.RM_ADMIN_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADMIN_PORT);
}
@Override

View File

@ -21,18 +21,31 @@ package org.apache.hadoop.yarn.server.resourcemanager;
import java.io.IOException;
import java.net.InetSocketAddress;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.BlockingService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceStatus;
import org.apache.hadoop.ha.HealthCheckFailedException;
import org.apache.hadoop.ha.ServiceFailedException;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos;
import org.apache.hadoop.ha.protocolPB.HAServiceProtocolPB;
import org.apache.hadoop.ha.protocolPB.HAServiceProtocolServerSideTranslatorPB;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RPC.Server;
import org.apache.hadoop.security.Groups;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.RMNotYetActiveException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@ -51,21 +64,19 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsC
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
public class AdminService extends AbstractService implements ResourceManagerAdministrationProtocol {
public class AdminService extends AbstractService implements
HAServiceProtocol, ResourceManagerAdministrationProtocol {
private static final Log LOG = LogFactory.getLog(AdminService.class);
private final Configuration conf;
private final ResourceScheduler scheduler;
private final RMContext rmContext;
private final NodesListManager nodesListManager;
private final ClientRMService clientRMService;
private final ApplicationMasterService applicationMasterService;
private final ResourceTrackerService resourceTrackerService;
private final ResourceManager rm;
@VisibleForTesting
protected HAServiceProtocol.HAServiceState
haState = HAServiceProtocol.HAServiceState.INITIALIZING;
boolean haEnabled;
private Server server;
private InetSocketAddress masterServiceAddress;
@ -74,23 +85,21 @@ public class AdminService extends AbstractService implements ResourceManagerAdmi
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
public AdminService(Configuration conf, ResourceScheduler scheduler,
RMContext rmContext, NodesListManager nodesListManager,
ClientRMService clientRMService,
ApplicationMasterService applicationMasterService,
ResourceTrackerService resourceTrackerService) {
public AdminService(ResourceManager rm, RMContext rmContext) {
super(AdminService.class.getName());
this.conf = conf;
this.scheduler = scheduler;
this.rm = rm;
this.rmContext = rmContext;
this.nodesListManager = nodesListManager;
this.clientRMService = clientRMService;
this.applicationMasterService = applicationMasterService;
this.resourceTrackerService = resourceTrackerService;
}
@Override
public void serviceInit(Configuration conf) throws Exception {
public synchronized void serviceInit(Configuration conf) throws Exception {
haEnabled = HAUtil.isHAEnabled(conf);
if (haEnabled) {
HAUtil.verifyAndSetConfiguration(conf);
rm.setConf(conf);
}
rm.createAndInitActiveServices();
masterServiceAddress = conf.getSocketAddr(
YarnConfiguration.RM_ADMIN_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS,
@ -102,14 +111,32 @@ public class AdminService extends AbstractService implements ResourceManagerAdmi
}
@Override
protected void serviceStart() throws Exception {
protected synchronized void serviceStart() throws Exception {
if (haEnabled) {
transitionToStandby(true);
} else {
transitionToActive();
}
startServer();
super.serviceStart();
}
@Override
protected synchronized void serviceStop() throws Exception {
stopServer();
transitionToStandby(false);
haState = HAServiceState.STOPPING;
super.serviceStop();
}
protected void startServer() throws Exception {
Configuration conf = getConfig();
YarnRPC rpc = YarnRPC.create(conf);
this.server =
rpc.getServer(ResourceManagerAdministrationProtocol.class, this, masterServiceAddress,
conf, null,
conf.getInt(YarnConfiguration.RM_ADMIN_CLIENT_THREAD_COUNT,
YarnConfiguration.DEFAULT_RM_ADMIN_CLIENT_THREAD_COUNT));
this.server = (Server) rpc.getServer(
ResourceManagerAdministrationProtocol.class, this, masterServiceAddress,
conf, null,
conf.getInt(YarnConfiguration.RM_ADMIN_CLIENT_THREAD_COUNT,
YarnConfiguration.DEFAULT_RM_ADMIN_CLIENT_THREAD_COUNT));
// Enable service authorization?
if (conf.getBoolean(
@ -118,34 +145,151 @@ public class AdminService extends AbstractService implements ResourceManagerAdmi
refreshServiceAcls(conf, new RMPolicyProvider());
}
if (haEnabled) {
RPC.setProtocolEngine(conf, HAServiceProtocolPB.class,
ProtobufRpcEngine.class);
HAServiceProtocolServerSideTranslatorPB haServiceProtocolXlator =
new HAServiceProtocolServerSideTranslatorPB(this);
BlockingService haPbService =
HAServiceProtocolProtos.HAServiceProtocolService
.newReflectiveBlockingService(haServiceProtocolXlator);
server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
HAServiceProtocol.class, haPbService);
}
this.server.start();
conf.updateConnectAddr(YarnConfiguration.RM_ADMIN_ADDRESS,
server.getListenerAddress());
super.serviceStart();
server.getListenerAddress());
}
@Override
protected void serviceStop() throws Exception {
protected void stopServer() throws Exception {
if (this.server != null) {
this.server.stop();
}
super.serviceStop();
}
private UserGroupInformation checkAccess(String method) throws IOException {
return RMServerUtils.verifyAccess(adminAcl, method, LOG);
}
private UserGroupInformation checkAcls(String method) throws YarnException {
try {
return RMServerUtils.verifyAccess(adminAcl, method, LOG);
return checkAccess(method);
} catch (IOException ioe) {
throw RPCUtil.getRemoteException(ioe);
}
}
private synchronized boolean isRMActive() {
return HAServiceState.ACTIVE == haState;
}
@Override
public synchronized void monitorHealth()
throws IOException {
checkAccess("monitorHealth");
if (haState == HAServiceProtocol.HAServiceState.ACTIVE && !rm.areActiveServicesRunning()) {
throw new HealthCheckFailedException(
"Active ResourceManager services are not running!");
}
}
synchronized void transitionToActive() throws Exception {
if (haState == HAServiceProtocol.HAServiceState.ACTIVE) {
LOG.info("Already in active state");
return;
}
LOG.info("Transitioning to active");
rm.startActiveServices();
haState = HAServiceProtocol.HAServiceState.ACTIVE;
LOG.info("Transitioned to active");
}
@Override
public synchronized void transitionToActive(HAServiceProtocol.StateChangeRequestInfo reqInfo)
throws IOException {
UserGroupInformation user = checkAccess("transitionToActive");
// TODO (YARN-1177): When automatic failover is enabled,
// check if transition should be allowed for this request
try {
transitionToActive();
RMAuditLogger.logSuccess(user.getShortUserName(),
"transitionToActive", "RMHAProtocolService");
} catch (Exception e) {
RMAuditLogger.logFailure(user.getShortUserName(), "transitionToActive",
adminAcl.toString(), "RMHAProtocolService",
"Exception transitioning to active");
throw new ServiceFailedException(
"Error when transitioning to Active mode", e);
}
}
synchronized void transitionToStandby(boolean initialize)
throws Exception {
if (haState == HAServiceProtocol.HAServiceState.STANDBY) {
LOG.info("Already in standby state");
return;
}
LOG.info("Transitioning to standby");
if (haState == HAServiceProtocol.HAServiceState.ACTIVE) {
rm.stopActiveServices();
if (initialize) {
rm.createAndInitActiveServices();
}
}
haState = HAServiceProtocol.HAServiceState.STANDBY;
LOG.info("Transitioned to standby");
}
@Override
public synchronized void transitionToStandby(HAServiceProtocol.StateChangeRequestInfo reqInfo)
throws IOException {
UserGroupInformation user = checkAccess("transitionToStandby");
// TODO (YARN-1177): When automatic failover is enabled,
// check if transition should be allowed for this request
try {
transitionToStandby(true);
RMAuditLogger.logSuccess(user.getShortUserName(),
"transitionToStandby", "RMHAProtocolService");
} catch (Exception e) {
RMAuditLogger.logFailure(user.getShortUserName(), "transitionToStandby",
adminAcl.toString(), "RMHAProtocolService",
"Exception transitioning to standby");
throw new ServiceFailedException(
"Error when transitioning to Standby mode", e);
}
}
@Override
public synchronized HAServiceStatus getServiceStatus() throws IOException {
checkAccess("getServiceState");
HAServiceStatus ret = new HAServiceStatus(haState);
if (haState == HAServiceProtocol.HAServiceState.ACTIVE || haState ==
HAServiceProtocol.HAServiceState.STANDBY) {
ret.setReadyToBecomeActive();
} else {
ret.setNotReadyToBecomeActive("State is " + haState);
}
return ret;
}
@Override
public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
throws YarnException {
UserGroupInformation user = checkAcls("refreshQueues");
if (!isRMActive()) {
RMAuditLogger.logFailure(user.getShortUserName(), "refreshQueues",
adminAcl.toString(), "AdminService",
"ResourceManager is not active. Can not refresh queues.");
throw new RMNotYetActiveException();
}
try {
scheduler.reinitialize(conf, this.rmContext);
rmContext.getScheduler().reinitialize(getConfig(), this.rmContext);
RMAuditLogger.logSuccess(user.getShortUserName(), "refreshQueues",
"AdminService");
return recordFactory.newRecordInstance(RefreshQueuesResponse.class);
@ -162,8 +306,16 @@ public class AdminService extends AbstractService implements ResourceManagerAdmi
public RefreshNodesResponse refreshNodes(RefreshNodesRequest request)
throws YarnException {
UserGroupInformation user = checkAcls("refreshNodes");
if (!isRMActive()) {
RMAuditLogger.logFailure(user.getShortUserName(), "refreshNodes",
adminAcl.toString(), "AdminService",
"ResourceManager is not active. Can not refresh nodes.");
throw new RMNotYetActiveException();
}
try {
this.nodesListManager.refreshNodes(new YarnConfiguration());
rmContext.getNodesListManager().refreshNodes(new YarnConfiguration());
RMAuditLogger.logSuccess(user.getShortUserName(), "refreshNodes",
"AdminService");
return recordFactory.newRecordInstance(RefreshNodesResponse.class);
@ -181,6 +333,15 @@ public class AdminService extends AbstractService implements ResourceManagerAdmi
throws YarnException {
UserGroupInformation user = checkAcls("refreshSuperUserGroupsConfiguration");
// TODO (YARN-1459): Revisit handling super-user-groups on Standby RM
if (!isRMActive()) {
RMAuditLogger.logFailure(user.getShortUserName(),
"refreshSuperUserGroupsConfiguration",
adminAcl.toString(), "AdminService",
"ResourceManager is not active. Can not refresh super-user-groups.");
throw new RMNotYetActiveException();
}
ProxyUsers.refreshSuperUserGroupsConfiguration(new Configuration());
RMAuditLogger.logSuccess(user.getShortUserName(),
"refreshSuperUserGroupsConfiguration", "AdminService");
@ -194,6 +355,15 @@ public class AdminService extends AbstractService implements ResourceManagerAdmi
RefreshUserToGroupsMappingsRequest request) throws YarnException {
UserGroupInformation user = checkAcls("refreshUserToGroupsMappings");
// TODO (YARN-1459): Revisit handling user-groups on Standby RM
if (!isRMActive()) {
RMAuditLogger.logFailure(user.getShortUserName(),
"refreshUserToGroupsMapping",
adminAcl.toString(), "AdminService",
"ResourceManager is not active. Can not refresh user-groups.");
throw new RMNotYetActiveException();
}
Groups.getUserToGroupsMappingService().refresh();
RMAuditLogger.logSuccess(user.getShortUserName(),
"refreshUserToGroupsMappings", "AdminService");
@ -233,9 +403,16 @@ public class AdminService extends AbstractService implements ResourceManagerAdmi
PolicyProvider policyProvider = new RMPolicyProvider();
refreshServiceAcls(conf, policyProvider);
clientRMService.refreshServiceAcls(conf, policyProvider);
applicationMasterService.refreshServiceAcls(conf, policyProvider);
resourceTrackerService.refreshServiceAcls(conf, policyProvider);
if (isRMActive()) {
rmContext.getClientRMService().refreshServiceAcls(conf, policyProvider);
rmContext.getApplicationMasterService().refreshServiceAcls(
conf, policyProvider);
rmContext.getResourceTrackerService().refreshServiceAcls(
conf, policyProvider);
} else {
LOG.warn("ResourceManager is not active. Not refreshing ACLs for " +
"Clients, ApplicationMasters and NodeManagers");
}
return recordFactory.newRecordInstance(RefreshServiceAclsResponse.class);
}
@ -249,5 +426,4 @@ public class AdminService extends AbstractService implements ResourceManagerAdmi
public String[] getGroupsForUser(String user) throws IOException {
return UserGroupInformation.createRemoteUser(user).getGroupNames();
}
}

View File

@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
@ -64,12 +65,22 @@ public interface RMContext {
NMTokenSecretManagerInRM getNMTokenSecretManager();
ResourceScheduler getScheduler();
NodesListManager getNodesListManager();
ClientToAMTokenSecretManagerInRM getClientToAMTokenSecretManager();
void setClientRMService(ClientRMService clientRMService);
AdminService getRMAdminService();
ClientRMService getClientRMService();
ApplicationMasterService getApplicationMasterService();
ResourceTrackerService getResourceTrackerService();
void setClientRMService(ClientRMService clientRMService);
RMDelegationTokenSecretManager getRMDelegationTokenSecretManager();
void setRMDelegationTokenSecretManager(

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
@ -42,7 +43,7 @@ import com.google.common.annotations.VisibleForTesting;
public class RMContextImpl implements RMContext {
private final Dispatcher rmDispatcher;
private Dispatcher rmDispatcher;
private final ConcurrentMap<ApplicationId, RMApp> applications
= new ConcurrentHashMap<ApplicationId, RMApp>();
@ -57,34 +58,25 @@ public class RMContextImpl implements RMContext {
private AMLivelinessMonitor amFinishingMonitor;
private RMStateStore stateStore = null;
private ContainerAllocationExpirer containerAllocationExpirer;
private final DelegationTokenRenewer delegationTokenRenewer;
private final AMRMTokenSecretManager amRMTokenSecretManager;
private final RMContainerTokenSecretManager containerTokenSecretManager;
private final NMTokenSecretManagerInRM nmTokenSecretManager;
private final ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager;
private DelegationTokenRenewer delegationTokenRenewer;
private AMRMTokenSecretManager amRMTokenSecretManager;
private RMContainerTokenSecretManager containerTokenSecretManager;
private NMTokenSecretManagerInRM nmTokenSecretManager;
private ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager;
private AdminService adminService;
private ClientRMService clientRMService;
private RMDelegationTokenSecretManager rmDelegationTokenSecretManager;
private ResourceScheduler scheduler;
private NodesListManager nodesListManager;
private ResourceTrackerService resourceTrackerService;
private ApplicationMasterService applicationMasterService;
/**
* Default constructor. To be used in conjunction with setter methods for
* individual fields.
*/
public RMContextImpl() {
public RMContextImpl(Dispatcher rmDispatcher,
RMStateStore store,
ContainerAllocationExpirer containerAllocationExpirer,
AMLivelinessMonitor amLivelinessMonitor,
AMLivelinessMonitor amFinishingMonitor,
DelegationTokenRenewer delegationTokenRenewer,
AMRMTokenSecretManager amRMTokenSecretManager,
RMContainerTokenSecretManager containerTokenSecretManager,
NMTokenSecretManagerInRM nmTokenSecretManager,
ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager) {
this.rmDispatcher = rmDispatcher;
this.stateStore = store;
this.containerAllocationExpirer = containerAllocationExpirer;
this.amLivelinessMonitor = amLivelinessMonitor;
this.amFinishingMonitor = amFinishingMonitor;
this.delegationTokenRenewer = delegationTokenRenewer;
this.amRMTokenSecretManager = amRMTokenSecretManager;
this.containerTokenSecretManager = containerTokenSecretManager;
this.nmTokenSecretManager = nmTokenSecretManager;
this.clientToAMTokenSecretManager = clientToAMTokenSecretManager;
}
@VisibleForTesting
@ -98,10 +90,17 @@ public class RMContextImpl implements RMContext {
RMContainerTokenSecretManager containerTokenSecretManager,
NMTokenSecretManagerInRM nmTokenSecretManager,
ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager) {
this(rmDispatcher, null, containerAllocationExpirer, amLivelinessMonitor,
amFinishingMonitor, delegationTokenRenewer, appTokenSecretManager,
containerTokenSecretManager, nmTokenSecretManager,
clientToAMTokenSecretManager);
this();
this.setDispatcher(rmDispatcher);
this.setContainerAllocationExpirer(containerAllocationExpirer);
this.setAMLivelinessMonitor(amLivelinessMonitor);
this.setAMFinishingMonitor(amFinishingMonitor);
this.setDelegationTokenRenewer(delegationTokenRenewer);
this.setAMRMTokenSecretManager(appTokenSecretManager);
this.setContainerTokenSecretManager(containerTokenSecretManager);
this.setNMTokenSecretManager(nmTokenSecretManager);
this.setClientToAMTokenSecretManager(clientToAMTokenSecretManager);
RMStateStore nullStore = new NullRMStateStore();
nullStore.setRMDispatcher(rmDispatcher);
try {
@ -172,11 +171,26 @@ public class RMContextImpl implements RMContext {
return this.nmTokenSecretManager;
}
@Override
public ResourceScheduler getScheduler() {
return this.scheduler;
}
@Override
public NodesListManager getNodesListManager() {
return this.nodesListManager;
}
@Override
public ClientToAMTokenSecretManagerInRM getClientToAMTokenSecretManager() {
return this.clientToAMTokenSecretManager;
}
@Override
public AdminService getRMAdminService() {
return this.adminService;
}
@VisibleForTesting
public void setStateStore(RMStateStore store) {
stateStore = store;
@ -187,6 +201,24 @@ public class RMContextImpl implements RMContext {
return this.clientRMService;
}
@Override
public ApplicationMasterService getApplicationMasterService() {
return applicationMasterService;
}
@Override
public ResourceTrackerService getResourceTrackerService() {
return resourceTrackerService;
}
void setDispatcher(Dispatcher dispatcher) {
this.rmDispatcher = dispatcher;
}
void setRMAdminService(AdminService adminService) {
this.adminService = adminService;
}
@Override
public void setClientRMService(ClientRMService clientRMService) {
this.clientRMService = clientRMService;
@ -202,4 +234,60 @@ public class RMContextImpl implements RMContext {
RMDelegationTokenSecretManager delegationTokenSecretManager) {
this.rmDelegationTokenSecretManager = delegationTokenSecretManager;
}
void setContainerAllocationExpirer(
ContainerAllocationExpirer containerAllocationExpirer) {
this.containerAllocationExpirer = containerAllocationExpirer;
}
void setAMLivelinessMonitor(AMLivelinessMonitor amLivelinessMonitor) {
this.amLivelinessMonitor = amLivelinessMonitor;
}
void setAMFinishingMonitor(AMLivelinessMonitor amFinishingMonitor) {
this.amFinishingMonitor = amFinishingMonitor;
}
void setContainerTokenSecretManager(
RMContainerTokenSecretManager containerTokenSecretManager) {
this.containerTokenSecretManager = containerTokenSecretManager;
}
void setNMTokenSecretManager(
NMTokenSecretManagerInRM nmTokenSecretManager) {
this.nmTokenSecretManager = nmTokenSecretManager;
}
void setScheduler(ResourceScheduler scheduler) {
this.scheduler = scheduler;
}
void setDelegationTokenRenewer(
DelegationTokenRenewer delegationTokenRenewer) {
this.delegationTokenRenewer = delegationTokenRenewer;
}
void setClientToAMTokenSecretManager(
ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager) {
this.clientToAMTokenSecretManager = clientToAMTokenSecretManager;
}
void setAMRMTokenSecretManager(
AMRMTokenSecretManager amRMTokenSecretManager) {
this.amRMTokenSecretManager = amRMTokenSecretManager;
}
void setNodesListManager(NodesListManager nodesListManager) {
this.nodesListManager = nodesListManager;
}
void setApplicationMasterService(
ApplicationMasterService applicationMasterService) {
this.applicationMasterService = applicationMasterService;
}
void setResourceTrackerService(
ResourceTrackerService resourceTrackerService) {
this.resourceTrackerService = resourceTrackerService;
}
}

View File

@ -1,264 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.BlockingService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceStatus;
import org.apache.hadoop.ha.HealthCheckFailedException;
import org.apache.hadoop.ha.ServiceFailedException;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos;
import org.apache.hadoop.ha.protocolPB.HAServiceProtocolPB;
import org.apache.hadoop.ha.protocolPB.HAServiceProtocolServerSideTranslatorPB;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.WritableRpcEngine;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
import java.io.IOException;
import java.net.InetSocketAddress;
/**
* Internal class to handle HA related aspects of the {@link ResourceManager}.
*
* TODO (YARN-1318): Some/ all of this functionality should be merged with
* {@link AdminService}. Currently, marking this as Private and Unstable for
* those reasons.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class RMHAProtocolService extends AbstractService implements
HAServiceProtocol {
private static final Log LOG = LogFactory.getLog(RMHAProtocolService.class);
private Configuration conf;
private ResourceManager rm;
@VisibleForTesting
protected HAServiceState haState = HAServiceState.INITIALIZING;
private AccessControlList adminAcl;
private Server haAdminServer;
@InterfaceAudience.Private
boolean haEnabled;
public RMHAProtocolService(ResourceManager resourceManager) {
super("RMHAProtocolService");
this.rm = resourceManager;
}
@Override
protected synchronized void serviceInit(Configuration conf) throws
Exception {
this.conf = conf;
haEnabled = HAUtil.isHAEnabled(this.conf);
if (haEnabled) {
HAUtil.verifyAndSetConfiguration(conf);
rm.setConf(this.conf);
adminAcl = new AccessControlList(conf.get(
YarnConfiguration.YARN_ADMIN_ACL,
YarnConfiguration.DEFAULT_YARN_ADMIN_ACL));
}
rm.createAndInitActiveServices();
super.serviceInit(this.conf);
}
@Override
protected synchronized void serviceStart() throws Exception {
if (haEnabled) {
transitionToStandby(true);
startHAAdminServer();
} else {
transitionToActive();
}
super.serviceStart();
}
@Override
protected synchronized void serviceStop() throws Exception {
if (haEnabled) {
stopHAAdminServer();
}
transitionToStandby(false);
haState = HAServiceState.STOPPING;
super.serviceStop();
}
protected void startHAAdminServer() throws Exception {
InetSocketAddress haAdminServiceAddress = conf.getSocketAddr(
YarnConfiguration.RM_HA_ADMIN_ADDRESS,
YarnConfiguration.DEFAULT_RM_HA_ADMIN_ADDRESS,
YarnConfiguration.DEFAULT_RM_HA_ADMIN_PORT);
RPC.setProtocolEngine(conf, HAServiceProtocolPB.class,
ProtobufRpcEngine.class);
HAServiceProtocolServerSideTranslatorPB haServiceProtocolXlator =
new HAServiceProtocolServerSideTranslatorPB(this);
BlockingService haPbService =
HAServiceProtocolProtos.HAServiceProtocolService
.newReflectiveBlockingService(haServiceProtocolXlator);
WritableRpcEngine.ensureInitialized();
String bindHost = haAdminServiceAddress.getHostName();
int serviceHandlerCount = conf.getInt(
YarnConfiguration.RM_HA_ADMIN_CLIENT_THREAD_COUNT,
YarnConfiguration.DEFAULT_RM_HA_ADMIN_CLIENT_THREAD_COUNT);
haAdminServer = new RPC.Builder(conf)
.setProtocol(HAServiceProtocolPB.class)
.setInstance(haPbService)
.setBindAddress(bindHost)
.setPort(haAdminServiceAddress.getPort())
.setNumHandlers(serviceHandlerCount)
.setVerbose(false)
.build();
// Enable service authorization?
if (conf.getBoolean(
CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false)) {
haAdminServer.refreshServiceAcl(conf, new RMPolicyProvider());
}
haAdminServer.start();
conf.updateConnectAddr(YarnConfiguration.RM_HA_ADMIN_ADDRESS,
haAdminServer.getListenerAddress());
}
private void stopHAAdminServer() throws Exception {
if (haAdminServer != null) {
haAdminServer.stop();
haAdminServer.join();
haAdminServer = null;
}
}
@Override
public synchronized void monitorHealth()
throws IOException {
checkAccess("monitorHealth");
if (haState == HAServiceState.ACTIVE && !rm.areActiveServicesRunning()) {
throw new HealthCheckFailedException(
"Active ResourceManager services are not running!");
}
}
@InterfaceAudience.Private
synchronized void transitionToActive() throws Exception {
if (haState == HAServiceState.ACTIVE) {
LOG.info("Already in active state");
return;
}
LOG.info("Transitioning to active");
rm.startActiveServices();
haState = HAServiceState.ACTIVE;
LOG.info("Transitioned to active");
}
@Override
public synchronized void transitionToActive(StateChangeRequestInfo reqInfo)
throws IOException {
UserGroupInformation user = checkAccess("transitionToActive");
// TODO (YARN-1177): When automatic failover is enabled,
// check if transition should be allowed for this request
try {
transitionToActive();
RMAuditLogger.logSuccess(user.getShortUserName(),
"transitionToActive", "RMHAProtocolService");
} catch (Exception e) {
RMAuditLogger.logFailure(user.getShortUserName(), "transitionToActive",
adminAcl.toString(), "RMHAProtocolService",
"Exception transitioning to active");
throw new ServiceFailedException(
"Error when transitioning to Active mode", e);
}
}
@InterfaceAudience.Private
synchronized void transitionToStandby(boolean initialize)
throws Exception {
if (haState == HAServiceState.STANDBY) {
LOG.info("Already in standby state");
return;
}
LOG.info("Transitioning to standby");
if (haState == HAServiceState.ACTIVE) {
rm.stopActiveServices();
if (initialize) {
rm.createAndInitActiveServices();
}
}
haState = HAServiceState.STANDBY;
LOG.info("Transitioned to standby");
}
@Override
public synchronized void transitionToStandby(StateChangeRequestInfo reqInfo)
throws IOException {
UserGroupInformation user = checkAccess("transitionToStandby");
// TODO (YARN-1177): When automatic failover is enabled,
// check if transition should be allowed for this request
try {
transitionToStandby(true);
RMAuditLogger.logSuccess(user.getShortUserName(),
"transitionToStandby", "RMHAProtocolService");
} catch (Exception e) {
RMAuditLogger.logFailure(user.getShortUserName(), "transitionToStandby",
adminAcl.toString(), "RMHAProtocolService",
"Exception transitioning to standby");
throw new ServiceFailedException(
"Error when transitioning to Standby mode", e);
}
}
@Override
public synchronized HAServiceStatus getServiceStatus() throws IOException {
checkAccess("getServiceState");
HAServiceStatus ret = new HAServiceStatus(haState);
if (haState == HAServiceState.ACTIVE || haState == HAServiceState.STANDBY) {
ret.setReadyToBecomeActive();
} else {
ret.setNotReadyToBecomeActive("State is " + haState);
}
return ret;
}
private UserGroupInformation checkAccess(String method) throws IOException {
return RMServerUtils.verifyAccess(adminAcl, method, LOG);
}
}

View File

@ -118,7 +118,9 @@ public class ResourceManager extends CompositeService implements Recoverable {
* the HA state of the RM.
*/
@VisibleForTesting
protected RMHAProtocolService haService;
protected RMContextImpl rmContext;
@VisibleForTesting
protected AdminService adminService;
/**
* "Active" services. Services that need to run only on the Active RM.
@ -129,8 +131,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
* in Active state.
*/
protected RMActiveServices activeServices;
protected ClientToAMTokenSecretManagerInRM clientToAMSecretManager =
new ClientToAMTokenSecretManagerInRM();
protected ClientToAMTokenSecretManagerInRM clientToAMSecretManager;
protected RMContainerTokenSecretManager containerTokenSecretManager;
protected NMTokenSecretManagerInRM nmTokenSecretManager;
@ -143,7 +144,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
private ClientRMService clientRM;
protected ApplicationMasterService masterService;
private ApplicationMasterLauncher applicationMasterLauncher;
private AdminService adminService;
private ContainerAllocationExpirer containerAllocationExpirer;
protected NMLivelinessMonitor nmLivelinessMonitor;
protected NodesListManager nodesListManager;
@ -154,7 +154,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
protected RMDelegationTokenSecretManager rmDTSecretManager;
private DelegationTokenRenewer delegationTokenRenewer;
private WebApp webApp;
protected RMContext rmContext;
protected ResourceTrackerService resourceTracker;
private boolean recoveryEnabled;
@ -166,10 +165,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
super("ResourceManager");
}
public RMHAProtocolService getHAService() {
return this.haService;
}
public RMContext getRMContext() {
return this.rmContext;
}
@ -187,9 +182,12 @@ public class ResourceManager extends CompositeService implements Recoverable {
protected void serviceInit(Configuration conf) throws Exception {
validateConfigs(conf);
this.conf = conf;
this.rmContext = new RMContextImpl();
adminService = createAdminService();
addService(adminService);
rmContext.setRMAdminService(adminService);
haService = createRMHAProtocolService();
addService(haService);
super.serviceInit(conf);
}
@ -201,11 +199,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
@VisibleForTesting
protected void setRMStateStore(RMStateStore rmStore) {
rmStore.setRMDispatcher(rmDispatcher);
((RMContextImpl) rmContext).setStateStore(rmStore);
}
protected RMHAProtocolService createRMHAProtocolService() {
return new RMHAProtocolService(this);
rmContext.setStateStore(rmStore);
}
protected RMContainerTokenSecretManager createContainerTokenSecretManager(
@ -224,7 +218,8 @@ public class ResourceManager extends CompositeService implements Recoverable {
protected RMStateStoreOperationFailedEventDispatcher
createRMStateStoreOperationFailedEventDispatcher() {
return new RMStateStoreOperationFailedEventDispatcher(haService);
return new RMStateStoreOperationFailedEventDispatcher(
rmContext.getRMAdminService());
}
protected Dispatcher createDispatcher() {
@ -319,20 +314,31 @@ public class ResourceManager extends CompositeService implements Recoverable {
rmDispatcher = createDispatcher();
addIfService(rmDispatcher);
rmContext.setDispatcher(rmDispatcher);
clientToAMSecretManager = new ClientToAMTokenSecretManagerInRM();
rmContext.setClientToAMTokenSecretManager(clientToAMSecretManager);
amRmTokenSecretManager = createAMRMTokenSecretManager(conf);
rmContext.setAMRMTokenSecretManager(amRmTokenSecretManager);
containerAllocationExpirer = new ContainerAllocationExpirer(rmDispatcher);
addService(containerAllocationExpirer);
rmContext.setContainerAllocationExpirer(containerAllocationExpirer);
AMLivelinessMonitor amLivelinessMonitor = createAMLivelinessMonitor();
addService(amLivelinessMonitor);
rmContext.setAMLivelinessMonitor(amLivelinessMonitor);
AMLivelinessMonitor amFinishingMonitor = createAMLivelinessMonitor();
addService(amFinishingMonitor);
rmContext.setAMFinishingMonitor(amFinishingMonitor);
containerTokenSecretManager = createContainerTokenSecretManager(conf);
rmContext.setContainerTokenSecretManager(containerTokenSecretManager);
nmTokenSecretManager = createNMTokenSecretManager(conf);
rmContext.setNMTokenSecretManager(nmTokenSecretManager);
boolean isRecoveryEnabled = conf.getBoolean(
YarnConfiguration.RECOVERY_ENABLED,
@ -358,24 +364,23 @@ public class ResourceManager extends CompositeService implements Recoverable {
LOG.error("Failed to init state store", e);
ExitUtil.terminate(1, e);
}
rmContext.setStateStore(rmStore);
if (UserGroupInformation.isSecurityEnabled()) {
delegationTokenRenewer = createDelegationTokenRenewer();
rmContext.setDelegationTokenRenewer(delegationTokenRenewer);
}
rmContext = new RMContextImpl(
rmDispatcher, rmStore, containerAllocationExpirer, amLivelinessMonitor,
amFinishingMonitor, delegationTokenRenewer, amRmTokenSecretManager,
containerTokenSecretManager, nmTokenSecretManager,
clientToAMSecretManager);
// Register event handler for NodesListManager
nodesListManager = new NodesListManager(rmContext);
rmDispatcher.register(NodesListManagerEventType.class, nodesListManager);
addService(nodesListManager);
rmContext.setNodesListManager(nodesListManager);
// Initialize the scheduler
scheduler = createScheduler();
rmContext.setScheduler(scheduler);
schedulerDispatcher = createSchedulerEventDispatcher();
addIfService(schedulerDispatcher);
rmDispatcher.register(SchedulerEventType.class, schedulerDispatcher);
@ -397,6 +402,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
resourceTracker = createResourceTrackerService();
addService(resourceTracker);
rmContext.setResourceTrackerService(resourceTracker);
DefaultMetricsSystem.initialize("ResourceManager");
JvmMetrics.initSingleton("ResourceManager", null);
@ -412,6 +418,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
masterService = createApplicationMasterService();
addService(masterService) ;
rmContext.setApplicationMasterService(masterService);
applicationACLsManager = new ApplicationACLsManager(conf);
@ -422,12 +429,11 @@ public class ResourceManager extends CompositeService implements Recoverable {
rmDispatcher.register(RMAppManagerEventType.class, rmAppManager);
rmDTSecretManager = createRMDelegationTokenSecretManager(rmContext);
rmContext.setRMDelegationTokenSecretManager(rmDTSecretManager);
clientRM = createClientRMService();
rmContext.setClientRMService(clientRM);
addService(clientRM);
adminService = createAdminService(clientRM, masterService, resourceTracker);
addService(adminService);
rmContext.setClientRMService(clientRM);
applicationMasterLauncher = createAMLauncher();
rmDispatcher.register(AMLauncherEventType.class,
@ -649,11 +655,11 @@ public class ResourceManager extends CompositeService implements Recoverable {
@Private
public static class RMStateStoreOperationFailedEventDispatcher implements
EventHandler<RMStateStoreOperationFailedEvent> {
private final RMHAProtocolService haService;
private final AdminService adminService;
public RMStateStoreOperationFailedEventDispatcher(
RMHAProtocolService haService) {
this.haService = haService;
AdminService adminService) {
this.adminService = adminService;
}
@Override
@ -665,12 +671,12 @@ public class ResourceManager extends CompositeService implements Recoverable {
}
if (event.getType() == RMStateStoreOperationFailedEventType.FENCED) {
LOG.info("RMStateStore has been fenced");
synchronized(haService) {
if (haService.haEnabled) {
synchronized(adminService) {
if (adminService.haEnabled) {
try {
// Transition to standby and reinit active services
LOG.info("Transitioning RM to Standby mode");
haService.transitionToStandby(true);
adminService.transitionToStandby(true);
return;
} catch (Exception e) {
LOG.error("Failed to transition RM to Standby mode.");
@ -853,6 +859,9 @@ public class ResourceManager extends CompositeService implements Recoverable {
if (activeServices != null) {
activeServices.stop();
activeServices = null;
rmContext.getRMNodes().clear();
rmContext.getInactiveRMNodes().clear();
rmContext.getRMApps().clear();
}
}
@ -913,13 +922,8 @@ public class ResourceManager extends CompositeService implements Recoverable {
return new ApplicationMasterService(this.rmContext, scheduler);
}
protected AdminService createAdminService(
ClientRMService clientRMService,
ApplicationMasterService applicationMasterService,
ResourceTrackerService resourceTrackerService) {
return new AdminService(this.conf, scheduler, rmContext,
this.nodesListManager, clientRMService, applicationMasterService,
resourceTrackerService);
protected AdminService createAdminService() {
return new AdminService(this, rmContext);
}
@Private

View File

@ -53,9 +53,6 @@ public class RMPolicyProvider extends PolicyProvider {
new Service(
YarnConfiguration.YARN_SECURITY_SERVICE_AUTHORIZATION_CONTAINER_MANAGEMENT_PROTOCOL,
ContainerManagementProtocolPB.class),
new Service(
YarnConfiguration.YARN_SECURITY_SERVICE_AUTHORIZATION_HA_SERVICE_PROTOCOL,
HAServiceProtocol.class),
};
@Override

View File

@ -306,16 +306,6 @@ public class MockRM extends ResourceManager {
.handle(new RMAppAttemptLaunchFailedEvent(appAttemptId, "Failed"));
}
@Override
protected RMHAProtocolService createRMHAProtocolService() {
return new RMHAProtocolService(this) {
@Override
protected void startHAAdminServer() {
// do nothing
}
};
}
@Override
protected ClientRMService createClientRMService() {
return new ClientRMService(getRMContext(), getResourceScheduler(),
@ -391,19 +381,15 @@ public class MockRM extends ResourceManager {
}
@Override
protected AdminService createAdminService(ClientRMService clientRMService,
ApplicationMasterService applicationMasterService,
ResourceTrackerService resourceTrackerService) {
return new AdminService(getConfig(), scheduler, getRMContext(),
this.nodesListManager, clientRMService, applicationMasterService,
resourceTrackerService) {
protected AdminService createAdminService() {
return new AdminService(this, getRMContext()) {
@Override
protected void serviceStart() {
protected void startServer() {
// override to not start rpc handler
}
@Override
protected void serviceStop() {
protected void stopServer() {
// don't do anything
}
};

View File

@ -62,7 +62,7 @@ public class TestRMHA {
private void checkMonitorHealth() throws IOException {
try {
rm.haService.monitorHealth();
rm.adminService.monitorHealth();
} catch (HealthCheckFailedException e) {
fail("The RM is in bad health: it is Active, but the active services " +
"are not running");
@ -71,20 +71,20 @@ public class TestRMHA {
private void checkStandbyRMFunctionality() throws IOException {
assertEquals(STATE_ERR, HAServiceState.STANDBY,
rm.haService.getServiceStatus().getState());
rm.adminService.getServiceStatus().getState());
assertFalse("Active RM services are started",
rm.areActiveServicesRunning());
assertTrue("RM is not ready to become active",
rm.haService.getServiceStatus().isReadyToBecomeActive());
rm.adminService.getServiceStatus().isReadyToBecomeActive());
}
private void checkActiveRMFunctionality() throws IOException {
assertEquals(STATE_ERR, HAServiceState.ACTIVE,
rm.haService.getServiceStatus().getState());
rm.adminService.getServiceStatus().getState());
assertTrue("Active RM services aren't started",
rm.areActiveServicesRunning());
assertTrue("RM is not ready to become active",
rm.haService.getServiceStatus().isReadyToBecomeActive());
rm.adminService.getServiceStatus().isReadyToBecomeActive());
try {
rm.getNewAppId();
@ -113,9 +113,9 @@ public class TestRMHA {
HAServiceProtocol.RequestSource.REQUEST_BY_USER);
assertEquals(STATE_ERR, HAServiceState.INITIALIZING,
rm.haService.getServiceStatus().getState());
rm.adminService.getServiceStatus().getState());
assertFalse("RM is ready to become active before being started",
rm.haService.getServiceStatus().isReadyToBecomeActive());
rm.adminService.getServiceStatus().isReadyToBecomeActive());
checkMonitorHealth();
rm.start();
@ -123,27 +123,27 @@ public class TestRMHA {
checkStandbyRMFunctionality();
// 1. Transition to Standby - must be a no-op
rm.haService.transitionToStandby(requestInfo);
rm.adminService.transitionToStandby(requestInfo);
checkMonitorHealth();
checkStandbyRMFunctionality();
// 2. Transition to active
rm.haService.transitionToActive(requestInfo);
rm.adminService.transitionToActive(requestInfo);
checkMonitorHealth();
checkActiveRMFunctionality();
// 3. Transition to active - no-op
rm.haService.transitionToActive(requestInfo);
rm.adminService.transitionToActive(requestInfo);
checkMonitorHealth();
checkActiveRMFunctionality();
// 4. Transition to standby
rm.haService.transitionToStandby(requestInfo);
rm.adminService.transitionToStandby(requestInfo);
checkMonitorHealth();
checkStandbyRMFunctionality();
// 5. Transition to active to check Active->Standby->Active works
rm.haService.transitionToActive(requestInfo);
rm.adminService.transitionToActive(requestInfo);
checkMonitorHealth();
checkActiveRMFunctionality();
@ -151,9 +151,9 @@ public class TestRMHA {
// become active
rm.stop();
assertEquals(STATE_ERR, HAServiceState.STOPPING,
rm.haService.getServiceStatus().getState());
rm.adminService.getServiceStatus().getState());
assertFalse("RM is ready to become active even after it is stopped",
rm.haService.getServiceStatus().isReadyToBecomeActive());
rm.adminService.getServiceStatus().isReadyToBecomeActive());
assertFalse("Active RM services are started",
rm.areActiveServicesRunning());
checkMonitorHealth();

View File

@ -129,7 +129,8 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
for (String rpcAddress : YarnConfiguration.RM_RPC_ADDRESS_CONF_KEYS) {
conf.set(HAUtil.addSuffix(rpcAddress, rmId), "localhost:0");
}
conf.set(YarnConfiguration.RM_HA_ADMIN_ADDRESS, "localhost:" + adminPort);
conf.set(HAUtil.addSuffix(YarnConfiguration.RM_ADMIN_ADDRESS, rmId),
"localhost:" + adminPort);
return conf;
}
@ -143,23 +144,23 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
ResourceManager rm1 = new ResourceManager();
rm1.init(conf1);
rm1.start();
rm1.getHAService().transitionToActive(req);
rm1.getRMContext().getRMAdminService().transitionToActive(req);
assertEquals("RM with ZKStore didn't start",
Service.STATE.STARTED, rm1.getServiceState());
assertEquals("RM should be Active",
HAServiceProtocol.HAServiceState.ACTIVE,
rm1.getHAService().getServiceStatus().getState());
rm1.getRMContext().getRMAdminService().getServiceStatus().getState());
Configuration conf2 = createHARMConf("rm1,rm2", "rm2", 5678);
ResourceManager rm2 = new ResourceManager();
rm2.init(conf2);
rm2.start();
rm2.getHAService().transitionToActive(req);
rm2.getRMContext().getRMAdminService().transitionToActive(req);
assertEquals("RM with ZKStore didn't start",
Service.STATE.STARTED, rm2.getServiceState());
assertEquals("RM should be Active",
HAServiceProtocol.HAServiceState.ACTIVE,
rm2.getHAService().getServiceStatus().getState());
rm2.getRMContext().getRMAdminService().getServiceStatus().getState());
// Submitting an application to RM1 to trigger a state store operation.
// RM1 should realize that it got fenced and is not the Active RM anymore.
@ -181,16 +182,16 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
rmService.submitApplication(SubmitApplicationRequest.newInstance(asc));
for (int i = 0; i < 30; i++) {
if (HAServiceProtocol.HAServiceState.ACTIVE == rm1.getHAService()
.getServiceStatus().getState()) {
if (HAServiceProtocol.HAServiceState.ACTIVE ==
rm1.getRMContext().getRMAdminService().getServiceStatus().getState()) {
Thread.sleep(100);
}
}
assertEquals("RM should have been fenced",
HAServiceProtocol.HAServiceState.STANDBY,
rm1.getHAService().getServiceStatus().getState());
rm1.getRMContext().getRMAdminService().getServiceStatus().getState());
assertEquals("RM should be Active",
HAServiceProtocol.HAServiceState.ACTIVE,
rm2.getHAService().getServiceStatus().getState());
rm2.getRMContext().getRMAdminService().getServiceStatus().getState());
}
}

View File

@ -179,12 +179,13 @@ public class TestRMAppTransitions {
AMLivelinessMonitor amFinishingMonitor = mock(AMLivelinessMonitor.class);
store = mock(RMStateStore.class);
this.rmContext =
new RMContextImpl(rmDispatcher, store,
new RMContextImpl(rmDispatcher,
containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
null, new AMRMTokenSecretManager(conf),
new RMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInRM(conf),
new ClientToAMTokenSecretManagerInRM());
((RMContextImpl)rmContext).setStateStore(store);
rmDispatcher.register(RMAppAttemptEventType.class,
new TestApplicationAttemptEventDispatcher(this.rmContext));