From c58ae266e9fa336ef5b515f540c8ce8bb2f76df8 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Mon, 2 Dec 2013 22:18:44 +0000 Subject: [PATCH] YARN-1318. Promoted AdminService to an Always-On service and merged RMHAProtocolService into it. Contributed by Karthik Kambatla. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1547212 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 + .../hadoop/yarn/conf/YarnConfiguration.java | 21 +- .../exceptions/RMNotYetActiveException.java | 36 +++ ...ResourceManagerAdministrationProtocol.java | 9 +- .../hadoop/yarn/client/RMHAServiceTarget.java | 6 +- .../server/resourcemanager/AdminService.java | 282 ++++++++++++++---- .../server/resourcemanager/RMContext.java | 19 +- .../server/resourcemanager/RMContextImpl.java | 154 ++++++++-- .../resourcemanager/RMHAProtocolService.java | 264 ---------------- .../resourcemanager/ResourceManager.java | 82 ++--- .../security/authorize/RMPolicyProvider.java | 5 +- .../yarn/server/resourcemanager/MockRM.java | 22 +- .../yarn/server/resourcemanager/TestRMHA.java | 28 +- .../recovery/TestZKRMStateStore.java | 19 +- .../rmapp/TestRMAppTransitions.java | 3 +- 15 files changed, 487 insertions(+), 466 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/RMNotYetActiveException.java delete mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMHAProtocolService.java diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index a7915283ede..5cd354cfac9 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -129,6 +129,9 @@ Release 2.4.0 - UNRELEASED YARN-1241. In Fair Scheduler, maxRunningApps does not work for non-leaf queues. (Sandy Ryza) + YARN-1318. 
Promoted AdminService to an Always-On service and merged + RMHAProtocolService into it. (Karthik Kambatla via vinodkv) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 72ad08f1482..af48f26298a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -285,18 +285,6 @@ public class YarnConfiguration extends Configuration { public static final String RM_HA_IDS = RM_HA_PREFIX + "rm-ids"; public static final String RM_HA_ID = RM_HA_PREFIX + "id"; - @org.apache.hadoop.classification.InterfaceAudience.Private - // TODO Remove after YARN-1318 - public static final String RM_HA_ADMIN_ADDRESS = - RM_HA_PREFIX + "admin.address"; - public static final int DEFAULT_RM_HA_ADMIN_PORT = 8034; - public static String DEFAULT_RM_HA_ADMIN_ADDRESS = - "0.0.0.0:" + DEFAULT_RM_HA_ADMIN_PORT; - public static final String RM_HA_ADMIN_CLIENT_THREAD_COUNT = - RM_HA_PREFIX + "admin.client.thread-count"; - public static final int DEFAULT_RM_HA_ADMIN_CLIENT_THREAD_COUNT = 1; - // end @Private - public static final List RM_RPC_ADDRESS_CONF_KEYS = Collections.unmodifiableList(Arrays.asList( RM_ADDRESS, @@ -304,9 +292,7 @@ public class YarnConfiguration extends Configuration { RM_ADMIN_ADDRESS, RM_RESOURCE_TRACKER_ADDRESS, RM_WEBAPP_ADDRESS, - RM_WEBAPP_HTTPS_ADDRESS, - // TODO Remove after YARN-1318 - RM_HA_ADMIN_ADDRESS)); + RM_WEBAPP_HTTPS_ADDRESS)); //////////////////////////////// // RM state store configs @@ -786,11 +772,6 @@ public class YarnConfiguration extends Configuration { public static final String YARN_SECURITY_SERVICE_AUTHORIZATION_RESOURCE_LOCALIZER = 
"security.resourcelocalizer.protocol.acl"; - @org.apache.hadoop.classification.InterfaceAudience.Private - // TODO Remove after YARN-1318 - public static final String - YARN_SECURITY_SERVICE_AUTHORIZATION_HA_SERVICE_PROTOCOL = - CommonConfigurationKeys.SECURITY_HA_SERVICE_PROTOCOL_ACL; /** No. of milliseconds to wait between sending a SIGTERM and SIGKILL * to a running container */ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/RMNotYetActiveException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/RMNotYetActiveException.java new file mode 100644 index 00000000000..4aac61eaa93 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/RMNotYetActiveException.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.exceptions; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Exception to be thrown when an Active-Only operation is attempted on a + * ResourceManager that is not Active. 
+ */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class RMNotYetActiveException extends YarnException { + private static final long serialVersionUID = 1L; + + public RMNotYetActiveException() { + super("ResourceManager is not yet Active!"); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java index 9a1b56895cd..dd3eb927304 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/server/api/ResourceManagerAdministrationProtocol.java @@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.tools.GetUserMappingsProtocol; +import org.apache.hadoop.yarn.exceptions.RMNotYetActiveException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsResponse; @@ -45,25 +46,25 @@ public interface ResourceManagerAdministrationProtocol extends GetUserMappingsPr @Public @Stable public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request) - throws YarnException, IOException; + throws RMNotYetActiveException, YarnException, IOException; @Public @Stable public RefreshNodesResponse refreshNodes(RefreshNodesRequest request) - throws YarnException, IOException; + throws RMNotYetActiveException, YarnException, IOException; @Public @Stable public RefreshSuperUserGroupsConfigurationResponse refreshSuperUserGroupsConfiguration( 
RefreshSuperUserGroupsConfigurationRequest request) - throws YarnException, IOException; + throws RMNotYetActiveException, YarnException, IOException; @Public @Stable public RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings( RefreshUserToGroupsMappingsRequest request) - throws YarnException, IOException; + throws RMNotYetActiveException, YarnException, IOException; @Public @Stable diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMHAServiceTarget.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMHAServiceTarget.java index bb07bf81085..74cb4992383 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMHAServiceTarget.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RMHAServiceTarget.java @@ -32,9 +32,9 @@ public class RMHAServiceTarget extends HAServiceTarget { public RMHAServiceTarget(YarnConfiguration conf) throws IOException { haAdminServiceAddress = conf.getSocketAddr( - YarnConfiguration.RM_HA_ADMIN_ADDRESS, - YarnConfiguration.DEFAULT_RM_HA_ADMIN_ADDRESS, - YarnConfiguration.DEFAULT_RM_HA_ADMIN_PORT); + YarnConfiguration.RM_ADMIN_ADDRESS, + YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS, + YarnConfiguration.DEFAULT_RM_ADMIN_PORT); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java index f5ae5e82437..e78f002f880 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java @@ -21,18 +21,31 @@ import java.io.IOException; import java.net.InetSocketAddress; +import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.BlockingService; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; -import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.ha.HAServiceProtocol; +import org.apache.hadoop.ha.HAServiceStatus; +import org.apache.hadoop.ha.HealthCheckFailedException; +import org.apache.hadoop.ha.ServiceFailedException; +import org.apache.hadoop.ha.proto.HAServiceProtocolProtos; +import org.apache.hadoop.ha.protocolPB.HAServiceProtocolPB; +import org.apache.hadoop.ha.protocolPB.HAServiceProtocolServerSideTranslatorPB; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.RPC.Server; import org.apache.hadoop.security.Groups; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.security.authorize.PolicyProvider; import org.apache.hadoop.security.authorize.ProxyUsers; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.conf.HAUtil; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.RMNotYetActiveException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; @@ -51,22 +64,20 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshSuperUserGroupsConfigurationResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsRequest; 
import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshUserToGroupsMappingsResponse; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider; -public class AdminService extends AbstractService implements ResourceManagerAdministrationProtocol { +public class AdminService extends AbstractService implements + HAServiceProtocol, ResourceManagerAdministrationProtocol { private static final Log LOG = LogFactory.getLog(AdminService.class); - private final Configuration conf; - private final ResourceScheduler scheduler; private final RMContext rmContext; - private final NodesListManager nodesListManager; - - private final ClientRMService clientRMService; - private final ApplicationMasterService applicationMasterService; - private final ResourceTrackerService resourceTrackerService; - + private final ResourceManager rm; + @VisibleForTesting + protected HAServiceProtocol.HAServiceState + haState = HAServiceProtocol.HAServiceState.INITIALIZING; + boolean haEnabled; + private Server server; private InetSocketAddress masterServiceAddress; private AccessControlList adminAcl; @@ -74,23 +85,21 @@ public class AdminService extends AbstractService implements ResourceManagerAdmi private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); - public AdminService(Configuration conf, ResourceScheduler scheduler, - RMContext rmContext, NodesListManager nodesListManager, - ClientRMService clientRMService, - ApplicationMasterService applicationMasterService, - ResourceTrackerService resourceTrackerService) { + public AdminService(ResourceManager rm, RMContext rmContext) { super(AdminService.class.getName()); - this.conf = conf; - this.scheduler = scheduler; + this.rm = rm; this.rmContext = rmContext; - this.nodesListManager = nodesListManager; - this.clientRMService = clientRMService; - this.applicationMasterService = 
applicationMasterService; - this.resourceTrackerService = resourceTrackerService; } @Override - public void serviceInit(Configuration conf) throws Exception { + public synchronized void serviceInit(Configuration conf) throws Exception { + haEnabled = HAUtil.isHAEnabled(conf); + if (haEnabled) { + HAUtil.verifyAndSetConfiguration(conf); + rm.setConf(conf); + } + rm.createAndInitActiveServices(); + masterServiceAddress = conf.getSocketAddr( YarnConfiguration.RM_ADMIN_ADDRESS, YarnConfiguration.DEFAULT_RM_ADMIN_ADDRESS, @@ -102,50 +111,185 @@ public void serviceInit(Configuration conf) throws Exception { } @Override - protected void serviceStart() throws Exception { - Configuration conf = getConfig(); - YarnRPC rpc = YarnRPC.create(conf); - this.server = - rpc.getServer(ResourceManagerAdministrationProtocol.class, this, masterServiceAddress, - conf, null, - conf.getInt(YarnConfiguration.RM_ADMIN_CLIENT_THREAD_COUNT, - YarnConfiguration.DEFAULT_RM_ADMIN_CLIENT_THREAD_COUNT)); - - // Enable service authorization? 
- if (conf.getBoolean( - CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, - false)) { - refreshServiceAcls(conf, new RMPolicyProvider()); + protected synchronized void serviceStart() throws Exception { + if (haEnabled) { + transitionToStandby(true); + } else { + transitionToActive(); } - - this.server.start(); - conf.updateConnectAddr(YarnConfiguration.RM_ADMIN_ADDRESS, - server.getListenerAddress()); + startServer(); super.serviceStart(); } @Override - protected void serviceStop() throws Exception { + protected synchronized void serviceStop() throws Exception { + stopServer(); + transitionToStandby(false); + haState = HAServiceState.STOPPING; + super.serviceStop(); + } + + protected void startServer() throws Exception { + Configuration conf = getConfig(); + YarnRPC rpc = YarnRPC.create(conf); + this.server = (Server) rpc.getServer( + ResourceManagerAdministrationProtocol.class, this, masterServiceAddress, + conf, null, + conf.getInt(YarnConfiguration.RM_ADMIN_CLIENT_THREAD_COUNT, + YarnConfiguration.DEFAULT_RM_ADMIN_CLIENT_THREAD_COUNT)); + + // Enable service authorization? 
+ if (conf.getBoolean( + CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, + false)) { + refreshServiceAcls(conf, new RMPolicyProvider()); + } + + if (haEnabled) { + RPC.setProtocolEngine(conf, HAServiceProtocolPB.class, + ProtobufRpcEngine.class); + + HAServiceProtocolServerSideTranslatorPB haServiceProtocolXlator = + new HAServiceProtocolServerSideTranslatorPB(this); + BlockingService haPbService = + HAServiceProtocolProtos.HAServiceProtocolService + .newReflectiveBlockingService(haServiceProtocolXlator); + server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, + HAServiceProtocol.class, haPbService); + } + + this.server.start(); + conf.updateConnectAddr(YarnConfiguration.RM_ADMIN_ADDRESS, + server.getListenerAddress()); + } + + protected void stopServer() throws Exception { if (this.server != null) { this.server.stop(); } - super.serviceStop(); + } + + private UserGroupInformation checkAccess(String method) throws IOException { + return RMServerUtils.verifyAccess(adminAcl, method, LOG); } private UserGroupInformation checkAcls(String method) throws YarnException { try { - return RMServerUtils.verifyAccess(adminAcl, method, LOG); + return checkAccess(method); } catch (IOException ioe) { throw RPCUtil.getRemoteException(ioe); } } - + + private synchronized boolean isRMActive() { + return HAServiceState.ACTIVE == haState; + } + + @Override + public synchronized void monitorHealth() + throws IOException { + checkAccess("monitorHealth"); + if (haState == HAServiceProtocol.HAServiceState.ACTIVE && !rm.areActiveServicesRunning()) { + throw new HealthCheckFailedException( + "Active ResourceManager services are not running!"); + } + } + + synchronized void transitionToActive() throws Exception { + if (haState == HAServiceProtocol.HAServiceState.ACTIVE) { + LOG.info("Already in active state"); + return; + } + + LOG.info("Transitioning to active"); + rm.startActiveServices(); + haState = HAServiceProtocol.HAServiceState.ACTIVE; + LOG.info("Transitioned to active"); 
+ } + + @Override + public synchronized void transitionToActive(HAServiceProtocol.StateChangeRequestInfo reqInfo) + throws IOException { + UserGroupInformation user = checkAccess("transitionToActive"); + // TODO (YARN-1177): When automatic failover is enabled, + // check if transition should be allowed for this request + try { + transitionToActive(); + RMAuditLogger.logSuccess(user.getShortUserName(), + "transitionToActive", "RMHAProtocolService"); + } catch (Exception e) { + RMAuditLogger.logFailure(user.getShortUserName(), "transitionToActive", + adminAcl.toString(), "RMHAProtocolService", + "Exception transitioning to active"); + throw new ServiceFailedException( + "Error when transitioning to Active mode", e); + } + } + + synchronized void transitionToStandby(boolean initialize) + throws Exception { + if (haState == HAServiceProtocol.HAServiceState.STANDBY) { + LOG.info("Already in standby state"); + return; + } + + LOG.info("Transitioning to standby"); + if (haState == HAServiceProtocol.HAServiceState.ACTIVE) { + rm.stopActiveServices(); + if (initialize) { + rm.createAndInitActiveServices(); + } + } + haState = HAServiceProtocol.HAServiceState.STANDBY; + LOG.info("Transitioned to standby"); + } + + @Override + public synchronized void transitionToStandby(HAServiceProtocol.StateChangeRequestInfo reqInfo) + throws IOException { + UserGroupInformation user = checkAccess("transitionToStandby"); + // TODO (YARN-1177): When automatic failover is enabled, + // check if transition should be allowed for this request + try { + transitionToStandby(true); + RMAuditLogger.logSuccess(user.getShortUserName(), + "transitionToStandby", "RMHAProtocolService"); + } catch (Exception e) { + RMAuditLogger.logFailure(user.getShortUserName(), "transitionToStandby", + adminAcl.toString(), "RMHAProtocolService", + "Exception transitioning to standby"); + throw new ServiceFailedException( + "Error when transitioning to Standby mode", e); + } + } + + @Override + public synchronized 
HAServiceStatus getServiceStatus() throws IOException { + checkAccess("getServiceState"); + HAServiceStatus ret = new HAServiceStatus(haState); + if (haState == HAServiceProtocol.HAServiceState.ACTIVE || haState == + HAServiceProtocol.HAServiceState.STANDBY) { + ret.setReadyToBecomeActive(); + } else { + ret.setNotReadyToBecomeActive("State is " + haState); + } + return ret; + } + @Override public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request) throws YarnException { UserGroupInformation user = checkAcls("refreshQueues"); + + if (!isRMActive()) { + RMAuditLogger.logFailure(user.getShortUserName(), "refreshQueues", + adminAcl.toString(), "AdminService", + "ResourceManager is not active. Can not refresh queues."); + throw new RMNotYetActiveException(); + } + try { - scheduler.reinitialize(conf, this.rmContext); + rmContext.getScheduler().reinitialize(getConfig(), this.rmContext); RMAuditLogger.logSuccess(user.getShortUserName(), "refreshQueues", "AdminService"); return recordFactory.newRecordInstance(RefreshQueuesResponse.class); @@ -162,8 +306,16 @@ public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request) public RefreshNodesResponse refreshNodes(RefreshNodesRequest request) throws YarnException { UserGroupInformation user = checkAcls("refreshNodes"); + + if (!isRMActive()) { + RMAuditLogger.logFailure(user.getShortUserName(), "refreshNodes", + adminAcl.toString(), "AdminService", + "ResourceManager is not active. 
Can not refresh nodes."); + throw new RMNotYetActiveException(); + } + try { - this.nodesListManager.refreshNodes(new YarnConfiguration()); + rmContext.getNodesListManager().refreshNodes(new YarnConfiguration()); RMAuditLogger.logSuccess(user.getShortUserName(), "refreshNodes", "AdminService"); return recordFactory.newRecordInstance(RefreshNodesResponse.class); @@ -180,7 +332,16 @@ public RefreshSuperUserGroupsConfigurationResponse refreshSuperUserGroupsConfigu RefreshSuperUserGroupsConfigurationRequest request) throws YarnException { UserGroupInformation user = checkAcls("refreshSuperUserGroupsConfiguration"); - + + // TODO (YARN-1459): Revisit handling super-user-groups on Standby RM + if (!isRMActive()) { + RMAuditLogger.logFailure(user.getShortUserName(), + "refreshSuperUserGroupsConfiguration", + adminAcl.toString(), "AdminService", + "ResourceManager is not active. Can not refresh super-user-groups."); + throw new RMNotYetActiveException(); + } + ProxyUsers.refreshSuperUserGroupsConfiguration(new Configuration()); RMAuditLogger.logSuccess(user.getShortUserName(), "refreshSuperUserGroupsConfiguration", "AdminService"); @@ -193,7 +354,16 @@ public RefreshSuperUserGroupsConfigurationResponse refreshSuperUserGroupsConfigu public RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings( RefreshUserToGroupsMappingsRequest request) throws YarnException { UserGroupInformation user = checkAcls("refreshUserToGroupsMappings"); - + + // TODO (YARN-1459): Revisit handling user-groups on Standby RM + if (!isRMActive()) { + RMAuditLogger.logFailure(user.getShortUserName(), + "refreshUserToGroupsMapping", + adminAcl.toString(), "AdminService", + "ResourceManager is not active. 
Can not refresh user-groups."); + throw new RMNotYetActiveException(); + } + Groups.getUserToGroupsMappingService().refresh(); RMAuditLogger.logSuccess(user.getShortUserName(), "refreshUserToGroupsMappings", "AdminService"); @@ -233,9 +403,16 @@ public RefreshServiceAclsResponse refreshServiceAcls( PolicyProvider policyProvider = new RMPolicyProvider(); refreshServiceAcls(conf, policyProvider); - clientRMService.refreshServiceAcls(conf, policyProvider); - applicationMasterService.refreshServiceAcls(conf, policyProvider); - resourceTrackerService.refreshServiceAcls(conf, policyProvider); + if (isRMActive()) { + rmContext.getClientRMService().refreshServiceAcls(conf, policyProvider); + rmContext.getApplicationMasterService().refreshServiceAcls( + conf, policyProvider); + rmContext.getResourceTrackerService().refreshServiceAcls( + conf, policyProvider); + } else { + LOG.warn("ResourceManager is not active. Not refreshing ACLs for " + + "Clients, ApplicationMasters and NodeManagers"); + } return recordFactory.newRecordInstance(RefreshServiceAclsResponse.class); } @@ -249,5 +426,4 @@ void refreshServiceAcls(Configuration configuration, public String[] getGroupsForUser(String user) throws IOException { return UserGroupInformation.createRemoteUser(user).getGroupNames(); } - } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java index 28101cc27ed..a93fcb13d76 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java 
@@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; @@ -64,12 +65,22 @@ public interface RMContext { NMTokenSecretManagerInRM getNMTokenSecretManager(); + ResourceScheduler getScheduler(); + + NodesListManager getNodesListManager(); + ClientToAMTokenSecretManagerInRM getClientToAMTokenSecretManager(); - - void setClientRMService(ClientRMService clientRMService); - + + AdminService getRMAdminService(); + ClientRMService getClientRMService(); - + + ApplicationMasterService getApplicationMasterService(); + + ResourceTrackerService getResourceTrackerService(); + + void setClientRMService(ClientRMService clientRMService); + RMDelegationTokenSecretManager getRMDelegationTokenSecretManager(); void setRMDelegationTokenSecretManager( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java index d2592ed445c..f98b49efc8d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java @@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer; @@ -42,7 +43,7 @@ public class RMContextImpl implements RMContext { - private final Dispatcher rmDispatcher; + private Dispatcher rmDispatcher; private final ConcurrentMap applications = new ConcurrentHashMap(); @@ -57,34 +58,25 @@ public class RMContextImpl implements RMContext { private AMLivelinessMonitor amFinishingMonitor; private RMStateStore stateStore = null; private ContainerAllocationExpirer containerAllocationExpirer; - private final DelegationTokenRenewer delegationTokenRenewer; - private final AMRMTokenSecretManager amRMTokenSecretManager; - private final RMContainerTokenSecretManager containerTokenSecretManager; - private final NMTokenSecretManagerInRM nmTokenSecretManager; - private final ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager; + private DelegationTokenRenewer delegationTokenRenewer; + private AMRMTokenSecretManager amRMTokenSecretManager; + private RMContainerTokenSecretManager containerTokenSecretManager; + private NMTokenSecretManagerInRM nmTokenSecretManager; + private ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager; + private AdminService adminService; private ClientRMService clientRMService; private RMDelegationTokenSecretManager 
rmDelegationTokenSecretManager; + private ResourceScheduler scheduler; + private NodesListManager nodesListManager; + private ResourceTrackerService resourceTrackerService; + private ApplicationMasterService applicationMasterService; + + /** + * Default constructor. To be used in conjunction with setter methods for + * individual fields. + */ + public RMContextImpl() { - public RMContextImpl(Dispatcher rmDispatcher, - RMStateStore store, - ContainerAllocationExpirer containerAllocationExpirer, - AMLivelinessMonitor amLivelinessMonitor, - AMLivelinessMonitor amFinishingMonitor, - DelegationTokenRenewer delegationTokenRenewer, - AMRMTokenSecretManager amRMTokenSecretManager, - RMContainerTokenSecretManager containerTokenSecretManager, - NMTokenSecretManagerInRM nmTokenSecretManager, - ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager) { - this.rmDispatcher = rmDispatcher; - this.stateStore = store; - this.containerAllocationExpirer = containerAllocationExpirer; - this.amLivelinessMonitor = amLivelinessMonitor; - this.amFinishingMonitor = amFinishingMonitor; - this.delegationTokenRenewer = delegationTokenRenewer; - this.amRMTokenSecretManager = amRMTokenSecretManager; - this.containerTokenSecretManager = containerTokenSecretManager; - this.nmTokenSecretManager = nmTokenSecretManager; - this.clientToAMTokenSecretManager = clientToAMTokenSecretManager; } @VisibleForTesting @@ -98,10 +90,17 @@ public RMContextImpl(Dispatcher rmDispatcher, RMContainerTokenSecretManager containerTokenSecretManager, NMTokenSecretManagerInRM nmTokenSecretManager, ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager) { - this(rmDispatcher, null, containerAllocationExpirer, amLivelinessMonitor, - amFinishingMonitor, delegationTokenRenewer, appTokenSecretManager, - containerTokenSecretManager, nmTokenSecretManager, - clientToAMTokenSecretManager); + this(); + this.setDispatcher(rmDispatcher); + this.setContainerAllocationExpirer(containerAllocationExpirer); + 
this.setAMLivelinessMonitor(amLivelinessMonitor); + this.setAMFinishingMonitor(amFinishingMonitor); + this.setDelegationTokenRenewer(delegationTokenRenewer); + this.setAMRMTokenSecretManager(appTokenSecretManager); + this.setContainerTokenSecretManager(containerTokenSecretManager); + this.setNMTokenSecretManager(nmTokenSecretManager); + this.setClientToAMTokenSecretManager(clientToAMTokenSecretManager); + RMStateStore nullStore = new NullRMStateStore(); nullStore.setRMDispatcher(rmDispatcher); try { @@ -171,12 +170,27 @@ public RMContainerTokenSecretManager getContainerTokenSecretManager() { public NMTokenSecretManagerInRM getNMTokenSecretManager() { return this.nmTokenSecretManager; } - + + @Override + public ResourceScheduler getScheduler() { + return this.scheduler; + } + + @Override + public NodesListManager getNodesListManager() { + return this.nodesListManager; + } + @Override public ClientToAMTokenSecretManagerInRM getClientToAMTokenSecretManager() { return this.clientToAMTokenSecretManager; } - + + @Override + public AdminService getRMAdminService() { + return this.adminService; + } + @VisibleForTesting public void setStateStore(RMStateStore store) { stateStore = store; @@ -186,7 +200,25 @@ public void setStateStore(RMStateStore store) { public ClientRMService getClientRMService() { return this.clientRMService; } - + + @Override + public ApplicationMasterService getApplicationMasterService() { + return applicationMasterService; + } + + @Override + public ResourceTrackerService getResourceTrackerService() { + return resourceTrackerService; + } + + void setDispatcher(Dispatcher dispatcher) { + this.rmDispatcher = dispatcher; + } + + void setRMAdminService(AdminService adminService) { + this.adminService = adminService; + } + @Override public void setClientRMService(ClientRMService clientRMService) { this.clientRMService = clientRMService; @@ -202,4 +234,60 @@ public void setRMDelegationTokenSecretManager( RMDelegationTokenSecretManager 
delegationTokenSecretManager) { this.rmDelegationTokenSecretManager = delegationTokenSecretManager; } + + void setContainerAllocationExpirer( + ContainerAllocationExpirer containerAllocationExpirer) { + this.containerAllocationExpirer = containerAllocationExpirer; + } + + void setAMLivelinessMonitor(AMLivelinessMonitor amLivelinessMonitor) { + this.amLivelinessMonitor = amLivelinessMonitor; + } + + void setAMFinishingMonitor(AMLivelinessMonitor amFinishingMonitor) { + this.amFinishingMonitor = amFinishingMonitor; + } + + void setContainerTokenSecretManager( + RMContainerTokenSecretManager containerTokenSecretManager) { + this.containerTokenSecretManager = containerTokenSecretManager; + } + + void setNMTokenSecretManager( + NMTokenSecretManagerInRM nmTokenSecretManager) { + this.nmTokenSecretManager = nmTokenSecretManager; + } + + void setScheduler(ResourceScheduler scheduler) { + this.scheduler = scheduler; + } + + void setDelegationTokenRenewer( + DelegationTokenRenewer delegationTokenRenewer) { + this.delegationTokenRenewer = delegationTokenRenewer; + } + + void setClientToAMTokenSecretManager( + ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager) { + this.clientToAMTokenSecretManager = clientToAMTokenSecretManager; + } + + void setAMRMTokenSecretManager( + AMRMTokenSecretManager amRMTokenSecretManager) { + this.amRMTokenSecretManager = amRMTokenSecretManager; + } + + void setNodesListManager(NodesListManager nodesListManager) { + this.nodesListManager = nodesListManager; + } + + void setApplicationMasterService( + ApplicationMasterService applicationMasterService) { + this.applicationMasterService = applicationMasterService; + } + + void setResourceTrackerService( + ResourceTrackerService resourceTrackerService) { + this.resourceTrackerService = resourceTrackerService; + } } \ No newline at end of file diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMHAProtocolService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMHAProtocolService.java deleted file mode 100644 index f801203b70d..00000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMHAProtocolService.java +++ /dev/null @@ -1,264 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.yarn.server.resourcemanager; - -import com.google.common.annotations.VisibleForTesting; - -import com.google.protobuf.BlockingService; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CommonConfigurationKeysPublic; -import org.apache.hadoop.ha.HAServiceProtocol; -import org.apache.hadoop.ha.HAServiceStatus; -import org.apache.hadoop.ha.HealthCheckFailedException; -import org.apache.hadoop.ha.ServiceFailedException; -import org.apache.hadoop.ha.proto.HAServiceProtocolProtos; -import org.apache.hadoop.ha.protocolPB.HAServiceProtocolPB; -import org.apache.hadoop.ha.protocolPB.HAServiceProtocolServerSideTranslatorPB; -import org.apache.hadoop.ipc.ProtobufRpcEngine; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.ipc.Server; -import org.apache.hadoop.ipc.WritableRpcEngine; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.authorize.AccessControlList; -import org.apache.hadoop.service.AbstractService; -import org.apache.hadoop.yarn.conf.HAUtil; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider; - -import java.io.IOException; -import java.net.InetSocketAddress; - -/** - * Internal class to handle HA related aspects of the {@link ResourceManager}. - * - * TODO (YARN-1318): Some/ all of this functionality should be merged with - * {@link AdminService}. Currently, marking this as Private and Unstable for - * those reasons. 
- */ -@InterfaceAudience.Private -@InterfaceStability.Unstable -public class RMHAProtocolService extends AbstractService implements - HAServiceProtocol { - private static final Log LOG = LogFactory.getLog(RMHAProtocolService.class); - - private Configuration conf; - private ResourceManager rm; - @VisibleForTesting - protected HAServiceState haState = HAServiceState.INITIALIZING; - private AccessControlList adminAcl; - private Server haAdminServer; - - @InterfaceAudience.Private - boolean haEnabled; - - public RMHAProtocolService(ResourceManager resourceManager) { - super("RMHAProtocolService"); - this.rm = resourceManager; - } - - @Override - protected synchronized void serviceInit(Configuration conf) throws - Exception { - this.conf = conf; - haEnabled = HAUtil.isHAEnabled(this.conf); - if (haEnabled) { - HAUtil.verifyAndSetConfiguration(conf); - rm.setConf(this.conf); - adminAcl = new AccessControlList(conf.get( - YarnConfiguration.YARN_ADMIN_ACL, - YarnConfiguration.DEFAULT_YARN_ADMIN_ACL)); - } - rm.createAndInitActiveServices(); - super.serviceInit(this.conf); - } - - @Override - protected synchronized void serviceStart() throws Exception { - if (haEnabled) { - transitionToStandby(true); - startHAAdminServer(); - } else { - transitionToActive(); - } - - super.serviceStart(); - } - - @Override - protected synchronized void serviceStop() throws Exception { - if (haEnabled) { - stopHAAdminServer(); - } - transitionToStandby(false); - haState = HAServiceState.STOPPING; - super.serviceStop(); - } - - - protected void startHAAdminServer() throws Exception { - InetSocketAddress haAdminServiceAddress = conf.getSocketAddr( - YarnConfiguration.RM_HA_ADMIN_ADDRESS, - YarnConfiguration.DEFAULT_RM_HA_ADMIN_ADDRESS, - YarnConfiguration.DEFAULT_RM_HA_ADMIN_PORT); - - RPC.setProtocolEngine(conf, HAServiceProtocolPB.class, - ProtobufRpcEngine.class); - - HAServiceProtocolServerSideTranslatorPB haServiceProtocolXlator = - new HAServiceProtocolServerSideTranslatorPB(this); - 
BlockingService haPbService = - HAServiceProtocolProtos.HAServiceProtocolService - .newReflectiveBlockingService(haServiceProtocolXlator); - - WritableRpcEngine.ensureInitialized(); - - String bindHost = haAdminServiceAddress.getHostName(); - - int serviceHandlerCount = conf.getInt( - YarnConfiguration.RM_HA_ADMIN_CLIENT_THREAD_COUNT, - YarnConfiguration.DEFAULT_RM_HA_ADMIN_CLIENT_THREAD_COUNT); - - haAdminServer = new RPC.Builder(conf) - .setProtocol(HAServiceProtocolPB.class) - .setInstance(haPbService) - .setBindAddress(bindHost) - .setPort(haAdminServiceAddress.getPort()) - .setNumHandlers(serviceHandlerCount) - .setVerbose(false) - .build(); - - // Enable service authorization? - if (conf.getBoolean( - CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false)) { - haAdminServer.refreshServiceAcl(conf, new RMPolicyProvider()); - } - - haAdminServer.start(); - conf.updateConnectAddr(YarnConfiguration.RM_HA_ADMIN_ADDRESS, - haAdminServer.getListenerAddress()); - } - - private void stopHAAdminServer() throws Exception { - if (haAdminServer != null) { - haAdminServer.stop(); - haAdminServer.join(); - haAdminServer = null; - } - } - - @Override - public synchronized void monitorHealth() - throws IOException { - checkAccess("monitorHealth"); - if (haState == HAServiceState.ACTIVE && !rm.areActiveServicesRunning()) { - throw new HealthCheckFailedException( - "Active ResourceManager services are not running!"); - } - } - - @InterfaceAudience.Private - synchronized void transitionToActive() throws Exception { - if (haState == HAServiceState.ACTIVE) { - LOG.info("Already in active state"); - return; - } - - LOG.info("Transitioning to active"); - rm.startActiveServices(); - haState = HAServiceState.ACTIVE; - LOG.info("Transitioned to active"); - } - - @Override - public synchronized void transitionToActive(StateChangeRequestInfo reqInfo) - throws IOException { - UserGroupInformation user = checkAccess("transitionToActive"); - // TODO (YARN-1177): When automatic 
failover is enabled, - // check if transition should be allowed for this request - try { - transitionToActive(); - RMAuditLogger.logSuccess(user.getShortUserName(), - "transitionToActive", "RMHAProtocolService"); - } catch (Exception e) { - RMAuditLogger.logFailure(user.getShortUserName(), "transitionToActive", - adminAcl.toString(), "RMHAProtocolService", - "Exception transitioning to active"); - throw new ServiceFailedException( - "Error when transitioning to Active mode", e); - } - } - - @InterfaceAudience.Private - synchronized void transitionToStandby(boolean initialize) - throws Exception { - if (haState == HAServiceState.STANDBY) { - LOG.info("Already in standby state"); - return; - } - - LOG.info("Transitioning to standby"); - if (haState == HAServiceState.ACTIVE) { - rm.stopActiveServices(); - if (initialize) { - rm.createAndInitActiveServices(); - } - } - haState = HAServiceState.STANDBY; - LOG.info("Transitioned to standby"); - } - - @Override - public synchronized void transitionToStandby(StateChangeRequestInfo reqInfo) - throws IOException { - UserGroupInformation user = checkAccess("transitionToStandby"); - // TODO (YARN-1177): When automatic failover is enabled, - // check if transition should be allowed for this request - try { - transitionToStandby(true); - RMAuditLogger.logSuccess(user.getShortUserName(), - "transitionToStandby", "RMHAProtocolService"); - } catch (Exception e) { - RMAuditLogger.logFailure(user.getShortUserName(), "transitionToStandby", - adminAcl.toString(), "RMHAProtocolService", - "Exception transitioning to standby"); - throw new ServiceFailedException( - "Error when transitioning to Standby mode", e); - } - } - - @Override - public synchronized HAServiceStatus getServiceStatus() throws IOException { - checkAccess("getServiceState"); - HAServiceStatus ret = new HAServiceStatus(haState); - if (haState == HAServiceState.ACTIVE || haState == HAServiceState.STANDBY) { - ret.setReadyToBecomeActive(); - } else { - 
ret.setNotReadyToBecomeActive("State is " + haState); - } - return ret; - } - - private UserGroupInformation checkAccess(String method) throws IOException { - return RMServerUtils.verifyAccess(adminAcl, method, LOG); - } -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 3c187f90e0d..2fced0f3f19 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -118,7 +118,9 @@ public class ResourceManager extends CompositeService implements Recoverable { * the HA state of the RM. */ @VisibleForTesting - protected RMHAProtocolService haService; + protected RMContextImpl rmContext; + @VisibleForTesting + protected AdminService adminService; /** * "Active" services. Services that need to run only on the Active RM. @@ -129,8 +131,7 @@ public class ResourceManager extends CompositeService implements Recoverable { * in Active state. 
*/ protected RMActiveServices activeServices; - protected ClientToAMTokenSecretManagerInRM clientToAMSecretManager = - new ClientToAMTokenSecretManagerInRM(); + protected ClientToAMTokenSecretManagerInRM clientToAMSecretManager; protected RMContainerTokenSecretManager containerTokenSecretManager; protected NMTokenSecretManagerInRM nmTokenSecretManager; @@ -143,7 +144,6 @@ public class ResourceManager extends CompositeService implements Recoverable { private ClientRMService clientRM; protected ApplicationMasterService masterService; private ApplicationMasterLauncher applicationMasterLauncher; - private AdminService adminService; private ContainerAllocationExpirer containerAllocationExpirer; protected NMLivelinessMonitor nmLivelinessMonitor; protected NodesListManager nodesListManager; @@ -154,7 +154,6 @@ public class ResourceManager extends CompositeService implements Recoverable { protected RMDelegationTokenSecretManager rmDTSecretManager; private DelegationTokenRenewer delegationTokenRenewer; private WebApp webApp; - protected RMContext rmContext; protected ResourceTrackerService resourceTracker; private boolean recoveryEnabled; @@ -166,10 +165,6 @@ public ResourceManager() { super("ResourceManager"); } - public RMHAProtocolService getHAService() { - return this.haService; - } - public RMContext getRMContext() { return this.rmContext; } @@ -187,9 +182,12 @@ protected static void setClusterTimeStamp(long timestamp) { protected void serviceInit(Configuration conf) throws Exception { validateConfigs(conf); this.conf = conf; + this.rmContext = new RMContextImpl(); + + adminService = createAdminService(); + addService(adminService); + rmContext.setRMAdminService(adminService); - haService = createRMHAProtocolService(); - addService(haService); super.serviceInit(conf); } @@ -201,11 +199,7 @@ protected QueueACLsManager createQueueACLsManager(ResourceScheduler scheduler, @VisibleForTesting protected void setRMStateStore(RMStateStore rmStore) { 
rmStore.setRMDispatcher(rmDispatcher); - ((RMContextImpl) rmContext).setStateStore(rmStore); - } - - protected RMHAProtocolService createRMHAProtocolService() { - return new RMHAProtocolService(this); + rmContext.setStateStore(rmStore); } protected RMContainerTokenSecretManager createContainerTokenSecretManager( @@ -224,7 +218,8 @@ protected EventHandler createSchedulerEventDispatcher() { protected RMStateStoreOperationFailedEventDispatcher createRMStateStoreOperationFailedEventDispatcher() { - return new RMStateStoreOperationFailedEventDispatcher(haService); + return new RMStateStoreOperationFailedEventDispatcher( + rmContext.getRMAdminService()); } protected Dispatcher createDispatcher() { @@ -319,20 +314,31 @@ protected void serviceInit(Configuration configuration) throws Exception { rmDispatcher = createDispatcher(); addIfService(rmDispatcher); + rmContext.setDispatcher(rmDispatcher); + + clientToAMSecretManager = new ClientToAMTokenSecretManagerInRM(); + rmContext.setClientToAMTokenSecretManager(clientToAMSecretManager); amRmTokenSecretManager = createAMRMTokenSecretManager(conf); + rmContext.setAMRMTokenSecretManager(amRmTokenSecretManager); containerAllocationExpirer = new ContainerAllocationExpirer(rmDispatcher); addService(containerAllocationExpirer); + rmContext.setContainerAllocationExpirer(containerAllocationExpirer); AMLivelinessMonitor amLivelinessMonitor = createAMLivelinessMonitor(); addService(amLivelinessMonitor); + rmContext.setAMLivelinessMonitor(amLivelinessMonitor); AMLivelinessMonitor amFinishingMonitor = createAMLivelinessMonitor(); addService(amFinishingMonitor); + rmContext.setAMFinishingMonitor(amFinishingMonitor); containerTokenSecretManager = createContainerTokenSecretManager(conf); + rmContext.setContainerTokenSecretManager(containerTokenSecretManager); + nmTokenSecretManager = createNMTokenSecretManager(conf); + rmContext.setNMTokenSecretManager(nmTokenSecretManager); boolean isRecoveryEnabled = conf.getBoolean( 
YarnConfiguration.RECOVERY_ENABLED, @@ -358,24 +364,23 @@ protected void serviceInit(Configuration configuration) throws Exception { LOG.error("Failed to init state store", e); ExitUtil.terminate(1, e); } + rmContext.setStateStore(rmStore); if (UserGroupInformation.isSecurityEnabled()) { delegationTokenRenewer = createDelegationTokenRenewer(); + rmContext.setDelegationTokenRenewer(delegationTokenRenewer); } - rmContext = new RMContextImpl( - rmDispatcher, rmStore, containerAllocationExpirer, amLivelinessMonitor, - amFinishingMonitor, delegationTokenRenewer, amRmTokenSecretManager, - containerTokenSecretManager, nmTokenSecretManager, - clientToAMSecretManager); - // Register event handler for NodesListManager nodesListManager = new NodesListManager(rmContext); rmDispatcher.register(NodesListManagerEventType.class, nodesListManager); addService(nodesListManager); + rmContext.setNodesListManager(nodesListManager); // Initialize the scheduler scheduler = createScheduler(); + rmContext.setScheduler(scheduler); + schedulerDispatcher = createSchedulerEventDispatcher(); addIfService(schedulerDispatcher); rmDispatcher.register(SchedulerEventType.class, schedulerDispatcher); @@ -397,6 +402,7 @@ protected void serviceInit(Configuration configuration) throws Exception { resourceTracker = createResourceTrackerService(); addService(resourceTracker); + rmContext.setResourceTrackerService(resourceTracker); DefaultMetricsSystem.initialize("ResourceManager"); JvmMetrics.initSingleton("ResourceManager", null); @@ -412,6 +418,7 @@ protected void serviceInit(Configuration configuration) throws Exception { masterService = createApplicationMasterService(); addService(masterService) ; + rmContext.setApplicationMasterService(masterService); applicationACLsManager = new ApplicationACLsManager(conf); @@ -422,12 +429,11 @@ protected void serviceInit(Configuration configuration) throws Exception { rmDispatcher.register(RMAppManagerEventType.class, rmAppManager); rmDTSecretManager = 
createRMDelegationTokenSecretManager(rmContext); rmContext.setRMDelegationTokenSecretManager(rmDTSecretManager); + clientRM = createClientRMService(); rmContext.setClientRMService(clientRM); addService(clientRM); - - adminService = createAdminService(clientRM, masterService, resourceTracker); - addService(adminService); + rmContext.setClientRMService(clientRM); applicationMasterLauncher = createAMLauncher(); rmDispatcher.register(AMLauncherEventType.class, @@ -649,11 +655,11 @@ public void handle(SchedulerEvent event) { @Private public static class RMStateStoreOperationFailedEventDispatcher implements EventHandler { - private final RMHAProtocolService haService; + private final AdminService adminService; public RMStateStoreOperationFailedEventDispatcher( - RMHAProtocolService haService) { - this.haService = haService; + AdminService adminService) { + this.adminService = adminService; } @Override @@ -665,12 +671,12 @@ public void handle(RMStateStoreOperationFailedEvent event) { } if (event.getType() == RMStateStoreOperationFailedEventType.FENCED) { LOG.info("RMStateStore has been fenced"); - synchronized(haService) { - if (haService.haEnabled) { + synchronized(adminService) { + if (adminService.haEnabled) { try { // Transition to standby and reinit active services LOG.info("Transitioning RM to Standby mode"); - haService.transitionToStandby(true); + adminService.transitionToStandby(true); return; } catch (Exception e) { LOG.error("Failed to transition RM to Standby mode."); @@ -853,6 +859,9 @@ void stopActiveServices() throws Exception { if (activeServices != null) { activeServices.stop(); activeServices = null; + rmContext.getRMNodes().clear(); + rmContext.getInactiveRMNodes().clear(); + rmContext.getRMApps().clear(); } } @@ -913,13 +922,8 @@ protected ApplicationMasterService createApplicationMasterService() { return new ApplicationMasterService(this.rmContext, scheduler); } - protected AdminService createAdminService( - ClientRMService clientRMService, - 
ApplicationMasterService applicationMasterService, - ResourceTrackerService resourceTrackerService) { - return new AdminService(this.conf, scheduler, rmContext, - this.nodesListManager, clientRMService, applicationMasterService, - resourceTrackerService); + protected AdminService createAdminService() { + return new AdminService(this, rmContext); } @Private diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/authorize/RMPolicyProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/authorize/RMPolicyProvider.java index b5b590bffec..73c6295d300 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/authorize/RMPolicyProvider.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/authorize/RMPolicyProvider.java @@ -35,7 +35,7 @@ @InterfaceAudience.Private @InterfaceStability.Unstable public class RMPolicyProvider extends PolicyProvider { - + private static final Service[] resourceManagerServices = new Service[] { new Service( @@ -53,9 +53,6 @@ public class RMPolicyProvider extends PolicyProvider { new Service( YarnConfiguration.YARN_SECURITY_SERVICE_AUTHORIZATION_CONTAINER_MANAGEMENT_PROTOCOL, ContainerManagementProtocolPB.class), - new Service( - YarnConfiguration.YARN_SECURITY_SERVICE_AUTHORIZATION_HA_SERVICE_PROTOCOL, - HAServiceProtocol.class), }; @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 78ca7d1633c..9944c9ca3df 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -306,16 +306,6 @@ public void sendAMLaunchFailed(ApplicationAttemptId appAttemptId) .handle(new RMAppAttemptLaunchFailedEvent(appAttemptId, "Failed")); } - @Override - protected RMHAProtocolService createRMHAProtocolService() { - return new RMHAProtocolService(this) { - @Override - protected void startHAAdminServer() { - // do nothing - } - }; - } - @Override protected ClientRMService createClientRMService() { return new ClientRMService(getRMContext(), getResourceScheduler(), @@ -391,19 +381,15 @@ protected void serviceStop() { } @Override - protected AdminService createAdminService(ClientRMService clientRMService, - ApplicationMasterService applicationMasterService, - ResourceTrackerService resourceTrackerService) { - return new AdminService(getConfig(), scheduler, getRMContext(), - this.nodesListManager, clientRMService, applicationMasterService, - resourceTrackerService) { + protected AdminService createAdminService() { + return new AdminService(this, getRMContext()) { @Override - protected void serviceStart() { + protected void startServer() { // override to not start rpc handler } @Override - protected void serviceStop() { + protected void stopServer() { // don't do anything } }; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java index 859c52d6f2a..d1fbf1c07d4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java @@ -62,7 +62,7 @@ public void setUp() throws Exception { private void checkMonitorHealth() throws IOException { try { - rm.haService.monitorHealth(); + rm.adminService.monitorHealth(); } catch (HealthCheckFailedException e) { fail("The RM is in bad health: it is Active, but the active services " + "are not running"); @@ -71,20 +71,20 @@ private void checkMonitorHealth() throws IOException { private void checkStandbyRMFunctionality() throws IOException { assertEquals(STATE_ERR, HAServiceState.STANDBY, - rm.haService.getServiceStatus().getState()); + rm.adminService.getServiceStatus().getState()); assertFalse("Active RM services are started", rm.areActiveServicesRunning()); assertTrue("RM is not ready to become active", - rm.haService.getServiceStatus().isReadyToBecomeActive()); + rm.adminService.getServiceStatus().isReadyToBecomeActive()); } private void checkActiveRMFunctionality() throws IOException { assertEquals(STATE_ERR, HAServiceState.ACTIVE, - rm.haService.getServiceStatus().getState()); + rm.adminService.getServiceStatus().getState()); assertTrue("Active RM services aren't started", rm.areActiveServicesRunning()); assertTrue("RM is not ready to become active", - rm.haService.getServiceStatus().isReadyToBecomeActive()); + rm.adminService.getServiceStatus().isReadyToBecomeActive()); try { rm.getNewAppId(); @@ -113,9 +113,9 @@ public void testStartAndTransitions() throws IOException { HAServiceProtocol.RequestSource.REQUEST_BY_USER); 
assertEquals(STATE_ERR, HAServiceState.INITIALIZING, - rm.haService.getServiceStatus().getState()); + rm.adminService.getServiceStatus().getState()); assertFalse("RM is ready to become active before being started", - rm.haService.getServiceStatus().isReadyToBecomeActive()); + rm.adminService.getServiceStatus().isReadyToBecomeActive()); checkMonitorHealth(); rm.start(); @@ -123,27 +123,27 @@ public void testStartAndTransitions() throws IOException { checkStandbyRMFunctionality(); // 1. Transition to Standby - must be a no-op - rm.haService.transitionToStandby(requestInfo); + rm.adminService.transitionToStandby(requestInfo); checkMonitorHealth(); checkStandbyRMFunctionality(); // 2. Transition to active - rm.haService.transitionToActive(requestInfo); + rm.adminService.transitionToActive(requestInfo); checkMonitorHealth(); checkActiveRMFunctionality(); // 3. Transition to active - no-op - rm.haService.transitionToActive(requestInfo); + rm.adminService.transitionToActive(requestInfo); checkMonitorHealth(); checkActiveRMFunctionality(); // 4. Transition to standby - rm.haService.transitionToStandby(requestInfo); + rm.adminService.transitionToStandby(requestInfo); checkMonitorHealth(); checkStandbyRMFunctionality(); // 5. 
Transition to active to check Active->Standby->Active works - rm.haService.transitionToActive(requestInfo); + rm.adminService.transitionToActive(requestInfo); checkMonitorHealth(); checkActiveRMFunctionality(); @@ -151,9 +151,9 @@ public void testStartAndTransitions() throws IOException { // become active rm.stop(); assertEquals(STATE_ERR, HAServiceState.STOPPING, - rm.haService.getServiceStatus().getState()); + rm.adminService.getServiceStatus().getState()); assertFalse("RM is ready to become active even after it is stopped", - rm.haService.getServiceStatus().isReadyToBecomeActive()); + rm.adminService.getServiceStatus().isReadyToBecomeActive()); assertFalse("Active RM services are started", rm.areActiveServicesRunning()); checkMonitorHealth(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java index eceeecc6854..19121d8973b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java @@ -129,7 +129,8 @@ private Configuration createHARMConf( for (String rpcAddress : YarnConfiguration.RM_RPC_ADDRESS_CONF_KEYS) { conf.set(HAUtil.addSuffix(rpcAddress, rmId), "localhost:0"); } - conf.set(YarnConfiguration.RM_HA_ADMIN_ADDRESS, "localhost:" + adminPort); + conf.set(HAUtil.addSuffix(YarnConfiguration.RM_ADMIN_ADDRESS, rmId), + "localhost:" + adminPort); return conf; } @@ -143,23 +144,23 @@ public void testFencing() throws Exception { 
ResourceManager rm1 = new ResourceManager(); rm1.init(conf1); rm1.start(); - rm1.getHAService().transitionToActive(req); + rm1.getRMContext().getRMAdminService().transitionToActive(req); assertEquals("RM with ZKStore didn't start", Service.STATE.STARTED, rm1.getServiceState()); assertEquals("RM should be Active", HAServiceProtocol.HAServiceState.ACTIVE, - rm1.getHAService().getServiceStatus().getState()); + rm1.getRMContext().getRMAdminService().getServiceStatus().getState()); Configuration conf2 = createHARMConf("rm1,rm2", "rm2", 5678); ResourceManager rm2 = new ResourceManager(); rm2.init(conf2); rm2.start(); - rm2.getHAService().transitionToActive(req); + rm2.getRMContext().getRMAdminService().transitionToActive(req); assertEquals("RM with ZKStore didn't start", Service.STATE.STARTED, rm2.getServiceState()); assertEquals("RM should be Active", HAServiceProtocol.HAServiceState.ACTIVE, - rm2.getHAService().getServiceStatus().getState()); + rm2.getRMContext().getRMAdminService().getServiceStatus().getState()); // Submitting an application to RM1 to trigger a state store operation. // RM1 should realize that it got fenced and is not the Active RM anymore. 
@@ -181,16 +182,16 @@ public void testFencing() throws Exception { rmService.submitApplication(SubmitApplicationRequest.newInstance(asc)); for (int i = 0; i < 30; i++) { - if (HAServiceProtocol.HAServiceState.ACTIVE == rm1.getHAService() - .getServiceStatus().getState()) { + if (HAServiceProtocol.HAServiceState.ACTIVE == + rm1.getRMContext().getRMAdminService().getServiceStatus().getState()) { Thread.sleep(100); } } assertEquals("RM should have been fenced", HAServiceProtocol.HAServiceState.STANDBY, - rm1.getHAService().getServiceStatus().getState()); + rm1.getRMContext().getRMAdminService().getServiceStatus().getState()); assertEquals("RM should be Active", HAServiceProtocol.HAServiceState.ACTIVE, - rm2.getHAService().getServiceStatus().getState()); + rm2.getRMContext().getRMAdminService().getServiceStatus().getState()); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java index e3e0e69acb1..5b687236a6f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java @@ -179,12 +179,13 @@ public void setUp() throws Exception { AMLivelinessMonitor amFinishingMonitor = mock(AMLivelinessMonitor.class); store = mock(RMStateStore.class); this.rmContext = - new RMContextImpl(rmDispatcher, store, + new RMContextImpl(rmDispatcher, containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor, null, new AMRMTokenSecretManager(conf), new 
RMContainerTokenSecretManager(conf), new NMTokenSecretManagerInRM(conf), new ClientToAMTokenSecretManagerInRM()); + ((RMContextImpl)rmContext).setStateStore(store); rmDispatcher.register(RMAppAttemptEventType.class, new TestApplicationAttemptEventDispatcher(this.rmContext));